diff --git a/base/CMakeLists.txt b/base/CMakeLists.txt index 864ad0c80..e16b7821a 100755 --- a/base/CMakeLists.txt +++ b/base/CMakeLists.txt @@ -627,6 +627,7 @@ SET(UT_FILES test/overlaymodule_tests.cpp test/testSignalGeneratorSrc_tests.cpp test/audioToTextXform_tests.cpp + test/simpleControlModuleTests.cpp ${ARM64_UT_FILES} ${CUDA_UT_FILES} ) diff --git a/base/include/AbsControlModule.h b/base/include/AbsControlModule.h index e0d04aea3..83e6d2f6a 100644 --- a/base/include/AbsControlModule.h +++ b/base/include/AbsControlModule.h @@ -18,6 +18,7 @@ class AbsControlModule : public Module { bool term(); bool enrollModule(std::string role, boost::shared_ptr module); boost::shared_ptr getModuleofRole(std::string role); + std::string printStatus(); virtual void handleMp4MissingVideotrack(std::string previousVideoFile, std::string nextVideoFile) {} virtual void handleMMQExport(Command cmd, bool priority = false) {} virtual void handleMMQExportView(uint64_t startTS, uint64_t endTS = 9999999999999, bool playabckDirection = true, bool Mp4ReaderExport = false, bool priority = false) {} @@ -25,12 +26,18 @@ class AbsControlModule : public Module { virtual void handleLastGtkGLRenderTS(uint64_t latestGtkGlRenderTS, bool priority) {} virtual void handleGoLive(bool goLive, bool priority) {} virtual void handleDecoderSpeed(DecoderPlaybackSpeed cmd, bool priority) {} - boost::container::deque> pipelineModules; - std::map> moduleRoles; + // Note: weak pointers to avoid cyclic dependency and mem leaks + std::map> moduleRoles; virtual void handleError(const APErrorObject &error) {} - virtual void handleHealthCallback(const APHealthObject &healthObj) {} - - + virtual void handleHealthCallback(const APHealthObject& healthObj); + /** + * @brief Register external function to be triggered on every health callBack that control modules recieves from the modules. + * For eg. In SimpleControlModule, this extention is called at the end of handleHealthCallback function. + * @param function with signature void f(const APHealthObject*, unsigned short) + * @return nothing. + */ + void registerHealthCallbackExtention( + boost::function callbackFunction); protected: bool process(frame_container& frames); bool handleCommand(Command::CommandType type, frame_sp& frame); @@ -38,7 +45,8 @@ class AbsControlModule : public Module { virtual void sendEOS() {} virtual void sendEOS(frame_sp& frame) {} virtual void sendEOPFrame() {} - + std::vector serializeControlModule(); + boost::function healthCallbackExtention; private: class Detail; boost::shared_ptr mDetail; diff --git a/base/include/Module.h b/base/include/Module.h index 842dd2df7..53955d780 100644 --- a/base/include/Module.h +++ b/base/include/Module.h @@ -21,6 +21,8 @@ #include "BufferMaker.h" #include "APCallback.h" +#define DUMMY_CTRL_EOP_PIN "dummy_ctrl_eop_pin" + using namespace std; class FrameContainerQueue; diff --git a/base/include/PipeLine.h b/base/include/PipeLine.h index b47b0d035..7e134dad4 100755 --- a/base/include/PipeLine.h +++ b/base/include/PipeLine.h @@ -27,7 +27,8 @@ class PipeLine { Status myStatus; typedef boost::shared_ptr item_type; typedef boost::container::deque< item_type > container_type; - + boost::shared_ptr controlModule = nullptr; + std::string mName; container_type modules; bool validate(); @@ -47,8 +48,8 @@ class PipeLine { void stop(); void term(); void wait_for_all(bool ignoreStatus = false); - void interrup_wait_for_all(); - void flushAllQueues(); + void interrupt_wait_for_all(); + void flushAllQueues(bool flushControlModuleQ=false); const char* getStatus(); }; diff --git a/base/include/SimpleControlModule.h b/base/include/SimpleControlModule.h index be151463b..bb31d358a 100644 --- a/base/include/SimpleControlModule.h +++ b/base/include/SimpleControlModule.h @@ -15,13 +15,10 @@ class SimpleControlModule : public AbsControlModule ~SimpleControlModule() { - } - - void handleError(const APErrorObject &error); - void handleHealthCallback(const APHealthObject &healthObj); - - // ErrorCallbacks + std::string printStatus(); + void handleError(const APErrorObject& error) override; + void handleHealthCallback(const APHealthObject& healthObj) override; protected: void sendEOS(); void sendEOS(frame_sp& frame); diff --git a/base/src/AbsControlModule.cpp b/base/src/AbsControlModule.cpp index 387397d6f..f8cdc0f04 100644 --- a/base/src/AbsControlModule.cpp +++ b/base/src/AbsControlModule.cpp @@ -4,6 +4,7 @@ #include "Module.h" #include "Command.h" #include "PipeLine.h" +#include "boost/algorithm/string/join.hpp" class AbsControlModule::Detail { @@ -24,6 +25,7 @@ AbsControlModule::AbsControlModule(AbsControlModuleProps _props) { mDetail.reset(new Detail(_props)); } + AbsControlModule::~AbsControlModule() {} bool AbsControlModule::handleCommand(Command::CommandType type, frame_sp& frame) @@ -56,6 +58,12 @@ bool AbsControlModule::process(frame_container& frames) return true; } +/** + * @brief Enroll your module to use healthcallback, errorcallback and other control module functions + * @param boost::shared_ptr the module to be registered + * @param role unique string for role of the module + * @return bool. + */ bool AbsControlModule::enrollModule(std::string role, boost::shared_ptr module) { if (moduleRoles.find(role) != moduleRoles.end()) @@ -84,11 +92,54 @@ boost::shared_ptr AbsControlModule::getModuleofRole(std::string role) boost::shared_ptr moduleWithRole = nullptr; try { - moduleWithRole = moduleRoles[role]; + moduleWithRole = moduleRoles[role].lock(); } catch (std::out_of_range) { LOG_ERROR << "no module with the role <" << role << "> registered with the control module."; } return moduleWithRole; +} + +void AbsControlModule::registerHealthCallbackExtention( + boost::function callbackFunction) +{ + healthCallbackExtention = callbackFunction; +}; + +void AbsControlModule::handleHealthCallback(const APHealthObject& healthObj) +{ + LOG_INFO << "Health Callback from module " << healthObj.getModuleId(); + if (!healthCallbackExtention.empty()) + { + LOG_INFO << "Calling the registered Health Callback Extention..."; + healthCallbackExtention(&healthObj, 1); + } +} + +std::vector AbsControlModule::serializeControlModule() +{ + std::string spacedLineFmt = "\t-->"; + std::vector status; + status.push_back("Module <" + this->getId() + "> \n"); + status.push_back("Enrolled Modules \n"); + for (auto it : moduleRoles) + { + status.push_back("module <" + it.second.lock()->getId() + "> role <" + it.first + ">\n"); + std::string cbStatus = "registered for...\n"; + if (it.second.lock()->getProps().enableHealthCallBack) + { + cbStatus += spacedLineFmt + "health callbacks \n"; + } + cbStatus += spacedLineFmt + "error callbacks \n"; + status.push_back(spacedLineFmt + cbStatus); + } + return status; +} + +std::string AbsControlModule::printStatus() +{ + auto ser = boost::algorithm::join(serializeControlModule(), "|"); + LOG_INFO << ser; + return ser; } \ No newline at end of file diff --git a/base/src/Module.cpp b/base/src/Module.cpp index 661b7ee15..0e6a72f06 100644 --- a/base/src/Module.cpp +++ b/base/src/Module.cpp @@ -178,7 +178,10 @@ Module::Module(Kind nature, string name, ModuleProps _props) mPropsChangeMetadata.reset( new FrameMetadata(FrameMetadata::FrameType::PROPS_CHANGE)); } -Module::~Module() {} +Module::~Module() +{ + LOG_INFO << "Module destructor <" << myId << ">"; +} bool Module::term() { @@ -1295,6 +1298,16 @@ bool Module::step() { throw AIPException(CTRL_MODULE_INVALID_STATE, "Unexpected: " + std::to_string(frames.size()) + " frames remain unprocessed in control module."); } + if (mPlay) + { + mProfiler->startProcessingLap(); + ret = stepNonSource(frames); + mProfiler->endLap(mQue->size()); + } + else + { + ret = true; + } } else { @@ -1563,6 +1576,14 @@ bool Module::addEoPFrame(frame_container &frames) frames.insert(make_pair(me.first, frame)); } + if (myNature == CONTROL) + { + auto frame = frame_sp(new EoPFrame()); + auto metadata = framemetadata_sp(new FrameMetadata(FrameMetadata::GENERAL)); + frame->setMetadata((metadata)); + frames.insert(make_pair(DUMMY_CTRL_EOP_PIN, frame)); + } + // if sieve is disabled for atleast one connection - send additional EOP // frames - extra EOP frames downstream shouldn't matter if (mIsSieveDisabledForAny) @@ -1586,6 +1607,13 @@ bool Module::handleStop() { return true; } + if (myNature == CONTROL) + { + mRunning = false; + term(); + return true; + } + // handle SOURCE, TRANSFORM, SINK below mStopCount++; if (myNature != SOURCE && mStopCount != mForwardPins) { diff --git a/base/src/PipeLine.cpp b/base/src/PipeLine.cpp index 0bc0f9d79..83998c154 100755 --- a/base/src/PipeLine.cpp +++ b/base/src/PipeLine.cpp @@ -37,8 +37,8 @@ bool PipeLine::addControlModule(boost::shared_ptr cModule) for (int i = 0; i < modules.size(); i++) { modules[i]->addControlModule(cModule); - cModule->pipelineModules.push_back(modules[i]); } + controlModule = cModule; return true; } @@ -133,6 +133,10 @@ bool PipeLine::init() return false; } } + if (controlModule != nullptr) + { + controlModule->init(); + } myStatus = PL_INITED; LOG_TRACE << " Pipeline initialized"; return true; @@ -159,10 +163,11 @@ void PipeLine::run_all_threaded() m.myThread = boost::thread(ref(m)); Utils::setModuleThreadName(m.myThread, m.getId()); } - if ((modules[0]->controlModule) != nullptr) + if (controlModule != nullptr) { - Module& m = *(modules[0]->controlModule); + Module& m = *(controlModule); m.myThread = boost::thread(ref(m)); + Utils::setModuleThreadName(m.myThread, m.getId()); } mPlay = true; } @@ -183,7 +188,7 @@ void PipeLine::pause() i->get()->play(false); } } - + //Note: controlModule should not be paused mPlay = false; } @@ -196,7 +201,11 @@ void PipeLine::play() i->get()->play(true); } } - + // Control module should continue running anyways + if (controlModule != nullptr) + { + controlModule->play(true); + } mPlay = true; } @@ -215,6 +224,7 @@ void PipeLine::step() i->get()->queueStep(); } } + // should controlModule step ? } void PipeLine::stop() @@ -232,6 +242,10 @@ void PipeLine::stop() i->get()->stop(); } } + if (controlModule != nullptr) + { + controlModule->stop(); + } } void PipeLine::wait_for_all(bool ignoreStatus) @@ -247,10 +261,15 @@ void PipeLine::wait_for_all(bool ignoreStatus) Module& m = *(i->get()); m.myThread.join(); } + + if (controlModule != nullptr) + { + controlModule->myThread.join(); + } } -void PipeLine::interrup_wait_for_all() +void PipeLine::interrupt_wait_for_all() { if (myStatus > PL_STOPPING) { @@ -269,6 +288,12 @@ void PipeLine::interrup_wait_for_all() Module& m = *(i->get()); m.myThread.join(); } + + if (controlModule != nullptr) + { + controlModule->myThread.interrupt(); + controlModule->myThread.join(); + } myStatus = PL_STOPPED; } @@ -280,7 +305,12 @@ const char * PipeLine::getStatus() return StatusNames[myStatus]; } -void PipeLine::flushAllQueues() { +void PipeLine::flushAllQueues(bool flushControlModuleQ) +{ + if (flushControlModuleQ && controlModule != nullptr) + { + controlModule->flushQue(); + } for (auto& m : modules) { if (m->myNature == Module::Kind::SOURCE) @@ -288,4 +318,5 @@ void PipeLine::flushAllQueues() { m->flushQueRecursive(); } } -} \ No newline at end of file +} + diff --git a/base/src/SimpleControlModule.cpp b/base/src/SimpleControlModule.cpp index 0960c75c6..36ea575c0 100644 --- a/base/src/SimpleControlModule.cpp +++ b/base/src/SimpleControlModule.cpp @@ -15,6 +15,11 @@ void SimpleControlModule::sendEOPFrame() return Module::sendEoPFrame(); } +std::string SimpleControlModule::printStatus() +{ + return AbsControlModule::printStatus(); +} + // Right Now, Just Logging But Can be used to Do bunch of other things void SimpleControlModule::handleError(const APErrorObject &error) { @@ -26,4 +31,9 @@ void SimpleControlModule::handleError(const APErrorObject &error) void SimpleControlModule::handleHealthCallback(const APHealthObject &healthObj) { LOG_ERROR << "Health Callback from module " << healthObj.getModuleId(); + if (!healthCallbackExtention.empty()) + { + LOG_INFO << "Calling Health Callback Extention..."; + healthCallbackExtention(&healthObj,1); + } } diff --git a/base/test/simpleControlModuleTests.cpp b/base/test/simpleControlModuleTests.cpp new file mode 100644 index 000000000..5853032b7 --- /dev/null +++ b/base/test/simpleControlModuleTests.cpp @@ -0,0 +1,320 @@ +#include +#include +#include + +#include "PipeLine.h" +#include "Module.h" +#include "SimpleControlModule.h" +#include "FrameMetadata.h" +#include "Frame.h" +#include "FrameContainerQueue.h" +#include "AIPExceptions.h" +#include "ExternalSourceModule.h" +#include "ExternalSinkModule.h" + +BOOST_AUTO_TEST_SUITE(simpleControlModule_tests) + +class TestModuleSrcProps :public ModuleProps +{ +public: + TestModuleSrcProps() : ModuleProps() + { + } +}; + +class TestModuleSrc : public Module +{ +public: + TestModuleSrc(TestModuleSrcProps props = TestModuleSrcProps()) : Module(SOURCE, "TestModuleSrc", props) + { + addOutputPin(); + } + + std::string addOutputPin() + { + outMetadata = framemetadata_sp(new FrameMetadata(FrameMetadata::FrameType::GENERAL)); + return Module::addOutputPin(outMetadata); + } + + bool produce() + { + auto outPin = getOutputPinIdByType(FrameMetadata::FrameType::GENERAL); + auto frame = makeFrame(10); + frames.insert(make_pair(outPin, frame)); + + send(frames); + return true; + } + + bool validateInputPins() + { + return true; + } + + bool validateOutputPins() + { + return true; + } + + bool validateInputOutputPins() + { + return true; + } + + frame_container frames; + framemetadata_sp outMetadata; +}; + +class TestModuleTransformProps : public ModuleProps +{ +public: + TestModuleTransformProps() : ModuleProps() + {} + ~TestModuleTransformProps() + {} +}; + +class TestModuleTransform : public Module +{ +public: + TestModuleTransform(TestModuleTransformProps props) : Module(TRANSFORM, "TestTransform", props) + { + addOutputPin(); + } + + bool init() + { + Module::init(); + return true; + } + + std::string addOutputPin() + { + outMetadata = framemetadata_sp(new FrameMetadata(FrameMetadata::FrameType::GENERAL)); + return Module::addOutputPin(outMetadata); + } + +protected: + + bool process(frame_container &frames) + { + auto outPin = getOutputPinIdByType(FrameMetadata::FrameType::GENERAL); + auto frame = makeFrame(10, outPin); + frames.insert(make_pair(outPin, frame)); + send(frames); + return true; + } + + bool validateOutputPins() + { + return true; + } + + bool validateInputPins() + { + return true; + } + + bool validateInputOutputPins() + { + return true; + } +private: + std::string outPin; + framemetadata_sp outMetadata; +}; + +class TestSink : public Module +{ +public: + TestSink() : Module(SINK, "TestSink", ModuleProps()) + { + + } + + virtual ~TestSink() {} + +protected: + bool validateInputPins() + { + return true; + } + + bool process(frame_container& frames) + { + return true; + } +}; + +struct SimpleControlModuleTests +{ + SimpleControlModuleTests(bool enableHealthCallback = true, int intervalInSecs = 1) + { + LoggerProps loggerProps; + loggerProps.logLevel = boost::log::trivial::severity_level::info; + Logger::setLogLevel(boost::log::trivial::severity_level::info); + Logger::initLogger(loggerProps); + + auto metadata = framemetadata_sp(new FrameMetadata(FrameMetadata::GENERAL)); + sourceMod = boost::shared_ptr(new TestModuleSrc); + + /* set transform module health callbacks */ + TestModuleTransformProps props; + props.logHealth = true; + props.enableHealthCallBack = enableHealthCallback; + props.healthUpdateIntervalInSec = intervalInSecs; + transformMod1 = boost::shared_ptr(new TestModuleTransform(props)); + + sinkMod = boost::shared_ptr(new TestSink); + + // pins connection + sourceMod->setNext(transformMod1); + transformMod1->setNext(sinkMod); + + auto simpleCtrlProps = SimpleControlModuleProps(); + simpleCtrl = boost::shared_ptr(new SimpleControlModule(simpleCtrlProps)); + + p = boost::shared_ptr(new PipeLine("test")); + } + ~SimpleControlModuleTests() + { + } + + void createPipeline() + { + sourceMod->setNext(transformMod1); + transformMod1->setNext(sinkMod); + } + + void enrollModules() + { + simpleCtrl->enrollModule("transform_test_module", transformMod1); + } + + void attachModulesToPipeline() + { + p->appendModule(sourceMod); + p->addControlModule(simpleCtrl); + } + + void initPipeline() + { + p->init(); + } + + void runPipeline() + { + p->run_all_threaded(); + } + + void startPipeline() + { + createPipeline(); + enrollModules(); + + attachModulesToPipeline(); + + initPipeline(); + + runPipeline(); + + return; + } + + bool stopPipeline() + { + p->stop(); + p->term(); + p->wait_for_all(); + p.reset(); + return true; + } + + void addControlModule() + { + p->addControlModule(simpleCtrl); + } + + boost::shared_ptr sourceMod; + boost::shared_ptr transformMod1; + boost::shared_ptr sinkMod; + boost::shared_ptr simpleCtrl; + boost::shared_ptr p; +}; + +void TestCallackExtention(const APHealthObject* healthObj, unsigned int eventId) +{ + auto moduleId = healthObj->getModuleId(); + BOOST_TEST(moduleId.find("TestTransform") != std::string::npos); +} + +BOOST_AUTO_TEST_CASE(simpleControlModule_healthCallback) +{ + SimpleControlModuleTests t; + t.simpleCtrl->registerHealthCallbackExtention(TestCallackExtention); + + t.startPipeline(); + t.addControlModule(); + t.simpleCtrl->printStatus(); + boost::this_thread::sleep_for(boost::chrono::milliseconds(5000)); + t.stopPipeline(); + boost::this_thread::sleep_for(boost::chrono::milliseconds(3000)); +} + +BOOST_AUTO_TEST_CASE(simpleControlModule_enroll_ctrlMod_step_test, *boost::unit_test::disabled()) +{ + SimpleControlModuleTests t; + + t.simpleCtrl->registerHealthCallbackExtention(TestCallackExtention); + t.addControlModule(); + + t.sourceMod->init(); + t.transformMod1->init(); + t.sinkMod->init(); + t.simpleCtrl->init(); + + t.sourceMod->step(); + t.transformMod1->step(); + t.sinkMod->step(); + + t.simpleCtrl->enrollModule("transform_test_module", t.transformMod1); + t.simpleCtrl->enrollModule("source_test_module", t.sourceMod); + + // BOOSTASSERT the printStatus for enrollment + auto status = t.simpleCtrl->printStatus(); + BOOST_ASSERT(status.find("transform_test_module") != std::string::npos); + BOOST_ASSERT(status.find("source_test_module") != std::string::npos); + + // since we are queueing any command in control module, the step should remain blocked at mQue->pop inside step() + // the following code tests exactly that. + auto future = std::async(std::launch::async, &SimpleControlModule::step, t.simpleCtrl.get()); + if (future.wait_for(std::chrono::seconds(2)) == std::future_status::ready) + { + try + { + bool result = future.get(); + LOG_ERROR << "Simple control module step() unexpectedly returned a value <" << result << ">"; + } + catch (const std::exception& e) + { + std::cout << "Task threw an exception: " << e.what() << std::endl; + } + BOOST_ASSERT(false); + } + BOOST_ASSERT(true); + + t.sourceMod->stop(); + t.transformMod1->stop(); + t.sinkMod->stop(); + t.simpleCtrl->stop(); + + t.sourceMod->term(); + t.transformMod1->term(); + t.sinkMod->term(); + t.simpleCtrl->term(); + + LOG_INFO << "SUCCESS: do not wait for step() to finish..."; // future.get() + return; +} + +BOOST_AUTO_TEST_SUITE_END() \ No newline at end of file