Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Miscellaneous minor fixes #1033

Merged
merged 12 commits into from
Nov 7, 2017
20 changes: 10 additions & 10 deletions src/autowiring/AutoPacketFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ bool AutoPacketFactory::IsRunning(void) const {
}

std::shared_ptr<AutoPacket> AutoPacketFactory::CurrentPacket(void) {
std::lock_guard<std::mutex> lk(m_lock);
std::lock_guard<std::mutex> lk(m_apfLock);
return m_curPacket.lock();
}

std::shared_ptr<AutoPacket> AutoPacketFactory::NewPacket(void) {
std::shared_ptr<AutoPacketInternal> retVal;
bool isFirstPacket;
{
std::lock_guard<std::mutex> lk(m_lock);
std::lock_guard<std::mutex> lk(m_apfLock);

if (ShouldStop())
throw autowiring_error("Attempted to create a packet on an AutoPacketFactory that was already terminated");
Expand Down Expand Up @@ -84,14 +84,14 @@ std::shared_ptr<void> AutoPacketFactory::GetInternalOutstanding(void) {
}

std::vector<AutoFilterDescriptor> AutoPacketFactory::GetAutoFilters(void) const {
std::lock_guard<std::mutex> lk(m_lock);
std::lock_guard<std::mutex> lk(m_apfLock);
std::vector<AutoFilterDescriptor> retVal;
retVal.assign(m_autoFilters.begin(), m_autoFilters.end());
return retVal;
}

SatCounter* AutoPacketFactory::CreateSatCounterList(void) const {
std::lock_guard<std::mutex> lk(m_lock);
std::lock_guard<std::mutex> lk(m_apfLock);

// Trivial return check
if (m_autoFilters.empty())
Expand All @@ -113,7 +113,7 @@ SatCounter* AutoPacketFactory::CreateSatCounterList(void) const {

bool AutoPacketFactory::OnStart(void) {
// Initialize first packet
std::lock_guard<std::mutex>{m_lock},
std::lock_guard<std::mutex>{m_apfLock},
m_nextPacket = ConstructPacket();
return true;
}
Expand All @@ -124,7 +124,7 @@ void AutoPacketFactory::OnStop(bool graceful) {
std::shared_ptr<AutoPacketInternal> nextPacket;

// Lock destruction precedes local variables
std::lock_guard<std::mutex>{m_lock},
std::lock_guard<std::mutex>{m_apfLock},
autoFilters.swap(m_autoFilters),
nextPacket.swap(m_nextPacket);
}
Expand Down Expand Up @@ -156,14 +156,14 @@ void AutoPacketFactory::Clear(void) {
}

const AutoFilterDescriptor& AutoPacketFactory::AddSubscriber(const AutoFilterDescriptor& rhs) {
std::lock_guard<std::mutex> lk(m_lock);
std::lock_guard<std::mutex> lk(m_apfLock);
m_autoFilters.insert(rhs);
return rhs;
}

void AutoPacketFactory::RemoveSubscriber(const AutoFilterDescriptor& autoFilter) {
// Trivial removal from the autofilter set:
std::lock_guard<std::mutex> lk(m_lock);
std::lock_guard<std::mutex> lk(m_apfLock);
m_autoFilters.erase(autoFilter);
}

Expand All @@ -186,7 +186,7 @@ size_t AutoPacketFactory::GetOutstandingPacketCount(void) const {
}

void AutoPacketFactory::RecordPacketDuration(std::chrono::nanoseconds duration) {
std::unique_lock<std::mutex> lk(m_lock);
std::unique_lock<std::mutex> lk(m_apfLock);
m_packetDurationSum += duration.count();
m_packetDurationSqSum += duration.count() * duration.count();
}
Expand All @@ -202,7 +202,7 @@ double AutoPacketFactory::GetPacketLifetimeStandardDeviation(void) {
}

void AutoPacketFactory::ResetPacketStatistics(void) {
std::unique_lock<std::mutex> lk(m_lock);
std::unique_lock<std::mutex> lk(m_apfLock);
m_packetCount = 0;
m_packetDurationSum = 0.0;
m_packetDurationSqSum = 0.0;
Expand Down
4 changes: 2 additions & 2 deletions src/autowiring/AutoPacketFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class AutoPacketFactory:

private:
// Lock for this type
mutable std::mutex m_lock;
mutable std::mutex m_apfLock;

// Internal outstanding reference for issued packet:
std::weak_ptr<void> m_outstandingInternal;
Expand Down Expand Up @@ -60,7 +60,7 @@ class AutoPacketFactory:
/// </summary>
template<class T>
void AppendAutoFiltersTo(T& container) const {
std::lock_guard<std::mutex> lk(m_lock);
std::lock_guard<std::mutex> lk(m_apfLock);
container.insert(container.end(), m_autoFilters.begin(), m_autoFilters.end());
}

Expand Down
9 changes: 7 additions & 2 deletions src/autowiring/CoreRunnable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ bool CoreRunnable::Start(std::shared_ptr<CoreObject> outstanding) {
return true;

m_wasStarted = true;
m_outstanding = outstanding;
m_outstanding = std::move(outstanding);
outstanding.reset();
if(!OnStart()) {
m_shouldStop = true;

m_outstanding.reset();

// Immediately invoke a graceless stop in response
Expand All @@ -38,20 +40,23 @@ bool CoreRunnable::Start(std::shared_ptr<CoreObject> outstanding) {
}

void CoreRunnable::Stop(bool graceful) {
std::unique_lock<std::mutex> lk(m_lock);
if (!m_shouldStop) {
// Stop flag should be pulled high
m_shouldStop = true;

// Do not call this method more than once:
lk.unlock();
OnStop(graceful);
lk.lock();
}

if (m_outstanding) {
// Ensure we do not invoke the outstanding count dtor while holding a lock
std::shared_ptr<CoreObject> outstanding;
std::lock_guard<std::mutex>{m_lock},
outstanding.swap(m_outstanding);
}
lk.unlock();

// Everything looks good now
m_cv.notify_all();
Expand Down
5 changes: 5 additions & 0 deletions src/autowiring/CoreThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ void CoreThread::DoRunLoopCleanup(std::shared_ptr<CoreContext>&& ctxt, std::shar
// Kill everything in the dispatch queue and also run it down
{
CurrentContextPusher pshr(ctxt);
// Only allow one thread at a time to clean up the dispatch queue
std::lock_guard<std::mutex> lk(m_stoppingLock);
Rundown();
}

Expand All @@ -27,6 +29,9 @@ void CoreThread::Run() {
}

void CoreThread::OnStop(bool graceful) {
// Only allow one thread at a time to clean up the dispatch queue
std::lock_guard<std::mutex> lk(m_stoppingLock);

// Base class handling first:
BasicThread::OnStop(graceful);

Expand Down
5 changes: 5 additions & 0 deletions src/autowiring/CoreThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ class CoreThread:
virtual ~CoreThread(void);

protected:
/// <summary>
/// While stopping, make sure we do it exclusively
/// </summary>
std::mutex m_stoppingLock;

/// <summary>
/// Overridden here so we can rundown the dispatch queue
/// </summary>
Expand Down
95 changes: 67 additions & 28 deletions src/autowiring/Parallel.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,23 @@ class parallel {
operator+=(_Fx&& fx) {
using RetType = typename std::remove_cv<decltype(fx())>::type;

auto block = m_block;
if (!block)
return;

// Increment remain jobs. This is decremented by calls to "Pop"
(std::lock_guard<std::mutex>)m_block->m_lock, ++m_block->m_outstandingCount;
(std::lock_guard<std::mutex>)block->m_lock, ++block->m_outstandingCount;

block->dq += [this, fx] {
auto block = m_block;
if (!block)
return;

m_block->dq += [this, fx] {
auto result = std::make_shared<RetType>(fx());

std::lock_guard<std::mutex>{m_block->m_lock},
m_block->m_queue[auto_id_t<RetType>{}].emplace_back(std::move(result));
m_block->m_queueUpdated.notify_all();
std::lock_guard<std::mutex>{block->m_lock},
block->m_queue[auto_id_t<RetType>{}].emplace_back(std::move(result));
block->m_queueUpdated.notify_all();
};
}

Expand All @@ -168,48 +176,68 @@ class parallel {
std::is_same<void, typename std::result_of<_Fx()>::type>::value
>::type
operator+=(_Fx&& fx) {
auto block = m_block;
if (!block)
return;

// Increment remain jobs. This is decremented by calls to "Pop"
(std::lock_guard<std::mutex>)m_block->m_lock, ++m_block->m_outstandingCount;
(std::lock_guard<std::mutex>)block->m_lock, ++block->m_outstandingCount;

block->dq += [this, fx] {
auto block = m_block;
if (!block)
return;

m_block->dq += [this, fx] {
fx();

std::lock_guard<std::mutex>{m_block->m_lock},
m_block->m_nVoidEntries++;
m_block->m_queueUpdated.notify_all();
std::lock_guard<std::mutex>{block->m_lock},
block->m_nVoidEntries++;
block->m_queueUpdated.notify_all();
};
}

// Discard the most recent result. Blocks until the next result arives.
template<typename T>
void Pop(void) {
std::unique_lock<std::mutex> lk(m_block->m_lock);
if (!m_block->m_outstandingCount)
auto block = m_block;
if (!block)
return;

std::unique_lock<std::mutex> lk(block->m_lock);
if (!block->m_outstandingCount)
throw std::out_of_range("No outstanding jobs");

if (std::is_same<void, T>::value) {
m_block->m_queueUpdated.wait(lk, [this] { return m_block->m_nVoidEntries != 0; });
m_block->m_nVoidEntries--;
block->m_queueUpdated.wait(lk, [this] { auto block = m_block; return block ? block->m_nVoidEntries != 0 : true; });
block->m_nVoidEntries--;
} else {
auto& qu = m_block->m_queue[auto_id_t<T>{}];
m_block->m_queueUpdated.wait(lk, [&qu] { return !qu.empty(); });
auto& qu = block->m_queue[auto_id_t<T>{}];
block->m_queueUpdated.wait(lk, [&qu] { return !qu.empty(); });
qu.pop_front();
}

--m_block->m_outstandingCount;
--block->m_outstandingCount;
}

// Get the most result from the most recent job. Blocks until a result arrives
// if there isn't one already available
template<typename T>
T Top(void) {
std::unique_lock<std::mutex> lk(m_block->m_lock);
auto block = m_block;
if (!block)
return T{};

std::unique_lock<std::mutex> lk(block->m_lock);

if (m_block->m_queue[auto_id_t<T>{}].empty())
m_block->m_queueUpdated.wait(lk, [this]{
return !m_block->m_queue[auto_id_t<T>{}].empty();
if (block->m_queue[auto_id_t<T>{}].empty())
block->m_queueUpdated.wait(lk, [this]{
auto block = m_block;
if (!block)
return true;

return !block->m_queue[auto_id_t<T>{}].empty();
});
return *static_cast<T*>(m_block->m_queue[auto_id_t<T>{}].front().ptr());
return *static_cast<T*>(block->m_queue[auto_id_t<T>{}].front().ptr());
}

// Get a collection containing all entries of the specified type
Expand All @@ -225,19 +253,30 @@ class parallel {
/// If a stop call has been made, this method will also block until all owned threads have quit
/// </remarks>
void barrier(void) {
std::unique_lock<std::mutex> lk(m_block->m_lock);
m_block->m_queueUpdated.wait(lk, [this] {
size_t totalReady = m_block->m_nVoidEntries;
for (auto& entry : m_block->m_queue)
auto block = m_block;
if (!block)
return;

std::unique_lock<std::mutex> lk(block->m_lock);
block->m_queueUpdated.wait(lk, [this] {
auto block = m_block;
if (!block)
return true;
size_t totalReady = block->m_nVoidEntries;
for (auto& entry : block->m_queue)
totalReady += entry.second.size();
return m_block->m_outstandingCount == totalReady;
return block->m_outstandingCount == totalReady;
});
}

// Get an iterator to the begining of out queue of job results
template<typename T>
parallel_iterator<T> begin(void) {
return{ *this, m_block->m_outstandingCount };
auto block = m_block;
if (!block)
return{ *this, 0 };

return{ *this, block->m_outstandingCount };
}

// Iterator representing no jobs results remaining
Expand Down
7 changes: 7 additions & 0 deletions src/autowiring/auto_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ namespace autowiring {
)
{}

#if !defined(_MSC_VER)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wc++1z-compat-mangling"
#endif
auto_id_block(
int index,
const std::type_info* ti,
Expand All @@ -75,6 +79,9 @@ namespace autowiring {
pToObj(pToObj),
pFromObj(pFromObj)
{}
#if !defined(_MSC_VER)
#pragma clang diagnostic pop
#endif

// Index and underlying type. Indexes are guaranteed to start at 1. The index value of 0
// is reserved as the invalid index.
Expand Down
2 changes: 2 additions & 0 deletions src/autowiring/test/AnySharedPointerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ TEST_F(AnySharedPointerTest, CanHoldCoreObject) {
ASSERT_EQ(co, x) << "Held CoreObject was not equivalent to constructed instance";
}

template<> const autowiring::fast_pointer_cast_initializer<CoreObject, CoreObject> autowiring::fast_pointer_cast_initializer<CoreObject, CoreObject>::sc_init;

TEST_F(AnySharedPointerTest, CanFastCastToSelf) {
(void)autowiring::fast_pointer_cast_initializer<CoreObject, CoreObject>::sc_init;

Expand Down
3 changes: 3 additions & 0 deletions src/autowiring/test/ContextMemberTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ namespace {
}

TEST_F(ContextMemberTest, PathologicalResetCase) {
if (std::thread::hardware_concurrency() == 1)
return; // Don't bother running on a single-core machine

Autowired<TypeThatIsNotInjected>* pv;
volatile std::atomic<size_t> nBarr{ 0 };
volatile bool proceed = true;
Expand Down
Loading