Skip to content

Commit

Permalink
revert the multiple pipelines with control module idea
Browse files Browse the repository at this point in the history
  • Loading branch information
mraduldubey committed Jul 30, 2024
1 parent 2a9e70d commit 26bc046
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 84 deletions.
54 changes: 26 additions & 28 deletions base/include/AbsControlModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,38 @@
class PipeLine;
class AbsControlModuleProps : public ModuleProps {
public:
AbsControlModuleProps() {}
AbsControlModuleProps() {}
};

class AbsControlModule : public Module {
public:
AbsControlModule(AbsControlModuleProps _props);
~AbsControlModule();
bool init();
bool term();
std::string enrollModule(boost::shared_ptr<PipeLine> p, std::string role,
boost::shared_ptr<Module> module);
std::pair<bool, boost::shared_ptr<Module>> getModulefRole(PipeLine p,
std::string role);
virtual void handleMp4MissingVideotrack(std::string previousVideoFile, std::string nextVideoFile) {}
virtual void handleMMQExport(Command cmd, bool priority = false) {}
virtual void handleMMQExportView(uint64_t startTS, uint64_t endTS = 9999999999999, bool playabckDirection = true, bool Mp4ReaderExport = false, bool priority = false) {}
virtual void handleSendMMQTSCmd(uint64_t mmqBeginTS, uint64_t mmqEndTS, bool priority = false) {}
virtual void handleLastGtkGLRenderTS(uint64_t latestGtkGlRenderTS, bool priority) {}
virtual void handleGoLive(bool goLive, bool priority) {}
virtual void handleDecoderSpeed(DecoderPlaybackSpeed cmd, bool priority) {}
boost::container::deque<boost::shared_ptr<Module>> pipelineModules;
std::map<std::string, boost::shared_ptr<Module>> moduleRoles;
AbsControlModule(AbsControlModuleProps _props);
~AbsControlModule();
bool init();
bool term();
bool enrollModule(std::string role, boost::shared_ptr<Module> module);
boost::shared_ptr<Module> getModuleofRole(std::string role);
virtual void handleMp4MissingVideotrack(std::string previousVideoFile, std::string nextVideoFile) {}
virtual void handleMMQExport(Command cmd, bool priority = false) {}
virtual void handleMMQExportView(uint64_t startTS, uint64_t endTS = 9999999999999, bool playabckDirection = true, bool Mp4ReaderExport = false, bool priority = false) {}
virtual void handleSendMMQTSCmd(uint64_t mmqBeginTS, uint64_t mmqEndTS, bool priority = false) {}
virtual void handleLastGtkGLRenderTS(uint64_t latestGtkGlRenderTS, bool priority) {}
virtual void handleGoLive(bool goLive, bool priority) {}
virtual void handleDecoderSpeed(DecoderPlaybackSpeed cmd, bool priority) {}
boost::container::deque<boost::shared_ptr<Module>> pipelineModules;
std::map<std::string, boost::shared_ptr<Module>> moduleRoles;

protected:
bool process(frame_container &frames);
bool handleCommand(Command::CommandType type, frame_sp &frame);
bool handlePropsChange(frame_sp &frame);
virtual void sendEOS() {}
virtual void sendEOS(frame_sp& frame) {}
virtual void sendEOPFrame() {}
bool process(frame_container& frames);
bool handleCommand(Command::CommandType type, frame_sp& frame);
bool handlePropsChange(frame_sp& frame);
virtual void sendEOS() {}
virtual void sendEOS(frame_sp& frame) {}
virtual void sendEOPFrame() {}

private:
class Detail;
boost::shared_ptr<Detail> mDetail;
void handleError(const ErrorObject &error);
void handleHealthCallback(const HealthObject &healthObj);
class Detail;
boost::shared_ptr<Detail> mDetail;
void handleError(const ErrorObject& error);
void handleHealthCallback(const HealthObject& healthObj);
};
102 changes: 47 additions & 55 deletions base/src/AbsControlModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,98 +8,90 @@
class AbsControlModule::Detail
{
public:
Detail(AbsControlModuleProps& _props) : mProps(_props)
{
}
Detail(AbsControlModuleProps& _props) : mProps(_props)
{
}

~Detail()
{
}
~Detail()
{
}

std::string getPipelineRole(std::string pName, std::string role)
{
return pName + "_" + role;
}

AbsControlModuleProps mProps;
AbsControlModuleProps mProps;
};

