Skip to content

Commit

Permalink
QC-1136 Publication Policy for ObjectsManager (#2235)
Browse files Browse the repository at this point in the history
* QC-1136 Publication Policy for ObjectsManager

This lets the user specify for how long a registered object should be published.
This makes the user more aware of the expected lifetime of the objects that they register,
which proved to be difficult to get right for START STOP START sequences in postprocessing.
SkeletonPostProcessing has been adapted to document the three main scenarios that may happen.

User tasks were adapted to use reasonable parameters.
On this occasion, a few problems that could lead to a faulty START STOP START sequence were also fixed.

* Remove obsolete warning, replaced by PublicationPolicy

* Apply suggestions from code review

Co-authored-by: Barthélémy von Haller <barthelemy.von.haller@gmail.com>

---------

Co-authored-by: Barthélémy von Haller <barthelemy.von.haller@gmail.com>
  • Loading branch information
knopers8 and Barthelemy authored May 14, 2024
1 parent 18787b3 commit 0b697ea
Show file tree
Hide file tree
Showing 57 changed files with 344 additions and 217 deletions.
22 changes: 21 additions & 1 deletion Framework/include/QualityControl/ObjectsManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@ class TObjArray;
namespace o2::quality_control::core
{

/// \brief Tells the QC framework for how long an object registered for publication should keep being published.
enum class PublicationPolicy {
  /// The object is published a single time, after TaskInterface::endOfCycle() or
  /// PostProcessingInterface::update(), and is then dropped from the list of published objects.
  /// Typically used when registering objects in TaskInterface::endOfCycle() and
  /// PostProcessingInterface::update().
  Once,
  /// The object is published after each TaskInterface::endOfCycle() and PostProcessingInterface::update(),
  /// up to and including TaskInterface::endOfCycle() at EndOfStream and PostProcessingInterface::finalize().
  /// After that it is dropped from the list of published objects.
  /// Typically used when registering objects in TaskInterface::startOfActivity() and
  /// PostProcessingInterface::initialize().
  ThroughStop,
  /// The object is published after each TaskInterface::endOfCycle() and PostProcessingInterface::update()
  /// until the user task is destructed.
  /// Usually used when registering objects in TaskInterface::initialize() and
  /// PostProcessingInterface::configure().
  Forever
};

class ServiceDiscovery;

/// \brief Keeps the list of encapsulated objects to publish and does the actual publication.
Expand Down Expand Up @@ -63,7 +79,7 @@ class ObjectsManager
* @param obj The object to publish.
* @throws DuplicateObjectError
*/
void startPublishing(TObject* obj);
void startPublishing(TObject* obj, PublicationPolicy = PublicationPolicy::Forever);

/**
* Stop publishing this object
Expand All @@ -79,6 +95,9 @@ class ObjectsManager
*/
void stopPublishing(const std::string& objectName);

/// \brief Stop publishing all objects with this publication policy
void stopPublishing(PublicationPolicy policy);

/// \brief Stop publishing all registered objects
void stopPublishingAll();

Expand Down Expand Up @@ -196,6 +215,7 @@ class ObjectsManager

private:
std::unique_ptr<MonitorObjectCollection> mMonitorObjects;
std::map<MonitorObject*, PublicationPolicy> mPublicationPoliciesForMOs;
std::string mTaskName;
std::string mTaskClass;
std::string mDetectorName;
Expand Down
2 changes: 1 addition & 1 deletion Framework/include/QualityControl/PostProcessingInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class PostProcessingInterface : public core::UserCodeInterface

/// \brief Configuration of a post-processing task.
/// Configuration of a post-processing task. Can be overridden if user wants to retrieve the configuration of the task.
/// \param config ConfigurationInterface with prefix set to ""
/// \param config boost property with the full QC configuration file
virtual void configure(const boost::property_tree::ptree& config);

/// \brief Initialization of a post-processing task.
Expand Down
1 change: 0 additions & 1 deletion Framework/include/QualityControl/TaskRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ class TaskRunner : public framework::Task
uint64_t mDataReceivedInCycle = 0;
AliceO2::Common::Timer mTimerTotalDurationActivity;
AliceO2::Common::Timer mTimerDurationCycle;
int mNumberObjectsRegisteredAtStart = 0;
};

} // namespace o2::quality_control::core
Expand Down
29 changes: 24 additions & 5 deletions Framework/src/ObjectsManager.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include <utility>
#include <algorithm>
#include <ranges>

using namespace o2::quality_control::core;
using namespace AliceO2::Common;
Expand Down Expand Up @@ -59,7 +60,7 @@ ObjectsManager::~ObjectsManager()
ILOG(Debug, Devel) << "ObjectsManager destructor" << ENDM;
}

