Skip to content

Commit

Permalink
Customize concurrency for ESSource DataProxies
Browse files Browse the repository at this point in the history
Refactored ESSource specific DataProxies to allow customizing how
concurrency should be handled. Provided implementations to support
either concurrent or non-concurrent running of DataProxies for the
same ESSource.
  • Loading branch information
Dr15Jones committed Dec 20, 2021
1 parent 4cd114c commit ab6d25d
Show file tree
Hide file tree
Showing 14 changed files with 597 additions and 57 deletions.
61 changes: 61 additions & 0 deletions FWCore/Framework/interface/ESSourceConcurrentDataProxyTemplate.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#ifndef FWCore_Framework_ESSourceConcurrentDataProxyTemplate_h
#define FWCore_Framework_ESSourceConcurrentDataProxyTemplate_h
// -*- C++ -*-
//
// Package: FWCore/Framework
// Class : ESSourceConcurrentDataProxyTemplate
//
/**\class ESSourceConcurrentDataProxyTemplate ESSourceConcurrentDataProxyTemplate.h "FWCore/Framework/interface/ESSourceConcurrentDataProxyTemplate.h"
Description: An ESSource specific DataProxy which is type safe and can run concurrently with other DataProxies from the same ESSource.
Usage:
Inherit from this class and override
`void prefetch(edm::eventsetup::DataKey const& iKey)`
and
`DataT const* fetch() const`
prefetch is guaranteed to be called before fetch where fetch should return the value obtained during the call to prefetch.
The inheriting class must maintain the lifetime of the object returned from fetch until invalidateCache() is called.
*/
//
// Original Author: Chris Jones
// Created: 17/12/2021
//

// system include files

// user include files
#include "FWCore/Framework/interface/ESSourceDataProxyConcurrentBase.h"

// forward declarations

namespace edm::eventsetup {
template <typename DataT>
class ESSourceConcurrentDataProxyTemplate : public ESSourceDataProxyConcurrentBase {
public:
ESSourceConcurrentDataProxyTemplate() = default;

ESSourceConcurrentDataProxyTemplate(const ESSourceConcurrentDataProxyTemplate&) = delete;
const ESSourceConcurrentDataProxyTemplate& operator=(const ESSourceConcurrentDataProxyTemplate&) = delete;

// ---------- const member functions ---------------------

// ---------- static member functions --------------------

// ---------- member functions ---------------------------
protected:
/** Inheriting classes must also override
void prefetch(edm::eventsetup::DataKey const& iKey, EventSetupRecordDetails) override;
*/

/** returns the data obtained in the call to prefetch */
virtual DataT const* fetch() const = 0;

private:
void const* getAfterPrefetchImpl() const final { return fetch(); }
};
} // namespace edm::eventsetup

