暫無描述
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

NativeStream.cs 33KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811
  1. using System;
  2. using System.Diagnostics;
  3. using Unity.Collections.LowLevel.Unsafe;
  4. using Unity.Burst;
  5. using Unity.Jobs;
  6. using UnityEngine.Assertions;
  7. using System.Runtime.CompilerServices;
  8. namespace Unity.Collections
  9. {
  10. /// <summary>
  11. /// A set of untyped, append-only buffers. Allows for concurrent reading and concurrent writing without synchronization.
  12. /// </summary>
  13. /// <remarks>
  14. /// As long as each individual buffer is written in one thread and read in one thread, multiple
  15. /// threads can read and write the stream concurrently, *e.g.*
  16. /// while thread *A* reads from buffer *X* of a stream, thread *B* can read from
  17. /// buffer *Y* of the same stream.
  18. ///
  19. /// Each buffer is stored as a chain of blocks. When a write exceeds a buffer's current capacity, another block
  20. /// is allocated and added to the end of the chain. Effectively, expanding the buffer never requires copying the existing
  21. /// data (unlike with <see cref="NativeList{T}"/>, for example).
  22. ///
  23. /// **All writing to a stream should be completed before the stream is first read. Do not write to a stream after the first read.**
  24. /// Violating these rules won't *necessarily* cause any problems, but they are the intended usage pattern.
  25. ///
  26. /// Writing is done with <see cref="NativeStream.Writer"/>, and reading is done with <see cref="NativeStream.Reader"/>.
  27. /// An individual reader or writer cannot be used concurrently across threads: each thread must use its own.
  28. ///
  29. /// The data written to an individual buffer can be heterogeneous in type, and the data written
  30. /// to different buffers of a stream can be entirely different in type, number, and order. Just make sure
  31. /// that the code reading from a particular buffer knows what to expect to read from it.
  32. /// </remarks>
  33. [NativeContainer]
  34. [GenerateTestsForBurstCompatibility]
  35. public unsafe struct NativeStream : INativeDisposable
  36. {
  37. UnsafeStream m_Stream;
  38. #if ENABLE_UNITY_COLLECTIONS_CHECKS
  39. AtomicSafetyHandle m_Safety;
  40. internal static readonly SharedStatic<int> s_staticSafetyId = SharedStatic<int>.GetOrCreate<NativeStream>();
  41. #endif
  /// <summary>
  /// Constructs a stream with a fixed number of buffers.
  /// </summary>
  /// <param name="bufferCount">How many buffers the stream gets. Typically you want
  /// one buffer per thread that reads from or writes to the stream.</param>
  /// <param name="allocator">The allocator used for the stream's memory.</param>
  public NativeStream(int bufferCount, AllocatorManager.AllocatorHandle allocator)
  {
      // Step 1: set up the stream's block storage (AllocateBlock assigns every field of 'this').
      AllocateBlock(out this, allocator);

      // Step 2: create the per-buffer ranges directly on the underlying unsafe stream.
      m_Stream.AllocateForEach(bufferCount);
  }
  /// <summary>
  /// Allocates a new stream and schedules a job that creates its buffers.
  /// </summary>
  /// <remarks>
  /// The stream may be used on the main thread once the returned job has completed, or in
  /// other jobs that depend upon the returned job.
  ///
  /// Allocating the buffers in a job can be more efficient, particularly for a stream with many buffers.
  /// </remarks>
  /// <typeparam name="T">Ignored.</typeparam>
  /// <param name="stream">Outputs the new stream.</param>
  /// <param name="bufferCount">A list whose length (read when the job executes) determines the number of buffers in the stream.</param>
  /// <param name="dependency">A job handle. The new job will depend upon this handle.</param>
  /// <param name="allocator">The allocator to use.</param>
  /// <returns>The handle of the new job.</returns>
  [GenerateTestsForBurstCompatibility(GenericTypeArguments = new[] { typeof(int) })]
  public static JobHandle ScheduleConstruct<T>(out NativeStream stream, NativeList<T> bufferCount, JobHandle dependency, AllocatorManager.AllocatorHandle allocator)
      where T : unmanaged
  {
      AllocateBlock(out stream, allocator);

      // The job only needs the list's length, so the element type is erased here.
      var untypedList = (UntypedUnsafeList*)bufferCount.GetUnsafeList();
      var constructJob = new ConstructJobList
      {
          List = untypedList,
          Container = stream,
      };

      return constructJob.Schedule(dependency);
  }
  /// <summary>
  /// Allocates a new stream and schedules a job that creates its buffers.
  /// </summary>
  /// <remarks>
  /// The stream can be used...
  /// - after completing the returned job
  /// - or in other jobs that depend upon the returned job.
  ///
  /// Allocating the buffers in a job can be more efficient, particularly for a stream with many buffers.
  /// </remarks>
  /// <param name="stream">Outputs the new stream.</param>
  /// <param name="bufferCount">An array whose value at index 0 (read when the job executes) determines the number of buffers in the stream.</param>
  /// <param name="dependency">A job handle. The new job will depend upon this handle.</param>
  /// <param name="allocator">The allocator to use.</param>
  /// <returns>The handle of the new job.</returns>
  public static JobHandle ScheduleConstruct(out NativeStream stream, NativeArray<int> bufferCount, JobHandle dependency, AllocatorManager.AllocatorHandle allocator)
  {
      AllocateBlock(out stream, allocator);

      var constructJob = new ConstructJob
      {
          Length = bufferCount,
          Container = stream,
      };

      return constructJob.Schedule(dependency);
  }
  /// <summary>
  /// Reports whether this stream is empty.
  /// </summary>
  /// <returns>True if this stream is empty or the stream has not been constructed.</returns>
  public readonly bool IsEmpty()
  {
      // Read-access validation; compiled out unless collections checks are enabled.
      CheckRead();

      bool isEmpty = m_Stream.IsEmpty();
      return isEmpty;
  }
  /// <summary>
  /// Whether this stream has been allocated (and not yet deallocated).
  /// </summary>
  /// <remarks>This only forwards the state of the underlying stream; it does not necessarily
  /// reflect whether the buffers of the stream have themselves been allocated.</remarks>
  /// <value>True if this stream has been allocated (and not yet deallocated).</value>
  public readonly bool IsCreated
  {
      [MethodImpl(MethodImplOptions.AggressiveInlining)]
      get
      {
          return m_Stream.IsCreated;
      }
  }
  /// <summary>
  /// The number of buffers in this stream.
  /// </summary>
  /// <value>The number of buffers in this stream, as reported by the underlying stream.</value>
  public readonly int ForEachCount
  {
      get
      {
          // Read-access validation; compiled out unless collections checks are enabled.
          CheckRead();

          int bufferCount = m_Stream.ForEachCount;
          return bufferCount;
      }
  }
  /// <summary>
  /// Creates a reader for this stream.
  /// </summary>
  /// <returns>A new <see cref="Reader"/> bound to this stream.</returns>
  public Reader AsReader() => new Reader(ref this);
  /// <summary>
  /// Creates a writer for this stream.
  /// </summary>
  /// <returns>A new <see cref="Writer"/> bound to this stream.</returns>
  public Writer AsWriter() => new Writer(ref this);
  /// <summary>
  /// Returns the total number of items in the buffers of this stream.
  /// </summary>
  /// <remarks>Each <see cref="Writer.Write{T}"/> and <see cref="Writer.Allocate"/> call increments this number.</remarks>
  /// <returns>The total number of items in the buffers of this stream.</returns>
  public int Count()
  {
      // Read-access validation; compiled out unless collections checks are enabled.
      CheckRead();

      int itemCount = m_Stream.Count();
      return itemCount;
  }
  /// <summary>
  /// Copies this stream's data into a newly allocated NativeArray.
  /// </summary>
  /// <remarks>The length of the array will equal the count of this stream.
  ///
  /// Each buffer of this stream is copied to the array, one after the other.
  /// </remarks>
  /// <typeparam name="T">The type of values in the array.</typeparam>
  /// <param name="allocator">The allocator to use for the new array.</param>
  /// <returns>A new NativeArray copy of this stream's data.</returns>
  [GenerateTestsForBurstCompatibility(GenericTypeArguments = new [] { typeof(int) })]
  public NativeArray<T> ToNativeArray<T>(AllocatorManager.AllocatorHandle allocator) where T : unmanaged
  {
      // Read-access validation; compiled out unless collections checks are enabled.
      CheckRead();

      NativeArray<T> copy = m_Stream.ToNativeArray<T>(allocator);
      return copy;
  }
  /// <summary>
  /// Releases all resources (memory and safety handles).
  /// </summary>
  /// <remarks>Does nothing beyond the safety-handle validation when the stream is not created.</remarks>
  public void Dispose()
  {
  #if ENABLE_UNITY_COLLECTIONS_CHECKS
      // A non-default handle must still exist; this runs before the IsCreated test below.
      bool hasSafetyHandle = !AtomicSafetyHandle.IsDefaultValue(m_Safety);
      if (hasSafetyHandle)
      {
          AtomicSafetyHandle.CheckExistsAndThrow(m_Safety);
      }
  #endif

      if (IsCreated)
      {
  #if ENABLE_UNITY_COLLECTIONS_CHECKS
          CollectionHelper.DisposeSafetyHandle(ref m_Safety);
  #endif
          m_Stream.Dispose();
      }
  }
  /// <summary>
  /// Creates and schedules a job that will release all resources (memory and safety handles) of this stream.
  /// </summary>
  /// <param name="inputDeps">A job handle which the newly scheduled job will depend upon.</param>
  /// <returns>The handle of a new job that will release all resources (memory and safety handles) of this stream.
  /// If the stream is not created, <paramref name="inputDeps"/> is returned unchanged.</returns>
  public JobHandle Dispose(JobHandle inputDeps)
  {
  #if ENABLE_UNITY_COLLECTIONS_CHECKS
      // A non-default handle must still exist; this runs before the IsCreated test below.
      bool hasSafetyHandle = !AtomicSafetyHandle.IsDefaultValue(m_Safety);
      if (hasSafetyHandle)
      {
          AtomicSafetyHandle.CheckExistsAndThrow(m_Safety);
      }
  #endif

      if (!IsCreated)
      {
          return inputDeps;
      }

      // Hand a copy of the stream (plus the safety handle when checks are on) to the dispose job.
      var disposeData = new NativeStreamDispose { m_StreamData = m_Stream };
  #if ENABLE_UNITY_COLLECTIONS_CHECKS
      disposeData.m_Safety = m_Safety;
  #endif

      var jobHandle = new NativeStreamDisposeJob { Data = disposeData }.Schedule(inputDeps);

  #if ENABLE_UNITY_COLLECTIONS_CHECKS
      AtomicSafetyHandle.Release(m_Safety);
  #endif

      // Clear the local stream so this instance no longer reports IsCreated.
      m_Stream = default;
      return jobHandle;
  }
  /// <summary>
  /// Job that creates a stream's buffers, one per element of a list, using the list length
  /// at the time the job executes.
  /// </summary>
  [BurstCompile]
  struct ConstructJobList : IJob
  {
      /// <summary>The stream whose buffers are allocated.</summary>
      public NativeStream Container;

      /// <summary>Type-erased list; only its length is read.</summary>
      [ReadOnly]
      [NativeDisableUnsafePtrRestriction]
      public UntypedUnsafeList* List;

      /// <summary>Allocates as many buffers as the list has elements.</summary>
      public void Execute() => Container.AllocateForEach(List->m_length);
  }
  /// <summary>
  /// Job that creates a stream's buffers, with the buffer count taken from the first element
  /// of an array at the time the job executes.
  /// </summary>
  [BurstCompile]
  struct ConstructJob : IJob
  {
      /// <summary>The stream whose buffers are allocated.</summary>
      public NativeStream Container;

      /// <summary>Array whose element 0 holds the buffer count.</summary>
      [ReadOnly]
      public NativeArray<int> Length;

      /// <summary>Allocates <c>Length[0]</c> buffers.</summary>
      public void Execute() => Container.AllocateForEach(Length[0]);
  }
  /// <summary>
  /// Allocates the block storage of a new stream and, when collections checks are enabled,
  /// creates and labels its safety handle. Buffers are not allocated here.
  /// </summary>
  /// <param name="stream">Outputs the new stream.</param>
  /// <param name="allocator">The allocator to use; validated before anything is allocated.</param>
  static void AllocateBlock(out NativeStream stream, AllocatorManager.AllocatorHandle allocator)
  {
      CollectionHelper.CheckAllocator(allocator);

      UnsafeStream.AllocateBlock(out stream.m_Stream, allocator);

  #if ENABLE_UNITY_COLLECTIONS_CHECKS
      stream.m_Safety = CollectionHelper.CreateSafetyHandle(allocator);
      CollectionHelper.SetStaticSafetyId(
          ref stream.m_Safety,
          ref s_staticSafetyId.Data,
          "Unity.Collections.NativeStream");
  #endif
  }
  /// <summary>
  /// Allocates the stream's buffers. With collections checks enabled, first verifies the
  /// requested count and that no ranges have been allocated yet.
  /// </summary>
  /// <param name="forEachCount">The number of buffers to allocate.</param>
  void AllocateForEach(int forEachCount)
  {
  #if ENABLE_UNITY_COLLECTIONS_CHECKS
      CheckForEachCountGreaterThanZero(forEachCount);

      var header = (UnsafeStreamBlockData*)m_Stream.m_BlockData.Range.Pointer;
      var existingRanges = (UnsafeStreamRange*)header->Ranges.Range.Pointer;

      // Ranges must not exist yet, while the block storage must already be set up.
      Assert.IsTrue(existingRanges == null);
      Assert.AreEqual(0, header->RangeCount);
      Assert.AreNotEqual(0, header->BlockCount);
  #endif

      m_Stream.AllocateForEach(forEachCount);
  }
  /// <summary>
  /// Writes data into a buffer of a <see cref="NativeStream"/>.
  /// </summary>
  /// <remarks>An individual writer can only be used for one buffer of one stream.
  /// Do not create more than one writer for an individual buffer.</remarks>
  [NativeContainer]
  [NativeContainerSupportsMinMaxWriteRestriction]
  [GenerateTestsForBurstCompatibility]
  public unsafe struct Writer
  {
      UnsafeStream.Writer m_Writer;

  #if ENABLE_UNITY_COLLECTIONS_CHECKS
      AtomicSafetyHandle m_Safety;
      internal static readonly SharedStatic<int> s_staticSafetyId = SharedStatic<int>.GetOrCreate<Writer>();

  #pragma warning disable CS0414 // warning CS0414: The field 'NativeStream.Writer.m_Length' is assigned but its value is never used
      int m_Length;
  #pragma warning restore CS0414
      int m_MinIndex;
      int m_MaxIndex;
  #endif

  #if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
      // Address of this writer captured by the first BeginForEachIndex call; compared on every
      // allocation to detect a writer that was copied (passed by value).
      [NativeDisableUnsafePtrRestriction]
      void* m_PassByRefCheck;
  #endif

      /// <summary>
      /// Creates a writer over the given stream.
      /// </summary>
      /// <param name="stream">The stream to write to.</param>
      internal Writer(ref NativeStream stream)
      {
          m_Writer = stream.m_Stream.AsWriter();

  #if ENABLE_UNITY_COLLECTIONS_CHECKS
          m_Safety = stream.m_Safety;
          CollectionHelper.SetStaticSafetyId(ref m_Safety, ref s_staticSafetyId.Data, "Unity.Collections.NativeStream.Writer");
          m_Length = int.MaxValue;

          // int.MinValue in both fields marks the range as "not patched yet";
          // CheckBeginForEachIndex replaces it with [0, RangeCount - 1] on first use.
          m_MinIndex = int.MinValue;
          m_MaxIndex = int.MinValue;
  #endif

  #if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
          m_PassByRefCheck = null;
  #endif
      }

      /// <summary>
      /// The number of buffers in the stream of this writer.
      /// </summary>
      /// <value>The number of buffers in the stream of this writer.</value>
      public int ForEachCount
      {
          get
          {
  #if ENABLE_UNITY_COLLECTIONS_CHECKS
              AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
  #endif
              return m_Writer.ForEachCount;
          }
      }

      /// <summary>
      /// For internal use only.
      /// </summary>
      /// <param name="foreEachIndex">The buffer index that becomes both the minimum and the maximum
      /// index this writer may begin writing to. Has no effect unless collections checks are enabled.</param>
      public void PatchMinMaxRange(int foreEachIndex)
      {
  #if ENABLE_UNITY_COLLECTIONS_CHECKS
          m_MinIndex = foreEachIndex;
          m_MaxIndex = foreEachIndex;
  #endif
      }

      /// <summary>
      /// Readies this writer to write to a particular buffer of the stream.
      /// </summary>
      /// <remarks>Must be called before using this writer. For an individual writer, call this method only once.
      ///
      /// After calling BeginForEachIndex on this writer, passing this writer into functions must be passed by reference.
      ///
      /// When done using this writer, you must call <see cref="EndForEachIndex"/>.</remarks>
      /// <param name="foreachIndex">The index of the buffer to write.</param>
      public void BeginForEachIndex(int foreachIndex)
      {
          CheckBeginForEachIndex(foreachIndex);
          m_Writer.BeginForEachIndex(foreachIndex);
      }

      /// <summary>
      /// Readies the buffer written by this writer for reading.
      /// </summary>
      /// <remarks>Must be called before reading the buffer written by this writer.</remarks>
      public void EndForEachIndex()
      {
          CheckEndForEachIndex();
          m_Writer.EndForEachIndex();

  #if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
          // Mark the writer as "no buffer open" so the balance checks can detect misuse.
          m_Writer.m_ForeachIndex = int.MinValue;
  #endif
      }

      /// <summary>
      /// Write a value to a buffer.
      /// </summary>
      /// <remarks>The value is written to the buffer which was specified
      /// with <see cref="BeginForEachIndex"/>.
      /// </remarks>
      /// <typeparam name="T">The type of value to write.</typeparam>
      /// <param name="value">The value to write.</param>
      /// <exception cref="ArgumentException">Thrown if BeginForEachIndex was not called.</exception>
      /// <exception cref="ArgumentException">Thrown when the NativeStream.Writer instance has been passed by value instead of by reference.</exception>
      [GenerateTestsForBurstCompatibility(GenericTypeArguments = new [] { typeof(int) })]
      public void Write<T>(T value) where T : unmanaged
      {
          ref T dst = ref Allocate<T>();
          dst = value;
      }

      /// <summary>
      /// Allocate space in a buffer.
      /// </summary>
      /// <remarks>The space is allocated in the buffer which was specified
      /// with <see cref="BeginForEachIndex"/>.
      /// </remarks>
      /// <typeparam name="T">The type of value to allocate space for.</typeparam>
      /// <returns>A reference to the allocation.</returns>
      /// <exception cref="ArgumentException">Thrown if BeginForEachIndex was not called.</exception>
      /// <exception cref="ArgumentException">Thrown when the NativeStream.Writer instance has been passed by value instead of by reference.</exception>
      [GenerateTestsForBurstCompatibility(GenericTypeArguments = new [] { typeof(int) })]
      public ref T Allocate<T>() where T : unmanaged
      {
  #if ENABLE_UNITY_COLLECTIONS_CHECKS
          // Flag the safety handle when the stored type is itself a native container.
          if (UnsafeUtility.IsNativeContainerType<T>())
          {
              AtomicSafetyHandle.SetNestedContainer(m_Safety, true);
          }
  #endif
          int size = UnsafeUtility.SizeOf<T>();
          return ref UnsafeUtility.AsRef<T>(Allocate(size));
      }

      /// <summary>
      /// Allocate space in a buffer.
      /// </summary>
      /// <remarks>The space is allocated in the buffer which was specified
      /// with <see cref="BeginForEachIndex"/>.</remarks>
      /// <param name="size">The number of bytes to allocate.</param>
      /// <returns>The allocation.</returns>
      /// <exception cref="ArgumentException">Thrown if BeginForEachIndex was not called.</exception>
      /// <exception cref="ArgumentException">Thrown when the NativeStream.Writer instance has been passed by value instead of by reference.</exception>
      public byte* Allocate(int size)
      {
          CheckAllocateSize(size);
          return m_Writer.Allocate(size);
      }

      /// <summary>
      /// Validates a <see cref="BeginForEachIndex"/> call: write access, the permitted index range,
      /// that no other buffer is currently open, and that the target buffer has not been written before.
      /// </summary>
      /// <param name="foreachIndex">The index of the buffer about to be opened.</param>
      [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")]
      void CheckBeginForEachIndex(int foreachIndex)
      {
  #if ENABLE_UNITY_COLLECTIONS_CHECKS
          AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
  #endif

  #if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
          // Remember where this writer lives so later allocations can detect a by-value copy.
          if (m_PassByRefCheck == null)
          {
              m_PassByRefCheck = UnsafeUtility.AddressOf(ref this);
          }

          var blockData = (UnsafeStreamBlockData*)m_Writer.m_BlockData.Range.Pointer;

  #if ENABLE_UNITY_COLLECTIONS_CHECKS
          if (foreachIndex < m_MinIndex || foreachIndex > m_MaxIndex)
          {
              // When the code is not running through the job system no ParallelForRange patching will occur
              // We can't grab m_BlockStream->RangeCount on creation of the writer because the RangeCount can be initialized
              // in a job after creation of the writer
              if (m_MinIndex == int.MinValue && m_MaxIndex == int.MinValue)
              {
                  m_MinIndex = 0;
                  m_MaxIndex = blockData->RangeCount - 1;
              }

              if (foreachIndex < m_MinIndex || foreachIndex > m_MaxIndex)
              {
                  throw new ArgumentException($"Index {foreachIndex} is out of restricted IJobParallelFor range [{m_MinIndex}...{m_MaxIndex}] in NativeStream.");
              }
          }
  #endif

          if (m_Writer.m_ForeachIndex != int.MinValue)
          {
              throw new ArgumentException($"BeginForEachIndex must always be balanced by a EndForEachIndex call");
          }

          // Validate the index BEFORE it is used to index the ranges array. With only
          // UNITY_DOTS_DEBUG defined the min/max check above is compiled out, so this
          // assertion is the only bounds check that guards the read below.
          Assert.IsTrue(foreachIndex >= 0 && foreachIndex < blockData->RangeCount);

          var ranges = (UnsafeStreamRange*)blockData->Ranges.Range.Pointer;
          if (0 != ranges[foreachIndex].ElementCount)
          {
              throw new ArgumentException($"BeginForEachIndex can only be called once for the same index ({foreachIndex}).");
          }
  #endif
      }

      /// <summary>
      /// Validates an <see cref="EndForEachIndex"/> call: write access and that a buffer is currently open.
      /// </summary>
      [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")]
      void CheckEndForEachIndex()
      {
  #if ENABLE_UNITY_COLLECTIONS_CHECKS
          AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
  #endif

  #if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
          if (m_Writer.m_ForeachIndex == int.MinValue)
          {
              throw new System.ArgumentException("EndForEachIndex must always be called balanced by a BeginForEachIndex or AppendForEachIndex call");
          }
  #endif
      }

      /// <summary>
      /// Validates an allocation request: write access, that the writer was not copied and has a
      /// buffer open, and that the request fits in one block (block size minus one pointer).
      /// </summary>
      /// <param name="size">The number of bytes requested.</param>
      [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")]
      void CheckAllocateSize(int size)
      {
  #if ENABLE_UNITY_COLLECTIONS_CHECKS
          AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
  #endif

  #if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
          if (m_PassByRefCheck != UnsafeUtility.AddressOf(ref this)
              || m_Writer.m_ForeachIndex == int.MinValue)
          {
              throw new ArgumentException("BeginForEachIndex has not been called on NativeStream.Writer, or NativeStream.Writer is not passed by reference.");
          }

          if (size > UnsafeStreamBlockData.AllocationSize - sizeof(void*))
          {
              throw new ArgumentException("Allocation size is too large");
          }
  #endif
      }
  }
  /// <summary>
  /// Reads data from a buffer of a <see cref="NativeStream"/>.
  /// </summary>
  /// <remarks>An individual reader can only be used for one buffer of one stream.
  /// Do not create more than one reader for an individual buffer.</remarks>
  [NativeContainer]
  [NativeContainerIsReadOnly]
  [GenerateTestsForBurstCompatibility]
  public unsafe struct Reader
  {
      UnsafeStream.Reader m_Reader;

  #if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
      // Block counter for the buffer being read; set in BeginForEachIndex and decremented in
      // ReadUnsafePtr each time the read position moves to the next block.
      int m_RemainingBlocks;
  #endif

  #if ENABLE_UNITY_COLLECTIONS_CHECKS
      internal AtomicSafetyHandle m_Safety;
      internal static readonly SharedStatic<int> s_staticSafetyId = SharedStatic<int>.GetOrCreate<Reader>();
  #endif

      /// <summary>
      /// Creates a reader over the given stream.
      /// </summary>
      /// <param name="stream">The stream to read from.</param>
      internal Reader(ref NativeStream stream)
      {
          m_Reader = stream.m_Stream.AsReader();

  #if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
          m_RemainingBlocks = 0;
  #endif

  #if ENABLE_UNITY_COLLECTIONS_CHECKS
          m_Safety = stream.m_Safety;
          CollectionHelper.SetStaticSafetyId(ref m_Safety, ref s_staticSafetyId.Data, "Unity.Collections.NativeStream.Reader");
  #endif
      }

      /// <summary>
      /// Readies this reader to read a particular buffer of the stream.
      /// </summary>
      /// <remarks>Must be called before using this reader. For an individual reader, call this method only once.
      ///
      /// When done using this reader, you must call <see cref="EndForEachIndex"/>.</remarks>
      /// <param name="foreachIndex">The index of the buffer to read.</param>
      /// <returns>The number of elements left to read from the buffer.</returns>
      public int BeginForEachIndex(int foreachIndex)
      {
          CheckBeginForEachIndex(foreachIndex);

          int itemsToRead = m_Reader.BeginForEachIndex(foreachIndex);

  #if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
          var header = (UnsafeStreamBlockData*)m_Reader.m_BlockData.Range.Pointer;
          var rangeTable = (UnsafeStreamRange*)header->Ranges.Range.Pointer;

          m_RemainingBlocks = rangeTable[foreachIndex].NumberOfBlocks;
          if (m_RemainingBlocks == 0)
          {
              // No further blocks recorded for this range: clamp the end of the current
              // block to the recorded last-block size.
              m_Reader.m_CurrentBlockEnd = (byte*)m_Reader.m_CurrentBlock + m_Reader.m_LastBlockSize;
          }
  #endif

          return itemsToRead;
      }

      /// <summary>
      /// Checks if all data has been read from the buffer.
      /// </summary>
      /// <remarks>If you intentionally don't want to read *all* the data in the buffer, don't call this method.
      /// Otherwise, calling this method is recommended, even though it's not strictly necessary.</remarks>
      /// <exception cref="ArgumentException">Thrown if not all the buffer's data has been read.</exception>
      public void EndForEachIndex()
      {
          m_Reader.EndForEachIndex();
          CheckEndForEachIndex();
      }

      /// <summary>
      /// The number of buffers in the stream of this reader.
      /// </summary>
      /// <value>The number of buffers in the stream of this reader.</value>
      public int ForEachCount
      {
          get
          {
              CheckRead();
              return m_Reader.ForEachCount;
          }
      }

      /// <summary>
      /// The number of items not yet read from the buffer.
      /// </summary>
      /// <value>The number of items not yet read from the buffer.</value>
      public int RemainingItemCount
      {
          get { return m_Reader.RemainingItemCount; }
      }

      /// <summary>
      /// Returns a pointer to the next position to read from the buffer. Advances the reader some number of bytes.
      /// </summary>
      /// <param name="size">The number of bytes to advance the reader.</param>
      /// <returns>A pointer to the next position to read from the buffer.</returns>
      /// <exception cref="ArgumentException">Thrown if the reader would advance past the end of the buffer.</exception>
      public byte* ReadUnsafePtr(int size)
      {
          CheckReadSize(size);

          m_Reader.m_RemainingItemCount--;

          byte* result = m_Reader.m_CurrentPtr;
          m_Reader.m_CurrentPtr += size;

          // The item fit in the current block: nothing more to do.
          if (m_Reader.m_CurrentPtr <= m_Reader.m_CurrentBlockEnd)
          {
              return result;
          }

          /*
           * On netfw/mono/il2cpp, doing m_CurrentBlock->Data does not throw, because it knows that it can
           * just do pointer + 8. On netcore, doing that throws a NullReferenceException. So, first check for
           * out of bounds accesses, and only then update m_CurrentBlock and m_CurrentPtr.
           */
  #if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
          m_RemainingBlocks--;
          CheckNotReadingOutOfBounds(size);
  #endif

          // Step to the next block in the chain and restart at its data.
          m_Reader.m_CurrentBlock = m_Reader.m_CurrentBlock->Next;
          m_Reader.m_CurrentPtr = m_Reader.m_CurrentBlock->Data;

  #if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
          // Once the counter is used up, the block ends at the recorded last-block size;
          // otherwise it spans a whole allocation.
          int blockSpan = m_RemainingBlocks <= 0
              ? m_Reader.m_LastBlockSize
              : UnsafeStreamBlockData.AllocationSize;
          m_Reader.m_CurrentBlockEnd = (byte*)m_Reader.m_CurrentBlock + blockSpan;
  #else
          m_Reader.m_CurrentBlockEnd = (byte*)m_Reader.m_CurrentBlock + UnsafeStreamBlockData.AllocationSize;
  #endif

          result = m_Reader.m_CurrentPtr;
          m_Reader.m_CurrentPtr += size;

          return result;
      }

      /// <summary>
      /// Reads the next value from the buffer.
      /// </summary>
      /// <remarks>Each read advances the reader to the next item in the buffer.</remarks>
      /// <typeparam name="T">The type of value to read.</typeparam>
      /// <returns>A reference to the next value from the buffer.</returns>
      /// <exception cref="ArgumentException">Thrown if the reader would advance past the end of the buffer.</exception>
      [GenerateTestsForBurstCompatibility(GenericTypeArguments = new [] { typeof(int) })]
      public ref T Read<T>() where T : unmanaged
      {
          int byteCount = UnsafeUtility.SizeOf<T>();
          byte* location = ReadUnsafePtr(byteCount);
          return ref UnsafeUtility.AsRef<T>(location);
      }

      /// <summary>
      /// Reads the next value from the buffer. Does not advance the reader.
      /// </summary>
      /// <typeparam name="T">The type of value to read.</typeparam>
      /// <returns>A reference to the next value from the buffer.</returns>
      /// <exception cref="ArgumentException">Thrown if the read would go past the end of the buffer.</exception>
      [GenerateTestsForBurstCompatibility(GenericTypeArguments = new [] { typeof(int) })]
      public ref T Peek<T>() where T : unmanaged
      {
          int byteCount = UnsafeUtility.SizeOf<T>();
          CheckReadSize(byteCount);
          return ref m_Reader.Peek<T>();
      }

      /// <summary>
      /// Returns the total number of items in the buffers of the stream.
      /// </summary>
      /// <returns>The total number of items in the buffers of the stream.</returns>
      public int Count()
      {
          CheckRead();
          return m_Reader.Count();
      }

      /// <summary>
      /// Throws when moving to the next block would read past the data recorded for the buffer.
      /// </summary>
      /// <param name="size">The number of bytes about to be read from the next block.</param>
      [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")]
      void CheckNotReadingOutOfBounds(int size)
      {
  #if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
          // Either the block counter went negative, or the counter is exactly zero and the
          // item (plus one pointer's worth of bytes) exceeds the recorded last-block size.
          bool blockCounterExhausted = m_RemainingBlocks < 0;
          if (blockCounterExhausted
              || (m_RemainingBlocks == 0 && size + sizeof(void*) > m_Reader.m_LastBlockSize))
          {
              throw new System.ArgumentException("Reading out of bounds");
          }
  #endif
      }

      /// <summary>
      /// Validates read access through the safety handle (collections checks only).
      /// </summary>
      [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS")]
      void CheckRead()
      {
  #if ENABLE_UNITY_COLLECTIONS_CHECKS
          AtomicSafetyHandle.CheckReadAndThrow(m_Safety);
  #endif
      }

      /// <summary>
      /// Validates a read request: read access, that the size fits in one block
      /// (block size minus one pointer), and that at least one item remains.
      /// </summary>
      /// <param name="size">The number of bytes about to be read.</param>
      [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")]
      void CheckReadSize(int size)
      {
  #if ENABLE_UNITY_COLLECTIONS_CHECKS
          AtomicSafetyHandle.CheckReadAndThrow(m_Safety);
  #endif

  #if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
          Assert.IsTrue(size <= UnsafeStreamBlockData.AllocationSize - (sizeof(void*)));

          if (m_Reader.m_RemainingItemCount < 1)
          {
              throw new ArgumentException("There are no more items left to be read.");
          }
  #endif
      }

      /// <summary>
      /// Validates a <see cref="BeginForEachIndex"/> call: read access and that the index
      /// lies within the stream's range count.
      /// </summary>
      /// <param name="forEachIndex">The index of the buffer about to be read.</param>
      [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")]
      void CheckBeginForEachIndex(int forEachIndex)
      {
  #if ENABLE_UNITY_COLLECTIONS_CHECKS
          AtomicSafetyHandle.CheckReadAndThrow(m_Safety);
  #endif

  #if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
          var blockData = (UnsafeStreamBlockData*)m_Reader.m_BlockData.Range.Pointer;

          // The unsigned comparison rejects negative indices and indices >= RangeCount in one test.
          if ((uint)forEachIndex >= (uint)blockData->RangeCount)
          {
              throw new System.ArgumentOutOfRangeException(nameof(forEachIndex), $"foreachIndex: {forEachIndex} must be between 0 and ForEachCount: {blockData->RangeCount}");
          }
  #endif
      }

      /// <summary>
      /// Verifies that the buffer was consumed completely: no items remain and the read
      /// position sits exactly at the end of the current block.
      /// </summary>
      [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")]
      void CheckEndForEachIndex()
      {
          if (m_Reader.m_RemainingItemCount != 0)
          {
              throw new System.ArgumentException("Not all elements (Count) have been read. If this is intentional, simply skip calling EndForEachIndex();");
          }

          if (m_Reader.m_CurrentBlockEnd != m_Reader.m_CurrentPtr)
          {
              throw new System.ArgumentException("Not all data (Data Size) has been read. If this is intentional, simply skip calling EndForEachIndex();");
          }
      }
  }
  /// <summary>
  /// Throws if the requested buffer count is not strictly positive.
  /// </summary>
  /// <param name="forEachCount">The requested number of buffers.</param>
  /// <exception cref="ArgumentException">Thrown if <paramref name="forEachCount"/> is zero or negative.</exception>
  [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")]
  static void CheckForEachCountGreaterThanZero(int forEachCount)
  {
      if (forEachCount <= 0)
      {
          // nameof keeps the reported parameter name in sync with the actual parameter
          // (the previous literal "foreachCount" did not match its casing).
          throw new ArgumentException("foreachCount must be > 0", nameof(forEachCount));
      }
  }
  /// <summary>
  /// Validates read access to this stream through its safety handle.
  /// Calls are compiled out unless collections checks are enabled.
  /// </summary>
  [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS")]
  readonly void CheckRead()
  {
  #if ENABLE_UNITY_COLLECTIONS_CHECKS
      // Throws if the safety handle does not currently permit reading.
      AtomicSafetyHandle.CheckReadAndThrow(m_Safety);
  #endif
  }
  697. }
  /// <summary>
  /// Payload for <c>NativeStreamDisposeJob</c>: carries the stream data to dispose and,
  /// when collections checks are enabled, the associated safety handle.
  /// </summary>
  [NativeContainer]
  [GenerateTestsForBurstCompatibility]
  internal unsafe struct NativeStreamDispose
  {
      /// <summary>The underlying stream to dispose.</summary>
      public UnsafeStream m_StreamData;

  #if ENABLE_UNITY_COLLECTIONS_CHECKS
      internal AtomicSafetyHandle m_Safety;
  #endif

      /// <summary>Disposes the underlying stream.</summary>
      public void Dispose() => m_StreamData.Dispose();
  }
  /// <summary>
  /// Job that disposes a stream by disposing its <see cref="NativeStreamDispose"/> payload.
  /// </summary>
  [BurstCompile]
  struct NativeStreamDisposeJob : IJob
  {
      /// <summary>The stream data to dispose when the job runs.</summary>
      public NativeStreamDispose Data;

      /// <summary>Disposes <see cref="Data"/>.</summary>
      public void Execute() => Data.Dispose();
  }
  720. }