diff --git a/examples/abp_pcap_detection/README.md b/examples/abp_pcap_detection/README.md
index 3cfae25aa9..440c3fb783 100644
--- a/examples/abp_pcap_detection/README.md
+++ b/examples/abp_pcap_detection/README.md
@@ -99,13 +99,13 @@ Alternately, the Morpheus command line could have been used to accomplish the sa
 From the root of the Morpheus repo, run:
 ```bash
 morpheus --log_level INFO --plugin "examples/abp_pcap_detection/abp_pcap_preprocessing.py" \
-    run --use_cpp False --pipeline_batch_size 100000 --model_max_batch_size 100000 \
+    run --pipeline_batch_size 100000 --model_max_batch_size 100000 \
     pipeline-fil --model_fea_length 13 --label=probs \
     from-file --filename examples/data/abp_pcap_dump.jsonlines --filter_null False \
     deserialize \
     pcap-preprocess \
     monitor --description "Preprocessing rate" \
-    inf-triton --model_name "abp-pcap-xgb" --server_url "localhost:8001" --force_convert_inputs=True \
+    inf-triton --model_name "abp-pcap-xgb" --server_url "localhost:8000" \
     monitor --description "Inference rate" --unit inf \
     add-class --label=probs \
     monitor --description "Add classification rate" --unit "add-class" \
diff --git a/examples/abp_pcap_detection/abp_pcap_preprocessing.py b/examples/abp_pcap_detection/abp_pcap_preprocessing.py
index 453dc2a419..59a8060854 100644
--- a/examples/abp_pcap_detection/abp_pcap_preprocessing.py
+++ b/examples/abp_pcap_detection/abp_pcap_preprocessing.py
@@ -170,7 +170,8 @@ def round_time_kernel(timestamp, rollup_time, secs):
         del df, grouped_df
 
         # Convert the dataframe to cupy the same way cuml does
-        data = cp.asarray(merged_df[fea_cols].to_cupy())
+        # Explicitly casting to float32 to match the model's input, and setting row-major as required by Triton
+        data = cp.asarray(merged_df[fea_cols].to_cupy(), order='C', dtype=cp.float32)
         count = data.shape[0]
 
         for col in req_cols:
diff --git a/examples/abp_pcap_detection/run.py b/examples/abp_pcap_detection/run.py
index 18d5c25e5d..8937351d16 100644
--- a/examples/abp_pcap_detection/run.py
+++ b/examples/abp_pcap_detection/run.py
@@ -21,7 +21,6 @@
 from morpheus.cli.commands import FILE_TYPE_NAMES
 from morpheus.cli.utils import str_to_file_type
 from morpheus.config import Config
-from morpheus.config import CppConfig
 from morpheus.config import PipelineModes
 from morpheus.pipeline.linear_pipeline import LinearPipeline
 from morpheus.stages.general.monitor_stage import MonitorStage
@@ -87,7 +86,7 @@
    help=("Iterative mode will emit dataframes one at a time. Otherwise a list of dataframes is emitted. "
          "Iterative mode is good for interleaving source stages."),
 )
-@click.option("--server_url", required=True, help="Tritonserver url.", default="localhost:8001")
+@click.option("--server_url", required=True, help="Tritonserver url.", default="localhost:8000")
 @click.option(
     "--file_type",
     type=click.Choice(FILE_TYPE_NAMES, case_sensitive=False),
@@ -111,8 +110,6 @@ def run_pipeline(
     # Enable the default logger.
     configure_logging(log_level=logging.INFO)
 
-    CppConfig.set_should_use_cpp(False)
-
     # Its necessary to get the global config object and configure it for FIL mode.
     config = Config()
     config.mode = PipelineModes.FIL
@@ -124,8 +121,6 @@
     config.feature_length = model_fea_length
     config.class_labels = ["probs"]
 
-    kwargs = {}
-
     # Create a linear pipeline object.
     pipeline = LinearPipeline(config)
 
@@ -154,13 +149,7 @@ def run_pipeline(
 
     # Add a inference stage.
     # This stage sends inference requests to the Tritonserver and captures the response.
-    pipeline.add_stage(
-        TritonInferenceStage(
-            config,
-            model_name=model_name,
-            server_url=server_url,
-            force_convert_inputs=True,
-        ))
+    pipeline.add_stage(TritonInferenceStage(config, model_name=model_name, server_url=server_url))
 
     # Add a monitor stage.
     # This stage logs the metrics (inf/sec) from the above stage.
@@ -176,7 +165,7 @@
 
     # Add a serialize stage.
     # This stage includes & excludes columns from messages.
-    pipeline.add_stage(SerializeStage(config, **kwargs))
+    pipeline.add_stage(SerializeStage(config))
 
     # Add a monitor stage.
     # This stage logs the metrics (msg/sec) from the above stage.
diff --git a/tests/examples/abp_pcap_detection/test_abp_pcap_preprocessing.py b/tests/examples/abp_pcap_detection/test_abp_pcap_preprocessing.py
index 97443d65d6..90a3c067f4 100755
--- a/tests/examples/abp_pcap_detection/test_abp_pcap_preprocessing.py
+++ b/tests/examples/abp_pcap_detection/test_abp_pcap_preprocessing.py
@@ -62,6 +62,8 @@ def check_inf_message(msg: MultiInferenceFILMessage,
 
     input__0 = msg.memory.get_tensor('input__0')
     assert input__0.shape == (expected_count, expected_feature_length)
+    assert input__0.dtype == cp.float32
+    assert input__0.strides == (expected_feature_length * 4, 4)
     assert (input__0 == expected_input__0).all()
 
     seq_ids = msg.memory.get_tensor('seq_ids')
@@ -87,10 +89,12 @@ def test_abp_pcap_preprocessing(config: Config, dataset_cudf: DatasetManager,
     input_df = dataset_cudf.get_df(input_file, no_cache=True, filter_nulls=False)
 
     expected_flow_ids = input_df.src_ip + ":" + input_df.src_port + "=" + input_df.dest_ip + ":" + input_df.dest_port
-    expected_input__0 = cp.asarray(
-        np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'examples/abp_pcap_detection/abp_pcap_expected_input_0.csv'),
-                   delimiter=",",
-                   skiprows=0))
+    expected_input__0 = cp.asarray(np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir,
+                                                           'examples/abp_pcap_detection/abp_pcap_expected_input_0.csv'),
+                                              delimiter=",",
+                                              skiprows=0,
+                                              dtype=np.float32),
+                                   order='C')
 
     assert len(input_df) == 20
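
For context on why the preprocessing change pairs with dropping `force_convert_inputs=True`: the `abp-pcap-xgb` FIL model expects a row-major `float32` tensor, so converting up front means the Triton inference stage no longer has to coerce the input. The sketch below is an illustration only, not part of the patch; the DataFrame and column names are hypothetical, and it assumes `cudf` and `cupy` are available on a GPU. It shows the difference the explicit `dtype`/`order` arguments make and mirrors the stride check added to the test.

```python
# Standalone illustration only -- not part of this patch.
import cudf
import cupy as cp

# Hypothetical two-column feature frame standing in for merged_df[fea_cols].
df = cudf.DataFrame({"feat_a": [1, 2, 3], "feat_b": [4, 5, 6]})

# Default conversion: the dtype follows the column types (int64 here) and the
# matrix cuDF builds is not guaranteed to be C-contiguous.
default_data = cp.asarray(df.to_cupy())

# Patched behaviour: request float32 and row-major ('C') layout up front, which
# is what the model expects, so inf-triton no longer needs force_convert_inputs.
data = cp.asarray(df.to_cupy(), order='C', dtype=cp.float32)

assert data.dtype == cp.float32
# A C-contiguous float32 row is num_columns * 4 bytes wide, matching the
# strides assertion added to check_inf_message in the test.
assert data.strides == (df.shape[1] * 4, 4)
```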