Skip to content

Commit

Permalink
Eliminate race condition in ContextMapTest.VerifyWithThreads
Browse files Browse the repository at this point in the history
This race condition occurs because we weren't waiting for the context to actually finish terminating before attempting to assess the state of the context map.  This meant that the secondary thread was sometimes still running even when we expected it to have already terminated.
  • Loading branch information
codemercenary committed Jul 10, 2015
1 parent 0e01483 commit 45428bf
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 73 deletions.
20 changes: 1 addition & 19 deletions autowiring/CoreContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class CoreContext:
const t_childList::iterator m_backReference;

// State block for this context:
std::unique_ptr<CoreContextStateBlock> m_stateBlock;
std::shared_ptr<CoreContextStateBlock> m_stateBlock;

enum class State {
// Not yet started
Expand Down Expand Up @@ -246,10 +246,6 @@ class CoreContext:
// Actual core threads:
std::list<CoreRunnable*> m_threads;

// Clever use of shared pointer to expose the number of outstanding CoreRunnable instances.
// Destructor does nothing; this is by design.
std::weak_ptr<CoreObject> m_outstanding;

// The thread pool used by this context. By default, a context inherits the thread pool of
// its parent, and the global context gets the system thread pool.
std::shared_ptr<autowiring::ThreadPool> m_threadPool;
Expand Down Expand Up @@ -387,20 +383,6 @@ class CoreContext:
/// </summary>
void AddPacketSubscriber(const AutoFilterDescriptor& rhs);

/// \internal
/// <summary>
/// Increments the total number of contexts still outstanding
/// </summary>
/// <remarks>
/// This is an indirect incrementation routine. The count will be incremented for as
/// long as the returned shared_ptr is not destroyed. Once it's destroyed, the count
/// is decremented. The caller is encouraged not to copy the return value, as doing
/// so can give inflated values for the current number of outstanding threads.
///
/// The caller is responsible for exterior synchronization
/// </remarks>
std::shared_ptr<CoreObject> IncrementOutstandingThreadCount(void);

/// \internal
/// <summary>
/// Internal type introduction routine
Expand Down
31 changes: 29 additions & 2 deletions autowiring/CoreContextStateBlock.h
Original file line number Diff line number Diff line change
@@ -1,17 +1,44 @@
// Copyright (C) 2012-2015 Leap Motion, Inc. All rights reserved.
#pragma once
#include <memory>
#include MUTEX_HEADER

struct CoreContextStateBlock
class CoreObject;
class CoreContext;

struct CoreContextStateBlock:
std::enable_shared_from_this<CoreContextStateBlock>
{
public:
CoreContextStateBlock(void);
CoreContextStateBlock(std::shared_ptr<CoreContextStateBlock> parent);
~CoreContextStateBlock(void);

// Reference to the parent state block, where appropriate
const std::shared_ptr<CoreContextStateBlock> parent;

// General purpose lock for this class
std::mutex m_lock;

// Condition, signalled when context state has been changed
std::condition_variable m_stateChanged;

// Clever use of shared pointer to expose the number of outstanding CoreRunnable instances.
// Destructor does nothing; this is by design.
std::weak_ptr<CoreObject> m_outstanding;

/// \internal
/// <summary>
/// Increments the total number of contexts still outstanding
/// </summary>
/// <param name="owner">The context that will be held for as long as the outstanding count is valid</param>
/// <remarks>
/// This is an indirect incrementation routine. The count will be incremented for as
/// long as the returned shared_ptr is not destroyed. Once it's destroyed, the count
/// is decremented. The caller is encouraged not to copy the return value, as doing
/// so can give inflated values for the current number of outstanding threads.
///
/// The caller is responsible for exterior synchronization
/// </remarks>
std::shared_ptr<CoreObject> IncrementOutstandingThreadCount(std::shared_ptr<CoreContext> owner);
};

53 changes: 7 additions & 46 deletions src/autowiring/CoreContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ static thread_specific_ptr<std::shared_ptr<CoreContext>> autoCurrentContext;
CoreContext::CoreContext(const std::shared_ptr<CoreContext>& pParent, t_childList::iterator backReference) :
m_pParent(pParent),
m_backReference(backReference),
m_stateBlock(new CoreContextStateBlock),
m_stateBlock(std::make_shared<CoreContextStateBlock>(pParent ? pParent->m_stateBlock : nullptr)),
m_junctionBoxManager(new JunctionBoxManager),
m_threadPool(std::make_shared<NullPool>())
{}
Expand Down Expand Up @@ -201,45 +201,6 @@ const std::type_info& CoreContext::GetAutoTypeId(const AnySharedPointer& ptr) co
return *pObjTraits->type;
}

