Skip to content

Commit

Permalink
Allow stream to skip lumi, ConcurrentGeneratorFilter and ConcurrentHa…
Browse files Browse the repository at this point in the history
…dronizerFilter
  • Loading branch information
wddgit committed Jan 29, 2024
1 parent 203834e commit e1ace0a
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 18 deletions.
19 changes: 10 additions & 9 deletions GeneratorInterface/Core/interface/ConcurrentGeneratorFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ namespace edm {
HAD hadronizer_;
std::unique_ptr<DEC> decayer_;
unsigned int nEventsInLumiBlock_;
unsigned long long nStreamEndLumis_{0};
bool initialized_ = false;
};
template <typename HAD, typename DEC>
struct GenLumiCache {
gen::GenStreamCache<HAD, DEC>* useInLumi_{nullptr};
unsigned long long nGlobalBeginLumis_{0};
};
} // namespace gen

Expand Down Expand Up @@ -158,12 +158,13 @@ namespace edm {
void initLumi(gen::GenStreamCache<HAD, DEC>* cache, LuminosityBlock const& index, EventSetup const& es) const;
ParameterSet config_;
mutable std::atomic<gen::GenStreamCache<HAD, DEC>*> useInLumi_{nullptr};
mutable std::atomic<unsigned long long> greatestNStreamEndLumis_{0};
mutable std::atomic<unsigned long long> nextNGlobalBeginLumis_{1};
mutable std::atomic<bool> streamEndRunComplete_{true};
// The next two data members are thread safe and can be safely mutable because
// they are only modified/read in globalBeginRun and globalBeginLuminosityBlock.
mutable unsigned long long nGlobalBeginRuns_{0};
mutable unsigned long long nInitializedInGlobalLumiAfterNewRun_{0};
mutable unsigned long long nGlobalBeginLumis_{0};
};

