Skip to content

Commit

Permalink
Qc 978 Propagate QC flags to BK/RCT (#2320)
Browse files Browse the repository at this point in the history
* new BookkeepingQualitySink dataprocessor

Bookkeeping dataprocessor that gathers quality objects from all checks marked exportToBookkeeping in
config. It converts quality object to flags which are sent via gRPC to BKP/RCT at the endOfRun/stop
(whatever comes first).

* correctly distinquishing between Sync, Async and MC provenance

* more focused logging

* aggregator added to generator

* group by detectors

* added tests to InfrastractureGenerator

* BookkeepingQualitySink has customizeIfrastracture call

* ignoring syncQC provenance in send call until BKP implements it

---------

Co-authored-by: Michal Tichák <michal.tichak@cern.ch>
  • Loading branch information
justonedev1 and Michal Tichák authored Jun 13, 2024
1 parent e94056e commit c3e498d
Show file tree
Hide file tree
Showing 13 changed files with 448 additions and 13 deletions.
5 changes: 4 additions & 1 deletion Framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ add_library(O2QualityControl
src/ActivityHelpers.cxx
src/ObjectsManager.cxx
src/CheckRunner.cxx
src/BookkeepingQualitySink.cxx
src/AggregatorRunner.cxx
src/CheckRunnerFactory.cxx
src/AggregatorRunnerFactory.cxx
Expand Down Expand Up @@ -282,7 +283,8 @@ set(TEST_SRCS
test/testUserCodeInterface.cxx
test/testStringUtils.cxx
test/testRunnerUtils.cxx
)
test/testBookkeepingQualitySink.cxx
)

set(TEST_ARGS
""
Expand All @@ -303,6 +305,7 @@ set(TEST_ARGS
""
""
""
""
)

list(LENGTH TEST_SRCS count)
Expand Down
1 change: 1 addition & 0 deletions Framework/include/QualityControl/AggregatorSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ struct AggregatorSpec {
UpdatePolicyType updatePolicy = UpdatePolicyType::OnAny;
// advanced
bool active = true;
bool exportToBookkeeping = false;
core::CustomParameters customParameters;
};

Expand Down
60 changes: 60 additions & 0 deletions Framework/include/QualityControl/BookkeepingQualitySink.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2024 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

///
/// \file BookkeepingQualitySink.h
/// \author Michal Tichak
///

#ifndef QUALITYCONTROL_BOOKKEEPINGQUALITYSINK_H
#define QUALITYCONTROL_BOOKKEEPINGQUALITYSINK_H

#include <DataFormatsQualityControl/QualityControlFlagCollection.h>
#include <Framework/CompletionPolicy.h>
#include <Framework/DataProcessorSpec.h>
#include <Framework/Task.h>
#include "Provenance.h"

namespace o2::quality_control::core
{

// This class gathers all QualityObjects from it's inputs, converting them to flags + sending them to grpc RCT/BKP when stop() is invoked
class BookkeepingQualitySink : public framework::Task
{
public:
// we are using map here instead of the set, because items in the map are changeable, however items of the set are not.
using FlagsMap = std::unordered_map<std::string /*detector*/, std::unique_ptr<QualityControlFlagCollection>>;
using SendCallback = std::function<void(const std::string& grpcUri, const FlagsMap&, Provenance)>;

// sendCallback is mainly used for testing without the necessity to do grpc calls
BookkeepingQualitySink(const std::string& grpcUri, Provenance, SendCallback sendCallback = send);

void run(framework::ProcessingContext&) override;

void endOfStream(framework::EndOfStreamContext& context) override;
void stop() override;

static void customizeInfrastructure(std::vector<framework::CompletionPolicy>& policies);
static framework::DataProcessorLabel getLabel() { return { "BookkeepingQualitySink" }; }
static void send(const std::string& grpcUri, const FlagsMap&, Provenance);

private:
std::string mGrpcUri;
Provenance mProvenance;
SendCallback mSendCallback;
FlagsMap mQualityObjectsMap;

void sendAndClear();
};

} // namespace o2::quality_control::core

#endif
1 change: 0 additions & 1 deletion Framework/include/QualityControl/CheckConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

#include <string>
#include <vector>
#include <unordered_map>

#include <Framework/DataProcessorSpec.h>
#include "QualityControl/UpdatePolicyType.h"
Expand Down
3 changes: 2 additions & 1 deletion Framework/include/QualityControl/CheckSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ struct CheckSpec {
UpdatePolicyType updatePolicy = UpdatePolicyType::OnAny;
// advanced
bool active = true;
bool exportToBookkeeping = false;
core::CustomParameters customParameters;
};

} // namespace o2::quality_control::checker

#endif //QUALITYCONTROL_CHECKSPEC_H
#endif // QUALITYCONTROL_CHECKSPEC_H
3 changes: 2 additions & 1 deletion Framework/include/QualityControl/InfrastructureGenerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ class InfrastructureGenerator
static void generateCheckRunners(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec);
static void generateAggregator(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec);
static void generatePostProcessing(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec);
static void generateBookkeepingQualitySink(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec);
};

} // namespace core
Expand Down Expand Up @@ -302,4 +303,4 @@ inline void customizeInfrastructure(std::vector<framework::CompletionPolicy>& po

} // namespace o2::quality_control

#endif //QC_CORE_INFRASTRUCTUREGENERATOR_H
#endif // QC_CORE_INFRASTRUCTUREGENERATOR_H
46 changes: 46 additions & 0 deletions Framework/include/QualityControl/Provenance.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2024 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

///
/// \file Provenance.h
/// \author Michal Tichak
///

#include <stdexcept>
#include <string>