std::shared_ptr<CoreObject> CoreContext::IncrementOutstandingThreadCount(void) {
// Optimistic check
std::shared_ptr<CoreObject> retVal = m_outstanding.lock();
if(retVal)
return retVal;

// Double-check
std::lock_guard<std::mutex> lk(m_stateBlock->m_lock);
retVal = m_outstanding.lock();
if (retVal)
return retVal;

// Increment the parent's outstanding count as well. This will be held by the lambda, and will cause the enclosing
// context's outstanding thread count to be incremented by one as long as we have any threads still running in our
// context. This property is relied upon in order to get the Wait function to operate properly.
std::shared_ptr<CoreObject> parentCount;
if(m_pParent)
parentCount = m_pParent->IncrementOutstandingThreadCount();

auto self = shared_from_this();
retVal.reset(
(CoreObject*)1,
[this, self, parentCount](CoreObject*) {
// Object being destroyed, notify all recipients
std::lock_guard<std::mutex> lk(m_stateBlock->m_lock);

// Unfortunately, this destructor callback is made before weak pointers are
// invalidated, which requires that we manually reset the outstanding count
m_outstanding.reset();

// Wake everyone up
m_stateBlock->m_stateChanged.notify_all();
}
);

m_outstanding = retVal;
return retVal;
}

