From 866b11cfab4280adb7eb1f70e7cb1d07f0f5b178 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 23 Oct 2024 08:36:50 -0700 Subject: [PATCH 1/3] Add test for issue #1973. Update the existing test_pipeline test to run in both CPU & GPU mode --- .../stages/test_llm_engine_stage_pipe.py | 42 ++++++++++++++++++- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/tests/morpheus_llm/stages/test_llm_engine_stage_pipe.py b/tests/morpheus_llm/stages/test_llm_engine_stage_pipe.py index 7cb381f15c..e971b84143 100644 --- a/tests/morpheus_llm/stages/test_llm_engine_stage_pipe.py +++ b/tests/morpheus_llm/stages/test_llm_engine_stage_pipe.py @@ -15,14 +15,20 @@ # limitations under the License. import os +from datetime import datetime + +import pytest from _utils import TEST_DIRS from _utils import assert_results from _utils.dataset_manager import DatasetManager from morpheus.config import Config +from morpheus.messages import ControlMessage from morpheus.pipeline.linear_pipeline import LinearPipeline +from morpheus.pipeline.stage_decorator import stage from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage +from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage from morpheus_llm.llm import LLMEngine from morpheus_llm.llm.nodes.extracter_node import ExtracterNode @@ -37,9 +43,10 @@ def _build_engine() -> LLMEngine: return engine -def test_pipeline(config: Config, dataset_cudf: DatasetManager): +@pytest.mark.gpu_and_cpu_mode +def test_pipeline(config: Config, dataset: DatasetManager): test_data = os.path.join(TEST_DIRS.validation_data_dir, 'root-cause-validation-data-input.jsonlines') - input_df = dataset_cudf[test_data] + input_df = dataset[test_data] expected_df = input_df.copy(deep=True) expected_df["response"] = expected_df['log'] @@ -53,3 +60,34 @@ def test_pipeline(config: Config, dataset_cudf: DatasetManager): pipe.run() assert_results(sink.get_results()) + + +@pytest.mark.gpu_and_cpu_mode +def test_error_1973(config: Config, dataset: DatasetManager): + expected_timestamps: dict[str, datetime] = {} + + @stage(execution_modes=(config.execution_mode, )) + def log_timestamp(msg: ControlMessage, *, timestamp_name: str) -> ControlMessage: + ts = datetime.now() + msg.set_timestamp(key=timestamp_name, timestamp=ts) + expected_timestamps[timestamp_name] = ts + return msg + + task_payload = {"task_type": "llm_engine", "task_dict": {"input_keys": ['v1']}} + pipe = LinearPipeline(config) + pipe.set_source(InMemorySourceStage(config, dataframes=[dataset["filter_probs.csv"]])) + pipe.add_stage(DeserializeStage(config, task_type="llm_engine", task_payload=task_payload)) + pipe.add_stage(log_timestamp(config, timestamp_name="pre_llm")) + pipe.add_stage(LLMEngineStage(config, engine=_build_engine())) + pipe.add_stage(log_timestamp(config, timestamp_name="post_llm")) + sink = pipe.add_stage(InMemorySinkStage(config)) + + pipe.run() + + messages = sink.get_messages() + assert len(messages) == 1 + + msg = messages[0] + for (timestamp_name, expected_timestamp) in expected_timestamps.items(): + actual_timestamp = msg.get_timestamp(timestamp_name, fail_if_nonexist=True) + assert actual_timestamp == expected_timestamp From 51ce6eab553ff1395886a4d28a736f022d92b2c9 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 23 Oct 2024 09:34:40 -0700 Subject: [PATCH 2/3] Add a get_timestamps method to ControlMessage --- .../_lib/include/morpheus/messages/control.hpp | 14 ++++++++++++++ .../morpheus/morpheus/_lib/messages/__init__.pyi | 1 + .../morpheus/morpheus/_lib/messages/module.cpp | 1 + .../morpheus/_lib/src/messages/control.cpp | 10 ++++++++++ .../morpheus/messages/control_message.py | 3 +++ .../morpheus_llm/stages/llm/llm_engine_stage.py | 4 ++++ tests/morpheus/messages/test_control_message.py | 16 ++++++++++++++++ 7 files changed, 49 insertions(+) diff --git a/python/morpheus/morpheus/_lib/include/morpheus/messages/control.hpp b/python/morpheus/morpheus/_lib/include/morpheus/messages/control.hpp index c10bdc4a78..6f14d93037 100644 --- a/python/morpheus/morpheus/_lib/include/morpheus/messages/control.hpp +++ b/python/morpheus/morpheus/_lib/include/morpheus/messages/control.hpp @@ -235,6 +235,13 @@ class MORPHEUS_EXPORT ControlMessage */ std::optional get_timestamp(const std::string& key, bool fail_if_nonexist = false); + /** + * @brief Return a reference to the timestamps map + * + * @return A const map reference containing timestamps + */ + const std::map& get_timestamps() const; + /** * @brief Retrieves timestamps for all keys that match a regex pattern. * @@ -340,6 +347,13 @@ struct MORPHEUS_EXPORT ControlMessageProxy */ static pybind11::object get_timestamp(ControlMessage& self, const std::string& key, bool fail_if_nonexist = false); + /** + * @brief Return all timestamps + * + * @return A Python dictionary of timestamps + */ + static pybind11::dict get_timestamps(ControlMessage& self); + /** * @brief Retrieves timestamps for all keys that match a regex pattern from the ControlMessage object. * diff --git a/python/morpheus/morpheus/_lib/messages/__init__.pyi b/python/morpheus/morpheus/_lib/messages/__init__.pyi index 4974b93daf..2b52b3d29b 100644 --- a/python/morpheus/morpheus/_lib/messages/__init__.pyi +++ b/python/morpheus/morpheus/_lib/messages/__init__.pyi @@ -53,6 +53,7 @@ class ControlMessage(): """ Retrieve the timestamp for a given group and key. Returns None if the timestamp does not exist and fail_if_nonexist is False. """ + def get_timestamps(self) -> dict: ... def has_metadata(self, key: str) -> bool: ... def has_task(self, task_type: str) -> bool: ... def list_metadata(self) -> list: ... diff --git a/python/morpheus/morpheus/_lib/messages/module.cpp b/python/morpheus/morpheus/_lib/messages/module.cpp index af559bde58..961c3187e9 100644 --- a/python/morpheus/morpheus/_lib/messages/module.cpp +++ b/python/morpheus/morpheus/_lib/messages/module.cpp @@ -274,6 +274,7 @@ PYBIND11_MODULE(messages, _module) "fail_if_nonexist is False.", py::arg("key"), py::arg("fail_if_nonexist") = false) + .def("get_timestamps", &ControlMessageProxy::get_timestamps) .def("set_timestamp", &ControlMessageProxy::set_timestamp, "Set a timestamp for a given key and group.", diff --git a/python/morpheus/morpheus/_lib/src/messages/control.cpp b/python/morpheus/morpheus/_lib/src/messages/control.cpp index d20334c35a..c1a85dbcba 100644 --- a/python/morpheus/morpheus/_lib/src/messages/control.cpp +++ b/python/morpheus/morpheus/_lib/src/messages/control.cpp @@ -177,6 +177,11 @@ void ControlMessage::set_timestamp(const std::string& key, time_point_t timestam m_timestamps[key] = timestamp_ns; } +const std::map& ControlMessage::get_timestamps() const +{ + return m_timestamps; +} + std::map ControlMessage::filter_timestamp(const std::string& regex_filter) { std::map matching_timestamps; @@ -365,6 +370,11 @@ py::list ControlMessageProxy::list_metadata(ControlMessage& self) return py_keys; } +py::dict ControlMessageProxy::get_timestamps(ControlMessage& self) +{ + return py::cast(self.get_timestamps()); +} + py::dict ControlMessageProxy::filter_timestamp(ControlMessage& self, const std::string& regex_filter) { auto cpp_map = self.filter_timestamp(regex_filter); diff --git a/python/morpheus/morpheus/messages/control_message.py b/python/morpheus/morpheus/messages/control_message.py index 8c958572e8..b3fd742708 100644 --- a/python/morpheus/morpheus/messages/control_message.py +++ b/python/morpheus/morpheus/messages/control_message.py @@ -167,6 +167,9 @@ def get_timestamp(self, key: str, fail_if_nonexist: bool = False) -> datetime | raise ValueError("Timestamp for the specified key does not exist.") from e return None + def get_timestamps() -> dict[str, datetime]: + return self._timestamps + def filter_timestamp(self, regex_filter: str) -> dict[str, datetime]: re_obj = re.compile(regex_filter) diff --git a/python/morpheus_llm/morpheus_llm/stages/llm/llm_engine_stage.py b/python/morpheus_llm/morpheus_llm/stages/llm/llm_engine_stage.py index 289e447afa..09962b5fd2 100644 --- a/python/morpheus_llm/morpheus_llm/stages/llm/llm_engine_stage.py +++ b/python/morpheus_llm/morpheus_llm/stages/llm/llm_engine_stage.py @@ -93,6 +93,10 @@ def _copy_tasks_and_metadata(self, for tv in task_value: dst.add_task(task, tv) + timestamps = src.get_timestamps() + for (ts_key, ts) in timestamps.items(): + dst.set_timestamp(key=ts_key, timestamp=ts) + def _cast_to_cpp_control_message(self, py_message: ControlMessage, *, cpp_messages_lib: types.ModuleType) -> ControlMessage: """ diff --git a/tests/morpheus/messages/test_control_message.py b/tests/morpheus/messages/test_control_message.py index b9ba42d079..fe267b0c3e 100644 --- a/tests/morpheus/messages/test_control_message.py +++ b/tests/morpheus/messages/test_control_message.py @@ -250,6 +250,22 @@ def test_filter_timestamp(): assert result[f"{group}::key2"] == timestamp2, "The timestamp for key2 should match." +@pytest.mark.gpu_and_cpu_mode +def test_get_timestamps(): + # Create a ControlMessage instance + msg = messages.ControlMessage() + + # Setup test data + timestamp1 = datetime.datetime.now() + timestamp2 = timestamp1 + datetime.timedelta(seconds=1) + msg.set_timestamp("key1", timestamp1) + msg.set_timestamp("key2", timestamp2) + + # Assert both keys are in the result and have correct timestamps + timestamps = msg.get_timestamps() + assert timestamps == {"key1": timestamp1, "key2": timestamp2} + + @pytest.mark.gpu_and_cpu_modetest_tensor_manipulation_after_retrieval def test_get_timestamp_fail_if_nonexist(): # Create a ControlMessage instance From 4e117b94a299b61e60449fa29eb76fb70753e098 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 23 Oct 2024 09:44:42 -0700 Subject: [PATCH 3/3] Fix method --- python/morpheus/morpheus/messages/control_message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/morpheus/morpheus/messages/control_message.py b/python/morpheus/morpheus/messages/control_message.py index b3fd742708..a2c4f35496 100644 --- a/python/morpheus/morpheus/messages/control_message.py +++ b/python/morpheus/morpheus/messages/control_message.py @@ -167,7 +167,7 @@ def get_timestamp(self, key: str, fail_if_nonexist: bool = False) -> datetime | raise ValueError("Timestamp for the specified key does not exist.") from e return None - def get_timestamps() -> dict[str, datetime]: + def get_timestamps(self) -> dict[str, datetime]: return self._timestamps def filter_timestamp(self, regex_filter: str) -> dict[str, datetime]: