Skip to content

Commit

Permalink
Merge pull request #696 from leapmotion/feature-voidparallel
Browse files Browse the repository at this point in the history
Support no-return parallel entries
  • Loading branch information
jdonald committed Jul 31, 2015
2 parents e9b1389 + 7752637 commit 7871694
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 73 deletions.
217 changes: 145 additions & 72 deletions autowiring/Parallel.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,109 @@

namespace autowiring {

class parallel;

/// <summary>
/// Iterator that acts as a proxy to the outputs of a parallel structure
/// </summary>
template<typename T>
struct parallel_iterator
{
public:
  // Iterator traits, provided directly rather than by deriving from
  // std::iterator, which is deprecated in C++17:
  typedef std::input_iterator_tag iterator_category;
  typedef T value_type;
  typedef std::ptrdiff_t difference_type;
  typedef T* pointer;
  typedef T& reference;

  /// <param name="p">The parallel structure whose job results are iterated</param>
  /// <param name="remaining">Reference to the parent's live count of unconsumed jobs</param>
  parallel_iterator(parallel& p, const size_t& remaining):
    m_parent(p),
    m_remaining(remaining)
  {}

protected:
  parallel& m_parent;

  // Tracks the parent's outstanding-job count; two iterators on the same
  // parent compare equal once their observed counts match
  const size_t& m_remaining;

public:
  bool operator!=(const parallel_iterator& rhs) {
    return m_remaining != rhs.m_remaining || &m_parent != &rhs.m_parent;
  }

  // Wrap, required to satisfy InputIterator requirements.
  struct wrap {
    wrap(T val) : val(val) {}

    T val;
    T& operator*(void) { return val; }
  };

  // Iterator operator overloads, defined out-of-line after class parallel:
  parallel_iterator operator++(void);
  wrap operator++(int);
  T operator*(void);
};

/// <summary>
/// Lightweight range over the results of a parallel structure, suitable
/// for range-based for iteration
/// </summary>
template<typename T>
class parallel_collection {
public:
  typedef parallel_iterator<T> iterator;

  /// <param name="begin">Iterator to the first unconsumed result</param>
  /// <param name="end">Past-the-end iterator (all jobs consumed)</param>
  explicit parallel_collection(iterator begin, iterator end):
    m_begin(begin),
    m_end(end)
  {}

private:
  iterator m_begin;
  iterator m_end;

public:
  // Both accessors are const for consistency; advancing the returned
  // iterators mutates the underlying parallel structure, not this range
  const iterator& begin(void) const { return m_begin; }
  const iterator& end(void) const { return m_end; }
};

/// <summary>
/// Full specialization for null responses
/// </summary>
/// <remarks>
/// Technically, this isn't even an iterator, but it's provided to allow parallel::begin to work
/// properly with void as the template type.
/// </remarks>
template<>
struct parallel_iterator<void>
{
public:
  /// <param name="p">The parallel structure being drained</param>
  /// <param name="remaining">Reference to the parent's live count of unconsumed jobs</param>
  parallel_iterator(parallel& p, const size_t& remaining) :
    m_parent(p),
    m_remaining(remaining)
  {}

protected:
  parallel& m_parent;
  const size_t& m_remaining;

public:
  bool operator!=(const parallel_iterator& rhs) {
    // Unequal when attached to different parents, or when the observed
    // outstanding counts differ
    if (&m_parent != &rhs.m_parent)
      return true;
    return m_remaining != rhs.m_remaining;
  }

  parallel_iterator operator++(void) {
    // Prefix form delegates to the postfix form, defined out-of-line
    operator++(0);
    return *this;
  }
  void operator++(int);

