diff --git a/EventFilter/Utilities/plugins/GlobalEvFOutputModule.cc b/EventFilter/Utilities/plugins/GlobalEvFOutputModule.cc index ab04ed08f2ed8..2cdef3f00b30b 100644 --- a/EventFilter/Utilities/plugins/GlobalEvFOutputModule.cc +++ b/EventFilter/Utilities/plugins/GlobalEvFOutputModule.cc @@ -12,6 +12,7 @@ #include "FWCore/Framework/interface/RunForOutput.h" #include "FWCore/Framework/interface/LuminosityBlockForOutput.h" #include "FWCore/Framework/interface/LuminosityBlock.h" +#include "FWCore/Utilities/interface/UnixSignalHandlers.h" #include "FWCore/Concurrency/interface/SerialTaskQueue.h" @@ -56,6 +57,7 @@ namespace evf { } void doOutputEventAsync(std::unique_ptr msg, edm::WaitingTaskHolder iHolder) { + throttledCheck(); auto group = iHolder.group(); writeQueue_.push(*group, [holder = std::move(iHolder), msg = msg.release(), this]() { try { @@ -68,6 +70,18 @@ namespace evf { }); } + inline void throttledCheck() { + unsigned int counter = 0; + while (edm::Service()->inputThrottled()) { + if (edm::shutdown_flag.load(std::memory_order_relaxed)) + break; + if (!(counter % 100)) + edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused..."; + usleep(100000); + counter++; + } + } + uint32 get_adler32() const { return stream_writer_events_->adler32(); } std::string const& getFilePath() const { return filePath_; } diff --git a/EventFilter/Utilities/src/EvFOutputModule.cc b/EventFilter/Utilities/src/EvFOutputModule.cc index 1d6d8a7311416..48a3fe9b61ec4 100644 --- a/EventFilter/Utilities/src/EvFOutputModule.cc +++ b/EventFilter/Utilities/src/EvFOutputModule.cc @@ -16,6 +16,7 @@ #include "IOPool/Streamer/interface/InitMsgBuilder.h" #include "IOPool/Streamer/interface/EventMsgBuilder.h" +#include "FWCore/Utilities/interface/UnixSignalHandlers.h" #include #include @@ -225,6 +226,16 @@ namespace evf { } void EvFOutputModule::write(edm::EventForOutput const& e) { + unsigned int counter = 0; + while (edm::Service()->inputThrottled()) { + if (edm::shutdown_flag.load(std::memory_order_relaxed)) + break; + if (!(counter % 100)) + edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused..."; + usleep(100000); + counter++; + } + edm::Handle const& triggerResults = getTriggerResults(trToken_, e); //auto lumiWriter = const_cast(luminosityBlockCache(e.getLuminosityBlock().index() )); diff --git a/EventFilter/Utilities/test/startFU.py b/EventFilter/Utilities/test/startFU.py index 86d09e09b615a..9585a4b7ee856 100644 --- a/EventFilter/Utilities/test/startFU.py +++ b/EventFilter/Utilities/test/startFU.py @@ -136,7 +136,7 @@ SelectEvents = cms.untracked.PSet(SelectEvents = cms.vstring( 'p2' )) ) -process.streamC = cms.OutputModule("ShmStreamConsumer", +process.streamC = cms.OutputModule("GlobalEvFOutputModule", SelectEvents = cms.untracked.PSet(SelectEvents = cms.vstring( 'p2' )) ) diff --git a/EventFilter/Utilities/test/unittest_FU.py b/EventFilter/Utilities/test/unittest_FU.py index f16b1f1f47167..6cd547f2b5759 100644 --- a/EventFilter/Utilities/test/unittest_FU.py +++ b/EventFilter/Utilities/test/unittest_FU.py @@ -145,7 +145,7 @@ SelectEvents = cms.untracked.PSet(SelectEvents = cms.vstring( 'p2' )) ) -process.streamC = cms.OutputModule("ShmStreamConsumer", +process.streamC = cms.OutputModule("GlobalEvFOutputModule", SelectEvents = cms.untracked.PSet(SelectEvents = cms.vstring( 'p2' )) )