Skip to content

Commit

Permalink
- check throttling also in the output module on each event.
Browse files Browse the repository at this point in the history
This reduces the chance to overshoot and fill the disk even if check is
done periodically in the input source.
- updated unit tests to use new global evf output module
  • Loading branch information
smorovic committed Jan 31, 2022
1 parent a0bdd3d commit 8694d32
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 2 deletions.
14 changes: 14 additions & 0 deletions EventFilter/Utilities/plugins/GlobalEvFOutputModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "FWCore/Framework/interface/RunForOutput.h"
#include "FWCore/Framework/interface/LuminosityBlockForOutput.h"
#include "FWCore/Framework/interface/LuminosityBlock.h"
#include "FWCore/Utilities/interface/UnixSignalHandlers.h"

#include "FWCore/Concurrency/interface/SerialTaskQueue.h"

Expand Down Expand Up @@ -56,6 +57,7 @@ namespace evf {
}

void doOutputEventAsync(std::unique_ptr<EventMsgBuilder> msg, edm::WaitingTaskHolder iHolder) {
throttledCheck();
auto group = iHolder.group();
writeQueue_.push(*group, [holder = std::move(iHolder), msg = msg.release(), this]() {
try {
Expand All @@ -68,6 +70,18 @@ namespace evf {
});
}

inline void throttledCheck() {
unsigned int counter = 0;
while (edm::Service<evf::EvFDaqDirector>()->inputThrottled()) {
if (edm::shutdown_flag.load(std::memory_order_relaxed))
break;
if (!(counter % 100))
edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused...";
usleep(100000);
counter++;
}
}

uint32 get_adler32() const { return stream_writer_events_->adler32(); }

std::string const& getFilePath() const { return filePath_; }
Expand Down
11 changes: 11 additions & 0 deletions EventFilter/Utilities/src/EvFOutputModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "IOPool/Streamer/interface/InitMsgBuilder.h"
#include "IOPool/Streamer/interface/EventMsgBuilder.h"
#include "FWCore/Utilities/interface/UnixSignalHandlers.h"

#include <sys/stat.h>
#include <filesystem>
Expand Down Expand Up @@ -225,6 +226,16 @@ namespace evf {
}

void EvFOutputModule::write(edm::EventForOutput const& e) {
unsigned int counter = 0;
while (edm::Service<evf::EvFDaqDirector>()->inputThrottled()) {
if (edm::shutdown_flag.load(std::memory_order_relaxed))
break;
if (!(counter % 100))
edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused...";
usleep(100000);
counter++;
}

edm::Handle<edm::TriggerResults> const& triggerResults = getTriggerResults(trToken_, e);

//auto lumiWriter = const_cast<EvFOutputEventWriter*>(luminosityBlockCache(e.getLuminosityBlock().index() ));
Expand Down
2 changes: 1 addition & 1 deletion EventFilter/Utilities/test/startFU.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
SelectEvents = cms.untracked.PSet(SelectEvents = cms.vstring( 'p2' ))
)

process.streamC = cms.OutputModule("ShmStreamConsumer",
process.streamC = cms.OutputModule("GlobalEvFOutputModule",
SelectEvents = cms.untracked.PSet(SelectEvents = cms.vstring( 'p2' ))
)

Expand Down
2 changes: 1 addition & 1 deletion EventFilter/Utilities/test/unittest_FU.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@
SelectEvents = cms.untracked.PSet(SelectEvents = cms.vstring( 'p2' ))
)

process.streamC = cms.OutputModule("ShmStreamConsumer",
process.streamC = cms.OutputModule("GlobalEvFOutputModule",
SelectEvents = cms.untracked.PSet(SelectEvents = cms.vstring( 'p2' ))
)

Expand Down

0 comments on commit 8694d32

Please sign in to comment.