Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QC-1231] Remove the retrieval of qcConfiguration in init #2414

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Framework/include/QualityControl/AggregatorRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ class AggregatorRunner : public framework::Task

void send(const QualityObjectsWithAggregatorNameVector&, framework::DataAllocator&);

void refreshConfig(framework::InitContext& iCtx);

/**
* Prepare the inputs, remove the duplicates
*/
Expand Down
3 changes: 0 additions & 3 deletions Framework/include/QualityControl/CheckRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,6 @@ class CheckRunner : public framework::Task
/// \brief Callback for CallbackService::Id::Reset (DPL) a.k.a. RESET DEVICE transition (FairMQ)
void reset();

/// Refresh the configuration using the payload found in the fairmq options (if available)
void refreshConfig(framework::InitContext& iCtx);

// General state
std::string mDeviceName;
std::map<std::string, Check> mChecks;
Expand Down
1 change: 0 additions & 1 deletion Framework/include/QualityControl/TaskRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ class TaskRunner : public framework::Task

/// \brief Checks if all the expected data inputs are present in the provided InputRecord
static bool isDataReady(const framework::InputRecord& inputs);
void refreshConfig(framework::InitContext& iCtx);
void printTaskConfig() const;
void startOfActivity();
void endOfActivity();
Expand Down
43 changes: 0 additions & 43 deletions Framework/src/AggregatorRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -78,48 +78,6 @@ AggregatorRunner::~AggregatorRunner()
}
}

void AggregatorRunner::refreshConfig(InitContext& iCtx)
{
try {
auto updatedTree = iCtx.options().get<boost::property_tree::ptree>("qcConfiguration");

if (updatedTree.empty()) {
ILOG(Warning, Devel) << "Templated config tree is empty, we continue with the original one" << ENDM;
} else {
if (gSystem->Getenv("O2_QC_DEBUG_CONFIG_TREE")) { // until we are sure it works, keep a backdoor
ILOG(Debug, Devel) << "We print the tree we got from the ECS via DPL : " << ENDM;
printTree(updatedTree);
}

// read the config, prepare spec
auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(updatedTree, workflow_type_helpers::getWorkflowType(iCtx.options()));

// replace the runner config
mRunnerConfig = AggregatorRunnerFactory::extractRunnerConfig(infrastructureSpec.common);

// replace the aggregators configs
mAggregatorsConfig.clear();
mAggregatorsConfig = AggregatorRunnerFactory::extractAggregatorsConfig(infrastructureSpec.common, infrastructureSpec.aggregators);

// replace the inputs
mInputs.clear();
prepareInputs();

mOutputs.clear();
prepareOutputs();

ILOG(Debug, Devel) << "Configuration refreshed" << ENDM;
}
} catch (std::invalid_argument& error) {
// ignore the error, we just skip the update of the config file. It can be legit, e.g. in command line mode
ILOG(Warning, Devel) << "Could not get updated config tree in TaskRunner::init() - `qcConfiguration` could not be retrieved" << ENDM;
} catch (...) {
// we catch here because we don't know where it will get lost in DPL, and also we don't care if this part has failed.
ILOG(Warning, Devel) << "Error caught in refreshConfig(): "
<< current_diagnostic(true) << ENDM;
}
}

void AggregatorRunner::prepareInputs()
{
std::set<std::string> alreadySeen;
Expand Down Expand Up @@ -158,7 +116,6 @@ std::string AggregatorRunner::createAggregatorRunnerName()
void AggregatorRunner::init(framework::InitContext& iCtx)
{
core::initInfologger(iCtx, mRunnerConfig.infologgerDiscardParameters, "aggregator");
refreshConfig(iCtx);
QcInfoLogger::setDetector(AggregatorRunner::getDetectorName(mAggregators));
Bookkeeping::getInstance().init(mRunnerConfig.bookkeepingUrl);

Expand Down
41 changes: 0 additions & 41 deletions Framework/src/CheckRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -169,51 +169,10 @@ CheckRunner::~CheckRunner()
}
}