namespace o2::quality_control::core
{

enum class Provenance {
SyncQC,
AsyncQC,
MCQC
};

inline Provenance toEnum(const std::string& provenance)
{
if (provenance == "qc_mc") {
return Provenance::MCQC;
}

if (provenance == "qc") {
return Provenance::SyncQC;
}

if (provenance == "qc_async") {
return Provenance::AsyncQC;
}

throw std::runtime_error{ "unknown provenance flag: " + provenance };
}

} // namespace o2::quality_control::core
137 changes: 137 additions & 0 deletions Framework/src/BookkeepingQualitySink.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2024 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

///
/// \file BookkeepingQualitySink.cxx
/// \author Michal Tichak
///

#include "QualityControl/BookkeepingQualitySink.h"
#include <Framework/DataRefUtils.h>
#include <Framework/InputRecordWalker.h>
#include <Framework/CompletionPolicyHelpers.h>
#include <Framework/DeviceSpec.h>
#include "QualityControl/Bookkeeping.h"
#include "QualityControl/QualitiesToFlagCollectionConverter.h"
#include "QualityControl/QualityObject.h"
#include "QualityControl/QcInfoLogger.h"
#include <BookkeepingApi/QcFlagServiceClient.h>
#include <BookkeepingApi/BkpClientFactory.h>
#include <stdexcept>

namespace o2::quality_control::core
{

void BookkeepingQualitySink::customizeInfrastructure(std::vector<framework::CompletionPolicy>& policies)
{
using namespace o2::framework;
auto matcher = [label = BookkeepingQualitySink::getLabel()](auto const& device) {
return std::find(device.labels.begin(), device.labels.end(), label) != device.labels.end();
};
policies.emplace_back(CompletionPolicyHelpers::consumeWhenAny("BookkeepingQualitySinkCompletionPolicy", matcher));
}

void BookkeepingQualitySink::send(const std::string& grpcUri, const BookkeepingQualitySink::FlagsMap& flags, Provenance provenance)
{
auto bkpClient = o2::bkp::api::BkpClientFactory::create(grpcUri);
auto& qcClient = bkpClient->qcFlag();

for (const auto& [detector, flagCollection] : flags) {
ILOG(Info, Support) << "Sending " << flags.size() << " flags for detector: " << detector << ENDM;

if (flagCollection->size() == 0) {
continue;
}

std::vector<QcFlag> bkpQcFlags{};
for (const auto& flag : *flagCollection) {
// BKP uses start/end of run for missing time values, so we are using this functionality in order to avoid
// determining these values by ourselves (see TaskRunner::start() for details). mtichak checked with mboulais that
// it is okay to do so.
bkpQcFlags.emplace_back(QcFlag{
.flagTypeId = flag.getFlag().getID(),
.from = flag.getStart() == gFullValidityInterval.getMin() ? std::nullopt : std::optional<uint64_t>{ flag.getStart() },
.to = flag.getEnd() == gFullValidityInterval.getMax() ? std::nullopt : std::optional<uint64_t>{ flag.getEnd() },
.origin = flag.getSource(),
.comment = flag.getComment() });
}

try {
switch (provenance) {
case Provenance::SyncQC:
// TODO: add a sync function call when Bookkeeping implements it.
break;
case Provenance::AsyncQC:
qcClient->createForDataPass(flagCollection->getRunNumber(), flagCollection->getPassName(), detector, bkpQcFlags);
break;
case Provenance::MCQC:
qcClient->createForSimulationPass(flagCollection->getRunNumber(), flagCollection->getPeriodName(), detector, bkpQcFlags);
break;
}
} catch (const std::runtime_error& err) {
ILOG(Error, Support) << "Failed to send flags for detector: " << detector
<< " with error: " << err.what() << ENDM;
}
}
}

BookkeepingQualitySink::BookkeepingQualitySink(const std::string& grpcUri, Provenance provenance, SendCallback sendCallback)
: mGrpcUri{ grpcUri }, mProvenance{ provenance }, mSendCallback{ sendCallback } {}

auto merge(std::unique_ptr<QualityControlFlagCollection>&& collection, const std::unique_ptr<QualityObject>& qualityObject) -> std::unique_ptr<QualityControlFlagCollection>
{
QualitiesToFlagCollectionConverter converter(std::move(collection), qualityObject->getPath());
converter(*qualityObject);
return converter.getResult();
}

auto collectionForQualityObject(const QualityObject& qualityObject) -> std::unique_ptr<QualityControlFlagCollection>
{
return std::make_unique<QualityControlFlagCollection>(
qualityObject.getName(),
qualityObject.getDetectorName(),
gFullValidityInterval,
qualityObject.getActivity().mId,
qualityObject.getActivity().mPeriodName,
qualityObject.getActivity().mPassName,
qualityObject.getActivity().mProvenance);
}

void BookkeepingQualitySink::run(framework::ProcessingContext& context)
{
for (auto const& ref : framework::InputRecordWalker(context.inputs())) {
try {
auto qualityObject = framework::DataRefUtils::as<QualityObject>(ref);
auto [emplacedIt, _] = mQualityObjectsMap.emplace(qualityObject->getDetectorName(), collectionForQualityObject(*qualityObject));
emplacedIt->second = merge(std::move(emplacedIt->second), qualityObject);
} catch (...) {
ILOG(Warning, Support) << "Unexpected message received, QualityObject expected" << ENDM;
}
}
}

void BookkeepingQualitySink::endOfStream(framework::EndOfStreamContext&)
{
sendAndClear();
}

void BookkeepingQualitySink::stop()
{
sendAndClear();
}

void BookkeepingQualitySink::sendAndClear()
{
mSendCallback(mGrpcUri, mQualityObjectsMap, mProvenance);
mQualityObjectsMap.clear();
}

} // namespace o2::quality_control::core
Loading

0 comments on commit c3e498d

Please sign in to comment.