Enable C++ mode for abp_pcap_detection example #1687

Merged

Changes from all commits (36 commits)
All commits authored by dagardner-nv:

- `4d27cbe` Update abp_pcap_detection example to be runnable from the project root (Apr 29, 2024)
- `3fd330d` Enable running the GNN fraud pipeline from the root of the Morpheus repo (Apr 29, 2024)
- `6c1d80e` Remove unnecessary --num_threads=1, remove need for setting MORPHEUS_… (Apr 30, 2024)
- `aae1664` Don't set flags where a reasonable default exists (Apr 30, 2024)
- `3231572` Add helper method to check if a typeid is fully supported in C++ (Apr 30, 2024)
- `89bb5a7` Only build a C++ node if all of the types are supported in C++ (Apr 30, 2024)
- `ba503e9` Don't construct a C++ node if either force_convert_inputs or use_shar… (Apr 30, 2024)
- `68bfa3c` Don't disable C++ mode (Apr 30, 2024)
- `8dee598` Don't disable C++ mode (Apr 30, 2024)
- `0d1c49d` Don't set threads to 8, since we have 9 stages. Update port num to th… (Apr 30, 2024)
- `2e7e1e4` Support running the ransomware example from the root of the repo (Apr 30, 2024)
- `ebfec0f` Remove the --force_convert_inputs=True flag, this isn't needed and jus… (Apr 30, 2024)
- `4ebb47f` Remove unused import (Apr 30, 2024)
- `60cf6ed` Only force python for shared mem (May 1, 2024)
- `3ca7f07` Not sure why, but this pipeline returns different results in C++ mode (May 1, 2024)
- `20ea8b2` Revert port-change (May 1, 2024)
- `b869c35` Use port 8000 to avoid warning about port change (May 1, 2024)
- `5458c41` Add missing gnn-fraud-classification stage (May 1, 2024)
- `df2a4b7` Remove setting MORPHEUS_ROOT (May 1, 2024)
- `ab2a996` Merge adjacent shell blocks (May 1, 2024)
- `1840ed1` Remove reference to the data dir, as we just have two csv files (May 1, 2024)
- `f0641f1` Remove --force_convert_inputs=True flag since implicit casting is alw… (May 1, 2024)
- `c49d3a7` Remove --force_convert_inputs=True flag since implicit casting is alw… (May 1, 2024)
- `3931259` Add test for is_fully_supported (May 1, 2024)
- `20ba1cc` Add missing includes (May 1, 2024)
- `08db18d` Merge branch 'branch-24.06' into abp_pcap_detection-1675 (May 2, 2024)
- `ca4179b` Revert "Remove --force_convert_inputs=True flag since implicit castin… (May 2, 2024)
- `ced6be0` Revert "Remove --force_convert_inputs=True flag since implicit castin… (May 2, 2024)
- `4e424ec` Remove redundant call to cp.asarray, explicitly cast to the model's i… (May 3, 2024)
- `eb2d2d9` Remove unused kwargs (May 3, 2024)
- `1640058` Ensure tensors are in row-major order as this is what is required by … (May 3, 2024)
- `4f3d598` Run the pipeline in C++ mode, remove un-needed force_convert_inputs a… (May 3, 2024)
- `7661514` Remove unused import (May 3, 2024)
- `f9f9768` Revert removal of filter_null (May 3, 2024)
- `954d3b0` Update comment (May 3, 2024)
- `41edb21` Update test to expect row-major data, and assert the stride (May 3, 2024)
examples/abp_pcap_detection/README.md (4 changes: 2 additions & 2 deletions)

````diff
@@ -99,13 +99,13 @@ Alternately, the Morpheus command line could have been used to accomplish the sa…
 From the root of the Morpheus repo, run:
 ```bash
 morpheus --log_level INFO --plugin "examples/abp_pcap_detection/abp_pcap_preprocessing.py" \
-run --use_cpp False --pipeline_batch_size 100000 --model_max_batch_size 100000 \
+run --pipeline_batch_size 100000 --model_max_batch_size 100000 \
 pipeline-fil --model_fea_length 13 --label=probs \
 from-file --filename examples/data/abp_pcap_dump.jsonlines --filter_null False \
 deserialize \
 pcap-preprocess \
 monitor --description "Preprocessing rate" \
-inf-triton --model_name "abp-pcap-xgb" --server_url "localhost:8001" --force_convert_inputs=True \
+inf-triton --model_name "abp-pcap-xgb" --server_url "localhost:8000" \
 monitor --description "Inference rate" --unit inf \
 add-class --label=probs \
 monitor --description "Add classification rate" --unit "add-class" \
````
examples/abp_pcap_detection/abp_pcap_preprocessing.py (3 changes: 2 additions & 1 deletion)

```diff
@@ -170,7 +170,8 @@ def round_time_kernel(timestamp, rollup_time, secs):
     del df, grouped_df

     # Convert the dataframe to cupy the same way cuml does
-    data = cp.asarray(merged_df[fea_cols].to_cupy())
+    # Explicitly cast to float32 to match the model's input, and set row-major order as required by Triton
+    data = cp.asarray(merged_df[fea_cols].to_cupy(), order='C', dtype=cp.float32)
     count = data.shape[0]

     for col in req_cols:
```
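The change above can be illustrated with a NumPy sketch (CuPy mirrors NumPy's `asarray` signature here); the input array is made up for the example:

```python
# NumPy analogue of the cupy change: cast features to float32 and force
# row-major (C-order) layout, which is the layout Triton expects.
import numpy as np

# Stand-in for merged_df[fea_cols].to_cupy(): a float64, column-major array.
features = np.asfortranarray(np.arange(12, dtype=np.float64).reshape(3, 4))
assert not features.flags['C_CONTIGUOUS']

data = np.asarray(features, order='C', dtype=np.float32)

assert data.dtype == np.float32
assert data.flags['C_CONTIGUOUS']
# For float32, row-major strides are (n_cols * 4, 4) bytes.
assert data.strides == (4 * 4, 4)
```

Doing the cast once in preprocessing is what lets the later commits drop `--force_convert_inputs=True` from the inference stage.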
examples/abp_pcap_detection/run.py (17 changes: 3 additions & 14 deletions)

```diff
@@ -21,7 +21,6 @@
 from morpheus.cli.commands import FILE_TYPE_NAMES
 from morpheus.cli.utils import str_to_file_type
 from morpheus.config import Config
-from morpheus.config import CppConfig
 from morpheus.config import PipelineModes
 from morpheus.pipeline.linear_pipeline import LinearPipeline
 from morpheus.stages.general.monitor_stage import MonitorStage
@@ -87,7 +86,7 @@
     help=("Iterative mode will emit dataframes one at a time. Otherwise a list of dataframes is emitted. "
           "Iterative mode is good for interleaving source stages."),
 )
-@click.option("--server_url", required=True, help="Tritonserver url.", default="localhost:8001")
+@click.option("--server_url", required=True, help="Tritonserver url.", default="localhost:8000")
 @click.option(
     "--file_type",
     type=click.Choice(FILE_TYPE_NAMES, case_sensitive=False),
@@ -111,8 +110,6 @@ def run_pipeline(
     # Enable the default logger.
     configure_logging(log_level=logging.INFO)

-    CppConfig.set_should_use_cpp(False)
-
     # Its necessary to get the global config object and configure it for FIL mode.
     config = Config()
     config.mode = PipelineModes.FIL
@@ -124,8 +121,6 @@
     config.feature_length = model_fea_length
     config.class_labels = ["probs"]

-    kwargs = {}
-
     # Create a linear pipeline object.
     pipeline = LinearPipeline(config)

@@ -154,13 +149,7 @@
     # Add a inference stage.
     # This stage sends inference requests to the Tritonserver and captures the response.
-    pipeline.add_stage(
-        TritonInferenceStage(
-            config,
-            model_name=model_name,
-            server_url=server_url,
-            force_convert_inputs=True,
-        ))
+    pipeline.add_stage(TritonInferenceStage(config, model_name=model_name, server_url=server_url))

     # Add a monitor stage.
     # This stage logs the metrics (inf/sec) from the above stage.
@@ -176,7 +165,7 @@

     # Add a serialize stage.
     # This stage includes & excludes columns from messages.
-    pipeline.add_stage(SerializeStage(config, **kwargs))
+    pipeline.add_stage(SerializeStage(config))

     # Add a monitor stage.
     # This stage logs the metrics (msg/sec) from the above stage.
```
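The default-port change in run.py can be sketched with stdlib argparse (the real code uses click; the parser here is illustrative):

```python
# Stdlib sketch of the CLI default change: --server_url now defaults
# to localhost:8000 instead of localhost:8001.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--server_url", default="localhost:8000", help="Tritonserver url.")

# No flag given: the parser falls back to the new default.
args = parser.parse_args([])
assert args.server_url == "localhost:8000"

# An explicit flag still overrides it.
args = parser.parse_args(["--server_url", "localhost:8001"])
assert args.server_url == "localhost:8001"
```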
Changes to the abp_pcap preprocessing test (the filename header was not captured on this page):

```diff
@@ -62,6 +62,8 @@ def check_inf_message(msg: MultiInferenceFILMessage,

     input__0 = msg.memory.get_tensor('input__0')
     assert input__0.shape == (expected_count, expected_feature_length)
+    assert input__0.dtype == cp.float32
+    assert input__0.strides == (expected_feature_length * 4, 4)
     assert (input__0 == expected_input__0).all()

     seq_ids = msg.memory.get_tensor('seq_ids')
@@ -87,10 +89,12 @@ def test_abp_pcap_preprocessing(config: Config, dataset_cudf: DatasetManager,
     input_df = dataset_cudf.get_df(input_file, no_cache=True, filter_nulls=False)

     expected_flow_ids = input_df.src_ip + ":" + input_df.src_port + "=" + input_df.dest_ip + ":" + input_df.dest_port
-    expected_input__0 = cp.asarray(
-        np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'examples/abp_pcap_detection/abp_pcap_expected_input_0.csv'),
-                   delimiter=",",
-                   skiprows=0))
+    expected_input__0 = cp.asarray(np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir,
+                                                           'examples/abp_pcap_detection/abp_pcap_expected_input_0.csv'),
+                                              delimiter=",",
+                                              skiprows=0,
+                                              dtype=np.float32),
+                                   order='C')

    assert len(input_df) == 20
```
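The stride assertion added to the test follows directly from row-major float32 layout: each row is `feature_length * 4` bytes apart, each element 4 bytes. A NumPy sketch using the example's 20 rows and 13 features:

```python
# Row-major float32 layout implies the strides asserted in the test above.
import numpy as np

expected_count, expected_feature_length = 20, 13
input__0 = np.zeros((expected_count, expected_feature_length), dtype=np.float32)

assert input__0.dtype == np.float32
# 13 features * 4 bytes per float32 = 52 bytes between rows.
assert input__0.strides == (expected_feature_length * 4, 4)
```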