void CheckRunner::refreshConfig(InitContext& iCtx)
{
try {
// get the tree
auto updatedTree = iCtx.options().get<boost::property_tree::ptree>("qcConfiguration");

if (updatedTree.empty()) {
ILOG(Warning, Devel) << "Templated config tree is empty, we continue with the original one" << ENDM;
} else {
if (gSystem->Getenv("O2_QC_DEBUG_CONFIG_TREE")) { // until we are sure it works, keep a backdoor
ILOG(Debug, Devel) << "We print the tree we got from the ECS via DPL : " << ENDM;
printTree(updatedTree);
}

// prepare the information we need
auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(updatedTree, workflow_type_helpers::getWorkflowType(iCtx.options()));

// Use the config to reconfigure the check runner.
// The configs for the checks we find in the config and in our map are updated.
// Topology changes are ignored: New checks are ignored. Removed checks are ignored.
for (const auto& checkSpec : infrastructureSpec.checks) {
// search if we have this check in this runner and replace it
if (mChecks.find(checkSpec.checkName) != mChecks.end()) {
auto checkConfig = Check::extractConfig(infrastructureSpec.common, checkSpec);
mChecks.erase(checkConfig.name);
mChecks.emplace(checkConfig.name, checkConfig);
ILOG(Debug, Devel) << "Check " << checkSpec.checkName << " has been updated" << ENDM;
}
}
}
} catch (std::invalid_argument& error) {
// ignore the error, we just skip the update of the config file. It can be legit, e.g. in command line mode
ILOG(Warning, Devel) << "Could not get updated config tree in CheckRunner::init() - `qcConfiguration` could not be retrieved" << ENDM;
} catch (...) {
// we catch here because we don't know where it will get lost in DPL, and also we don't care if this part has failed.
ILOG(Warning, Devel) << "Error caught in CheckRunner::refreshConfig(): "
<< current_diagnostic(true) << ENDM;
}
}

void CheckRunner::init(framework::InitContext& iCtx)
{
try {
core::initInfologger(iCtx, mConfig.infologgerDiscardParameters, createCheckRunnerFacility(mDeviceName));
refreshConfig(iCtx);
Bookkeeping::getInstance().init(mConfig.bookkeepingUrl);
initDatabase();
initMonitoring();
Expand Down
39 changes: 0 additions & 39 deletions Framework/src/TaskRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -81,50 +81,11 @@ TaskRunner::~TaskRunner()
ILOG(Debug, Trace) << "TaskRunner destructor (" << this << ")" << ENDM;
}

void TaskRunner::refreshConfig(InitContext& iCtx)
{
try {
// get the tree
auto updatedTree = iCtx.options().get<boost::property_tree::ptree>("qcConfiguration");

if (updatedTree.empty()) {
ILOG(Warning, Devel) << "Templated config tree is empty, we continue with the original one" << ENDM;
} else {
if (gSystem->Getenv("O2_QC_DEBUG_CONFIG_TREE")) { // until we are sure it works, keep a backdoor
ILOG(Debug, Devel) << "We print the tree we got from the ECS via DPL : " << ENDM;
printTree(updatedTree);
}

// prepare the information we need
auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(updatedTree, workflow_type_helpers::getWorkflowType(iCtx.options()));
// find the correct taskSpec
auto taskSpecIter = find_if(infrastructureSpec.tasks.begin(),
infrastructureSpec.tasks.end(),
[this](const TaskSpec& ts) { return ts.taskName == mTaskConfig.taskName; });
if (taskSpecIter != infrastructureSpec.tasks.end()) {
bool runningWithMergers = mTaskConfig.parallelTaskID != 0; // it is 0 when we are the one and only task instance.
int resetAfterCycles = TaskRunnerFactory::computeResetAfterCycles(*taskSpecIter, runningWithMergers);
mTaskConfig = TaskRunnerFactory::extractConfig(infrastructureSpec.common, *taskSpecIter, mTaskConfig.parallelTaskID, resetAfterCycles);
ILOG(Debug, Devel) << "Configuration refreshed" << ENDM;
} else {
ILOG(Error, Support) << "Could not find the task " << mTaskConfig.taskName << " in the templated config provided by ECS, we continue with the original config" << ENDM;
}
}
} catch (std::invalid_argument& error) {
// ignore the error, we just skip the update of the config file. It can be legit, e.g. in command line mode
ILOG(Warning, Devel) << "Could not get updated config tree in TaskRunner::init() - `qcConfiguration` could not be retrieved" << ENDM;
} catch (...) {
// we catch here because we don't know where it will get lost in dpl, and also we don't care if this part has failed.
ILOG(Warning, Devel) << "Error caught in refreshConfig() : " << current_diagnostic(true) << ENDM;
}
}

void TaskRunner::init(InitContext& iCtx)
{
core::initInfologger(iCtx, mTaskConfig.infologgerDiscardParameters, "task/" + mTaskConfig.taskName, mTaskConfig.detectorName);
ILOG(Info, Devel) << "Initializing TaskRunner" << ENDM;

refreshConfig(iCtx);
printTaskConfig();
Bookkeeping::getInstance().init(mTaskConfig.bookkeepingUrl);

Expand Down
Loading