diff --git a/docs/source/examples.md b/docs/source/examples.md
index 5de469f588..bfe4f8e24c 100644
--- a/docs/source/examples.md
+++ b/docs/source/examples.md
@@ -24,7 +24,7 @@ limitations under the License.
 * [Example Ransomware Detection Morpheus Pipeline for AppShield Data](../../examples/ransomware_detection/README.md)
 * [Root Cause Analysis Acceleration & Predictive Maintenance Example](../../examples/root_cause_analysis/README.md)
 * [SID Visualization Example](../../examples/sid_visualization/README.md)
-* [Large Language Models (LLMs)](../../examples/llm/README.md)
+* Large Language Models (LLMs)
   * [Agents](../../examples/llm/agents/README.md)
   * [Completion](../../examples/llm/completion/README.md)
   * [VDB Upload](../../examples/llm/vdb_upload/README.md)
diff --git a/examples/README.md b/examples/README.md
index 1c001ffebe..4bdc94648f 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -15,10 +15,18 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->

-## Morpheus CLI Examples
-
-Examples run with the Morpheus CLI (`morpheus ...`) should be run from the repository root; otherwise, some filepath arguments may need to be changed.
-
-## Morpheus run.py Examples
-
-Examples run with python (`python run.py`) should be run from the example's directory; otherwise, relative Python imports may be broken.
+# Examples
+* [Anomalous Behavior Profiling with Forest Inference Library (FIL) Example](./abp_nvsmi_detection/README.md)
+* [ABP Detection Example Using Morpheus](./abp_pcap_detection/README.md)
+* [Digital Fingerprinting (DFP)](./digital_fingerprinting/README.md)
+* [GNN Fraud Detection Pipeline](./gnn_fraud_detection_pipeline/README.md)
+* [Example cyBERT Morpheus Pipeline for Apache Log Parsing](./log_parsing/README.md)
+* [Sensitive Information Detection with Natural Language Processing (NLP) Example](./nlp_si_detection/README.md)
+* [Example Ransomware Detection Morpheus Pipeline for AppShield Data](./ransomware_detection/README.md)
+* [Root Cause Analysis Acceleration & Predictive Maintenance Example](./root_cause_analysis/README.md)
+* [SID Visualization Example](./sid_visualization/README.md)
+* Large Language Models (LLMs)
+  * [Agents](./llm/agents/README.md)
+  * [Completion](./llm/completion/README.md)
+  * [VDB Upload](./llm/vdb_upload/README.md)
+  * [Retrieval Augmented Generation (RAG)](./llm/rag/README.md)
diff --git a/examples/abp_pcap_detection/README.md b/examples/abp_pcap_detection/README.md
index 371bd28e35..3cfae25aa9 100644
--- a/examples/abp_pcap_detection/README.md
+++ b/examples/abp_pcap_detection/README.md
@@ -27,14 +27,9 @@ docker pull nvcr.io/nvidia/tritonserver:23.06-py3
 ```

 ##### Deploy Triton Inference Server
-From the root of the Morpheus repo, navigate to the anomalous behavior profiling example directory:
+From the root of the Morpheus repo, run the following to launch Triton and load the `abp-pcap-xgb` model:
 ```bash
-cd examples/abp_pcap_detection
-```
-
-The following creates the Triton container, mounts the `abp-pcap-xgb` directory to `/models/abp-pcap-xgb` in the Triton container, and starts the Triton server:
-```bash
-docker run --rm --gpus=all -p 8000:8000 -p 8001:8001 -p 8002:8002 -v $PWD/abp-pcap-xgb:/models/abp-pcap-xgb --name tritonserver nvcr.io/nvidia/tritonserver:23.06-py3 tritonserver --model-repository=/models --exit-on-error=false
+docker run --rm --gpus=all -p 8000:8000 -p 8001:8001 -p 8002:8002 -v $PWD/examples/abp_pcap_detection/abp-pcap-xgb:/models/abp-pcap-xgb --name
tritonserver nvcr.io/nvidia/tritonserver:23.06-py3 tritonserver --model-repository=/models --exit-on-error=false ``` ##### Verify Model Deployment @@ -53,8 +48,7 @@ Use Morpheus to run the Anomalous Behavior Profiling Detection Pipeline with the From the root of the Morpheus repo, run: ```bash -cd examples/abp_pcap_detection -python run.py --help +python examples/abp_pcap_detection/run.py --help ``` Output: @@ -62,44 +56,41 @@ Output: Usage: run.py [OPTIONS] Options: - --num_threads INTEGER RANGE Number of internal pipeline threads to use + --num_threads INTEGER RANGE Number of internal pipeline threads to use. [x>=1] --pipeline_batch_size INTEGER RANGE Internal batch size for the pipeline. Can be much larger than the model batch size. Also - used for Kafka consumers [x>=1] + used for Kafka consumers. [x>=1] --model_max_batch_size INTEGER RANGE - Max batch size to use for the model [x>=1] - --input_file PATH Input filepath [required] + Max batch size to use for the model. [x>=1] + --input_file PATH Input filepath. [required] --output_file TEXT The path to the file where the inference output will be saved. --model_fea_length INTEGER RANGE - Features length to use for the model [x>=1] + Features length to use for the model. + [x>=1] --model_name TEXT The name of the model that is deployed on - Tritonserver + Tritonserver. --iterative Iterative mode will emit dataframes one at a time. Otherwise a list of dataframes is emitted. Iterative mode is good for interleaving source stages. - --server_url TEXT Tritonserver url [required] - --file_type [auto|json|csv] Indicates what type of file to read. + --server_url TEXT Tritonserver url. [required] + --file_type [auto|csv|json] Indicates what type of file to read. Specifying 'auto' will determine the file type from the extension. --help Show this message and exit. ``` -To launch the configured Morpheus pipeline with the sample data that is provided in `examples/data`, from the `examples/abp_pcap_detection` directory run the following: +To launch the configured Morpheus pipeline with the sample data that is provided in `examples/data`, run the following: ```bash -python run.py \ - --input_file ../data/abp_pcap_dump.jsonlines \ - --output_file ./pcap_out.jsonlines \ - --model_name 'abp-pcap-xgb' \ - --server_url localhost:8001 +python examples/abp_pcap_detection/run.py ``` Note: Both Morpheus and Triton Inference Server containers must have access to the same GPUs in order for this example to work. -The pipeline will process the input `pcap_dump.jsonlines` sample data and write it to `pcap_out.jsonlines`. +The pipeline will process the input `abp_pcap_dump.jsonlines` sample data and write it to `pcap_out.jsonlines`. ### CLI Example The above example is illustrative of using the Python API to build a custom Morpheus Pipeline. @@ -123,5 +114,3 @@ morpheus --log_level INFO --plugin "examples/abp_pcap_detection/abp_pcap_preproc to-file --filename "pcap_out.jsonlines" --overwrite \ monitor --description "Write to file rate" --unit "to-file" ``` - -Note: Triton is still needed to be launched from the `examples/abp_pcap_detection` directory. 
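The `run.py` updates that follow all rely on the same trick to make the examples runnable from the repository root (or any other working directory): compute option defaults relative to the script's own location rather than the caller's current directory. Below is a minimal sketch of that pattern using the same `CUR_DIR`/`EX_DATA_DIR` names as the `abp_pcap_detection` change; the command body and sample file are illustrative only.

```python
# Sketch of "defaults relative to the script" for example entry points.
# The option names and default file mirror the abp_pcap_detection change; the body is illustrative.
import os

import click

CUR_DIR = os.path.dirname(__file__)             # directory containing this script
EX_DATA_DIR = os.path.join(CUR_DIR, "../data")  # shared examples/data directory


@click.command()
@click.option("--input_file",
              type=click.Path(exists=True, readable=True),
              default=os.path.join(EX_DATA_DIR, "abp_pcap_dump.jsonlines"),
              required=True,
              help="Input filepath.")
@click.option("--server_url", default="localhost:8001", required=True, help="Tritonserver url.")
def run(input_file: str, server_url: str):
    # The resolved default no longer depends on the caller's working directory.
    click.echo(f"Reading {input_file} and sending inference requests to {server_url}")


if __name__ == "__main__":
    run()
```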
diff --git a/examples/abp_pcap_detection/run.py b/examples/abp_pcap_detection/run.py index 405f9bde6e..18d5c25e5d 100644 --- a/examples/abp_pcap_detection/run.py +++ b/examples/abp_pcap_detection/run.py @@ -33,6 +33,9 @@ from morpheus.stages.preprocess.deserialize_stage import DeserializeStage from morpheus.utils.logger import configure_logging +CUR_DIR = os.path.dirname(__file__) +EX_DATA_DIR = os.path.join(CUR_DIR, "../data") + @click.command() @click.option( @@ -57,7 +60,7 @@ @click.option( "--input_file", type=click.Path(exists=True, readable=True), - default="pcap.jsonlines", + default=os.path.join(EX_DATA_DIR, "abp_pcap_dump.jsonlines"), required=True, help="Input filepath.", ) @@ -84,7 +87,7 @@ help=("Iterative mode will emit dataframes one at a time. Otherwise a list of dataframes is emitted. " "Iterative mode is good for interleaving source stages."), ) -@click.option("--server_url", required=True, help="Tritonserver url.") +@click.option("--server_url", required=True, help="Tritonserver url.", default="localhost:8001") @click.option( "--file_type", type=click.Choice(FILE_TYPE_NAMES, case_sensitive=False), diff --git a/examples/gnn_fraud_detection_pipeline/README.md b/examples/gnn_fraud_detection_pipeline/README.md index c61f288499..8f05229710 100644 --- a/examples/gnn_fraud_detection_pipeline/README.md +++ b/examples/gnn_fraud_detection_pipeline/README.md @@ -28,17 +28,10 @@ mamba env update \ ``` ## Running - -##### Setup Env Variable -```bash -export MORPHEUS_ROOT=$(pwd) -``` - Use Morpheus to run the GNN fraud detection Pipeline with the transaction data. A pipeline has been configured in `run.py` with several command line options: ```bash -cd ${MORPHEUS_ROOT}/examples/gnn_fraud_detection_pipeline -python run.py --help +python examples/gnn_fraud_detection_pipeline/run.py --help ``` ``` Usage: run.py [OPTIONS] @@ -63,11 +56,10 @@ Options: --help Show this message and exit. 
``` -To launch the configured Morpheus pipeline with the sample data that is provided at `$MORPHEUS_ROOT/models/dataset`, run the following: +To launch the configured Morpheus pipeline, run the following: ```bash -cd ${MORPHEUS_ROOT}/examples/gnn_fraud_detection_pipeline -python run.py +python examples/gnn_fraud_detection_pipeline/run.py ``` ``` ====Registering Pipeline==== @@ -125,6 +117,7 @@ morpheus --log_level INFO \ monitor --description "Graph construction rate" \ gnn-fraud-sage --model_dir examples/gnn_fraud_detection_pipeline/model/ \ monitor --description "Inference rate" \ + gnn-fraud-classification --model_xgb_file examples/gnn_fraud_detection_pipeline/model/xgb.pt \ monitor --description "Add classification rate" \ serialize \ to-file --filename "output.csv" --overwrite diff --git a/examples/gnn_fraud_detection_pipeline/run.py b/examples/gnn_fraud_detection_pipeline/run.py index 58374a8c2b..ae91845b86 100644 --- a/examples/gnn_fraud_detection_pipeline/run.py +++ b/examples/gnn_fraud_detection_pipeline/run.py @@ -32,6 +32,8 @@ from stages.graph_construction_stage import FraudGraphConstructionStage from stages.graph_sage_stage import GraphSAGEStage +CUR_DIR = os.path.dirname(__file__) + @click.command() @click.option( @@ -62,21 +64,21 @@ @click.option( "--input_file", type=click.Path(exists=True, readable=True, dir_okay=False), - default="validation.csv", + default=os.path.join(CUR_DIR, "validation.csv"), required=True, help="Input data filepath.", ) @click.option( "--training_file", type=click.Path(exists=True, readable=True, dir_okay=False), - default="training.csv", + default=os.path.join(CUR_DIR, "training.csv"), required=True, help="Training data filepath.", ) @click.option( "--model_dir", type=click.Path(exists=True, readable=True, file_okay=False, dir_okay=True), - default="model", + default=os.path.join(CUR_DIR, "model"), required=True, help="Path to trained Hinsage & XGB models.", ) diff --git a/examples/log_parsing/README.md b/examples/log_parsing/README.md index ce9790be06..425e1c0b1c 100644 --- a/examples/log_parsing/README.md +++ b/examples/log_parsing/README.md @@ -29,11 +29,6 @@ Example: docker pull nvcr.io/nvidia/tritonserver:23.06-py3 ``` -##### Setup Env Variable -```bash -export MORPHEUS_ROOT=$(pwd) -``` - ##### Start Triton Inference Server Container From the Morpheus repo root directory, run the following to launch Triton and load the `log-parsing-onnx` model: @@ -56,19 +51,15 @@ Once Triton server finishes starting up, it will display the status of all loade ### Run Log Parsing Pipeline -Run the following from the `examples/log_parsing` directory to start the log parsing pipeline: +Run the following from the root of the Morpheus repo to start the log parsing pipeline: ```bash -python run.py \ - --num_threads 1 \ - --input_file ${MORPHEUS_ROOT}/models/datasets/validation-data/log-parsing-validation-data-input.csv \ - --output_file ./log-parsing-output.jsonlines \ +python examples/log_parsing/run.py \ + --input_file=./models/datasets/validation-data/log-parsing-validation-data-input.csv \ --model_vocab_hash_file=data/bert-base-cased-hash.txt \ - --model_vocab_file=${MORPHEUS_ROOT}/models/training-tuning-scripts/sid-models/resources/bert-base-cased-vocab.txt \ - --model_seq_length=256 \ + --model_vocab_file=./models/training-tuning-scripts/sid-models/resources/bert-base-cased-vocab.txt \ --model_name log-parsing-onnx \ - --model_config_file=${MORPHEUS_ROOT}/models/log-parsing-models/log-parsing-config-20220418.json \ - --server_url localhost:8001 + 
--model_config_file=./models/log-parsing-models/log-parsing-config-20220418.json ``` Use `--help` to display information about the command line options: @@ -110,7 +101,7 @@ PYTHONPATH="examples/log_parsing" \ morpheus --log_level INFO \ --plugin "inference" \ --plugin "postprocessing" \ - run --num_threads 1 --pipeline_batch_size 1024 --model_max_batch_size 32 \ + run --pipeline_batch_size 1024 --model_max_batch_size 32 \ pipeline-nlp \ from-file --filename ./models/datasets/validation-data/log-parsing-validation-data-input.csv \ deserialize \ diff --git a/examples/log_parsing/run.py b/examples/log_parsing/run.py index b0dfe76fd3..7fff20bd27 100644 --- a/examples/log_parsing/run.py +++ b/examples/log_parsing/run.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import os import click @@ -28,6 +29,7 @@ from morpheus.stages.output.write_to_file_stage import WriteToFileStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage +from morpheus.utils.logger import configure_logging @click.command() @@ -79,7 +81,7 @@ help="The name of the model that is deployed on Tritonserver.", ) @click.option("--model_config_file", required=True, help="Model config file.") -@click.option("--server_url", required=True, help="Tritonserver url.") +@click.option("--server_url", required=True, help="Tritonserver url.", default="localhost:8001") def run_pipeline( num_threads, pipeline_batch_size, @@ -93,6 +95,10 @@ def run_pipeline( model_config_file, server_url, ): + + # Enable the default logger. + configure_logging(log_level=logging.INFO) + config = Config() config.mode = PipelineModes.NLP config.num_threads = num_threads diff --git a/examples/nlp_si_detection/README.md b/examples/nlp_si_detection/README.md index 32cc2f23b5..33081caf00 100644 --- a/examples/nlp_si_detection/README.md +++ b/examples/nlp_si_detection/README.md @@ -103,11 +103,10 @@ The following command line is the entire command to build and launch the pipelin From the Morpheus repo root directory, run: ```bash -export MORPHEUS_ROOT=$(pwd) # Launch Morpheus printing debug messages morpheus --log_level=DEBUG \ - `# Run a pipeline with 8 threads and a model batch size of 32 (Must match Triton config)` \ - run --num_threads=8 --pipeline_batch_size=1024 --model_max_batch_size=32 \ + `# Run a pipeline with a model batch size of 32 (Must match Triton config)` \ + run --pipeline_batch_size=1024 --model_max_batch_size=32 \ `# Specify a NLP pipeline with 256 sequence length (Must match Triton config)` \ pipeline-nlp --model_seq_length=256 \ `# 1st Stage: Read from file` \ diff --git a/examples/nlp_si_detection/run.sh b/examples/nlp_si_detection/run.sh index f702784968..390418e545 100755 --- a/examples/nlp_si_detection/run.sh +++ b/examples/nlp_si_detection/run.sh @@ -19,7 +19,7 @@ SCRIPT_DIR=${SCRIPT_DIR:-"$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null export MORPHEUS_ROOT=${MORPHEUS_ROOT:-"$(realpath ${SCRIPT_DIR}/../..)"} morpheus --log_level=DEBUG \ - run --num_threads=8 --pipeline_batch_size=1024 --model_max_batch_size=32 \ + run --pipeline_batch_size=1024 --model_max_batch_size=32 \ pipeline-nlp --model_seq_length=256 \ from-file --filename=${MORPHEUS_ROOT}/examples/data/pcap_dump.jsonlines \ deserialize \ diff --git a/examples/ransomware_detection/README.md b/examples/ransomware_detection/README.md index 23f44e4ede..6c04feae46 100644 --- 
a/examples/ransomware_detection/README.md +++ b/examples/ransomware_detection/README.md @@ -35,15 +35,15 @@ export MORPHEUS_ROOT=$(pwd) ``` ##### Start Triton Inference Server Container -Run the following from the `examples/ransomware_detection` directory to launch Triton and load the `ransomw-model-short-rf` model: - +From the Morpheus repo root directory, run the following to launch Triton and load the `ransomw-model-short-rf` model: ```bash # Run Triton in explicit mode -docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 -v $PWD/models:/models/triton-model-repo nvcr.io/nvidia/tritonserver:23.06-py3 \ - tritonserver --model-repository=/models/triton-model-repo \ - --exit-on-error=false \ - --model-control-mode=explicit \ - --load-model ransomw-model-short-rf +docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 \ + -v $PWD/examples/ransomware_detection/models:/models/triton-model-repo nvcr.io/nvidia/tritonserver:23.06-py3 \ + tritonserver --model-repository=/models/triton-model-repo \ + --exit-on-error=false \ + --model-control-mode=explicit \ + --load-model ransomw-model-short-rf ``` ##### Verify Model Deployment @@ -67,14 +67,13 @@ mamba install 'dask>=2023.1.1' 'distributed>=2023.1.1' ``` ## Run Ransomware Detection Pipeline -Run the following from the `examples/ransomware_detection` directory to start the ransomware detection pipeline: +Run the following from the root of the Morpheus repo to start the ransomware detection pipeline: ```bash -python run.py --server_url=localhost:8001 \ +python examples/ransomware_detection/run.py --server_url=localhost:8001 \ --sliding_window=3 \ --model_name=ransomw-model-short-rf \ - --conf_file=./config/ransomware_detection.yaml \ - --input_glob=${MORPHEUS_ROOT}/examples/data/appshield/*/snapshot-*/*.json \ + --input_glob=./examples/data/appshield/*/snapshot-*/*.json \ --output_file=./ransomware_detection_output.jsonlines ``` diff --git a/examples/ransomware_detection/run.py b/examples/ransomware_detection/run.py index 58296bd2ae..5a80265996 100644 --- a/examples/ransomware_detection/run.py +++ b/examples/ransomware_detection/run.py @@ -33,6 +33,8 @@ from stages.create_features import CreateFeaturesRWStage from stages.preprocessing import PreprocessingRWStage +CUR_DIR = os.path.dirname(__file__) + @click.command() @click.option('--debug', default=False) @@ -64,7 +66,7 @@ @click.option( "--conf_file", type=click.STRING, - default="./config/ransomware_detection.yaml", + default=os.path.join(CUR_DIR, "config/ransomware_detection.yaml"), help="Ransomware detection configuration filepath.", ) @click.option( diff --git a/examples/root_cause_analysis/README.md b/examples/root_cause_analysis/README.md index 2efd63c0ed..b456c3ff72 100644 --- a/examples/root_cause_analysis/README.md +++ b/examples/root_cause_analysis/README.md @@ -98,9 +98,6 @@ From the Morpheus repo root directory, run: ```bash export MORPHEUS_ROOT=$(pwd) -``` - -```bash morpheus --log_level=DEBUG \ `# Run a pipeline with 5 threads and a model batch size of 32 (Must match Triton config)` \ run --num_threads=8 --edge_buffer_size=4 --use_cpp=True --pipeline_batch_size=1024 --model_max_batch_size=32 \ @@ -113,7 +110,7 @@ deserialize \ `# 3rd Stage: Preprocessing converts the input data into BERT tokens` \ preprocess --column=log --vocab_hash_file=./data/bert-base-uncased-hash.txt --truncation=True --do_lower_case=True --add_special_tokens=False \ `# 4th Stage: Send messages to Triton for inference. 
Specify the binary model loaded in Setup` \ -inf-triton --force_convert_inputs=True --model_name=root-cause-binary-onnx --server_url=localhost:8001 \ +inf-triton --model_name=root-cause-binary-onnx --server_url=localhost:8000 --force_convert_inputs=True \ `# 5th Stage: Monitor stage prints throughput information to the console` \ monitor --description='Inference rate' --smoothing=0.001 --unit inf \ `# 6th Stage: Add scores from inference to the messages` \ diff --git a/morpheus/_lib/common/__init__.pyi b/morpheus/_lib/common/__init__.pyi index a5ec2b692c..39e722bdec 100644 --- a/morpheus/_lib/common/__init__.pyi +++ b/morpheus/_lib/common/__init__.pyi @@ -18,6 +18,7 @@ __all__ = [ "TypeId", "determine_file_type", "read_file_to_df", + "typeid_is_fully_supported", "typeid_to_numpy_str", "write_df_to_file" ] @@ -199,6 +200,8 @@ def determine_file_type(filename: str) -> FileTypes: pass def read_file_to_df(filename: str, file_type: FileTypes = FileTypes.Auto) -> object: pass +def typeid_is_fully_supported(arg0: TypeId) -> bool: + pass def typeid_to_numpy_str(arg0: TypeId) -> str: pass def write_df_to_file(df: object, filename: str, file_type: FileTypes = FileTypes.Auto, **kwargs) -> None: diff --git a/morpheus/_lib/common/module.cpp b/morpheus/_lib/common/module.cpp index 0c2ae40914..0bda85b975 100644 --- a/morpheus/_lib/common/module.cpp +++ b/morpheus/_lib/common/module.cpp @@ -129,6 +129,10 @@ PYBIND11_MODULE(common, _module) return DType(tid).type_str(); }); + _module.def("typeid_is_fully_supported", [](TypeId tid) { + return DType(tid).is_fully_supported(); + }); + _module.def( "determine_file_type", py::overload_cast(&determine_file_type), py::arg("filename")); _module.def("determine_file_type", diff --git a/morpheus/_lib/include/morpheus/objects/dtype.hpp b/morpheus/_lib/include/morpheus/objects/dtype.hpp index 63dbd1594a..2297460b52 100644 --- a/morpheus/_lib/include/morpheus/objects/dtype.hpp +++ b/morpheus/_lib/include/morpheus/objects/dtype.hpp @@ -33,6 +33,13 @@ namespace morpheus { */ // Pulled from cuDF + +/** + * @brief Template function to calculate the size in bits of a given type. + * + * @tparam T The type to calculate the size for. + * @return The size in bits of the given type. + */ template constexpr std::size_t size_in_bits() { @@ -40,8 +47,11 @@ constexpr std::size_t size_in_bits() return sizeof(T) * CHAR_BIT; } -// Pulled from cudf #pragma GCC visibility push(default) + +/** + * @brief Enum class for representing data types used in Tensors and DataFrame columns. + */ enum class TypeId : int32_t { EMPTY, ///< Always null with no underlying data @@ -78,40 +88,112 @@ enum class TypeId : int32_t NUM_TYPE_IDS ///< Total number of type ids }; -/****** DType****************************************/ +/** + * @class DType + * @brief This class represents a data type specified by a TypeId. + */ struct DType { + /** + * @brief Construct a DType for a given type specified by a TypeId. + * + * @param tid The TypeId to initialize the DType object with. + */ DType(TypeId tid); + + /** + * @brief Copy constructor. + * + * @param dtype The DType object to copy from. + */ DType(const DType& dtype) = default; + + /** + * @brief Equality operator. + * + * @param other The DType object to compare with. + * @return True if the two DType objects represent the same TypeId, false otherwise. + */ bool operator==(const DType& other) const; + /** + * @brief Get the TypeId of the DType object. + * + * @return The TypeId of the DType object. 
+ */ TypeId type_id() const; - // Number of bytes per item + /** + * @brief Get the number of bytes per item. + * + * @return The number of bytes per item. + */ size_t item_size() const; - // Pretty print + /** + * @brief Get the name of the DType object. + * + * @return The name of the DType object. + */ std::string name() const; - // Returns the numpy string representation + /** + * @brief Get the numpy string representation of the DType object. + * + * @return The numpy string representation of the DType object. + */ std::string type_str() const; - // Cudf representation + /** + * @brief Get the cudf type id of the DType object. + * + * @return The cudf type id of the DType object. + */ cudf::type_id cudf_type_id() const; - // Returns the triton string representation + /** + * @brief Get the triton string representation of the DType object. + * + * @return The triton string representation of the DType object. + */ std::string triton_str() const; - // From cudf + /** + * @brief Create a DType object from a cudf type id. + * + * @param id The cudf type id. + * @return A DType object. + */ static DType from_cudf(cudf::type_id tid); - // From numpy + /** + * @brief Create a DType object from a numpy type string. + * + * @param type_str The numpy type string. + * @return A DType object. + */ static DType from_numpy(const std::string& numpy_str); - // From triton + /** + * @brief Create a DType object from a triton type string. + * + * @param type_str The triton type string. + * @return A DType object. + */ static DType from_triton(const std::string& type_str); - // from template + /** + * @brief Check if the DType object is fully supported. + * + * @return True if the DType object is fully supported, false otherwise. + */ + bool is_fully_supported() const; + + /** + * @brief Construct a DType object from a C++ type. + * + * @return A DType object. + */ template static DType create() { diff --git a/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp b/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp index fd115de5af..24f6934fdd 100644 --- a/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp +++ b/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp @@ -111,6 +111,7 @@ class MORPHEUS_EXPORT InferenceClientStage * @param model_name : Name of the model specifies which model can handle the inference requests that are sent to * Triton inference * @param needs_logits : Determines if logits are required. + * @param force_convert_inputs : Determines if inputs should be converted to the model's input format. * @param inout_mapping : Dictionary used to map pipeline input/output names to Triton input/output names. Use this * if the Morpheus names do not match the model. */ @@ -154,6 +155,7 @@ struct MORPHEUS_EXPORT InferenceClientStageInterfaceProxy * Triton inference * @param server_url : Triton server URL. * @param needs_logits : Determines if logits are required. + * @param force_convert_inputs : Determines if inputs should be converted to the model's input format. * @param inout_mapping : Dictionary used to map pipeline input/output names to Triton input/output names. Use this * if the Morpheus names do not match the model. 
* @return std::shared_ptr>> @@ -164,6 +166,7 @@ struct MORPHEUS_EXPORT InferenceClientStageInterfaceProxy std::string model_name, std::string server_url, bool needs_logits, + bool force_convert_inputs, std::map input_mapping, std::map output_mapping); @@ -176,6 +179,7 @@ struct MORPHEUS_EXPORT InferenceClientStageInterfaceProxy * Triton inference * @param server_url : Triton server URL. * @param needs_logits : Determines if logits are required. + * @param force_convert_inputs : Determines if inputs should be converted to the model's input format. * @param inout_mapping : Dictionary used to map pipeline input/output names to Triton input/output names. Use this * if the Morpheus names do not match the model. * @return std::shared_ptr>> @@ -186,6 +190,7 @@ struct MORPHEUS_EXPORT InferenceClientStageInterfaceProxy std::string model_name, std::string server_url, bool needs_logits, + bool force_convert_inputs, std::map input_mapping, std::map output_mapping); }; diff --git a/morpheus/_lib/include/morpheus/stages/triton_inference.hpp b/morpheus/_lib/include/morpheus/stages/triton_inference.hpp index 923a75e2b7..1cc8af06af 100644 --- a/morpheus/_lib/include/morpheus/stages/triton_inference.hpp +++ b/morpheus/_lib/include/morpheus/stages/triton_inference.hpp @@ -153,9 +153,12 @@ class MORPHEUS_EXPORT TritonInferenceClientSession : public IInferenceClientSess std::vector m_model_inputs; std::vector m_model_outputs; std::shared_ptr m_client; + bool m_force_convert_inputs; public: - TritonInferenceClientSession(std::shared_ptr client, std::string model_name); + TritonInferenceClientSession(std::shared_ptr client, + std::string model_name, + bool force_convert_inputs); /** @brief Gets the inference input mappings for Triton @@ -178,9 +181,10 @@ class MORPHEUS_EXPORT TritonInferenceClient : public IInferenceClient private: std::shared_ptr m_client; std::string m_model_name; + bool m_force_convert_inputs; public: - TritonInferenceClient(std::unique_ptr&& client, std::string model_name); + TritonInferenceClient(std::unique_ptr&& client, std::string model_name, bool force_convert_inputs); /** @brief Creates a TritonInferenceClientSession diff --git a/morpheus/_lib/src/objects/dtype.cpp b/morpheus/_lib/src/objects/dtype.cpp index 870cdb8059..3f167b1e01 100644 --- a/morpheus/_lib/src/objects/dtype.cpp +++ b/morpheus/_lib/src/objects/dtype.cpp @@ -357,4 +357,21 @@ char DType::type_char() const } } +bool DType::is_fully_supported() const +{ + try + { + byte_order_char(); + cudf_type_id(); + item_size(); + triton_str(); + type_char(); + } catch (...) 
+ { + return false; + } + + return true; +} + } // namespace morpheus diff --git a/morpheus/_lib/src/stages/inference_client_stage.cpp b/morpheus/_lib/src/stages/inference_client_stage.cpp index 26428aa159..d53364a650 100644 --- a/morpheus/_lib/src/stages/inference_client_stage.cpp +++ b/morpheus/_lib/src/stages/inference_client_stage.cpp @@ -333,6 +333,10 @@ mrc::coroutines::AsyncGenerator> InferenceClientStage input_mappings, std::map output_mappings) { @@ -393,8 +398,9 @@ InferenceClientStageInterfaceProxy::init_mm(mrc::segment::Builder& builder, output_mappings_.emplace_back(TensorModelMapping{mapping.first, mapping.second}); } - auto triton_client = std::make_unique(server_url); - auto triton_inference_client = std::make_unique(std::move(triton_client), model_name); + auto triton_client = std::make_unique(server_url); + auto triton_inference_client = + std::make_unique(std::move(triton_client), model_name, force_convert_inputs); auto stage = builder.construct_object>( name, std::move(triton_inference_client), model_name, needs_logits, input_mappings_, output_mappings_); @@ -408,6 +414,7 @@ InferenceClientStageInterfaceProxy::init_cm(mrc::segment::Builder& builder, std::string server_url, std::string model_name, bool needs_logits, + bool force_convert_inputs, std::map input_mappings, std::map output_mappings) { @@ -424,9 +431,10 @@ InferenceClientStageInterfaceProxy::init_cm(mrc::segment::Builder& builder, output_mappings_.emplace_back(TensorModelMapping{mapping.first, mapping.second}); } - auto triton_client = std::make_unique(server_url); - auto triton_inference_client = std::make_unique(std::move(triton_client), model_name); - auto stage = builder.construct_object>( + auto triton_client = std::make_unique(server_url); + auto triton_inference_client = + std::make_unique(std::move(triton_client), model_name, force_convert_inputs); + auto stage = builder.construct_object>( name, std::move(triton_inference_client), model_name, needs_logits, input_mappings_, output_mappings_); return stage; diff --git a/morpheus/_lib/src/stages/triton_inference.cpp b/morpheus/_lib/src/stages/triton_inference.cpp index 30f100e7ea..a78beb5d11 100644 --- a/morpheus/_lib/src/stages/triton_inference.cpp +++ b/morpheus/_lib/src/stages/triton_inference.cpp @@ -258,9 +258,11 @@ triton::client::Error HttpTritonClient::async_infer(triton::client::InferenceSer } TritonInferenceClientSession::TritonInferenceClientSession(std::shared_ptr client, - std::string model_name) : + std::string model_name, + bool force_convert_inputs) : m_client(std::move(client)), - m_model_name(std::move(model_name)) + m_model_name(std::move(model_name)), + m_force_convert_inputs(force_convert_inputs) { // Now load the input/outputs for the model @@ -433,8 +435,24 @@ mrc::coroutines::Task TritonInferenceClientSession::infer(TensorMap&& for (auto model_input : m_model_inputs) { - auto inference_input_slice = - inputs[model_input.name].slice({start, 0}, {stop, -1}).as_type(model_input.datatype); + auto inference_input_slice = inputs.at(model_input.name).slice({start, 0}, {stop, -1}); + + if (inference_input_slice.dtype() != model_input.datatype) + { + if (m_force_convert_inputs) + { + inference_input_slice.swap(inference_input_slice.as_type(model_input.datatype)); + } + else + { + std::string err_msg = MORPHEUS_CONCAT_STR( + "Unexpected dtype for Triton input. Cannot automatically convert dtype due to loss of data." 
+ "Input Name: '" + << model_input.name << ", Expected: " << model_input.datatype.name() + << ", Actual dtype:" << inference_input_slice.dtype().name()); + throw std::invalid_argument(err_msg); + } + } inference_inputs.emplace_back( TritonInferInput{model_input.name, @@ -491,14 +509,17 @@ mrc::coroutines::Task TritonInferenceClientSession::infer(TensorMap&& co_return model_output_tensors; }; -TritonInferenceClient::TritonInferenceClient(std::unique_ptr&& client, std::string model_name) : +TritonInferenceClient::TritonInferenceClient(std::unique_ptr&& client, + std::string model_name, + bool force_convert_inputs) : m_client(std::move(client)), - m_model_name(std::move(model_name)) + m_model_name(std::move(model_name)), + m_force_convert_inputs(force_convert_inputs) {} std::unique_ptr TritonInferenceClient::create_session() { - return std::make_unique(m_client, m_model_name); + return std::make_unique(m_client, m_model_name, m_force_convert_inputs); } } // namespace morpheus diff --git a/morpheus/_lib/stages/__init__.pyi b/morpheus/_lib/stages/__init__.pyi index 85767bdcef..78a0ff8091 100644 --- a/morpheus/_lib/stages/__init__.pyi +++ b/morpheus/_lib/stages/__init__.pyi @@ -71,10 +71,10 @@ class HttpServerSourceStage(mrc.core.segment.SegmentObject): def __init__(self, builder: mrc.core.segment.Builder, name: str, bind_address: str = '127.0.0.1', port: int = 8080, endpoint: str = '/message', method: str = 'POST', accept_status: int = 201, sleep_time: float = 0.10000000149011612, queue_timeout: int = 5, max_queue_size: int = 1024, num_server_threads: int = 1, max_payload_size: int = 10485760, request_timeout: int = 30, lines: bool = False, stop_after: int = 0) -> None: ... pass class InferenceClientStageCM(mrc.core.segment.SegmentObject): - def __init__(self, builder: mrc.core.segment.Builder, name: str, server_url: str, model_name: str, needs_logits: bool, input_mapping: typing.Dict[str, str] = {}, output_mapping: typing.Dict[str, str] = {}) -> None: ... + def __init__(self, builder: mrc.core.segment.Builder, name: str, server_url: str, model_name: str, needs_logits: bool, force_convert_inputs: bool, input_mapping: typing.Dict[str, str] = {}, output_mapping: typing.Dict[str, str] = {}) -> None: ... pass class InferenceClientStageMM(mrc.core.segment.SegmentObject): - def __init__(self, builder: mrc.core.segment.Builder, name: str, server_url: str, model_name: str, needs_logits: bool, input_mapping: typing.Dict[str, str] = {}, output_mapping: typing.Dict[str, str] = {}) -> None: ... + def __init__(self, builder: mrc.core.segment.Builder, name: str, server_url: str, model_name: str, needs_logits: bool, force_convert_inputs: bool, input_mapping: typing.Dict[str, str] = {}, output_mapping: typing.Dict[str, str] = {}) -> None: ... 
pass class KafkaSourceStage(mrc.core.segment.SegmentObject): @typing.overload diff --git a/morpheus/_lib/stages/module.cpp b/morpheus/_lib/stages/module.cpp index 6cdba387f0..1cf57663ac 100644 --- a/morpheus/_lib/stages/module.cpp +++ b/morpheus/_lib/stages/module.cpp @@ -177,6 +177,7 @@ PYBIND11_MODULE(stages, _module) py::arg("server_url"), py::arg("model_name"), py::arg("needs_logits"), + py::arg("force_convert_inputs"), py::arg("input_mapping") = py::dict(), py::arg("output_mapping") = py::dict()); @@ -190,6 +191,7 @@ PYBIND11_MODULE(stages, _module) py::arg("server_url"), py::arg("model_name"), py::arg("needs_logits"), + py::arg("force_convert_inputs"), py::arg("input_mapping") = py::dict(), py::arg("output_mapping") = py::dict()); diff --git a/morpheus/_lib/tests/objects/test_dtype.cpp b/morpheus/_lib/tests/objects/test_dtype.cpp index 230d68dcd6..1f1a70bb51 100644 --- a/morpheus/_lib/tests/objects/test_dtype.cpp +++ b/morpheus/_lib/tests/objects/test_dtype.cpp @@ -22,6 +22,8 @@ #include #include +#include // for int32_t +#include // for set #include using namespace morpheus; @@ -283,4 +285,17 @@ TEST_F(TestDType, FromCudfNotSupported) EXPECT_THROW(DType::from_cudf(cudf::type_id::DECIMAL128), std::invalid_argument); EXPECT_THROW(DType::from_cudf(cudf::type_id::STRUCT), std::invalid_argument); EXPECT_THROW(DType::from_cudf(cudf::type_id::NUM_TYPE_IDS), std::invalid_argument); -} \ No newline at end of file +} + +TEST_F(TestDType, IsFullySupported) +{ + std::set unsupported_types = {TypeId::EMPTY, TypeId::STRING, TypeId::NUM_TYPE_IDS}; + for (auto type_id = static_cast(TypeId::EMPTY); type_id <= static_cast(TypeId::NUM_TYPE_IDS); + ++type_id) + { + auto enum_type_id = static_cast(type_id); + auto dtype = DType(enum_type_id); + + ASSERT_EQ(dtype.is_fully_supported(), !unsupported_types.contains(enum_type_id)); + } +} diff --git a/morpheus/_lib/tests/stages/test_triton_inference_stage.cpp b/morpheus/_lib/tests/stages/test_triton_inference_stage.cpp index df7785d259..170655e8c9 100644 --- a/morpheus/_lib/tests/stages/test_triton_inference_stage.cpp +++ b/morpheus/_lib/tests/stages/test_triton_inference_stage.cpp @@ -22,12 +22,14 @@ #include "morpheus/messages/multi_inference.hpp" #include "morpheus/messages/multi_response.hpp" #include "morpheus/objects/dtype.hpp" +#include "morpheus/objects/memory_descriptor.hpp" // for MemoryDescriptor #include "morpheus/objects/tensor.hpp" #include "morpheus/objects/tensor_object.hpp" #include "morpheus/stages/inference_client_stage.hpp" #include "morpheus/stages/triton_inference.hpp" #include "morpheus/types.hpp" #include "morpheus/utilities/cudf_util.hpp" +#include "morpheus/utilities/matx_util.hpp" #include #include @@ -37,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -44,15 +47,19 @@ #include #include #include +#include // for get_current_device_resource #include #include +#include // for initializer_list #include #include #include +#include // for operator<<, basic_ostream #include #include #include +#include class FakeInferResult : public triton::client::InferResult { @@ -117,6 +124,68 @@ class FakeInferResult : public triton::client::InferResult }; class FakeTritonClient : public morpheus::ITritonClient +{ + public: + triton::client::Error is_server_live(bool* live) override + { + *live = true; + return triton::client::Error::Success; + } + + triton::client::Error is_server_ready(bool* ready) override + { + *ready = true; + return triton::client::Error::Success; + } + + triton::client::Error is_model_ready(bool* 
ready, std::string& model_name) override + { + *ready = true; + return triton::client::Error::Success; + } + + triton::client::Error model_config(std::string* model_config, std::string& model_name) override + { + *model_config = R"({ + "max_batch_size": 100 + })"; + + return triton::client::Error::Success; + } + + triton::client::Error model_metadata(std::string* model_metadata, std::string& model_name) override + { + *model_metadata = R"({ + "inputs":[ + { + "name":"seq_ids", + "shape": [0, 1], + "datatype":"INT32" + } + ], + "outputs":[ + { + "name":"seq_ids", + "shape": [0, 1], + "datatype":"INT32" + } + ]})"; + + return triton::client::Error::Success; + } + + triton::client::Error async_infer(triton::client::InferenceServerHttpClient::OnCompleteFn callback, + const triton::client::InferOptions& options, + const std::vector& inputs, + const std::vector& outputs) override + { + callback(new FakeInferResult({{"seq_ids", std::vector({0, 1, 2, 3, 4, 5, 6, 7, 8, 9})}})); + + return triton::client::Error::Success; + } +}; + +class ErrorProneTritonClient : public FakeTritonClient { private: bool m_is_server_live_has_errored = false; @@ -145,7 +214,7 @@ class FakeTritonClient : public morpheus::ITritonClient m_is_server_live = true; } - return triton::client::Error::Success; + return FakeTritonClient::is_server_live(live); } triton::client::Error is_server_ready(bool* ready) override @@ -192,11 +261,7 @@ class FakeTritonClient : public morpheus::ITritonClient return triton::client::Error("model_config error"); } - *model_config = R"({ - "max_batch_size": 100 - })"; - - return triton::client::Error::Success; + return FakeTritonClient::model_config(model_config, model_name); } triton::client::Error model_metadata(std::string* model_metadata, std::string& model_name) override @@ -207,23 +272,7 @@ class FakeTritonClient : public morpheus::ITritonClient return triton::client::Error("model_metadata error"); } - *model_metadata = R"({ - "inputs":[ - { - "name":"seq_ids", - "shape": [0, 1], - "datatype":"INT32" - } - ], - "outputs":[ - { - "name":"seq_ids", - "shape": [0, 1], - "datatype":"INT32" - } - ]})"; - - return triton::client::Error::Success; + return FakeTritonClient::model_metadata(model_metadata, model_name); } triton::client::Error async_infer(triton::client::InferenceServerHttpClient::OnCompleteFn callback, @@ -237,9 +286,7 @@ class FakeTritonClient : public morpheus::ITritonClient return triton::client::Error("async_infer error"); } - callback(new FakeInferResult({{"seq_ids", std::vector({0, 1, 2, 3, 4, 5, 6, 7, 8, 9})}})); - - return triton::client::Error::Success; + return FakeTritonClient::async_infer(callback, options, inputs, outputs); } }; @@ -307,8 +354,9 @@ TEST_F(TestTritonInferenceStage, SingleRow) auto message = std::make_shared(meta, 0, count, memory); // create the fake triton client used for testing. 
- auto triton_client = std::make_unique(); - auto triton_inference_client = std::make_unique(std::move(triton_client), ""); + auto triton_client = std::make_unique(); + auto triton_inference_client = + std::make_unique(std::move(triton_client), "", true); auto stage = morpheus::InferenceClientStage( std::move(triton_inference_client), "", false, {}, {}); @@ -342,3 +390,90 @@ TEST_F(TestTritonInferenceStage, SingleRow) ASSERT_EQ(results.size(), 1); } + +TEST_F(TestTritonInferenceStage, ForceConvert) +{ + using namespace morpheus; + const TypeId model_type = TypeId::INT32; + const std::size_t count = 10; + + std::vector test_types = {TypeId::INT8, + TypeId::INT16, + TypeId::INT32, + TypeId::INT64, + TypeId::UINT8, + TypeId::UINT16, + TypeId::UINT32, + TypeId::UINT64}; + + for (const auto type_id : test_types) + { + for (bool force_convert_inputs : {true, false}) + { + const bool expect_throw = (type_id != model_type) && !force_convert_inputs; + const auto dtype = DType(type_id); + + DVLOG(10) << "Testing type: " << dtype.name() << " with force_convert_inputs: " << force_convert_inputs + << " and expect_throw: " << expect_throw; + + // Create a seq_id tensor + auto md = + std::make_shared(rmm::cuda_stream_per_thread, rmm::mr::get_current_device_resource()); + auto seq_ids_buffer = MatxUtil::create_seq_ids(count, 1, type_id, md); + + auto tensors = TensorMap(); + tensors["seq_ids"].swap(Tensor::create(seq_ids_buffer, dtype, {count, 3}, {})); + + // create the MultiInferenceMessage using the sequence id tensor. + auto memory = std::make_shared(count, std::move(tensors)); + auto table = create_test_table_with_metadata(count); + auto meta = morpheus::MessageMeta::create_from_cpp(std::move(table), 1); + auto message = std::make_shared(meta, 0, count, memory); + + // create the fake triton client used for testing. 
+ auto triton_client = std::make_unique(); + auto triton_inference_client = + std::make_unique(std::move(triton_client), "", force_convert_inputs); + auto stage = + morpheus::InferenceClientStage( + std::move(triton_inference_client), "", false, {}, {}); + + // manually invoke the stage and iterate through the inference responses + auto on = std::make_shared(); + auto results_task = [](auto& stage, auto message, auto on) + -> mrc::coroutines::Task>> { + std::vector> results; + + auto responses_generator = stage.on_data(std::move(message), on); + + auto iter = co_await responses_generator.begin(); + + while (iter != responses_generator.end()) + { + results.emplace_back(std::move(*iter)); + + co_await ++iter; + } + + co_return results; + }(stage, message, on); + + results_task.resume(); + + while (on->resume_next()) {} + + if (expect_throw) + { + ASSERT_THROW(results_task.promise().result(), std::invalid_argument); + } + else + { + ASSERT_NO_THROW(results_task.promise().result()); + + auto results = results_task.promise().result(); + + ASSERT_EQ(results.size(), 1); + } + } + } +} diff --git a/morpheus/common/__init__.py b/morpheus/common/__init__.py index 01b1d97ba0..3170b82e66 100644 --- a/morpheus/common/__init__.py +++ b/morpheus/common/__init__.py @@ -23,6 +23,7 @@ from morpheus._lib.common import TypeId from morpheus._lib.common import determine_file_type from morpheus._lib.common import read_file_to_df +from morpheus._lib.common import typeid_is_fully_supported from morpheus._lib.common import typeid_to_numpy_str from morpheus._lib.common import write_df_to_file @@ -34,6 +35,7 @@ "HttpServer", "read_file_to_df", "Tensor", + "typeid_is_fully_supported", "typeid_to_numpy_str", "TypeId", "write_df_to_file", diff --git a/morpheus/pipeline/preallocator_mixin.py b/morpheus/pipeline/preallocator_mixin.py index c40ed6be04..acec20b9c7 100644 --- a/morpheus/pipeline/preallocator_mixin.py +++ b/morpheus/pipeline/preallocator_mixin.py @@ -26,6 +26,7 @@ import cudf from morpheus.common import TypeId +from morpheus.common import typeid_is_fully_supported from morpheus.common import typeid_to_numpy_str from morpheus.config import CppConfig from morpheus.messages import ControlMessage @@ -90,6 +91,13 @@ def _preallocate_control(self, msg: ControlMessage) -> ControlMessage: self._preallocate_meta(msg.payload()) return msg + def _all_types_supported_in_cpp(self) -> bool: + for column_type in self._needed_columns.values(): + if not typeid_is_fully_supported(column_type): + return False + + return True + def _post_build_single(self, builder: mrc.Builder, out_node: mrc.SegmentObject) -> mrc.SegmentObject: out_type = self.output_ports[0].output_type pretty_type = pretty_print_type_name(out_type) @@ -99,7 +107,7 @@ def _post_build_single(self, builder: mrc.Builder, out_node: mrc.SegmentObject) if issubclass(out_type, (ControlMessage, MessageMeta, MultiMessage)): # Intentionally not using `_build_cpp_node` because `LinearBoundaryIngressStage` lacks a C++ impl - if CppConfig.get_should_use_cpp(): + if CppConfig.get_should_use_cpp() and self._all_types_supported_in_cpp(): import morpheus._lib.stages as _stages needed_columns = list(self._needed_columns.items()) if issubclass(out_type, ControlMessage): diff --git a/morpheus/stages/inference/triton_inference_stage.py b/morpheus/stages/inference/triton_inference_stage.py index 0b8a79dddf..c46cdcab48 100644 --- a/morpheus/stages/inference/triton_inference_stage.py +++ b/morpheus/stages/inference/triton_inference_stage.py @@ -756,7 +756,14 @@ def __init__(self, def 
supports_cpp_node(self) -> bool:
         # Get the value from the worker class
-        return TritonInferenceWorker.supports_cpp_node()
+        if TritonInferenceWorker.supports_cpp_node():
+            if not self._use_shared_memory:
+                return True
+
+            logger.warning("The C++ implementation of TritonInferenceStage does not support the use_shared_memory "
+                           "option. Falling back to Python implementation.")
+
+        return False

     def _get_inference_worker(self, inf_queue: ProducerConsumerQueue) -> TritonInferenceWorker:
         """
@@ -781,6 +788,7 @@ def _get_cpp_inference_node(self, builder: mrc.Builder) -> mrc.SegmentObject:
                                                  self._server_url,
                                                  self._model_name,
                                                  self._needs_logits,
+                                                 self._force_convert_inputs,
                                                  self._input_mapping,
                                                  self._output_mapping)

@@ -789,6 +797,7 @@ def _get_cpp_inference_node(self, builder: mrc.Builder) -> mrc.SegmentObject:
                                               self._server_url,
                                               self._model_name,
                                               self._needs_logits,
+                                              self._force_convert_inputs,
                                               self._input_mapping,
                                               self._output_mapping)
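Taken together, these stage changes thread a new `force_convert_inputs` flag from the Python `TritonInferenceStage` down to the C++ `TritonInferenceClientSession`, which now either converts a mismatched input tensor to the model's expected dtype or rejects it with an error. The following is a rough NumPy-based illustration of that decision only; it is not the Morpheus code path, which operates on GPU tensors in C++.

```python
# Simplified illustration of the force_convert_inputs policy described above.
import numpy as np


def prepare_input(tensor: np.ndarray, model_dtype: np.dtype, force_convert_inputs: bool) -> np.ndarray:
    """Return a tensor matching the model's dtype, or raise if conversion is not allowed."""
    if tensor.dtype == model_dtype:
        return tensor

    if force_convert_inputs:
        # Mirrors the C++ behavior: convert unconditionally when the user opts in.
        return tensor.astype(model_dtype)

    raise ValueError(f"Unexpected dtype for Triton input. Expected: {model_dtype}, actual: {tensor.dtype}. "
                     "Pass force_convert_inputs=True to convert automatically.")


seq_ids = np.arange(10, dtype=np.int64)
print(prepare_input(seq_ids, np.dtype(np.int32), force_convert_inputs=True).dtype)  # int32
# prepare_input(seq_ids, np.dtype(np.int32), force_convert_inputs=False)  # raises ValueError
```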