Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ressurect the FakeDataApplication #149

Merged
merged 3 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion src/DFApplication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include "appmodel/DataStoreConf.hpp"
#include "appmodel/DataWriterConf.hpp"
#include "appmodel/DataWriterModule.hpp"
#include "appmodel/FakeDataApplication.hpp"
#include "appmodel/FakeDataProdConf.hpp"
#include "appmodel/FilenameParams.hpp"
#include "appmodel/NetworkConnectionDescriptor.hpp"
#include "appmodel/NetworkConnectionRule.hpp"
Expand Down Expand Up @@ -135,6 +137,45 @@ fill_sourceid_object_from_app(conffwk::Configuration* confdb,
sidNetObj.set_objs("source_ids", source_id_objs);
}

inline void
fill_sourceid_object_from_app(conffwk::Configuration* confdb,
const std::string& dbfile,
const FakeDataApplication* fdapp,
const conffwk::ConfigObject* netConn,
conffwk::ConfigObject& sidNetObj,
std::vector<std::shared_ptr<conffwk::ConfigObject>> sidObjs)
{
sidNetObj.set_obj("netconn", netConn);

std::vector<const conffwk::ConfigObject*> source_id_objs;
std::vector<uint32_t> app_source_ids;

for (auto fdp_res : fdapp->get_contains()) {

// get the readout groups and the interfaces and streams therein; 1 reaout group corresponds to 1 data reader
// module
auto fdpc = fdp_res->cast<appmodel::FakeDataProdConf>();

if (!fdpc) {
continue;
}

app_source_ids.push_back(fdpc->get_source_id());
}

for (auto& source_id : app_source_ids) {
auto stream_sid_obj = std::make_shared<conffwk::ConfigObject>();
std::string streamSidUid(fdapp->UID() + "SourceIDConf" + std::to_string(source_id));
confdb->create(dbfile, "SourceIDConf", streamSidUid, *stream_sid_obj);
stream_sid_obj->set_by_val<uint32_t>("sid", source_id);
stream_sid_obj->set_by_val<std::string>("subsystem", "Detector_Readout");
sidObjs.push_back(stream_sid_obj);
source_id_objs.push_back(sidObjs.back().get());
}

sidNetObj.set_objs("source_ids", source_id_objs);
}

