From ad988235ff9a6c80537df118d6e01e68885f8c3d Mon Sep 17 00:00:00 2001 From: Jason Lokerson Date: Wed, 29 Jul 2015 10:45:12 -0700 Subject: [PATCH] Support no-return parallel entries This allows lambdas that have no return type to be added to the parallel collection, and have their completion status be tracked the same way. Fixes #687 --- autowiring/Parallel.h | 215 ++++++++++++++++++--------- src/autowiring/Parallel.cpp | 5 + src/autowiring/test/ParallelTest.cpp | 30 +++- 3 files changed, 180 insertions(+), 70 deletions(-) diff --git a/autowiring/Parallel.h b/autowiring/Parallel.h index 859ce619f..a98bfca07 100644 --- a/autowiring/Parallel.h +++ b/autowiring/Parallel.h @@ -10,6 +10,98 @@ namespace autowiring { +class parallel; + +/// +/// Iterator that acts as a proxy to the outputs of a parallel structure +/// +template +struct parallel_iterator : + public std::iterator +{ +public: + parallel_iterator(parallel& p, const size_t& remaining): + m_parent(p), + m_remaining(remaining) + {} + +protected: + parallel& m_parent; + const size_t& m_remaining; + +public: + bool operator!=(const parallel_iterator& rhs) { + return m_remaining != rhs.m_remaining || &m_parent != &rhs.m_parent; + } + + // Wrap, required to satisfy InputIterator requirements. + struct wrap { + wrap(T val) : val(val) {} + + T val; + T& operator*(void) { return val; } + }; + + // Iterator operaror overloads: + parallel_iterator operator++(void); + wrap operator++(int); + T operator*(void); +}; + +template +class parallel_collection { +public: + typedef parallel_iterator iterator; + + explicit parallel_collection(iterator begin, iterator end): + m_begin(begin), + m_end(end) + {} + +private: + iterator m_begin; + iterator m_end; + +public: + const iterator& begin(void) { return m_begin; } + const iterator& end(void) const { return m_end; } +}; + +/// +/// Full specialization for null responses +/// +/// +/// Technically, this isn't even an iterator, but it's provided to allow parallel::begin to work +/// properly with void as the template type. +/// +template<> +struct parallel_iterator +{ +public: + parallel_iterator(parallel& p, const size_t& remaining) : + m_parent(p), + m_remaining(remaining) + {} + +protected: + parallel& m_parent; + const size_t& m_remaining; + +public: + bool operator!=(const parallel_iterator& rhs) { + return m_remaining != rhs.m_remaining || &m_parent != &rhs.m_parent; + } + + parallel_iterator operator++(void) { + this->operator++(0); + return *this; + } + void operator++(int); + + struct unused {}; + unused operator*(void) const { return{}; }; +}; + // Provides fan-out and gather functionality. Lambda "jobs" can be started using operator+= // and gathered using the standard container iteration interface using begin and end. Jobs // are run in the thread pool of the current context @@ -17,7 +109,10 @@ class parallel { public: // Add job to be run in the thread pool template - void operator+=(_Fx&& fx) { + typename std::enable_if< + !std::is_same::type>::value + >::type + operator+=(_Fx&& fx) { using RetType = typename std::remove_cv::type; // Increment remain jobs. This is decremented by calls to "Pop" @@ -32,20 +127,42 @@ class parallel { }; } + // Specialization for jobs that don't return anything + template + typename std::enable_if< + std::is_same::type>::value + >::type + operator+=(_Fx&& fx) { + // Increment remain jobs. This is decremented by calls to "Pop" + (std::lock_guard)m_queueMutex, ++m_outstandingCount; + + *m_ctxt += [this, fx] { + std::lock_guard lk(m_queueMutex); + m_nVoidEntries++; + m_queueUpdated.notify_all(); + }; + } + // Discard the most recent result. Blocks until the next result arives. template void Pop(void) { std::unique_lock lk(m_queueMutex); - if (m_queue[typeid(T)].empty()) - if (!m_outstandingCount) + if (std::is_same::value) { + if(!m_nVoidEntries) throw std::out_of_range("No outstanding jobs"); - m_queueUpdated.wait(lk, [this]{ - return !m_queue[typeid(T)].empty(); - }); + m_queueUpdated.wait(lk, [this] { return m_nVoidEntries != 0; }); + m_nVoidEntries--; + } else { + auto& qu = m_queue[typeid(T)]; + if (qu.empty()) + throw std::out_of_range("No outstanding jobs"); + + m_queueUpdated.wait(lk, [&qu] { return !qu.empty(); }); + qu.pop_front(); + } - m_queue[typeid(T)].pop_front(); --m_outstandingCount; } @@ -62,70 +179,10 @@ class parallel { return *static_cast(m_queue[typeid(T)].front().ptr()); } - // Iterator that acts as a proxy to - template - struct parallel_iterator: - public std::iterator - { - parallel_iterator(parallel& p, const size_t& remaining): - m_parent(p), - m_remaining(remaining) - {} - - bool operator!=(const parallel_iterator& rhs) { - return m_remaining != rhs.m_remaining || &m_parent != &rhs.m_parent; - } - - parallel_iterator operator++(void) { - m_parent.Pop(); - return *this; - } - - struct wrap { - wrap(T val) : val(val) {} - - T val; - T& operator*(void) { return val; } - }; - - wrap operator++(int) { - wrap retVal = **this; - m_parent.Pop(); - return retVal; - } - - T operator*(void) { - return m_parent.Top(); - } - - protected: - parallel& m_parent; - const size_t& m_remaining; - }; - - template - class collection { - public: - typedef parallel_iterator iterator; - - explicit collection(parallel& ll): - m_begin(ll.begin()), - m_end(ll.end()) - {} - - private: - iterator m_begin; - iterator m_end; - - public: - const iterator& begin(void) { return m_begin; } - const iterator& end(void) const { return m_end; } - }; - // Get a collection containing all entries of the specified type template - collection all(void) { - return collection { *this }; + parallel_collection all(void) { + return parallel_collection { begin(), end() }; } // Get an iterator to the begining of out queue of job results @@ -146,10 +203,30 @@ class parallel { std::condition_variable m_queueUpdated; std::unordered_map> m_queue; + // For void entries we don't need a queue, we can just keep a general count of "done" + size_t m_nVoidEntries = 0; + size_t m_outstandingCount = 0; AutoCurrentContext m_ctxt; }; +template +parallel_iterator parallel_iterator::operator++(void) { + m_parent.Pop(); + return *this; +} + +template +typename parallel_iterator::wrap parallel_iterator::operator++(int) { + wrap retVal = **this; + m_parent.Pop(); + return retVal; +} + +template +T parallel_iterator::operator*(void) { + return m_parent.Top(); +} }//namespace autowiring diff --git a/src/autowiring/Parallel.cpp b/src/autowiring/Parallel.cpp index e5f78f0aa..160f05bb6 100644 --- a/src/autowiring/Parallel.cpp +++ b/src/autowiring/Parallel.cpp @@ -2,3 +2,8 @@ #include "stdafx.h" #include "Parallel.h" +using namespace autowiring; + +void parallel_iterator::operator++(int) { + m_parent.Pop(); +} \ No newline at end of file diff --git a/src/autowiring/test/ParallelTest.cpp b/src/autowiring/test/ParallelTest.cpp index 2035fd2e4..05d4d572f 100644 --- a/src/autowiring/test/ParallelTest.cpp +++ b/src/autowiring/test/ParallelTest.cpp @@ -19,7 +19,7 @@ TEST_F(ParallelTest, Basic) { for (int i : {0,4,2,5,1,3}) { int sleepTime = dist(mt); - p += [i, sleepTime]() { + p += [i, sleepTime] { std::this_thread::sleep_for(sleepTime*std::chrono::milliseconds(1)); return i; }; @@ -55,3 +55,31 @@ TEST_F(ParallelTest, All) { for (size_t i = 1; i < entries.size(); i++) ASSERT_EQ(entries[i - 1], entries[i] - 1) << "Entry did not complete as expected"; } + +TEST_F(ParallelTest, VoidReturn) { + AutoCurrentContext()->Initiate(); + autowiring::parallel p; + + auto val = std::make_shared>(0); + for (size_t i = 0; i < 100; i++) + p += [val] { (*val)++; }; + + size_t i = 0; + for (auto q = p.begin(); q != p.end(); ++q) + i++; + ASSERT_EQ(100UL, i) << "A sufficient number of empty lambdas were not encountered"; +} + +TEST_F(ParallelTest, VoidReturnAll) { + AutoCurrentContext()->Initiate(); + autowiring::parallel p; + + auto val = std::make_shared>(0); + for (size_t i = 0; i < 100; i++) + p += [val] { (*val)++; }; + + size_t i = 0; + for (auto entry : p.all()) + i++; + ASSERT_EQ(100UL, i) << "A sufficient number of empty lambdas were not encountered"; +}