Skip to content

Commit

Permalink
Support no-return parallel entries
Browse files Browse the repository at this point in the history
This allows lambdas that have no return type to be added to the parallel collection, and have their completion status be tracked the same way.

Fixes #687
  • Loading branch information
codemercenary committed Jul 30, 2015
1 parent 8649c9c commit ad98823
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 70 deletions.
215 changes: 146 additions & 69 deletions autowiring/Parallel.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,109 @@

namespace autowiring {

class parallel;

/// <summary>
/// Iterator that acts as a proxy to the outputs of a parallel structure
/// </summary>
template<typename T>
struct parallel_iterator :
public std::iterator<std::input_iterator_tag, T>
{
public:
parallel_iterator(parallel& p, const size_t& remaining):
m_parent(p),
m_remaining(remaining)
{}

protected:
parallel& m_parent;
const size_t& m_remaining;

public:
bool operator!=(const parallel_iterator& rhs) {
return m_remaining != rhs.m_remaining || &m_parent != &rhs.m_parent;
}

// Wrap, required to satisfy InputIterator requirements.
struct wrap {
wrap(T val) : val(val) {}

T val;
T& operator*(void) { return val; }
};

// Iterator operaror overloads:
parallel_iterator operator++(void);
wrap operator++(int);
T operator*(void);
};

template<typename T>
class parallel_collection {
public:
typedef parallel_iterator<T> iterator;

explicit parallel_collection(iterator begin, iterator end):
m_begin(begin),
m_end(end)
{}

private:
iterator m_begin;
iterator m_end;

public:
const iterator& begin(void) { return m_begin; }
const iterator& end(void) const { return m_end; }
};

/// <summary>
/// Full specialization for null responses
/// </summary>
/// <remarks>
/// Technically, this isn't even an iterator, but it's provided to allow parallel::begin to work
/// properly with void as the template type.
/// </remarks>
template<>
struct parallel_iterator<void>
{
public:
parallel_iterator(parallel& p, const size_t& remaining) :
m_parent(p),
m_remaining(remaining)
{}

protected:
parallel& m_parent;
const size_t& m_remaining;

public:
bool operator!=(const parallel_iterator& rhs) {
return m_remaining != rhs.m_remaining || &m_parent != &rhs.m_parent;
}

parallel_iterator operator++(void) {
this->operator++(0);
return *this;
}
void operator++(int);

struct unused {};
unused operator*(void) const { return{}; };
};

// Provides fan-out and gather functionality. Lambda "jobs" can be started using operator+=
// and gathered using the standard container iteration interface using begin and end. Jobs
// are run in the thread pool of the current context
class parallel {
public:
// Add job to be run in the thread pool
template<typename _Fx>
void operator+=(_Fx&& fx) {
typename std::enable_if<
!std::is_same<void, typename std::result_of<_Fx()>::type>::value
>::type
operator+=(_Fx&& fx) {
using RetType = typename std::remove_cv<decltype(fx())>::type;

// Increment remain jobs. This is decremented by calls to "Pop"
Expand All @@ -32,20 +127,42 @@ class parallel {
};
}

// Specialization for jobs that don't return anything
template<typename _Fx>
typename std::enable_if<
std::is_same<void, typename std::result_of<_Fx()>::type>::value
>::type
operator+=(_Fx&& fx) {
// Increment remain jobs. This is decremented by calls to "Pop"
(std::lock_guard<std::mutex>)m_queueMutex, ++m_outstandingCount;

*m_ctxt += [this, fx] {
std::lock_guard<std::mutex> lk(m_queueMutex);
m_nVoidEntries++;
m_queueUpdated.notify_all();
};
}

// Discard the most recent result. Blocks until the next result arives.
template<typename T>
void Pop(void) {
std::unique_lock<std::mutex> lk(m_queueMutex);

if (m_queue[typeid(T)].empty())
if (!m_outstandingCount)
if (std::is_same<void, T>::value) {
if(!m_nVoidEntries)
throw std::out_of_range("No outstanding jobs");

m_queueUpdated.wait(lk, [this]{
return !m_queue[typeid(T)].empty();
});
m_queueUpdated.wait(lk, [this] { return m_nVoidEntries != 0; });
m_nVoidEntries--;
} else {
auto& qu = m_queue[typeid(T)];
if (qu.empty())
throw std::out_of_range("No outstanding jobs");

m_queueUpdated.wait(lk, [&qu] { return !qu.empty(); });
qu.pop_front();
}

m_queue[typeid(T)].pop_front();
--m_outstandingCount;
}

Expand All @@ -62,70 +179,10 @@ class parallel {
return *static_cast<T*>(m_queue[typeid(T)].front().ptr());
}

// Iterator that acts as a proxy to
template<typename T>
struct parallel_iterator:
public std::iterator<std::input_iterator_tag, T>
{
parallel_iterator(parallel& p, const size_t& remaining):
m_parent(p),
m_remaining(remaining)
{}

bool operator!=(const parallel_iterator& rhs) {
return m_remaining != rhs.m_remaining || &m_parent != &rhs.m_parent;
}

parallel_iterator operator++(void) {
m_parent.Pop<T>();
return *this;
}

struct wrap {
wrap(T val) : val(val) {}

T val;
T& operator*(void) { return val; }
};

wrap operator++(int) {
wrap retVal = **this;
m_parent.Pop<T>();
return retVal;
}

T operator*(void) {
return m_parent.Top<T>();
}

protected:
parallel& m_parent;
const size_t& m_remaining;
};

template<typename T>
class collection {
public:
typedef parallel_iterator<T> iterator;

explicit collection(parallel& ll):
m_begin(ll.begin<T>()),
m_end(ll.end<T>())
{}

private:
iterator m_begin;
iterator m_end;

public:
const iterator& begin(void) { return m_begin; }
const iterator& end(void) const { return m_end; }
};

// Get a collection containing all entries of the specified type
template<typename T>
collection<T> all(void) {
return collection<T> { *this };
parallel_collection<T> all(void) {
return parallel_collection<T> { begin<T>(), end<T>() };
}

// Get an iterator to the begining of out queue of job results
Expand All @@ -146,10 +203,30 @@ class parallel {
std::condition_variable m_queueUpdated;
std::unordered_map<std::type_index, std::deque<AnySharedPointer>> m_queue;

// For void entries we don't need a queue, we can just keep a general count of "done"
size_t m_nVoidEntries = 0;

size_t m_outstandingCount = 0;

AutoCurrentContext m_ctxt;
};

template<typename T>
parallel_iterator<T> parallel_iterator<T>::operator++(void) {
m_parent.Pop<T>();
return *this;
}

template<typename T>
typename parallel_iterator<T>::wrap parallel_iterator<T>::operator++(int) {
wrap retVal = **this;
m_parent.Pop<T>();
return retVal;
}

template<typename T>
T parallel_iterator<T>::operator*(void) {
return m_parent.Top<T>();
}

}//namespace autowiring
5 changes: 5 additions & 0 deletions src/autowiring/Parallel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,8 @@
#include "stdafx.h"
#include "Parallel.h"

using namespace autowiring;

void parallel_iterator<void>::operator++(int) {
m_parent.Pop<void>();
}
30 changes: 29 additions & 1 deletion src/autowiring/test/ParallelTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ TEST_F(ParallelTest, Basic) {

for (int i : {0,4,2,5,1,3}) {
int sleepTime = dist(mt);
p += [i, sleepTime]() {
p += [i, sleepTime] {
std::this_thread::sleep_for(sleepTime*std::chrono::milliseconds(1));
return i;
};
Expand Down Expand Up @@ -55,3 +55,31 @@ TEST_F(ParallelTest, All) {
for (size_t i = 1; i < entries.size(); i++)
ASSERT_EQ(entries[i - 1], entries[i] - 1) << "Entry did not complete as expected";
}

TEST_F(ParallelTest, VoidReturn) {
AutoCurrentContext()->Initiate();
autowiring::parallel p;

auto val = std::make_shared<std::atomic<size_t>>(0);
for (size_t i = 0; i < 100; i++)
p += [val] { (*val)++; };

size_t i = 0;
for (auto q = p.begin<void>(); q != p.end<void>(); ++q)
i++;
ASSERT_EQ(100UL, i) << "A sufficient number of empty lambdas were not encountered";
}

TEST_F(ParallelTest, VoidReturnAll) {
AutoCurrentContext()->Initiate();
autowiring::parallel p;

auto val = std::make_shared<std::atomic<size_t>>(0);
for (size_t i = 0; i < 100; i++)
p += [val] { (*val)++; };

size_t i = 0;
for (auto entry : p.all<void>())
i++;
ASSERT_EQ(100UL, i) << "A sufficient number of empty lambdas were not encountered";
}

0 comments on commit ad98823

Please sign in to comment.