diff --git a/FWCore/Framework/interface/EventProcessor.h b/FWCore/Framework/interface/EventProcessor.h index 7740f0afbd2b2..c7cf9b2ed83f7 100644 --- a/FWCore/Framework/interface/EventProcessor.h +++ b/FWCore/Framework/interface/EventProcessor.h @@ -241,6 +241,8 @@ namespace edm { bool shouldWeStop() const; + void tryToMakeOtherStreamsSkipLumi(unsigned int iStreamIndex, LuminosityBlockProcessingStatus& iStatus); + void setExceptionMessageFiles(std::string& message); void setExceptionMessageRuns(); void setExceptionMessageLumis(); diff --git a/FWCore/Framework/src/EventProcessor.cc b/FWCore/Framework/src/EventProcessor.cc index 76de43cb7a500..ccaccb12efd87 100644 --- a/FWCore/Framework/src/EventProcessor.cc +++ b/FWCore/Framework/src/EventProcessor.cc @@ -1732,6 +1732,9 @@ namespace edm { streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable { for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) { streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable { + if (!status->setStreamStartedLumi(i)) { + return; + } streamQueues_[i].pause(); auto& event = principalCache_.eventPrincipal(i); @@ -2256,6 +2259,7 @@ namespace edm { // If appropriate, this will also start the next run. endRunAsync(streamRunStatus_[iStreamIndex], iTask); } + tryToMakeOtherStreamsSkipLumi(iStreamIndex, *status); } streamEndLumiAsync(iTask, iStreamIndex); } else { @@ -2386,6 +2390,19 @@ namespace edm { return schedule_->terminate(); } + void EventProcessor::tryToMakeOtherStreamsSkipLumi(unsigned int iStreamIndex, + LuminosityBlockProcessingStatus& iStatus) { + for (unsigned int stream = 0; stream < preallocations_.numberOfStreams(); ++stream) { + if (stream == iStreamIndex) { + continue; + } + if (iStatus.setStreamStartedLumi(stream)) { + // This cannot be the last stream to end the lumi, but we must decrement the counter + iStatus.streamFinishedLumi(); + } + } + } + void EventProcessor::setExceptionMessageFiles(std::string& message) { exceptionMessageFiles_ = message; } void EventProcessor::setExceptionMessageRuns() { exceptionMessageRuns_ = true; } diff --git a/FWCore/Framework/src/LuminosityBlockProcessingStatus.cc b/FWCore/Framework/src/LuminosityBlockProcessingStatus.cc index 1b77c83f35e88..a808bfa7757b9 100644 --- a/FWCore/Framework/src/LuminosityBlockProcessingStatus.cc +++ b/FWCore/Framework/src/LuminosityBlockProcessingStatus.cc @@ -14,6 +14,14 @@ #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h" namespace edm { + + LuminosityBlockProcessingStatus::LuminosityBlockProcessingStatus(unsigned int iNStreams) + : streamStartedLumi_(iNStreams), nStreamsStillProcessingLumi_(iNStreams) { + for (auto& item : streamStartedLumi_) { + item.store(false); + } + } + void LuminosityBlockProcessingStatus::resetResources() { endIOVWaitingTasks_.doneWaiting(std::exception_ptr{}); for (auto& iter : eventSetupImpls_) { @@ -26,6 +34,11 @@ namespace edm { globalEndRunHolder_ = std::move(holder); } + bool LuminosityBlockProcessingStatus::setStreamStartedLumi(unsigned int iStream) { + bool expected = false; + return streamStartedLumi_.at(iStream).compare_exchange_strong(expected, true); + } + void LuminosityBlockProcessingStatus::setEndTime() { constexpr char kUnset = 0; constexpr char kSetting = 1; diff --git a/FWCore/Framework/src/LuminosityBlockProcessingStatus.h b/FWCore/Framework/src/LuminosityBlockProcessingStatus.h index b35b4c5258049..ad0f030bdd1bc 100644 --- a/FWCore/Framework/src/LuminosityBlockProcessingStatus.h +++ b/FWCore/Framework/src/LuminosityBlockProcessingStatus.h @@ -39,7 +39,7 @@ namespace edm { class LuminosityBlockProcessingStatus { public: - LuminosityBlockProcessingStatus(unsigned int iNStreams) : nStreamsStillProcessingLumi_(iNStreams) {} + LuminosityBlockProcessingStatus(unsigned int iNStreams); LuminosityBlockProcessingStatus(LuminosityBlockProcessingStatus const&) = delete; LuminosityBlockProcessingStatus const& operator=(LuminosityBlockProcessingStatus const&) = delete; @@ -70,6 +70,8 @@ namespace edm { void setGlobalEndRunHolder(WaitingTaskHolder); void globalEndRunHolderDoneWaiting() { globalEndRunHolder_.doneWaiting(std::exception_ptr{}); } + bool setStreamStartedLumi(unsigned int iStream); + bool streamFinishedLumi() { return 0 == (--nStreamsStillProcessingLumi_); } //These should only be called while in the InputSource's task queue @@ -103,6 +105,7 @@ namespace edm { std::vector> eventSetupImpls_; WaitingTaskList endIOVWaitingTasks_; edm::WaitingTaskHolder globalEndRunHolder_; + std::vector> streamStartedLumi_; std::atomic nStreamsStillProcessingLumi_{0}; //read/write as streams finish lumi so must be atomic edm::Timestamp endTime_{}; std::atomic endTimeSetStatus_{0};