From c12dbac31b2431de798c514017ad4d3e5874b7aa Mon Sep 17 00:00:00 2001 From: Julio Perez <37191411+jperez999@users.noreply.github.com> Date: Thu, 25 Aug 2022 11:00:26 -0400 Subject: [PATCH] Fix unit scaling criteo inference serving (#559) * merge resolved * fix unit test for scaling criteo * add back model import --- ...erence-with-Merlin-Models-TensorFlow.ipynb | 41 +++++++++---------- .../test_scaling_criteo_merlin_models.py | 39 ++++++++++-------- 2 files changed, 42 insertions(+), 38 deletions(-) diff --git a/examples/scaling-criteo/04-Triton-Inference-with-Merlin-Models-TensorFlow.ipynb b/examples/scaling-criteo/04-Triton-Inference-with-Merlin-Models-TensorFlow.ipynb index 950332694..42fa1fa48 100644 --- a/examples/scaling-criteo/04-Triton-Inference-with-Merlin-Models-TensorFlow.ipynb +++ b/examples/scaling-criteo/04-Triton-Inference-with-Merlin-Models-TensorFlow.ipynb @@ -31,17 +31,14 @@ "\n", "\n", "# Scaling Criteo: Triton Inference with Merlin Models TensorFlow\n", - "\n", "This notebook is created using the latest stable [merlin-tensorflow](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/merlin/containers/merlin-tensorflow/tags) container.\n", "\n", "## Overview\n", - "\n", - "In the previous notebook, we processed the [criteo dataset with NVTabular](02-ETL-with-NVTabular) and [trained a DLRM model with Merlin Model Tensorflow](04-Triton-Inference-with-Merlin-Models-TensorFlow.ipynb). Finally, we want to deploy our pipeline to [Triton Inference Server (TIS)](https://github.com/triton-inference-server/server), which can serve our model in a production environment. \n", + "In the previous notebook, we processed the [criteo dataset with NVTabular](02-ETL-with-NVTabular) and [trained a DLRM model with Merlin Model Tensorflow](04-Triton-Inference-with-Merlin-Models-TensorFlow.ipynb). Finally, we want to deploy our pipeline to [Triton Inference Server (TIS)](https://github.com/triton-inference-server/server), which can serve our model in a production environment.\n", "\n", "We can send raw data to the API endpoint. TIS will execute the same NVTabular workflow for feature engineering and predict the processed data with Merlin Models TensorFlow. We deploy the pipeline as an ensemble and receive the predict scores. This notebook is based on the Example, [Serving Ranking Models With Merlin Systems](https://github.com/NVIDIA-Merlin/systems/blob/main/examples/Serving-Ranking-Models-With-Merlin-Systems.ipynb), in Merlin systems. If you are interested in more details, we recommend to go through the example, first.\n", "\n", "## Learning objectives\n", - "\n", "- Deploy an ensemble pipeline of NVTabular and Merlin Models TensorFlow to Triton Inference Server\n", "- Get prediction from Triton Inference Server" ] @@ -71,22 +68,18 @@ "source": [ "import os\n", "import glob\n", - "os.environ[\"TF_GPU_ALLOCATOR\"]=\"cuda_malloc_async\"\n", + "os.environ[\"TF_GPU_ALLOCATOR\"] = \"cuda_malloc_async\"\n", "\n", "import tensorflow as tf\n", - "import numpy as np\n", - "import tritonclient.grpc as grpcclient\n", - "\n", - "import merlin.models.tf as mm\n", "\n", + "import tritonclient.grpc as grpcclient\n", "from nvtabular.workflow import Workflow\n", - "\n", + "import merlin.models.tf as mm\n", "from merlin.schema.tags import Tags\n", "from merlin.systems.dag.ops.workflow import TransformWorkflow\n", "from merlin.systems.dag.ops.tensorflow import PredictTensorflow\n", "from merlin.systems.dag.ensemble import Ensemble\n", "from merlin.systems.triton import convert_df_to_triton_input\n", - "\n", "from merlin.core.dispatch import get_lib" ] }, @@ -270,12 +263,12 @@ "outputs": [], "source": [ "df_lib = get_lib()\n", - "\n", + "input_cols = workflow.input_schema.column_names\n", "# read in data for request\n", "batch = df_lib.read_parquet(\n", - " os.path.join(sorted(glob.glob(original_data_path + \"/*.parquet\"))[-1]), \n", - " num_rows=3, \n", - " columns=workflow.input_schema.column_names\n", + " os.path.join(sorted(glob.glob(original_data_path + \"/*.parquet\"))[-1]),\n", + " num_rows=3,\n", + " columns=input_cols\n", ")\n", "batch" ] @@ -299,12 +292,11 @@ "source": [ "\n", "# create inputs and outputs\n", - "\n", "inputs = convert_df_to_triton_input(workflow.input_schema.column_names, batch.fillna(0), grpcclient.InferInput)\n", - "\n", + "output_cols = ensemble.graph.output_schema.column_names\n", "outputs = [\n", " grpcclient.InferRequestedOutput(col)\n", - " for col in ensemble.graph.output_schema.column_names\n", + " for col in output_cols\n", "]" ] }, @@ -354,11 +346,11 @@ "source": [ "## Summary\n", "\n", - "In this example, we deployed a recommender system pipeline as an ensemble. First, NVTabular created features and afterwards, Merlin Models TensorFlow predicted the processed data. The DLRM architecture was used as a model. This process ensures that the training and production environments use the same feature engineering. \n", + "In this example, we deployed a recommender system pipeline as an ensemble. First, NVTabular created features and afterwards, Merlin Models TensorFlow predicted the processed data. The DLRM architecture was used as a model. This process ensures that the training and production environments use the same feature engineering.\n", "\n", "## Next steps\n", "\n", - "If you are interested in more details of the pipeline, we recommend to try out the [Merlin System example](https://github.com/NVIDIA-Merlin/systems/blob/main/examples/Serving-Ranking-Models-With-Merlin-Systems.ipynb). \n", + "If you are interested in more details of the pipeline, we recommend to try out the [Merlin System example](https://github.com/NVIDIA-Merlin/systems/blob/main/examples/Serving-Ranking-Models-With-Merlin-Systems.ipynb).\n", "\n", "In our Merlin repository, we provide [another end-to-end example](../Building-and-deploying-multi-stage-RecSys/) using a candidate retrieval and ranking model. In addition, we use approximate nearest neighbor and a feature store." ] @@ -366,7 +358,7 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3 (ipykernel)", + "display_name": "Python 3.9.7 64-bit", "language": "python", "name": "python3" }, @@ -380,7 +372,12 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.10" + "version": "3.9.7" + }, + "vscode": { + "interpreter": { + "hash": "916dbcbb3f70747c44a77c7bcd40155683ae19c65e1c03b4aa3499c5328201f1" + } } }, "nbformat": 4, diff --git a/tests/unit/examples/test_scaling_criteo_merlin_models.py b/tests/unit/examples/test_scaling_criteo_merlin_models.py index 0a4985813..de7bf50cc 100644 --- a/tests/unit/examples/test_scaling_criteo_merlin_models.py +++ b/tests/unit/examples/test_scaling_criteo_merlin_models.py @@ -3,6 +3,9 @@ import pytest from testbook import testbook from tests.conftest import REPO_ROOT +from merlin.core.dispatch import get_lib +from merlin.models.loader.tf_utils import configure_tensorflow +from merlin.systems.triton.utils import run_ensemble_on_tritonserver pytest.importorskip("tensorflow") @@ -80,20 +83,24 @@ def test_func(): ) NUM_OF_CELLS = len(tb3.cells) tb3.execute_cell(list(range(0, NUM_OF_CELLS - 5))) - tb3.inject( - """ - import shutil - - from merlin.systems.triton.utils import run_ensemble_on_tritonserver - outputs = ensemble.graph.output_schema.column_names - response = run_ensemble_on_tritonserver( - "/tmp/output/criteo/ensemble/", outputs, batch.fillna(0), "ensemble_model" - ) - response = [x.tolist()[0] for x in response["label/binary_classification_task"]] - shutil.rmtree("/tmp/input/criteo", ignore_errors=True) - shutil.rmtree("/tmp/output/criteo", ignore_errors=True) - """ + input_cols = tb3.ref("input_cols") + outputs = tb3.ref("output_cols") + # read in data for request + df_lib = get_lib() + in_dtypes = {} + for col in input_cols: + if col.startswith("C"): + in_dtypes[col] = "int64" + if col.startswith("I"): + in_dtypes[col] = "float64" + batch = df_lib.read_parquet( + os.path.join("/tmp/output/criteo/", "valid", "part_0.parquet"), + num_rows=3, + columns=input_cols, + ) + batch = batch.astype(in_dtypes) + configure_tensorflow() + response = run_ensemble_on_tritonserver( + "/tmp/output/criteo/ensemble/", outputs, batch, "ensemble_model" ) - tb3.execute_cell(NUM_OF_CELLS - 4) - response = tb3.ref("response") - assert len(response) == 3 + assert len(response["label/binary_classification_task"]) == 3