//------------------------------------------------------------------------
Expand Down Expand Up @@ -358,6 +359,8 @@ namespace edm {
while (useInLumi_.load() == nullptr) {
}

++nGlobalBeginLumis_;

// streamEndRun also uses the hadronizer in the stream cache
// so we also need to wait for it to finish if there is a new run
if (nInitializedInGlobalLumiAfterNewRun_ < nGlobalBeginRuns_) {
Expand All @@ -368,6 +371,7 @@ namespace edm {

auto lumiCache = std::make_shared<gen::GenLumiCache<HAD, DEC>>();
lumiCache->useInLumi_ = useInLumi_.load();
lumiCache->nGlobalBeginLumis_ = nGlobalBeginLumis_;
return lumiCache;
}

Expand Down Expand Up @@ -399,7 +403,7 @@ namespace edm {

template <class HAD, class DEC>
void ConcurrentGeneratorFilter<HAD, DEC>::streamEndLuminosityBlockSummary(StreamID id,
LuminosityBlock const&,
LuminosityBlock const& lumi,
EventSetup const&,
gen::GenLumiSummary* iSummary) const {
auto cache = this->streamCache(id);
Expand Down Expand Up @@ -437,13 +441,10 @@ namespace edm {

cache->nEventsInLumiBlock_ = 0;

// The next section of code depends on the Framework behavior that the stream
// lumi transitions are executed for all streams for every lumi even when
// there are no events for a stream to process.
gen::GenStreamCache<HAD, DEC>* streamCachePtr = this->streamCache(id);
unsigned long long expected = streamCachePtr->nStreamEndLumis_;
++streamCachePtr->nStreamEndLumis_;
if (greatestNStreamEndLumis_.compare_exchange_strong(expected, streamCachePtr->nStreamEndLumis_)) {
unsigned long long expected = this->luminosityBlockCache(lumi.index())->nGlobalBeginLumis_;
unsigned long long nextValue = expected + 1;
if (nextNGlobalBeginLumis_.compare_exchange_strong(expected, nextValue)) {
streamEndRunComplete_ = false;
useInLumi_ = streamCachePtr;
}
Expand Down
19 changes: 10 additions & 9 deletions GeneratorInterface/Core/interface/ConcurrentHadronizerFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,13 @@ namespace edm {
std::unique_ptr<DEC> decayer_;
std::unique_ptr<HepMCFilterDriver> filter_;
unsigned long long nInitializedWithLHERunInfo_{0};
unsigned long long nStreamEndLumis_{0};
bool initialized_ = false;
};
template <typename HAD, typename DEC>
struct LumiCache {
gen::StreamCache<HAD, DEC>* useInLumi_{nullptr};
unsigned long long nGlobalBeginRuns_{0};
unsigned long long nGlobalBeginLumis_{0};
};
} // namespace gen

Expand Down Expand Up @@ -167,12 +167,13 @@ namespace edm {
unsigned int counterRunInfoProducts_;
unsigned int nAttempts_;
mutable std::atomic<gen::StreamCache<HAD, DEC>*> useInLumi_{nullptr};
mutable std::atomic<unsigned long long> greatestNStreamEndLumis_{0};
mutable std::atomic<unsigned long long> nextNGlobalBeginLumis_{1};
mutable std::atomic<bool> streamEndRunComplete_{true};
// The next two data members are thread safe and can be safely mutable because
// they are only modified/read in globalBeginRun and globalBeginLuminosityBlock.
mutable unsigned long long nGlobalBeginRuns_{0};
mutable unsigned long long nInitializedWithLHERunInfo_{0};
mutable unsigned long long nGlobalBeginLumis_{0};
bool const hasFilter_;
};

Expand Down Expand Up @@ -515,6 +516,8 @@ namespace edm {
while (useInLumi_.load() == nullptr) {
}

++nGlobalBeginLumis_;

// streamEndRun also uses the hadronizer in the stream cache
// so we also need to wait for it to finish if there is a new run
if (nInitializedWithLHERunInfo_ < nGlobalBeginRuns_) {
Expand All @@ -527,6 +530,7 @@ namespace edm {
auto lumiCache = std::make_shared<gen::LumiCache<HAD, DEC>>();
lumiCache->useInLumi_ = useInLumi_.load();
lumiCache->nGlobalBeginRuns_ = nGlobalBeginRuns_;
lumiCache->nGlobalBeginLumis_ = nGlobalBeginLumis_;
return lumiCache;
}

Expand All @@ -541,7 +545,7 @@ namespace edm {

template <class HAD, class DEC>
void ConcurrentHadronizerFilter<HAD, DEC>::streamEndLuminosityBlockSummary(StreamID id,
LuminosityBlock const&,
LuminosityBlock const& lumi,
EventSetup const&,
gen::LumiSummary* iSummary) const {
const lhef::LHERunInfo* lheRunInfo = this->streamCache(id)->hadronizer_.getLHERunInfo().get();
Expand Down Expand Up @@ -594,13 +598,10 @@ namespace edm {
}
}

// The next section of code depends on the Framework behavior that the stream
// lumi transitions are executed for all streams for every lumi even when
// there are no events for a stream to process.
gen::StreamCache<HAD, DEC>* streamCachePtr = this->streamCache(id);
unsigned long long expected = streamCachePtr->nStreamEndLumis_;
++streamCachePtr->nStreamEndLumis_;
if (greatestNStreamEndLumis_.compare_exchange_strong(expected, streamCachePtr->nStreamEndLumis_)) {
unsigned long long expected = this->luminosityBlockCache(lumi.index())->nGlobalBeginLumis_;
unsigned long long nextValue = expected + 1;
if (nextNGlobalBeginLumis_.compare_exchange_strong(expected, nextValue)) {
streamEndRunComplete_ = false;
useInLumi_ = streamCachePtr;
}
Expand Down

0 comments on commit e1ace0a

Please sign in to comment.