std::vector<const confmodel::DaqModule*>
DFApplication::generate_modules(conffwk::Configuration* confdb,
const std::string& dbfile,
Expand Down Expand Up @@ -218,11 +259,12 @@ DFApplication::generate_modules(conffwk::Configuration* confdb,
for (auto app : sessionApps) {
auto smartapp = app->cast<appmodel::SmartDaqApplication>();
auto roapp = app->cast<appmodel::ReadoutApplication>();
auto fdapp = app->cast<appmodel::FakeDataApplication>();
auto dfapp = app->cast<appmodel::DFApplication>();
if (smartapp == nullptr || dfapp != nullptr)
continue;
auto src_id_check = smartapp->get_source_id();
if (roapp == nullptr && src_id_check == nullptr) {
if (roapp == nullptr && fdapp == nullptr && src_id_check == nullptr) {
continue;
}

Expand All @@ -241,6 +283,8 @@ DFApplication::generate_modules(conffwk::Configuration* confdb,
confdb->create(dbfile, "SourceIDToNetworkConnection", sidToNetUid, sidNetObjs.back());
if (roapp != nullptr) {
fill_sourceid_object_from_app(confdb, dbfile, roapp, &dreqNetObjs.back(), sidNetObjs.back(), sidObjs);
} else if (fdapp != nullptr) {
fill_sourceid_object_from_app(confdb, dbfile, fdapp, &dreqNetObjs.back(), sidNetObjs.back(), sidObjs);
} else {
fill_sourceid_object_from_app(smartapp, &dreqNetObjs.back(), sidNetObjs.back());
}
Expand Down
255 changes: 126 additions & 129 deletions src/FakeDataApplication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

#include "ModuleFactory.hpp"

#include "oks/kernel.hpp"
#include "conffwk/Configuration.hpp"
#include "oks/kernel.hpp"

#include "confmodel/Connection.hpp"
#include "confmodel/NetworkConnection.hpp"
Expand All @@ -21,8 +21,8 @@
#include "confmodel/Session.hpp"

#include "appmodel/FakeDataApplication.hpp"
#include "appmodel/FakeDataProdModule.hpp"
#include "appmodel/FakeDataProdConf.hpp"
#include "appmodel/FakeDataProdModule.hpp"
#include "appmodel/FragmentAggregatorModule.hpp"
#include "appmodel/NetworkConnectionDescriptor.hpp"
#include "appmodel/NetworkConnectionRule.hpp"
Expand All @@ -49,138 +49,135 @@ static ModuleFactory::Registrator __reg__("FakeDataApplication",
});

std::vector<const confmodel::DaqModule*>
FakeDataApplication::generate_modules(conffwk::Configuration* /*confdb*/,
const std::string& /*dbfile*/,
const confmodel::Session* /*session*/) const
FakeDataApplication::generate_modules(conffwk::Configuration* confdb,
const std::string& dbfile,
const confmodel::Session* session) const
{
// oks::OksFile::set_nolock_mode(true);

std::vector<const confmodel::DaqModule*> modules;

// // Process the queue rules looking for inputs to our DL/TP handler modules
// const QueueDescriptor* dlhReqInputQDesc = nullptr;
// const QueueDescriptor* faOutputQDesc = nullptr;

// for (auto rule : get_queue_rules()) {
// auto destination_class = rule->get_destination_class();
// auto data_type = rule->get_descriptor()->get_data_type();
// if (destination_class == "FakeDataProdModule") {
// if (data_type == "DataRequest") {
// dlhReqInputQDesc = rule->get_descriptor();
// }
// } else if (destination_class == "FragmentAggregatorModule") {
// faOutputQDesc = rule->get_descriptor();
// }
// }
// // Process the network rules looking for the Fragment Aggregator and TP handler data reuest inputs
// const NetworkConnectionDescriptor* faNetDesc = nullptr;
// const NetworkConnectionDescriptor* tsNetDesc = nullptr;
// for (auto rule : get_network_rules()) {
// auto endpoint_class = rule->get_endpoint_class();
// if (endpoint_class == "FragmentAggregatorModule") {
// faNetDesc = rule->get_descriptor();
// } else if (endpoint_class == "FakeDataProdModule") {
// tsNetDesc = rule->get_descriptor();
// }
// }

// // Create here the Queue on which all data fragments are forwarded to the fragment aggregator
// // and a container for the queues of data request to TP handler and DLH
// if (faOutputQDesc == nullptr) {
// throw(BadConf(ERS_HERE, "No fragment output queue descriptor given"));
// }
// conffwk::ConfigObject faQueueObj;
// std::vector<const confmodel::Connection*> faOutputQueues;

// std::string taFragQueueUid(faOutputQDesc->get_uid_base() + UID());
// confdb->create(dbfile, "Queue", taFragQueueUid, faQueueObj);
// faQueueObj.set_by_val<std::string>("data_type", faOutputQDesc->get_data_type());
// faQueueObj.set_by_val<std::string>("queue_type", faOutputQDesc->get_queue_type());
// faQueueObj.set_by_val<uint32_t>("capacity", faOutputQDesc->get_capacity());

// if (dlhReqInputQDesc == nullptr) {
// throw(BadConf(ERS_HERE, "No DLH request input queue descriptor given"));
// }

// // Create a FakeDataProdModule for each stream of this Readout Group
// // for (auto roGroup : get_readout_groups()) {
// for (auto roGroup : get_contains()) {
// if (roGroup->disabled(*session)) {
// TLOG_DEBUG(7) << "Ignoring disabled ReadoutGroup " << roGroup->UID();
// continue;
// }
// auto rset = roGroup->cast<confmodel::ReadoutGroup>();
// if (rset == nullptr) {
// throw(BadConf(ERS_HERE, "FakeDataApplication contains something other than ReadoutGroup"));
// }
// std::vector<const confmodel::Connection*> outputQueues;
// for (auto res : rset->get_contains()) {
// auto stream = res->cast<appmodel::FakeDataProdConf>();
// if (stream == nullptr) {
// throw(BadConf(ERS_HERE, "ReadoutGroup contains something other than FakeDataProdConf"));
// }
// if (stream->disabled(*session)) {
// TLOG_DEBUG(7) << "Ignoring disabled FakeDataProdConf " << stream->UID();
// continue;
// }
// auto id = stream->get_source_id();
// std::string uid("FakeDataProdModule-" + std::to_string(id));
// conffwk::ConfigObject dlhObj;
// TLOG_DEBUG(7) << "creating OKS configuration object for FakeDataProdModule";
// confdb->create(dbfile, "FakeDataProdModule", uid, dlhObj);
// dlhObj.set_obj("configuration", &stream->config_object());

// // Time Sync network connection
// std::string tsStreamUid = tsNetDesc->get_uid_base() + std::to_string(id);
// auto tsServiceObj = tsNetDesc->get_associated_service()->config_object();
// conffwk::ConfigObject tsNetObj;
// confdb->create(dbfile, "NetworkConnection", tsStreamUid, tsNetObj);
// tsNetObj.set_by_val<std::string>("connection_type", tsNetDesc->get_connection_type());
// tsNetObj.set_by_val<std::string>("data_type", tsNetDesc->get_data_type());
// tsNetObj.set_obj("associated_service", &tsServiceObj);

// dlhObj.set_objs("outputs", { &faQueueObj, &tsNetObj });

// std::string reqQueueUid(dlhReqInputQDesc->get_uid_base() + std::to_string(id));
// conffwk::ConfigObject reqQueueObj;
// confdb->create(dbfile, "QueueWithSourceId", reqQueueUid, reqQueueObj);
// reqQueueObj.set_by_val<std::string>("data_type", dlhReqInputQDesc->get_data_type());
// reqQueueObj.set_by_val<std::string>("queue_type", dlhReqInputQDesc->get_queue_type());
// reqQueueObj.set_by_val<uint32_t>("capacity", dlhReqInputQDesc->get_capacity());
// reqQueueObj.set_by_val<uint32_t>("source_id", stream->get_source_id());
// // Add the requessts queue dal pointer to the outputs of the FragmentAggregatorModule
// faOutputQueues.push_back(confdb->get<confmodel::Connection>(reqQueueUid));

// dlhObj.set_objs("inputs", { &reqQueueObj });

// modules.push_back(confdb->get<FakeDataProdModule>(uid));
// }
// }

// // Finally create Fragment Aggregator
// std::string faUid("fragmentaggregator-" + UID());
// conffwk::ConfigObject faObj;
// TLOG_DEBUG(7) << "creating OKS configuration object for Fragment Aggregator class ";
// confdb->create(dbfile, "FragmentAggregatorModule", faUid, faObj);

// // Add network connection to TRBs
// auto faServiceObj = faNetDesc->get_associated_service()->config_object();
// std::string faNetUid = faNetDesc->get_uid_base() + UID();
// conffwk::ConfigObject faNetObj;
// confdb->create(dbfile, "NetworkConnection", faNetUid, faNetObj);
// faNetObj.set_by_val<std::string>("connection_type", faNetDesc->get_connection_type());
// faNetObj.set_by_val<std::string>("data_type", faNetDesc->get_data_type());
// faNetObj.set_obj("associated_service", &faServiceObj);

// // Add output queueus of data requests
// std::vector<const conffwk::ConfigObject*> qObjs;
// for (auto q : faOutputQueues) {
// qObjs.push_back(&q->config_object());
// }
// faObj.set_objs("inputs", { &faNetObj, &faQueueObj });
// faObj.set_objs("outputs", qObjs);

// modules.push_back(confdb->get<FragmentAggregatorModule>(faUid));
// Process the queue rules looking for inputs to our DL/TP handler modules
const QueueDescriptor* dlhReqInputQDesc = nullptr;
const QueueDescriptor* faOutputQDesc = nullptr;

for (auto rule : get_queue_rules()) {
auto destination_class = rule->get_destination_class();
auto data_type = rule->get_descriptor()->get_data_type();
if (destination_class == "FakeDataProdModule") {
if (data_type == "DataRequest") {
dlhReqInputQDesc = rule->get_descriptor();
}
} else if (destination_class == "FragmentAggregatorModule") {
faOutputQDesc = rule->get_descriptor();
}
}
if (faOutputQDesc == nullptr) {
throw(BadConf(ERS_HERE, "No fragment output queue descriptor given"));
}
if (dlhReqInputQDesc == nullptr) {
throw(BadConf(ERS_HERE, "No DLH request input queue descriptor given"));
}
// Process the network rules looking for the Fragment Aggregator and TP handler data reuest inputs
const NetworkConnectionDescriptor* faNetDesc = nullptr;
const NetworkConnectionDescriptor* tsNetDesc = nullptr;
for (auto rule : get_network_rules()) {
auto endpoint_class = rule->get_endpoint_class();
if (endpoint_class == "FragmentAggregatorModule") {
faNetDesc = rule->get_descriptor();
} else if (endpoint_class == "FakeDataProdModule") {
tsNetDesc = rule->get_descriptor();
}
}
if (faNetDesc == nullptr) {
throw(BadConf(ERS_HERE, "No Fragment output network descriptor given"));
}
if (tsNetDesc == nullptr) {
throw(BadConf(ERS_HERE, "No TimeSync output network descriptor given"));
}

// Create here the Queue on which all data fragments are forwarded to the fragment aggregator
// and a container for the queues of data request to TP handler and DLH

conffwk::ConfigObject faQueueObj;
std::vector<const confmodel::Connection*> faOutputQueues;

std::string taFragQueueUid(faOutputQDesc->get_uid_base() + UID());
confdb->create(dbfile, "Queue", taFragQueueUid, faQueueObj);
faQueueObj.set_by_val<std::string>("data_type", faOutputQDesc->get_data_type());
faQueueObj.set_by_val<std::string>("queue_type", faOutputQDesc->get_queue_type());
faQueueObj.set_by_val<uint32_t>("capacity", faOutputQDesc->get_capacity());


// Create a FakeDataProdModule for each stream of this Readout Group
for (auto fdpConf : get_contains()) {
if (fdpConf->disabled(*session)) {
TLOG_DEBUG(7) << "Ignoring disabled FakeDataProdConf " << fdpConf->UID();
continue;
}

auto stream = fdpConf->cast<appmodel::FakeDataProdConf>();
if (stream == nullptr) {
throw(BadConf(ERS_HERE, "ReadoutGroup contains something other than FakeDataProdConf"));
}

auto id = stream->get_source_id();
std::string uid("FakeDataProdModule-" + std::to_string(id));
conffwk::ConfigObject dlhObj;
TLOG_DEBUG(7) << "creating OKS configuration object for FakeDataProdModule";
confdb->create(dbfile, "FakeDataProdModule", uid, dlhObj);
dlhObj.set_obj("configuration", &stream->config_object());

// Time Sync network connection
std::string tsStreamUid = tsNetDesc->get_uid_base() + std::to_string(id);
auto tsServiceObj = tsNetDesc->get_associated_service()->config_object();
conffwk::ConfigObject tsNetObj;
confdb->create(dbfile, "NetworkConnection", tsStreamUid, tsNetObj);
tsNetObj.set_by_val<std::string>("connection_type", tsNetDesc->get_connection_type());
tsNetObj.set_by_val<std::string>("data_type", tsNetDesc->get_data_type());
tsNetObj.set_obj("associated_service", &tsServiceObj);

dlhObj.set_objs("outputs", { &faQueueObj, &tsNetObj });

std::string reqQueueUid(dlhReqInputQDesc->get_uid_base() + std::to_string(id));
conffwk::ConfigObject reqQueueObj;
confdb->create(dbfile, "QueueWithSourceId", reqQueueUid, reqQueueObj);
reqQueueObj.set_by_val<std::string>("data_type", dlhReqInputQDesc->get_data_type());
reqQueueObj.set_by_val<std::string>("queue_type", dlhReqInputQDesc->get_queue_type());
reqQueueObj.set_by_val<uint32_t>("capacity", dlhReqInputQDesc->get_capacity());
reqQueueObj.set_by_val<uint32_t>("source_id", stream->get_source_id());
// Add the requessts queue dal pointer to the outputs of the FragmentAggregatorModule
faOutputQueues.push_back(confdb->get<confmodel::Connection>(reqQueueUid));

dlhObj.set_objs("inputs", { &reqQueueObj });

modules.push_back(confdb->get<FakeDataProdModule>(uid));
}

// Finally create Fragment Aggregator
std::string faUid("fragmentaggregator-" + UID());
conffwk::ConfigObject faObj;
TLOG_DEBUG(7) << "creating OKS configuration object for Fragment Aggregator class ";
confdb->create(dbfile, "FragmentAggregatorModule", faUid, faObj);

// Add network connection to TRBs
auto faServiceObj = faNetDesc->get_associated_service()->config_object();
std::string faNetUid = faNetDesc->get_uid_base() + UID();
conffwk::ConfigObject faNetObj;
confdb->create(dbfile, "NetworkConnection", faNetUid, faNetObj);
faNetObj.set_by_val<std::string>("connection_type", faNetDesc->get_connection_type());
faNetObj.set_by_val<std::string>("data_type", faNetDesc->get_data_type());
faNetObj.set_obj("associated_service", &faServiceObj);

// Add output queueus of data requests
std::vector<const conffwk::ConfigObject*> qObjs;
for (auto q : faOutputQueues) {
qObjs.push_back(&q->config_object());
}
faObj.set_objs("inputs", { &faNetObj, &faQueueObj });
faObj.set_objs("outputs", qObjs);

modules.push_back(confdb->get<FragmentAggregatorModule>(faUid));

// oks::OksFile::set_nolock_mode(false);
return modules;
Expand Down
Loading
Loading