Skip to content

Commit

Permalink
Add test for issue nv-morpheus#953
Browse files Browse the repository at this point in the history
  • Loading branch information
dagardner-nv committed Jan 19, 2024
1 parent 05d6747 commit 98cada9
Showing 1 changed file with 47 additions and 17 deletions.
64 changes: 47 additions & 17 deletions tests/test_multi_port_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python
# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -16,27 +16,45 @@

import pytest

import cudf

# When segment modules are imported, they're added to the module registry.
# To avoid flake8 warnings about unused code, the noqa flag is used during import.
import modules.multiplexer # noqa: F401 # pylint: disable=unused-import
from _utils.dataset_manager import DatasetManager
from morpheus.config import Config
from morpheus.pipeline.pipeline import Pipeline
from morpheus.stages.general.multi_port_modules_stage import MultiPortModulesStage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage


def _run_pipeline(config: Config, source_df: cudf.DataFrame, module_conf: dict,
stage_input_ports: list[str]) -> InMemorySinkStage:
pipe = Pipeline(config)

mux_stage = pipe.add_stage(
MultiPortModulesStage(config, module_conf, input_ports=stage_input_ports, output_ports=["output"]))

for x in range(len(stage_input_ports)):
source_stage = pipe.add_stage(InMemorySourceStage(config, [source_df.copy(deep=True)]))
pipe.add_edge(source_stage, mux_stage.input_ports[x])

sink_stage = pipe.add_stage(InMemorySinkStage(config))

pipe.add_edge(mux_stage, sink_stage)

pipe.run()

return sink_stage


@pytest.mark.parametrize("source_count, expected_count", [(1, 1), (2, 2), (3, 3)])
def test_multi_port_pipeline(config, dataset_cudf: DatasetManager, source_count, expected_count):
def test_multi_port_pipeline(config: Config, dataset_cudf: DatasetManager, source_count, expected_count):

filter_probs_df = dataset_cudf["filter_probs.csv"]

pipe = Pipeline(config)

input_ports = []
for x in range(source_count):
input_port = f"input_{x}"
input_ports.append(input_port)
input_ports = [f"input_{x}" for x in range(source_count)]

multiplexer_module_conf = {
"module_id": "multiplexer",
Expand All @@ -46,17 +64,29 @@ def test_multi_port_pipeline(config, dataset_cudf: DatasetManager, source_count,
"output_port": "output"
}

mux_stage = pipe.add_stage(
MultiPortModulesStage(config, multiplexer_module_conf, input_ports=input_ports, output_ports=["output"]))
sink_stage = _run_pipeline(config=config,
source_df=filter_probs_df,
module_conf=multiplexer_module_conf,
stage_input_ports=input_ports)
assert len(sink_stage.get_messages()) == expected_count

for x in range(source_count):
source_stage = pipe.add_stage(InMemorySourceStage(config, [filter_probs_df]))
pipe.add_edge(source_stage, mux_stage.input_ports[x])

sink_stage = pipe.add_stage(InMemorySinkStage(config))
def test_multi_port_pipeline_mis_config(config: Config, dataset_cudf: DatasetManager):
config_input_ports = ["input_0", "input_1"]
stage_input_ports = ["input_0", "input_1", "input_2"]

pipe.add_edge(mux_stage, sink_stage)
filter_probs_df = dataset_cudf["filter_probs.csv"]

pipe.run()
multiplexer_module_conf = {
"module_id": "multiplexer",
"namespace": "morpheus_test",
"module_name": "multiplexer",
"input_ports": config_input_ports,
"output_port": "output"
}

assert len(sink_stage.get_messages()) == expected_count
with pytest.raises(ValueError):
_run_pipeline(config=config,
source_df=filter_probs_df,
module_conf=multiplexer_module_conf,
stage_input_ports=stage_input_ports)

0 comments on commit 98cada9

Please sign in to comment.