diff --git a/src/autowiring/AutoPacketFactory.cpp b/src/autowiring/AutoPacketFactory.cpp index 653078dc6..c890a8d83 100644 --- a/src/autowiring/AutoPacketFactory.cpp +++ b/src/autowiring/AutoPacketFactory.cpp @@ -21,7 +21,7 @@ bool AutoPacketFactory::IsRunning(void) const { } std::shared_ptr AutoPacketFactory::CurrentPacket(void) { - std::lock_guard lk(m_lock); + std::lock_guard lk(m_apfLock); return m_curPacket.lock(); } @@ -29,7 +29,7 @@ std::shared_ptr AutoPacketFactory::NewPacket(void) { std::shared_ptr retVal; bool isFirstPacket; { - std::lock_guard lk(m_lock); + std::lock_guard lk(m_apfLock); if (ShouldStop()) throw autowiring_error("Attempted to create a packet on an AutoPacketFactory that was already terminated"); @@ -84,14 +84,14 @@ std::shared_ptr AutoPacketFactory::GetInternalOutstanding(void) { } std::vector AutoPacketFactory::GetAutoFilters(void) const { - std::lock_guard lk(m_lock); + std::lock_guard lk(m_apfLock); std::vector retVal; retVal.assign(m_autoFilters.begin(), m_autoFilters.end()); return retVal; } SatCounter* AutoPacketFactory::CreateSatCounterList(void) const { - std::lock_guard lk(m_lock); + std::lock_guard lk(m_apfLock); // Trivial return check if (m_autoFilters.empty()) @@ -113,7 +113,7 @@ SatCounter* AutoPacketFactory::CreateSatCounterList(void) const { bool AutoPacketFactory::OnStart(void) { // Initialize first packet - std::lock_guard{m_lock}, + std::lock_guard{m_apfLock}, m_nextPacket = ConstructPacket(); return true; } @@ -124,7 +124,7 @@ void AutoPacketFactory::OnStop(bool graceful) { std::shared_ptr nextPacket; // Lock destruction precedes local variables - std::lock_guard{m_lock}, + std::lock_guard{m_apfLock}, autoFilters.swap(m_autoFilters), nextPacket.swap(m_nextPacket); } @@ -156,14 +156,14 @@ void AutoPacketFactory::Clear(void) { } const AutoFilterDescriptor& AutoPacketFactory::AddSubscriber(const AutoFilterDescriptor& rhs) { - std::lock_guard lk(m_lock); + std::lock_guard lk(m_apfLock); m_autoFilters.insert(rhs); return rhs; } void AutoPacketFactory::RemoveSubscriber(const AutoFilterDescriptor& autoFilter) { // Trivial removal from the autofilter set: - std::lock_guard lk(m_lock); + std::lock_guard lk(m_apfLock); m_autoFilters.erase(autoFilter); } @@ -186,7 +186,7 @@ size_t AutoPacketFactory::GetOutstandingPacketCount(void) const { } void AutoPacketFactory::RecordPacketDuration(std::chrono::nanoseconds duration) { - std::unique_lock lk(m_lock); + std::unique_lock lk(m_apfLock); m_packetDurationSum += duration.count(); m_packetDurationSqSum += duration.count() * duration.count(); } @@ -202,7 +202,7 @@ double AutoPacketFactory::GetPacketLifetimeStandardDeviation(void) { } void AutoPacketFactory::ResetPacketStatistics(void) { - std::unique_lock lk(m_lock); + std::unique_lock lk(m_apfLock); m_packetCount = 0; m_packetDurationSum = 0.0; m_packetDurationSqSum = 0.0; diff --git a/src/autowiring/AutoPacketFactory.h b/src/autowiring/AutoPacketFactory.h index 4041527fc..5dc679d84 100644 --- a/src/autowiring/AutoPacketFactory.h +++ b/src/autowiring/AutoPacketFactory.h @@ -28,7 +28,7 @@ class AutoPacketFactory: private: // Lock for this type - mutable std::mutex m_lock; + mutable std::mutex m_apfLock; // Internal outstanding reference for issued packet: std::weak_ptr m_outstandingInternal; @@ -60,7 +60,7 @@ class AutoPacketFactory: /// template void AppendAutoFiltersTo(T& container) const { - std::lock_guard lk(m_lock); + std::lock_guard lk(m_apfLock); container.insert(container.end(), m_autoFilters.begin(), m_autoFilters.end()); } diff --git a/src/autowiring/CoreRunnable.cpp b/src/autowiring/CoreRunnable.cpp index 4e759270c..14960c684 100644 --- a/src/autowiring/CoreRunnable.cpp +++ b/src/autowiring/CoreRunnable.cpp @@ -24,9 +24,11 @@ bool CoreRunnable::Start(std::shared_ptr outstanding) { return true; m_wasStarted = true; - m_outstanding = outstanding; + m_outstanding = std::move(outstanding); + outstanding.reset(); if(!OnStart()) { m_shouldStop = true; + m_outstanding.reset(); // Immediately invoke a graceless stop in response @@ -38,20 +40,23 @@ bool CoreRunnable::Start(std::shared_ptr outstanding) { } void CoreRunnable::Stop(bool graceful) { + std::unique_lock lk(m_lock); if (!m_shouldStop) { // Stop flag should be pulled high m_shouldStop = true; // Do not call this method more than once: + lk.unlock(); OnStop(graceful); + lk.lock(); } if (m_outstanding) { // Ensure we do not invoke the outstanding count dtor while holding a lock std::shared_ptr outstanding; - std::lock_guard{m_lock}, outstanding.swap(m_outstanding); } + lk.unlock(); // Everything looks good now m_cv.notify_all(); diff --git a/src/autowiring/CoreThread.cpp b/src/autowiring/CoreThread.cpp index 4cb46b8ae..31fdf62c3 100644 --- a/src/autowiring/CoreThread.cpp +++ b/src/autowiring/CoreThread.cpp @@ -14,6 +14,8 @@ void CoreThread::DoRunLoopCleanup(std::shared_ptr&& ctxt, std::shar // Kill everything in the dispatch queue and also run it down { CurrentContextPusher pshr(ctxt); + // Only allow one thread at a time to clean up the dispatch queue + std::lock_guard lk(m_stoppingLock); Rundown(); } @@ -27,6 +29,9 @@ void CoreThread::Run() { } void CoreThread::OnStop(bool graceful) { + // Only allow one thread at a time to clean up the dispatch queue + std::lock_guard lk(m_stoppingLock); + // Base class handling first: BasicThread::OnStop(graceful); diff --git a/src/autowiring/CoreThread.h b/src/autowiring/CoreThread.h index e7f068c15..317b928e7 100644 --- a/src/autowiring/CoreThread.h +++ b/src/autowiring/CoreThread.h @@ -37,6 +37,11 @@ class CoreThread: virtual ~CoreThread(void); protected: + /// + /// While stopping, make sure we do it exclusively + /// + std::mutex m_stoppingLock; + /// /// Overridden here so we can rundown the dispatch queue /// diff --git a/src/autowiring/Parallel.h b/src/autowiring/Parallel.h index fa6a220f0..608a5a1da 100644 --- a/src/autowiring/Parallel.h +++ b/src/autowiring/Parallel.h @@ -150,15 +150,23 @@ class parallel { operator+=(_Fx&& fx) { using RetType = typename std::remove_cv::type; + auto block = m_block; + if (!block) + return; + // Increment remain jobs. This is decremented by calls to "Pop" - (std::lock_guard)m_block->m_lock, ++m_block->m_outstandingCount; + (std::lock_guard)block->m_lock, ++block->m_outstandingCount; + + block->dq += [this, fx] { + auto block = m_block; + if (!block) + return; - m_block->dq += [this, fx] { auto result = std::make_shared(fx()); - std::lock_guard{m_block->m_lock}, - m_block->m_queue[auto_id_t{}].emplace_back(std::move(result)); - m_block->m_queueUpdated.notify_all(); + std::lock_guard{block->m_lock}, + block->m_queue[auto_id_t{}].emplace_back(std::move(result)); + block->m_queueUpdated.notify_all(); }; } @@ -168,48 +176,68 @@ class parallel { std::is_same::type>::value >::type operator+=(_Fx&& fx) { + auto block = m_block; + if (!block) + return; + // Increment remain jobs. This is decremented by calls to "Pop" - (std::lock_guard)m_block->m_lock, ++m_block->m_outstandingCount; + (std::lock_guard)block->m_lock, ++block->m_outstandingCount; + + block->dq += [this, fx] { + auto block = m_block; + if (!block) + return; - m_block->dq += [this, fx] { fx(); - std::lock_guard{m_block->m_lock}, - m_block->m_nVoidEntries++; - m_block->m_queueUpdated.notify_all(); + std::lock_guard{block->m_lock}, + block->m_nVoidEntries++; + block->m_queueUpdated.notify_all(); }; } // Discard the most recent result. Blocks until the next result arives. template void Pop(void) { - std::unique_lock lk(m_block->m_lock); - if (!m_block->m_outstandingCount) + auto block = m_block; + if (!block) + return; + + std::unique_lock lk(block->m_lock); + if (!block->m_outstandingCount) throw std::out_of_range("No outstanding jobs"); if (std::is_same::value) { - m_block->m_queueUpdated.wait(lk, [this] { return m_block->m_nVoidEntries != 0; }); - m_block->m_nVoidEntries--; + block->m_queueUpdated.wait(lk, [this] { auto block = m_block; return block ? block->m_nVoidEntries != 0 : true; }); + block->m_nVoidEntries--; } else { - auto& qu = m_block->m_queue[auto_id_t{}]; - m_block->m_queueUpdated.wait(lk, [&qu] { return !qu.empty(); }); + auto& qu = block->m_queue[auto_id_t{}]; + block->m_queueUpdated.wait(lk, [&qu] { return !qu.empty(); }); qu.pop_front(); } - --m_block->m_outstandingCount; + --block->m_outstandingCount; } // Get the most result from the most recent job. Blocks until a result arrives // if there isn't one already available template T Top(void) { - std::unique_lock lk(m_block->m_lock); + auto block = m_block; + if (!block) + return T{}; + + std::unique_lock lk(block->m_lock); - if (m_block->m_queue[auto_id_t{}].empty()) - m_block->m_queueUpdated.wait(lk, [this]{ - return !m_block->m_queue[auto_id_t{}].empty(); + if (block->m_queue[auto_id_t{}].empty()) + block->m_queueUpdated.wait(lk, [this]{ + auto block = m_block; + if (!block) + return true; + + return !block->m_queue[auto_id_t{}].empty(); }); - return *static_cast(m_block->m_queue[auto_id_t{}].front().ptr()); + return *static_cast(block->m_queue[auto_id_t{}].front().ptr()); } // Get a collection containing all entries of the specified type @@ -225,19 +253,30 @@ class parallel { /// If a stop call has been made, this method will also block until all owned threads have quit /// void barrier(void) { - std::unique_lock lk(m_block->m_lock); - m_block->m_queueUpdated.wait(lk, [this] { - size_t totalReady = m_block->m_nVoidEntries; - for (auto& entry : m_block->m_queue) + auto block = m_block; + if (!block) + return; + + std::unique_lock lk(block->m_lock); + block->m_queueUpdated.wait(lk, [this] { + auto block = m_block; + if (!block) + return true; + size_t totalReady = block->m_nVoidEntries; + for (auto& entry : block->m_queue) totalReady += entry.second.size(); - return m_block->m_outstandingCount == totalReady; + return block->m_outstandingCount == totalReady; }); } // Get an iterator to the begining of out queue of job results template parallel_iterator begin(void) { - return{ *this, m_block->m_outstandingCount }; + auto block = m_block; + if (!block) + return{ *this, 0 }; + + return{ *this, block->m_outstandingCount }; } // Iterator representing no jobs results remaining diff --git a/src/autowiring/auto_id.h b/src/autowiring/auto_id.h index c4bb353f6..8770b92c4 100644 --- a/src/autowiring/auto_id.h +++ b/src/autowiring/auto_id.h @@ -58,6 +58,10 @@ namespace autowiring { ) {} +#if !defined(_MSC_VER) +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wc++1z-compat-mangling" +#endif auto_id_block( int index, const std::type_info* ti, @@ -75,6 +79,9 @@ namespace autowiring { pToObj(pToObj), pFromObj(pFromObj) {} +#if !defined(_MSC_VER) +#pragma clang diagnostic pop +#endif // Index and underlying type. Indexes are guaranteed to start at 1. The index value of 0 // is reserved as the invalid index. diff --git a/src/autowiring/test/AnySharedPointerTest.cpp b/src/autowiring/test/AnySharedPointerTest.cpp index 718c4d6f0..ac0ffb840 100644 --- a/src/autowiring/test/AnySharedPointerTest.cpp +++ b/src/autowiring/test/AnySharedPointerTest.cpp @@ -175,6 +175,8 @@ TEST_F(AnySharedPointerTest, CanHoldCoreObject) { ASSERT_EQ(co, x) << "Held CoreObject was not equivalent to constructed instance"; } +template<> const autowiring::fast_pointer_cast_initializer autowiring::fast_pointer_cast_initializer::sc_init; + TEST_F(AnySharedPointerTest, CanFastCastToSelf) { (void)autowiring::fast_pointer_cast_initializer::sc_init; diff --git a/src/autowiring/test/ContextMemberTest.cpp b/src/autowiring/test/ContextMemberTest.cpp index c24eabf2d..df85586d1 100644 --- a/src/autowiring/test/ContextMemberTest.cpp +++ b/src/autowiring/test/ContextMemberTest.cpp @@ -176,6 +176,9 @@ namespace { } TEST_F(ContextMemberTest, PathologicalResetCase) { + if (std::thread::hardware_concurrency() == 1) + return; // Don't bother running on a single-core machine + Autowired* pv; volatile std::atomic nBarr{ 0 }; volatile bool proceed = true; diff --git a/src/autowiring/test/CoreContextTest.cpp b/src/autowiring/test/CoreContextTest.cpp index 8053a122f..d4e0567e6 100644 --- a/src/autowiring/test/CoreContextTest.cpp +++ b/src/autowiring/test/CoreContextTest.cpp @@ -640,7 +640,7 @@ TEST_F(CoreContextTest, AwaitTimed) { namespace { class HoldsMutexAndCount { public: - volatile int hitCount = 0; + std::atomic hitCount{ 0 }; int initCount = 0; int instanceCount = 0; std::mutex lk; @@ -650,13 +650,14 @@ namespace { public: DelaysWithNwa(void) { hmac->hitCount++; - std::lock_guard{ hmac->lk }; + std::lock_guard lk{ hmac->lk }; hmac->initCount++; hmac->instanceCount++; } virtual ~DelaysWithNwa(void) { + std::lock_guard lk{ hmac->lk }; hmac->instanceCount--; } @@ -682,13 +683,15 @@ TEST_F(CoreContextTest, SimultaneousMultiInject) { std::thread b([ctxt] { ctxt->Inject(); }); // Poor man's barrier - while (hmac->hitCount != 2) + const auto limit = std::chrono::steady_clock::now() + std::chrono::seconds(5); + while (hmac->hitCount != 2 && std::chrono::steady_clock::now() < limit) std::this_thread::yield(); lk.unlock(); a.join(); b.join(); + lk.lock(); // Two initializations should have taken place due to the barrier ASSERT_EQ(2, hmac->initCount); diff --git a/src/autowiring/test/CoreJobTest.cpp b/src/autowiring/test/CoreJobTest.cpp index cef9a3424..e98faf71a 100644 --- a/src/autowiring/test/CoreJobTest.cpp +++ b/src/autowiring/test/CoreJobTest.cpp @@ -200,7 +200,8 @@ TEST_F(CoreJobTest, PendFromMultipleThreads) { } ctxt->Initiate(); for (size_t i = 0; i < threads.size(); i++) { - threads[i].join(); + if (threads[i].joinable()) + threads[i].join(); } ctxt->SignalShutdown(true); ASSERT_EQ(times*threads.size(), counter); diff --git a/src/autowiring/test/CoreThreadTest.cpp b/src/autowiring/test/CoreThreadTest.cpp index 9f2e24735..4b5c84f55 100644 --- a/src/autowiring/test/CoreThreadTest.cpp +++ b/src/autowiring/test/CoreThreadTest.cpp @@ -484,7 +484,8 @@ class CoreThreadExtraction: }; TEST_F(CoreThreadTest, SpuriousWakeupTest) { - AutoCurrentContext()->Initiate(); + AutoCurrentContext ctxt; + ctxt->Initiate(); AutoRequired extraction; std::mutex lock; @@ -518,6 +519,8 @@ TEST_F(CoreThreadTest, SpuriousWakeupTest) { ASSERT_TRUE(cv.wait_for(lk, std::chrono::seconds(5), [&] { return ready; })); ASSERT_EQ(2UL, countOnWake) << "Dispatch queue changed size under a spurious wakeup condition"; + + ctxt->SignalShutdown(true); } class BlocksInOnStop: @@ -648,7 +651,9 @@ TEST_F(CoreThreadTest, LambdaHoldAfterTermination) { } TEST_F(CoreThreadTest, CanElevateAnyPriority) { + AutoCurrentContext ctxt; AutoRequired ct; + ctxt->Initiate(); for (int i = (int)ThreadPriority::Default; i < (int)ThreadPriority::Multimedia; i++) { BasicThread::ElevatePriority ep{ *ct, (ThreadPriority)i }; diff --git a/src/autowiring/test/CurrentContextPusherTest.cpp b/src/autowiring/test/CurrentContextPusherTest.cpp index 5d0f9943c..b012baebd 100644 --- a/src/autowiring/test/CurrentContextPusherTest.cpp +++ b/src/autowiring/test/CurrentContextPusherTest.cpp @@ -1,5 +1,6 @@ // Copyright (C) 2012-2017 Leap Motion, Inc. All rights reserved. #include "stdafx.h" +#include "autotesting/AutowiringEnclosure.h" #include #include #include @@ -35,6 +36,6 @@ TEST_F(CurrentContextPusherTest, NoUnexpectedGlobalHold) { } ); rs.join(); - ASSERT_TRUE(ctxt.unique()) << "The current context pointer was not correctly cleaned up on thread exit"; - ASSERT_EQ(initUses, global.use_count()) << "A global reference was unexpectedly leaked by the pusher"; + ASSERT_TRUE(autowiring::autotesting::WaitForUseCount(ctxt, 1L, std::chrono::seconds(5))) << "The current context pointer was not correctly cleaned up on thread exit"; + ASSERT_TRUE(autowiring::autotesting::WaitForUseCount(global, initUses, std::chrono::seconds(5))) << "A global reference was unexpectedly leaked by the pusher"; } diff --git a/src/autowiring/test/LockReducedCollectionTest.cpp b/src/autowiring/test/LockReducedCollectionTest.cpp index 0bb24adbb..cd34d0535 100644 --- a/src/autowiring/test/LockReducedCollectionTest.cpp +++ b/src/autowiring/test/LockReducedCollectionTest.cpp @@ -58,7 +58,8 @@ TEST_F(LockReducedCollectionTest, ConcurrentWritersCheck) { // Wait on all threads: for(size_t i = 0; i < threadCount; i++) - allThreads[i].join(); + if (allThreads[i].joinable()) + allThreads[i].join(); // Trivial size validation first: auto image = collection.GetImage(); diff --git a/src/autowiring/test/PostConstructTest.cpp b/src/autowiring/test/PostConstructTest.cpp index 0242f0474..fecbbe2bd 100644 --- a/src/autowiring/test/PostConstructTest.cpp +++ b/src/autowiring/test/PostConstructTest.cpp @@ -208,6 +208,9 @@ TEST_F(PostConstructTest, TestForwardDeclare) { */ TEST_F(PostConstructTest, NotificationTeardownRace) { + if (std::thread::hardware_concurrency() == 1) + return; // Don't bother running on a single-core machine + std::shared_ptr pContext; auto quit = false; diff --git a/src/autowiring/test/SpinLockTest.cpp b/src/autowiring/test/SpinLockTest.cpp index 19bf54e66..666b6fd60 100644 --- a/src/autowiring/test/SpinLockTest.cpp +++ b/src/autowiring/test/SpinLockTest.cpp @@ -44,7 +44,8 @@ TEST(SpinLockTest, PathologicalExclusion) { ); for (auto& thread : threads) - thread.join(); + if (thread.joinable()) + thread.join(); // Verify the lock did everything it was supposed to do: ASSERT_TRUE(*success) << "Lock failed to exclude multi-access under pathological cases";