설명 없음
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

NativeQueue.cs 24KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693
  1. using System;
  2. using System.Runtime.InteropServices;
  3. using System.Threading;
  4. using Unity.Collections.LowLevel.Unsafe;
  5. using Unity.Burst;
  6. using Unity.Jobs;
  7. using Unity.Jobs.LowLevel.Unsafe;
  8. using System.Diagnostics;
  9. namespace Unity.Collections
  10. {
  11. unsafe struct NativeQueueBlockHeader
  12. {
  13. public NativeQueueBlockHeader* m_NextBlock;
  14. public int m_NumItems;
  15. }
  16. [StructLayout(LayoutKind.Sequential)]
  17. [BurstCompatible]
  18. internal unsafe struct NativeQueueBlockPoolData
  19. {
  20. internal IntPtr m_FirstBlock;
  21. internal int m_NumBlocks;
  22. internal int m_MaxBlocks;
  23. internal const int m_BlockSize = 16 * 1024;
  24. internal int m_AllocLock;
  25. public NativeQueueBlockHeader* AllocateBlock()
  26. {
  27. // There can only ever be a single thread allocating an entry from the free list since it needs to
  28. // access the content of the block (the next pointer) before doing the CAS.
  29. // If there was no lock thread A could read the next pointer, thread B could quickly allocate
  30. // the same block then free it with another next pointer before thread A performs the CAS which
  31. // leads to an invalid free list potentially causing memory corruption.
  32. // Having multiple threads freeing data concurrently to each other while another thread is allocating
  33. // is no problems since there is only ever a single thread modifying global data in that case.
  34. while (Interlocked.CompareExchange(ref m_AllocLock, 1, 0) != 0)
  35. {
  36. }
  37. NativeQueueBlockHeader* checkBlock = (NativeQueueBlockHeader*)m_FirstBlock;
  38. NativeQueueBlockHeader* block;
  39. do
  40. {
  41. block = checkBlock;
  42. if (block == null)
  43. {
  44. Interlocked.Exchange(ref m_AllocLock, 0);
  45. Interlocked.Increment(ref m_NumBlocks);
  46. block = (NativeQueueBlockHeader*)Memory.Unmanaged.Allocate(m_BlockSize, 16, Allocator.Persistent);
  47. return block;
  48. }
  49. checkBlock = (NativeQueueBlockHeader*)Interlocked.CompareExchange(ref m_FirstBlock, (IntPtr)block->m_NextBlock, (IntPtr)block);
  50. }
  51. while (checkBlock != block);
  52. Interlocked.Exchange(ref m_AllocLock, 0);
  53. return block;
  54. }
  55. public void FreeBlock(NativeQueueBlockHeader* block)
  56. {
  57. if (m_NumBlocks > m_MaxBlocks)
  58. {
  59. if (Interlocked.Decrement(ref m_NumBlocks) + 1 > m_MaxBlocks)
  60. {
  61. Memory.Unmanaged.Free(block, Allocator.Persistent);
  62. return;
  63. }
  64. Interlocked.Increment(ref m_NumBlocks);
  65. }
  66. NativeQueueBlockHeader* checkBlock = (NativeQueueBlockHeader*)m_FirstBlock;
  67. NativeQueueBlockHeader* nextPtr;
  68. do
  69. {
  70. nextPtr = checkBlock;
  71. block->m_NextBlock = checkBlock;
  72. checkBlock = (NativeQueueBlockHeader*)Interlocked.CompareExchange(ref m_FirstBlock, (IntPtr)block, (IntPtr)checkBlock);
  73. }
  74. while (checkBlock != nextPtr);
  75. }
  76. }
  77. internal unsafe class NativeQueueBlockPool
  78. {
  79. static readonly SharedStatic<IntPtr> Data = SharedStatic<IntPtr>.GetOrCreate<NativeQueueBlockPool>();
  80. internal static NativeQueueBlockPoolData* GetQueueBlockPool()
  81. {
  82. var pData = (NativeQueueBlockPoolData**)Data.UnsafeDataPointer;
  83. var data = *pData;
  84. if (data == null)
  85. {
  86. data = (NativeQueueBlockPoolData*)Memory.Unmanaged.Allocate(UnsafeUtility.SizeOf<NativeQueueBlockPoolData>(), 8, Allocator.Persistent);
  87. *pData = data;
  88. data->m_NumBlocks = data->m_MaxBlocks = 256;
  89. data->m_AllocLock = 0;
  90. // Allocate MaxBlocks items
  91. NativeQueueBlockHeader* prev = null;
  92. for (int i = 0; i < data->m_MaxBlocks; ++i)
  93. {
  94. NativeQueueBlockHeader* block = (NativeQueueBlockHeader*)Memory.Unmanaged.Allocate(NativeQueueBlockPoolData.m_BlockSize, 16, Allocator.Persistent);
  95. block->m_NextBlock = prev;
  96. prev = block;
  97. }
  98. data->m_FirstBlock = (IntPtr)prev;
  99. AppDomainOnDomainUnload();
  100. }
  101. return data;
  102. }
  103. [BurstDiscard]
  104. static void AppDomainOnDomainUnload()
  105. {
  106. #if !UNITY_DOTSRUNTIME
  107. AppDomain.CurrentDomain.DomainUnload += OnDomainUnload;
  108. #endif
  109. }
  110. #if !UNITY_DOTSRUNTIME
  111. static void OnDomainUnload(object sender, EventArgs e)
  112. {
  113. var pData = (NativeQueueBlockPoolData**)Data.UnsafeDataPointer;
  114. var data = *pData;
  115. while (data->m_FirstBlock != IntPtr.Zero)
  116. {
  117. NativeQueueBlockHeader* block = (NativeQueueBlockHeader*)data->m_FirstBlock;
  118. data->m_FirstBlock = (IntPtr)block->m_NextBlock;
  119. Memory.Unmanaged.Free(block, Allocator.Persistent);
  120. --data->m_NumBlocks;
  121. }
  122. Memory.Unmanaged.Free(data, Allocator.Persistent);
  123. *pData = null;
  124. }
  125. #endif
  126. }
  127. [StructLayout(LayoutKind.Sequential)]
  128. [BurstCompatible]
  129. internal unsafe struct NativeQueueData
  130. {
  131. public IntPtr m_FirstBlock;
  132. public IntPtr m_LastBlock;
  133. public int m_MaxItems;
  134. public int m_CurrentRead;
  135. public byte* m_CurrentWriteBlockTLS;
  136. internal NativeQueueBlockHeader* GetCurrentWriteBlockTLS(int threadIndex)
  137. {
  138. var data = (NativeQueueBlockHeader**)&m_CurrentWriteBlockTLS[threadIndex * JobsUtility.CacheLineSize];
  139. return *data;
  140. }
  141. internal void SetCurrentWriteBlockTLS(int threadIndex, NativeQueueBlockHeader* currentWriteBlock)
  142. {
  143. var data = (NativeQueueBlockHeader**)&m_CurrentWriteBlockTLS[threadIndex * JobsUtility.CacheLineSize];
  144. *data = currentWriteBlock;
  145. }
  146. [BurstCompatible(GenericTypeArguments = new [] { typeof(int) })]
  147. public static NativeQueueBlockHeader* AllocateWriteBlockMT<T>(NativeQueueData* data, NativeQueueBlockPoolData* pool, int threadIndex) where T : struct
  148. {
  149. NativeQueueBlockHeader* currentWriteBlock = data->GetCurrentWriteBlockTLS(threadIndex);
  150. if (currentWriteBlock != null
  151. && currentWriteBlock->m_NumItems == data->m_MaxItems)
  152. {
  153. currentWriteBlock = null;
  154. }
  155. if (currentWriteBlock == null)
  156. {
  157. currentWriteBlock = pool->AllocateBlock();
  158. currentWriteBlock->m_NextBlock = null;
  159. currentWriteBlock->m_NumItems = 0;
  160. NativeQueueBlockHeader* prevLast = (NativeQueueBlockHeader*)Interlocked.Exchange(ref data->m_LastBlock, (IntPtr)currentWriteBlock);
  161. if (prevLast == null)
  162. {
  163. data->m_FirstBlock = (IntPtr)currentWriteBlock;
  164. }
  165. else
  166. {
  167. prevLast->m_NextBlock = currentWriteBlock;
  168. }
  169. data->SetCurrentWriteBlockTLS(threadIndex, currentWriteBlock);
  170. }
  171. return currentWriteBlock;
  172. }
  173. [BurstCompatible(GenericTypeArguments = new [] { typeof(int) })]
  174. public unsafe static void AllocateQueue<T>(AllocatorManager.AllocatorHandle label, out NativeQueueData* outBuf) where T : struct
  175. {
  176. var queueDataSize = CollectionHelper.Align(UnsafeUtility.SizeOf<NativeQueueData>(), JobsUtility.CacheLineSize);
  177. var data = (NativeQueueData*)Memory.Unmanaged.Allocate(
  178. queueDataSize
  179. + JobsUtility.CacheLineSize * JobsUtility.MaxJobThreadCount
  180. , JobsUtility.CacheLineSize
  181. , label
  182. );
  183. data->m_CurrentWriteBlockTLS = (((byte*)data) + queueDataSize);
  184. data->m_FirstBlock = IntPtr.Zero;
  185. data->m_LastBlock = IntPtr.Zero;
  186. data->m_MaxItems = (NativeQueueBlockPoolData.m_BlockSize - UnsafeUtility.SizeOf<NativeQueueBlockHeader>()) / UnsafeUtility.SizeOf<T>();
  187. data->m_CurrentRead = 0;
  188. for (int threadIndex = 0; threadIndex < JobsUtility.MaxJobThreadCount; ++threadIndex)
  189. {
  190. data->SetCurrentWriteBlockTLS(threadIndex, null);
  191. }
  192. outBuf = data;
  193. }
  194. public unsafe static void DeallocateQueue(NativeQueueData* data, NativeQueueBlockPoolData* pool, AllocatorManager.AllocatorHandle allocation)
  195. {
  196. NativeQueueBlockHeader* firstBlock = (NativeQueueBlockHeader*)data->m_FirstBlock;
  197. while (firstBlock != null)
  198. {
  199. NativeQueueBlockHeader* next = firstBlock->m_NextBlock;
  200. pool->FreeBlock(firstBlock);
  201. firstBlock = next;
  202. }
  203. Memory.Unmanaged.Free(data, allocation);
  204. }
  205. }
  206. /// <summary>
  207. /// An unmanaged queue.
  208. /// </summary>
  209. /// <typeparam name="T">The type of the elements.</typeparam>
  210. [StructLayout(LayoutKind.Sequential)]
  211. [NativeContainer]
  212. [BurstCompatible(GenericTypeArguments = new [] { typeof(int) })]
  213. public unsafe struct NativeQueue<T>
  214. : INativeDisposable
  215. where T : struct
  216. {
  217. [NativeDisableUnsafePtrRestriction]
  218. NativeQueueData* m_Buffer;
  219. [NativeDisableUnsafePtrRestriction]
  220. NativeQueueBlockPoolData* m_QueuePool;
  221. #if ENABLE_UNITY_COLLECTIONS_CHECKS
  222. AtomicSafetyHandle m_Safety;
  223. static readonly SharedStatic<int> s_staticSafetyId = SharedStatic<int>.GetOrCreate<NativeQueue<T>>();
  224. #if REMOVE_DISPOSE_SENTINEL
  225. #else
  226. [NativeSetClassTypeToNullOnSchedule]
  227. DisposeSentinel m_DisposeSentinel;
  228. #endif
  229. #endif
  230. AllocatorManager.AllocatorHandle m_AllocatorLabel;
  231. /// <summary>
  232. /// Initializes and returns an instance of NativeQueue.
  233. /// </summary>
  234. /// <param name="allocator">The allocator to use.</param>
  235. public NativeQueue(AllocatorManager.AllocatorHandle allocator)
  236. {
  237. CollectionHelper.CheckIsUnmanaged<T>();
  238. m_QueuePool = NativeQueueBlockPool.GetQueueBlockPool();
  239. m_AllocatorLabel = allocator;
  240. NativeQueueData.AllocateQueue<T>(allocator, out m_Buffer);
  241. #if ENABLE_UNITY_COLLECTIONS_CHECKS
  242. #if REMOVE_DISPOSE_SENTINEL
  243. m_Safety = CollectionHelper.CreateSafetyHandle(allocator);
  244. #else
  245. if (allocator.IsCustomAllocator)
  246. {
  247. m_Safety = AtomicSafetyHandle.Create();
  248. m_DisposeSentinel = null;
  249. }
  250. else
  251. {
  252. DisposeSentinel.Create(out m_Safety, out m_DisposeSentinel, 0, allocator.ToAllocator);
  253. }
  254. #endif
  255. CollectionHelper.SetStaticSafetyId<NativeQueue<T>>(ref m_Safety, ref s_staticSafetyId.Data);
  256. #endif
  257. }
  258. /// <summary>
  259. /// Returns true if this queue is empty.
  260. /// </summary>
  261. /// <value>True if this queue has no items or if the queue has not been constructed.</value>
  262. public bool IsEmpty()
  263. {
  264. if (!IsCreated)
  265. {
  266. return true;
  267. }
  268. CheckRead();
  269. int count = 0;
  270. var currentRead = m_Buffer->m_CurrentRead;
  271. for (NativeQueueBlockHeader* block = (NativeQueueBlockHeader*)m_Buffer->m_FirstBlock
  272. ; block != null
  273. ; block = block->m_NextBlock
  274. )
  275. {
  276. count += block->m_NumItems;
  277. if (count > currentRead)
  278. {
  279. return false;
  280. }
  281. }
  282. return count == currentRead;
  283. }
  284. /// <summary>
  285. /// Returns the current number of elements in this queue.
  286. /// </summary>
  287. /// <remarks>Note that getting the count requires traversing the queue's internal linked list of blocks.
  288. /// Where possible, cache this value instead of reading the property repeatedly.</remarks>
  289. /// <returns>The current number of elements in this queue.</returns>
  290. public int Count
  291. {
  292. get
  293. {
  294. CheckRead();
  295. int count = 0;
  296. for (NativeQueueBlockHeader* block = (NativeQueueBlockHeader*)m_Buffer->m_FirstBlock
  297. ; block != null
  298. ; block = block->m_NextBlock
  299. )
  300. {
  301. count += block->m_NumItems;
  302. }
  303. return count - m_Buffer->m_CurrentRead;
  304. }
  305. }
  306. internal static int PersistentMemoryBlockCount
  307. {
  308. get { return NativeQueueBlockPool.GetQueueBlockPool()->m_MaxBlocks; }
  309. set { Interlocked.Exchange(ref NativeQueueBlockPool.GetQueueBlockPool()->m_MaxBlocks, value); }
  310. }
  311. internal static int MemoryBlockSize
  312. {
  313. get { return NativeQueueBlockPoolData.m_BlockSize; }
  314. }
  315. /// <summary>
  316. /// Returns the element at the end of this queue without removing it.
  317. /// </summary>
  318. /// <returns>The element at the end of this queue.</returns>
  319. public T Peek()
  320. {
  321. CheckReadNotEmpty();
  322. NativeQueueBlockHeader* firstBlock = (NativeQueueBlockHeader*)m_Buffer->m_FirstBlock;
  323. return UnsafeUtility.ReadArrayElement<T>(firstBlock + 1, m_Buffer->m_CurrentRead);
  324. }
  325. /// <summary>
  326. /// Adds an element at the front of this queue.
  327. /// </summary>
  328. /// <param name="value">The value to be enqueued.</param>
  329. public void Enqueue(T value)
  330. {
  331. CheckWrite();
  332. NativeQueueBlockHeader* writeBlock = NativeQueueData.AllocateWriteBlockMT<T>(m_Buffer, m_QueuePool, 0);
  333. UnsafeUtility.WriteArrayElement(writeBlock + 1, writeBlock->m_NumItems, value);
  334. ++writeBlock->m_NumItems;
  335. }
  336. /// <summary>
  337. /// Removes and returns the element at the end of this queue.
  338. /// </summary>
  339. /// <exception cref="InvalidOperationException">Thrown if this queue is empty.</exception>
  340. /// <returns>The element at the end of this queue.</returns>
  341. public T Dequeue()
  342. {
  343. if (!TryDequeue(out T item))
  344. {
  345. ThrowEmpty();
  346. }
  347. return item;
  348. }
  349. /// <summary>
  350. /// Removes and outputs the element at the end of this queue.
  351. /// </summary>
  352. /// <param name="item">Outputs the removed element.</param>
  353. /// <returns>True if this queue was not empty.</returns>
  354. public bool TryDequeue(out T item)
  355. {
  356. CheckWrite();
  357. NativeQueueBlockHeader* firstBlock = (NativeQueueBlockHeader*)m_Buffer->m_FirstBlock;
  358. if (firstBlock == null)
  359. {
  360. item = default(T);
  361. return false;
  362. }
  363. var currentRead = m_Buffer->m_CurrentRead++;
  364. item = UnsafeUtility.ReadArrayElement<T>(firstBlock + 1, currentRead);
  365. if (m_Buffer->m_CurrentRead >= firstBlock->m_NumItems)
  366. {
  367. m_Buffer->m_CurrentRead = 0;
  368. m_Buffer->m_FirstBlock = (IntPtr)firstBlock->m_NextBlock;
  369. if (m_Buffer->m_FirstBlock == IntPtr.Zero)
  370. {
  371. m_Buffer->m_LastBlock = IntPtr.Zero;
  372. }
  373. for (int threadIndex = 0; threadIndex < JobsUtility.MaxJobThreadCount; ++threadIndex)
  374. {
  375. if (m_Buffer->GetCurrentWriteBlockTLS(threadIndex) == firstBlock)
  376. {
  377. m_Buffer->SetCurrentWriteBlockTLS(threadIndex, null);
  378. }
  379. }
  380. m_QueuePool->FreeBlock(firstBlock);
  381. }
  382. return true;
  383. }
  384. /// <summary>
  385. /// Returns an array containing a copy of this queue's content.
  386. /// </summary>
  387. /// <param name="allocator">The allocator to use.</param>
  388. /// <returns>An array containing a copy of this queue's content. The elements are ordered in the same order they were
  389. /// enqueued, *e.g.* the earliest enqueued element is copied to index 0 of the array.</returns>
  390. public NativeArray<T> ToArray(AllocatorManager.AllocatorHandle allocator)
  391. {
  392. CheckRead();
  393. NativeQueueBlockHeader* firstBlock = (NativeQueueBlockHeader*)m_Buffer->m_FirstBlock;
  394. var outputArray = CollectionHelper.CreateNativeArray<T>(Count, allocator);
  395. NativeQueueBlockHeader* currentBlock = firstBlock;
  396. var arrayPtr = (byte*)outputArray.GetUnsafePtr();
  397. int size = UnsafeUtility.SizeOf<T>();
  398. int dstOffset = 0;
  399. int srcOffset = m_Buffer->m_CurrentRead * size;
  400. int srcOffsetElements = m_Buffer->m_CurrentRead;
  401. while (currentBlock != null)
  402. {
  403. int bytesToCopy = (currentBlock->m_NumItems - srcOffsetElements) * size;
  404. UnsafeUtility.MemCpy(arrayPtr + dstOffset, (byte*)(currentBlock + 1) + srcOffset, bytesToCopy);
  405. srcOffset = srcOffsetElements = 0;
  406. dstOffset += bytesToCopy;
  407. currentBlock = currentBlock->m_NextBlock;
  408. }
  409. return outputArray;
  410. }
  411. /// <summary>
  412. /// Removes all elements of this queue.
  413. /// </summary>
  414. /// <remarks>Does not change the capacity.</remarks>
  415. public void Clear()
  416. {
  417. CheckWrite();
  418. NativeQueueBlockHeader* firstBlock = (NativeQueueBlockHeader*)m_Buffer->m_FirstBlock;
  419. while (firstBlock != null)
  420. {
  421. NativeQueueBlockHeader* next = firstBlock->m_NextBlock;
  422. m_QueuePool->FreeBlock(firstBlock);
  423. firstBlock = next;
  424. }
  425. m_Buffer->m_FirstBlock = IntPtr.Zero;
  426. m_Buffer->m_LastBlock = IntPtr.Zero;
  427. m_Buffer->m_CurrentRead = 0;
  428. for (int threadIndex = 0; threadIndex < JobsUtility.MaxJobThreadCount; ++threadIndex)
  429. {
  430. m_Buffer->SetCurrentWriteBlockTLS(threadIndex, null);
  431. }
  432. }
  433. /// <summary>
  434. /// Whether this queue has been allocated (and not yet deallocated).
  435. /// </summary>
  436. /// <value>True if this queue has been allocated (and not yet deallocated).</value>
  437. public bool IsCreated => m_Buffer != null;
  438. /// <summary>
  439. /// Releases all resources (memory and safety handles).
  440. /// </summary>
  441. public void Dispose()
  442. {
  443. #if ENABLE_UNITY_COLLECTIONS_CHECKS
  444. #if REMOVE_DISPOSE_SENTINEL
  445. CollectionHelper.DisposeSafetyHandle(ref m_Safety);
  446. #else
  447. DisposeSentinel.Dispose(ref m_Safety, ref m_DisposeSentinel);
  448. #endif
  449. #endif
  450. NativeQueueData.DeallocateQueue(m_Buffer, m_QueuePool, m_AllocatorLabel);
  451. m_Buffer = null;
  452. }
  453. /// <summary>
  454. /// Creates and schedules a job that releases all resources (memory and safety handles) of this queue.
  455. /// </summary>
  456. /// <param name="inputDeps">The dependency for the new job.</param>
  457. /// <returns>The handle of the new job. The job depends upon `inputDeps` and releases all resources (memory and safety handles) of this queue.</returns>
  458. [NotBurstCompatible /* This is not burst compatible because of IJob's use of a static IntPtr. Should switch to IJobBurstSchedulable in the future */]
  459. public JobHandle Dispose(JobHandle inputDeps)
  460. {
  461. #if ENABLE_UNITY_COLLECTIONS_CHECKS
  462. #if REMOVE_DISPOSE_SENTINEL
  463. #else
  464. // [DeallocateOnJobCompletion] is not supported, but we want the deallocation
  465. // to happen in a thread. DisposeSentinel needs to be cleared on main thread.
  466. // AtomicSafetyHandle can be destroyed after the job was scheduled (Job scheduling
  467. // will check that no jobs are writing to the container).
  468. DisposeSentinel.Clear(ref m_DisposeSentinel);
  469. #endif
  470. var jobHandle = new NativeQueueDisposeJob { Data = new NativeQueueDispose { m_Buffer = m_Buffer, m_QueuePool = m_QueuePool, m_AllocatorLabel = m_AllocatorLabel, m_Safety = m_Safety } }.Schedule(inputDeps);
  471. AtomicSafetyHandle.Release(m_Safety);
  472. #else
  473. var jobHandle = new NativeQueueDisposeJob { Data = new NativeQueueDispose { m_Buffer = m_Buffer, m_QueuePool = m_QueuePool, m_AllocatorLabel = m_AllocatorLabel } }.Schedule(inputDeps);
  474. #endif
  475. m_Buffer = null;
  476. return jobHandle;
  477. }
  478. /// <summary>
  479. /// Returns a parallel writer for this queue.
  480. /// </summary>
  481. /// <returns>A parallel writer for this queue.</returns>
  482. public ParallelWriter AsParallelWriter()
  483. {
  484. ParallelWriter writer;
  485. #if ENABLE_UNITY_COLLECTIONS_CHECKS
  486. writer.m_Safety = m_Safety;
  487. CollectionHelper.SetStaticSafetyId<ParallelWriter>(ref writer.m_Safety, ref ParallelWriter.s_staticSafetyId.Data);
  488. #endif
  489. writer.m_Buffer = m_Buffer;
  490. writer.m_QueuePool = m_QueuePool;
  491. writer.m_ThreadIndex = 0;
  492. return writer;
  493. }
  494. /// <summary>
  495. /// A parallel writer for a NativeQueue.
  496. /// </summary>
  497. /// <remarks>
  498. /// Use <see cref="AsParallelWriter"/> to create a parallel writer for a NativeQueue.
  499. /// </remarks>
  500. [NativeContainer]
  501. [NativeContainerIsAtomicWriteOnly]
  502. [BurstCompatible(GenericTypeArguments = new [] { typeof(int) })]
  503. public unsafe struct ParallelWriter
  504. {
  505. [NativeDisableUnsafePtrRestriction]
  506. internal NativeQueueData* m_Buffer;
  507. [NativeDisableUnsafePtrRestriction]
  508. internal NativeQueueBlockPoolData* m_QueuePool;
  509. #if ENABLE_UNITY_COLLECTIONS_CHECKS
  510. internal AtomicSafetyHandle m_Safety;
  511. internal static readonly SharedStatic<int> s_staticSafetyId = SharedStatic<int>.GetOrCreate<ParallelWriter>();
  512. #endif
  513. [NativeSetThreadIndex]
  514. internal int m_ThreadIndex;
  515. /// <summary>
  516. /// Adds an element at the front of the queue.
  517. /// </summary>
  518. /// <param name="value">The value to be enqueued.</param>
  519. public void Enqueue(T value)
  520. {
  521. #if ENABLE_UNITY_COLLECTIONS_CHECKS
  522. AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
  523. #endif
  524. NativeQueueBlockHeader* writeBlock = NativeQueueData.AllocateWriteBlockMT<T>(m_Buffer, m_QueuePool, m_ThreadIndex);
  525. UnsafeUtility.WriteArrayElement(writeBlock + 1, writeBlock->m_NumItems, value);
  526. ++writeBlock->m_NumItems;
  527. }
  528. }
  529. [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS")]
  530. void CheckRead()
  531. {
  532. #if ENABLE_UNITY_COLLECTIONS_CHECKS
  533. AtomicSafetyHandle.CheckReadAndThrow(m_Safety);
  534. #endif
  535. }
  536. [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS")]
  537. void CheckReadNotEmpty()
  538. {
  539. CheckRead();
  540. if (m_Buffer->m_FirstBlock == (IntPtr)0)
  541. {
  542. ThrowEmpty();
  543. }
  544. }
  545. [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS")]
  546. void CheckWrite()
  547. {
  548. #if ENABLE_UNITY_COLLECTIONS_CHECKS
  549. AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
  550. #endif
  551. }
  552. [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS")]
  553. static void ThrowEmpty()
  554. {
  555. throw new InvalidOperationException("Trying to read from an empty queue.");
  556. }
  557. }
  558. [NativeContainer]
  559. [BurstCompatible]
  560. internal unsafe struct NativeQueueDispose
  561. {
  562. [NativeDisableUnsafePtrRestriction]
  563. internal NativeQueueData* m_Buffer;
  564. [NativeDisableUnsafePtrRestriction]
  565. internal NativeQueueBlockPoolData* m_QueuePool;
  566. internal AllocatorManager.AllocatorHandle m_AllocatorLabel;
  567. #if ENABLE_UNITY_COLLECTIONS_CHECKS
  568. internal AtomicSafetyHandle m_Safety;
  569. #endif
  570. public void Dispose()
  571. {
  572. NativeQueueData.DeallocateQueue(m_Buffer, m_QueuePool, m_AllocatorLabel);
  573. }
  574. }
  575. [BurstCompile]
  576. struct NativeQueueDisposeJob : IJob
  577. {
  578. public NativeQueueDispose Data;
  579. public void Execute()
  580. {
  581. Data.Dispose();
  582. }
  583. }
  584. }