#endif
57 changes: 38 additions & 19 deletions FWCore/Framework/interface/ESSourceDataProxyBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@
//
/**\class ESSourceDataProxyBase ESSourceDataProxyBase.h "FWCore/Framework/interface/ESSourceDataProxyBase.h"
Description: Base class for DataProxies for ESSources that require synchronization
Description: Base class for DataProxies for ESSources that can be specialized based on concurrency needs
Usage:
The ESSourceDataProxyBase uses a SerialTaskQueue to serialize all DataProxies for the ESSource and a
std::mutex to protect from concurrent calls to a DataProxy and the ESSource itself. Such concurrent calls
can happen if concurrent LuminosityBlocks are being used.
The ESSourceDataProxyBase provides the bases for DataProxies needed for ESSources. It allows customization of synchronization needs via the use of template parameters.
NOTE: if inheriting classes override `void invalidateCache()` they must be sure to call this classes
implementation as part of the call.
Expand All @@ -24,25 +22,19 @@
//

// system include files
#include <mutex>
#include <atomic>

// user include files
#include "FWCore/Framework/interface/DataProxy.h"
#include "FWCore/Framework/interface/EventSetupRecordDetails.h"
#include "FWCore/Concurrency/interface/WaitingTaskList.h"
#include "FWCore/Concurrency/interface/SerialTaskQueue.h"

// forward declarations

namespace edm::eventsetup {
class ESSourceDataProxyBase : public DataProxy {
public:
ESSourceDataProxyBase(edm::SerialTaskQueue* iQueue, std::mutex* iMutex)
: m_queue(iQueue), m_mutex(iMutex), m_prefetching{false} {}

edm::SerialTaskQueue* queue() const { return m_queue; }
std::mutex* mutex() const { return m_mutex; }
ESSourceDataProxyBase() : m_prefetching{false} {}

protected:
void invalidateCache() override {
Expand All @@ -53,19 +45,46 @@ namespace edm::eventsetup {

virtual void prefetch(edm::eventsetup::DataKey const& iKey, EventSetupRecordDetails) = 0;

//Should call from prefetchAsyncImpl
template <typename ASYNC, typename GUARD>
void prefetchAsyncImplTemplate(ASYNC iAsync,
GUARD iGuardFactory,
edm::WaitingTaskHolder iTask,
edm::eventsetup::EventSetupRecordImpl const& iRecord,
edm::eventsetup::DataKey const& iKey,
edm::ESParentContext const& iContext) {
auto group = iTask.group();
if (needToPrefetch(std::move(iTask))) {
iAsync(*group, [this, iGuardFactory, &iRecord, iKey, iContext]() {
try {
guardPrefetch(iGuardFactory, iRecord, iKey, iContext);
m_waitingList.doneWaiting(std::exception_ptr{});
} catch (...) {
m_waitingList.doneWaiting(std::current_exception());
}
});
}
}

private:
void prefetchAsyncImpl(edm::WaitingTaskHolder iTask,
edm::eventsetup::EventSetupRecordImpl const&,
edm::eventsetup::DataKey const& iKey,
edm::EventSetupImpl const*,
edm::ServiceToken const&,
edm::ESParentContext const&) final;
template <typename GUARD>
void guardPrefetch(GUARD iGuardFactory,
edm::eventsetup::EventSetupRecordImpl const& iES,
edm::eventsetup::DataKey const& iKey,
edm::ESParentContext const& iContext) {
[[maybe_unused]] auto guard = iGuardFactory();
doPrefetchAndSignals(iES, iKey, iContext);
}

bool needToPrefetch(edm::WaitingTaskHolder iTask);

void doPrefetchAndSignals(edm::eventsetup::EventSetupRecordImpl const&,
edm::eventsetup::DataKey const& iKey,
edm::ESParentContext const&);

// ---------- member data --------------------------------

edm::WaitingTaskList m_waitingList;
edm::SerialTaskQueue* m_queue;
std::mutex* m_mutex;
std::atomic<bool> m_prefetching;
};
} // namespace edm::eventsetup
Expand Down
47 changes: 47 additions & 0 deletions FWCore/Framework/interface/ESSourceDataProxyConcurrentBase.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#ifndef FWCore_Framework_ESSourceDataProxyConcurrentBase_h
#define FWCore_Framework_ESSourceDataProxyConcurrentBase_h
// -*- C++ -*-
//
// Package: FWCore/Framework
// Class : ESSourceDataProxyConcurrentBase
//
/**\class ESSourceDataProxyConcurrentBase ESSourceDataProxyConcurrentBase.h "FWCore/Framework/interface/ESSourceDataProxyConcurrentBase.h"
Description: Base class for DataProxies for ESSources that require no synchronization
Usage:
The ESSourceDataProxyConcurrentBase allows DataProxies from the same ESSource to be called concurrently.
NOTE: if inheriting classes override `void invalidateCache()` they must be sure to call this classes
implementation as part of the call.
*/
//
// Original Author: Chris Jones
// Created: 14/05/2020
//

// system include files

// user include files
#include "FWCore/Framework/interface/ESSourceDataProxyBase.h"

// forward declarations

