diff --git a/merlin/systems/dag/__init__.py b/merlin/systems/dag/__init__.py index 23d317f0b..0d5067642 100644 --- a/merlin/systems/dag/__init__.py +++ b/merlin/systems/dag/__init__.py @@ -18,4 +18,3 @@ # flake8: noqa from .ensemble import Ensemble from .node import Node -from .op_runner import OperatorRunner diff --git a/merlin/systems/dag/node.py b/merlin/systems/dag/node.py index 8fd2fbcdd..14c7dc488 100644 --- a/merlin/systems/dag/node.py +++ b/merlin/systems/dag/node.py @@ -23,31 +23,11 @@ class InferenceNode(Node): """Specialized node class used in Triton Ensemble DAGs""" - def exportable(self, backend: str = None): - """ - Determine whether the current node's operator is exportable for a given back-end - - Parameters - ---------- - backend : str, optional - The Merlin Systems (not Triton) back-end to use, - either "ensemble" or "executor", by default None - - Returns - ------- - bool - True if the node's operator is exportable for the supplied back-end - """ - backends = getattr(self.op, "exportable_backends", []) - - return hasattr(self.op, "export") and backend in backends - def export( self, output_path: Union[str, os.PathLike], node_id: int = None, version: int = 1, - backend="ensemble", ): """ Export a Triton config directory for this node. @@ -72,7 +52,6 @@ def export( self.output_schema, node_id=node_id, version=version, - backend=backend, ) @property diff --git a/merlin/systems/dag/op_runner.py b/merlin/systems/dag/op_runner.py deleted file mode 100644 index 4cc1471da..000000000 --- a/merlin/systems/dag/op_runner.py +++ /dev/null @@ -1,67 +0,0 @@ -# -# Copyright (c) 2022, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import importlib -import json - -from merlin.dag import ColumnSelector - - -class OperatorRunner: - """Runner for collection of operators in one triton model.""" - - def __init__( - self, - config, - *, - model_repository="./", - model_version=1, - model_name=None, - ): - """Instantiate an OperatorRunner""" - operator_names = self.fetch_json_param(config, "operator_names") - op_configs = [self.fetch_json_param(config, op_name) for op_name in operator_names] - - self.operators = [] - for op_config in op_configs: - module_name = op_config["module_name"] - class_name = op_config["class_name"] - - op_module = importlib.import_module(module_name) - op_class = getattr(op_module, class_name) - - operator = op_class.from_config( - op_config, - model_repository=model_repository, - model_name=model_name, - model_version=model_version, - ) - self.operators.append(operator) - - def execute(self, tensors): - """Run transform on multiple operators""" - selector = ColumnSelector("*") - for operator in self.operators: - input_type = type(tensors) - tensors = operator.transform(selector, tensors) - if isinstance(tensors, dict): - tensors = input_type(tensors) - - return tensors - - def fetch_json_param(self, model_config, param_name): - """Extract JSON value from model config parameters""" - string_value = model_config["parameters"][param_name]["string_value"] - return json.loads(string_value) diff --git a/merlin/systems/dag/ops/faiss.py b/merlin/systems/dag/ops/faiss.py index 0998f0134..271284ca9 100644 --- a/merlin/systems/dag/ops/faiss.py +++ b/merlin/systems/dag/ops/faiss.py @@ -13,13 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # - -import json import os -import pathlib from pathlib import Path from shutil import copy2 -from typing import Dict, List, Tuple import faiss import numpy as np @@ -61,7 +57,7 @@ def __init__(self, index_path, topk=10): self.topk = topk self._index = None - def load_artifacts(self, artifact_path): + def load_artifacts(self, artifact_path: str) -> None: filename = Path(self.index_path).name path_artifact = Path(artifact_path) if path_artifact.is_file(): @@ -74,93 +70,20 @@ def load_artifacts(self, artifact_path): index = faiss.index_cpu_to_gpu(res, 0, index) self._index = index - @classmethod - def from_config(cls, config: dict, **kwargs) -> "QueryFaiss": - """ - Instantiate a class object given a config. - - Parameters - ---------- - config : dict - - - Returns - ------- - QueryFaiss - class object instantiated with config values - """ - parameters = json.loads(config.get("params", "")) - index_path = parameters["index_path"] - topk = parameters["topk"] - - operator = QueryFaiss(index_path, topk=topk) - operator.load_artifacts(index_path) - - return operator - - @property - def exportable_backends(self): - return ["ensemble", "executor"] - - def export( - self, - path: str, - input_schema: Schema, - output_schema: Schema, - params: dict = None, - node_id: int = None, - version: int = 1, - backend: str = "ensemble", - ) -> Tuple[Dict, List]: - """ - Export the class object as a config and all related files to the user defined path. + def save_artifacts(self, artifact_path: str) -> None: + index_filename = os.path.basename(os.path.realpath(self.index_path)) + new_index_path = Path(artifact_path) / index_filename + copy2(self.index_path, new_index_path) - Parameters - ---------- - path : str - Artifact export path - input_schema : Schema - A schema with information about the inputs to this operator - output_schema : Schema - A schema with information about the outputs of this operator - params : dict, optional - Parameters dictionary of key, value pairs stored in exported config, by default None - node_id : int, optional - The placement of the node in the graph (starts at 1), by default None - version : int, optional - The version of the model, by default 1 + def __getstate__(self) -> dict: + """Return state of instance when pickled. Returns ------- - Ensemble_config: dict - Node_configs: list + dict + Returns object state excluding index attribute. """ - params = params or {} - - self_params = { - # TODO: Write the (relative) path from inside the export directory - "index_path": self.index_path, - "topk": self.topk, - } - self_params.update(params) - index_filename = os.path.basename(os.path.realpath(self.index_path)) - - if backend == "ensemble": - full_path = ( - pathlib.Path(path) / f"{node_id}_{QueryFaiss.__name__.lower()}" / str(version) - ) - else: - full_path = pathlib.Path(path) / "executor_model" / str(version) / "ensemble" - - new_index_path = full_path / index_filename - full_path.mkdir(parents=True, exist_ok=True) - copy2(self.index_path, new_index_path) - self.index_path = str(new_index_path) - - if backend == "ensemble": - return super().export(path, input_schema, output_schema, self_params, node_id, version) - else: - return ({}, []) + return {k: v for k, v in self.__dict__.items() if k != "_index"} def transform( self, col_selector: ColumnSelector, transformable: Transformable diff --git a/merlin/systems/dag/ops/fil.py b/merlin/systems/dag/ops/fil.py index 23095eb8d..8303397ff 100644 --- a/merlin/systems/dag/ops/fil.py +++ b/merlin/systems/dag/ops/fil.py @@ -88,39 +88,6 @@ def compute_input_schema( """Return the input schema representing the input columns this operator expects to use.""" return self.input_schema - @property - def exportable_backends(self): - return ["ensemble", "executor"] - - def export( - self, - path: str, - input_schema: Schema, - output_schema: Schema, - params: dict = None, - node_id: int = None, - version: int = 1, - backend: str = "ensemble", - ): - """Export the class and related files to the path specified.""" - fil_model_config = self.fil_op.export( - path, - input_schema, - output_schema, - params=params, - node_id=node_id, - version=version, - ) - - return fil_model_config - - @property - def fil_model_name(self): - return self._fil_model_name - - def set_fil_model_name(self, fil_model_name): - self._fil_model_name = fil_model_name - def transform( self, col_selector: ColumnSelector, transformable: Transformable ) -> Transformable: @@ -147,10 +114,6 @@ def transform( return type(transformable)(outputs) - def load_artifacts(self, artifact_path): - # need variable that tells me what type of model this is. - self.fil_op.load_model(artifact_path) - class FIL(InferenceOperator): """Operator for Forest Inference Library (FIL) models. @@ -265,25 +228,6 @@ def compute_output_schema( """Returns output schema for FIL op""" return Schema([ColumnSchema("output__0", dtype=np.float32)]) - def export( - self, - path, - input_schema, - output_schema, - params: dict = None, - node_id=None, - version=1, - ): - """Export the model to the supplied path. Returns the config""" - node_name = f"{node_id}_{self.export_name}" if node_id is not None else self.export_name - node_export_path = pathlib.Path(path) / node_name - version_path = node_export_path / str(version) - version_path.mkdir(parents=True, exist_ok=True) - - self.fil_model_class.save(version_path) - - return version_path - def load_model(self, version_path): version_path = pathlib.Path(version_path) self.fil_model_class.load(version_path) @@ -305,6 +249,12 @@ def save(self, version_path): Save model to version_path """ + @abstractmethod + def load(self, version_path): + """ + Load model from path + """ + @property @abstractmethod def num_classes(self): diff --git a/merlin/systems/dag/ops/implicit.py b/merlin/systems/dag/ops/implicit.py index abf0b24c0..f89544bef 100644 --- a/merlin/systems/dag/ops/implicit.py +++ b/merlin/systems/dag/ops/implicit.py @@ -66,6 +66,10 @@ def load_artifacts(self, artifact_path: str): self.model = model_cls.load(str(model_file)) + def save_artifacts(self, artifact_path: str): + model_path = pathlib.Path(artifact_path) / "model.npz" + self.model.save(str(model_path)) + def compute_input_schema( self, root_schema: Schema, @@ -85,10 +89,6 @@ def compute_output_schema( """Return the output schema representing the columns this operator returns.""" return Schema([ColumnSchema("ids", dtype="int64"), ColumnSchema("scores", dtype="float64")]) - @property - def exportable_backends(self): - return ["ensemble", "executor"] - def transform( self, col_selector: ColumnSelector, transformable: Transformable ) -> Transformable: diff --git a/merlin/systems/dag/ops/operator.py b/merlin/systems/dag/ops/operator.py index b9649634a..746011bad 100644 --- a/merlin/systems/dag/ops/operator.py +++ b/merlin/systems/dag/ops/operator.py @@ -54,19 +54,23 @@ def transform( "from the base operator." ) - def load_artifacts(self, artifact_path): - """ - Hook method that provides a way to load saved artifacts for the operator + def load_artifacts(self, artifact_path: str) -> None: + """Load artifacts from disk required for operator function. Parameters ---------- artifact_path : str - Path where artifacts for the operator are stored. + The path where artifacts are loaded from """ - @property - def exportable_backends(self): - return ["ensemble"] + def save_artifacts(self, artifact_path: str) -> None: + """Save artifacts required to be reload operator state from disk + + Parameters + ---------- + artifact_path : str + The path where artifacts are to be saved + """ @abstractmethod def export( @@ -77,7 +81,6 @@ def export( params: dict = None, node_id: int = None, version: int = 1, - backend: str = "ensemble", ): """ Export the class object as a config and all related files to the user defined path. @@ -99,15 +102,13 @@ def export( Returns ------- - Ensemble_config: dict - The config for the entire ensemble. - Node_configs: list - A list of individual configs for each step (operator) in graph. + model_config: dict + The config for the exported operator (Triton model). """ raise NotImplementedError( - "Exporting an operator to run in a particular context (i.e. Triton)" - " only makres sense when a runtime is specified. To select an " - f"operator for the appropriate runtime, replace {self.__class__.__name__}" + "Exporting an operator to run in a particular context (i.e. Triton) " + "only makes sense when a runtime is specified. To select an " + f"operator for the appropriate runtime, replace {self.__class__.__name__} " f"with a runtime-specific operator class, possibly {self.__class__.__name__}Triton" ) diff --git a/merlin/systems/dag/ops/session_filter.py b/merlin/systems/dag/ops/session_filter.py index bd07d3ac8..28162a00d 100644 --- a/merlin/systems/dag/ops/session_filter.py +++ b/merlin/systems/dag/ops/session_filter.py @@ -14,8 +14,6 @@ # limitations under the License. # -import json - import numpy as np from merlin.core.protocols import Transformable @@ -50,25 +48,6 @@ def __init__(self, filter_out: str, input_col: str = None) -> "FilterCandidates" self._filter_out_col = filter_out super().__init__() - @classmethod - def from_config(cls, config, **kwargs) -> "FilterCandidates": - """ - Instantiate a class object given a config. - - Parameters - ---------- - config : dict - - - Returns - ------- - Class object instantiated with config values - """ - parameters = json.loads(config.get("params", "")) - filter_out_col = parameters["filter_out_col"] - input_col = parameters["input_col"] - return FilterCandidates(filter_out_col, input_col) - @property def dependencies(self): return self.filter_out @@ -182,44 +161,3 @@ def transform( filtered_results = candidate_ids.values[~np.isin(candidate_ids.values, filter_ids.values)] return type(transformable)({"filtered_ids": filtered_results}) - - def export( - self, - path: str, - input_schema: Schema, - output_schema: Schema, - params: dict = None, - node_id: int = None, - version: int = 1, - backend: str = "ensemble", - ): - """ - Export the class object as a config and all related files to the user defined path. - - Parameters - ---------- - path : str - Artifact export path - input_schema : Schema - A schema with information about the inputs to this operator - output_schema : Schema - A schema with information about the outputs of this operator - params : dict, optional - Parameters dictionary of key, value pairs stored in exported config, by default None - node_id : int, optional - The placement of the node in the graph (starts at 1), by default None - version : int, optional - The version of the model, by default 1 - - Returns - ------- - Ensemble_config: dict - Node_configs: list - """ - params = params or {} - self_params = { - "input_col": self._input_col, - "filter_out_col": self._filter_out_col, - } - self_params.update(params) - return super().export(path, input_schema, output_schema, self_params, node_id, version) diff --git a/merlin/systems/dag/ops/softmax_sampling.py b/merlin/systems/dag/ops/softmax_sampling.py index 8c8e3307d..2a49b1b0c 100644 --- a/merlin/systems/dag/ops/softmax_sampling.py +++ b/merlin/systems/dag/ops/softmax_sampling.py @@ -1,5 +1,3 @@ -import json - import numpy as np from merlin.core.protocols import Transformable @@ -37,44 +35,10 @@ def __init__(self, relevance_col, temperature=20.0, topk=10, _input_col=None): self._relevance_col_name = relevance_col super().__init__() - @classmethod - def from_config(cls, config, **kwargs) -> "SoftmaxSampling": - """Load operator and properties from Triton config""" - parameters = json.loads(config.get("params", "")) - relevance_col = parameters["relevance_col"] - input_col = parameters["input_col"] - temperature = parameters["temperature"] - topk = parameters["topk"] - - return SoftmaxSampling( - relevance_col, temperature=temperature, topk=topk, _input_col=input_col - ) - @property def dependencies(self): return self.relevance_col - def export( - self, - path: str, - input_schema: Schema, - output_schema: Schema, - params: dict = None, - node_id: int = None, - version: int = 1, - backend: str = "ensemble", - ): - """Write out a Triton model config directory""" - params = params or {} - self_params = { - "input_col": self._input_col_name, - "relevance_col": self._relevance_col_name, - "temperature": self.temperature, - "topk": self.topk, - } - self_params.update(params) - return super().export(path, input_schema, output_schema, self_params, node_id, version) - def compute_input_schema( self, root_schema: Schema, diff --git a/merlin/systems/dag/ops/tensorflow.py b/merlin/systems/dag/ops/tensorflow.py index b0cf059a8..30a601504 100644 --- a/merlin/systems/dag/ops/tensorflow.py +++ b/merlin/systems/dag/ops/tensorflow.py @@ -33,7 +33,7 @@ class PredictTensorflow(InferenceOperator): """TensorFlow Model Prediction Operator.""" - def __init__(self, model_or_path, custom_objects: dict = None, backend="tensorflow"): + def __init__(self, model_or_path, custom_objects: dict = None): """ Instantiate a PredictTensorflow inference operator. @@ -108,18 +108,6 @@ def transform( # TODO: map output schema names to outputs produced by prediction return type(transformable)(dict_outputs) - @property - def export_name(self): - """ - Provides a clear common english identifier for this operator. - - Returns - ------- - String - Name of the current class as spelled in module. - """ - return self.__class__.__name__.lower() - @classmethod def from_path(cls, path, **kwargs): return cls.__init__(path, **kwargs) diff --git a/merlin/systems/dag/ops/unroll_features.py b/merlin/systems/dag/ops/unroll_features.py index 62bb341ea..14416c706 100644 --- a/merlin/systems/dag/ops/unroll_features.py +++ b/merlin/systems/dag/ops/unroll_features.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import json import numpy as np @@ -36,35 +35,6 @@ def __init__(self, item_id_col, unroll_cols, unrolled_prefix=""): self.unrolled_prefix = unrolled_prefix super().__init__() - @classmethod - def from_config(cls, config, **kwargs) -> "UnrollFeatures": - """Load operator and properties from Triton config""" - parameters = json.loads(config.get("params", "")) - candidate_col = parameters["item_id_col"] - unroll_cols = parameters["unroll_cols"] - unrolled_prefix = parameters["unrolled_prefix"] - return UnrollFeatures(candidate_col, unroll_cols, unrolled_prefix) - - def export( - self, - path: str, - input_schema: Schema, - output_schema: Schema, - params: dict = None, - node_id: int = None, - version: int = 1, - backend: str = "ensemble", - ): - """Write out a Triton model config directory""" - params = params or {} - self_params = { - "item_id_col": self.item_id_col, - "unroll_cols": self._unroll_col_names, - "unrolled_prefix": self.unrolled_prefix, - } - self_params.update(params) - return super().export(path, input_schema, output_schema, self_params, node_id, version) - @property def dependencies(self): return self.unroll_cols diff --git a/merlin/systems/dag/ops/workflow.py b/merlin/systems/dag/ops/workflow.py index 8ec424b8a..358b13b68 100644 --- a/merlin/systems/dag/ops/workflow.py +++ b/merlin/systems/dag/ops/workflow.py @@ -37,7 +37,6 @@ def __init__( model_framework: str = None, cats: List[str] = None, conts: List[str] = None, - backend: str = "workflow", ): """ Creates a Transform Workflow operator for a target workflow. @@ -117,16 +116,3 @@ def transform( output = TensorTable.from_df(output) return output - - def export( - self, - path: str, - input_schema: Schema, - output_schema: Schema, - params: dict = None, - node_id: int = None, - version: int = 1, - backend: str = "ensemble", - ): - """Create a directory inside supplied path based on our export name""" - raise NotImplementedError diff --git a/merlin/systems/dag/runtimes/triton/ops/fil.py b/merlin/systems/dag/runtimes/triton/ops/fil.py index e320b06c7..0f5c83b36 100644 --- a/merlin/systems/dag/runtimes/triton/ops/fil.py +++ b/merlin/systems/dag/runtimes/triton/ops/fil.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import json import pathlib import numpy as np @@ -22,7 +21,7 @@ from merlin.core.protocols import Transformable from merlin.dag import ColumnSelector # noqa -from merlin.schema import ColumnSchema, Schema # noqa +from merlin.schema import Schema # noqa from merlin.systems.dag.runtimes.triton.ops.operator import TritonOperator # noqa from merlin.systems.triton.conversions import ( tensor_table_to_triton_request, @@ -95,7 +94,6 @@ def export( params: dict = None, node_id: int = None, version: int = 1, - backend: str = "ensemble", ): """Export the class and related files to the path specified.""" fil_model_config = self.fil_op.export( @@ -116,31 +114,8 @@ def export( params=params, node_id=node_id, version=version, - backend=self.backend, ) - @classmethod - def from_config(cls, config: dict, **kwargs) -> "PredictForestTriton": - """Instantiate the class from a dictionary representation. - - Expected structure: - { - "input_dict": str # JSON dict with input names and schemas - "params": str # JSON dict with params saved at export - } - - """ - column_schemas = [ - ColumnSchema(name, **schema_properties) - for name, schema_properties in json.loads(config["input_dict"]).items() - ] - input_schema = Schema(column_schemas) - cls_instance = cls(None, input_schema) - params = json.loads(config["params"]) - model_name = params["fil_model_name"] - cls_instance.set_fil_model_name(model_name) - return cls_instance - @property def fil_model_name(self): return self._fil_model_name diff --git a/merlin/systems/dag/runtimes/triton/ops/implicit.py b/merlin/systems/dag/runtimes/triton/ops/implicit.py deleted file mode 100644 index 02669d15f..000000000 --- a/merlin/systems/dag/runtimes/triton/ops/implicit.py +++ /dev/null @@ -1,185 +0,0 @@ -# -# Copyright (c) 2022, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import importlib -import json -import pathlib - -import numpy as np - -from merlin.core.protocols import Transformable -from merlin.dag import ColumnSelector -from merlin.schema import ColumnSchema, Schema -from merlin.systems.dag.runtimes.triton.ops.operator import TritonOperator - -try: - import implicit - from packaging.version import Version - - if Version(implicit.__version__) < Version("0.6.0"): - raise RuntimeError( - "Implicit version 0.6.0 or higher required. (for model save/load methods)." - ) -except ImportError: - implicit = None - - -class PredictImplicitTriton(TritonOperator): - """Operator for running inference on Implicit models..""" - - def __init__(self, op): - """Instantiate an Implicit prediction operator. - - Parameters - ---------- - model : An Implicit Model instance - num_to_recommend : int - the number of items to return - """ - super().__init__(op) - if op: - self.model = op.model - self.model_module_name: str = self.model.__module__ - self.model_class_name: str = self.model.__class__.__name__ - self.num_to_recommend = op.num_to_recommend - - def __getstate__(self): - return {k: v for k, v in self.__dict__.items() if k != "model"} - - def load_artifacts(self, artifact_path: str): - model_file = pathlib.Path(artifact_path) / "model.npz" - - model_module_name = self.model_module_name - model_class_name = self.model_class_name - model_module = importlib.import_module(model_module_name) - model_cls = getattr(model_module, model_class_name) - - self.model = model_cls.load(str(model_file)) - - def compute_input_schema( - self, - root_schema: Schema, - parents_schema: Schema, - deps_schema: Schema, - selector: ColumnSelector, - ) -> Schema: - """Return the input schema representing the input columns this operator expects to use.""" - return Schema([ColumnSchema("user_id", dtype="int64")]) - - def compute_output_schema( - self, - input_schema: Schema, - col_selector: ColumnSelector, - prev_output_schema: Schema = None, - ) -> Schema: - """Return the output schema representing the columns this operator returns.""" - return Schema([ColumnSchema("ids", dtype="int64"), ColumnSchema("scores", dtype="float64")]) - - @property - def exportable_backends(self): - return ["ensemble", "executor"] - - def export( - self, - path: str, - input_schema: Schema, - output_schema: Schema, - params: dict = None, - node_id: int = None, - version: int = 1, - backend: str = "ensemble", - ): - """Export the class and related files to the path specified.""" - node_name = f"{node_id}_{self.export_name}" if node_id is not None else self.export_name - - if backend == "ensemble": - artifact_path = pathlib.Path(path) / node_name / str(version) - else: - artifact_path = pathlib.Path(path) / "executor_model" / str(version) / "ensemble" - - artifact_path.mkdir(parents=True, exist_ok=True) - model_path = artifact_path / "model.npz" - self.model.save(str(model_path)) - - if backend == "ensemble": - params = params or {} - params["model_module_name"] = self.model.__module__ - params["model_class_name"] = self.model.__class__.__name__ - params["num_to_recommend"] = self.num_to_recommend - return super().export( - path, - input_schema, - output_schema, - params=params, - node_id=node_id, - version=version, - ) - else: - return ({}, []) - - @classmethod - def from_config(cls, config: dict, **kwargs) -> "PredictImplicitTriton": - """Instantiate the class from a dictionary representation. - - Expected config structure: - { - "input_dict": str # JSON dict with input names and schemas - "params": str # JSON dict with params saved at export - } - - """ - params = json.loads(config["params"]) - - model_repository = kwargs["model_repository"] - model_name = kwargs["model_name"] - model_version = kwargs["model_version"] - - # load implicit model - cls_instance = cls(None) - model_module_name = params["model_module_name"] - model_class_name = params["model_class_name"] - model_module = importlib.import_module(model_module_name) - model_cls = getattr(model_module, model_class_name) - model_file = pathlib.Path(model_repository) / model_name / str(model_version) / "model.npz" - cls_instance.model_module_name = model_module_name - cls_instance.model_class_name = model_class_name - cls_instance.model = model_cls.load(str(model_file)) - - cls_instance.num_to_recommend = params["num_to_recommend"] - - return cls_instance - - def transform( - self, col_selector: ColumnSelector, transformable: Transformable - ) -> Transformable: - """Transform the dataframe by applying this operator to the set of input columns. - - Parameters - ----------- - df: TensorTable - A pandas or cudf dataframe that this operator will work on - - Returns - ------- - TensorTable - Returns a transformed dataframe for this operator""" - user_id = transformable["user_id"].values.ravel() - user_items = None - ids, scores = self.model.recommend( - user_id, user_items, N=self.num_to_recommend, filter_already_liked_items=False - ) - return type(transformable)( - {"ids": ids.astype(np.int64), "scores": scores.astype(np.float64)} - ) diff --git a/merlin/systems/dag/runtimes/triton/ops/operator.py b/merlin/systems/dag/runtimes/triton/ops/operator.py index d22c901ce..64570465e 100644 --- a/merlin/systems/dag/runtimes/triton/ops/operator.py +++ b/merlin/systems/dag/runtimes/triton/ops/operator.py @@ -13,26 +13,18 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import importlib.resources -import json -import os -import pathlib -from abc import abstractmethod -from shutil import copyfile -from typing import List +from abc import ABCMeta, abstractmethod import tritonclient.grpc.model_config_pb2 as model_config -from google.protobuf import text_format from merlin.core.protocols import Transformable from merlin.dag.selector import ColumnSelector from merlin.schema import Schema -from merlin.systems.dag.ops import compute_dims from merlin.systems.dag.ops.operator import InferenceOperator from merlin.systems.triton.export import _convert_dtype -class TritonOperator(InferenceOperator): +class TritonOperator(InferenceOperator, metaclass=ABCMeta): """Base class for Triton operators.""" def __init__(self, base_op: InferenceOperator): @@ -58,38 +50,6 @@ def export_name(self): """ return self.__class__.__name__.lower() - @property - def exportable_backends(self) -> List[str]: - """Returns list of supported backends. - - Returns - ------- - List[str] - List of supported backends - """ - return ["ensemble", "executor"] - - @classmethod - @abstractmethod - def from_config(cls, config: dict, **kwargs): - """ - Instantiate a class object given a config. - - Parameters - ---------- - config : dict - **kwargs - contains the following: - * model_repository: Model repository path - * model_version: Model version - * model_name: Model name - - Returns - ------- - Class object instantiated with config values - """ - - @abstractmethod def transform( self, col_selector: ColumnSelector, transformable: Transformable ) -> Transformable: @@ -107,6 +67,7 @@ def transform( """ return transformable + @abstractmethod def export( self, path: str, @@ -115,10 +76,9 @@ def export( params: dict = None, node_id: int = None, version: int = 1, - backend: str = "python", ): """ - Export the class object as a config and all related files to the user-defined path. + Export the Operator to as a Triton Model at the path corresponding to the model repository. Parameters ---------- @@ -137,69 +97,10 @@ def export( Returns ------- - Ensemble_config: dict - The config for the entire ensemble. - Node_configs: list - A list of individual configs for each step (operator) in graph. + model_config: ModelConfig + The config for the operator (model) if defined. """ - params = params or {} - - node_name = f"{node_id}_{self.export_name}" if node_id is not None else self.export_name - - node_export_path = pathlib.Path(path) / node_name - node_export_path.mkdir(parents=True, exist_ok=True) - - config = model_config.ModelConfig(name=node_name, backend=backend, platform="op_runner") - - config.parameters["operator_names"].string_value = json.dumps([node_name]) - - config.parameters[node_name].string_value = json.dumps( - { - "module_name": self.__class__.__module__, - "class_name": self.__class__.__name__, - "input_dict": json.dumps(_schema_to_dict(input_schema)), - "output_dict": json.dumps(_schema_to_dict(output_schema)), - "params": json.dumps(params), - } - ) - - for col_schema in input_schema.column_schemas.values(): - col_dims = compute_dims(col_schema) - add_model_param(config.input, model_config.ModelInput, col_schema, col_dims) - - for col_schema in output_schema.column_schemas.values(): - col_dims = compute_dims(col_schema) - add_model_param(config.output, model_config.ModelOutput, col_schema, col_dims) - - with open(os.path.join(node_export_path, "config.pbtxt"), "w", encoding="utf-8") as o: - text_format.PrintMessage(config, o) - - os.makedirs(node_export_path, exist_ok=True) - os.makedirs(os.path.join(node_export_path, str(version)), exist_ok=True) - with importlib.resources.path( - "merlin.systems.triton.models", "oprunner_model.py" - ) as oprunner_model: - copyfile( - oprunner_model, - os.path.join(node_export_path, str(version), "model.py"), - ) - - return config - - -def _schema_to_dict(schema: Schema) -> dict: - # TODO: Write the conversion - schema_dict = {} - for col_name, col_schema in schema.column_schemas.items(): - schema_dict[col_name] = { - "dtype": col_schema.dtype.name, - "is_list": col_schema.is_list, - "is_ragged": col_schema.is_ragged, - } - - return schema_dict - def add_model_param(params, paramclass, col_schema, dims=None): if col_schema.is_list and col_schema.is_ragged: diff --git a/merlin/systems/dag/runtimes/triton/ops/pytorch.py b/merlin/systems/dag/runtimes/triton/ops/pytorch.py index 555e9bfbe..7e8ca0f7e 100644 --- a/merlin/systems/dag/runtimes/triton/ops/pytorch.py +++ b/merlin/systems/dag/runtimes/triton/ops/pytorch.py @@ -115,7 +115,6 @@ def export( params: dict = None, node_id: int = None, version: int = 1, - backend: str = "ensemble", ): """Create a directory inside supplied path based on our export name""" export_name = self.__class__.__name__.lower() diff --git a/merlin/systems/dag/runtimes/triton/ops/tensorflow.py b/merlin/systems/dag/runtimes/triton/ops/tensorflow.py index ecde5abdd..8bc88b4ab 100644 --- a/merlin/systems/dag/runtimes/triton/ops/tensorflow.py +++ b/merlin/systems/dag/runtimes/triton/ops/tensorflow.py @@ -89,7 +89,6 @@ def export( params: dict = None, node_id: int = None, version: int = 1, - backend: str = "ensemble", ): """Create a directory inside supplied path based on our export name""" # Export Triton TF back-end directory and config etc diff --git a/merlin/systems/dag/runtimes/triton/ops/workflow.py b/merlin/systems/dag/runtimes/triton/ops/workflow.py index 3bd31dd5b..5cfebbba7 100644 --- a/merlin/systems/dag/runtimes/triton/ops/workflow.py +++ b/merlin/systems/dag/runtimes/triton/ops/workflow.py @@ -158,7 +158,6 @@ def export( params: dict = None, node_id: int = None, version: int = 1, - backend: str = "ensemble", ): """Create a directory inside supplied path based on our export name""" modified_workflow = self.op.workflow.remove_inputs(self.op.label_columns) diff --git a/merlin/systems/dag/runtimes/triton/runtime.py b/merlin/systems/dag/runtimes/triton/runtime.py index ff30e91fd..08df25f45 100644 --- a/merlin/systems/dag/runtimes/triton/runtime.py +++ b/merlin/systems/dag/runtimes/triton/runtime.py @@ -33,7 +33,7 @@ ) from merlin.systems.dag.ops.workflow import TransformWorkflow from merlin.systems.dag.runtimes import Runtime -from merlin.systems.dag.runtimes.triton.ops.operator import add_model_param +from merlin.systems.dag.runtimes.triton.ops.operator import TritonOperator, add_model_param from merlin.systems.dag.runtimes.triton.ops.workflow import TransformWorkflowTriton tensorflow = None @@ -52,12 +52,6 @@ except ImportError: ... -implicit = None -try: - import implicit -except ImportError: - ... - TRITON_OP_TABLE = {} TRITON_OP_TABLE[TransformWorkflow] = TransformWorkflowTriton @@ -80,12 +74,6 @@ TRITON_OP_TABLE[PredictPyTorch] = PredictPyTorchTriton -if implicit: - from merlin.systems.dag.ops.implicit import PredictImplicit - from merlin.systems.dag.runtimes.triton.ops.implicit import PredictImplicitTriton - - TRITON_OP_TABLE[PredictImplicit] = PredictImplicitTriton - class TritonExecutorRuntime(Runtime): """Runtime for Triton. @@ -123,7 +111,7 @@ def export( Tuple[model_config.ModelConfig, List[model_config.ModelConfig]] Tuple of ensemble config and list of non-python backend model configs """ - name = name or "executor_model" + triton_model_name = name or "executor_model" nodes = list(postorder_iter_nodes(ensemble.graph.output_node)) @@ -131,20 +119,25 @@ def export( if type(node.op) in self.op_table: node.op = self.op_table[type(node.op)](node.op) - node_id_table, _ = _create_node_table(nodes, "executor") + node_id_table, _ = _create_node_table(nodes) + + # Path were extra files can be optionally saved by operators + # that don't save all state in operator when pickled + artifact_path = pathlib.Path(path) / triton_model_name / str(version) / "ensemble" + artifact_path.mkdir(parents=True, exist_ok=True) node_configs = [] for node in nodes: - if node.exportable("executor"): - node_id = node_id_table.get(node, None) - - node_config = node.export( - path, node_id=node_id, version=version, backend="executor" - ) + node_id = node_id_table.get(node, None) + if node_id is not None: + node_config = node.export(path, node_id=node_id, version=version) if node_config is not None: node_configs.append(node_config) - executor_config = self._executor_model_export(path, name, ensemble) + if hasattr(node.op, "save_artifacts"): + node.op.save_artifacts(str(artifact_path)) + + executor_config = self._executor_model_export(path, triton_model_name, ensemble) return (executor_config, node_configs) @@ -220,11 +213,11 @@ def _executor_model_export( return config -def _create_node_table(nodes, backend): +def _create_node_table(nodes): exportable_node_idx = 0 node_id_lookup = {} for node in nodes: - if node.exportable(backend): + if isinstance(node.op, TritonOperator): node_id_lookup[node] = exportable_node_idx exportable_node_idx += 1 diff --git a/merlin/systems/triton/models/oprunner_model.py b/merlin/systems/triton/models/oprunner_model.py deleted file mode 100644 index 56fb20349..000000000 --- a/merlin/systems/triton/models/oprunner_model.py +++ /dev/null @@ -1,111 +0,0 @@ -# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# * Neither the name of NVIDIA CORPORATION nor the names of its -# contributors may be used to endorse or promote products derived -# from this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY -# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR -# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY -# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import json -import pathlib - -from merlin.systems.dag.op_runner import OperatorRunner -from merlin.systems.triton.conversions import ( - tensor_table_to_triton_response, - triton_request_to_tensor_table, -) -from merlin.systems.triton.utils import triton_error_handling, triton_multi_request - - -class TritonPythonModel: - """Model for Triton Python Backend. - - Every Python model must have "TritonPythonModel" as the class name - """ - - def initialize(self, args): - """Called only once when the model is being loaded. Allowing - the model to initialize any state associated with this model. - - Parameters - ---------- - args : dict - Both keys and values are strings. The dictionary keys and values are: - * model_config: A JSON string containing the model configuration - * model_instance_kind: A string containing model instance kind - * model_instance_device_id: A string containing model instance device ID - * model_repository: Model repository path - * model_version: Model version - * model_name: Model name - """ - self.model_config = json.loads(args["model_config"]) - - self.runner = OperatorRunner( - self.model_config, - model_repository=_parse_model_repository(args["model_repository"]), - model_name=args["model_name"], - model_version=args["model_version"], - ) - - @triton_multi_request - @triton_error_handling - def execute(self, request): - """Receives a list of pb_utils.InferenceRequest as the only argument. This - function is called when an inference is requested for this model. Depending on the - batching configuration (e.g. Dynamic Batching) used, `requests` may contain - multiple requests. Every Python model, must create one pb_utils.InferenceResponse - for every pb_utils.InferenceRequest in `requests`. If there is an error, you can - set the error argument when creating a pb_utils.InferenceResponse. - - Parameters - ---------- - requests : list - A list of pb_utils.InferenceRequest - - Returns - ------- - list - A list of pb_utils.InferenceResponse. The length of this list must - be the same as `requests` - """ - params = self.model_config["parameters"] - op_names = json.loads(params["operator_names"]["string_value"]) - first_operator_name = op_names[0] - operator_params = json.loads(params[first_operator_name]["string_value"]) - input_column_names = list(json.loads(operator_params["input_dict"]).keys()) - - inputs = triton_request_to_tensor_table(request, input_column_names) - outputs = self.runner.execute(inputs) - return tensor_table_to_triton_response(outputs) - - -def _parse_model_repository(model_repository: str) -> str: - """ - Extract the model repository path from the model_repository value - passed to the TritonPythonModel initialize method. - """ - # Handle bug in Tritonserver 22.06 - # model_repository argument became path to model.py - # instead of path to model directory within the model repository - if model_repository.endswith(".py"): - return str(pathlib.Path(model_repository).parent.parent.parent) - else: - return str(pathlib.Path(model_repository).parent) diff --git a/tests/unit/systems/dag/test_op_runner.py b/tests/unit/systems/dag/test_op_runner.py deleted file mode 100644 index 36c29f9cd..000000000 --- a/tests/unit/systems/dag/test_op_runner.py +++ /dev/null @@ -1,211 +0,0 @@ -# -# Copyright (c) 2022, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import json -import os -from unittest import mock - -import numpy as np -import pytest -from google.protobuf.json_format import MessageToDict - -import nvtabular as nvt -import nvtabular.ops as wf_ops -from merlin.dag import Graph -from merlin.schema import Tags -from merlin.table import TensorColumn, TensorTable -from tests.unit.systems.utils.ops import PlusTwoOp - -op_runner = pytest.importorskip("merlin.systems.dag.op_runner") -inf_op = pytest.importorskip("merlin.systems.dag.ops.operator") - - -@pytest.mark.parametrize("engine", ["parquet"]) -def test_op_runner_loads_config(tmpdir, dataset, engine): - input_columns = ["x", "y", "id"] - - # NVT - workflow_ops = input_columns >> wf_ops.Rename(postfix="_nvt") - workflow = nvt.Workflow(workflow_ops) - workflow.fit(dataset) - workflow.save(str(tmpdir)) - - repository = "repository_path/" - version = 1 - config = { - "parameters": { - "operator_names": {"string_value": json.dumps(["PlusTwoOp_1"])}, - "PlusTwoOp_1": { - "string_value": json.dumps( - { - "module_name": PlusTwoOp.__module__, - "class_name": "PlusTwoOp", - } - ) - }, - } - } - - runner = op_runner.OperatorRunner(config, model_repository=repository, model_version=version) - - loaded_op = runner.operators[0] - assert isinstance(loaded_op, PlusTwoOp) - - -@pytest.mark.parametrize("engine", ["parquet"]) -def test_op_runner_loads_multiple_ops_same(tmpdir, dataset, engine): - # NVT - schema = dataset.schema - for name in schema.column_names: - dataset.schema.column_schemas[name] = dataset.schema.column_schemas[name].with_tags( - [Tags.USER] - ) - - repository = "repository_path/" - version = 1 - config = { - "parameters": { - "operator_names": {"string_value": json.dumps(["PlusTwoOp_1", "PlusTwoOp_2"])}, - "PlusTwoOp_1": { - "string_value": json.dumps( - { - "module_name": PlusTwoOp.__module__, - "class_name": "PlusTwoOp", - } - ) - }, - "PlusTwoOp_2": { - "string_value": json.dumps( - { - "module_name": PlusTwoOp.__module__, - "class_name": "PlusTwoOp", - } - ) - }, - } - } - - runner = op_runner.OperatorRunner( - config, - model_repository=repository, - model_version=version, - ) - - assert len(runner.operators) == 2 - - for idx, loaded_op in enumerate(runner.operators): - assert isinstance(loaded_op, PlusTwoOp) - - -@pytest.mark.parametrize("engine", ["parquet"]) -def test_op_runner_loads_multiple_ops_same_execute(tmpdir, dataset, engine): - # NVT - schema = dataset.schema - for name in schema.column_names: - dataset.schema.column_schemas[name] = dataset.schema.column_schemas[name].with_tags( - [Tags.USER] - ) - - repository = "repository_path/" - version = 1 - config = { - "parameters": { - "operator_names": {"string_value": json.dumps(["PlusTwoOp_1", "PlusTwoOp_2"])}, - "PlusTwoOp_1": { - "string_value": json.dumps( - { - "module_name": PlusTwoOp.__module__, - "class_name": "PlusTwoOp", - } - ) - }, - "PlusTwoOp_2": { - "string_value": json.dumps( - { - "module_name": PlusTwoOp.__module__, - "class_name": "PlusTwoOp", - } - ) - }, - } - } - - runner = op_runner.OperatorRunner( - config, - model_repository=repository, - model_version=version, - ) - - inputs = {} - for col_name in schema.column_names: - inputs[col_name] = TensorColumn(np.random.randint(10, size=(10,))) - - table = TensorTable(inputs) - outputs = runner.execute(table) - - assert outputs["x_plus_2_plus_2"] == TensorColumn(inputs["x"].values + 4) - - -@pytest.mark.parametrize("engine", ["parquet"]) -@mock.patch.object(PlusTwoOp, "from_config", side_effect=PlusTwoOp.from_config) -def test_op_runner_single_node_export(mock_from_config, tmpdir, dataset, engine): - # assert against produced config - schema = dataset.schema - for name in schema.column_names: - dataset.schema.column_schemas[name] = dataset.schema.column_schemas[name].with_tags( - [Tags.USER] - ) - - inputs = ["x", "y"] - - node = inputs >> PlusTwoOp() - - graph = Graph(node) - graph.construct_schema(dataset.schema) - - config = node.export(tmpdir) - - file_path = os.path.join(str(tmpdir), node.export_name, "config.pbtxt") - - assert os.path.exists(file_path) - with open(file_path, "r", encoding="utf-8") as f: - config_file = f.read() - assert config_file == str(config) - assert len(config.input) == len(inputs) - assert len(config.output) == len(inputs) - for idx, conf in enumerate(config.output): - assert conf.name == inputs[idx] + "_plus_2" - - runner = op_runner.OperatorRunner( - MessageToDict( - config, preserving_proto_field_name=True, including_default_value_fields=True - ), - model_repository=str(tmpdir), - model_name=config.name, - model_version="1", - ) - inputs = TensorTable({"x": np.array([1], dtype=np.int32), "y": np.array([5], dtype=np.int32)}) - outputs = runner.execute(inputs) - - assert outputs["x_plus_2"] == TensorColumn(np.array([3], dtype=np.int32)) - assert outputs["y_plus_2"] == TensorColumn(np.array([7], dtype=np.int32)) - - assert mock_from_config.call_count == 1 - assert mock_from_config.call_args.kwargs == { - "model_repository": str(tmpdir), - "model_name": config.name, - "model_version": "1", - } diff --git a/tests/unit/systems/ops/fil/test_forest.py b/tests/unit/systems/ops/fil/test_forest.py index f948b8d51..9cc5ea918 100644 --- a/tests/unit/systems/ops/fil/test_forest.py +++ b/tests/unit/systems/ops/fil/test_forest.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import json import numpy as np import pandas as pd @@ -35,33 +34,6 @@ from nvtabular import ops as wf_ops -def test_load_from_config(tmpdir): - rows = 200 - num_features = 16 - X, y = sklearn.datasets.make_regression( - n_samples=rows, - n_features=num_features, - n_informative=num_features // 3, - random_state=0, - ) - model = xgboost.XGBRegressor() - model.fit(X, y) - feature_names = [str(i) for i in range(num_features)] - input_schema = Schema([ColumnSchema(col, dtype=np.float32) for col in feature_names]) - output_schema = Schema([ColumnSchema("output__0", dtype=np.float32)]) - config = PredictForestTriton(PredictForest(model, input_schema)).export( - tmpdir, input_schema, output_schema, node_id=2 - ) - node_config = json.loads(config.parameters[config.name].string_value) - - assert json.loads(node_config["output_dict"]) == { - "output__0": {"dtype": "float32", "is_list": False, "is_ragged": False} - } - - cls = PredictForestTriton.from_config(node_config) - assert "2_fil" in cls.fil_model_name - - def read_config(config_path): with open(config_path, "rb") as f: config = model_config.ModelConfig() @@ -87,11 +59,6 @@ def test_export(tmpdir): tmpdir, input_schema, output_schema, node_id=2 ) - config_path = tmpdir / "2_predictforesttriton" / "config.pbtxt" - parsed_config = read_config(config_path) - assert "2_predictforest" in parsed_config.name - assert parsed_config.backend == "python" - config_path = tmpdir / "2_filtriton" / "config.pbtxt" parsed_config = read_config(config_path) assert "2_fil" in parsed_config.name @@ -128,11 +95,6 @@ def test_export_merlin_models(tmpdir): tmpdir, input_schema, output_schema, node_id=2 ) - config_path = tmpdir / "2_predictforesttriton" / "config.pbtxt" - parsed_config = read_config(config_path) - assert "2_predictforest" in parsed_config.name - assert parsed_config.backend == "python" - config_path = tmpdir / "2_filtriton" / "config.pbtxt" parsed_config = read_config(config_path) assert "2_fil" in parsed_config.name @@ -169,11 +131,6 @@ def test_ensemble(tmpdir): triton_ens.export(tmpdir) - config_path = tmpdir / "1_predictforesttriton" / "config.pbtxt" - parsed_config = read_config(config_path) - assert "1_predictforest" in parsed_config.name - assert parsed_config.backend == "python" - config_path = tmpdir / "1_filtriton" / "config.pbtxt" parsed_config = read_config(config_path) assert "1_fil" in parsed_config.name diff --git a/tests/unit/systems/ops/implicit/test_op.py b/tests/unit/systems/ops/implicit/test_op.py index 1e7153af6..aa08a5f7a 100644 --- a/tests/unit/systems/ops/implicit/test_op.py +++ b/tests/unit/systems/ops/implicit/test_op.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import json + import shutil import implicit @@ -26,7 +26,6 @@ from merlin.systems.dag.ensemble import Ensemble from merlin.systems.dag.ops.implicit import PredictImplicit from merlin.systems.dag.runtimes.triton import TritonExecutorRuntime -from merlin.systems.dag.runtimes.triton.ops.implicit import PredictImplicitTriton from merlin.systems.triton.utils import run_triton_server TRITON_SERVER_PATH = shutil.which("tritonserver") @@ -35,49 +34,6 @@ triton = pytest.importorskip("merlin.systems.triton") -@pytest.mark.parametrize( - "model_cls", - [ - implicit.bpr.BayesianPersonalizedRanking, - implicit.als.AlternatingLeastSquares, - implicit.lmf.LogisticMatrixFactorization, - ], -) -def test_reload_from_config(model_cls, tmpdir): - model = model_cls() - n = 10 - user_items = csr_matrix(np.random.choice([0, 1], size=n * n).reshape(n, n)) - model.fit(user_items) - - base = PredictImplicit(model) - op = PredictImplicitTriton(base) - - config = op.export(tmpdir, Schema(), Schema()) - - node_config = json.loads(config.parameters[config.name].string_value) - - cls = PredictImplicitTriton.from_config( - node_config, - model_repository=tmpdir, - model_name=config.name, - model_version=1, - ) - reloaded_model = cls.model - - num_to_recommend = np.random.randint(1, n) - user_items = None - ids, scores = model.recommend( - 1, user_items, N=num_to_recommend, filter_already_liked_items=False - ) - - reloaded_ids, reloaded_scores = reloaded_model.recommend( - 1, user_items, N=num_to_recommend, filter_already_liked_items=False - ) - - np.testing.assert_array_equal(ids, reloaded_ids) - np.testing.assert_array_equal(scores, reloaded_scores) - - @pytest.mark.skipif(not TRITON_SERVER_PATH, reason="triton server not found") @pytest.mark.parametrize("runtime", [None, TritonExecutorRuntime()]) @pytest.mark.parametrize(