Skip to content

Commit

Permalink
Merge pull request #43522 from wddgit/allowStreamsToSkipLumis
Browse files Browse the repository at this point in the history
Allow a stream to skip a lumi if all events processed
  • Loading branch information
cmsbuild authored Mar 14, 2024
2 parents aff4c51 + f89e621 commit 8404f1a
Show file tree
Hide file tree
Showing 17 changed files with 399 additions and 71 deletions.
7 changes: 6 additions & 1 deletion FWCore/Framework/src/EventProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1668,7 +1668,7 @@ namespace edm {
edm::WaitingTaskHolder iHolder) {
actReg_->esSyncIOVQueuingSignal_.emit(iSync);

auto status = std::make_shared<LuminosityBlockProcessingStatus>(preallocations_.numberOfStreams());
auto status = std::make_shared<LuminosityBlockProcessingStatus>();
chain::first([this, &iSync, &status](auto nextTask) {
espController_->runOrQueueEventSetupForInstanceAsync(iSync,
nextTask,
Expand Down Expand Up @@ -1767,6 +1767,9 @@ namespace edm {
streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
if (!status->shouldStreamStartLumi()) {
return;
}
streamQueues_[i].pause();

auto& event = principalCache_.eventPrincipal(i);
Expand Down Expand Up @@ -1979,6 +1982,7 @@ namespace edm {
if (streamLumiActive_ > 0) {
FinalWaitingTask globalWaitTask{taskGroup_};
assert(streamLumiActive_ == preallocations_.numberOfStreams());
streamLumiStatus_[0]->noMoreEventsInLumi();
streamLumiStatus_[0]->setCleaningUpAfterException(cleaningUpAfterException);
for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) {
streamEndLumiAsync(WaitingTaskHolder{taskGroup_, &globalWaitTask}, i);
Expand Down Expand Up @@ -2305,6 +2309,7 @@ namespace edm {
// the stream will stop processing this lumi now
if (status->eventProcessingState() == LuminosityBlockProcessingStatus::EventProcessingState::kStopLumi) {
if (not status->haveStartedNextLumiOrEndedRun()) {
status->noMoreEventsInLumi();
status->startNextLumiOrEndRun();
if (lastTransitionType() == InputSource::ItemType::IsLumi && !iTask.taskHasFailed()) {
CMS_SA_ALLOW try {
Expand Down
28 changes: 28 additions & 0 deletions FWCore/Framework/src/LuminosityBlockProcessingStatus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,34 @@ namespace edm {
globalEndRunHolder_ = std::move(holder);
}

// Decides whether the calling stream should begin processing this luminosity block.
//
// Returns false once state_ is kNoMoreEvents, which lets the caller skip the
// lumi for that stream. Otherwise the call is counted in
// nStreamsProcessingLumi_ and true is returned.
//
// state_ acts as a small spin guard around the non-atomic counter: the caller
// swaps kRunning -> kUpdating, modifies the counter, then restores kRunning.
bool LuminosityBlockProcessingStatus::shouldStreamStartLumi() {
// Cheap early exit that avoids the compare-exchange loop when the lumi is already closed to new streams.
if (state_ == State::kNoMoreEvents)
return false;

bool changed = false;
do {
// compare_exchange_strong overwrites 'expected' with the observed value on failure, so it is re-initialised on every pass.
auto expected = State::kRunning;
changed = state_.compare_exchange_strong(expected, State::kUpdating);
// The state moved to kNoMoreEvents after the initial check above.
if (expected == State::kNoMoreEvents)
return false;
// Any other failure means kUpdating was observed (another caller holds the guard), so try again.
} while (changed == false);

// state_ is kUpdating here (set by this call), which guards the counter update.
++nStreamsProcessingLumi_;
state_ = State::kRunning;
return true;
}

// Closes this luminosity block to streams that have not started it yet.
//
// Takes the kRunning -> kUpdating guard on state_, copies the number of streams
// that started the lumi (nStreamsProcessingLumi_) into the atomic
// nStreamsStillProcessingLumi_, and finally sets state_ to kNoMoreEvents so
// that shouldStreamStartLumi() returns false from then on.
//
// Must not be called when state_ is already kNoMoreEvents: the assert fires.
// NOTE(review): with asserts compiled out, nothing in the visible code moves
// state_ away from kNoMoreEvents, so a second call would apparently spin
// forever -- confirm that callers guarantee a single call per status object.
void LuminosityBlockProcessingStatus::noMoreEventsInLumi() {
bool changed = false;
do {
// 'expected' is overwritten with the observed state when the exchange fails, so reset it each pass.
auto expected = State::kRunning;
changed = state_.compare_exchange_strong(expected, State::kUpdating);
assert(expected != State::kNoMoreEvents);
// A failed exchange that passes the assert observed kUpdating; retry until the guard is free.
} while (changed == false);
// state_ is kUpdating here, so the non-atomic counter can be read safely.
nStreamsStillProcessingLumi_.store(nStreamsProcessingLumi_);
state_ = State::kNoMoreEvents;
}

void LuminosityBlockProcessingStatus::setEndTime() {
constexpr char kUnset = 0;
constexpr char kSetting = 1;
Expand Down
11 changes: 8 additions & 3 deletions FWCore/Framework/src/LuminosityBlockProcessingStatus.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace edm {

class LuminosityBlockProcessingStatus {
public:
LuminosityBlockProcessingStatus(unsigned int iNStreams) : nStreamsStillProcessingLumi_(iNStreams) {}
LuminosityBlockProcessingStatus() = default;

LuminosityBlockProcessingStatus(LuminosityBlockProcessingStatus const&) = delete;
LuminosityBlockProcessingStatus const& operator=(LuminosityBlockProcessingStatus const&) = delete;
Expand Down Expand Up @@ -70,6 +70,8 @@ namespace edm {
void setGlobalEndRunHolder(WaitingTaskHolder);
// Tells the stored global end-run holder that waiting is done, passing an
// empty std::exception_ptr (i.e. no exception to report).
void globalEndRunHolderDoneWaiting() {
  globalEndRunHolder_.doneWaiting(std::exception_ptr{});
}

bool shouldStreamStartLumi();
void noMoreEventsInLumi();
// Decrements the atomic count of streams still processing this lumi and
// reports whether this call brought the count to zero.
bool streamFinishedLumi() {
  // fetch_sub returns the value held before the decrement, so seeing 1 means
  // the counter is now 0.
  const unsigned int remainingBefore = nStreamsStillProcessingLumi_.fetch_sub(1);
  return remainingBefore == 1;
}

//These should only be called while in the InputSource's task queue
Expand Down Expand Up @@ -103,10 +105,13 @@ namespace edm {
std::vector<std::shared_ptr<const EventSetupImpl>> eventSetupImpls_;
WaitingTaskList endIOVWaitingTasks_;
edm::WaitingTaskHolder globalEndRunHolder_;
std::atomic<unsigned int> nStreamsStillProcessingLumi_{0}; //read/write as streams finish lumi so must be atomic
edm::Timestamp endTime_{};
std::atomic<char> endTimeSetStatus_{0};
CMS_THREAD_GUARD(state_) unsigned int nStreamsProcessingLumi_{0};
std::atomic<unsigned int> nStreamsStillProcessingLumi_{0};
enum class State { kRunning, kUpdating, kNoMoreEvents };
std::atomic<State> state_{State::kRunning};
EventProcessingState eventProcessingState_{EventProcessingState::kProcessing};
std::atomic<char> endTimeSetStatus_{0};
std::atomic<bool> startedNextLumiOrEndedRun_{false};
bool globalBeginSucceeded_{false};
bool cleaningUpAfterException_{false};
Expand Down
16 changes: 8 additions & 8 deletions FWCore/Framework/test/modules_2_concurrent_lumis_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,45 +15,45 @@
iterations = cms.uint32(50*1000*20) )

process.LumiSumIntProd = cms.EDProducer("edmtest::global::LumiSummaryIntProducer",
transitions = cms.int32(60)
transitions = cms.int32(44)
,cachevalue = cms.int32(2)
)

process.LumiSumLumiProd = cms.EDProducer("edmtest::global::LumiSummaryLumiProducer",
transitions = cms.int32(70)
transitions = cms.int32(34)
,cachevalue = cms.int32(2)
)

process.LumiSumIntFilter = cms.EDFilter("edmtest::global::LumiSummaryIntFilter",
transitions = cms.int32(84)
transitions = cms.int32(44)
,cachevalue = cms.int32(2)
)

process.LumiSumIntAnalyzer = cms.EDAnalyzer("edmtest::global::LumiSummaryIntAnalyzer",
transitions = cms.int32(84)
transitions = cms.int32(44)
,cachevalue = cms.int32(2)
)

process.LimitedLumiSumIntProd = cms.EDProducer("edmtest::limited::LumiSummaryIntProducer",
transitions = cms.int32(60)
transitions = cms.int32(44)
,cachevalue = cms.int32(2)
,concurrencyLimit = cms.untracked.uint32(1)
)

process.LimitedLumiSumLumiProd = cms.EDProducer("edmtest::limited::LumiSummaryLumiProducer",
transitions = cms.int32(70)
transitions = cms.int32(34)
,cachevalue = cms.int32(2)
,concurrencyLimit = cms.untracked.uint32(1)
)

process.LimitedLumiSumIntFilter = cms.EDFilter("edmtest::limited::LumiSummaryIntFilter",
transitions = cms.int32(84)
transitions = cms.int32(44)
,cachevalue = cms.int32(2)
,concurrencyLimit = cms.untracked.uint32(1)
)

process.LimitedLumiSumIntAnalyzer = cms.EDAnalyzer("edmtest::limited::LumiSummaryIntAnalyzer",
transitions = cms.int32(84)
transitions = cms.int32(44)
,cachevalue = cms.int32(2)
,concurrencyLimit = cms.untracked.uint32(1)
)
Expand Down
40 changes: 36 additions & 4 deletions FWCore/Framework/test/stubs/TestGlobalAnalyzers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ namespace edmtest {

class StreamIntAnalyzer : public edm::global::EDAnalyzer<edm::StreamCache<UnsafeCache>> {
public:
explicit StreamIntAnalyzer(edm::ParameterSet const& p) : trans_(p.getParameter<int>("transitions")) {
explicit StreamIntAnalyzer(edm::ParameterSet const& p)
: trans_(p.getParameter<int>("transitions")), nLumis_(p.getUntrackedParameter<unsigned int>("nLumis", 1)) {
bool verbose = p.getUntrackedParameter<bool>("verbose", true);
callWhenNewProductsRegistered([verbose](edm::BranchDescription const& desc) {
if (verbose) {
Expand All @@ -58,10 +59,15 @@ namespace edmtest {
});
}
const unsigned int trans_;
const unsigned int nLumis_;
mutable std::atomic<unsigned int> m_count{0};
mutable std::atomic<unsigned int> m_countStreams{0};
mutable std::atomic<unsigned int> m_countStreamBeginLumiTransitions{0};
mutable std::atomic<unsigned int> m_countStreamEndLumiTransitions{0};

std::unique_ptr<UnsafeCache> beginStream(edm::StreamID iID) const override {
++m_count;
++m_countStreams;
auto pCache = std::make_unique<UnsafeCache>();
pCache->value = iID.value();
return pCache;
Expand All @@ -78,7 +84,7 @@ namespace edmtest {
void streamBeginLuminosityBlock(edm::StreamID iID,
edm::LuminosityBlock const&,
edm::EventSetup const&) const override {
++m_count;
++m_countStreamBeginLumiTransitions;
if ((streamCache(iID))->value != iID.value()) {
throw cms::Exception("cache value")
<< "StreamIntAnalyzer cache value " << (streamCache(iID))->value << " but it was supposed to be " << iID;
Expand All @@ -96,7 +102,7 @@ namespace edmtest {
void streamEndLuminosityBlock(edm::StreamID iID,
edm::LuminosityBlock const&,
edm::EventSetup const&) const override {
++m_count;
++m_countStreamEndLumiTransitions;
if ((streamCache(iID))->value != iID.value()) {
throw cms::Exception("cache value")
<< "StreamIntAnalyzer cache value " << (streamCache(iID))->value << " but it was supposed to be " << iID;
Expand Down Expand Up @@ -124,6 +130,19 @@ namespace edmtest {
throw cms::Exception("transitions")
<< "StreamIntAnalyzer transitions " << m_count << " but it was supposed to be " << trans_;
}
unsigned int nStreamBeginLumiTransitions = m_countStreamBeginLumiTransitions.load();
unsigned int nStreamEndLumiTransitions = m_countStreamEndLumiTransitions.load();
unsigned int nStreams = m_countStreams.load();
if (nStreamBeginLumiTransitions < nLumis_ || nStreamBeginLumiTransitions > (nLumis_ * nStreams)) {
throw cms::Exception("transitions")
<< "StreamIntAnalyzer stream begin lumi transitions " << nStreamBeginLumiTransitions
<< " but it was supposed to be between " << nLumis_ << " and " << nLumis_ * nStreams;
}
if (nStreamEndLumiTransitions != nStreamBeginLumiTransitions) {
throw cms::Exception("transitions")
<< "StreamIntAnalyzer stream end lumi transitions " << nStreamEndLumiTransitions
<< " does not equal stream begin lumi transitions " << nStreamBeginLumiTransitions;
}
}
};

Expand Down Expand Up @@ -260,15 +279,20 @@ namespace edmtest {
const unsigned int trans_;
const unsigned int cvalue_;
mutable std::atomic<unsigned int> m_count{0};
mutable std::atomic<unsigned int> m_countLumis{0};
mutable std::atomic<unsigned int> m_countStreams{0};
mutable std::atomic<unsigned int> m_countStreamLumiTransitions{0};

// Stream-begin hook: bumps the transition tally (m_count) and the stream tally
// (m_countStreams, later used as the stream count when bounding the number of
// stream lumi transitions), then returns a fresh default-constructed cache.
std::unique_ptr<UnsafeCache> beginStream(edm::StreamID) const override {
  // fetch_add(1) is the same atomic increment the pre-increment operator performs.
  m_count.fetch_add(1);
  m_countStreams.fetch_add(1);
  auto cache = std::make_unique<UnsafeCache>();
  return cache;
}

std::shared_ptr<UnsafeCache> globalBeginLuminosityBlockSummary(edm::LuminosityBlock const& iLB,
edm::EventSetup const&) const override {
++m_count;
++m_countLumis;
auto gCache = std::make_shared<UnsafeCache>();
gCache->lumi = iLB.luminosityBlockAuxiliary().luminosityBlock();
return gCache;
Expand All @@ -283,7 +307,7 @@ namespace edmtest {
edm::LuminosityBlock const& iLB,
edm::EventSetup const&,
UnsafeCache* gCache) const override {
++m_count;
++m_countStreamLumiTransitions;
if (gCache->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
throw cms::Exception("UnexpectedValue")
<< "streamEndLuminosityBlockSummary unexpected lumi number in Stream " << iID.value();
Expand All @@ -310,6 +334,14 @@ namespace edmtest {
throw cms::Exception("transitions")
<< "LumiSummaryIntAnalyzer transitions " << m_count << " but it was supposed to be " << trans_;
}
unsigned int nStreamLumiTransitions = m_countStreamLumiTransitions.load();
unsigned int nLumis = m_countLumis.load();
unsigned int nStreams = m_countStreams.load();
if (nStreamLumiTransitions < nLumis || nStreamLumiTransitions > (nLumis * nStreams)) {
throw cms::Exception("transitions")
<< "LumiSummaryIntAnalyzer stream lumi transitions " << nStreamLumiTransitions
<< " but it was supposed to be between " << nLumis << " and " << nLumis * nStreams;
}
}
};

Expand Down
40 changes: 36 additions & 4 deletions FWCore/Framework/test/stubs/TestGlobalFilters.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,21 @@ namespace edmtest {

class StreamIntFilter : public edm::global::EDFilter<edm::StreamCache<UnsafeCache>> {
public:
explicit StreamIntFilter(edm::ParameterSet const& p) : trans_(p.getParameter<int>("transitions")) {
explicit StreamIntFilter(edm::ParameterSet const& p)
: trans_(p.getParameter<int>("transitions")), nLumis_(p.getUntrackedParameter<unsigned int>("nLumis", 1)) {
produces<unsigned int>();
}

const unsigned int trans_;
const unsigned int nLumis_;
mutable std::atomic<unsigned int> m_count{0};
mutable std::atomic<unsigned int> m_countStreams{0};
mutable std::atomic<unsigned int> m_countStreamBeginLumiTransitions{0};
mutable std::atomic<unsigned int> m_countStreamEndLumiTransitions{0};

std::unique_ptr<UnsafeCache> beginStream(edm::StreamID iID) const override {
++m_count;
++m_countStreams;
auto sCache = std::make_unique<UnsafeCache>();
++(sCache->strm);
sCache->value = iID.value();
Expand All @@ -85,7 +91,7 @@ namespace edmtest {
void streamBeginLuminosityBlock(edm::StreamID iID,
edm::LuminosityBlock const&,
edm::EventSetup const&) const override {
++m_count;
++m_countStreamBeginLumiTransitions;
auto sCache = streamCache(iID);
if (sCache->value != iID.value()) {
throw cms::Exception("cache value") << (streamCache(iID))->value << " but it was supposed to be " << iID;
Expand Down Expand Up @@ -114,7 +120,7 @@ namespace edmtest {
void streamEndLuminosityBlock(edm::StreamID iID,
edm::LuminosityBlock const&,
edm::EventSetup const&) const override {
++m_count;
++m_countStreamEndLumiTransitions;
auto sCache = streamCache(iID);
if (sCache->value != iID.value()) {
throw cms::Exception("cache value") << (streamCache(iID))->value << " but it was supposed to be " << iID;
Expand Down Expand Up @@ -157,6 +163,19 @@ namespace edmtest {
throw cms::Exception("transitions")
<< "StreamIntFilter transitions " << m_count << " but it was supposed to be " << trans_;
}
unsigned int nStreamBeginLumiTransitions = m_countStreamBeginLumiTransitions.load();
unsigned int nStreamEndLumiTransitions = m_countStreamEndLumiTransitions.load();
unsigned int nStreams = m_countStreams.load();
if (nStreamBeginLumiTransitions < nLumis_ || nStreamBeginLumiTransitions > (nLumis_ * nStreams)) {
throw cms::Exception("transitions")
<< "StreamIntFilter stream begin lumi transitions " << nStreamBeginLumiTransitions
<< " but it was supposed to be between " << nLumis_ << " and " << nLumis_ * nStreams;
}
if (nStreamEndLumiTransitions != nStreamBeginLumiTransitions) {
throw cms::Exception("transitions")
<< "StreamIntFilter stream end lumi transitions " << nStreamEndLumiTransitions
<< " does not equal stream begin lumi transitions " << nStreamBeginLumiTransitions;
}
}
};

Expand Down Expand Up @@ -364,15 +383,20 @@ namespace edmtest {
const unsigned int trans_;
const unsigned int cvalue_;
mutable std::atomic<unsigned int> m_count{0};
mutable std::atomic<unsigned int> m_countLumis{0};
mutable std::atomic<unsigned int> m_countStreams{0};
mutable std::atomic<unsigned int> m_countStreamLumiTransitions{0};

// Stream-begin hook: bumps the transition tally (m_count) and the stream tally
// (m_countStreams, later used as the stream count when bounding the number of
// stream lumi transitions), then returns a fresh default-constructed cache.
std::unique_ptr<Cache> beginStream(edm::StreamID) const override {
  // fetch_add(1) is the same atomic increment the pre-increment operator performs.
  m_count.fetch_add(1);
  m_countStreams.fetch_add(1);
  auto cache = std::make_unique<Cache>();
  return cache;
}

std::shared_ptr<UnsafeCache> globalBeginLuminosityBlockSummary(edm::LuminosityBlock const& iLB,
edm::EventSetup const&) const override {
++m_count;
++m_countLumis;
auto gCache = std::make_shared<UnsafeCache>();
gCache->lumi = iLB.luminosityBlockAuxiliary().luminosityBlock();
return gCache;
Expand All @@ -388,7 +412,7 @@ namespace edmtest {
edm::LuminosityBlock const& iLB,
edm::EventSetup const&,
UnsafeCache* gCache) const override {
++m_count;
++m_countStreamLumiTransitions;
if (gCache->lumi != iLB.luminosityBlockAuxiliary().luminosityBlock()) {
throw cms::Exception("out of sequence")
<< "streamEndLuminosityBlockSummary unexpected lumi number in Stream " << iID.value();
Expand Down Expand Up @@ -416,6 +440,14 @@ namespace edmtest {
throw cms::Exception("transitions")
<< "LumiSummaryIntFilter transitions " << m_count << " but it was supposed to be " << trans_;
}
unsigned int nStreamLumiTransitions = m_countStreamLumiTransitions.load();
unsigned int nLumis = m_countLumis.load();
unsigned int nStreams = m_countStreams.load();
if (nStreamLumiTransitions < nLumis || nStreamLumiTransitions > (nLumis * nStreams)) {
throw cms::Exception("transitions")
<< "LumiSummaryIntFilter stream lumi transitions " << nStreamLumiTransitions
<< " but it was supposed to be between " << nLumis << " and " << nLumis * nStreams;
}
}
};

Expand Down
Loading

0 comments on commit 8404f1a

Please sign in to comment.