namespace edm::eventsetup {
class ESSourceDataProxyConcurrentBase : public ESSourceDataProxyBase {
public:
ESSourceDataProxyConcurrentBase() {}

private:
void prefetchAsyncImpl(edm::WaitingTaskHolder iTask,
edm::eventsetup::EventSetupRecordImpl const& iES,
edm::eventsetup::DataKey const& iKey,
edm::EventSetupImpl const*,
edm::ServiceToken const&,
edm::ESParentContext const&) final;

// ---------- member data --------------------------------
};
} // namespace edm::eventsetup
#endif
58 changes: 58 additions & 0 deletions FWCore/Framework/interface/ESSourceDataProxyNonConcurrentBase.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#ifndef FWCore_Framework_ESSourceDataProxyNonConcurrentBase_h
#define FWCore_Framework_ESSourceDataProxyNonConcurrentBase_h
// -*- C++ -*-
//
// Package: FWCore/Framework
// Class : ESSourceDataProxyNonConcurrentBase
//
/**\class ESSourceDataProxyNonConcurrentBase ESSourceDataProxyNonConcurrentBase.h "FWCore/Framework/interface/ESSourceDataProxyNonConcurrentBase.h"
Description: Base class for DataProxies for ESSources that require synchronization
Usage:
The ESSourceDataProxyNonConcurrentBase uses a SerialTaskQueue to serialize all DataProxies for the ESSource and a
std::mutex to protect from concurrent calls to a DataProxy and the ESSource itself. Such concurrent calls
can happen if concurrent LuminosityBlocks are being used.
NOTE: if inheriting classes override `void invalidateCache()` they must be sure to call this classes
implementation as part of the call.
*/
//
// Original Author: Chris Jones
// Created: 14/05/2020
//

// system include files
#include <mutex>

// user include files
#include "FWCore/Framework/interface/ESSourceDataProxyBase.h"
#include "FWCore/Concurrency/interface/SerialTaskQueue.h"

// forward declarations

namespace edm::eventsetup {
class ESSourceDataProxyNonConcurrentBase : public ESSourceDataProxyBase {
public:
ESSourceDataProxyNonConcurrentBase(edm::SerialTaskQueue* iQueue, std::mutex* iMutex)
: m_queue(iQueue), m_mutex(iMutex) {}

edm::SerialTaskQueue* queue() const { return m_queue; }
std::mutex* mutex() const { return m_mutex; }

private:
void prefetchAsyncImpl(edm::WaitingTaskHolder iTask,
edm::eventsetup::EventSetupRecordImpl const& iES,
edm::eventsetup::DataKey const& iKey,
edm::EventSetupImpl const*,
edm::ServiceToken const&,
edm::ESParentContext const&) final;

// ---------- member data --------------------------------

edm::SerialTaskQueue* m_queue;
std::mutex* m_mutex;
};
} // namespace edm::eventsetup
#endif
6 changes: 3 additions & 3 deletions FWCore/Framework/interface/ESSourceDataProxyTemplate.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@
// system include files

// user include files
#include "FWCore/Framework/interface/ESSourceDataProxyBase.h"
#include "FWCore/Framework/interface/ESSourceDataProxyNonConcurrentBase.h"

// forward declarations