AbsControlModule::AbsControlModule(AbsControlModuleProps _props)
:Module(CONTROL, "AbsControlModule", _props)
:Module(CONTROL, "AbsControlModule", _props)
{
mDetail.reset(new Detail(_props));
mDetail.reset(new Detail(_props));
}
AbsControlModule::~AbsControlModule() {}

bool AbsControlModule::handleCommand(Command::CommandType type, frame_sp& frame)
{
return true;
return true;
}

bool AbsControlModule::handlePropsChange(frame_sp& frame)
{
return true;
return true;
}

bool AbsControlModule::init()
{
if (!Module::init())
{
return false;
}
return true;
if (!Module::init())
{
return false;
}
return true;
}

bool AbsControlModule::term()
{
return Module::term();
return Module::term();
}

bool AbsControlModule::process(frame_container& frames)
{
// Commands are already processed by the time we reach here.
return true;
// Commands are already processed by the time we reach here.
return true;
}

void AbsControlModule::handleError(const ErrorObject &error)
void AbsControlModule::handleError(const ErrorObject& error)
{
LOG_ERROR << "Error in module " << error.getModuleName() << "Module Id"
<< error.getModuleId() << " (Code " << error.getErrorCode()
<< "): " << error.getErrorMessage();
LOG_ERROR << "Error in module " << error.getModuleName() << "Module Id"
<< error.getModuleId() << " (Code " << error.getErrorCode()
<< "): " << error.getErrorMessage();
}

void AbsControlModule::handleHealthCallback(const HealthObject &healthObj)
void AbsControlModule::handleHealthCallback(const HealthObject& healthObj)
{
LOG_ERROR << "Health Callback from module " << healthObj.getModuleId();
LOG_ERROR << "Health Callback from module " << healthObj.getModuleId();
}

std::string AbsControlModule::enrollModule(boost::shared_ptr<PipeLine> p, std::string role, boost::shared_ptr<Module> module)
bool AbsControlModule::enrollModule(std::string role, boost::shared_ptr<Module> module)
{
std::string pipelineRole = mDetail->getPipelineRole(p->getName(), role);
if (moduleRoles.find(pipelineRole) != moduleRoles.end())
{
std::string errMsg = "Enrollment Failed: This role <" + role + "> already registered with the Module <" + moduleRoles[pipelineRole]->getName() + "> in PipeLine <" + p->getName() + ">";
LOG_ERROR << errMsg;
throw AIPException(MODULE_ENROLLMENT_FAILED, errMsg);
}
moduleRoles[pipelineRole] = module;
module->registerErrorCallback(
[this](const ErrorObject &error) { handleError(error); });
if (module->getProps().enableHealthCallBack)
{
module->registerHealthCallback(
[this](const HealthObject &message) { handleHealthCallback(message); });
}
return pipelineRole;
moduleRoles[role] = module;
// NOTE: If you want error callback and health callback to work with a module, registering it with control is mandatory.
module->registerErrorCallback(
[this](const ErrorObject& error) { handleError(error); });
if (module->getProps().enableHealthCallBack)
{
module->registerHealthCallback(
[this](const HealthObject& message) { handleHealthCallback(message); });
}
return true;
}

std::pair<bool, boost::shared_ptr<Module>> AbsControlModule::getModulefRole(PipeLine p, std::string role)
boost::shared_ptr<Module> AbsControlModule::getModuleofRole(std::string role)
{
std::string pipelineRole = mDetail->getPipelineRole(p.getName(), role);
if (moduleRoles.find(pipelineRole) == moduleRoles.end())
{
return std::make_pair<bool, boost::shared_ptr<Module>>(false, nullptr);
}
std::pair<bool, boost::shared_ptr<Module>> res(true, moduleRoles[pipelineRole]);
return res;
boost::shared_ptr<Module> moduleWithRole = nullptr;
try
{
moduleWithRole = moduleRoles[role];
}
catch (std::out_of_range)
{
LOG_ERROR << "no module with the role <" << role << "> registered with the control module.";
}
return moduleWithRole;
}
3 changes: 2 additions & 1 deletion base/test/ImageEncodeCV_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ BOOST_AUTO_TEST_CASE(RGB_profile, *boost::unit_test::disabled())
p->addControlModule(mControl);
p->init();
mControl->init();
mControl->enrollModule(p, "Encode", m2);
// If you want error callbackand health callback to work with a module, registering it with control is mandatory.
mControl->enrollModule("Encode", m2);
p->run_all_threaded();
boost::this_thread::sleep_for(boost::chrono::seconds(3000));
p->stop();
Expand Down

0 comments on commit 26bc046

Please sign in to comment.