void ObjectsManager::startPublishing(TObject* object)
void ObjectsManager::startPublishing(TObject* object, PublicationPolicy publicationPolicy)
{
if (!object) {
ILOG(Warning, Support) << "A nullptr provided to ObjectManager::startPublishing" << ENDM;
Expand All @@ -75,6 +76,7 @@ void ObjectsManager::startPublishing(TObject* object)
newObject->setCreateMovingWindow(std::find(mMovingWindowsList.begin(), mMovingWindowsList.end(), object->GetName()) != mMovingWindowsList.end());
mMonitorObjects->Add(newObject);
mUpdateServiceDiscovery = true;
mPublicationPoliciesForMOs[newObject] = publicationPolicy;
}

void ObjectsManager::updateServiceDiscovery()
Expand Down Expand Up @@ -114,31 +116,48 @@ void ObjectsManager::stopPublishing(TObject* object)
}
// We look for the MonitorObject which observes the provided object by comparing its address
// This way, we avoid invoking any methods of the provided object, thus we can stop publishing it even after it is deleted
TObject* objectToRemove = nullptr;
MonitorObject* moToRemove = nullptr;
for (auto moAsTObject : *mMonitorObjects) {
auto mo = dynamic_cast<MonitorObject*>(moAsTObject);
if (mo && mo->getObject() == object) {
objectToRemove = moAsTObject;
moToRemove = mo;
continue;
}
}
if (objectToRemove) {
mMonitorObjects->Remove(objectToRemove);
if (moToRemove) {
mPublicationPoliciesForMOs.erase(moToRemove);
mMonitorObjects->Remove(moToRemove);
mMonitorObjects->Compress();
}
}

void ObjectsManager::stopPublishing(const string& objectName)
{
auto* mo = dynamic_cast<MonitorObject*>(getMonitorObject(objectName));
mPublicationPoliciesForMOs.erase(mo);
mMonitorObjects->Remove(mo);
mMonitorObjects->Compress();
}

/// Stops publishing every object which was registered with the given publication policy.
void ObjectsManager::stopPublishing(PublicationPolicy policy)
{
  // Collect the matching objects first: stopPublishing(TObject*) erases entries from
  // mPublicationPoliciesForMOs, so we must not call it while still iterating over that map.
  std::vector<MonitorObject*> toStop;
  for (const auto& [monitorObject, objectPolicy] : mPublicationPoliciesForMOs) {
    if (objectPolicy == policy) {
      toStop.push_back(monitorObject);
    }
  }

  // Each MonitorObject is deregistered through the object it encapsulates.
  for (auto* monitorObject : toStop) {
    stopPublishing(monitorObject->getObject());
  }
}

/// Stops publishing all registered objects, regardless of their publication policy.
void ObjectsManager::stopPublishingAll()
{
  // Deregister from service discovery while the collection of objects is still populated.
  removeAllFromServiceDiscovery();

  // Drop the bookkeeping of publication policies together with the objects themselves.
  mPublicationPoliciesForMOs.clear();
  mMonitorObjects->Clear();
}

bool ObjectsManager::isBeingPublished(const string& name)
Expand Down
4 changes: 3 additions & 1 deletion Framework/src/PostProcessingRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ void PostProcessingRunner::doUpdate(const Trigger& trigger)

if (mActivity.mValidity.isValid()) {
mPublicationCallback(mObjectManager->getNonOwningArray());
mObjectManager->stopPublishing(PublicationPolicy::Once);
} else {
ILOG(Warning, Support) << "Objects will not be published because their validity is invalid. This should not happen." << ENDM;
}
Expand All @@ -315,7 +316,8 @@ void PostProcessingRunner::doFinalize(const Trigger& trigger)
ILOG(Warning, Devel) << "Objects will not be published because their validity is invalid. Most likely the task's update() method was never triggered." << ENDM;
}
mTaskState = TaskState::Finished;
mObjectManager->stopPublishingAll();
mObjectManager->stopPublishing(PublicationPolicy::Once);
mObjectManager->stopPublishing(PublicationPolicy::ThroughStop);
}

