Skip to content

Commit

Permalink
[QC-1117] Handle case when no cycle duration is set (#2287)
Browse files Browse the repository at this point in the history
* [QC-1117] Handle case when no cycle duration is set

* Work on the cycle duration in a single place

* better name

* format

* fix

* sanitize cycle duration also for mergers
  • Loading branch information
Barthelemy authored May 15, 2024
1 parent e572ab0 commit 0f28acc
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 33 deletions.
3 changes: 3 additions & 0 deletions Framework/include/QualityControl/TaskRunnerFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class TaskRunnerFactory
/// \param policies - completion policies vector
static void customizeInfrastructure(std::vector<framework::CompletionPolicy>& policies);
static framework::InputSpec createTimerInputSpec(const CommonSpec& globalConfig, std::vector<std::pair<size_t, size_t>>& cycleDurations, const std::string& detectorName, const std::string& taskName);

/// \brief Extracts and sanitize the cycle duration of the task
static std::vector<std::pair<size_t, size_t>> getSanitizedCycleDurations(const CommonSpec& globalConfig, const TaskSpec& taskSpec);
};

} // namespace o2::quality_control::core
Expand Down
14 changes: 2 additions & 12 deletions Framework/src/InfrastructureGenerator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,7 @@ framework::WorkflowSpec InfrastructureGenerator::generateFullChainInfrastructure

// In "delta" mode Mergers should implement moving window, in "entire" - QC Tasks.
size_t resetAfterCycles = taskSpec.mergingMode == "delta" ? taskSpec.resetAfterCycles : 0;
std::vector<std::pair<size_t, size_t>> cycleDurationsMultiplied;
if (taskSpec.cycleDurationSeconds > 0) { // old, simple, style
cycleDurationsMultiplied = { { taskSpec.cycleDurationSeconds, 1 } };
} else { // new style
cycleDurationsMultiplied = taskSpec.multipleCycleDurations;
}
auto cycleDurationsMultiplied = TaskRunnerFactory::getSanitizedCycleDurations(infrastructureSpec.common, taskSpec);
std::for_each(cycleDurationsMultiplied.begin(), cycleDurationsMultiplied.end(),
[taskSpec](std::pair<size_t, size_t>& p) { p.first *= taskSpec.mergerCycleMultiplier; });
bool enableMovingWindows = !taskSpec.movingWindows.empty();
Expand Down Expand Up @@ -271,12 +266,7 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur

// In "delta" mode Mergers should implement moving window, in "entire" - QC Tasks.
size_t resetAfterCycles = taskSpec.mergingMode == "delta" ? taskSpec.resetAfterCycles : 0;
std::vector<std::pair<size_t, size_t>> cycleDurationsMultiplied;
if (taskSpec.cycleDurationSeconds > 0) { // old, simple, style
cycleDurationsMultiplied = { { taskSpec.cycleDurationSeconds, 1 } };
} else { // new style
cycleDurationsMultiplied = taskSpec.multipleCycleDurations;
}
auto cycleDurationsMultiplied = TaskRunnerFactory::getSanitizedCycleDurations(infrastructureSpec.common, taskSpec);
std::for_each(cycleDurationsMultiplied.begin(), cycleDurationsMultiplied.end(),
[taskSpec](std::pair<size_t, size_t>& p) { p.first *= taskSpec.mergerCycleMultiplier; });
bool enableMovingWindows = !taskSpec.movingWindows.empty();
Expand Down
55 changes: 34 additions & 21 deletions Framework/src/TaskRunnerFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,39 @@ o2::framework::DataProcessorSpec TaskRunnerFactory::create(const TaskRunnerConfi
return newTask;
}

