Skip to content

Commit

Permalink
Merge pull request #1033 from leapmotion/misc-fixes
Browse files Browse the repository at this point in the history
Miscellaneous  minor fixes
  • Loading branch information
jdonald authored Nov 7, 2017
2 parents 8e0a804 + 4e7d0be commit 6e4022a
Show file tree
Hide file tree
Showing 16 changed files with 132 additions and 51 deletions.
20 changes: 10 additions & 10 deletions src/autowiring/AutoPacketFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ bool AutoPacketFactory::IsRunning(void) const {
}

std::shared_ptr<AutoPacket> AutoPacketFactory::CurrentPacket(void) {
std::lock_guard<std::mutex> lk(m_lock);
std::lock_guard<std::mutex> lk(m_apfLock);
return m_curPacket.lock();
}

std::shared_ptr<AutoPacket> AutoPacketFactory::NewPacket(void) {
std::shared_ptr<AutoPacketInternal> retVal;
bool isFirstPacket;
{
std::lock_guard<std::mutex> lk(m_lock);
std::lock_guard<std::mutex> lk(m_apfLock);

if (ShouldStop())
throw autowiring_error("Attempted to create a packet on an AutoPacketFactory that was already terminated");
Expand Down Expand Up @@ -84,14 +84,14 @@ std::shared_ptr<void> AutoPacketFactory::GetInternalOutstanding(void) {
}

std::vector<AutoFilterDescriptor> AutoPacketFactory::GetAutoFilters(void) const {
std::lock_guard<std::mutex> lk(m_lock);
std::lock_guard<std::mutex> lk(m_apfLock);
std::vector<AutoFilterDescriptor> retVal;
retVal.assign(m_autoFilters.begin(), m_autoFilters.end());
return retVal;
}

SatCounter* AutoPacketFactory::CreateSatCounterList(void) const {
std::lock_guard<std::mutex> lk(m_lock);
std::lock_guard<std::mutex> lk(m_apfLock);

// Trivial return check
if (m_autoFilters.empty())
Expand All @@ -113,7 +113,7 @@ SatCounter* AutoPacketFactory::CreateSatCounterList(void) const {

bool AutoPacketFactory::OnStart(void) {
// Initialize first packet
std::lock_guard<std::mutex>{m_lock},
std::lock_guard<std::mutex>{m_apfLock},
m_nextPacket = ConstructPacket();
return true;
}
Expand All @@ -124,7 +124,7 @@ void AutoPacketFactory::OnStop(bool graceful) {
std::shared_ptr<AutoPacketInternal> nextPacket;

// Lock destruction precedes local variables
std::lock_guard<std::mutex>{m_lock},
std::lock_guard<std::mutex>{m_apfLock},
autoFilters.swap(m_autoFilters),
nextPacket.swap(m_nextPacket);
}
Expand Down Expand Up @@ -156,14 +156,14 @@ void AutoPacketFactory::Clear(void) {
}

const AutoFilterDescriptor& AutoPacketFactory::AddSubscriber(const AutoFilterDescriptor& rhs) {
std::lock_guard<std::mutex> lk(m_lock);
std::lock_guard<std::mutex> lk(m_apfLock);
m_autoFilters.insert(rhs);
return rhs;
}

void AutoPacketFactory::RemoveSubscriber(const AutoFilterDescriptor& autoFilter) {
// Trivial removal from the autofilter set:
std::lock_guard<std::mutex> lk(m_lock);
std::lock_guard<std::mutex> lk(m_apfLock);
m_autoFilters.erase(autoFilter);
}

Expand All @@ -186,7 +186,7 @@ size_t AutoPacketFactory::GetOutstandingPacketCount(void) const {
}

void AutoPacketFactory::RecordPacketDuration(std::chrono::nanoseconds duration) {
std::unique_lock<std::mutex> lk(m_lock);
std::unique_lock<std::mutex> lk(m_apfLock);
m_packetDurationSum += duration.count();
m_packetDurationSqSum += duration.count() * duration.count();
}
Expand All @@ -202,7 +202,7 @@ double AutoPacketFactory::GetPacketLifetimeStandardDeviation(void) {
}

void AutoPacketFactory::ResetPacketStatistics(void) {
std::unique_lock<std::mutex> lk(m_lock);
std::unique_lock<std::mutex> lk(m_apfLock);
m_packetCount = 0;
m_packetDurationSum = 0.0;
m_packetDurationSqSum = 0.0;
Expand Down
4 changes: 2 additions & 2 deletions src/autowiring/AutoPacketFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class AutoPacketFactory:

private:
// Lock for this type
mutable std::mutex m_lock;
mutable std::mutex m_apfLock;

// Internal outstanding reference for issued packet:
std::weak_ptr<void> m_outstandingInternal;
Expand Down Expand Up @@ -60,7 +60,7 @@ class AutoPacketFactory:
/// </summary>
template<class T>
void AppendAutoFiltersTo(T& container) const {
std::lock_guard<std::mutex> lk(m_lock);
std::lock_guard<std::mutex> lk(m_apfLock);
container.insert(container.end(), m_autoFilters.begin(), m_autoFilters.end());
}

Expand Down
9 changes: 7 additions & 2 deletions src/autowiring/CoreRunnable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ bool CoreRunnable::Start(std::shared_ptr<CoreObject> outstanding) {
return true;

m_wasStarted = true;
m_outstanding = outstanding;
m_outstanding = std::move(outstanding);
outstanding.reset();
if(!OnStart()) {
m_shouldStop = true;

m_outstanding.reset();

// Immediately invoke a graceless stop in response
Expand All @@ -38,20 +40,23 @@ bool CoreRunnable::Start(std::shared_ptr<CoreObject> outstanding) {
}

void CoreRunnable::Stop(bool graceful) {
std::unique_lock<std::mutex> lk(m_lock);
if (!m_shouldStop) {
// Stop flag should be pulled high
m_shouldStop = true;

// Do not call this method more than once:
lk.unlock();
OnStop(graceful);
lk.lock();
}

if (m_outstanding) {
// Ensure we do not invoke the outstanding count dtor while holding a lock
std::shared_ptr<CoreObject> outstanding;
std::lock_guard<std::mutex>{m_lock},
outstanding.swap(m_outstanding);
}
lk.unlock();

// Everything looks good now
m_cv.notify_all();
Expand Down
5 changes: 5 additions & 0 deletions src/autowiring/CoreThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ void CoreThread::DoRunLoopCleanup(std::shared_ptr<CoreContext>&& ctxt, std::shar
// Kill everything in the dispatch queue and also run it down
{
CurrentContextPusher pshr(ctxt);
// Only allow one thread at a time to clean up the dispatch queue
std::lock_guard<std::mutex> lk(m_stoppingLock);
Rundown();
}

Expand All @@ -27,6 +29,9 @@ void CoreThread::Run() {
}

void CoreThread::OnStop(bool graceful) {
// Only allow one thread at a time to clean up the dispatch queue
std::lock_guard<std::mutex> lk(m_stoppingLock);

// Base class handling first:
BasicThread::OnStop(graceful);

Expand Down
5 changes: 5 additions & 0 deletions src/autowiring/CoreThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ class CoreThread:
virtual ~CoreThread(void);

protected:
/// <summary>
/// While stopping, make sure we do it exclusively
/// </summary>
std::mutex m_stoppingLock;

/// <summary>
/// Overridden here so we can rundown the dispatch queue
/// </summary>
Expand Down
95 changes: 67 additions & 28 deletions src/autowiring/Parallel.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,23 @@ class parallel {
operator+=(_Fx&& fx) {
using RetType = typename std::remove_cv<decltype(fx())>::type;

auto block = m_block;
if (!block)
return;

// Increment remain jobs. This is decremented by calls to "Pop"
(std::lock_guard<std::mutex>)m_block->m_lock, ++m_block->m_outstandingCount;
(std::lock_guard<std::mutex>)block->m_lock, ++block->m_outstandingCount;

block->dq += [this, fx] {
auto block = m_block;
if (!block)
return;

m_block->dq += [this, fx] {
auto result = std::make_shared<RetType>(fx());

std::lock_guard<std::mutex>{m_block->m_lock},
m_block->m_queue[auto_id_t<RetType>{}].emplace_back(std::move(result));
m_block->m_queueUpdated.notify_all();
std::lock_guard<std::mutex>{block->m_lock},
block->m_queue[auto_id_t<RetType>{}].emplace_back(std::move(result));
block->m_queueUpdated.notify_all();
};
}

Expand All @@ -168,48 +176,68 @@ class parallel {
std::is_same<void, typename std::result_of<_Fx()>::type>::value
>::type
operator+=(_Fx&& fx) {
auto block = m_block;
if (!block)
return;

// Increment remain jobs. This is decremented by calls to "Pop"
(std::lock_guard<std::mutex>)m_block->m_lock, ++m_block->m_outstandingCount;
(std::lock_guard<std::mutex>)block->m_lock, ++block->m_outstandingCount;

block->dq += [this, fx] {
auto block = m_block;
if (!block)
return;

m_block->dq += [this, fx] {
fx();

std::lock_guard<std::mutex>{m_block->m_lock},
m_block->m_nVoidEntries++;
m_block->m_queueUpdated.notify_all();
std::lock_guard<std::mutex>{block->m_lock},
block->m_nVoidEntries++;
block->m_queueUpdated.notify_all();
};
}

// Discard the most recent result. Blocks until the next result arives.
template<typename T>
void Pop(void) {
std::unique_lock<std::mutex> lk(m_block->m_lock);
if (!m_block->m_outstandingCount)
auto block = m_block;
if (!block)
return;

std::unique_lock<std::mutex> lk(block->m_lock);
if (!block->m_outstandingCount)
throw std::out_of_range("No outstanding jobs");

if (std::is_same<void, T>::value) {
m_block->m_queueUpdated.wait(lk, [this] { return m_block->m_nVoidEntries != 0; });
m_block->m_nVoidEntries--;
block->m_queueUpdated.wait(lk, [this] { auto block = m_block; return block ? block->m_nVoidEntries != 0 : true; });
block->m_nVoidEntries--;
} else {
auto& qu = m_block->m_queue[auto_id_t<T>{}];
m_block->m_queueUpdated.wait(lk, [&qu] { return !qu.empty(); });
auto& qu = block->m_queue[auto_id_t<T>{}];
block->m_queueUpdated.wait(lk, [&qu] { return !qu.empty(); });
qu.pop_front();
}

--m_block->m_outstandingCount;
--block->m_outstandingCount;
}

// Get the most result from the most recent job. Blocks until a result arrives
// if there isn't one already available
template<typename T>
T Top(void) {
std::unique_lock<std::mutex> lk(m_block->m_lock);
auto block = m_block;
if (!block)
return T{};

std::unique_lock<std::mutex> lk(block->m_lock);

if (m_block->m_queue[auto_id_t<T>{}].empty())
m_block->m_queueUpdated.wait(lk, [this]{
return !m_block->m_queue[auto_id_t<T>{}].empty();
if (block->m_queue[auto_id_t<T>{}].empty())
block->m_queueUpdated.wait(lk, [this]{
auto block = m_block;
if (!block)
return true;

return !block->m_queue[auto_id_t<T>{}].empty();
});
return *static_cast<T*>(m_block->m_queue[auto_id_t<T>{}].front().ptr());
return *static_cast<T*>(block->m_queue[auto_id_t<T>{}].front().ptr());
}

// Get a collection containing all entries of the specified type
Expand All @@ -225,19 +253,30 @@ class parallel {
/// If a stop call has been made, this method will also block until all owned threads have quit
/// </remarks>
void barrier(void) {
std::unique_lock<std::mutex> lk(m_block->m_lock);
m_block->m_queueUpdated.wait(lk, [this] {
size_t totalReady = m_block->m_nVoidEntries;
for (auto& entry : m_block->m_queue)
auto block = m_block;
if (!block)
return;

std::unique_lock<std::mutex> lk(block->m_lock);
block->m_queueUpdated.wait(lk, [this] {
auto block = m_block;
if (!block)
return true;
size_t totalReady = block->m_nVoidEntries;
for (auto& entry : block->m_queue)
totalReady += entry.second.size();
return m_block->m_outstandingCount == totalReady;
return block->m_outstandingCount == totalReady;
});
}

// Get an iterator to the begining of out queue of job results
template<typename T>
parallel_iterator<T> begin(void) {
return{ *this, m_block->m_outstandingCount };
auto block = m_block;
if (!block)
return{ *this, 0 };

return{ *this, block->m_outstandingCount };
}

// Iterator representing no jobs results remaining
Expand Down
7 changes: 7 additions & 0 deletions src/autowiring/auto_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ namespace autowiring {
)
{}

#if !defined(_MSC_VER)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wc++1z-compat-mangling"
#endif
auto_id_block(
int index,
const std::type_info* ti,
Expand All @@ -75,6 +79,9 @@ namespace autowiring {
pToObj(pToObj),
pFromObj(pFromObj)
{}
#if !defined(_MSC_VER)
#pragma clang diagnostic pop
#endif

// Index and underlying type. Indexes are guaranteed to start at 1. The index value of 0
// is reserved as the invalid index.
Expand Down
2 changes: 2 additions & 0 deletions src/autowiring/test/AnySharedPointerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ TEST_F(AnySharedPointerTest, CanHoldCoreObject) {
ASSERT_EQ(co, x) << "Held CoreObject was not equivalent to constructed instance";
}

template<> const autowiring::fast_pointer_cast_initializer<CoreObject, CoreObject> autowiring::fast_pointer_cast_initializer<CoreObject, CoreObject>::sc_init;

TEST_F(AnySharedPointerTest, CanFastCastToSelf) {
(void)autowiring::fast_pointer_cast_initializer<CoreObject, CoreObject>::sc_init;

Expand Down
3 changes: 3 additions & 0 deletions src/autowiring/test/ContextMemberTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ namespace {
}

TEST_F(ContextMemberTest, PathologicalResetCase) {
if (std::thread::hardware_concurrency() == 1)
return; // Don't bother running on a single-core machine

Autowired<TypeThatIsNotInjected>* pv;
volatile std::atomic<size_t> nBarr{ 0 };
volatile bool proceed = true;
Expand Down
Loading

0 comments on commit 6e4022a

Please sign in to comment.