diff --git a/src/cuda/Framework/EDProducer.h b/src/cuda/Framework/EDProducer.h index 8160b8c96..1cc63f58d 100644 --- a/src/cuda/Framework/EDProducer.h +++ b/src/cuda/Framework/EDProducer.h @@ -1,6 +1,9 @@ #ifndef EDProducerBase_h #define EDProducerBase_h +#include + +#include "Framework/EventRange.h" #include "Framework/WaitingTaskWithArenaHolder.h" namespace edm { @@ -14,9 +17,13 @@ namespace edm { bool hasAcquire() const { return false; } - void doAcquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {} + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {} - void doProduce(Event& event, EventSetup const& eventSetup) { produce(event, eventSetup); } + void doProduce(EventRange events, EventSetup const& eventSetup) { + for (Event& event : events) { + produce(event, eventSetup); + } + } virtual void produce(Event& event, EventSetup const& eventSetup) = 0; @@ -27,27 +34,157 @@ namespace edm { private: }; + class EDBatchingProducer { + public: + EDBatchingProducer() = default; + virtual ~EDBatchingProducer() = default; + + bool hasAcquire() const { return false; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {} + + void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup); } + + virtual void produce(EventRange events, EventSetup const& eventSetup) = 0; + + void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + }; + + template class EDProducerExternalWork { public: + using AsyncState = T; + EDProducerExternalWork() = default; virtual ~EDProducerExternalWork() = default; bool hasAcquire() const { return true; } - void doAcquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { - acquire(event, eventSetup, std::move(holder)); + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder 
holder) { + if (events.size() > statesSize_) { + statesSize_ = events.size(); + states_ = std::make_unique(statesSize_); + } + for (size_t i = 0; i < events.size(); ++i) { + acquire(events[i], eventSetup, holder, states_[i]); + } } - void doProduce(Event& event, EventSetup const& eventSetup) { produce(event, eventSetup); } + virtual void acquire(Event const& event, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder, + AsyncState& state) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { + for (size_t i = 0; i < events.size(); ++i) { + produce(events[i], eventSetup, states_[i]); + } + } + + virtual void produce(Event& event, EventSetup const& eventSetup, AsyncState& state) = 0; + + void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + size_t statesSize_ = 0; + std::unique_ptr states_; + }; + + template <> + class EDProducerExternalWork { + public: + EDProducerExternalWork() = default; + virtual ~EDProducerExternalWork() = default; + + bool hasAcquire() const { return true; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + for (Event const& event : events) { + acquire(event, eventSetup, holder); + } + } + + virtual void acquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { + for (Event& event : events) { + produce(event, eventSetup); + } + } - virtual void acquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) = 0; virtual void produce(Event& event, EventSetup const& eventSetup) = 0; void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + }; + + template + class EDBatchingProducerExternalWork { + public: + using AsyncState = T; + + EDBatchingProducerExternalWork() = default; + virtual ~EDBatchingProducerExternalWork() = default; + + bool hasAcquire() const { return 
true; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + acquire(events, eventSetup, holder, state_); + } + + virtual void acquire(ConstEventRange events, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder, + AsyncState& state) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup, state_); } + + virtual void produce(EventRange events, EventSetup const& eventSetup, AsyncState& states) = 0; + + void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + AsyncState state_; + }; + + template <> + class EDBatchingProducerExternalWork { + public: + EDBatchingProducerExternalWork() = default; + virtual ~EDBatchingProducerExternalWork() = default; + + bool hasAcquire() const { return true; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + acquire(events, eventSetup, holder); + } + + virtual void acquire(ConstEventRange events, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup); } + + virtual void produce(EventRange events, EventSetup const& eventSetup) = 0; + + void doEndJob() { endJob(); } + virtual void endJob() {} private: }; + } // namespace edm #endif diff --git a/src/cuda/Framework/EventBatch.h b/src/cuda/Framework/EventBatch.h new file mode 100644 index 000000000..3632c41ea --- /dev/null +++ b/src/cuda/Framework/EventBatch.h @@ -0,0 +1,36 @@ +#ifndef EventBatch_h +#define EventBatch_h + +#include + +#include "Framework/Event.h" +#include "Framework/EventRange.h" + +namespace edm { + + class EventBatch { + public: + Event& emplace(int streamId, int eventId, ProductRegistry const& reg) { + events_.emplace_back(streamId, eventId, reg); + return events_.back(); + } + + void reserve(size_t capacity) { events_.reserve(capacity); } + + 
void clear() { events_.clear(); } + + size_t size() const { return events_.size(); } + + bool empty() const { return events_.empty(); } + + EventRange range() { return EventRange(&events_.front(), &events_.back() + 1); } + + ConstEventRange range() const { return ConstEventRange(&events_.front(), &events_.back() + 1); } + + private: + std::vector events_; + }; + +} // namespace edm + +#endif // EventBatch_h diff --git a/src/cuda/Framework/EventRange.h b/src/cuda/Framework/EventRange.h new file mode 100644 index 000000000..9a9cca645 --- /dev/null +++ b/src/cuda/Framework/EventRange.h @@ -0,0 +1,96 @@ +#ifndef EventRange_h +#define EventRange_h + +#include +#include +#include + +#include "Framework/Event.h" + +namespace edm { + + class EventRange { + public: + EventRange(Event* begin, Event* end) : begin_(begin), end_(end) { + assert(begin_); + assert(end_); + assert(end_ >= begin_); + } + + Event* begin() { return begin_; } + + Event const* begin() const { return begin_; } + + Event* end() { return end_; } + + Event const* end() const { return end_; } + + size_t size() const { return end_ - begin_; } + + bool empty() const { return end_ == begin_; } + + Event& at(size_t index) { + if (index >= size()) { + std::stringstream msg; + msg << "EventRange::at() range check failed: index " << index << " is outside the range [0.." << size() << ")"; + throw std::out_of_range(msg.str()); + } + return begin_[index]; + } + + Event const& at(size_t index) const { + if (index >= size()) { + std::stringstream msg; + msg << "EventRange::at() range check failed: index " << index << " is outside the range [0.." 
<< size() << ")"; + throw std::out_of_range(msg.str()); + } + return begin_[index]; + } + + Event& operator[](size_t index) { return begin_[index]; } + + Event const& operator[](size_t index) const { return begin_[index]; } + + private: + Event* begin_; + Event* end_; + }; + + class ConstEventRange { + public: + ConstEventRange(Event const* begin, Event const* end) : begin_(begin), end_(end) { + assert(begin_); + assert(end_); + assert(end_ >= begin_); + } + + ConstEventRange(EventRange range) : begin_(range.begin()), end_(range.end()) {} + + Event const* begin() const { return begin_; } + + Event const* end() const { return end_; } + + size_t size() const { return end_ - begin_; } + + bool empty() const { return end_ == begin_; } + + Event const& at(size_t index) { + if (index >= size()) { + std::stringstream msg; + msg << "ConstEventRange::at() range check failed: index " << index << " is outside the range [0.." << size() + << ")"; + throw std::out_of_range(msg.str()); + } + return begin_[index]; + } + + Event const& operator[](size_t index) const { return begin_[index]; } + + private: + Event const* begin_; + Event const* end_; + }; + +} // namespace edm + +#endif // EventRange_h diff --git a/src/cuda/Framework/WaitingTaskHolder.h b/src/cuda/Framework/WaitingTaskHolder.h index d74c5d8ed..9cc1b5c66 100644 --- a/src/cuda/Framework/WaitingTaskHolder.h +++ b/src/cuda/Framework/WaitingTaskHolder.h @@ -34,7 +34,7 @@ namespace edm { explicit WaitingTaskHolder(edm::WaitingTask* iTask) : m_task(iTask) { m_task->increment_ref_count(); } ~WaitingTaskHolder() { if (m_task) { - doneWaiting(std::exception_ptr{}); + doneWaiting(); } } @@ -66,10 +66,7 @@ namespace edm { } } - void doneWaiting(std::exception_ptr iExcept) { - if (iExcept) { - m_task->dependentTaskFailed(iExcept); - } + void doneWaiting() { //spawn can run the task before we finish // doneWaiting and some other thread might // try to reuse this object. 
Resetting @@ -81,6 +78,13 @@ namespace edm { } } + void doneWaiting(std::exception_ptr iExcept) { + if (iExcept) { + m_task->dependentTaskFailed(iExcept); + } + doneWaiting(); + } + private: // ---------- member data -------------------------------- WaitingTask* m_task; diff --git a/src/cuda/Framework/WaitingTaskWithArenaHolder.cc b/src/cuda/Framework/WaitingTaskWithArenaHolder.cc index 7c852d041..d7c88001e 100644 --- a/src/cuda/Framework/WaitingTaskWithArenaHolder.cc +++ b/src/cuda/Framework/WaitingTaskWithArenaHolder.cc @@ -24,7 +24,7 @@ namespace edm { WaitingTaskWithArenaHolder::~WaitingTaskWithArenaHolder() { if (m_task) { - doneWaiting(std::exception_ptr{}); + doneWaiting(); } } @@ -58,10 +58,7 @@ namespace edm { // into the correct arena of threads. Use of the arena allows doneWaiting // to be called from a thread outside the arena of threads that will manage // the task. doneWaiting can be called from a non-TBB thread. - void WaitingTaskWithArenaHolder::doneWaiting(std::exception_ptr iExcept) { - if (iExcept) { - m_task->dependentTaskFailed(iExcept); - } + void WaitingTaskWithArenaHolder::doneWaiting() { //enqueue can run the task before we finish // doneWaiting and some other thread might // try to reuse this object. Resetting @@ -75,6 +72,17 @@ namespace edm { } } + // This spawns the task. The arena is needed to get the task spawned + // into the correct arena of threads. Use of the arena allows doneWaiting + // to be called from a thread outside the arena of threads that will manage + // the task. doneWaiting can be called from a non-TBB thread. + void WaitingTaskWithArenaHolder::doneWaiting(std::exception_ptr iExcept) { + if (iExcept) { + m_task->dependentTaskFailed(iExcept); + } + doneWaiting(); + } + // This next function is useful if you know from the context that // m_arena (which is set when the constructor was executes) is the // same arena in which you want to execute the doneWaiting function. 
diff --git a/src/cuda/Framework/WaitingTaskWithArenaHolder.h b/src/cuda/Framework/WaitingTaskWithArenaHolder.h index 4b14febfb..bab615686 100644 --- a/src/cuda/Framework/WaitingTaskWithArenaHolder.h +++ b/src/cuda/Framework/WaitingTaskWithArenaHolder.h @@ -48,6 +48,12 @@ namespace edm { WaitingTaskWithArenaHolder& operator=(WaitingTaskWithArenaHolder&& iRHS); + // This spawns the task. The arena is needed to get the task spawned + // into the correct arena of threads. Use of the arena allows doneWaiting + // to be called from a thread outside the arena of threads that will manage + // the task. doneWaiting can be called from a non-TBB thread. + void doneWaiting(); + // This spawns the task. The arena is needed to get the task spawned // into the correct arena of threads. Use of the arena allows doneWaiting // to be called from a thread outside the arena of threads that will manage diff --git a/src/cuda/Framework/Worker.cc b/src/cuda/Framework/Worker.cc index b3b3df74c..9c1c5c3c4 100644 --- a/src/cuda/Framework/Worker.cc +++ b/src/cuda/Framework/Worker.cc @@ -1,7 +1,8 @@ +//#include #include "Framework/Worker.h" namespace edm { - void Worker::prefetchAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask) { + void Worker::prefetchAsync(EventRange events, EventSetup const& eventSetup, WaitingTask* iTask) { //std::cout << "prefetchAsync for " << this << " iTask " << iTask << std::endl; bool expected = false; if (prefetchRequested_.compare_exchange_strong(expected, true)) { @@ -10,7 +11,7 @@ namespace edm { iTask->increment_ref_count(); for (Worker* dep : itemsToGet_) { //std::cout << "calling doWorkAsync for " << dep << " with " << iTask << std::endl; - dep->doWorkAsync(event, eventSetup, iTask); + dep->doWorkAsync(events, eventSetup, iTask); } auto count = iTask->decrement_ref_count(); diff --git a/src/cuda/Framework/Worker.h b/src/cuda/Framework/Worker.h index 0a03670a4..26bdfed0b 100644 --- a/src/cuda/Framework/Worker.h +++ 
b/src/cuda/Framework/Worker.h @@ -2,9 +2,10 @@ #define Worker_h #include -#include //#include +#include +#include "Framework/EventRange.h" #include "Framework/WaitingTask.h" #include "Framework/WaitingTaskHolder.h" #include "Framework/WaitingTaskList.h" @@ -23,10 +24,10 @@ namespace edm { void setItemsToGet(std::vector workers) { itemsToGet_ = std::move(workers); } // thread safe - void prefetchAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask); + void prefetchAsync(EventRange events, EventSetup const& eventSetup, WaitingTask* iTask); // not thread safe - virtual void doWorkAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask) = 0; + virtual void doWorkAsync(EventRange events, EventSetup const& eventSetup, WaitingTask* iTask) = 0; // not thread safe virtual void doEndJob() = 0; @@ -50,7 +51,7 @@ namespace edm { public: explicit WorkerT(ProductRegistry& reg) : producer_(reg) {} - void doWorkAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask) override { + void doWorkAsync(EventRange events, EventSetup const& eventSetup, WaitingTask* iTask) override { waitingTasksWork_.add(iTask); //std::cout << "doWorkAsync for " << this << " with iTask " << iTask << std::endl; bool expected = false; @@ -58,14 +59,14 @@ namespace edm { //std::cout << "first doWorkAsync call" << std::endl; WaitingTask* moduleTask = make_waiting_task( - tbb::task::allocate_root(), [this, &event, &eventSetup](std::exception_ptr const* iPtr) mutable { + tbb::task::allocate_root(), [this, events, &eventSetup](std::exception_ptr const* iPtr) mutable { if (iPtr) { waitingTasksWork_.doneWaiting(*iPtr); } else { std::exception_ptr exceptionPtr; try { //std::cout << "calling doProduce " << this << std::endl; - producer_.doProduce(event, eventSetup); + producer_.doProduce(events, eventSetup); } catch (...) 
{ exceptionPtr = std::current_exception(); } @@ -75,24 +76,25 @@ namespace edm { }); if (producer_.hasAcquire()) { WaitingTaskWithArenaHolder runProduceHolder{moduleTask}; - moduleTask = make_waiting_task(tbb::task::allocate_root(), - [this, &event, &eventSetup, runProduceHolder = std::move(runProduceHolder)]( - std::exception_ptr const* iPtr) mutable { - if (iPtr) { - runProduceHolder.doneWaiting(*iPtr); - } else { - std::exception_ptr exceptionPtr; - try { - producer_.doAcquire(event, eventSetup, runProduceHolder); - } catch (...) { - exceptionPtr = std::current_exception(); - } - runProduceHolder.doneWaiting(exceptionPtr); - } - }); + moduleTask = make_waiting_task( + tbb::task::allocate_root(), + [this, events = ConstEventRange(events), &eventSetup, runProduceHolder = std::move(runProduceHolder)]( + std::exception_ptr const* iPtr) mutable { + if (iPtr) { + runProduceHolder.doneWaiting(*iPtr); + } else { + std::exception_ptr exceptionPtr; + try { + producer_.doAcquire(events, eventSetup, runProduceHolder); + } catch (...) 
{ + exceptionPtr = std::current_exception(); + } + runProduceHolder.doneWaiting(exceptionPtr); + } + }); } //std::cout << "calling prefetchAsync " << this << " with moduleTask " << moduleTask << std::endl; - prefetchAsync(event, eventSetup, moduleTask); + prefetchAsync(events, eventSetup, moduleTask); } } diff --git a/src/cuda/bin/EventProcessor.cc b/src/cuda/bin/EventProcessor.cc index a50cc6570..e1157472c 100644 --- a/src/cuda/bin/EventProcessor.cc +++ b/src/cuda/bin/EventProcessor.cc @@ -6,13 +6,14 @@ #include "EventProcessor.h" namespace edm { - EventProcessor::EventProcessor(int maxEvents, + EventProcessor::EventProcessor(int batchEvents, + int maxEvents, int numberOfStreams, std::vector const& path, std::vector const& esproducers, std::filesystem::path const& datadir, bool validation) - : source_(maxEvents, registry_, datadir, validation) { + : source_(batchEvents, maxEvents, registry_, datadir, validation) { for (auto const& name : esproducers) { pluginManager_.load(name); auto esp = ESPluginFactory::create(name, datadir); diff --git a/src/cuda/bin/EventProcessor.h b/src/cuda/bin/EventProcessor.h index 614e60c38..bb5cbfc3e 100644 --- a/src/cuda/bin/EventProcessor.h +++ b/src/cuda/bin/EventProcessor.h @@ -14,7 +14,8 @@ namespace edm { class EventProcessor { public: - explicit EventProcessor(int maxEvents, + explicit EventProcessor(int batchEvents, + int maxEvents, int numberOfStreams, std::vector const& path, std::vector const& esproducers, diff --git a/src/cuda/bin/PluginManager.cc b/src/cuda/bin/PluginManager.cc index 7977cdbc2..ecb28032b 100644 --- a/src/cuda/bin/PluginManager.cc +++ b/src/cuda/bin/PluginManager.cc @@ -1,5 +1,5 @@ -#include #include +//#include #include "PluginManager.h" diff --git a/src/cuda/bin/Source.cc b/src/cuda/bin/Source.cc index 64d1a3b5c..276652b4f 100644 --- a/src/cuda/bin/Source.cc +++ b/src/cuda/bin/Source.cc @@ -23,8 +23,13 @@ namespace { } // namespace namespace edm { - Source::Source(int maxEvents, ProductRegistry ®, 
std::filesystem::path const &datadir, bool validation) - : maxEvents_(maxEvents), numEvents_(0), rawToken_(reg.produces()), validation_(validation) { + Source::Source( + int batchEvents, int maxEvents, ProductRegistry ®, std::filesystem::path const &datadir, bool validation) + : batchEvents_(batchEvents), + maxEvents_(maxEvents), + numEvents_(0), + rawToken_(reg.produces()), + validation_(validation) { std::ifstream in_raw(datadir / "raw.bin", std::ios::binary); std::ifstream in_digiclusters; std::ifstream in_tracks; @@ -74,27 +79,45 @@ namespace edm { assert(raw_.size() == vertices_.size()); } + if (batchEvents_ < 1) { + batchEvents_ = 1; + } + if (maxEvents_ < 0) { maxEvents_ = raw_.size(); } } - std::unique_ptr Source::produce(int streamId, ProductRegistry const ®) { - const int old = numEvents_.fetch_add(1); - const int iev = old + 1; - if (old >= maxEvents_) { - return nullptr; + EventBatch Source::produce(int streamId, ProductRegistry const ®) { + + // atomically increase the event counter, without overflowing over maxEvents_ + int old_value = numEvents_; + int new_value; + do { + new_value = std::min(old_value + batchEvents_, maxEvents_); + } while (not numEvents_.compare_exchange_weak(old_value, new_value)); + + // check how many events should be read + const int size = new_value - old_value; + EventBatch events; + if (size <= 0) { + return events; } - auto ev = std::make_unique(streamId, iev, reg); - const int index = old % raw_.size(); - ev->emplace(rawToken_, raw_[index]); - if (validation_) { - ev->emplace(digiClusterToken_, digiclusters_[index]); - ev->emplace(trackToken_, tracks_[index]); - ev->emplace(vertexToken_, vertices_[index]); + events.reserve(size); + for (int i = 1; i <= size; ++i) { + const int iev = old_value + i; + Event &event = events.emplace(streamId, iev, reg); + const int index = (iev - 1) % raw_.size(); + + event.emplace(rawToken_, raw_[index]); + if (validation_) { + event.emplace(digiClusterToken_, digiclusters_[index]); + 
event.emplace(trackToken_, tracks_[index]); + event.emplace(vertexToken_, vertices_[index]); + } } - return ev; + return events; } } // namespace edm diff --git a/src/cuda/bin/Source.h b/src/cuda/bin/Source.h index c13534f33..a3c99798d 100644 --- a/src/cuda/bin/Source.h +++ b/src/cuda/bin/Source.h @@ -5,8 +5,10 @@ #include #include #include +#include #include "Framework/Event.h" +#include "Framework/EventBatch.h" #include "DataFormats/FEDRawDataCollection.h" #include "DataFormats/DigiClusterCount.h" #include "DataFormats/TrackCount.h" @@ -15,14 +17,16 @@ namespace edm { class Source { public: - explicit Source(int maxEvents, ProductRegistry& reg, std::filesystem::path const& datadir, bool validation); + explicit Source( + int batchEvents, int maxEvents, ProductRegistry& reg, std::filesystem::path const& datadir, bool validation); int maxEvents() const { return maxEvents_; } // thread safe - std::unique_ptr produce(int streamId, ProductRegistry const& reg); + EventBatch produce(int streamId, ProductRegistry const& reg); private: + int batchEvents_; int maxEvents_; std::atomic numEvents_; EDPutTokenT const rawToken_; diff --git a/src/cuda/bin/StreamSchedule.cc b/src/cuda/bin/StreamSchedule.cc index 8636bce24..1f34f0f58 100644 --- a/src/cuda/bin/StreamSchedule.cc +++ b/src/cuda/bin/StreamSchedule.cc @@ -1,4 +1,5 @@ -//#include +#include +#include #include @@ -44,7 +45,7 @@ namespace edm { void StreamSchedule::runToCompletionAsync(WaitingTaskHolder h) { auto task = - make_functor_task(tbb::task::allocate_root(), [this, h]() mutable { processOneEventAsync(std::move(h)); }); + make_functor_task(tbb::task::allocate_root(), [this, h]() mutable { processEventBatchAsync(std::move(h)); }); if (streamId_ == 0) { tbb::task::spawn(*task); } else { @@ -52,26 +53,26 @@ namespace edm { } } - void StreamSchedule::processOneEventAsync(WaitingTaskHolder h) { - auto event = source_->produce(streamId_, registry_); - if (event) { - // Pass the event object ownership to the "end-of-event" 
task - // Pass a non-owning pointer to the event to preceding tasks - //std::cout << "Begin processing event " << event->eventID() << std::endl; - auto eventPtr = event.get(); - auto nextEventTask = - make_waiting_task(tbb::task::allocate_root(), - [this, h = std::move(h), ev = std::move(event)](std::exception_ptr const* iPtr) mutable { - ev.reset(); - if (iPtr) { - h.doneWaiting(*iPtr); - } else { - for (auto const& worker : path_) { - worker->reset(); - } - processOneEventAsync(std::move(h)); - } - }); + void StreamSchedule::processEventBatchAsync(WaitingTaskHolder h) { + auto events = source_->produce(streamId_, registry_); + if (not events.empty()) { + // Pass the event batch ownership to the "end-of-event" task + // Pass a non-owning event range to the preceding tasks + //std::cout << "Begin processing a batch of " << events.size() << " events starting from " << events.range().at(0).eventID() << std::endl; + auto eventsRange = events.range(); + auto nextEventTask = make_waiting_task( + tbb::task::allocate_root(), + [this, h = std::move(h), events = std::move(events)](std::exception_ptr const* iPtr) mutable { + events.clear(); + if (iPtr) { + h.doneWaiting(*iPtr); + } else { + for (auto const& worker : path_) { + worker->reset(); + } + processEventBatchAsync(std::move(h)); + } + }); // To guarantee that the nextEventTask is spawned also in // absence of Workers, and also to prevent spawning it before // all workers have been processed (should not happen though) @@ -79,10 +80,10 @@ namespace edm { for (auto iWorker = path_.rbegin(); iWorker != path_.rend(); ++iWorker) { //std::cout << "calling doWorkAsync for " << iWorker->get() << " with nextEventTask " << nextEventTask << std::endl; - (*iWorker)->doWorkAsync(*eventPtr, *eventSetup_, nextEventTask); + (*iWorker)->doWorkAsync(eventsRange, *eventSetup_, nextEventTask); } } else { - h.doneWaiting(std::exception_ptr{}); + h.doneWaiting(); } } diff --git a/src/cuda/bin/StreamSchedule.h 
b/src/cuda/bin/StreamSchedule.h index 1bd364c70..a62dc3961 100644 --- a/src/cuda/bin/StreamSchedule.h +++ b/src/cuda/bin/StreamSchedule.h @@ -38,7 +38,7 @@ namespace edm { void endJob(); private: - void processOneEventAsync(WaitingTaskHolder h); + void processEventBatchAsync(WaitingTaskHolder h); ProductRegistry registry_; Source* source_; diff --git a/src/cuda/bin/main.cc b/src/cuda/bin/main.cc index c8a76eee5..61a7eb05c 100644 --- a/src/cuda/bin/main.cc +++ b/src/cuda/bin/main.cc @@ -21,7 +21,8 @@ namespace { "[--histogram] [--empty]\n\n" << "Options\n" << " --numberOfThreads Number of threads to use (default 1)\n" - << " --numberOfStreams Number of concurrent events (default 0=numberOfThreads)\n" + << " --numberOfStreams Number of concurrent batch of events (default 0=numberOfThreads)\n" + << " --batchEvents Number of events to process in a batch (default 1 for individual events)\n" << " --maxEvents Number of events to process (default -1 for all events in the input file)\n" << " --data Path to the 'data' directory (default 'data' in the directory of the executable)\n" << " --transfer Transfer results from GPU to CPU (default is to leave them on GPU)\n" @@ -37,6 +38,7 @@ int main(int argc, char** argv) { std::vector args(argv, argv + argc); int numberOfThreads = 1; int numberOfStreams = 0; + int batchEvents = 1; int maxEvents = -1; std::filesystem::path datadir; bool transfer = false; @@ -53,6 +55,9 @@ int main(int argc, char** argv) { } else if (*i == "--numberOfStreams") { ++i; numberOfStreams = std::stoi(*i); + } else if (*i == "--batchEvents") { + ++i; + batchEvents = std::stoul(*i); } else if (*i == "--maxEvents") { ++i; maxEvents = std::stoi(*i); @@ -119,7 +124,7 @@ int main(int argc, char** argv) { } } edm::EventProcessor processor( - maxEvents, numberOfStreams, std::move(edmodules), std::move(esmodules), datadir, validation); + batchEvents, maxEvents, numberOfStreams, std::move(edmodules), std::move(esmodules), datadir, validation); maxEvents = 
processor.maxEvents(); std::cout << "Processing " << maxEvents << " events, of which " << numberOfStreams << " concurrently, with " diff --git a/src/cuda/plugin-PixelTrackFitting/PixelTrackSoAFromCUDA.cc b/src/cuda/plugin-PixelTrackFitting/PixelTrackSoAFromCUDA.cc index 66e93f818..943dd44ce 100644 --- a/src/cuda/plugin-PixelTrackFitting/PixelTrackSoAFromCUDA.cc +++ b/src/cuda/plugin-PixelTrackFitting/PixelTrackSoAFromCUDA.cc @@ -9,7 +9,9 @@ #include "Framework/EDProducer.h" #include "CUDACore/ScopedContext.h" -class PixelTrackSoAFromCUDA : public edm::EDProducerExternalWork { +using PixelTrackSoAFromCUDA_AsyncState = cms::cuda::host::unique_ptr; + +class PixelTrackSoAFromCUDA : public edm::EDProducerExternalWork { public: explicit PixelTrackSoAFromCUDA(edm::ProductRegistry& reg); ~PixelTrackSoAFromCUDA() override = default; @@ -17,13 +19,12 @@ class PixelTrackSoAFromCUDA : public edm::EDProducerExternalWork { private: void acquire(edm::Event const& iEvent, edm::EventSetup const& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) override; - void produce(edm::Event& iEvent, edm::EventSetup const& iSetup) override; + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const override; + void produce(edm::Event& iEvent, edm::EventSetup const& iSetup, AsyncState& state) override; edm::EDGetTokenT> tokenCUDA_; edm::EDPutTokenT tokenSOA_; - - cms::cuda::host::unique_ptr m_soa; }; PixelTrackSoAFromCUDA::PixelTrackSoAFromCUDA(edm::ProductRegistry& reg) @@ -32,17 +33,18 @@ PixelTrackSoAFromCUDA::PixelTrackSoAFromCUDA(edm::ProductRegistry& reg) void PixelTrackSoAFromCUDA::acquire(edm::Event const& iEvent, edm::EventSetup const& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) { + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const { cms::cuda::Product const& inputDataWrapped = iEvent.get(tokenCUDA_); cms::cuda::ScopedContextAcquire ctx{inputDataWrapped, std::move(waitingTaskHolder)}; auto const& inputData = 
ctx.get(inputDataWrapped); - m_soa = inputData.toHostAsync(ctx.stream()); + state = inputData.toHostAsync(ctx.stream()); } -void PixelTrackSoAFromCUDA::produce(edm::Event& iEvent, edm::EventSetup const& iSetup) { +void PixelTrackSoAFromCUDA::produce(edm::Event& iEvent, edm::EventSetup const& iSetup, AsyncState& state) { /* - auto const & tsoa = *m_soa; + auto const & tsoa = *state; auto maxTracks = tsoa.stride(); std::cout << "size of SoA" << sizeof(tsoa) << " stride " << maxTracks << std::endl; @@ -57,9 +59,9 @@ void PixelTrackSoAFromCUDA::produce(edm::Event& iEvent, edm::EventSetup const& i */ // DO NOT make a copy (actually TWO....) - iEvent.emplace(tokenSOA_, PixelTrackHeterogeneous(std::move(m_soa))); + iEvent.emplace(tokenSOA_, PixelTrackHeterogeneous(std::move(state))); - assert(!m_soa); + assert(!state); } DEFINE_FWK_MODULE(PixelTrackSoAFromCUDA); diff --git a/src/cuda/plugin-PixelVertexFinding/PixelVertexSoAFromCUDA.cc b/src/cuda/plugin-PixelVertexFinding/PixelVertexSoAFromCUDA.cc index d709f0c5e..cbb4da507 100644 --- a/src/cuda/plugin-PixelVertexFinding/PixelVertexSoAFromCUDA.cc +++ b/src/cuda/plugin-PixelVertexFinding/PixelVertexSoAFromCUDA.cc @@ -10,7 +10,9 @@ #include "Framework/RunningAverage.h" #include "CUDACore/ScopedContext.h" -class PixelVertexSoAFromCUDA : public edm::EDProducerExternalWork { +using PixelVertexSoAFromCUDA_AsyncState = cms::cuda::host::unique_ptr; + +class PixelVertexSoAFromCUDA : public edm::EDProducerExternalWork { public: explicit PixelVertexSoAFromCUDA(edm::ProductRegistry& reg); ~PixelVertexSoAFromCUDA() override = default; @@ -18,13 +20,12 @@ class PixelVertexSoAFromCUDA : public edm::EDProducerExternalWork { private: void acquire(edm::Event const& iEvent, edm::EventSetup const& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) override; - void produce(edm::Event& iEvent, edm::EventSetup const& iSetup) override; + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const override; + void 
produce(edm::Event& iEvent, edm::EventSetup const& iSetup, AsyncState& state) override; edm::EDGetTokenT> tokenCUDA_; edm::EDPutTokenT tokenSOA_; - - cms::cuda::host::unique_ptr m_soa; }; PixelVertexSoAFromCUDA::PixelVertexSoAFromCUDA(edm::ProductRegistry& reg) @@ -33,17 +34,18 @@ PixelVertexSoAFromCUDA::PixelVertexSoAFromCUDA(edm::ProductRegistry& reg) void PixelVertexSoAFromCUDA::acquire(edm::Event const& iEvent, edm::EventSetup const& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) { + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const { auto const& inputDataWrapped = iEvent.get(tokenCUDA_); cms::cuda::ScopedContextAcquire ctx{inputDataWrapped, std::move(waitingTaskHolder)}; auto const& inputData = ctx.get(inputDataWrapped); - m_soa = inputData.toHostAsync(ctx.stream()); + state = inputData.toHostAsync(ctx.stream()); } -void PixelVertexSoAFromCUDA::produce(edm::Event& iEvent, edm::EventSetup const& iSetup) { +void PixelVertexSoAFromCUDA::produce(edm::Event& iEvent, edm::EventSetup const& iSetup, AsyncState& state) { // No copies.... 
- iEvent.emplace(tokenSOA_, ZVertexHeterogeneous(std::move(m_soa))); + iEvent.emplace(tokenSOA_, ZVertexHeterogeneous(std::move(state))); } DEFINE_FWK_MODULE(PixelVertexSoAFromCUDA); diff --git a/src/cuda/plugin-SiPixelClusterizer/SiPixelRawToClusterCUDA.cc b/src/cuda/plugin-SiPixelClusterizer/SiPixelRawToClusterCUDA.cc index 06624744e..ff387e1b4 100644 --- a/src/cuda/plugin-SiPixelClusterizer/SiPixelRawToClusterCUDA.cc +++ b/src/cuda/plugin-SiPixelClusterizer/SiPixelRawToClusterCUDA.cc @@ -22,7 +22,13 @@ #include #include -class SiPixelRawToClusterCUDA : public edm::EDProducerExternalWork { +struct SiPixelRawToClusterCUDA_AsyncState { + cms::cuda::ContextState ctx; + pixelgpudetails::SiPixelRawToClusterGPUKernel gpuAlgo; + pixelgpudetails::SiPixelRawToClusterGPUKernel::WordFedAppender wordFedAppender; +}; + +class SiPixelRawToClusterCUDA : public edm::EDProducerExternalWork { public: explicit SiPixelRawToClusterCUDA(edm::ProductRegistry& reg); ~SiPixelRawToClusterCUDA() override = default; @@ -30,19 +36,14 @@ class SiPixelRawToClusterCUDA : public edm::EDProducerExternalWork { private: void acquire(const edm::Event& iEvent, const edm::EventSetup& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) override; - void produce(edm::Event& iEvent, const edm::EventSetup& iSetup) override; - - cms::cuda::ContextState ctxState_; + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const override; + void produce(edm::Event& iEvent, const edm::EventSetup& iSetup, AsyncState& state) override; - edm::EDGetTokenT rawGetToken_; - edm::EDPutTokenT> digiPutToken_; + const edm::EDGetTokenT rawGetToken_; + const edm::EDPutTokenT> digiPutToken_; edm::EDPutTokenT> digiErrorPutToken_; - edm::EDPutTokenT> clusterPutToken_; - - pixelgpudetails::SiPixelRawToClusterGPUKernel gpuAlgo_; - std::unique_ptr wordFedAppender_; - PixelFormatterErrors errors_; + const edm::EDPutTokenT> clusterPutToken_; const bool isRun2_; const bool includeErrors_; @@ -59,14 +60,13 @@ 
SiPixelRawToClusterCUDA::SiPixelRawToClusterCUDA(edm::ProductRegistry& reg) if (includeErrors_) { digiErrorPutToken_ = reg.produces>(); } - - wordFedAppender_ = std::make_unique(); } void SiPixelRawToClusterCUDA::acquire(const edm::Event& iEvent, const edm::EventSetup& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) { - cms::cuda::ScopedContextAcquire ctx{iEvent.streamID(), std::move(waitingTaskHolder), ctxState_}; + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const { + cms::cuda::ScopedContextAcquire ctx{iEvent.streamID(), std::move(waitingTaskHolder), state.ctx}; auto const& hgpuMap = iSetup.get(); if (hgpuMap.hasQuality() != useQuality_) { @@ -85,7 +85,7 @@ void SiPixelRawToClusterCUDA::acquire(const edm::Event& iEvent, const auto& buffers = iEvent.get(rawGetToken_); - errors_.clear(); + PixelFormatterErrors errors; // GPU specific: Data extraction for RawToDigi GPU unsigned int wordCounterGPU = 0; @@ -115,7 +115,7 @@ void SiPixelRawToClusterCUDA::acquire(const edm::Event& iEvent, // check CRC bit const uint64_t* trailer = reinterpret_cast(rawData.data()) + (nWords - 1); - if (not errorcheck.checkCRC(errorsInEvent, fedId, trailer, errors_)) { + if (not errorcheck.checkCRC(errorsInEvent, fedId, trailer, errors)) { continue; } @@ -125,7 +125,7 @@ void SiPixelRawToClusterCUDA::acquire(const edm::Event& iEvent, bool moreHeaders = true; while (moreHeaders) { header++; - bool headerStatus = errorcheck.checkHeader(errorsInEvent, fedId, header, errors_); + bool headerStatus = errorcheck.checkHeader(errorsInEvent, fedId, header, errors); moreHeaders = headerStatus; } @@ -134,7 +134,7 @@ void SiPixelRawToClusterCUDA::acquire(const edm::Event& iEvent, trailer++; while (moreTrailers) { trailer--; - bool trailerStatus = errorcheck.checkTrailer(errorsInEvent, fedId, nWords, trailer, errors_); + bool trailerStatus = errorcheck.checkTrailer(errorsInEvent, fedId, nWords, trailer, errors); moreTrailers = trailerStatus; } @@ -142,33 +142,33 
@@ void SiPixelRawToClusterCUDA::acquire(const edm::Event& iEvent, const uint32_t* ew = (const uint32_t*)(trailer); assert(0 == (ew - bw) % 2); - wordFedAppender_->initializeWordFed(fedId, wordCounterGPU, bw, (ew - bw)); + state.wordFedAppender.initializeWordFed(fedId, wordCounterGPU, bw, (ew - bw)); wordCounterGPU += (ew - bw); } // end of for loop - gpuAlgo_.makeClustersAsync(isRun2_, - gpuMap, - gpuModulesToUnpack, - gpuGains, - *wordFedAppender_, - std::move(errors_), - wordCounterGPU, - fedCounter, - useQuality_, - includeErrors_, - false, // debug - ctx.stream()); + state.gpuAlgo.makeClustersAsync(isRun2_, + gpuMap, + gpuModulesToUnpack, + gpuGains, + state.wordFedAppender, + std::move(errors), + wordCounterGPU, + fedCounter, + useQuality_, + includeErrors_, + false, // debug + ctx.stream()); } -void SiPixelRawToClusterCUDA::produce(edm::Event& iEvent, const edm::EventSetup& iSetup) { - cms::cuda::ScopedContextProduce ctx{ctxState_}; +void SiPixelRawToClusterCUDA::produce(edm::Event& iEvent, const edm::EventSetup& iSetup, AsyncState& state) { + cms::cuda::ScopedContextProduce ctx{state.ctx}; - auto tmp = gpuAlgo_.getResults(); + auto tmp = state.gpuAlgo.getResults(); ctx.emplace(iEvent, digiPutToken_, std::move(tmp.first)); ctx.emplace(iEvent, clusterPutToken_, std::move(tmp.second)); if (includeErrors_) { - ctx.emplace(iEvent, digiErrorPutToken_, gpuAlgo_.getErrors()); + ctx.emplace(iEvent, digiErrorPutToken_, state.gpuAlgo.getErrors()); } } diff --git a/src/cuda/plugin-SiPixelRawToDigi/SiPixelDigisSoAFromCUDA.cc b/src/cuda/plugin-SiPixelRawToDigi/SiPixelDigisSoAFromCUDA.cc index 448f4b797..d3e806a49 100644 --- a/src/cuda/plugin-SiPixelRawToDigi/SiPixelDigisSoAFromCUDA.cc +++ b/src/cuda/plugin-SiPixelRawToDigi/SiPixelDigisSoAFromCUDA.cc @@ -8,7 +8,15 @@ #include "CUDACore/ScopedContext.h" #include "CUDACore/host_unique_ptr.h" -class SiPixelDigisSoAFromCUDA : public edm::EDProducerExternalWork { +struct SiPixelDigisSoAFromCUDA_AsyncState { + 
cms::cuda::host::unique_ptr pdigi; + cms::cuda::host::unique_ptr rawIdArr; + cms::cuda::host::unique_ptr adc; + cms::cuda::host::unique_ptr clus; + size_t nDigis; +}; + +class SiPixelDigisSoAFromCUDA : public edm::EDProducerExternalWork { public: explicit SiPixelDigisSoAFromCUDA(edm::ProductRegistry& reg); ~SiPixelDigisSoAFromCUDA() override = default; @@ -16,18 +24,12 @@ class SiPixelDigisSoAFromCUDA : public edm::EDProducerExternalWork { private: void acquire(const edm::Event& iEvent, const edm::EventSetup& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) override; - void produce(edm::Event& iEvent, const edm::EventSetup& iSetup) override; - - edm::EDGetTokenT> digiGetToken_; - edm::EDPutTokenT digiPutToken_; + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const override; + void produce(edm::Event& iEvent, const edm::EventSetup& iSetup, AsyncState& state) override; - cms::cuda::host::unique_ptr pdigi_; - cms::cuda::host::unique_ptr rawIdArr_; - cms::cuda::host::unique_ptr adc_; - cms::cuda::host::unique_ptr clus_; - - size_t nDigis_; + const edm::EDGetTokenT> digiGetToken_; + const edm::EDPutTokenT digiPutToken_; }; SiPixelDigisSoAFromCUDA::SiPixelDigisSoAFromCUDA(edm::ProductRegistry& reg) @@ -36,20 +38,20 @@ SiPixelDigisSoAFromCUDA::SiPixelDigisSoAFromCUDA(edm::ProductRegistry& reg) void SiPixelDigisSoAFromCUDA::acquire(const edm::Event& iEvent, const edm::EventSetup& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) { + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const { // Do the transfer in a CUDA stream parallel to the computation CUDA stream cms::cuda::ScopedContextAcquire ctx{iEvent.streamID(), std::move(waitingTaskHolder)}; const auto& gpuDigis = ctx.get(iEvent, digiGetToken_); - - nDigis_ = gpuDigis.nDigis(); - pdigi_ = gpuDigis.pdigiToHostAsync(ctx.stream()); - rawIdArr_ = gpuDigis.rawIdArrToHostAsync(ctx.stream()); - adc_ = gpuDigis.adcToHostAsync(ctx.stream()); - clus_ = 
gpuDigis.clusToHostAsync(ctx.stream()); + state.pdigi = gpuDigis.pdigiToHostAsync(ctx.stream()); + state.rawIdArr = gpuDigis.rawIdArrToHostAsync(ctx.stream()); + state.adc = gpuDigis.adcToHostAsync(ctx.stream()); + state.clus = gpuDigis.clusToHostAsync(ctx.stream()); + state.nDigis = gpuDigis.nDigis(); } -void SiPixelDigisSoAFromCUDA::produce(edm::Event& iEvent, const edm::EventSetup& iSetup) { +void SiPixelDigisSoAFromCUDA::produce(edm::Event& iEvent, const edm::EventSetup& iSetup, AsyncState& state) { // The following line copies the data from the pinned host memory to // regular host memory. In principle that feels unnecessary (why not // just use the pinned host memory?). There are a few arguments for @@ -60,12 +62,8 @@ void SiPixelDigisSoAFromCUDA::produce(edm::Event& iEvent, const edm::EventSetup& // host memory to be allocated without a CUDA stream // - What if a CPU algorithm would produce the same SoA? We can't // use cudaMallocHost without a GPU... - iEvent.emplace(digiPutToken_, nDigis_, pdigi_.get(), rawIdArr_.get(), adc_.get(), clus_.get()); - - pdigi_.reset(); - rawIdArr_.reset(); - adc_.reset(); - clus_.reset(); + iEvent.emplace( + digiPutToken_, state.nDigis, state.pdigi.get(), state.rawIdArr.get(), state.adc.get(), state.clus.get()); } // define as framework plugin diff --git a/src/cuda/plugin-Validation/HistoValidator.cc b/src/cuda/plugin-Validation/HistoValidator.cc index d7b11d4b2..90b18f2dc 100644 --- a/src/cuda/plugin-Validation/HistoValidator.cc +++ b/src/cuda/plugin-Validation/HistoValidator.cc @@ -15,15 +15,29 @@ #include #include -class HistoValidator : public edm::EDProducerExternalWork { +struct HistoValidator_AsyncState { + uint32_t nDigis; + uint32_t nModules; + uint32_t nClusters; + uint32_t nHits; + cms::cuda::host::unique_ptr adc; + cms::cuda::host::unique_ptr clusInModule; + cms::cuda::host::unique_ptr localCoord; + cms::cuda::host::unique_ptr globalCoord; + cms::cuda::host::unique_ptr charge; + cms::cuda::host::unique_ptr size; 
+}; + +class HistoValidator : public edm::EDProducerExternalWork { public: explicit HistoValidator(edm::ProductRegistry& reg); private: void acquire(const edm::Event& iEvent, const edm::EventSetup& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) override; - void produce(edm::Event& iEvent, const edm::EventSetup& iSetup) override; + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const override; + void produce(edm::Event& iEvent, const edm::EventSetup& iSetup, AsyncState& state) override; void endJob() override; edm::EDGetTokenT> digiToken_; @@ -32,17 +46,6 @@ class HistoValidator : public edm::EDProducerExternalWork { edm::EDGetTokenT trackToken_; edm::EDGetTokenT vertexToken_; - uint32_t nDigis; - uint32_t nModules; - uint32_t nClusters; - uint32_t nHits; - cms::cuda::host::unique_ptr h_adc; - cms::cuda::host::unique_ptr h_clusInModule; - cms::cuda::host::unique_ptr h_localCoord; - cms::cuda::host::unique_ptr h_globalCoord; - cms::cuda::host::unique_ptr h_charge; - cms::cuda::host::unique_ptr h_size; - static std::map histos; }; @@ -90,61 +93,65 @@ HistoValidator::HistoValidator(edm::ProductRegistry& reg) void HistoValidator::acquire(const edm::Event& iEvent, const edm::EventSetup& iSetup, - edm::WaitingTaskWithArenaHolder waitingTaskHolder) { + edm::WaitingTaskWithArenaHolder waitingTaskHolder, + AsyncState& state) const { auto const& pdigis = iEvent.get(digiToken_); cms::cuda::ScopedContextAcquire ctx{pdigis, std::move(waitingTaskHolder)}; auto const& digis = ctx.get(iEvent, digiToken_); auto const& clusters = ctx.get(iEvent, clusterToken_); auto const& hits = ctx.get(iEvent, hitToken_); - nDigis = digis.nDigis(); - nModules = digis.nModules(); - h_adc = digis.adcToHostAsync(ctx.stream()); - - nClusters = clusters.nClusters(); - h_clusInModule = cms::cuda::make_host_unique(nModules, ctx.stream()); - cudaCheck(cudaMemcpyAsync( - h_clusInModule.get(), clusters.clusInModule(), sizeof(uint32_t) * nModules, cudaMemcpyDefault, 
ctx.stream())); - - nHits = hits.nHits(); - h_localCoord = hits.localCoordToHostAsync(ctx.stream()); - h_globalCoord = hits.globalCoordToHostAsync(ctx.stream()); - h_charge = hits.chargeToHostAsync(ctx.stream()); - h_size = hits.sizeToHostAsync(ctx.stream()); + state.nDigis = digis.nDigis(); + state.nModules = digis.nModules(); + state.adc = digis.adcToHostAsync(ctx.stream()); + + state.nClusters = clusters.nClusters(); + state.clusInModule = cms::cuda::make_host_unique(state.nModules, ctx.stream()); + cudaCheck(cudaMemcpyAsync(state.clusInModule.get(), + clusters.clusInModule(), + sizeof(uint32_t) * state.nModules, + cudaMemcpyDefault, + ctx.stream())); + + state.nHits = hits.nHits(); + state.localCoord = hits.localCoordToHostAsync(ctx.stream()); + state.globalCoord = hits.globalCoordToHostAsync(ctx.stream()); + state.charge = hits.chargeToHostAsync(ctx.stream()); + state.size = hits.sizeToHostAsync(ctx.stream()); } -void HistoValidator::produce(edm::Event& iEvent, const edm::EventSetup& iSetup) { - histos["digi_n"].fill(nDigis); - for (uint32_t i = 0; i < nDigis; ++i) { - histos["digi_adc"].fill(h_adc[i]); +void HistoValidator::produce(edm::Event& iEvent, const edm::EventSetup& iSetup, AsyncState& state) { + histos["digi_n"].fill(state.nDigis); + for (uint32_t i = 0; i < state.nDigis; ++i) { + histos["digi_adc"].fill(state.adc[i]); } - h_adc.reset(); - histos["module_n"].fill(nModules); + //adc.reset(); + histos["module_n"].fill(state.nModules); - histos["cluster_n"].fill(nClusters); - for (uint32_t i = 0; i < nModules; ++i) { - histos["cluster_per_module_n"].fill(h_clusInModule[i]); + histos["cluster_n"].fill(state.nClusters); + for (uint32_t i = 0; i < state.nModules; ++i) { + histos["cluster_per_module_n"].fill(state.clusInModule[i]); } - h_clusInModule.reset(); - - histos["hit_n"].fill(nHits); - for (uint32_t i = 0; i < nHits; ++i) { - histos["hit_lx"].fill(h_localCoord[i]); - histos["hit_ly"].fill(h_localCoord[i + nHits]); - 
histos["hit_lex"].fill(h_localCoord[i + 2 * nHits]); - histos["hit_ley"].fill(h_localCoord[i + 3 * nHits]); - histos["hit_gx"].fill(h_globalCoord[i]); - histos["hit_gy"].fill(h_globalCoord[i + nHits]); - histos["hit_gz"].fill(h_globalCoord[i + 2 * nHits]); - histos["hit_gr"].fill(h_globalCoord[i + 3 * nHits]); - histos["hit_charge"].fill(h_charge[i]); - histos["hit_sizex"].fill(h_size[i]); - histos["hit_sizey"].fill(h_size[i + nHits]); + //clusInModule.reset(); + + histos["hit_n"].fill(state.nHits); + for (uint32_t i = 0; i < state.nHits; ++i) { + histos["hit_lx"].fill(state.localCoord[i]); + histos["hit_ly"].fill(state.localCoord[i + state.nHits]); + histos["hit_lex"].fill(state.localCoord[i + 2 * state.nHits]); + histos["hit_ley"].fill(state.localCoord[i + 3 * state.nHits]); + histos["hit_gx"].fill(state.globalCoord[i]); + histos["hit_gy"].fill(state.globalCoord[i + state.nHits]); + histos["hit_gz"].fill(state.globalCoord[i + 2 * state.nHits]); + histos["hit_gr"].fill(state.globalCoord[i + 3 * state.nHits]); + histos["hit_charge"].fill(state.charge[i]); + histos["hit_sizex"].fill(state.size[i]); + histos["hit_sizey"].fill(state.size[i + state.nHits]); } - h_localCoord.reset(); - h_globalCoord.reset(); - h_charge.reset(); - h_size.reset(); + //state.localCoord.reset(); + //state.globalCoord.reset(); + //state.charge.reset(); + //state.size.reset(); { auto const& tracks = iEvent.get(trackToken_); diff --git a/src/fwtest/Framework/EDProducer.h b/src/fwtest/Framework/EDProducer.h index 8160b8c96..1cc63f58d 100644 --- a/src/fwtest/Framework/EDProducer.h +++ b/src/fwtest/Framework/EDProducer.h @@ -1,6 +1,9 @@ #ifndef EDProducerBase_h #define EDProducerBase_h +#include + +#include "Framework/EventRange.h" #include "Framework/WaitingTaskWithArenaHolder.h" namespace edm { @@ -14,9 +17,13 @@ namespace edm { bool hasAcquire() const { return false; } - void doAcquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {} + void 
doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {} - void doProduce(Event& event, EventSetup const& eventSetup) { produce(event, eventSetup); } + void doProduce(EventRange events, EventSetup const& eventSetup) { + for (Event& event : events) { + produce(event, eventSetup); + } + } virtual void produce(Event& event, EventSetup const& eventSetup) = 0; @@ -27,27 +34,157 @@ namespace edm { private: }; + class EDBatchingProducer { + public: + EDBatchingProducer() = default; + virtual ~EDBatchingProducer() = default; + + bool hasAcquire() const { return false; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) {} + + void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup); } + + virtual void produce(EventRange events, EventSetup const& eventSetup) = 0; + + void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + }; + + template class EDProducerExternalWork { public: + using AsyncState = T; + EDProducerExternalWork() = default; virtual ~EDProducerExternalWork() = default; bool hasAcquire() const { return true; } - void doAcquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { - acquire(event, eventSetup, std::move(holder)); + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + if (events.size() > statesSize_) { + statesSize_ = events.size(); + states_ = std::make_unique(statesSize_); + } + for (size_t i = 0; i < events.size(); ++i) { + acquire(events[i], eventSetup, holder, states_[i]); + } } - void doProduce(Event& event, EventSetup const& eventSetup) { produce(event, eventSetup); } + virtual void acquire(Event const& event, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder, + AsyncState& state) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { + for (size_t i = 0; i 
< events.size(); ++i) { + produce(events[i], eventSetup, states_[i]); + } + } + + virtual void produce(Event& event, EventSetup const& eventSetup, AsyncState& state) = 0; + + void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + size_t statesSize_ = 0; + std::unique_ptr states_; + }; + + template <> + class EDProducerExternalWork { + public: + EDProducerExternalWork() = default; + virtual ~EDProducerExternalWork() = default; + + bool hasAcquire() const { return true; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + for (Event const& event : events) { + acquire(event, eventSetup, holder); + } + } + + virtual void acquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { + for (Event& event : events) { + produce(event, eventSetup); + } + } - virtual void acquire(Event const& event, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) = 0; virtual void produce(Event& event, EventSetup const& eventSetup) = 0; void doEndJob() { endJob(); } + + virtual void endJob() {} + + private: + }; + + template + class EDBatchingProducerExternalWork { + public: + using AsyncState = T; + + EDBatchingProducerExternalWork() = default; + virtual ~EDBatchingProducerExternalWork() = default; + + bool hasAcquire() const { return true; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + acquire(events, eventSetup, holder, state_); + } + + virtual void acquire(ConstEventRange events, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder, + AsyncState& state) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup, state_); } + + virtual void produce(EventRange events, EventSetup const& eventSetup, AsyncState& states) = 0; + + void doEndJob() { endJob(); } + 
+ virtual void endJob() {} + + private: + AsyncState state_; + }; + + template <> + class EDBatchingProducerExternalWork { + public: + EDBatchingProducerExternalWork() = default; + virtual ~EDBatchingProducerExternalWork() = default; + + bool hasAcquire() const { return true; } + + void doAcquire(ConstEventRange events, EventSetup const& eventSetup, WaitingTaskWithArenaHolder holder) { + acquire(events, eventSetup, holder); + } + + virtual void acquire(ConstEventRange events, + EventSetup const& eventSetup, + WaitingTaskWithArenaHolder holder) const = 0; + + void doProduce(EventRange events, EventSetup const& eventSetup) { produce(events, eventSetup); } + + virtual void produce(EventRange events, EventSetup const& eventSetup) = 0; + + void doEndJob() { endJob(); } + virtual void endJob() {} private: }; + } // namespace edm #endif diff --git a/src/fwtest/Framework/EventBatch.h b/src/fwtest/Framework/EventBatch.h new file mode 100644 index 000000000..3632c41ea --- /dev/null +++ b/src/fwtest/Framework/EventBatch.h @@ -0,0 +1,36 @@ +#ifndef EventBatch_h +#define EventBatch_h + +#include + +#include "Framework/Event.h" +#include "Framework/EventRange.h" + +namespace edm { + + class EventBatch { + public: + Event& emplace(int streamId, int eventId, ProductRegistry const& reg) { + events_.emplace_back(streamId, eventId, reg); + return events_.back(); + } + + void reserve(size_t capacity) { events_.reserve(capacity); } + + void clear() { events_.clear(); } + + size_t size() const { return events_.size(); } + + bool empty() const { return events_.empty(); } + + EventRange range() { return EventRange(&events_.front(), &events_.back() + 1); } + + ConstEventRange range() const { return ConstEventRange(&events_.front(), &events_.back() + 1); } + + private: + std::vector events_; + }; + +} // namespace edm + +#endif // EventBatch_h diff --git a/src/fwtest/Framework/EventRange.h b/src/fwtest/Framework/EventRange.h new file mode 100644 index 000000000..9a9cca645 --- /dev/null +++ 
b/src/fwtest/Framework/EventRange.h @@ -0,0 +1,96 @@ +#ifndef EventRange_h +#define EventRange_h + +#include +#include +#include + +#include "Framework/Event.h" + +namespace edm { + + class EventRange { + public: + EventRange(Event* begin, Event* end) : begin_(begin), end_(end) { + assert(begin_); + assert(end_); + assert(end_ >= begin_); + } + + Event* begin() { return begin_; } + + Event const* begin() const { return begin_; } + + Event* end() { return end_; } + + Event const* end() const { return end_; } + + size_t size() const { return end_ - begin_; } + + bool empty() const { return end_ == begin_; } + + Event& at(size_t index) { + if (index >= size()) { + std::stringstream msg; + msg << "EventRange::at() range check failed: index " << index << " is outside the range [0.." << size() << ")"; + throw std::out_of_range(msg.str()); + } + return begin_[index]; + } + + Event const& at(size_t index) const { + if (index >= size()) { + std::stringstream msg; + msg << "EventRange::at() range check failed: index " << index << " is outside the range [0.." << size() << ")"; + throw std::out_of_range(msg.str()); + } + return begin_[index]; + } + + Event& operator[](size_t index) { return begin_[index]; } + + Event const& operator[](size_t index) const { return begin_[index]; } + + private: + Event* begin_; + Event* end_; + }; + + class ConstEventRange { + public: + ConstEventRange(Event const* begin, Event const* end) : begin_(begin), end_(end) { + assert(begin_); + assert(end_); + assert(end_ >= begin_); + } + + ConstEventRange(EventRange range) : begin_(range.begin()), end_(range.end()) {} + + Event const* begin() const { return begin_; } + + Event const* end() const { return end_; } + + size_t size() const { return end_ - begin_; } + + bool empty() const { return end_ == begin_; } + + Event const& at(size_t index) { + if (index >= size()) { + std::stringstream msg; + msg << "ConstEventRange::at() range check failed: index " << index << " is outside the range [0.." 
<< size() + << ")"; + throw std::out_of_range(msg.str()); + } + return begin_[index]; + } + + Event const& operator[](size_t index) const { return begin_[index]; } + + private: + Event const* begin_; + Event const* end_; + }; + +} // namespace edm + +#endif // EventRange_h diff --git a/src/fwtest/Framework/WaitingTaskHolder.h b/src/fwtest/Framework/WaitingTaskHolder.h index d74c5d8ed..9cc1b5c66 100644 --- a/src/fwtest/Framework/WaitingTaskHolder.h +++ b/src/fwtest/Framework/WaitingTaskHolder.h @@ -34,7 +34,7 @@ namespace edm { explicit WaitingTaskHolder(edm::WaitingTask* iTask) : m_task(iTask) { m_task->increment_ref_count(); } ~WaitingTaskHolder() { if (m_task) { - doneWaiting(std::exception_ptr{}); + doneWaiting(); } } @@ -66,10 +66,7 @@ namespace edm { } } - void doneWaiting(std::exception_ptr iExcept) { - if (iExcept) { - m_task->dependentTaskFailed(iExcept); - } + void doneWaiting() { //spawn can run the task before we finish // doneWaiting and some other thread might // try to reuse this object. Resetting @@ -81,6 +78,13 @@ namespace edm { } } + void doneWaiting(std::exception_ptr iExcept) { + if (iExcept) { + m_task->dependentTaskFailed(iExcept); + } + doneWaiting(); + } + private: // ---------- member data -------------------------------- WaitingTask* m_task; diff --git a/src/fwtest/Framework/WaitingTaskWithArenaHolder.cc b/src/fwtest/Framework/WaitingTaskWithArenaHolder.cc index 7c852d041..d7c88001e 100644 --- a/src/fwtest/Framework/WaitingTaskWithArenaHolder.cc +++ b/src/fwtest/Framework/WaitingTaskWithArenaHolder.cc @@ -24,7 +24,7 @@ namespace edm { WaitingTaskWithArenaHolder::~WaitingTaskWithArenaHolder() { if (m_task) { - doneWaiting(std::exception_ptr{}); + doneWaiting(); } } @@ -58,10 +58,7 @@ namespace edm { // into the correct arena of threads. Use of the arena allows doneWaiting // to be called from a thread outside the arena of threads that will manage // the task. doneWaiting can be called from a non-TBB thread. 
- void WaitingTaskWithArenaHolder::doneWaiting(std::exception_ptr iExcept) { - if (iExcept) { - m_task->dependentTaskFailed(iExcept); - } + void WaitingTaskWithArenaHolder::doneWaiting() { //enqueue can run the task before we finish // doneWaiting and some other thread might // try to reuse this object. Resetting @@ -75,6 +72,17 @@ namespace edm { } } + // This spawns the task. The arena is needed to get the task spawned + // into the correct arena of threads. Use of the arena allows doneWaiting + // to be called from a thread outside the arena of threads that will manage + // the task. doneWaiting can be called from a non-TBB thread. + void WaitingTaskWithArenaHolder::doneWaiting(std::exception_ptr iExcept) { + if (iExcept) { + m_task->dependentTaskFailed(iExcept); + } + doneWaiting(); + } + // This next function is useful if you know from the context that // m_arena (which is set when the constructor was executes) is the // same arena in which you want to execute the doneWaiting function. diff --git a/src/fwtest/Framework/WaitingTaskWithArenaHolder.h b/src/fwtest/Framework/WaitingTaskWithArenaHolder.h index 4b14febfb..bab615686 100644 --- a/src/fwtest/Framework/WaitingTaskWithArenaHolder.h +++ b/src/fwtest/Framework/WaitingTaskWithArenaHolder.h @@ -48,6 +48,12 @@ namespace edm { WaitingTaskWithArenaHolder& operator=(WaitingTaskWithArenaHolder&& iRHS); + // This spawns the task. The arena is needed to get the task spawned + // into the correct arena of threads. Use of the arena allows doneWaiting + // to be called from a thread outside the arena of threads that will manage + // the task. doneWaiting can be called from a non-TBB thread. + void doneWaiting(); + // This spawns the task. The arena is needed to get the task spawned // into the correct arena of threads. 
Use of the arena allows doneWaiting // to be called from a thread outside the arena of threads that will manage diff --git a/src/fwtest/Framework/Worker.cc b/src/fwtest/Framework/Worker.cc index b3b3df74c..9c1c5c3c4 100644 --- a/src/fwtest/Framework/Worker.cc +++ b/src/fwtest/Framework/Worker.cc @@ -1,7 +1,8 @@ +//#include #include "Framework/Worker.h" namespace edm { - void Worker::prefetchAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask) { + void Worker::prefetchAsync(EventRange events, EventSetup const& eventSetup, WaitingTask* iTask) { //std::cout << "prefetchAsync for " << this << " iTask " << iTask << std::endl; bool expected = false; if (prefetchRequested_.compare_exchange_strong(expected, true)) { @@ -10,7 +11,7 @@ namespace edm { iTask->increment_ref_count(); for (Worker* dep : itemsToGet_) { //std::cout << "calling doWorkAsync for " << dep << " with " << iTask << std::endl; - dep->doWorkAsync(event, eventSetup, iTask); + dep->doWorkAsync(events, eventSetup, iTask); } auto count = iTask->decrement_ref_count(); diff --git a/src/fwtest/Framework/Worker.h b/src/fwtest/Framework/Worker.h index 0a03670a4..26bdfed0b 100644 --- a/src/fwtest/Framework/Worker.h +++ b/src/fwtest/Framework/Worker.h @@ -2,9 +2,10 @@ #define Worker_h #include -#include //#include +#include +#include "Framework/EventRange.h" #include "Framework/WaitingTask.h" #include "Framework/WaitingTaskHolder.h" #include "Framework/WaitingTaskList.h" @@ -23,10 +24,10 @@ namespace edm { void setItemsToGet(std::vector workers) { itemsToGet_ = std::move(workers); } // thread safe - void prefetchAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask); + void prefetchAsync(EventRange events, EventSetup const& eventSetup, WaitingTask* iTask); // not thread safe - virtual void doWorkAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask) = 0; + virtual void doWorkAsync(EventRange events, EventSetup const& eventSetup, WaitingTask* iTask) = 0; // not thread 
safe virtual void doEndJob() = 0; @@ -50,7 +51,7 @@ namespace edm { public: explicit WorkerT(ProductRegistry& reg) : producer_(reg) {} - void doWorkAsync(Event& event, EventSetup const& eventSetup, WaitingTask* iTask) override { + void doWorkAsync(EventRange events, EventSetup const& eventSetup, WaitingTask* iTask) override { waitingTasksWork_.add(iTask); //std::cout << "doWorkAsync for " << this << " with iTask " << iTask << std::endl; bool expected = false; @@ -58,14 +59,14 @@ namespace edm { //std::cout << "first doWorkAsync call" << std::endl; WaitingTask* moduleTask = make_waiting_task( - tbb::task::allocate_root(), [this, &event, &eventSetup](std::exception_ptr const* iPtr) mutable { + tbb::task::allocate_root(), [this, events, &eventSetup](std::exception_ptr const* iPtr) mutable { if (iPtr) { waitingTasksWork_.doneWaiting(*iPtr); } else { std::exception_ptr exceptionPtr; try { //std::cout << "calling doProduce " << this << std::endl; - producer_.doProduce(event, eventSetup); + producer_.doProduce(events, eventSetup); } catch (...) { exceptionPtr = std::current_exception(); } @@ -75,24 +76,25 @@ namespace edm { }); if (producer_.hasAcquire()) { WaitingTaskWithArenaHolder runProduceHolder{moduleTask}; - moduleTask = make_waiting_task(tbb::task::allocate_root(), - [this, &event, &eventSetup, runProduceHolder = std::move(runProduceHolder)]( - std::exception_ptr const* iPtr) mutable { - if (iPtr) { - runProduceHolder.doneWaiting(*iPtr); - } else { - std::exception_ptr exceptionPtr; - try { - producer_.doAcquire(event, eventSetup, runProduceHolder); - } catch (...) 
{ - exceptionPtr = std::current_exception(); - } - runProduceHolder.doneWaiting(exceptionPtr); - } - }); + moduleTask = make_waiting_task( + tbb::task::allocate_root(), + [this, events = ConstEventRange(events), &eventSetup, runProduceHolder = std::move(runProduceHolder)]( + std::exception_ptr const* iPtr) mutable { + if (iPtr) { + runProduceHolder.doneWaiting(*iPtr); + } else { + std::exception_ptr exceptionPtr; + try { + producer_.doAcquire(events, eventSetup, runProduceHolder); + } catch (...) { + exceptionPtr = std::current_exception(); + } + runProduceHolder.doneWaiting(exceptionPtr); + } + }); } //std::cout << "calling prefetchAsync " << this << " with moduleTask " << moduleTask << std::endl; - prefetchAsync(event, eventSetup, moduleTask); + prefetchAsync(events, eventSetup, moduleTask); } } diff --git a/src/fwtest/Makefile.deps b/src/fwtest/Makefile.deps index b531eb578..3c07a802b 100644 --- a/src/fwtest/Makefile.deps +++ b/src/fwtest/Makefile.deps @@ -1,3 +1,3 @@ fwtest_EXTERNAL_DEPENDS := TBB -Test1_DEPENDS := Framework DataFormats -Test2_DEPENDS := Framework +Test_DEPENDS := Framework DataFormats +TestBatching_DEPENDS := Framework diff --git a/src/fwtest/bin/EventProcessor.cc b/src/fwtest/bin/EventProcessor.cc index a50cc6570..e1157472c 100644 --- a/src/fwtest/bin/EventProcessor.cc +++ b/src/fwtest/bin/EventProcessor.cc @@ -6,13 +6,14 @@ #include "EventProcessor.h" namespace edm { - EventProcessor::EventProcessor(int maxEvents, + EventProcessor::EventProcessor(int batchEvents, + int maxEvents, int numberOfStreams, std::vector const& path, std::vector const& esproducers, std::filesystem::path const& datadir, bool validation) - : source_(maxEvents, registry_, datadir, validation) { + : source_(batchEvents, maxEvents, registry_, datadir, validation) { for (auto const& name : esproducers) { pluginManager_.load(name); auto esp = ESPluginFactory::create(name, datadir); diff --git a/src/fwtest/bin/EventProcessor.h b/src/fwtest/bin/EventProcessor.h index 
614e60c38..bb5cbfc3e 100644 --- a/src/fwtest/bin/EventProcessor.h +++ b/src/fwtest/bin/EventProcessor.h @@ -14,7 +14,8 @@ namespace edm { class EventProcessor { public: - explicit EventProcessor(int maxEvents, + explicit EventProcessor(int batchEvents, + int maxEvents, int numberOfStreams, std::vector const& path, std::vector const& esproducers, diff --git a/src/fwtest/bin/PluginManager.cc b/src/fwtest/bin/PluginManager.cc index 7977cdbc2..ecb28032b 100644 --- a/src/fwtest/bin/PluginManager.cc +++ b/src/fwtest/bin/PluginManager.cc @@ -1,5 +1,5 @@ -#include #include +//#include #include "PluginManager.h" diff --git a/src/fwtest/bin/Source.cc b/src/fwtest/bin/Source.cc index 64d1a3b5c..276652b4f 100644 --- a/src/fwtest/bin/Source.cc +++ b/src/fwtest/bin/Source.cc @@ -23,8 +23,13 @@ namespace { } // namespace namespace edm { - Source::Source(int maxEvents, ProductRegistry ®, std::filesystem::path const &datadir, bool validation) - : maxEvents_(maxEvents), numEvents_(0), rawToken_(reg.produces()), validation_(validation) { + Source::Source( + int batchEvents, int maxEvents, ProductRegistry ®, std::filesystem::path const &datadir, bool validation) + : batchEvents_(batchEvents), + maxEvents_(maxEvents), + numEvents_(0), + rawToken_(reg.produces()), + validation_(validation) { std::ifstream in_raw(datadir / "raw.bin", std::ios::binary); std::ifstream in_digiclusters; std::ifstream in_tracks; @@ -74,27 +79,45 @@ namespace edm { assert(raw_.size() == vertices_.size()); } + if (batchEvents_ < 1) { + batchEvents_ = 1; + } + if (maxEvents_ < 0) { maxEvents_ = raw_.size(); } } - std::unique_ptr Source::produce(int streamId, ProductRegistry const ®) { - const int old = numEvents_.fetch_add(1); - const int iev = old + 1; - if (old >= maxEvents_) { - return nullptr; + EventBatch Source::produce(int streamId, ProductRegistry const ®) { + + // atomically increase the event counter, without overflowing over maxEvents_ + int old_value = numEvents_; + int new_value; + do { + new_value 
= std::min(old_value + batchEvents_, maxEvents_); + } while (not numEvents_.compare_exchange_weak(old_value, new_value)); + + // check how many events should be read + const int size = new_value - old_value; + EventBatch events; + if (size <= 0) { + return events; } - auto ev = std::make_unique(streamId, iev, reg); - const int index = old % raw_.size(); - ev->emplace(rawToken_, raw_[index]); - if (validation_) { - ev->emplace(digiClusterToken_, digiclusters_[index]); - ev->emplace(trackToken_, tracks_[index]); - ev->emplace(vertexToken_, vertices_[index]); + events.reserve(size); + for (int i = 1; i <= size; ++i) { + const int iev = old_value + i; + Event &event = events.emplace(streamId, iev, reg); + const int index = (iev - 1) % raw_.size(); + + event.emplace(rawToken_, raw_[index]); + if (validation_) { + event.emplace(digiClusterToken_, digiclusters_[index]); + event.emplace(trackToken_, tracks_[index]); + event.emplace(vertexToken_, vertices_[index]); + } } - return ev; + return events; } } // namespace edm diff --git a/src/fwtest/bin/Source.h b/src/fwtest/bin/Source.h index c13534f33..a3c99798d 100644 --- a/src/fwtest/bin/Source.h +++ b/src/fwtest/bin/Source.h @@ -5,8 +5,10 @@ #include #include #include +#include #include "Framework/Event.h" +#include "Framework/EventBatch.h" #include "DataFormats/FEDRawDataCollection.h" #include "DataFormats/DigiClusterCount.h" #include "DataFormats/TrackCount.h" @@ -15,14 +17,16 @@ namespace edm { class Source { public: - explicit Source(int maxEvents, ProductRegistry& reg, std::filesystem::path const& datadir, bool validation); + explicit Source( + int batchEvents, int maxEvents, ProductRegistry& reg, std::filesystem::path const& datadir, bool validation); int maxEvents() const { return maxEvents_; } // thread safe - std::unique_ptr produce(int streamId, ProductRegistry const& reg); + EventBatch produce(int streamId, ProductRegistry const& reg); private: + int batchEvents_; int maxEvents_; std::atomic numEvents_; 
EDPutTokenT const rawToken_; diff --git a/src/fwtest/bin/StreamSchedule.cc b/src/fwtest/bin/StreamSchedule.cc index 8636bce24..1f34f0f58 100644 --- a/src/fwtest/bin/StreamSchedule.cc +++ b/src/fwtest/bin/StreamSchedule.cc @@ -1,4 +1,5 @@ -//#include +#include +#include #include @@ -44,7 +45,7 @@ namespace edm { void StreamSchedule::runToCompletionAsync(WaitingTaskHolder h) { auto task = - make_functor_task(tbb::task::allocate_root(), [this, h]() mutable { processOneEventAsync(std::move(h)); }); + make_functor_task(tbb::task::allocate_root(), [this, h]() mutable { processEventBatchAsync(std::move(h)); }); if (streamId_ == 0) { tbb::task::spawn(*task); } else { @@ -52,26 +53,26 @@ namespace edm { } } - void StreamSchedule::processOneEventAsync(WaitingTaskHolder h) { - auto event = source_->produce(streamId_, registry_); - if (event) { - // Pass the event object ownership to the "end-of-event" task - // Pass a non-owning pointer to the event to preceding tasks - //std::cout << "Begin processing event " << event->eventID() << std::endl; - auto eventPtr = event.get(); - auto nextEventTask = - make_waiting_task(tbb::task::allocate_root(), - [this, h = std::move(h), ev = std::move(event)](std::exception_ptr const* iPtr) mutable { - ev.reset(); - if (iPtr) { - h.doneWaiting(*iPtr); - } else { - for (auto const& worker : path_) { - worker->reset(); - } - processOneEventAsync(std::move(h)); - } - }); + void StreamSchedule::processEventBatchAsync(WaitingTaskHolder h) { + auto events = source_->produce(streamId_, registry_); + if (not events.empty()) { + // Pass the event batch ownership to the "end-of-event" task + // Pass a non-owning event range to the preceding tasks + //std::cout << "Begin processing a batch of " << events.size() << " events starting from " << events.range().at(0).eventID() << std::endl; + auto eventsRange = events.range(); + auto nextEventTask = make_waiting_task( + tbb::task::allocate_root(), + [this, h = std::move(h), events = 
std::move(events)](std::exception_ptr const* iPtr) mutable { + events.clear(); + if (iPtr) { + h.doneWaiting(*iPtr); + } else { + for (auto const& worker : path_) { + worker->reset(); + } + processEventBatchAsync(std::move(h)); + } + }); // To guarantee that the nextEventTask is spawned also in // absence of Workers, and also to prevent spawning it before // all workers have been processed (should not happen though) @@ -79,10 +80,10 @@ namespace edm { for (auto iWorker = path_.rbegin(); iWorker != path_.rend(); ++iWorker) { //std::cout << "calling doWorkAsync for " << iWorker->get() << " with nextEventTask " << nextEventTask << std::endl; - (*iWorker)->doWorkAsync(*eventPtr, *eventSetup_, nextEventTask); + (*iWorker)->doWorkAsync(eventsRange, *eventSetup_, nextEventTask); } } else { - h.doneWaiting(std::exception_ptr{}); + h.doneWaiting(); } } diff --git a/src/fwtest/bin/StreamSchedule.h b/src/fwtest/bin/StreamSchedule.h index 1bd364c70..a62dc3961 100644 --- a/src/fwtest/bin/StreamSchedule.h +++ b/src/fwtest/bin/StreamSchedule.h @@ -38,7 +38,7 @@ namespace edm { void endJob(); private: - void processOneEventAsync(WaitingTaskHolder h); + void processEventBatchAsync(WaitingTaskHolder h); ProductRegistry registry_; Source* source_; diff --git a/src/fwtest/bin/main.cc b/src/fwtest/bin/main.cc index b91b11d67..32572df99 100644 --- a/src/fwtest/bin/main.cc +++ b/src/fwtest/bin/main.cc @@ -18,7 +18,8 @@ namespace { "[--empty]\n\n" << "Options\n" << " --numberOfThreads Number of threads to use (default 1)\n" - << " --numberOfStreams Number of concurrent events (default 0=numberOfThreads)\n" + << " --numberOfStreams Number of concurrent batch of events (default 0=numberOfThreads)\n" + << " --batchEvents Number of events to process in a batch (default 1 for individual events)\n" << " --maxEvents Number of events to process (default -1 for all events in the input file)\n" << " --data Path to the 'data' directory (default 'data' in the directory of the executable)\n" << " 
--transfer Transfer results from GPU to CPU (default is to leave them on GPU)\n" @@ -33,6 +34,7 @@ int main(int argc, char** argv) { std::vector args(argv, argv + argc); int numberOfThreads = 1; int numberOfStreams = 0; + int batchEvents = 1; int maxEvents = -1; std::filesystem::path datadir; bool transfer = false; @@ -48,6 +50,9 @@ int main(int argc, char** argv) { } else if (*i == "--numberOfStreams") { ++i; numberOfStreams = std::stoi(*i); + } else if (*i == "--batchEvents") { + ++i; + batchEvents = std::stoul(*i); } else if (*i == "--maxEvents") { ++i; maxEvents = std::stoi(*i); @@ -82,14 +87,15 @@ int main(int argc, char** argv) { std::vector edmodules; std::vector esmodules; if (not empty) { - edmodules = {"TestProducer", "TestProducer3", "TestProducer2"}; + edmodules = { + "TestProducer", "TestProducerExternalWork", "TestBatchingProducer", "TestBatchingProducerExternalWork"}; esmodules = {"IntESProducer"}; if (transfer) { // add modules for transfer } } edm::EventProcessor processor( - maxEvents, numberOfStreams, std::move(edmodules), std::move(esmodules), datadir, validation); + batchEvents, maxEvents, numberOfStreams, std::move(edmodules), std::move(esmodules), datadir, validation); maxEvents = processor.maxEvents(); std::cout << "Processing " << maxEvents << " events, of which " << numberOfStreams << " concurrently, with " diff --git a/src/fwtest/plugin-Test1/IntESProducer.cc b/src/fwtest/plugin-Test/IntESProducer.cc similarity index 100% rename from src/fwtest/plugin-Test1/IntESProducer.cc rename to src/fwtest/plugin-Test/IntESProducer.cc diff --git a/src/fwtest/plugin-Test1/TestProducer.cc b/src/fwtest/plugin-Test/TestProducer.cc similarity index 100% rename from src/fwtest/plugin-Test1/TestProducer.cc rename to src/fwtest/plugin-Test/TestProducer.cc diff --git a/src/fwtest/plugin-Test/TestProducerExternalWork.cc b/src/fwtest/plugin-Test/TestProducerExternalWork.cc new file mode 100644 index 000000000..37ce351d3 --- /dev/null +++ 
b/src/fwtest/plugin-Test/TestProducerExternalWork.cc @@ -0,0 +1,68 @@ +#include +#include +#include +#include +#include + +#include "Framework/EDProducer.h" +#include "Framework/Event.h" +#include "Framework/PluginFactory.h" + +namespace { + std::atomic nevents = 0; +} + +using TestProducerExternalWorkAsyncState = std::future; + +class TestProducerExternalWork : public edm::EDProducerExternalWork { +public: + explicit TestProducerExternalWork(edm::ProductRegistry& reg); + +private: + void acquire(edm::Event const& event, + edm::EventSetup const& eventSetup, + edm::WaitingTaskWithArenaHolder holder, + AsyncState& state) const override; + void produce(edm::Event& event, edm::EventSetup const& eventSetup, AsyncState& state) override; + + void endJob() override; + + const edm::EDGetTokenT getToken_; +}; + +TestProducerExternalWork::TestProducerExternalWork(edm::ProductRegistry& reg) + : getToken_(reg.consumes()) {} + +void TestProducerExternalWork::acquire(edm::Event const& event, + edm::EventSetup const& eventSetup, + edm::WaitingTaskWithArenaHolder holder, + AsyncState& state) const { + auto const value = event.get(getToken_); + assert(value == static_cast(event.eventID() + 10 * event.streamID() + 100)); + + state = std::async([holder = std::move(holder)]() mutable { + using namespace std::chrono_literals; + std::this_thread::sleep_for(1s); + holder.doneWaiting(); + return 42; + }); + +#ifndef FWTEST_SILENT + std::cout << "TestProducerExternalWork::acquire Event " << event.eventID() << " stream " << event.streamID() + << " value " << value << std::endl; +#endif +} + +void TestProducerExternalWork::produce(edm::Event& event, edm::EventSetup const& eventSetup, AsyncState& state) { +#ifndef FWTEST_SILENT + std::cout << "TestProducerExternalWork::produce Event " << event.eventID() << " stream " << event.streamID() + << " from future " << state.get() << std::endl; +#endif + ++nevents; +} + +void TestProducerExternalWork::endJob() { + std::cout << 
"TestProducerExternalWork::endJob processed " << nevents.load() << " events" << std::endl; +} + +DEFINE_FWK_MODULE(TestProducerExternalWork); diff --git a/src/fwtest/plugin-Test2/TestProducer2.cc b/src/fwtest/plugin-Test2/TestProducer2.cc deleted file mode 100644 index 6df7bdb0d..000000000 --- a/src/fwtest/plugin-Test2/TestProducer2.cc +++ /dev/null @@ -1,64 +0,0 @@ -#include -#include -#include -#include -#include - -#include "Framework/EDProducer.h" -#include "Framework/Event.h" -#include "Framework/PluginFactory.h" - -namespace { - std::atomic nevents = 0; -} - -class TestProducer2 : public edm::EDProducerExternalWork { -public: - explicit TestProducer2(edm::ProductRegistry& reg); - -private: - void acquire(edm::Event const& event, - edm::EventSetup const& eventSetup, - edm::WaitingTaskWithArenaHolder holder) override; - void produce(edm::Event& event, edm::EventSetup const& eventSetup) override; - - void endJob() override; - - edm::EDGetTokenT getToken_; - std::future future_; -}; - -TestProducer2::TestProducer2(edm::ProductRegistry& reg) : getToken_(reg.consumes()) {} - -void TestProducer2::acquire(edm::Event const& event, - edm::EventSetup const& eventSetup, - edm::WaitingTaskWithArenaHolder holder) { - auto const value = event.get(getToken_); - assert(value == static_cast(event.eventID() + 10 * event.streamID() + 100)); - - future_ = std::async([holder = std::move(holder)]() mutable { - using namespace std::chrono_literals; - std::this_thread::sleep_for(1s); - holder.doneWaiting(std::exception_ptr()); - return 42; - }); - -#ifndef FWTEST_SILENT - std::cout << "TestProducer2::acquire Event " << event.eventID() << " stream " << event.streamID() << " value " - << value << std::endl; -#endif -} - -void TestProducer2::produce(edm::Event& event, edm::EventSetup const& eventSetup) { -#ifndef FWTEST_SILENT - std::cout << "TestProducer2::produce Event " << event.eventID() << " stream " << event.streamID() << " from future " - << future_.get() << std::endl; -#endif - 
++nevents; -} - -void TestProducer2::endJob() { - std::cout << "TestProducer2::endJob processed " << nevents.load() << " events" << std::endl; -} - -DEFINE_FWK_MODULE(TestProducer2); diff --git a/src/fwtest/plugin-Test2/TestProducer3.cc b/src/fwtest/plugin-Test2/TestProducer3.cc deleted file mode 100644 index ef8354058..000000000 --- a/src/fwtest/plugin-Test2/TestProducer3.cc +++ /dev/null @@ -1,29 +0,0 @@ -#include -#include -#include - -#include "Framework/EDProducer.h" -#include "Framework/Event.h" -#include "Framework/PluginFactory.h" - -class TestProducer3 : public edm::EDProducer { -public: - explicit TestProducer3(edm::ProductRegistry& reg); - -private: - void produce(edm::Event& event, edm::EventSetup const& eventSetup) override; - - edm::EDGetTokenT getToken_; -}; - -TestProducer3::TestProducer3(edm::ProductRegistry& reg) : getToken_(reg.consumes()) {} - -void TestProducer3::produce(edm::Event& event, edm::EventSetup const& eventSetup) { - auto const value = event.get(getToken_); -#ifndef FWTEST_SILENT - std::cout << "TestProducer3 Event " << event.eventID() << " stream " << event.streamID() << " value " << value - << std::endl; -#endif -} - -DEFINE_FWK_MODULE(TestProducer3); diff --git a/src/fwtest/plugin-TestBatching/TestBatchingProducer.cc b/src/fwtest/plugin-TestBatching/TestBatchingProducer.cc new file mode 100644 index 000000000..485929c0f --- /dev/null +++ b/src/fwtest/plugin-TestBatching/TestBatchingProducer.cc @@ -0,0 +1,32 @@ +#include +#include +#include + +#include "Framework/EDProducer.h" +#include "Framework/Event.h" +#include "Framework/EventRange.h" +#include "Framework/PluginFactory.h" + +class TestBatchingProducer : public edm::EDBatchingProducer { +public: + explicit TestBatchingProducer(edm::ProductRegistry& reg); + +private: + void produce(edm::EventRange events, edm::EventSetup const& eventSetup) override; + + edm::EDGetTokenT getToken_; +}; + +TestBatchingProducer::TestBatchingProducer(edm::ProductRegistry& reg) : 
getToken_(reg.consumes()) {} + +void TestBatchingProducer::produce(edm::EventRange events, edm::EventSetup const& eventSetup) { + for (edm::Event& event : events) { + auto const value = event.get(getToken_); +#ifndef FWTEST_SILENT + std::cout << "TestBatchingProducer Event " << event.eventID() << " stream " << event.streamID() << " value " + << value << std::endl; +#endif + } +} + +DEFINE_FWK_MODULE(TestBatchingProducer); diff --git a/src/fwtest/plugin-TestBatching/TestBatchingProducerExternalWork.cc b/src/fwtest/plugin-TestBatching/TestBatchingProducerExternalWork.cc new file mode 100644 index 000000000..f76d87417 --- /dev/null +++ b/src/fwtest/plugin-TestBatching/TestBatchingProducerExternalWork.cc @@ -0,0 +1,79 @@ +#include +#include +#include +#include +#include +#include + +#include "Framework/EDProducer.h" +#include "Framework/Event.h" +#include "Framework/EventRange.h" +#include "Framework/PluginFactory.h" + +namespace { + std::atomic nevents = 0; +} + +// test using an std::map instead of a simpler std::vector +using TestBatchingProducerExternalWorkAsyncState = std::map>; + +class TestBatchingProducerExternalWork + : public edm::EDBatchingProducerExternalWork { +public: + explicit TestBatchingProducerExternalWork(edm::ProductRegistry& reg); + +private: + void acquire(edm::ConstEventRange events, + edm::EventSetup const& eventSetup, + edm::WaitingTaskWithArenaHolder holder, + AsyncState& state) const override; + void produce(edm::EventRange events, edm::EventSetup const& eventSetup, AsyncState& state) override; + + void endJob() override; + + const edm::EDGetTokenT getToken_; +}; + +TestBatchingProducerExternalWork::TestBatchingProducerExternalWork(edm::ProductRegistry& reg) + : getToken_(reg.consumes()) {} + +void TestBatchingProducerExternalWork::acquire(edm::ConstEventRange events, + edm::EventSetup const& eventSetup, + edm::WaitingTaskWithArenaHolder holder, + AsyncState& state) const { + for (edm::Event const& event : events) { + auto const value = 
event.get(getToken_); + assert(value == static_cast(event.eventID() + 10 * event.streamID() + 100)); + + // cannot move from the holder as it is used more than once + state[event.eventID()] = std::async([holder]() mutable { + using namespace std::chrono_literals; + std::this_thread::sleep_for(1s); + holder.doneWaiting(); + return 42; + }); + +#ifndef FWTEST_SILENT + std::cout << "TestBatchingProducerExternalWork::acquire Event " << event.eventID() << " stream " << event.streamID() + << " value " << value << std::endl; +#endif + } +} + +void TestBatchingProducerExternalWork::produce(edm::EventRange events, + edm::EventSetup const& eventSetup, + AsyncState& state) { +#ifndef FWTEST_SILENT + for (edm::Event& event : events) { + std::cout << "TestBatchingProducerExternalWork::produce Event " << event.eventID() << " stream " << event.streamID() + << " from future " << state[event.eventID()].get() << std::endl; + } +#endif + ++nevents; +} + +void TestBatchingProducerExternalWork::endJob() { + std::cout << "TestBatchingProducerExternalWork::endJob processed " << nevents.load() << " events" << std::endl; +} + +DEFINE_FWK_MODULE(TestBatchingProducerExternalWork); diff --git a/src/fwtest/plugins.txt b/src/fwtest/plugins.txt index 42f92a9a4..f01f94ac3 100644 --- a/src/fwtest/plugins.txt +++ b/src/fwtest/plugins.txt @@ -1,4 +1,5 @@ -IntESProducer pluginTest1.so -TestProducer pluginTest1.so -TestProducer2 pluginTest2.so -TestProducer3 pluginTest2.so +IntESProducer pluginTest.so +TestProducer pluginTest.so +TestProducerExternalWork pluginTest.so +TestBatchingProducer pluginTestBatching.so +TestBatchingProducerExternalWork pluginTestBatching.so