Skip to content

Commit

Permalink
Allow a stream to skip a lumi if all events processed
Browse files Browse the repository at this point in the history
  • Loading branch information
wddgit committed Dec 7, 2023
1 parent 316ce5b commit 6c17d4e
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 1 deletion.
2 changes: 2 additions & 0 deletions FWCore/Framework/interface/EventProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ namespace edm {

bool shouldWeStop() const;

void tryToMakeOtherStreamsSkipLumi(unsigned int iStreamIndex, LuminosityBlockProcessingStatus& iStatus);

void setExceptionMessageFiles(std::string& message);
void setExceptionMessageRuns();
void setExceptionMessageLumis();
Expand Down
17 changes: 17 additions & 0 deletions FWCore/Framework/src/EventProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1732,6 +1732,9 @@ namespace edm {
streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
if (!status->setStreamStartedLumi(i)) {
return;
}
streamQueues_[i].pause();

auto& event = principalCache_.eventPrincipal(i);
Expand Down Expand Up @@ -2256,6 +2259,7 @@ namespace edm {
// If appropriate, this will also start the next run.
endRunAsync(streamRunStatus_[iStreamIndex], iTask);
}
tryToMakeOtherStreamsSkipLumi(iStreamIndex, *status);
}
streamEndLumiAsync(iTask, iStreamIndex);
} else {
Expand Down Expand Up @@ -2386,6 +2390,19 @@ namespace edm {
return schedule_->terminate();
}

void EventProcessor::tryToMakeOtherStreamsSkipLumi(unsigned int iStreamIndex,
LuminosityBlockProcessingStatus& iStatus) {
for (unsigned int stream = 0; stream < preallocations_.numberOfStreams(); ++stream) {
if (stream == iStreamIndex) {
continue;
}
if (iStatus.setStreamStartedLumi(stream)) {
// This cannot be the last stream to end the lumi, but we must decrement the counter
iStatus.streamFinishedLumi();
}
}
}

void EventProcessor::setExceptionMessageFiles(std::string& message) { exceptionMessageFiles_ = message; }

void EventProcessor::setExceptionMessageRuns() { exceptionMessageRuns_ = true; }
Expand Down
13 changes: 13 additions & 0 deletions FWCore/Framework/src/LuminosityBlockProcessingStatus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
#include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"

namespace edm {

LuminosityBlockProcessingStatus::LuminosityBlockProcessingStatus(unsigned int iNStreams)
: streamStartedLumi_(iNStreams), nStreamsStillProcessingLumi_(iNStreams) {
for (auto& item : streamStartedLumi_) {
item.store(false);
}
}

void LuminosityBlockProcessingStatus::resetResources() {
endIOVWaitingTasks_.doneWaiting(std::exception_ptr{});
for (auto& iter : eventSetupImpls_) {
Expand All @@ -26,6 +34,11 @@ namespace edm {
globalEndRunHolder_ = std::move(holder);
}

bool LuminosityBlockProcessingStatus::setStreamStartedLumi(unsigned int iStream) {
bool expected = false;
return streamStartedLumi_.at(iStream).compare_exchange_strong(expected, true);
}

void LuminosityBlockProcessingStatus::setEndTime() {
constexpr char kUnset = 0;
constexpr char kSetting = 1;
Expand Down
5 changes: 4 additions & 1 deletion FWCore/Framework/src/LuminosityBlockProcessingStatus.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace edm {

class LuminosityBlockProcessingStatus {
public:
LuminosityBlockProcessingStatus(unsigned int iNStreams) : nStreamsStillProcessingLumi_(iNStreams) {}
LuminosityBlockProcessingStatus(unsigned int iNStreams);

LuminosityBlockProcessingStatus(LuminosityBlockProcessingStatus const&) = delete;
LuminosityBlockProcessingStatus const& operator=(LuminosityBlockProcessingStatus const&) = delete;
Expand Down Expand Up @@ -70,6 +70,8 @@ namespace edm {
void setGlobalEndRunHolder(WaitingTaskHolder);
void globalEndRunHolderDoneWaiting() { globalEndRunHolder_.doneWaiting(std::exception_ptr{}); }

bool setStreamStartedLumi(unsigned int iStream);

bool streamFinishedLumi() { return 0 == (--nStreamsStillProcessingLumi_); }

//These should only be called while in the InputSource's task queue
Expand Down Expand Up @@ -103,6 +105,7 @@ namespace edm {
std::vector<std::shared_ptr<const EventSetupImpl>> eventSetupImpls_;
WaitingTaskList endIOVWaitingTasks_;
edm::WaitingTaskHolder globalEndRunHolder_;
std::vector<std::atomic<bool>> streamStartedLumi_;
std::atomic<unsigned int> nStreamsStillProcessingLumi_{0}; //read/write as streams finish lumi so must be atomic
edm::Timestamp endTime_{};
std::atomic<char> endTimeSetStatus_{0};
Expand Down

0 comments on commit 6c17d4e

Please sign in to comment.