std::vector<std::pair<size_t, size_t>> TaskRunnerFactory::getSanitizedCycleDurations(const CommonSpec& globalConfig, const TaskSpec& taskSpec)
{
// Two ways of configuring, incompatible.
// 1. simple, old, way: cycleDurationSeconds is the duration in seconds for all cycles
// 2. complex, new, way: cycleDurations: a list of tuples specifying different durations to be applied for a certain time

// First make sure that we have one and only one cycle duration definition
if (taskSpec.cycleDurationSeconds > 0 && taskSpec.multipleCycleDurations.size() > 0) {
throw std::runtime_error("Both cycleDurationSeconds and cycleDurations have been defined for task '" + taskSpec.taskName + "'. Pick one. Sheepishly bailing out.");
}
if (taskSpec.cycleDurationSeconds <= 0 && taskSpec.multipleCycleDurations.size() == 0) {
throw std::runtime_error("Neither cycleDurationSeconds nor cycleDurations have been defined for task '" + taskSpec.taskName + "'. Pick one. Sheepishly bailing out.");
}

// Convert old style into new style if needed
std::vector<std::pair<size_t, size_t>> multipleCycleDurations = taskSpec.multipleCycleDurations; // this is the new style
if (taskSpec.cycleDurationSeconds > 0) { // if it was actually the old style, then we convert it to the new style
multipleCycleDurations = { { taskSpec.cycleDurationSeconds, 1 } };
}

// Check that the durations are not below 10 seconds except when using a dummy database
auto dummyDatabaseUsed = globalConfig.database.count("implementation") > 0 && globalConfig.database.at("implementation") == "Dummy";
if (!dummyDatabaseUsed) {
for (auto& [cycleDuration, validity] : multipleCycleDurations) {
if (cycleDuration < 10) {
ILOG(Error, Support) << "Cycle duration is too short (" << cycleDuration << "), replaced by a duration of 10 seconds." << ENDM;
cycleDuration = 10;
}
}
}
return multipleCycleDurations;
}

TaskRunnerConfig TaskRunnerFactory::extractConfig(const CommonSpec& globalConfig, const TaskSpec& taskSpec, std::optional<int> id, std::optional<int> resetAfterCycles)
{
std::string deviceName{ TaskRunner::createTaskRunnerIdString() + "-" + InfrastructureSpecReader::validateDetectorName(taskSpec.detectorName) + "-" + taskSpec.taskName };
Expand All @@ -76,16 +109,7 @@ TaskRunnerConfig TaskRunnerFactory::extractConfig(const CommonSpec& globalConfig
}

// cycle duration
// Two ways of configuring, incompatible.
// 1. simple, old, way: cycleDurationSeconds is the duration in seconds for all cycles
// 2. complex, new, way: cycleDurations: a list of tuples specifying different durations to be applied for a certain time
if (taskSpec.cycleDurationSeconds > 0 && taskSpec.multipleCycleDurations.size() > 0) {
throw std::runtime_error("Both cycleDurationSeconds and cycleDurations have been defined for task '" + taskSpec.taskName + "'. Pick one. Sheepishly bailing out.");
}
auto multipleCycleDurations = taskSpec.multipleCycleDurations; // this is the new style
if (taskSpec.cycleDurationSeconds > 0) { // if it was actually the old style, then we convert it to the new style
multipleCycleDurations = { { taskSpec.cycleDurationSeconds, 1 } };
}
auto multipleCycleDurations = getSanitizedCycleDurations(globalConfig, taskSpec);
inputs.emplace_back(createTimerInputSpec(globalConfig, multipleCycleDurations, taskSpec.detectorName, taskSpec.taskName));

static std::unordered_map<std::string, o2::base::GRPGeomRequest::GeomRequest> const geomRequestFromString = {
Expand Down Expand Up @@ -176,17 +200,6 @@ TaskRunnerConfig TaskRunnerFactory::extractConfig(const CommonSpec& globalConfig
InputSpec TaskRunnerFactory::createTimerInputSpec(const CommonSpec& globalConfig, std::vector<std::pair<size_t, size_t>>& cycleDurations,
const std::string& detectorName, const std::string& taskName)
{
// This is to check that the durations are not below 10 seconds except when using a dummy database
auto dummyDatabaseUsed = globalConfig.database.count("implementation") > 0 && globalConfig.database.at("implementation") == "Dummy";
if (!dummyDatabaseUsed) {
for (auto& [cycleDuration, validity] : cycleDurations) {
if (cycleDuration < 10) {
ILOG(Error, Support) << "Cycle duration is too short (" << cycleDuration << "), replaced by a duration of 10 seconds." << ENDM;
cycleDuration = 10;
}
}
}

// Create the TimerSpec for cycleDurations
std::vector<TimerSpec> timers;
for (auto& [cycleDuration, period] : cycleDurations) {
Expand Down

0 comments on commit 0f28acc

Please sign in to comment.