diff --git a/tests/test_multi_port_pipeline.py b/tests/test_multi_port_pipeline.py index 73c051200c..cac48b0bd0 100755 --- a/tests/test_multi_port_pipeline.py +++ b/tests/test_multi_port_pipeline.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -16,27 +16,45 @@ import pytest +import cudf + # When segment modules are imported, they're added to the module registry. # To avoid flake8 warnings about unused code, the noqa flag is used during import. import modules.multiplexer # noqa: F401 # pylint: disable=unused-import from _utils.dataset_manager import DatasetManager +from morpheus.config import Config from morpheus.pipeline.pipeline import Pipeline from morpheus.stages.general.multi_port_modules_stage import MultiPortModulesStage from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage +def _run_pipeline(config: Config, source_df: cudf.DataFrame, module_conf: dict, + stage_input_ports: list[str]) -> InMemorySinkStage: + pipe = Pipeline(config) + + mux_stage = pipe.add_stage( + MultiPortModulesStage(config, module_conf, input_ports=stage_input_ports, output_ports=["output"])) + + for x in range(len(stage_input_ports)): + source_stage = pipe.add_stage(InMemorySourceStage(config, [source_df.copy(deep=True)])) + pipe.add_edge(source_stage, mux_stage.input_ports[x]) + + sink_stage = pipe.add_stage(InMemorySinkStage(config)) + + pipe.add_edge(mux_stage, sink_stage) + + pipe.run() + + return sink_stage + + @pytest.mark.parametrize("source_count, expected_count", [(1, 1), (2, 2), (3, 3)]) -def test_multi_port_pipeline(config, dataset_cudf: DatasetManager, source_count, expected_count): +def test_multi_port_pipeline(config: Config, dataset_cudf: DatasetManager, source_count, expected_count): filter_probs_df = dataset_cudf["filter_probs.csv"] - pipe = Pipeline(config) - - input_ports = [] - for x in range(source_count): - input_port = f"input_{x}" - input_ports.append(input_port) + input_ports = [f"input_{x}" for x in range(source_count)] multiplexer_module_conf = { "module_id": "multiplexer", @@ -46,17 +64,29 @@ def test_multi_port_pipeline(config, dataset_cudf: DatasetManager, source_count, "output_port": "output" } - mux_stage = pipe.add_stage( - MultiPortModulesStage(config, multiplexer_module_conf, input_ports=input_ports, output_ports=["output"])) + sink_stage = _run_pipeline(config=config, + source_df=filter_probs_df, + module_conf=multiplexer_module_conf, + stage_input_ports=input_ports) + assert len(sink_stage.get_messages()) == expected_count - for x in range(source_count): - source_stage = pipe.add_stage(InMemorySourceStage(config, [filter_probs_df])) - pipe.add_edge(source_stage, mux_stage.input_ports[x]) - sink_stage = pipe.add_stage(InMemorySinkStage(config)) +def test_multi_port_pipeline_mis_config(config: Config, dataset_cudf: DatasetManager): + config_input_ports = ["input_0", "input_1"] + stage_input_ports = ["input_0", "input_1", "input_2"] - pipe.add_edge(mux_stage, sink_stage) + filter_probs_df = dataset_cudf["filter_probs.csv"] - pipe.run() + multiplexer_module_conf = { + "module_id": "multiplexer", + "namespace": "morpheus_test", + "module_name": "multiplexer", + "input_ports": config_input_ports, + "output_port": "output" + } - assert len(sink_stage.get_messages()) == expected_count + with pytest.raises(ValueError): + _run_pipeline(config=config, + source_df=filter_probs_df, + module_conf=multiplexer_module_conf, + stage_input_ports=stage_input_ports)