diff --git a/python/morpheus/morpheus/_lib/include/morpheus/controllers/monitor_controller.hpp b/python/morpheus/morpheus/_lib/include/morpheus/controllers/monitor_controller.hpp index 54fb6e3d7..67647bce5 100644 --- a/python/morpheus/morpheus/_lib/include/morpheus/controllers/monitor_controller.hpp +++ b/python/morpheus/morpheus/_lib/include/morpheus/controllers/monitor_controller.hpp @@ -28,6 +28,7 @@ #include // for trace_activity, decay_t, from #include +#include #include #include #include @@ -43,6 +44,7 @@ namespace morpheus { * @file */ + /** * @brief */ @@ -77,6 +79,15 @@ MonitorController::MonitorController(const std::string& description, m_determine_count_fn(determine_count_fn), m_count(0) { + if (!m_determine_count_fn) + { + m_determine_count_fn = auto_count_fn(); + if (!m_determine_count_fn) + { + throw std::runtime_error("No count function provided and no default count function available"); + } + } + m_progress_bar.set_option(indicators::option::BarWidth{50}); m_progress_bar.set_option(indicators::option::Start{"["}); m_progress_bar.set_option(indicators::option::Fill("■")); @@ -93,45 +104,37 @@ MonitorController::MonitorController(const std::string& description, template MessageT MonitorController::progress_sink(MessageT msg) { - if (m_determine_count_fn == std::nullopt) - { - m_determine_count_fn = auto_count_fn(msg); - } - m_count += (*m_determine_count_fn)(msg); m_progress_bar.set_progress(m_count); return msg; } -template -struct is_vector : std::false_type -{}; - -template -struct is_vector> : std::true_type -{}; - template auto MonitorController::auto_count_fn() -> std::optional> { - if constexpr (std::is_same_v>) + if constexpr (std::is_same_v>) { - return [](MessageT msg) { - return msg->num_rows(); + return [](std::shared_ptr msg) { + return msg->count(); }; } - if constexpr (std::is_same_v>) + if constexpr (std::is_same_v>>) { - return [](MessageT msg) { - return msg->count(); + return [](std::vector> msg) { + auto item_count_fn = [](std::shared_ptr msg) { + return msg->count(); + }; + return std::accumulate(msg.begin(), msg.end(), 0, [&](int sum, const auto& item) { + return sum + (*item_count_fn)(item); + }); }; } if constexpr (std::is_same_v>) { - return [](MessageT msg) { + return [](std::shared_ptr msg) { if (!msg->payload()) { return 0; @@ -140,26 +143,23 @@ auto MonitorController::auto_count_fn() -> std::optional::value) + if constexpr (std::is_same_v>>) { - // if (msg.empty()) - // { - // return std::nullopt; - // } - - return [this](MessageT msg) { - auto item_count_fn = auto_count_fn(msg[0]); - if (item_count_fn == std::nullopt) - { - return 0; - } - return std::accumulate(msg.begin(), msg.end(), 0, [item_count_fn](int sum, const auto& item) { + return [](std::vector> msg) { + auto item_count_fn = [](std::shared_ptr msg) { + if (!msg->payload()) + { + return 0; + } + return msg->payload()->count(); + }; + return std::accumulate(msg.begin(), msg.end(), 0, [&](int sum, const auto& item) { return sum + (*item_count_fn)(item); }); }; } - throw std::runtime_error("Unsupported message type received for MonitorController"); + return std::nullopt; } template diff --git a/python/morpheus/morpheus/_lib/src/controllers/monitor_controller.cpp b/python/morpheus/morpheus/_lib/src/controllers/monitor_controller.cpp index a0a7c5570..d5b8d8e0b 100644 --- a/python/morpheus/morpheus/_lib/src/controllers/monitor_controller.cpp +++ b/python/morpheus/morpheus/_lib/src/controllers/monitor_controller.cpp @@ -17,17 +17,15 @@ #include "morpheus/controllers/monitor_controller.hpp" +#include "morpheus/messages/control.hpp" + #include #include #include - - namespace morpheus { // Component public implementations // ****************** MonitorController ************************ // - - } // namespace morpheus diff --git a/python/morpheus/morpheus/_lib/tests/controllers/test_monitor_controller.cpp b/python/morpheus/morpheus/_lib/tests/controllers/test_monitor_controller.cpp index 4f6c6873f..598a31f73 100644 --- a/python/morpheus/morpheus/_lib/tests/controllers/test_monitor_controller.cpp +++ b/python/morpheus/morpheus/_lib/tests/controllers/test_monitor_controller.cpp @@ -1,12 +1,13 @@ #include "../test_utils/common.hpp" // for get_morpheus_root, TEST_CLASS_WITH_PYTHON, morpheus #include "morpheus/controllers/monitor_controller.hpp" // for MonitorController -#include "morpheus/stages/monitor.hpp" // for MonitorStage +#include "morpheus/messages/control.hpp" // for ControlMessage #include #include #include #include +#include #include #include #include @@ -16,39 +17,74 @@ #include #include +#include #include using namespace morpheus; -TEST_CLASS(MonitorController); +TEST_CLASS_WITH_PYTHON(MonitorController); -std::shared_ptr create_cudf_table(int rows, int cols) +cudf::io::table_with_metadata create_cudf_table_with_metadata(int rows, int cols) { std::vector> columns; for (int i = 0; i < cols; ++i) { - // Create a numeric column of type INT32 with 'rows' elements auto col = cudf::make_numeric_column(cudf::data_type{cudf::type_id::INT32}, rows); auto col_view = col->mutable_view(); - // Fill the column with range [0, rows - 1] std::vector data(rows); std::iota(data.begin(), data.end(), 0); cudaMemcpy(col_view.data(), data.data(), data.size() * sizeof(int32_t), cudaMemcpyHostToDevice); - // Add the column to the vector columns.push_back(std::move(col)); } - // Create and return the table - return std::make_shared(std::move(columns)); + auto table = std::make_unique(std::move(columns)); + + auto index_info = cudf::io::column_name_info{""}; + auto column_names = std::vector(cols, index_info); + auto metadata = cudf::io::table_metadata{std::move(column_names), {}, {}}; + + return cudf::io::table_with_metadata{std::move(table), metadata}; } TEST_F(TestMonitorController, TestAutoCountFn) { - auto test_mc_cudf = MonitorController>("test_cudf_table"); - auto cudf_auto_count_fn = test_mc_cudf.auto_count_fn(); - auto cudf_table = create_cudf_table(10, 2); - assert((*cudf_auto_count_fn)(cudf_table) == 10); + auto message_meta_mc = MonitorController>("test_message_meta"); + auto message_meta_auto_count_fn = message_meta_mc.auto_count_fn(); + auto meta = MessageMeta::create_from_cpp(std::move(create_cudf_table_with_metadata(10, 2))); + EXPECT_EQ((*message_meta_auto_count_fn)(meta), 10); + + auto control_message_mc = MonitorController>("test_control_message"); + auto control_message_auto_count_fn = control_message_mc.auto_count_fn(); + auto control_message = std::make_shared(); + auto cm_meta = MessageMeta::create_from_cpp(std::move(create_cudf_table_with_metadata(20, 3))); + control_message->payload(cm_meta); + EXPECT_EQ((*control_message_auto_count_fn)(control_message), 20); + + auto message_meta_vector_mc = + MonitorController>>("test_message_meta_vector"); + auto message_meta_vector_auto_count_fn = message_meta_vector_mc.auto_count_fn(); + std::vector> meta_vector; + for (int i = 0; i < 5; ++i) + { + meta_vector.emplace_back(MessageMeta::create_from_cpp(std::move(create_cudf_table_with_metadata(5, 2)))); + } + EXPECT_EQ((*message_meta_vector_auto_count_fn)(meta_vector), 25); + + auto control_message_vector_mc = + MonitorController>>("test_control_message_vector"); + auto control_message_vector_auto_count_fn = control_message_vector_mc.auto_count_fn(); + std::vector> control_message_vector; + for (int i = 0; i < 5; ++i) + { + auto cm = std::make_shared(); + cm->payload(MessageMeta::create_from_cpp(std::move(create_cudf_table_with_metadata(6, 2)))); + control_message_vector.emplace_back(cm); + } + EXPECT_EQ((*control_message_vector_auto_count_fn)(control_message_vector), 30); + + // Test invalid message type + EXPECT_THROW(MonitorController("invalid message type"), std::runtime_error); }