Fix unit scaling criteo inference serving #559

Merged
merged 6 commits into from
Aug 25, 2022
@@ -31,17 +31,14 @@
"<img src=\"http://developer.download.nvidia.com/compute/machine-learning/frameworks/nvidia_logo.png\" style=\"width: 90px; float: right;\">\n",
"\n",
"# Scaling Criteo: Triton Inference with Merlin Models TensorFlow\n",
"\n",
"This notebook is created using the latest stable [merlin-tensorflow](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/merlin/containers/merlin-tensorflow/tags) container.\n",
"\n",
"## Overview\n",
"\n",
"In the previous notebook, we processed the [criteo dataset with NVTabular](02-ETL-with-NVTabular) and [trained a DLRM model with Merlin Model Tensorflow](04-Triton-Inference-with-Merlin-Models-TensorFlow.ipynb). Finally, we want to deploy our pipeline to [Triton Inference Server (TIS)](https://github.com/triton-inference-server/server), which can serve our model in a production environment. \n",
"In the previous notebook, we processed the [criteo dataset with NVTabular](02-ETL-with-NVTabular) and [trained a DLRM model with Merlin Model Tensorflow](04-Triton-Inference-with-Merlin-Models-TensorFlow.ipynb). Finally, we want to deploy our pipeline to [Triton Inference Server (TIS)](https://github.com/triton-inference-server/server), which can serve our model in a production environment.\n",
"\n",
"We can send raw data to the API endpoint. TIS will execute the same NVTabular workflow for feature engineering and predict the processed data with Merlin Models TensorFlow. We deploy the pipeline as an ensemble and receive the predict scores. This notebook is based on the Example, [Serving Ranking Models With Merlin Systems](https://github.com/NVIDIA-Merlin/systems/blob/main/examples/Serving-Ranking-Models-With-Merlin-Systems.ipynb), in Merlin systems. If you are interested in more details, we recommend to go through the example, first.\n",
"\n",
"## Learning objectives\n",
"\n",
"- Deploy an ensemble pipeline of NVTabular and Merlin Models TensorFlow to Triton Inference Server\n",
"- Get prediction from Triton Inference Server"
]
@@ -71,22 +68,18 @@
"source": [
"import os\n",
"import glob\n",
"os.environ[\"TF_GPU_ALLOCATOR\"]=\"cuda_malloc_async\"\n",
"os.environ[\"TF_GPU_ALLOCATOR\"] = \"cuda_malloc_async\"\n",
"\n",
"import tensorflow as tf\n",
"import numpy as np\n",
"import tritonclient.grpc as grpcclient\n",
"\n",
"import merlin.models.tf as mm\n",
"\n",
"import tritonclient.grpc as grpcclient\n",
"from nvtabular.workflow import Workflow\n",
"\n",
"import merlin.models.tf as mm\n",
"from merlin.schema.tags import Tags\n",
"from merlin.systems.dag.ops.workflow import TransformWorkflow\n",
"from merlin.systems.dag.ops.tensorflow import PredictTensorflow\n",
"from merlin.systems.dag.ensemble import Ensemble\n",
"from merlin.systems.triton import convert_df_to_triton_input\n",
"\n",
"from merlin.core.dispatch import get_lib"
]
},
@@ -270,12 +263,12 @@
"outputs": [],
"source": [
"df_lib = get_lib()\n",
"\n",
"input_cols = workflow.input_schema.column_names\n",
"# read in data for request\n",
"batch = df_lib.read_parquet(\n",
" os.path.join(sorted(glob.glob(original_data_path + \"/*.parquet\"))[-1]), \n",
" num_rows=3, \n",
" columns=workflow.input_schema.column_names\n",
" os.path.join(sorted(glob.glob(original_data_path + \"/*.parquet\"))[-1]),\n",
" num_rows=3,\n",
" columns=input_cols\n",
")\n",
"batch"
]
@@ -299,12 +292,11 @@
"source": [
"\n",
"# create inputs and outputs\n",
"\n",
"inputs = convert_df_to_triton_input(workflow.input_schema.column_names, batch.fillna(0), grpcclient.InferInput)\n",
"\n",
"output_cols = ensemble.graph.output_schema.column_names\n",
"outputs = [\n",
" grpcclient.InferRequestedOutput(col)\n",
" for col in ensemble.graph.output_schema.column_names\n",
" for col in output_cols\n",
"]"
]
},
@@ -354,19 +346,19 @@
"source": [
"## Summary\n",
"\n",
"In this example, we deployed a recommender system pipeline as an ensemble. First, NVTabular created features and afterwards, Merlin Models TensorFlow predicted the processed data. The DLRM architecture was used as a model. This process ensures that the training and production environments use the same feature engineering. \n",
"In this example, we deployed a recommender system pipeline as an ensemble. First, NVTabular created features and afterwards, Merlin Models TensorFlow predicted the processed data. The DLRM architecture was used as a model. This process ensures that the training and production environments use the same feature engineering.\n",
"\n",
"## Next steps\n",
"\n",
"If you are interested in more details of the pipeline, we recommend to try out the [Merlin System example](https://github.com/NVIDIA-Merlin/systems/blob/main/examples/Serving-Ranking-Models-With-Merlin-Systems.ipynb). \n",
"If you are interested in more details of the pipeline, we recommend to try out the [Merlin System example](https://github.com/NVIDIA-Merlin/systems/blob/main/examples/Serving-Ranking-Models-With-Merlin-Systems.ipynb).\n",
"\n",
"In our Merlin repository, we provide [another end-to-end example](../Building-and-deploying-multi-stage-RecSys/) using a candidate retrieval and ranking model. In addition, we use approximate nearest neighbor and a feature store."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"display_name": "Python 3.9.7 64-bit",
"language": "python",
"name": "python3"
},
@@ -380,7 +372,12 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
"version": "3.9.7"
},
"vscode": {
"interpreter": {
"hash": "916dbcbb3f70747c44a77c7bcd40155683ae19c65e1c03b4aa3499c5328201f1"
}
}
},
"nbformat": 4,
39 changes: 23 additions & 16 deletions tests/unit/examples/test_scaling_criteo_merlin_models.py
@@ -3,6 +3,9 @@
import pytest
from testbook import testbook
from tests.conftest import REPO_ROOT
from merlin.core.dispatch import get_lib
from merlin.models.loader.tf_utils import configure_tensorflow
from merlin.systems.triton.utils import run_ensemble_on_tritonserver

pytest.importorskip("tensorflow")

@@ -80,20 +83,24 @@ def test_func():
)
NUM_OF_CELLS = len(tb3.cells)
tb3.execute_cell(list(range(0, NUM_OF_CELLS - 5)))
tb3.inject(
"""
import shutil

from merlin.systems.triton.utils import run_ensemble_on_tritonserver
outputs = ensemble.graph.output_schema.column_names
response = run_ensemble_on_tritonserver(
"/tmp/output/criteo/ensemble/", outputs, batch.fillna(0), "ensemble_model"
)
response = [x.tolist()[0] for x in response["label/binary_classification_task"]]
shutil.rmtree("/tmp/input/criteo", ignore_errors=True)
shutil.rmtree("/tmp/output/criteo", ignore_errors=True)
"""
input_cols = tb3.ref("input_cols")
outputs = tb3.ref("output_cols")
# read in data for request
df_lib = get_lib()
in_dtypes = {}
for col in input_cols:
if col.startswith("C"):
in_dtypes[col] = "int64"
if col.startswith("I"):
in_dtypes[col] = "float64"
batch = df_lib.read_parquet(
os.path.join("/tmp/output/criteo/", "valid", "part_0.parquet"),
num_rows=3,
columns=input_cols,
)
batch = batch.astype(in_dtypes)
configure_tensorflow()
response = run_ensemble_on_tritonserver(
"/tmp/output/criteo/ensemble/", outputs, batch, "ensemble_model"
)
tb3.execute_cell(NUM_OF_CELLS - 4)
response = tb3.ref("response")
assert len(response) == 3
assert len(response["label/binary_classification_task"]) == 3
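
Note on the dtype mapping in the updated test: the loop over `input_cols` can be read as a small pure function, which makes the casting rule easy to check without a Triton server. The sketch below is illustrative only. The helper name `build_in_dtypes` and the sample column list are hypothetical and not part of this PR; it assumes, as the test does, that categorical columns start with `C` and continuous columns start with `I`, and that any other column (for example a label) is left uncast.

```python
def build_in_dtypes(input_cols):
    """Return a {column: dtype} mapping for the request batch.

    Hypothetical helper mirroring the loop in the test:
    columns starting with "C" are cast to int64,
    columns starting with "I" are cast to float64,
    and all other columns are skipped.
    """
    in_dtypes = {}
    for col in input_cols:
        if col.startswith("C"):
            in_dtypes[col] = "int64"
        elif col.startswith("I"):
            in_dtypes[col] = "float64"
    return in_dtypes


# Illustrative column names only; the real list comes from
# workflow.input_schema.column_names in the notebook.
example_cols = ["I1", "I2", "C1", "C2", "label"]
example_dtypes = build_in_dtypes(example_cols)
print(example_dtypes)
```

The resulting dictionary has the shape that `batch.astype(in_dtypes)` expects in the test, so only the listed columns are cast and the rest keep the dtype they were read with.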