No Description
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

Barrier.h 4.4KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. #pragma once
  2. #include "Atomic.h"
  3. #include "Semaphore.h"
  4. namespace baselib
  5. {
  6. BASELIB_CPP_INTERFACE
  7. {
  8. // In parallel computing, a barrier is a type of synchronization
  9. // method. A barrier for a group of threads or processes in the source
  10. // code means any thread/process must stop at this point and cannot
  11. // proceed until all other threads/processes reach this barrier.
  12. //
  13. // "Barrier (computer science)", Wikipedia: The Free Encyclopedia
  14. // https://en.wikipedia.org/wiki/Barrier_(computer_science)
  15. //
  16. // For optimal performance, baselib::Barrier should be stored at a
  17. // cache aligned memory location.
  18. class Barrier
  19. {
  20. public:
  21. // non-copyable
  22. Barrier(const Barrier& other) = delete;
  23. Barrier& operator=(const Barrier& other) = delete;
  24. // non-movable (strictly speaking not needed but listed to signal intent)
  25. Barrier(Barrier&& other) = delete;
  26. Barrier& operator=(Barrier&& other) = delete;
  27. // Creates a barrier with a set number of threads to synchronize.
  28. // Once a set of threads enter a Barrier, the *same* set of threads
  29. // must continue to use the Barrier - i.e. no additional threads may
  30. // enter any of the Acquires. For example, it is *not* allowed to
  31. // create a Barrier with threads_num=10, then send 30 threads into
  32. // barrier.Acquire() with the expectation 3 batches of 10 will be
  33. // released. However, once it is guaranteed that all threads have
  34. // exited all of the Acquire invocations, it is okay to reuse the
  35. // same barrier object with a different set of threads - for
  36. // example, after Join() has been called on all participating
  37. // threads and a new batch of threads is launched.
  38. //
  39. // \param threads_num Wait for this number of threads before letting all proceed.
  40. explicit Barrier(uint16_t threads_num)
  41. : m_threshold(threads_num), m_count(0)
  42. {
  43. }
  44. // Block the current thread until the specified number of threads
  45. // also reach this `Acquire()`.
  46. void Acquire()
  47. {
  48. // If there is two Barrier::Acquire calls in a row, when the
  49. // first Acquire releases, one thread may jump out of the gate
  50. // so fast that it reaches the next Acquire and steals *another*
  51. // semaphore slot, continuing past the *second* Acquire, before
  52. // all threads have even left the first Acquire. So, we instead
  53. // construct two semaphores and alternate between them to
  54. // prevent this.
  55. uint16_t previous_value = m_count.fetch_add(1, memory_order_relaxed);
  56. BaselibAssert(previous_value < m_threshold * 2);
  57. // If count is in range [0, m_threshold), use semaphore A.
  58. // If count is in range [m_threshold, m_threshold * 2), use semaphore B.
  59. bool useSemaphoreB = previous_value >= m_threshold;
  60. Semaphore& semaphore = useSemaphoreB ? m_semaphoreB : m_semaphoreA;
  61. // If (count % m_threshold) == (m_threshold - 1), then we're the last thread in the group, release the semaphore.
  62. bool do_release = previous_value % m_threshold == m_threshold - 1;
  63. if (do_release)
  64. {
  65. if (previous_value == m_threshold * 2 - 1)
  66. {
  67. // Note this needs to happen before the Release to avoid
  68. // a race condition (if this thread yields right before
  69. // the Release, but after the add, the invariant of
  70. // previous_value < m_threshold * 2 may break for
  71. // another thread)
  72. m_count.fetch_sub(m_threshold * 2, memory_order_relaxed);
  73. }
  74. semaphore.Release(m_threshold - 1);
  75. }
  76. else
  77. {
  78. semaphore.Acquire();
  79. }
  80. }
  81. private:
  82. Semaphore m_semaphoreA;
  83. Semaphore m_semaphoreB;
  84. uint16_t m_threshold;
  85. atomic<uint16_t> m_count;
  86. };
  87. }
  88. }