  // Placeholder token: dereferencing a void iterator yields no value
  struct unused {};
  unused operator*(void) const { return{}; }
};

// Provides fan-out and gather functionality. Lambda "jobs" can be started using operator+=
// and gathered using the standard container iteration interface using begin and end. Jobs
// are run in the thread pool of the current context
class parallel {
public:
// Add job to be run in the thread pool
template<typename _Fx>
void operator+=(_Fx&& fx) {
typename std::enable_if<
!std::is_same<void, typename std::result_of<_Fx()>::type>::value
>::type
operator+=(_Fx&& fx) {
using RetType = typename std::remove_cv<decltype(fx())>::type;

// Increment remain jobs. This is decremented by calls to "Pop"
Expand All @@ -32,20 +127,38 @@ class parallel {
};
}

// Specialization for jobs that don't return anything
template<typename _Fx>
typename std::enable_if<
  std::is_same<void, typename std::result_of<_Fx()>::type>::value
>::type
operator+=(_Fx&& fx) {
  // Increment remaining jobs. This is decremented by calls to "Pop"
  (std::lock_guard<std::mutex>)m_queueMutex, ++m_outstandingCount;

  *m_ctxt += [this, fx] {
    // Run the job itself first; the original code only signalled
    // completion and never invoked fx, so void jobs silently did nothing
    fx();

    // Void jobs have no result payload, just a completion count
    std::lock_guard<std::mutex> lk(m_queueMutex);
    m_nVoidEntries++;
    m_queueUpdated.notify_all();
  };
}

// Discard the most recent result. Blocks until the next result arrives.
// Throws std::out_of_range if no jobs are outstanding.
template<typename T>
void Pop(void) {
  std::unique_lock<std::mutex> lk(m_queueMutex);
  if (!m_outstandingCount)
    throw std::out_of_range("No outstanding jobs");

  if (std::is_same<void, T>::value) {
    // Void jobs carry no payload; consume one completion tick
    m_queueUpdated.wait(lk, [this] { return m_nVoidEntries != 0; });
    m_nVoidEntries--;
  } else {
    // Wait for a typed result to arrive, then drop it from its queue
    auto& qu = m_queue[typeid(T)];
    m_queueUpdated.wait(lk, [&qu] { return !qu.empty(); });
    qu.pop_front();
  }

  --m_outstandingCount;
}

Expand All @@ -62,70 +175,10 @@ class parallel {
return *static_cast<T*>(m_queue[typeid(T)].front().ptr());
}

// Iterator that acts as a proxy to
template<typename T>
struct parallel_iterator:
public std::iterator<std::input_iterator_tag, T>
{
parallel_iterator(parallel& p, const size_t& remaining):
m_parent(p),
m_remaining(remaining)
{}

bool operator!=(const parallel_iterator& rhs) {
return m_remaining != rhs.m_remaining || &m_parent != &rhs.m_parent;
}

parallel_iterator operator++(void) {
m_parent.Pop<T>();
return *this;
}

struct wrap {
wrap(T val) : val(val) {}

T val;
T& operator*(void) { return val; }
};

wrap operator++(int) {
wrap retVal = **this;
m_parent.Pop<T>();
return retVal;
}

T operator*(void) {
return m_parent.Top<T>();
}

protected:
parallel& m_parent;
const size_t& m_remaining;
};

template<typename T>
class collection {
public:
typedef parallel_iterator<T> iterator;

explicit collection(parallel& ll):
m_begin(ll.begin<T>()),
m_end(ll.end<T>())
{}

private:
iterator m_begin;
iterator m_end;

public:
const iterator& begin(void) { return m_begin; }
const iterator& end(void) const { return m_end; }
};

// Get a collection containing all entries of the specified type
template<typename T>
collection<T> all(void) {
return collection<T> { *this };
parallel_collection<T> all(void) {
return parallel_collection<T> { begin<T>(), end<T>() };
}

// Get an iterator to the begining of out queue of job results
Expand All @@ -151,10 +204,30 @@ class parallel {
std::condition_variable m_queueUpdated;
std::unordered_map<std::type_index, std::deque<AnySharedPointer>> m_queue;

// For void entries we don't need a queue, we can just keep a general count of "done"
size_t m_nVoidEntries = 0;

size_t m_outstandingCount = 0;

AutoCurrentContext m_ctxt;
};

// Prefix increment: discards the current result (blocking until one is
// available) and leaves the iterator positioned at the next result.
template<typename T>
parallel_iterator<T> parallel_iterator<T>::operator++(void) {
m_parent.Pop<T>();
return *this;
}

// Postfix increment: snapshot the current result in a wrap proxy, then
// advance; dereferencing the proxy yields the pre-increment value, as the
// InputIterator *i++ requirement demands.
template<typename T>
typename parallel_iterator<T>::wrap parallel_iterator<T>::operator++(int) {
  wrap snapshot = **this;
  m_parent.Pop<T>();
  return snapshot;
}

// Dereference: returns a copy of the most recently completed result
// without consuming it.
template<typename T>
T parallel_iterator<T>::operator*(void) {
return m_parent.Top<T>();
}

}//namespace autowiring
5 changes: 5 additions & 0 deletions src/autowiring/Parallel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,8 @@
#include "stdafx.h"
#include "Parallel.h"

using namespace autowiring;

// Postfix increment for the void specialization: consumes one completed
// void job from the parent, blocking until one is available. Returns
// nothing, as void results have no value to proxy.
void parallel_iterator<void>::operator++(int) {
m_parent.Pop<void>();
}
30 changes: 29 additions & 1 deletion src/autowiring/test/ParallelTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ TEST_F(ParallelTest, Basic) {

for (int i : {0,4,2,5,1,3}) {
int sleepTime = dist(mt);
p += [i, sleepTime]() {
p += [i, sleepTime] {
std::this_thread::sleep_for(sleepTime*std::chrono::milliseconds(1));
return i;
};
Expand Down Expand Up @@ -55,3 +55,31 @@ TEST_F(ParallelTest, All) {
for (size_t i = 1; i < entries.size(); i++)
ASSERT_EQ(entries[i - 1], entries[i] - 1) << "Entry did not complete as expected";
}

TEST_F(ParallelTest, VoidReturn) {
  AutoCurrentContext()->Initiate();
  autowiring::parallel p;

  // Launch 100 void jobs, each bumping a shared counter
  auto val = std::make_shared<std::atomic<size_t>>(0);
  for (size_t i = 0; i < 100; i++)
    p += [val] { (*val)++; };

  // Drain all completions via explicit iterators
  size_t i = 0;
  for (auto q = p.begin<void>(); q != p.end<void>(); ++q)
    i++;
  ASSERT_EQ(100UL, i) << "A sufficient number of empty lambdas were not encountered";

  // Verify the jobs actually executed, not merely that completions were
  // signalled -- the original test passed even if the lambdas never ran
  ASSERT_EQ(100UL, val->load()) << "Not all void jobs were actually executed";
}

TEST_F(ParallelTest, VoidReturnAll) {
  AutoCurrentContext()->Initiate();
  autowiring::parallel p;

  // Launch 100 void jobs, each bumping a shared counter
  auto val = std::make_shared<std::atomic<size_t>>(0);
  for (size_t i = 0; i < 100; i++)
    p += [val] { (*val)++; };

  // Drain all completions through the range interface
  size_t i = 0;
  for (auto entry : p.all<void>()) {
    (void)entry;  // entry is a placeholder token for void results
    i++;
  }
  ASSERT_EQ(100UL, i) << "A sufficient number of empty lambdas were not encountered";

  // Verify the jobs actually executed, not merely that completions were
  // signalled -- the original test passed even if the lambdas never ran
  ASSERT_EQ(100UL, val->load()) << "Not all void jobs were actually executed";
}

0 comments on commit 7871694

Please sign in to comment.