const std::string& PostProcessingRunner::getID() const
Expand Down
14 changes: 8 additions & 6 deletions Framework/src/SliceTrendingTask.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ void SliceTrendingTask::initialize(Trigger, framework::ServiceRegistryRef servic
{
// removing leftovers from any previous runs
mTrend.reset();
for (auto [name, object] : mPlots) {
getObjectsManager()->stopPublishing(object);
for (auto& [name, object] : mPlots) {
delete object;
object = nullptr;
}

mPlots.clear();
mReductors.clear();
mSources.clear();
Expand Down Expand Up @@ -105,7 +106,7 @@ void SliceTrendingTask::initialize(Trigger, framework::ServiceRegistryRef servic
}

if (mConfig.producePlotsOnUpdate) {
getObjectsManager()->startPublishing(mTrend.get());
getObjectsManager()->startPublishing(mTrend.get(), PublicationPolicy::ThroughStop);
}
}

Expand All @@ -121,8 +122,9 @@ void SliceTrendingTask::update(Trigger t, framework::ServiceRegistryRef services
void SliceTrendingTask::finalize(Trigger t, framework::ServiceRegistryRef)
{
if (!mConfig.producePlotsOnUpdate) {
getObjectsManager()->startPublishing(mTrend.get());
getObjectsManager()->startPublishing(mTrend.get(), PublicationPolicy::ThroughStop);
}

generatePlots();

for (const auto& source : mConfig.dataSources) {
Expand Down Expand Up @@ -180,8 +182,8 @@ void SliceTrendingTask::generatePlots()
for (const auto& plot : mConfig.plots) {
// Delete the existing plots before regenerating them.
if (mPlots.count(plot.name)) {
getObjectsManager()->stopPublishing(mPlots[plot.name]);
delete mPlots[plot.name];
mPlots[plot.name] = nullptr;
}

// Postprocess each pad (titles, axes, flushing buffers).
Expand Down Expand Up @@ -251,7 +253,7 @@ void SliceTrendingTask::generatePlots()
}

mPlots[plot.name] = c;
getObjectsManager()->startPublishing(c);
getObjectsManager()->startPublishing(c, PublicationPolicy::Once);
}
} // void SliceTrendingTask::generatePlots()

Expand Down
15 changes: 2 additions & 13 deletions Framework/src/TaskRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -348,11 +348,7 @@ void TaskRunner::start(ServiceRegistryRef services)
}

try {
auto objectsPublishedBeforeStart = mObjectsManager->getNumberPublishedObjects();
startOfActivity();
auto objectsPublishedAfterStart = mObjectsManager->getNumberPublishedObjects();
mNumberObjectsRegisteredAtStart = static_cast<int64_t>(objectsPublishedAfterStart) - objectsPublishedBeforeStart;

startCycle();
} catch (...) {
// we catch here because we don't know where it will go in DPL's CallbackService
Expand All @@ -370,16 +366,7 @@ void TaskRunner::stop()
mCycleNumber++;
mCycleOn = false;
}
auto objectsPublishedBeforeStop = mObjectsManager->getNumberPublishedObjects();
endOfActivity();
auto objectsPublishedAfterStop = mObjectsManager->getNumberPublishedObjects();
auto numberObjectsDeregisteredAtStop = static_cast<int64_t>(objectsPublishedBeforeStop) - objectsPublishedAfterStop;
if (mNumberObjectsRegisteredAtStart != numberObjectsDeregisteredAtStop) {
ILOG(Error, Support) << "The number of objects registered at Start Of Run is not equal to the number deregistered at End Of Run "
<< "(" << mNumberObjectsRegisteredAtStart << " vs. " << numberObjectsDeregisteredAtStop << ")."
<< " This should be fixed, otherwise the QC task might crash at second Start Of Run!!!" << ENDM;
}

mTask->reset();
} catch (...) {
// we catch here because we don't know where it will go in DPL's CallbackService
Expand Down Expand Up @@ -469,6 +456,7 @@ void TaskRunner::endOfActivity()

mTask->endOfActivity(mObjectsManager->getActivity());
mObjectsManager->removeAllFromServiceDiscovery();
mObjectsManager->stopPublishing(PublicationPolicy::ThroughStop);

double rate = mTotalNumberObjectsPublished / mTimerTotalDurationActivity.getTime();
mCollector->send(Metric{ "qc_objects_published" }.addValue(rate, "per_second_whole_run"));
Expand Down Expand Up @@ -597,6 +585,7 @@ int TaskRunner::publish(DataAllocator& outputs)
*array);

mLastPublicationDuration = publicationDurationTimer.getTime();
mObjectsManager->stopPublishing(PublicationPolicy::Once);
return objectsPublished;
}

Expand Down
10 changes: 5 additions & 5 deletions Framework/src/TrendingTask.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ void TrendingTask::initialize(Trigger, framework::ServiceRegistryRef services)
{
// removing leftovers from any previous runs
mTrend.reset();
for (auto [name, object] : mPlots) {
getObjectsManager()->stopPublishing(object);
for (auto& [name, object] : mPlots) {
delete object;
object = nullptr;
}
mPlots.clear();
mReductors.clear();
Expand Down Expand Up @@ -127,7 +127,7 @@ void TrendingTask::initialize(Trigger, framework::ServiceRegistryRef services)
}
}
if (mConfig.producePlotsOnUpdate) {
getObjectsManager()->startPublishing(mTrend.get());
getObjectsManager()->startPublishing(mTrend.get(), PublicationPolicy::ThroughStop);
}
}

Expand Down Expand Up @@ -217,8 +217,8 @@ void TrendingTask::generatePlots()
// Before we generate any new plots, we have to delete existing under the same names.
// It seems that ROOT cannot handle an existence of two canvases with a common name in the same process.
if (mPlots.count(plot.name)) {
getObjectsManager()->stopPublishing(mPlots[plot.name]);
delete mPlots[plot.name];
mPlots[plot.name] = nullptr;
}

// we determine the order of the plot, i.e. if it is a histogram (1), graph (2), or any higher dimension.
Expand Down Expand Up @@ -298,6 +298,6 @@ void TrendingTask::generatePlots()
}

mPlots[plot.name] = c;
getObjectsManager()->startPublishing(c);
getObjectsManager()->startPublishing(c, PublicationPolicy::Once);
}
}
Loading

0 comments on commit 0b697ea

Please sign in to comment.