void CoreContext::AddInternal(const CoreObjectDescriptor& traits) {
{
std::unique_lock<std::mutex> lk(m_stateBlock->m_lock);
Expand Down Expand Up @@ -490,8 +451,8 @@ void CoreContext::Initiate(void) {
std::swap(m_startToken, startToken);
}

{
auto outstanding = IncrementOutstandingThreadCount();
if (beginning != m_threads.end()) {
auto outstanding = m_stateBlock->IncrementOutstandingThreadCount(shared_from_this());
for (auto q = beginning; q != m_threads.end(); ++q)
(*q)->Start(outstanding);
}
Expand Down Expand Up @@ -594,12 +555,12 @@ void CoreContext::SignalShutdown(bool wait, ShutdownMode shutdownMode) {

void CoreContext::Wait(void) {
std::unique_lock<std::mutex> lk(m_stateBlock->m_lock);
m_stateBlock->m_stateChanged.wait(lk, [this] {return IsShutdown() && this->m_outstanding.expired(); });
m_stateBlock->m_stateChanged.wait(lk, [this] {return IsShutdown() && m_stateBlock->m_outstanding.expired(); });
}

bool CoreContext::Wait(const std::chrono::nanoseconds duration) {
std::unique_lock<std::mutex> lk(m_stateBlock->m_lock);
return m_stateBlock->m_stateChanged.wait_for(lk, duration, [this] {return IsShutdown() && this->m_outstanding.expired(); });
return m_stateBlock->m_stateChanged.wait_for(lk, duration, [this] {return IsShutdown() && m_stateBlock->m_outstanding.expired(); });
}

bool CoreContext::DelayUntilInitiated(void) {
Expand Down Expand Up @@ -641,7 +602,7 @@ void CoreContext::AddCoreRunnable(const std::shared_ptr<CoreRunnable>& ptr) {

// Run this thread without the lock
if(shouldRun)
ptr->Start(IncrementOutstandingThreadCount());
ptr->Start(m_stateBlock->IncrementOutstandingThreadCount(shared_from_this()));


// Check if the stop signal was sent between the time we started running until now. If so, then
Expand Down Expand Up @@ -1199,7 +1160,7 @@ void CoreContext::TryTransitionChildrenState(void) {
child->m_stateBlock->m_stateChanged.notify_all();

childLk.unlock();
auto outstanding = child->IncrementOutstandingThreadCount();
auto outstanding = child->m_stateBlock->IncrementOutstandingThreadCount(child);

while (q != child->m_threads.end()) {
(*q)->Start(outstanding);
Expand Down
50 changes: 48 additions & 2 deletions src/autowiring/CoreContextStateBlock.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,53 @@
// Copyright (C) 2012-2015 Leap Motion, Inc. All rights reserved.
#include "stdafx.h"
#include "CoreContextStateBlock.h"
#include "CoreContext.h"

CoreContextStateBlock::CoreContextStateBlock(void){}
CoreContextStateBlock::CoreContextStateBlock(std::shared_ptr<CoreContextStateBlock> parent) :
parent(parent)
{}

CoreContextStateBlock::~CoreContextStateBlock(void){}
CoreContextStateBlock::~CoreContextStateBlock(void) {}

std::shared_ptr<CoreObject> CoreContextStateBlock::IncrementOutstandingThreadCount(std::shared_ptr<CoreContext> owner) {
// Optimistic check
std::shared_ptr<CoreObject> retVal = m_outstanding.lock();
if (retVal)
return retVal;

// Double-check
std::lock_guard<std::mutex> lk(m_lock);
retVal = m_outstanding.lock();
if (retVal)
return retVal;

// Increment the parent's outstanding count as well. This will be held by the lambda, and will cause the enclosing
// context's outstanding thread count to be incremented by one as long as we have any threads still running in our
// context. This property is relied upon in order to get the Wait function to operate properly.
std::shared_ptr<CoreObject> parentCount;
if (parent)
parentCount = parent->IncrementOutstandingThreadCount(owner->GetParentContext());

auto self = shared_from_this();
retVal.reset(
(CoreObject*) 1,
[this, self, parentCount, owner](CoreObject*) mutable {
// Reset the owner before performing any other type of notification, we don't want to hold a reference
// to the owner and prevent its destruction before signalling or resetting anything
owner.reset();

// Object being destroyed, notify all recipients
std::lock_guard<std::mutex> lk(m_lock);

// Unfortunately, this destructor callback is made before weak pointers are
// invalidated, which requires that we manually reset the outstanding count
m_outstanding.reset();

// Wake everyone up
m_stateChanged.notify_all();
}
);

m_outstanding = retVal;
return retVal;
}
11 changes: 7 additions & 4 deletions src/autowiring/test/ContextMapTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,18 @@ TEST_F(ContextMapTest, VerifyWithThreads) {
ASSERT_TRUE(!!context.get()) << "Map evicted a context before expected";

// Relock the weak context, verify that we get back the same pointer:
auto relocked = weakContext.lock();
ASSERT_EQ(relocked, context) << "Mapped context pointer was not identical to a previously stored context pointer";
ASSERT_EQ(weakContext.lock(), context) << "Mapped context pointer was not identical to a previously stored context pointer";

// Terminate whole context
context->SignalTerminate();
// Terminate whole context, wait for it to respond
context->SignalShutdown();
context->Wait();
ASSERT_EQ(1UL, context.use_count()) << "Context reference should have been unique after thread expiration";
}

// Release our threaded entity:
ASSERT_EQ(1UL, threaded.use_count()) << "Thread was holding a self-reference even after context termination has completed";
threaded.reset();
ASSERT_TRUE(weakContext.expired()) << "Context still existed even after the last reference to it should have been gone";

{
// Verify that the context is gone now that everything in it has stopped running
Expand Down

0 comments on commit 45428bf

Please sign in to comment.