diff --git a/autowiring/Parallel.h b/autowiring/Parallel.h index 3fb969639..02910e116 100644 --- a/autowiring/Parallel.h +++ b/autowiring/Parallel.h @@ -183,6 +183,17 @@ class parallel { return parallel_collection { begin(), end() }; } + // Blocks until all outstanding work is done + void barrier(void) { + std::unique_lock lk(m_queueMutex); + m_queueUpdated.wait(lk, [this] { + size_t totalReady = m_nVoidEntries; + for (auto& entry : m_queue) + totalReady += entry.second.size(); + return m_outstandingCount == totalReady; + }); + } + // Get an iterator to the begining of out queue of job results template parallel_iterator begin(void) { @@ -210,6 +221,7 @@ class parallel { // For void entries we don't need a queue, we can just keep a general count of "done" size_t m_nVoidEntries = 0; + // Total number of entries currently outstanding: size_t m_outstandingCount = 0; AutoCurrentContext m_ctxt; diff --git a/src/autowiring/test/ParallelTest.cpp b/src/autowiring/test/ParallelTest.cpp index 6b275a85e..439c01f1b 100644 --- a/src/autowiring/test/ParallelTest.cpp +++ b/src/autowiring/test/ParallelTest.cpp @@ -84,3 +84,15 @@ TEST_F(ParallelTest, VoidReturnAll) { ASSERT_EQ(100UL, i) << "A sufficient number of empty lambdas were not encountered"; ASSERT_EQ(100, *val) << "Not all pended lambda functions were called as expected"; } + +TEST_F(ParallelTest, Barrier) { + AutoCurrentContext()->Initiate(); + autowiring::parallel p; + + std::atomic x{ 0 }; + for (size_t i = 0; i < 1000; i++) + p += [&x] { x++; }; + + p.barrier(); + ASSERT_EQ(1000, x) << "Not all parallel watchers were completed on return from join"; +}