diff --git a/Common/SimConfig/include/SimConfig/SimConfig.h b/Common/SimConfig/include/SimConfig/SimConfig.h index da2f978ddf319..d70fca2400399 100644 --- a/Common/SimConfig/include/SimConfig/SimConfig.h +++ b/Common/SimConfig/include/SimConfig/SimConfig.h @@ -83,6 +83,7 @@ struct SimConfigData { bool mNoGeant = false; // if Geant transport should be turned off (when one is only interested in the generated events) bool mIsUpgrade = false; // true if the simulation is for Run 5 std::string mFromCollisionContext = ""; // string denoting a collision context file; If given, this file will be used to determine number of events + // bool mForwardKine = false; // true if tracks and event headers are to be published on a FairMQ channel (for reading by other consumers) bool mWriteToDisc = true; // whether we write simulation products (kine, hits) to disc VertexMode mVertexMode = VertexMode::kDiamondParam; // by default we should use die InteractionDiamond parameter @@ -177,6 +178,10 @@ class SimConfig bool writeToDisc() const { return mConfigData.mWriteToDisc; } VertexMode getVertexMode() const { return mConfigData.mVertexMode; } + // returns the pair of collision context filename as well as event prefix encoded + // in the mFromCollisionContext string. Returns empty string if information is not available or set. + std::pair getCollContextFilenameAndEventPrefix() const; + private: SimConfigData mConfigData; //! diff --git a/Common/SimConfig/src/SimConfig.cxx b/Common/SimConfig/src/SimConfig.cxx index 9a10b26547ce6..be21c38c5efc8 100644 --- a/Common/SimConfig/src/SimConfig.cxx +++ b/Common/SimConfig/src/SimConfig.cxx @@ -76,7 +76,7 @@ void SimConfig::initOptions(boost::program_options::options_description& options "noGeant", bpo::bool_switch(), "prohibits any Geant transport/physics (by using tight cuts)")( "forwardKine", bpo::bool_switch(), "forward kinematics on a FairMQ channel")( "noDiscOutput", bpo::bool_switch(), "switch off writing sim results to disc (useful in combination with forwardKine)"); - options.add_options()("fromCollContext", bpo::value()->default_value(""), "Use a pregenerated collision context to infer number of events to simulate, how to embedd them, the vertex position etc. Takes precedence of other options such as \"--nEvents\"."); + options.add_options()("fromCollContext", bpo::value()->default_value(""), "Use a pregenerated collision context to infer number of events to simulate, how to embedd them, the vertex position etc. Takes precedence of other options such as \"--nEvents\". 
The format is COLLISIONCONTEXTFILE.root[:SIGNALNAME] where SIGNALNAME is the event part in the context which is relevant."); } void SimConfig::determineActiveModules(std::vector const& inputargs, std::vector const& skippedModules, std::vector& activeModules, bool isUpgrade) @@ -270,6 +270,21 @@ void SimConfig::determineReadoutDetectors(std::vector const& active } } +std::pair SimConfig::getCollContextFilenameAndEventPrefix() const +{ + // we decompose the argument to fetch + // (a) collision contextfilename + // (b) sim prefix to use from the context + auto pos = mConfigData.mFromCollisionContext.find(':'); + std::string collcontextfile{mConfigData.mFromCollisionContext}; + std::string simprefix{mConfigData.mOutputPrefix}; + if (pos != std::string::npos) { + collcontextfile = mConfigData.mFromCollisionContext.substr(0, pos); + simprefix = mConfigData.mFromCollisionContext.substr(pos + 1); + } + return std::make_pair(collcontextfile, simprefix); +} + bool SimConfig::resetFromParsedMap(boost::program_options::variables_map const& vm) { using o2::detectors::DetID; @@ -333,17 +348,8 @@ bool SimConfig::resetFromParsedMap(boost::program_options::variables_map const& mConfigData.mFilterNoHitEvents = true; } mConfigData.mFromCollisionContext = vm["fromCollContext"].as(); - // we decompose the argument to fetch - // (a) collision contextfilename - // (b) sim prefix to use from the context - auto pos = mConfigData.mFromCollisionContext.find(':'); - std::string collcontextfile{mConfigData.mFromCollisionContext}; - std::string simprefix{mConfigData.mOutputPrefix}; - if (pos != std::string::npos) { - collcontextfile = mConfigData.mFromCollisionContext.substr(0, pos); - simprefix = mConfigData.mFromCollisionContext.substr(pos + 1); - } - adjustFromCollContext(collcontextfile, simprefix); + auto collcontext_simprefix = getCollContextFilenameAndEventPrefix(); + adjustFromCollContext(collcontext_simprefix.first, collcontext_simprefix.second); // analyse vertex options if (!parseVertexModeString(vm["vertexMode"].as(), mConfigData.mVertexMode)) { diff --git a/DataFormats/simulation/src/DigitizationContext.cxx b/DataFormats/simulation/src/DigitizationContext.cxx index ba1fda53e179b..3fb6b757aeea3 100644 --- a/DataFormats/simulation/src/DigitizationContext.cxx +++ b/DataFormats/simulation/src/DigitizationContext.cxx @@ -578,5 +578,7 @@ DigitizationContext DigitizationContext::extractSingleTimeframe(int timeframeid, } catch (std::exception) { LOG(warn) << "No such timeframe id in collision context. 
Returing empty object"; } + // fix number of collisions + r.setNCollisions(r.mEventRecords.size()); return r; } diff --git a/Detectors/GlobalTrackingWorkflow/study/include/GlobalTrackingStudy/V0Ext.h b/Detectors/GlobalTrackingWorkflow/study/include/GlobalTrackingStudy/V0Ext.h index 99b35247081e6..79221b893882d 100644 --- a/Detectors/GlobalTrackingWorkflow/study/include/GlobalTrackingStudy/V0Ext.h +++ b/Detectors/GlobalTrackingWorkflow/study/include/GlobalTrackingStudy/V0Ext.h @@ -17,9 +17,7 @@ #include "ReconstructionDataFormats/V0.h" #include "SimulationDataFormat/MCCompLabel.h" -namespace o2 -{ -namespace dataformats +namespace o2::dataformats { struct ProngInfoExt { @@ -40,10 +38,10 @@ struct V0Ext { V0Index v0ID; std::array prInfo{}; const ProngInfoExt& getPrInfo(int i) const { return prInfo[i]; } - ClassDefNV(V0Ext, 1); + int mcPID = -1; + ClassDefNV(V0Ext, 2); }; -} // namespace dataformats -} // namespace o2 +} // namespace o2::dataformats #endif diff --git a/Detectors/GlobalTrackingWorkflow/study/src/SVStudy.cxx b/Detectors/GlobalTrackingWorkflow/study/src/SVStudy.cxx index 8ce1c1cec3e01..17b33c86e61ad 100644 --- a/Detectors/GlobalTrackingWorkflow/study/src/SVStudy.cxx +++ b/Detectors/GlobalTrackingWorkflow/study/src/SVStudy.cxx @@ -22,6 +22,7 @@ #include "DetectorsBase/GeometryManager.h" #include "SimulationDataFormat/MCEventLabel.h" #include "SimulationDataFormat/MCUtils.h" +#include "SimulationDataFormat/MCTrack.h" #include "CommonDataFormat/BunchFilling.h" #include "CommonUtils/NameConf.h" #include "DataFormatsFT0/RecPoints.h" @@ -86,7 +87,7 @@ class SVStudySpec : public Task float mBz = 0; GTrackID::mask_t mTracksSrc{}; o2::vertexing::DCAFitterN<2> mFitterV0; - o2::steer::MCKinematicsReader mcReader; // reader of MC information + std::unique_ptr mcReader; // reader of MC information }; void SVStudySpec::init(InitContext& ic) @@ -96,6 +97,9 @@ void SVStudySpec::init(InitContext& ic) mRefit = ic.options().get("refit"); mSelK0 = ic.options().get("sel-k0"); mMaxEta = ic.options().get("max-eta"); + if (mUseMC) { + mcReader = std::make_unique("collisioncontext.root"); + } } void SVStudySpec::run(ProcessingContext& pc) @@ -161,23 +165,24 @@ o2::dataformats::V0Ext SVStudySpec::processV0(int iv, o2::globaltracking::RecoCo v0ext.v0 = v0sel; } v0ext.v0ID = v0id; - o2::MCCompLabel lb; + o2::MCCompLabel lb[2]; + const o2::MCTrack* mcTrks[2]; for (int ip = 0; ip < 2; ip++) { auto& prInfo = v0ext.prInfo[ip]; auto gid = v0ext.v0ID.getProngID(ip); auto gidset = recoData.getSingleDetectorRefs(gid); - lb = recoData.getTrackMCLabel(gid); - if (lb.isValid()) { - prInfo.corrGlo = !lb.isFake(); + lb[ip] = recoData.getTrackMCLabel(gid); + if (lb[ip].isValid()) { + prInfo.corrGlo = !lb[ip].isFake(); } // get TPC tracks, if any if (gidset[GTrackID::TPC].isSourceSet()) { const auto& tpcTr = recoData.getTPCTrack(gidset[GTrackID::TPC]); prInfo.trackTPC = tpcTr; prInfo.nClTPC = tpcTr.getNClusters(); - lb = recoData.getTrackMCLabel(gidset[GTrackID::TPC]); - if (lb.isValid()) { - prInfo.corrTPC = !lb.isFake(); + lb[ip] = recoData.getTrackMCLabel(gidset[GTrackID::TPC]); + if (lb[ip].isValid()) { + prInfo.corrTPC = !lb[ip].isFake(); } } // get ITS tracks, if any @@ -186,9 +191,9 @@ o2::dataformats::V0Ext SVStudySpec::processV0(int iv, o2::globaltracking::RecoCo if (gidset[GTrackID::ITS].isSourceSet()) { const auto& itsTr = recoData.getITSTrack(gidset[GTrackID::ITS]); prInfo.nClITS = itsTr.getNClusters(); - lb = recoData.getTrackMCLabel(gidset[GTrackID::ITS]); - if (lb.isValid()) { - prInfo.corrITS = 
!lb.isFake(); + lb[ip] = recoData.getTrackMCLabel(gidset[GTrackID::ITS]); + if (lb[ip].isValid()) { + prInfo.corrITS = !lb[ip].isFake(); } for (int il = 0; il < 7; il++) { if (itsTr.hasHitOnLayer(il)) { @@ -198,9 +203,9 @@ o2::dataformats::V0Ext SVStudySpec::processV0(int iv, o2::globaltracking::RecoCo } else { const auto& itsTrf = recoData.getITSABRefs()[gidset[GTrackID::ITSAB]]; prInfo.nClITS = itsTrf.getNClusters(); - lb = recoData.getTrackMCLabel(gidset[GTrackID::ITSAB]); - if (lb.isValid()) { - prInfo.corrITS = !lb.isFake(); + lb[ip] = recoData.getTrackMCLabel(gidset[GTrackID::ITSAB]); + if (lb[ip].isValid()) { + prInfo.corrITS = !lb[ip].isFake(); } for (int il = 0; il < 7; il++) { if (itsTrf.hasHitOnLayer(il)) { @@ -211,13 +216,24 @@ o2::dataformats::V0Ext SVStudySpec::processV0(int iv, o2::globaltracking::RecoCo } if (gidset[GTrackID::ITSTPC].isSourceSet()) { auto mtc = recoData.getTPCITSTrack(gidset[GTrackID::ITSTPC]); - lb = recoData.getTrackMCLabel(gidset[GTrackID::ITSTPC]); + lb[ip] = recoData.getTrackMCLabel(gidset[GTrackID::ITSTPC]); prInfo.chi2ITSTPC = mtc.getChi2Match(); - if (lb.isValid()) { - prInfo.corrITSTPC = !lb.isFake(); + if (lb[ip].isValid()) { + prInfo.corrITSTPC = !lb[ip].isFake(); } } } + if (mUseMC && lb[ip].isValid()) { // temp store of mctrks + mcTrks[ip] = mcReader->getTrack(lb[ip]); + } + } + if (mUseMC && (mcTrks[0] != nullptr) && (mcTrks[1] != nullptr)) { + // check majority vote on mother particle otherwise leave pdg -1 + if (lb[0].getSourceID() == lb[1].getSourceID() && lb[0].getEventID() == lb[1].getEventID() && + mcTrks[0]->getMotherTrackId() == mcTrks[1]->getMotherTrackId() && mcTrks[0]->getMotherTrackId() >= 0) { + const auto mother = mcReader->getTrack(lb[0].getSourceID(), lb[0].getEventID(), mcTrks[0]->getMotherTrackId()); + v0ext.mcPID = mother->GetPdgCode(); + } } return v0ext; } diff --git a/Detectors/MUON/MID/Calibration/macros/README.md b/Detectors/MUON/MID/Calibration/macros/README.md index 7009e99086419..83e88f18ecf48 100644 --- a/Detectors/MUON/MID/Calibration/macros/README.md +++ b/Detectors/MUON/MID/Calibration/macros/README.md @@ -60,6 +60,53 @@ root -l .x build_rejectlist.C+(1716436103391,1721272208000,"localhost:8083") ``` +### Add custom bad channels + +The macro `build_rejectlist.C` scans the QCDB and the CCDB in search of issues. +However, the QCDB flag is based on local boards with empty signals. +It can happen that a local board is problematic, but not completely dead and, therefore, it is not correctly spotted by the macro. +It is therefore important to have a way to add the issues by hand. +This can be done with a json file in the form: + +```json +{ + "startRun": 557251, + "endRun": 557926, + "rejectList": [ + { + "deId": 4, + "columnId": 2, + "patterns": [ + "0x0", + "0xFFFF", + "0x0", + "0x0", + "0x0" + ] + }, + { + "deId": 13, + "columnId": 2, + "patterns": [ + "0x0", + "0xFFFF", + "0x0", + "0x0", + "0x0" + ] + } + ] +} +``` + +The path to the file is then given to the macro with: + +```shell +.x build_rejectlist.C+(1726299038000,1727386238000,"http://localhost:8083","http://alice-ccdb.cern.ch","http://localhost:8080","rejectlist.json") +``` + +The macro will then merge the manual reject list from the file with the reject list that it finds by scanning the QCDB and CCDB. + ## Running the local CCDB The local CCDB server can be easily built through alibuild. 
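
For reference, the snippet below is a minimal, standalone sketch (not part of the patch) of how each `rejectList` entry in the json file maps to an `o2::mid::ColumnData`: the five `patterns` strings are parsed as base-16 masks. It mirrors the `load_from_json` helper added to `build_rejectlist.C` in the diff that follows; the field names (`deId`, `columnId`, `patterns`) and the rapidjson calls are taken from that code, while the function name `readManualRejectList` is illustrative only.

```cpp
// Minimal sketch (not part of the patch): parse the json reject-list entries
// into o2::mid::ColumnData, mirroring the load_from_json helper added to
// build_rejectlist.C. The five "patterns" strings are hexadecimal masks.
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <vector>
#include "rapidjson/document.h"
#include "rapidjson/istreamwrapper.h"
#include "DataFormatsMID/ColumnData.h"

std::vector<o2::mid::ColumnData> readManualRejectList(const char* filename = "rejectlist.json")
{
  std::vector<o2::mid::ColumnData> rejectList;
  std::ifstream inFile(filename);
  if (!inFile.is_open()) {
    std::cerr << "Could not open " << filename << std::endl;
    return rejectList;
  }
  rapidjson::IStreamWrapper isw(inFile);
  rapidjson::Document doc;
  if (doc.ParseStream(isw).HasParseError()) {
    std::cerr << "Problem parsing " << filename << std::endl;
    return rejectList;
  }
  // "startRun"/"endRun" are converted to timestamps via getRunDuration in the macro;
  // here only the bad channels themselves are extracted
  for (auto& entry : doc["rejectList"].GetArray()) {
    o2::mid::ColumnData col;
    col.deId = entry["deId"].GetInt();
    col.columnId = entry["columnId"].GetInt();
    auto patterns = entry["patterns"].GetArray();
    for (size_t ipat = 0; ipat < 5 && ipat < patterns.Size(); ++ipat) {
      // patterns are stored as hex strings, e.g. "0xFFFF"
      col.patterns[ipat] = std::strtol(patterns[ipat].GetString(), nullptr, 16);
    }
    rejectList.emplace_back(col);
  }
  return rejectList;
}
```

The macro itself additionally attaches a validity interval derived from `startRun`/`endRun` before merging this manual list with the reject lists found in the QCDB/CCDB scan.
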
diff --git a/Detectors/MUON/MID/Calibration/macros/build_rejectlist.C b/Detectors/MUON/MID/Calibration/macros/build_rejectlist.C index 0782a08a3822d..7a395d2c099da 100644 --- a/Detectors/MUON/MID/Calibration/macros/build_rejectlist.C +++ b/Detectors/MUON/MID/Calibration/macros/build_rejectlist.C @@ -21,6 +21,8 @@ #include #include #include +#include "rapidjson/document.h" +#include "rapidjson/istreamwrapper.h" #include "TCanvas.h" #include "TH1.h" #include "TGraph.h" @@ -29,15 +31,25 @@ #include "DataFormatsParameters/GRPECSObject.h" #include "DetectorsCommonDataFormats/DetID.h" #include "DataFormatsMID/ColumnData.h" +#include "MIDBase/ColumnDataHandler.h" #include "MIDGlobalMapping/ExtendedMappingInfo.h" #include "MIDGlobalMapping/GlobalMapper.h" #include "MIDFiltering/ChannelMasksHandler.h" + +// ... #if !defined(__CLING__) || defined(__ROOTCLING__) #include "CCDB/BasicCCDBManager.h" #endif static const std::string sPathQCQuality = "qc/MID/MO/MIDQuality/Trends/global/MIDQuality/MIDQuality"; +/// @brief Reject list object +struct RejectListStruct { + long start = 0; /// Start validity + long end = 0; /// End validity + std::vector rejectList{}; /// Bad channels +}; + /// @brief Get timestamp in milliseconds /// @param timestamp Input timestamp (in s or ms) /// @return Timestamp in ms @@ -109,6 +121,12 @@ std::vector findObjectsTSInPeriod(long start, long end, const o2::ccdb::Cc /// @return Pair with first and last time std::pair findTSRange(TCanvas* qcQuality, bool selectBad = true) { + // Gets the plot with the quality flags + // The flag values are: + // Good: 3.5 + // Medium: 2.5 + // Bad: 1.5 + // Null: 0.5 auto* gr = static_cast(qcQuality->GetListOfPrimitives()->FindObject("Graph")); double xp, yp; std::pair range{std::numeric_limits::max(), 0}; @@ -168,25 +186,38 @@ std::vector getRejectList(std::vector return badChannels; } +/// @brief Gets the run duration with a safety marging +/// @param ccdbApi CCDB api +/// @param marging margin in milliseconds +/// @return Pair with the timestamps of start-margin and end+margin for the run +std::pair getRunDuration(const o2::ccdb::CcdbApi& ccdbApi, int runNumber, int64_t margin = 120000) +{ + auto runRange = o2::ccdb::BasicCCDBManager::getRunDuration(ccdbApi, runNumber); + runRange.first -= margin; // Subtract margin + runRange.second += margin; // Add margin + return runRange; +} + /// @brief Builds the reject list for the selected timestamp /// @param timestamp Timestamp for query /// @param qcdbApi QCDB api /// @param ccdbApi CCDB api /// @param outCCDBApi api of the CCDB where the reject list will be uploaded /// @return Reject list -std::vector build_rejectlist(long timestamp, const o2::ccdb::CcdbApi& qcdbApi, const o2::ccdb::CcdbApi& ccdbApi, const o2::ccdb::CcdbApi& outCCDBApi) +RejectListStruct build_rejectlist(long timestamp, const o2::ccdb::CcdbApi& qcdbApi, const o2::ccdb::CcdbApi& ccdbApi) { std::map metadata; + RejectListStruct rl; auto* qcQuality = qcdbApi.retrieveFromTFileAny(sPathQCQuality, metadata, getTSMS(timestamp)); if (!qcQuality) { std::cerr << "Cannot find QC quality for " << tsToString(timestamp) << std::endl; - return {}; + return rl; } // Find the first and last timestamp where the quality was bad (if any) auto badTSRange = findTSRange(qcQuality); if (badTSRange.second == 0) { std::cout << "All good" << std::endl; - return {}; + return rl; } // Search for the last timestamp for which the run quality was good auto goodTSRange = findTSRange(qcQuality, false); @@ -196,18 +227,15 @@ std::vector build_rejectlist(long 
timestamp, const o2::ccdb if (!grpecs.isDetReadOut(o2::detectors::DetID::MID)) { std::cout << "Error: we are probably reading a parallel run" << std::endl; grpecs.print(); - return {}; + return rl; } if (grpecs.getRunType() != o2::parameters::GRPECS::PHYSICS) { std::cout << "This is not a physics run: skip" << std::endl; grpecs.print(); - return {}; + return rl; } - auto runRange = o2::ccdb::BasicCCDBManager::getRunDuration(ccdbApi, grpecs.getRun()); - long margin = 120000; // Add a two minutes safety margin - runRange.first -= margin; // Subtract margin - runRange.second += margin; // Add margin + auto runRange = getRunDuration(ccdbApi, grpecs.getRun()); // Search for hits histogram in the period where the QC quality was bad auto tsVector = findObjectsTSInPeriod(badTSRange.first, badTSRange.second, qcdbApi, "qc/MID/MO/QcTaskMIDDigits/Hits"); @@ -221,15 +249,15 @@ std::vector build_rejectlist(long timestamp, const o2::ccdb auto infos = gm.buildStripsInfo(); auto badChannels = findBadChannels(occupancy, infos); auto badChannelsCCDB = *ccdbApi.retrieveFromTFileAny>("MID/Calib/BadChannels", metadata, getTSMS(timestamp)); - auto rejectList = getRejectList(badChannels, badChannelsCCDB); - if (rejectList.empty()) { + rl.rejectList = getRejectList(badChannels, badChannelsCCDB); + if (rl.rejectList.empty()) { std::cout << "Warning: reject list was empty. It probably means that an entire board is already masked in calibration for run " << grpecs.getRun() << std::endl; - return {}; + return rl; } // Print some useful information std::cout << "Reject list:" << std::endl; - for (auto& col : rejectList) { + for (auto& col : rl.rejectList) { std::cout << col << std::endl; } std::cout << "Run number: " << grpecs.getRun() << std::endl; @@ -239,60 +267,160 @@ std::vector build_rejectlist(long timestamp, const o2::ccdb std::cout << "Bad: " << timeRangeToString(badTSRange.first, badTSRange.second) << std::endl; // Set the start of the reject list to the last timestamp in which the occupancy was ok - auto startRL = goodTSRange.second; + rl.start = goodTSRange.second; if (goodTSRange.first == 0) { // If the quality was bad for the full run, set the start of the reject list to the SOR std::cout << "CAVEAT: no good TS found. Will use SOT instead" << std::endl; - startRL = runRange.first; + rl.start = runRange.first; } // Set the end of the reject list to the end of run - auto endRL = runRange.second; - // Ask if you want to upload the object to the CCDB - std::cout << "Upload reject list with validity: " << startRL << " - " << endRL << " to " << outCCDBApi.getURL() << "? [y/n]" << std::endl; - std::string answer; - std::cin >> answer; - if (answer == "y") { - std::cout << "Storing RejectList valid from " << startRL << " to " << endRL << std::endl; - outCCDBApi.storeAsTFileAny(&rejectList, "MID/Calib/RejectList", metadata, startRL, endRL); + rl.end = runRange.second; + return rl; +} + +/// @brief Loads the reject list from a json file +/// @param ccdbApi CCDB api +/// @param filename json filename +/// @return Reject list structure +RejectListStruct load_from_json(const o2::ccdb::CcdbApi& ccdbApi, const char* filename = "rejectlist.json") +{ + // Open the JSON file + std::cout << "Reading reject list from file " << filename << std::endl; + RejectListStruct rl; + std::ifstream inFile(filename); + if (!inFile.is_open()) { + std::cerr << "Could not open the file!" 
<< std::endl; + return rl; } - return rejectList; + + // Create an IStreamWrapper for file input stream + rapidjson::IStreamWrapper isw(inFile); + + rapidjson::Document doc; + if (doc.ParseStream(isw).HasParseError()) { + std::cerr << "Problem parsing " << filename << std::endl; + return rl; + } + auto startRange = getRunDuration(ccdbApi, doc["startRun"].GetInt()); + auto endRange = getRunDuration(ccdbApi, doc["endRun"].GetInt()); + rl.start = startRange.first; + rl.end = endRange.second; + std::cout << "Manual RL validity: " << timeRangeToString(rl.start, rl.end) << std::endl; + auto rlArray = doc["rejectList"].GetArray(); + for (auto& ar : rlArray) { + o2::mid::ColumnData col; + col.deId = ar["deId"].GetInt(); + col.columnId = ar["columnId"].GetInt(); + auto patterns = ar["patterns"].GetArray(); + for (size_t iar = 0; iar < 5; ++iar) { + col.patterns[iar] = std::strtol(patterns[iar].GetString(), NULL, 16); + } + rl.rejectList.emplace_back(col); + std::cout << col << std::endl; + } + return rl; } -/// @brief Builds the reject list for the selected timestamp -/// @param timestamp Timestamp for query -/// @param qcdbUrl QCDB URL -/// @param ccdbUrl CCDB URL -/// @param outCCDBUrl URL of the CCDB where the reject list will be uploaded -/// @return Reject list -std::vector build_rejectlist(long timestamp, const char* qcdbUrl = "http://ali-qcdb-gpn.cern.ch:8083", const char* ccdbUrl = "http://alice-ccdb.cern.ch", const char* outCCDBUrl = "http://localhost:8080") +/// @brief Merges the manual and automatic reject lists +/// @param manualRL Manual reject list from json file +/// @param rls Reject list from QCDB and CCDB +/// @return Merged reject list +std::vector merge_rejectlists(const RejectListStruct& manualRL, const std::vector& rls) { - // Get the QC quality object for the selected timestamp - o2::ccdb::CcdbApi qcdbApi; - qcdbApi.init(qcdbUrl); - o2::ccdb::CcdbApi ccdbApi; - ccdbApi.init(ccdbUrl); - o2::ccdb::CcdbApi outCCDBApi; - outCCDBApi.init(outCCDBUrl); - return build_rejectlist(timestamp, qcdbApi, ccdbApi, outCCDBApi); + std::vector merged; + if (rls.empty()) { + merged.emplace_back(manualRL); + return merged; + } + o2::mid::ColumnDataHandler ch; + RejectListStruct tmpRL; + long lastEnd = manualRL.start; + for (auto& rl : rls) { + std::cout << "Checking rl with validity: " << timeRangeToString(rl.start, rl.end) << std::endl; + if (rl.start >= manualRL.start && rl.end <= manualRL.end) { + // The period is included in the validity of the manual reject list + if (rl.start > lastEnd) { + // Fill holes between periods + tmpRL = manualRL; + tmpRL.start = lastEnd; + tmpRL.end = rl.start; + merged.emplace_back(tmpRL); + std::cout << "Adding manual RL with validity: " << timeRangeToString(tmpRL.start, tmpRL.end) << std::endl; + } + lastEnd = rl.end; + + // merge + ch.clear(); + ch.merge(rl.rejectList); + ch.merge(manualRL.rejectList); + tmpRL = rl; + tmpRL.rejectList = ch.getMerged(); + std::sort(tmpRL.rejectList.begin(), tmpRL.rejectList.end(), [](const o2::mid::ColumnData& col1, const o2::mid::ColumnData& col2) { return o2::mid::getColumnDataUniqueId(col1.deId, col1.columnId) < o2::mid::getColumnDataUniqueId(col2.deId, col2.columnId); }); + merged.emplace_back(tmpRL); + std::cout << "Merging RL with validity: " << timeRangeToString(tmpRL.start, tmpRL.end) << std::endl; + // std::cout << "Before: " << std::endl; + // for (auto& col : rl.rejectList) { + // std::cout << col << std::endl; + // } + // std::cout << "After: " << std::endl; + // for (auto& col : tmpRL.rejectList) { + // std::cout 
<< col << std::endl; + // } + } else { + if (rl.start > manualRL.end && lastEnd < manualRL.end) { + // Close manual period + tmpRL = manualRL; + tmpRL.start = lastEnd; + merged.emplace_back(tmpRL); + std::cout << "Adding manual RL with validity: " << timeRangeToString(tmpRL.start, tmpRL.end) << std::endl; + lastEnd = manualRL.end; + } + // Add current reject list as it is + merged.emplace_back(rl); + std::cout << "Adding RL with validity: " << timeRangeToString(rl.start, rl.end) << std::endl; + } + } + return merged; } -/// @brief Builds the reject list iin a time range +/// @brief Builds the reject list in a time range /// @param start Start time for query /// @param end End time for query /// @param qcdbUrl QCDB URL /// @param ccdbUrl CCDB URL /// @param outCCDBUrl URL of the CCDB where the reject lists will be uploaded -void build_rejectlist(long start, long end, const char* qcdbUrl = "http://ali-qcdb-gpn.cern.ch:8083", const char* ccdbUrl = "http://alice-ccdb.cern.ch", const char* outCCDBUrl = "http://localhost:8080") +void build_rejectlist(long start, long end, const char* qcdbUrl = "http://ali-qcdb-gpn.cern.ch:8083", const char* ccdbUrl = "http://alice-ccdb.cern.ch", const char* outCCDBUrl = "http://localhost:8080", const char* json_rejectlist = "") { // Query the MID QC quality objects o2::ccdb::CcdbApi qcdbApi; qcdbApi.init(qcdbUrl); o2::ccdb::CcdbApi ccdbApi; ccdbApi.init(ccdbUrl); - o2::ccdb::CcdbApi outCCDBApi; - outCCDBApi.init(outCCDBUrl); + std::vector rls; auto objectsTS = findObjectsTSInPeriod(start, end, qcdbApi, sPathQCQuality.c_str()); for (auto ts : objectsTS) { - build_rejectlist(ts, qcdbApi, ccdbApi, outCCDBApi); + auto rl = build_rejectlist(ts, qcdbApi, ccdbApi); + if (rl.start != rl.end) { + rls.emplace_back(rl); + } + } + + if (!std::string(json_rejectlist).empty()) { + auto rlManual = load_from_json(ccdbApi, json_rejectlist); + rls = merge_rejectlists(rlManual, rls); + } + + o2::ccdb::CcdbApi outCCDBApi; + outCCDBApi.init(outCCDBUrl); + std::map metadata; + for (auto& rl : rls) { + // Ask if you want to upload the object to the CCDB + std::cout << "Upload reject list with validity: " << rl.start << " - " << rl.end << " to " << outCCDBApi.getURL() << "? 
[y/n]" << std::endl; + std::string answer; + std::cin >> answer; + if (answer == "y") { + std::cout << "Storing RejectList valid from " << rl.start << " to " << rl.end << std::endl; + outCCDBApi.storeAsTFileAny(&rl.rejectList, "MID/Calib/RejectList", metadata, rl.start, rl.end); + } } } \ No newline at end of file diff --git a/Detectors/Raw/src/HBFUtilsInitializer.cxx b/Detectors/Raw/src/HBFUtilsInitializer.cxx index 1f89d9725b397..e3cc9a8eef414 100644 --- a/Detectors/Raw/src/HBFUtilsInitializer.cxx +++ b/Detectors/Raw/src/HBFUtilsInitializer.cxx @@ -78,7 +78,7 @@ HBFUtilsInitializer::HBFUtilsInitializer(const o2f::ConfigContext& configcontext hbfuInput = optStr; } else if (opt == HBFOpt::ROOT) { rootFileInput = optStr; - } else { + } else if (!helpasked) { LOGP(fatal, "uknown hbfutils-config option {}", optStr); } } diff --git a/Framework/AnalysisSupport/CMakeLists.txt b/Framework/AnalysisSupport/CMakeLists.txt index eb5706817704b..5fb1282469711 100644 --- a/Framework/AnalysisSupport/CMakeLists.txt +++ b/Framework/AnalysisSupport/CMakeLists.txt @@ -20,6 +20,7 @@ o2_add_library(FrameworkAnalysisSupport SOURCES src/Plugin.cxx src/DataInputDirector.cxx src/AODJAlienReaderHelpers.cxx + src/AODWriterHelpers.cxx PRIVATE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_LIST_DIR}/src PUBLIC_LINK_LIBRARIES O2::Framework ${EXTRA_TARGETS} ROOT::TreePlayer) diff --git a/Framework/AnalysisSupport/src/AODWriterHelpers.cxx b/Framework/AnalysisSupport/src/AODWriterHelpers.cxx new file mode 100644 index 0000000000000..fa10d4661f537 --- /dev/null +++ b/Framework/AnalysisSupport/src/AODWriterHelpers.cxx @@ -0,0 +1,414 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+#include "Framework/AnalysisContext.h" +#include "Framework/ConfigContext.h" +#include "Framework/ControlService.h" +#include "AODWriterHelpers.h" +#include "Framework/OutputObjHeader.h" +#include "Framework/EndOfStreamContext.h" +#include "Framework/ProcessingContext.h" +#include "Framework/InitContext.h" +#include "Framework/CallbackService.h" +#include "Framework/AnalysisSupportHelpers.h" +#include "Framework/TableConsumer.h" +#include "Framework/DataOutputDirector.h" +#include "Framework/TableTreeHelpers.h" + +#include +#include +#include +#include +#include +#include + +namespace o2::framework::writers +{ + +struct InputObjectRoute { + std::string name; + uint32_t uniqueId; + std::string directory; + uint32_t taskHash; + OutputObjHandlingPolicy policy; + OutputObjSourceType sourceType; +}; + +struct InputObject { + TClass* kind = nullptr; + void* obj = nullptr; + std::string name; + int count = -1; +}; + +const static std::unordered_map ROOTfileNames = {{OutputObjHandlingPolicy::AnalysisObject, "AnalysisResults.root"}, + {OutputObjHandlingPolicy::QAObject, "QAResults.root"}}; + +AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx) +{ + auto& ac = ctx.services().get(); + auto dod = AnalysisSupportHelpers::getDataOutputDirector(ctx); + int compressionLevel = 505; + if (ctx.options().hasOption("aod-writer-compression")) { + compressionLevel = ctx.options().get("aod-writer-compression"); + } + return AlgorithmSpec{[dod, outputInputs = ac.outputsInputsAOD, compressionLevel](InitContext& ic) -> std::function { + LOGP(debug, "======== getGlobalAODSink::Init =========="); + + // find out if any table needs to be saved + bool hasOutputsToWrite = false; + for (auto& outobj : outputInputs) { + auto ds = dod->getDataOutputDescriptors(outobj); + if (ds.size() > 0) { + hasOutputsToWrite = true; + break; + } + } + + // if nothing needs to be saved then return a trivial functor + // this happens when nothing needs to be saved but there are dangling outputs + if (!hasOutputsToWrite) { + return [](ProcessingContext&) mutable -> void { + static bool once = false; + if (!once) { + LOG(info) << "No AODs to be saved."; + once = true; + } + }; + } + + // end of data functor is called at the end of the data stream + auto endofdatacb = [dod](EndOfStreamContext& context) { + dod->closeDataFiles(); + context.services().get().readyToQuit(QuitRequest::Me); + }; + + auto& callbacks = ic.services().get(); + callbacks.set(endofdatacb); + + // prepare map(startTime, tfNumber) + std::map tfNumbers; + std::map tfFilenames; + + std::vector aodMetaDataKeys; + std::vector aodMetaDataVals; + + // this functor is called once per time frame + return [dod, tfNumbers, tfFilenames, aodMetaDataKeys, aodMetaDataVals, compressionLevel](ProcessingContext& pc) mutable -> void { + LOGP(debug, "======== getGlobalAODSink::processing =========="); + LOGP(debug, " processing data set with {} entries", pc.inputs().size()); + + // return immediately if pc.inputs() is empty. This should never happen! 
+ if (pc.inputs().size() == 0) { + LOGP(info, "No inputs available!"); + return; + } + + // update tfNumbers + uint64_t startTime = 0; + uint64_t tfNumber = 0; + auto ref = pc.inputs().get("tfn"); + if (ref.spec && ref.payload) { + startTime = DataRefUtils::getHeader(ref)->startTime; + tfNumber = pc.inputs().get("tfn"); + tfNumbers.insert(std::pair(startTime, tfNumber)); + } + // update tfFilenames + std::string aodInputFile; + auto ref2 = pc.inputs().get("tff"); + if (ref2.spec && ref2.payload) { + startTime = DataRefUtils::getHeader(ref2)->startTime; + aodInputFile = pc.inputs().get("tff"); + tfFilenames.insert(std::pair(startTime, aodInputFile)); + } + + // close all output files if one has reached size limit + dod->checkFileSizes(); + + // loop over the DataRefs which are contained in pc.inputs() + for (const auto& ref : pc.inputs()) { + if (!ref.spec) { + LOGP(debug, "Invalid input will be skipped!"); + continue; + } + + // get metadata + if (DataSpecUtils::partialMatch(*ref.spec, header::DataDescription("AODMetadataKeys"))) { + aodMetaDataKeys = pc.inputs().get>(ref.spec->binding); + } + if (DataSpecUtils::partialMatch(*ref.spec, header::DataDescription("AODMetadataVals"))) { + aodMetaDataVals = pc.inputs().get>(ref.spec->binding); + } + + // skip non-AOD refs + if (!DataSpecUtils::partialMatch(*ref.spec, writableAODOrigins)) { + continue; + } + startTime = DataRefUtils::getHeader(ref)->startTime; + + // does this need to be saved? + auto dh = DataRefUtils::getHeader(ref); + auto tableName = dh->dataDescription.as(); + auto ds = dod->getDataOutputDescriptors(*dh); + if (ds.empty()) { + continue; + } + + // get TF number from startTime + auto it = tfNumbers.find(startTime); + if (it != tfNumbers.end()) { + tfNumber = (it->second / dod->getNumberTimeFramesToMerge()) * dod->getNumberTimeFramesToMerge(); + } else { + LOGP(fatal, "No time frame number found for output with start time {}", startTime); + throw std::runtime_error("Processing is stopped!"); + } + // get aod input file from startTime + auto it2 = tfFilenames.find(startTime); + if (it2 != tfFilenames.end()) { + aodInputFile = it2->second; + } + + // get the TableConsumer and corresponding arrow table + auto msg = pc.inputs().get(ref.spec->binding); + if (msg.header == nullptr) { + LOGP(error, "No header for message {}:{}", ref.spec->binding, DataSpecUtils::describe(*ref.spec)); + continue; + } + auto s = pc.inputs().get(ref.spec->binding); + auto table = s->asArrowTable(); + if (!table->Validate().ok()) { + LOGP(warning, "The table \"{}\" is not valid and will not be saved!", tableName); + continue; + } + if (table->schema()->fields().empty()) { + LOGP(debug, "The table \"{}\" is empty but will be saved anyway!", tableName); + } + + // loop over all DataOutputDescriptors + // a table can be saved in multiple ways + // e.g. 
different selections of columns to different files + for (auto d : ds) { + auto fileAndFolder = dod->getFileFolder(d, tfNumber, aodInputFile, compressionLevel); + auto treename = fileAndFolder.folderName + "/" + d->treename; + TableToTree ta2tr(table, + fileAndFolder.file, + treename.c_str()); + + // update metadata + if (fileAndFolder.file->FindObjectAny("metaData")) { + LOGF(debug, "Metadata: target file %s already has metadata, preserving it", fileAndFolder.file->GetName()); + } else if (!aodMetaDataKeys.empty() && !aodMetaDataVals.empty()) { + TMap aodMetaDataMap; + for (uint32_t imd = 0; imd < aodMetaDataKeys.size(); imd++) { + aodMetaDataMap.Add(new TObjString(aodMetaDataKeys[imd]), new TObjString(aodMetaDataVals[imd])); + } + fileAndFolder.file->WriteObject(&aodMetaDataMap, "metaData", "Overwrite"); + } + + if (!d->colnames.empty()) { + for (auto& cn : d->colnames) { + auto idx = table->schema()->GetFieldIndex(cn); + auto col = table->column(idx); + auto field = table->schema()->field(idx); + if (idx != -1) { + ta2tr.addBranch(col, field); + } + } + } else { + ta2tr.addAllBranches(); + } + ta2tr.process(); + } + } + }; + } + + }; +} + +AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx) +{ + auto& ac = ctx.services().get(); + auto tskmap = ac.outTskMap; + auto objmap = ac.outObjHistMap; + + return AlgorithmSpec{[objmap, tskmap](InitContext& ic) -> std::function { + auto& callbacks = ic.services().get(); + auto inputObjects = std::make_shared>>(); + + static TFile* f[OutputObjHandlingPolicy::numPolicies]; + for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) { + f[i] = nullptr; + } + + static std::string currentDirectory = ""; + static std::string currentFile = ""; + + auto endofdatacb = [inputObjects](EndOfStreamContext& context) { + LOG(debug) << "Writing merged objects and histograms to file"; + if (inputObjects->empty()) { + LOG(error) << "Output object map is empty!"; + context.services().get().readyToQuit(QuitRequest::Me); + return; + } + for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) { + if (f[i] != nullptr) { + f[i]->Close(); + } + } + LOG(debug) << "All outputs merged in their respective target files"; + context.services().get().readyToQuit(QuitRequest::Me); + }; + + callbacks.set(endofdatacb); + return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void { + auto const& ref = pc.inputs().get("x"); + if (!ref.header) { + LOG(error) << "Header not found"; + return; + } + if (!ref.payload) { + LOG(error) << "Payload not found"; + return; + } + auto datah = o2::header::get(ref.header); + if (!datah) { + LOG(error) << "No data header in stack"; + return; + } + + auto objh = o2::header::get(ref.header); + if (!objh) { + LOG(error) << "No output object header in stack"; + return; + } + + InputObject obj; + FairInputTBuffer tm(const_cast(ref.payload), static_cast(datah->payloadSize)); + tm.InitMap(); + obj.kind = tm.ReadClass(); + tm.SetBufferOffset(0); + tm.ResetMap(); + if (obj.kind == nullptr) { + LOG(error) << "Cannot read class info from buffer."; + return; + } + + auto policy = objh->mPolicy; + auto sourceType = objh->mSourceType; + auto hash = objh->mTaskHash; + + obj.obj = tm.ReadObjectAny(obj.kind); + auto* named = static_cast(obj.obj); + obj.name = named->GetName(); + auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; }); + if (hpos == tskmap.end()) { + LOG(error) << "No task found for hash " << hash; + return; + } + auto taskname = hpos->name; + auto opos 
= std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.id == hash; }); + if (opos == objmap.end()) { + LOG(error) << "No object list found for task " << taskname << " (hash=" << hash << ")"; + return; + } + auto objects = opos->bindings; + if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) { + LOG(error) << "No object " << obj.name << " in map for task " << taskname; + return; + } + auto nameHash = runtime_hash(obj.name.c_str()); + InputObjectRoute key{obj.name, nameHash, taskname, hash, policy, sourceType}; + auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](auto&& x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); }); + // If it's the first one, we just add it to the list. + if (existing == inputObjects->end()) { + obj.count = objh->mPipelineSize; + inputObjects->push_back(std::make_pair(key, obj)); + existing = inputObjects->end() - 1; + } else { + obj.count = existing->second.count; + // Otherwise, we merge it with the existing one. + auto merger = existing->second.kind->GetMerge(); + if (!merger) { + LOG(error) << "Already one unmergeable object found for " << obj.name; + return; + } + TList coll; + coll.Add(static_cast(obj.obj)); + merger(existing->second.obj, &coll, nullptr); + } + // We expect as many objects as the pipeline size, for + // a given object name and task hash. + existing->second.count -= 1; + + if (existing->second.count != 0) { + return; + } + // Write the object here. + auto route = existing->first; + auto entry = existing->second; + auto file = ROOTfileNames.find(route.policy); + if (file == ROOTfileNames.end()) { + return; + } + auto filename = file->second; + if (f[route.policy] == nullptr) { + f[route.policy] = TFile::Open(filename.c_str(), "RECREATE"); + } + auto nextDirectory = route.directory; + if ((nextDirectory != currentDirectory) || (filename != currentFile)) { + if (!f[route.policy]->FindKey(nextDirectory.c_str())) { + f[route.policy]->mkdir(nextDirectory.c_str()); + } + currentDirectory = nextDirectory; + currentFile = filename; + } + + // translate the list-structure created by the registry into a directory structure within the file + std::function writeListToFile; + writeListToFile = [&](TList* list, TDirectory* parentDir) { + TIter next(list); + TObject* object = nullptr; + while ((object = next())) { + if (object->InheritsFrom(TList::Class())) { + writeListToFile(static_cast(object), parentDir->mkdir(object->GetName(), object->GetName(), true)); + } else { + parentDir->WriteObjectAny(object, object->Class(), object->GetName()); + auto* written = list->Remove(object); + delete written; + } + } + }; + + TDirectory* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str()); + if (route.sourceType == OutputObjSourceType::HistogramRegistrySource) { + auto* outputList = static_cast(entry.obj); + outputList->SetOwner(false); + + // if registry should live in dedicated folder a TNamed object is appended to the list + if (outputList->Last() && outputList->Last()->IsA() == TNamed::Class()) { + delete outputList->Last(); + outputList->RemoveLast(); + currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true); + } + + writeListToFile(outputList, currentDir); + outputList->SetOwner(); + delete outputList; + entry.obj = nullptr; + } else { + currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str()); + delete (TObject*)entry.obj; + entry.obj = nullptr; + } + }; + }}; +} +} // namespace o2::framework::writers diff --git 
a/Framework/AnalysisSupport/src/AODWriterHelpers.h b/Framework/AnalysisSupport/src/AODWriterHelpers.h new file mode 100644 index 0000000000000..7ae59a5cf3b01 --- /dev/null +++ b/Framework/AnalysisSupport/src/AODWriterHelpers.h @@ -0,0 +1,28 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#ifndef O2_FRAMEWORK_AODROOTWRITERHELPERS_H_ +#define O2_FRAMEWORK_AODROOTWRITERHELPERS_H_ + +#include "Framework/AlgorithmSpec.h" +#include + +namespace o2::framework::writers +{ + +struct AODWriterHelpers { + static AlgorithmSpec getOutputObjHistWriter(ConfigContext const& context); + static AlgorithmSpec getOutputTTreeWriter(ConfigContext const& context); +}; + +} // namespace o2::framework::writers + +#endif // O2_FRAMEWORK_AODROOTWRITERHELPERS_H_ diff --git a/Framework/AnalysisSupport/src/Plugin.cxx b/Framework/AnalysisSupport/src/Plugin.cxx index bba3499286e08..52435375d7e9e 100644 --- a/Framework/AnalysisSupport/src/Plugin.cxx +++ b/Framework/AnalysisSupport/src/Plugin.cxx @@ -16,6 +16,7 @@ #include "Framework/Capability.h" #include "Framework/Signpost.h" #include "AODJAlienReaderHelpers.h" +#include "AODWriterHelpers.h" #include #include #include @@ -33,6 +34,20 @@ struct ROOTFileReader : o2::framework::AlgorithmPlugin { } }; +struct ROOTObjWriter : o2::framework::AlgorithmPlugin { + o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const& config) override + { + return o2::framework::writers::AODWriterHelpers::getOutputObjHistWriter(config); + } +}; + +struct ROOTTTreeWriter : o2::framework::AlgorithmPlugin { + o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const& config) override + { + return o2::framework::writers::AODWriterHelpers::getOutputTTreeWriter(config); + } +}; + using namespace o2::framework; struct RunSummary : o2::framework::ServicePlugin { o2::framework::ServiceSpec* create() final @@ -211,6 +226,8 @@ struct DiscoverMetadataInAOD : o2::framework::ConfigDiscoveryPlugin { DEFINE_DPL_PLUGINS_BEGIN DEFINE_DPL_PLUGIN_INSTANCE(ROOTFileReader, CustomAlgorithm); +DEFINE_DPL_PLUGIN_INSTANCE(ROOTObjWriter, CustomAlgorithm); +DEFINE_DPL_PLUGIN_INSTANCE(ROOTTTreeWriter, CustomAlgorithm); DEFINE_DPL_PLUGIN_INSTANCE(RunSummary, CustomService); DEFINE_DPL_PLUGIN_INSTANCE(DiscoverMetadataInAOD, ConfigDiscovery); DEFINE_DPL_PLUGINS_END diff --git a/Framework/Core/include/Framework/AnalysisContext.h b/Framework/Core/include/Framework/AnalysisContext.h new file mode 100644 index 0000000000000..0f62f952d0aaa --- /dev/null +++ b/Framework/Core/include/Framework/AnalysisContext.h @@ -0,0 +1,58 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". 
+// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#ifndef O2_FRAMEWORK_ANALYSISCONTEXT_H_ +#define O2_FRAMEWORK_ANALYSISCONTEXT_H_ + +#include +#include "Framework/InputSpec.h" +#include "Framework/OutputSpec.h" + +namespace o2::framework +{ +class DataOutputDirector; + +struct OutputTaskInfo { + uint32_t id; + std::string name; +}; + +struct OutputObjectInfo { + uint32_t id; + std::vector bindings; +}; + +// +struct AnalysisContext { + std::vector requestedAODs; + std::vector providedAODs; + std::vector requestedDYNs; + std::vector providedDYNs; + std::vector requestedIDXs; + std::vector providedOutputObjHist; + std::vector spawnerInputs; + + // Needed to created the hist writer + std::vector outTskMap; + std::vector outObjHistMap; + + // Needed to create the output director + std::vector outputsInputs; + std::vector isDangling; + + // Needed to create the aod writer + std::vector outputsInputsAOD; +}; +} // namespace o2::framework + +extern template class std::vector; +extern template class std::vector; + +#endif // O2_FRAMEWORK_ANALYSISCONTEXT_H_ diff --git a/Framework/Core/src/AnalysisSupportHelpers.h b/Framework/Core/include/Framework/AnalysisSupportHelpers.h similarity index 71% rename from Framework/Core/src/AnalysisSupportHelpers.h rename to Framework/Core/include/Framework/AnalysisSupportHelpers.h index ba5bcedb4bc67..4ae601dc9e4a2 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.h +++ b/Framework/Core/include/Framework/AnalysisSupportHelpers.h @@ -14,6 +14,7 @@ #include "Framework/OutputSpec.h" #include "Framework/InputSpec.h" #include "Framework/DataProcessorSpec.h" +#include "Framework/AnalysisContext.h" #include "Headers/DataHeader.h" #include @@ -24,36 +25,7 @@ static constexpr std::array extendedAODOrigins{header::Da static constexpr std::array writableAODOrigins{header::DataOrigin{"AOD"}, header::DataOrigin{"AOD1"}, header::DataOrigin{"AOD2"}, header::DataOrigin{"DYN"}}; class DataOutputDirector; - -struct OutputTaskInfo { - uint32_t id; - std::string name; -}; - -struct OutputObjectInfo { - uint32_t id; - std::vector bindings; -}; -} // namespace o2::framework - -extern template class std::vector; -extern template class std::vector; - -namespace o2::framework -{ -// -struct AnalysisContext { - std::vector requestedAODs; - std::vector providedAODs; - std::vector requestedDYNs; - std::vector providedDYNs; - std::vector requestedIDXs; - std::vector providedOutputObjHist; - std::vector spawnerInputs; - - std::vector outTskMap; - std::vector outObjHistMap; -}; +class ConfigContext; // Helper class to be moved in the AnalysisSupport plugin at some point struct AnalysisSupportHelpers { @@ -74,11 +46,11 @@ struct AnalysisSupportHelpers { /// Match all inputs of kind ATSK and write them to a ROOT file, /// one root file per originating task. 
- static DataProcessorSpec getOutputObjHistSink(std::vector const& objmap, - std::vector const& tskmap); + static DataProcessorSpec getOutputObjHistSink(ConfigContext const&); /// writes inputs of kind AOD to file - static DataProcessorSpec getGlobalAODSink(std::shared_ptr dod, - std::vector const& outputInputs, int compression); + static DataProcessorSpec getGlobalAODSink(ConfigContext const&); + /// Get the data director + static std::shared_ptr getDataOutputDirector(ConfigContext const& ctx); }; }; // namespace o2::framework diff --git a/Framework/Core/include/Framework/ConfigContext.h b/Framework/Core/include/Framework/ConfigContext.h index 5790699fe68bb..87259f0519915 100644 --- a/Framework/Core/include/Framework/ConfigContext.h +++ b/Framework/Core/include/Framework/ConfigContext.h @@ -8,11 +8,11 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#ifndef FRAMEWORK_CONFIG_CONTEXT_H -#define FRAMEWORK_CONFIG_CONTEXT_H +#ifndef O2_FRAMEWORK_CONFIG_CONTEXT_H_ +#define O2_FRAMEWORK_CONFIG_CONTEXT_H_ #include "Framework/ConfigParamRegistry.h" -#include "Framework/ServiceRegistry.h" +#include "Framework/ServiceRegistryRef.h" namespace o2::framework { @@ -23,9 +23,10 @@ namespace o2::framework class ConfigContext { public: - ConfigContext(ConfigParamRegistry& options, int argc, char** argv) : mOptions{options}, mArgc{argc}, mArgv{argv} {} + ConfigContext(ConfigParamRegistry& options, ServiceRegistryRef services, int argc, char** argv); [[nodiscard]] ConfigParamRegistry& options() const { return mOptions; } + [[nodiscard]] ServiceRegistryRef services() const { return mServices; } [[nodiscard]] bool helpOnCommandLine() const; @@ -34,11 +35,13 @@ class ConfigContext private: ConfigParamRegistry& mOptions; + + ServiceRegistryRef mServices; // additionaly keep information about the original command line int mArgc = 0; char** mArgv = nullptr; }; -} // namespace o2 +} // namespace o2::framework -#endif +#endif // O2_FRAMEWORK_CONFIG_CONTEXT_H_ diff --git a/Framework/Core/include/Framework/runDataProcessing.h b/Framework/Core/include/Framework/runDataProcessing.h index eee4c4b6583d3..8293bf0cf7039 100644 --- a/Framework/Core/include/Framework/runDataProcessing.h +++ b/Framework/Core/include/Framework/runDataProcessing.h @@ -30,6 +30,7 @@ #include "Framework/CheckTypes.h" #include "Framework/StructToTuple.h" #include "Framework/ConfigParamDiscovery.h" +#include "ServiceRegistryRef.h" #include namespace o2::framework @@ -198,7 +199,8 @@ int mainNoCatch(int argc, char** argv) workflowOptions.push_back(extra); } - ConfigContext configContext(workflowOptionsRegistry, argc, argv); + ServiceRegistry configRegistry; + ConfigContext configContext(workflowOptionsRegistry, ServiceRegistryRef{configRegistry}, argc, argv); o2::framework::WorkflowSpec specs = defineDataProcessing(configContext); overrideCloning(configContext, specs); overridePipeline(configContext, specs); diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index e949f27a6eed6..eb17566fd6d31 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -9,18 +9,16 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. 
-#include "AnalysisSupportHelpers.h" +#include "Framework/AnalysisSupportHelpers.h" #include "Framework/DataOutputDirector.h" #include "Framework/OutputObjHeader.h" #include "Framework/ControlService.h" #include "Framework/EndOfStreamContext.h" #include "Framework/DeviceSpec.h" #include "Framework/TableTreeHelpers.h" - -#include "TFile.h" -#include "TTree.h" -#include "TMap.h" -#include "TObjString.h" +#include "Framework/PluginManager.h" +#include "Framework/ConfigContext.h" +#include "WorkflowHelpers.h" template class std::vector; template class std::vector; @@ -28,21 +26,105 @@ template class std::vector; namespace o2::framework { -struct InputObjectRoute { - std::string name; - uint32_t uniqueId; - std::string directory; - uint32_t taskHash; - OutputObjHandlingPolicy policy; - OutputObjSourceType sourceType; -}; +std::shared_ptr AnalysisSupportHelpers::getDataOutputDirector(ConfigContext const& ctx) +{ + auto const& options = ctx.options(); + auto const& OutputsInputs = ctx.services().get().outputsInputs; + auto const& isDangling = ctx.services().get().isDangling; + + std::shared_ptr dod = std::make_shared(); + + // analyze options and take actions accordingly + // default values + std::string rdn, resdir("./"); + std::string fnb, fnbase("AnalysisResults_trees"); + float mfs, maxfilesize(-1.); + std::string fmo, filemode("RECREATE"); + int ntfm, ntfmerge = 1; + + // values from json + if (options.isSet("aod-writer-json")) { + auto fnjson = options.get("aod-writer-json"); + if (!fnjson.empty()) { + std::tie(rdn, fnb, fmo, mfs, ntfm) = dod->readJson(fnjson); + if (!rdn.empty()) { + resdir = rdn; + } + if (!fnb.empty()) { + fnbase = fnb; + } + if (!fmo.empty()) { + filemode = fmo; + } + if (mfs > 0.) { + maxfilesize = mfs; + } + if (ntfm > 0) { + ntfmerge = ntfm; + } + } + } + + // values from command line options, information from json is overwritten + if (options.isSet("aod-writer-resdir")) { + rdn = options.get("aod-writer-resdir"); + if (!rdn.empty()) { + resdir = rdn; + } + } + if (options.isSet("aod-writer-resfile")) { + fnb = options.get("aod-writer-resfile"); + if (!fnb.empty()) { + fnbase = fnb; + } + } + if (options.isSet("aod-writer-resmode")) { + fmo = options.get("aod-writer-resmode"); + if (!fmo.empty()) { + filemode = fmo; + } + } + if (options.isSet("aod-writer-maxfilesize")) { + mfs = options.get("aod-writer-maxfilesize"); + if (mfs > 0) { + maxfilesize = mfs; + } + } + if (options.isSet("aod-writer-ntfmerge")) { + ntfm = options.get("aod-writer-ntfmerge"); + if (ntfm > 0) { + ntfmerge = ntfm; + } + } + // parse the keepString + if (options.isSet("aod-writer-keep")) { + auto keepString = options.get("aod-writer-keep"); + if (!keepString.empty()) { + dod->reset(); + std::string d("dangling"); + if (d.find(keepString) == 0) { + // use the dangling outputs + std::vector danglingOutputs; + for (auto ii = 0u; ii < OutputsInputs.size(); ii++) { + if (DataSpecUtils::partialMatch(OutputsInputs[ii], writableAODOrigins) && isDangling[ii]) { + danglingOutputs.emplace_back(OutputsInputs[ii]); + } + } + dod->readSpecs(danglingOutputs); + } else { + // use the keep string + dod->readString(keepString); + } + } + } + dod->setResultDir(resdir); + dod->setFilenameBase(fnbase); + dod->setFileMode(filemode); + dod->setMaximumFileSize(maxfilesize); + dod->setNumberTimeFramesToMerge(ntfmerge); -struct InputObject { - TClass* kind = nullptr; - void* obj = nullptr; - std::string name; - int count = -1; -}; + return dod; +} void AnalysisSupportHelpers::addMissingOutputsToReader(std::vector 
const& providedOutputs, std::vector const& requestedInputs, @@ -125,191 +207,16 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector c } } -const static std::unordered_map ROOTfileNames = {{OutputObjHandlingPolicy::AnalysisObject, "AnalysisResults.root"}, - {OutputObjHandlingPolicy::QAObject, "QAResults.root"}}; - // ============================================================================= -DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(std::vector const& objmap, std::vector const& tskmap) +DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(ConfigContext const& ctx) { - auto writerFunction = [objmap, tskmap](InitContext& ic) -> std::function { - auto& callbacks = ic.services().get(); - auto inputObjects = std::make_shared>>(); - - static TFile* f[OutputObjHandlingPolicy::numPolicies]; - for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) { - f[i] = nullptr; - } - - static std::string currentDirectory = ""; - static std::string currentFile = ""; - - auto endofdatacb = [inputObjects](EndOfStreamContext& context) { - LOG(debug) << "Writing merged objects and histograms to file"; - if (inputObjects->empty()) { - LOG(error) << "Output object map is empty!"; - context.services().get().readyToQuit(QuitRequest::Me); - return; - } - for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) { - if (f[i] != nullptr) { - f[i]->Close(); - } - } - LOG(debug) << "All outputs merged in their respective target files"; - context.services().get().readyToQuit(QuitRequest::Me); - }; - - callbacks.set(endofdatacb); - return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void { - auto const& ref = pc.inputs().get("x"); - if (!ref.header) { - LOG(error) << "Header not found"; - return; - } - if (!ref.payload) { - LOG(error) << "Payload not found"; - return; - } - auto datah = o2::header::get(ref.header); - if (!datah) { - LOG(error) << "No data header in stack"; - return; - } - - auto objh = o2::header::get(ref.header); - if (!objh) { - LOG(error) << "No output object header in stack"; - return; - } - - InputObject obj; - FairInputTBuffer tm(const_cast(ref.payload), static_cast(datah->payloadSize)); - tm.InitMap(); - obj.kind = tm.ReadClass(); - tm.SetBufferOffset(0); - tm.ResetMap(); - if (obj.kind == nullptr) { - LOG(error) << "Cannot read class info from buffer."; - return; - } - - auto policy = objh->mPolicy; - auto sourceType = objh->mSourceType; - auto hash = objh->mTaskHash; - - obj.obj = tm.ReadObjectAny(obj.kind); - auto* named = static_cast(obj.obj); - obj.name = named->GetName(); - auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; }); - if (hpos == tskmap.end()) { - LOG(error) << "No task found for hash " << hash; - return; - } - auto taskname = hpos->name; - auto opos = std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.id == hash; }); - if (opos == objmap.end()) { - LOG(error) << "No object list found for task " << taskname << " (hash=" << hash << ")"; - return; - } - auto objects = opos->bindings; - if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) { - LOG(error) << "No object " << obj.name << " in map for task " << taskname; - return; - } - auto nameHash = runtime_hash(obj.name.c_str()); - InputObjectRoute key{obj.name, nameHash, taskname, hash, policy, sourceType}; - auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](auto&& x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); }); - 
// If it's the first one, we just add it to the list. - if (existing == inputObjects->end()) { - obj.count = objh->mPipelineSize; - inputObjects->push_back(std::make_pair(key, obj)); - existing = inputObjects->end() - 1; - } else { - obj.count = existing->second.count; - // Otherwise, we merge it with the existing one. - auto merger = existing->second.kind->GetMerge(); - if (!merger) { - LOG(error) << "Already one unmergeable object found for " << obj.name; - return; - } - TList coll; - coll.Add(static_cast(obj.obj)); - merger(existing->second.obj, &coll, nullptr); - } - // We expect as many objects as the pipeline size, for - // a given object name and task hash. - existing->second.count -= 1; - - if (existing->second.count != 0) { - return; - } - // Write the object here. - auto route = existing->first; - auto entry = existing->second; - auto file = ROOTfileNames.find(route.policy); - if (file == ROOTfileNames.end()) { - return; - } - auto filename = file->second; - if (f[route.policy] == nullptr) { - f[route.policy] = TFile::Open(filename.c_str(), "RECREATE"); - } - auto nextDirectory = route.directory; - if ((nextDirectory != currentDirectory) || (filename != currentFile)) { - if (!f[route.policy]->FindKey(nextDirectory.c_str())) { - f[route.policy]->mkdir(nextDirectory.c_str()); - } - currentDirectory = nextDirectory; - currentFile = filename; - } - - // translate the list-structure created by the registry into a directory structure within the file - std::function writeListToFile; - writeListToFile = [&](TList* list, TDirectory* parentDir) { - TIter next(list); - TObject* object = nullptr; - while ((object = next())) { - if (object->InheritsFrom(TList::Class())) { - writeListToFile(static_cast(object), parentDir->mkdir(object->GetName(), object->GetName(), true)); - } else { - parentDir->WriteObjectAny(object, object->Class(), object->GetName()); - auto* written = list->Remove(object); - delete written; - } - } - }; - - TDirectory* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str()); - if (route.sourceType == OutputObjSourceType::HistogramRegistrySource) { - auto* outputList = static_cast(entry.obj); - outputList->SetOwner(false); - - // if registry should live in dedicated folder a TNamed object is appended to the list - if (outputList->Last() && outputList->Last()->IsA() == TNamed::Class()) { - delete outputList->Last(); - outputList->RemoveLast(); - currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true); - } - - writeListToFile(outputList, currentDir); - outputList->SetOwner(); - delete outputList; - entry.obj = nullptr; - } else { - currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str()); - delete (TObject*)entry.obj; - entry.obj = nullptr; - } - }; - }; - - char const* name = "internal-dpl-aod-global-analysis-file-sink"; // Lifetime is sporadic because we do not ask each analysis task to send its // results every timeframe. 
DataProcessorSpec spec{ - .name = name, + .name = "internal-dpl-aod-global-analysis-file-sink", .inputs = {InputSpec("x", DataSpecUtils::dataDescriptorMatcherFrom(header::DataOrigin{"ATSK"}), Lifetime::Sporadic)}, - .algorithm = {writerFunction}, + .outputs = {}, + .algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTObjWriter", ctx), }; return spec; @@ -317,188 +224,17 @@ DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(std::vector dod, - std::vector const& outputInputs, int compressionLevel) + AnalysisSupportHelpers::getGlobalAODSink(ConfigContext const& ctx) { - - auto writerFunction = [dod, outputInputs, compressionLevel](InitContext& ic) -> std::function { - LOGP(debug, "======== getGlobalAODSink::Init =========="); - - // find out if any table needs to be saved - bool hasOutputsToWrite = false; - for (auto& outobj : outputInputs) { - auto ds = dod->getDataOutputDescriptors(outobj); - if (ds.size() > 0) { - hasOutputsToWrite = true; - break; - } - } - - // if nothing needs to be saved then return a trivial functor - // this happens when nothing needs to be saved but there are dangling outputs - if (!hasOutputsToWrite) { - return [](ProcessingContext&) mutable -> void { - static bool once = false; - if (!once) { - LOG(info) << "No AODs to be saved."; - once = true; - } - }; - } - - // end of data functor is called at the end of the data stream - auto endofdatacb = [dod](EndOfStreamContext& context) { - dod->closeDataFiles(); - context.services().get().readyToQuit(QuitRequest::Me); - }; - - auto& callbacks = ic.services().get(); - callbacks.set(endofdatacb); - - // prepare map(startTime, tfNumber) - std::map tfNumbers; - std::map tfFilenames; - - std::vector aodMetaDataKeys; - std::vector aodMetaDataVals; - - // this functor is called once per time frame - return [dod, tfNumbers, tfFilenames, aodMetaDataKeys, aodMetaDataVals, compressionLevel](ProcessingContext& pc) mutable -> void { - LOGP(debug, "======== getGlobalAODSink::processing =========="); - LOGP(debug, " processing data set with {} entries", pc.inputs().size()); - - // return immediately if pc.inputs() is empty. This should never happen! 
- if (pc.inputs().size() == 0) { - LOGP(info, "No inputs available!"); - return; - } - - // update tfNumbers - uint64_t startTime = 0; - uint64_t tfNumber = 0; - auto ref = pc.inputs().get("tfn"); - if (ref.spec && ref.payload) { - startTime = DataRefUtils::getHeader(ref)->startTime; - tfNumber = pc.inputs().get("tfn"); - tfNumbers.insert(std::pair(startTime, tfNumber)); - } - // update tfFilenames - std::string aodInputFile; - auto ref2 = pc.inputs().get("tff"); - if (ref2.spec && ref2.payload) { - startTime = DataRefUtils::getHeader(ref2)->startTime; - aodInputFile = pc.inputs().get("tff"); - tfFilenames.insert(std::pair(startTime, aodInputFile)); - } - - // close all output files if one has reached size limit - dod->checkFileSizes(); - - // loop over the DataRefs which are contained in pc.inputs() - for (const auto& ref : pc.inputs()) { - if (!ref.spec) { - LOGP(debug, "Invalid input will be skipped!"); - continue; - } - - // get metadata - if (DataSpecUtils::partialMatch(*ref.spec, header::DataDescription("AODMetadataKeys"))) { - aodMetaDataKeys = pc.inputs().get>(ref.spec->binding); - } - if (DataSpecUtils::partialMatch(*ref.spec, header::DataDescription("AODMetadataVals"))) { - aodMetaDataVals = pc.inputs().get>(ref.spec->binding); - } - - // skip non-AOD refs - if (!DataSpecUtils::partialMatch(*ref.spec, writableAODOrigins)) { - continue; - } - startTime = DataRefUtils::getHeader(ref)->startTime; - - // does this need to be saved? - auto dh = DataRefUtils::getHeader(ref); - auto tableName = dh->dataDescription.as(); - auto ds = dod->getDataOutputDescriptors(*dh); - if (ds.empty()) { - continue; - } - - // get TF number from startTime - auto it = tfNumbers.find(startTime); - if (it != tfNumbers.end()) { - tfNumber = (it->second / dod->getNumberTimeFramesToMerge()) * dod->getNumberTimeFramesToMerge(); - } else { - LOGP(fatal, "No time frame number found for output with start time {}", startTime); - throw std::runtime_error("Processing is stopped!"); - } - // get aod input file from startTime - auto it2 = tfFilenames.find(startTime); - if (it2 != tfFilenames.end()) { - aodInputFile = it2->second; - } - - // get the TableConsumer and corresponding arrow table - auto msg = pc.inputs().get(ref.spec->binding); - if (msg.header == nullptr) { - LOGP(error, "No header for message {}:{}", ref.spec->binding, DataSpecUtils::describe(*ref.spec)); - continue; - } - auto s = pc.inputs().get(ref.spec->binding); - auto table = s->asArrowTable(); - if (!table->Validate().ok()) { - LOGP(warning, "The table \"{}\" is not valid and will not be saved!", tableName); - continue; - } - if (table->schema()->fields().empty()) { - LOGP(debug, "The table \"{}\" is empty but will be saved anyway!", tableName); - } - - // loop over all DataOutputDescriptors - // a table can be saved in multiple ways - // e.g. 
different selections of columns to different files - for (auto d : ds) { - auto fileAndFolder = dod->getFileFolder(d, tfNumber, aodInputFile, compressionLevel); - auto treename = fileAndFolder.folderName + "/" + d->treename; - TableToTree ta2tr(table, - fileAndFolder.file, - treename.c_str()); - - // update metadata - if (fileAndFolder.file->FindObjectAny("metaData")) { - LOGF(debug, "Metadata: target file %s already has metadata, preserving it", fileAndFolder.file->GetName()); - } else if (!aodMetaDataKeys.empty() && !aodMetaDataVals.empty()) { - TMap aodMetaDataMap; - for (uint32_t imd = 0; imd < aodMetaDataKeys.size(); imd++) { - aodMetaDataMap.Add(new TObjString(aodMetaDataKeys[imd]), new TObjString(aodMetaDataVals[imd])); - } - fileAndFolder.file->WriteObject(&aodMetaDataMap, "metaData", "Overwrite"); - } - - if (!d->colnames.empty()) { - for (auto& cn : d->colnames) { - auto idx = table->schema()->GetFieldIndex(cn); - auto col = table->column(idx); - auto field = table->schema()->field(idx); - if (idx != -1) { - ta2tr.addBranch(col, field); - } - } - } else { - ta2tr.addAllBranches(); - } - ta2tr.process(); - } - } - }; - }; // end of writerFunction + auto& ac = ctx.services().get(); // the command line options relevant for the writer are global // see runDataProcessing.h DataProcessorSpec spec{ .name = "internal-dpl-aod-writer", - .inputs = outputInputs, + .inputs = ac.outputsInputsAOD, .outputs = {}, - .algorithm = AlgorithmSpec{writerFunction}, + .algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTTTreeWriter", ctx), }; return spec; diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 1a656e4d60080..230d708b47dc7 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -30,7 +30,7 @@ #include "Framework/ServiceMetricsInfo.h" #include "WorkflowHelpers.h" #include "Framework/WorkflowSpecNode.h" -#include "AnalysisSupportHelpers.h" +#include "Framework/AnalysisSupportHelpers.h" #include "CommonMessageBackendsHelpers.h" #include @@ -516,7 +516,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow); // create DataOutputDescriptor - std::shared_ptr dod = WorkflowHelpers::getDataOutputDirector(ctx.options(), outputsInputs, isDangling); + std::shared_ptr dod = AnalysisSupportHelpers::getDataOutputDirector(ctx); // select outputs of type AOD which need to be saved // ATTENTION: if there are dangling outputs the getGlobalAODSink @@ -537,11 +537,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() // add TFNumber and TFFilename as input to the writer outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber"); outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename"); - int compression = 505; - if (ctx.options().hasOption("aod-writer-compression")) { - compression = ctx.options().get("aod-writer-compression"); - } - workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(dod, outputsInputsAOD, compression)); + workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx)); } // Move the dummy sink at the end, if needed for (size_t i = 0; i < workflow.size(); ++i) { diff --git a/Framework/Core/src/ConfigContext.cxx b/Framework/Core/src/ConfigContext.cxx index 726332e1d0ae3..9b121b1884998 100644 --- a/Framework/Core/src/ConfigContext.cxx +++ b/Framework/Core/src/ConfigContext.cxx @@ -14,6 +14,9 @@ namespace o2::framework { +ConfigContext::ConfigContext(ConfigParamRegistry& options, 
ServiceRegistryRef services, int argc, char** argv) + : mOptions{options}, mServices{services}, mArgc{argc}, mArgv{argv} {} + bool ConfigContext::helpOnCommandLine() const { bool helpasked = false; diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index da9a135dc5eb8..3782c48e81c56 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -9,7 +9,7 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. #include "WorkflowHelpers.h" -#include "AnalysisSupportHelpers.h" +#include "Framework/AnalysisSupportHelpers.h" #include "Framework/AlgorithmSpec.h" #include "Framework/AODReaderHelpers.h" #include "Framework/ConfigParamSpec.h" @@ -153,7 +153,7 @@ int defaultConditionQueryRateMultiplier() return getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER") ? std::stoi(getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER")) : 1; } -void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext const& ctx) +void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext& ctx) { auto fakeCallback = AlgorithmSpec{[](InitContext& ic) { LOG(info) << "This is not a real device, merely a placeholder for external inputs"; @@ -241,7 +241,9 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext aodReader.options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, rateLimitingChannelConfigInput, {"how many timeframes can be in flight at the same time"}}); } - AnalysisContext ac; + ctx.services().registerService(ServiceRegistryHelpers::handleForService(new AnalysisContext)); + auto& ac = ctx.services().get(); + std::vector requestedCCDBs; std::vector providedCCDBs; @@ -573,7 +575,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext // This is to inject a file sink so that any dangling ATSK object is written // to a ROOT file. if (ac.providedOutputObjHist.empty() == false) { - auto rootSink = AnalysisSupportHelpers::getOutputObjHistSink(ac.outObjHistMap, ac.outTskMap); + auto rootSink = AnalysisSupportHelpers::getOutputObjHistSink(ctx); extraSpecs.push_back(rootSink); } @@ -581,41 +583,38 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext extraSpecs.clear(); /// Analyze all ouputs - auto [outputsInputs, isDangling] = analyzeOutputs(workflow); + auto [outputsInputsTmp, isDanglingTmp] = analyzeOutputs(workflow); + ac.isDangling = isDanglingTmp; + ac.outputsInputs = outputsInputsTmp; // create DataOutputDescriptor - std::shared_ptr dod = getDataOutputDirector(ctx.options(), outputsInputs, isDangling); + std::shared_ptr dod = AnalysisSupportHelpers::getDataOutputDirector(ctx); // select outputs of type AOD which need to be saved // ATTENTION: if there are dangling outputs the getGlobalAODSink // has to be created in any case! 
- std::vector outputsInputsAOD; - for (auto ii = 0u; ii < outputsInputs.size(); ii++) { - if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) { - auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]); - if (ds.size() > 0 || isDangling[ii]) { - outputsInputsAOD.emplace_back(outputsInputs[ii]); + for (auto ii = 0u; ii < ac.outputsInputs.size(); ii++) { + if (DataSpecUtils::partialMatch(ac.outputsInputs[ii], extendedAODOrigins)) { + auto ds = dod->getDataOutputDescriptors(ac.outputsInputs[ii]); + if (ds.size() > 0 || ac.isDangling[ii]) { + ac.outputsInputsAOD.emplace_back(ac.outputsInputs[ii]); } } } // file sink for any AOD output - if (outputsInputsAOD.size() > 0) { + if (ac.outputsInputsAOD.size() > 0) { // add TFNumber and TFFilename as input to the writer - outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"}); - outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"}); - int compressionLevel = 505; - if (ctx.options().hasOption("aod-writer-compression")) { - compressionLevel = ctx.options().get("aod-writer-compression"); - } - auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(dod, outputsInputsAOD, compressionLevel); + ac.outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"}); + ac.outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"}); + auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx); extraSpecs.push_back(fileSink); - auto it = std::find_if(outputsInputs.begin(), outputsInputs.end(), [](InputSpec& spec) -> bool { + auto it = std::find_if(ac.outputsInputs.begin(), ac.outputsInputs.end(), [](InputSpec& spec) -> bool { return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN")); }); - size_t ii = std::distance(outputsInputs.begin(), it); - isDangling[ii] = false; + size_t ii = std::distance(ac.outputsInputs.begin(), it); + ac.isDangling[ii] = false; } workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end()); @@ -623,20 +622,20 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext // Select dangling outputs which are not of type AOD std::vector redirectedOutputsInputs; - for (auto ii = 0u; ii < outputsInputs.size(); ii++) { + for (auto ii = 0u; ii < ac.outputsInputs.size(); ii++) { if (ctx.options().get("forwarding-policy") == "none") { continue; } // We forward to the output proxy all the inputs only if they are dangling // or if the forwarding policy is "proxy". - if (!isDangling[ii] && (ctx.options().get("forwarding-policy") != "all")) { + if (!ac.isDangling[ii] && (ctx.options().get("forwarding-policy") != "all")) { continue; } // AODs are skipped in any case. 
- if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) { + if (DataSpecUtils::partialMatch(ac.outputsInputs[ii], extendedAODOrigins)) { continue; } - redirectedOutputsInputs.emplace_back(outputsInputs[ii]); + redirectedOutputsInputs.emplace_back(ac.outputsInputs[ii]); } std::vector unmatched; @@ -985,102 +984,6 @@ struct DataMatcherId { size_t id; }; -std::shared_ptr WorkflowHelpers::getDataOutputDirector(ConfigParamRegistry const& options, std::vector const& OutputsInputs, std::vector const& isDangling) -{ - std::shared_ptr dod = std::make_shared(); - - // analyze options and take actions accordingly - // default values - std::string rdn, resdir("./"); - std::string fnb, fnbase("AnalysisResults_trees"); - float mfs, maxfilesize(-1.); - std::string fmo, filemode("RECREATE"); - int ntfm, ntfmerge = 1; - - // values from json - if (options.isSet("aod-writer-json")) { - auto fnjson = options.get("aod-writer-json"); - if (!fnjson.empty()) { - std::tie(rdn, fnb, fmo, mfs, ntfm) = dod->readJson(fnjson); - if (!rdn.empty()) { - resdir = rdn; - } - if (!fnb.empty()) { - fnbase = fnb; - } - if (!fmo.empty()) { - filemode = fmo; - } - if (mfs > 0.) { - maxfilesize = mfs; - } - if (ntfm > 0) { - ntfmerge = ntfm; - } - } - } - - // values from command line options, information from json is overwritten - if (options.isSet("aod-writer-resdir")) { - rdn = options.get("aod-writer-resdir"); - if (!rdn.empty()) { - resdir = rdn; - } - } - if (options.isSet("aod-writer-resfile")) { - fnb = options.get("aod-writer-resfile"); - if (!fnb.empty()) { - fnbase = fnb; - } - } - if (options.isSet("aod-writer-resmode")) { - fmo = options.get("aod-writer-resmode"); - if (!fmo.empty()) { - filemode = fmo; - } - } - if (options.isSet("aod-writer-maxfilesize")) { - mfs = options.get("aod-writer-maxfilesize"); - if (mfs > 0) { - maxfilesize = mfs; - } - } - if (options.isSet("aod-writer-ntfmerge")) { - ntfm = options.get("aod-writer-ntfmerge"); - if (ntfm > 0) { - ntfmerge = ntfm; - } - } - // parse the keepString - if (options.isSet("aod-writer-keep")) { - auto keepString = options.get("aod-writer-keep"); - if (!keepString.empty()) { - dod->reset(); - std::string d("dangling"); - if (d.find(keepString) == 0) { - // use the dangling outputs - std::vector danglingOutputs; - for (auto ii = 0u; ii < OutputsInputs.size(); ii++) { - if (DataSpecUtils::partialMatch(OutputsInputs[ii], writableAODOrigins) && isDangling[ii]) { - danglingOutputs.emplace_back(OutputsInputs[ii]); - } - } - dod->readSpecs(danglingOutputs); - } else { - // use the keep string - dod->readString(keepString); - } - } - } - dod->setResultDir(resdir); - dod->setFilenameBase(fnbase); - dod->setFileMode(filemode); - dod->setMaximumFileSize(maxfilesize); - dod->setNumberTimeFramesToMerge(ntfmerge); - - return dod; -} - std::tuple, std::vector> WorkflowHelpers::analyzeOutputs(WorkflowSpec const& workflow) { // compute total number of input/output diff --git a/Framework/Core/src/WorkflowHelpers.h b/Framework/Core/src/WorkflowHelpers.h index b20249b99edc8..b2a4d4cab55df 100644 --- a/Framework/Core/src/WorkflowHelpers.h +++ b/Framework/Core/src/WorkflowHelpers.h @@ -180,7 +180,7 @@ struct WorkflowHelpers { // dangling inputs are satisfied. 
// @a workflow the workflow to decorate // @a ctx the context for the configuration phase - static void injectServiceDevices(WorkflowSpec& workflow, ConfigContext const& ctx); + static void injectServiceDevices(WorkflowSpec& workflow, ConfigContext& ctx); // Final adjustments to @a workflow after service devices have been injected. static void adjustTopology(WorkflowSpec& workflow, ConfigContext const& ctx); @@ -204,8 +204,6 @@ struct WorkflowHelpers { const std::vector& edges, const std::vector& index); - static std::shared_ptr getDataOutputDirector(ConfigParamRegistry const& options, std::vector const& OutputsInputs, std::vector const& outputTypes); - /// Given @a workflow it gathers all the OutputSpec and in addition provides /// the information whether and output is dangling and/or of type AOD /// An Output is dangling if it does not have a corresponding InputSpec. diff --git a/Framework/Core/test/Mocking.h b/Framework/Core/test/Mocking.h index b3e48ad3b2d0f..3b9a86b46e89c 100644 --- a/Framework/Core/test/Mocking.h +++ b/Framework/Core/test/Mocking.h @@ -34,7 +34,8 @@ std::unique_ptr makeEmptyConfigContext() store->preload(); store->activate(); static ConfigParamRegistry registry(std::move(store)); - auto context = std::make_unique(registry, 0, nullptr); + static ServiceRegistry services; + auto context = std::make_unique(registry, ServiceRegistryRef{services}, 0, nullptr); return context; } diff --git a/Framework/Core/test/benchmark_WorkflowHelpers.cxx b/Framework/Core/test/benchmark_WorkflowHelpers.cxx index f1c070d8a0f4e..09a9ae0cca923 100644 --- a/Framework/Core/test/benchmark_WorkflowHelpers.cxx +++ b/Framework/Core/test/benchmark_WorkflowHelpers.cxx @@ -30,7 +30,8 @@ std::unique_ptr makeEmptyConfigContext() store->preload(); store->activate(); static ConfigParamRegistry registry(std::move(store)); - auto context = std::make_unique(registry, 0, nullptr); + static ServiceRegistry services; + auto context = std::make_unique(registry, ServiceRegistryRef{services}, 0, nullptr); return context; } diff --git a/Framework/Core/test/test_OverrideLabels.cxx b/Framework/Core/test/test_OverrideLabels.cxx index 573bd13be797a..c5134c0c169c0 100644 --- a/Framework/Core/test/test_OverrideLabels.cxx +++ b/Framework/Core/test/test_OverrideLabels.cxx @@ -31,7 +31,8 @@ std::unique_ptr mockupLabels(std::string labelArg) store->preload(); store->activate(); registry = ConfigParamRegistry(std::move(store)); - auto context = std::make_unique(registry, 0, nullptr); + static ServiceRegistry services; + auto context = std::make_unique(registry, ServiceRegistryRef{services}, 0, nullptr); return context; } diff --git a/Framework/TestWorkflows/src/o2TestHistograms.cxx b/Framework/TestWorkflows/src/o2TestHistograms.cxx index 9986f52a1d940..efac16f6da4f0 100644 --- a/Framework/TestWorkflows/src/o2TestHistograms.cxx +++ b/Framework/TestWorkflows/src/o2TestHistograms.cxx @@ -17,6 +17,7 @@ #include "Framework/AnalysisTask.h" #include #include +#include using namespace o2; using namespace o2::framework; @@ -43,7 +44,7 @@ struct EtaAndClsHistogramsSimple { { LOGP(info, "Invoking the simple one"); for (auto& track : tracks) { - etaClsH->Fill(track.eta(), track.pt(), 0); + etaClsH->Fill(track.eta(), track.pt()); skimEx(track.pt(), track.eta()); } } @@ -57,7 +58,7 @@ struct EtaAndClsHistogramsIUSimple { { LOGP(info, "Invoking the simple one"); for (auto& track : tracks) { - etaClsH->Fill(track.eta(), track.pt(), 0); + etaClsH->Fill(track.eta(), track.pt()); skimEx(track.pt(), track.eta()); } } diff --git 
a/Steer/src/CollisionContextTool.cxx b/Steer/src/CollisionContextTool.cxx index af2f607b88774..3d1dcec29976e 100644 --- a/Steer/src/CollisionContextTool.cxx +++ b/Steer/src/CollisionContextTool.cxx @@ -27,6 +27,7 @@ #include "CommonUtils/ConfigurableParam.h" #include #include "DataFormatsParameters/GRPLHCIFData.h" +#include "SimConfig/SimConfig.h" // // Created by Sandro Wenzel on 13.07.21. // @@ -52,11 +53,12 @@ struct Options { bool useexistingkinematics = false; bool noEmptyTF = false; // prevent empty timeframes; the first interaction will be shifted backwards to fall within the range given by Options.orbits int maxCollsPerTF = -1; // the maximal number of hadronic collisions per TF (can be used to constrain number of collisions per timeframe to some maximal value) - bool genVertices = false; // whether to assign vertices to collisions std::string configKeyValues = ""; // string to init config key values long timestamp = -1; // timestamp for CCDB queries std::string individualTFextraction = ""; // triggers extraction of individual timeframe components when non-null // format is path prefix + std::string vertexModeString{"kNoVertex"}; // Vertex Mode; vertices will be assigned to collisions of mode != kNoVertex + o2::conf::VertexMode vertexMode = o2::conf::VertexMode::kNoVertex; }; enum class InteractionLockMode { @@ -203,7 +205,9 @@ bool parseOptions(int argc, char* argv[], Options& optvalues) "first-orbit", bpo::value(&optvalues.firstFractionalOrbit)->default_value(0), "First (fractional) orbit in the run (HBFUtils.firstOrbit + BC from decimal)")( "maxCollsPerTF", bpo::value(&optvalues.maxCollsPerTF)->default_value(-1), "Maximal number of MC collisions to put into one timeframe. By default no constraint.")( "noEmptyTF", bpo::bool_switch(&optvalues.noEmptyTF), "Enforce to have at least one collision")( - "configKeyValues", bpo::value(&optvalues.configKeyValues)->default_value(""), "Semicolon separated key=value strings (e.g.: 'TPC.gasDensity=1;...')")("with-vertices", "Assign vertices to collisions.")("timestamp", bpo::value(&optvalues.timestamp)->default_value(-1L), "Timestamp for CCDB queries / anchoring")( + "configKeyValues", bpo::value(&optvalues.configKeyValues)->default_value(""), "Semicolon separated key=value strings (e.g.: 'TPC.gasDensity=1;...')")( + "with-vertices", bpo::value(&optvalues.vertexModeString)->default_value("kNoVertex"), "Assign vertices to collisions. Argument is the vertex mode. Defaults to no vertexing applied")( + "timestamp", bpo::value(&optvalues.timestamp)->default_value(-1L), "Timestamp for CCDB queries / anchoring")( "extract-per-timeframe", bpo::value(&optvalues.individualTFextraction)->default_value(""), "Extract individual timeframe contexts.
Format required: time_frame_prefix[:comma_separated_list_of_signals_to_offset]"); @@ -225,9 +229,8 @@ bool parseOptions(int argc, char* argv[], Options& optvalues) if (vm.count("use-existing-kine")) { optvalues.useexistingkinematics = true; } - if (vm.count("with-vertices")) { - optvalues.genVertices = true; - } + + o2::conf::SimConfig::parseVertexModeString(optvalues.vertexModeString, optvalues.vertexMode); // fix the first orbit and bunch crossing // auto orbitbcpair = parseOrbitAndBC(optvalues.firstIRString); @@ -277,10 +280,9 @@ int main(int argc, char* argv[]) LOG(info) << "Fetch bcPattern information from CCDB"; // fetch the GRP Object auto& ccdb = o2::ccdb::BasicCCDBManager::instance(); - ccdb.setTimestamp(options.timestamp); ccdb.setCaching(false); ccdb.setLocalObjectValidityChecking(true); - auto grpLHC = ccdb.get("GLO/Config/GRPLHCIF"); + auto grpLHC = ccdb.getForTimeStamp("GLO/Config/GRPLHCIF", options.timestamp); LOG(info) << "Fetched injection scheme " << grpLHC->getInjectionScheme() << " from CCDB"; sampler.setBunchFilling(grpLHC->getBunchFilling()); } else { @@ -449,14 +451,32 @@ int main(int argc, char* argv[]) auto numTimeFrames = digicontext.finalizeTimeframeStructure(orbitstart, options.orbitsPerTF); - if (options.genVertices) { - // TODO: offer option taking meanVertex directly from CCDB ! "GLO/Calib/MeanVertex" - // sample interaction vertices + if (options.vertexMode != o2::conf::VertexMode::kNoVertex) { + switch (options.vertexMode) { + case o2::conf::VertexMode::kCCDB: { + // fetch mean vertex from CCDB + auto meanv = o2::ccdb::BasicCCDBManager::instance().getForTimeStamp("GLO/Calib/MeanVertex", options.timestamp); + if (meanv) { + LOG(info) << "Applying vertexing using CCDB mean vertex " << *meanv; + digicontext.sampleInteractionVertices(*meanv); + } else { + LOG(fatal) << "No vertex available"; + } + break; + } - // init this vertex from CCDB or InteractionDiamond parameter - const auto& dparam = o2::eventgen::InteractionDiamondParam::Instance(); - o2::dataformats::MeanVertexObject meanv(dparam.position[0], dparam.position[1], dparam.position[2], dparam.width[0], dparam.width[1], dparam.width[2], dparam.slopeX, dparam.slopeY); - digicontext.sampleInteractionVertices(meanv); + case o2::conf::VertexMode::kDiamondParam: { + // init this vertex from CCDB or InteractionDiamond parameter + const auto& dparam = o2::eventgen::InteractionDiamondParam::Instance(); + o2::dataformats::MeanVertexObject meanv(dparam.position[0], dparam.position[1], dparam.position[2], dparam.width[0], dparam.width[1], dparam.width[2], dparam.slopeX, dparam.slopeY); + LOG(info) << "Applying vertexing using DiamondParam mean vertex " << meanv; + digicontext.sampleInteractionVertices(meanv); + break; + } + default: { + LOG(error) << "Unknown vertex mode ... 
Not generating vertices"; + } + } } // we fill QED contributions to the context diff --git a/run/O2PrimaryServerDevice.h b/run/O2PrimaryServerDevice.h index 202e6e8652cc7..53b86d1f23591 100644 --- a/run/O2PrimaryServerDevice.h +++ b/run/O2PrimaryServerDevice.h @@ -138,7 +138,8 @@ class O2PrimaryServerDevice final : public fair::mq::Device mPrimGen->SetEvent(&mEventHeader); // A good moment to couple to collision context - auto collContextFileName = mSimConfig.getConfigData().mFromCollisionContext; + auto collContextFileName_PrefixPair = mSimConfig.getCollContextFilenameAndEventPrefix(); + auto collContextFileName = collContextFileName_PrefixPair.first; if (collContextFileName.size() > 0) { LOG(info) << "Simulation has collision context"; mCollissionContext = o2::steer::DigitizationContext::loadFromFile(collContextFileName); @@ -147,7 +148,7 @@ class O2PrimaryServerDevice final : public fair::mq::Device LOG(info) << "We found " << vertices.size() << " vertices included "; // initialize the eventID to collID mapping - const auto source = mCollissionContext->findSimPrefix(mSimConfig.getOutPrefix()); + const auto source = mCollissionContext->findSimPrefix(collContextFileName_PrefixPair.second); if (source == -1) { LOG(fatal) << "Wrong simulation prefix"; }
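For illustration, a minimal sketch of how the new --with-vertices argument of CollisionContextTool is meant to flow into a vertex-mode decision, assuming only the SimConfig API already used in the patch above (o2::conf::SimConfig::parseVertexModeString and o2::conf::VertexMode). The standalone main(), the hard-coded "kCCDB" value and the comments are illustrative assumptions, not code from this patch.

#include <string>
#include "SimConfig/SimConfig.h" // o2::conf::SimConfig, o2::conf::VertexMode

int main()
{
  // value a user would pass via --with-vertices (the tool defaults to "kNoVertex");
  // "kCCDB" is hard-coded here purely for illustration
  std::string vertexModeString{"kCCDB"};
  auto vertexMode = o2::conf::VertexMode::kNoVertex;

  // convert the mode string into the enum (its return value is ignored here,
  // just as in the CollisionContextTool change above)
  o2::conf::SimConfig::parseVertexModeString(vertexModeString, vertexMode);

  switch (vertexMode) {
    case o2::conf::VertexMode::kCCDB:
      // the tool fetches the mean vertex from CCDB ("GLO/Calib/MeanVertex") at the given timestamp
      break;
    case o2::conf::VertexMode::kDiamondParam:
      // the tool builds the mean vertex from the InteractionDiamondParam configurable
      break;
    default:
      // kNoVertex (or an unrecognized string): no interaction vertices are sampled
      break;
  }
  return 0;
}

The same three modes are the ones handled by the switch added to CollisionContextTool above; anything that does not resolve to kCCDB or kDiamondParam leaves the collisions without assigned vertices.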