namespace edm::eventsetup {
template <typename DataT>
class ESSourceDataProxyTemplate : public ESSourceDataProxyBase {
class ESSourceDataProxyTemplate : public ESSourceDataProxyNonConcurrentBase {
public:
ESSourceDataProxyTemplate(edm::SerialTaskQueue* iQueue, std::mutex* iMutex)
: ESSourceDataProxyBase(iQueue, iMutex) {}
: ESSourceDataProxyNonConcurrentBase(iQueue, iMutex) {}

ESSourceDataProxyTemplate(const ESSourceDataProxyTemplate&) = delete;
const ESSourceDataProxyTemplate& operator=(const ESSourceDataProxyTemplate&) = delete;
Expand Down
48 changes: 18 additions & 30 deletions FWCore/Framework/src/ESSourceDataProxyBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,25 @@
// member functions
//

void edm::eventsetup::ESSourceDataProxyBase::prefetchAsyncImpl(edm::WaitingTaskHolder iTask,
edm::eventsetup::EventSetupRecordImpl const& iRecord,
edm::eventsetup::DataKey const& iKey,
edm::EventSetupImpl const*,
edm::ServiceToken const&,
edm::ESParentContext const& iParent) {
bool edm::eventsetup::ESSourceDataProxyBase::needToPrefetch(edm::WaitingTaskHolder iTask) {
m_waitingList.add(std::move(iTask));
bool expected = false;
auto doPrefetch = m_prefetching.compare_exchange_strong(expected, true);
m_waitingList.add(iTask);
if (doPrefetch) {
m_queue->push(*iTask.group(), [this, iKey, &iRecord, iParent]() {
try {
{
std::lock_guard<std::mutex> guard(*m_mutex);
edm::ESModuleCallingContext context(providerDescription(), ESModuleCallingContext::State::kRunning, iParent);
iRecord.activityRegistry()->preESModuleSignal_.emit(iRecord.key(), context);
struct EndGuard {
EndGuard(EventSetupRecordImpl const& iRecord, ESModuleCallingContext const& iContext)
: record_{iRecord}, context_{iContext} {}
~EndGuard() { record_.activityRegistry()->postESModuleSignal_.emit(record_.key(), context_); }
EventSetupRecordImpl const& record_;
ESModuleCallingContext const& context_;
} guardAR(iRecord, context);
prefetch(iKey, EventSetupRecordDetails(&iRecord));
}
m_waitingList.doneWaiting(std::exception_ptr{});
} catch (...) {
m_waitingList.doneWaiting(std::current_exception());
}
});
}
return m_prefetching.compare_exchange_strong(expected, true);
}

void edm::eventsetup::ESSourceDataProxyBase::doPrefetchAndSignals(edm::eventsetup::EventSetupRecordImpl const& iRecord,
edm::eventsetup::DataKey const& iKey,
edm::ESParentContext const& iParent) {
edm::ESModuleCallingContext context(providerDescription(), ESModuleCallingContext::State::kRunning, iParent);
iRecord.activityRegistry()->preESModuleSignal_.emit(iRecord.key(), context);
struct EndGuard {
EndGuard(EventSetupRecordImpl const& iRecord, ESModuleCallingContext const& iContext)
: record_{iRecord}, context_{iContext} {}
~EndGuard() { record_.activityRegistry()->postESModuleSignal_.emit(record_.key(), context_); }
EventSetupRecordImpl const& record_;
ESModuleCallingContext const& context_;
} guardAR(iRecord, context);
prefetch(iKey, EventSetupRecordDetails(&iRecord));
}

//
Expand Down
35 changes: 35 additions & 0 deletions FWCore/Framework/src/ESSourceDataProxyConcurrentBase.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// -*- C++ -*-
//
// Package: FWCore/Framework
// Class : __class__
//
// Implementation:
// [Notes on implementation]
//
// Original Author: __author__
// Created: __date__
//

// system include files

// user include files
#include "FWCore/Framework/interface/ESSourceDataProxyConcurrentBase.h"

//
// member functions
//

void edm::eventsetup::ESSourceDataProxyConcurrentBase::prefetchAsyncImpl(
edm::WaitingTaskHolder iTask,
edm::eventsetup::EventSetupRecordImpl const& iRecord,
edm::eventsetup::DataKey const& iKey,
edm::EventSetupImpl const*,
edm::ServiceToken const&,
edm::ESParentContext const& iParent) {
prefetchAsyncImplTemplate([](auto& iGroup, auto iActivity) { iGroup.run(std::move(iActivity)); },
[]() { return true; },
std::move(iTask),
iRecord,
iKey,
iParent);
}
Loading

0 comments on commit ab6d25d

Please sign in to comment.