From 3b85d25ea82d561a3edecb64c553bb7c3e72bbad Mon Sep 17 00:00:00 2001 From: root Date: Thu, 2 Jun 2022 22:27:57 +0000 Subject: [PATCH 1/5] fix output names --- merlin/systems/dag/ops/tensorflow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/merlin/systems/dag/ops/tensorflow.py b/merlin/systems/dag/ops/tensorflow.py index 443b4d6fb..8027b2140 100644 --- a/merlin/systems/dag/ops/tensorflow.py +++ b/merlin/systems/dag/ops/tensorflow.py @@ -79,7 +79,7 @@ def __init__(self, model_or_path, custom_objects: dict = None): outputs = list(default_signature.structured_outputs.values()) input_col_names = [col.name.split("/")[0] for col in inputs] - output_col_names = [col.name.split("/")[0] for col in outputs] + output_col_names = [col.name for col in outputs] self.input_schema = Schema() for col, input_col in zip(input_col_names, inputs): @@ -173,7 +173,7 @@ def _export_model(self, model, name, output_path, version=1): # this assumes the list columns are 1D tensors both for cats and conts config.output.append( model_config.ModelOutput( - name=col.name.split("/")[0], + name=col.name, data_type=_convert_dtype(col.dtype), dims=[-1, col.shape[1]], ) From 0f9fa18556f24ec9fc194d52cc3733a31f531826 Mon Sep 17 00:00:00 2001 From: Julio Perez Date: Wed, 8 Jun 2022 21:23:47 -0400 Subject: [PATCH 2/5] fixes to ensure use of correct keys from tf models --- merlin/systems/dag/ops/feast.py | 4 ++-- merlin/systems/dag/ops/operator.py | 4 ++-- merlin/systems/dag/ops/tensorflow.py | 35 ++++++++++++++-------------- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/merlin/systems/dag/ops/feast.py b/merlin/systems/dag/ops/feast.py index 3374812c5..dfb69d030 100644 --- a/merlin/systems/dag/ops/feast.py +++ b/merlin/systems/dag/ops/feast.py @@ -67,10 +67,10 @@ def from_feature_view( entity_dtype = np.int64 ent_is_list = False ent_is_ragged = False - for entity in store.list_entities(): + for idx, entity in enumerate(store.list_entities()): if entity.name == entity_id: entity_dtype, ent_is_list, ent_is_ragged = feast_2_numpy[ - store.list_entities()[0].value_type + store.list_entities()[idx].value_type ] features = [] diff --git a/merlin/systems/dag/ops/operator.py b/merlin/systems/dag/ops/operator.py index f784e9385..70ef01aa9 100644 --- a/merlin/systems/dag/ops/operator.py +++ b/merlin/systems/dag/ops/operator.py @@ -200,7 +200,7 @@ def export( node_export_path = pathlib.Path(path) / node_name node_export_path.mkdir(parents=True, exist_ok=True) - config = model_config.ModelConfig(name=node_name, backend="nvtabular", platform="op_runner") + config = model_config.ModelConfig(name=node_name, backend="python", platform="op_runner") config.parameters["operator_names"].string_value = json.dumps([node_name]) @@ -225,7 +225,7 @@ def export( # this assumes the list columns are 1D tensors both for cats and conts config.output.append( model_config.ModelOutput( - name=col_name.split("/")[0], + name=col_name, data_type=_convert_dtype(col_dict["dtype"]), dims=[-1, -1], ) diff --git a/merlin/systems/dag/ops/tensorflow.py b/merlin/systems/dag/ops/tensorflow.py index 8027b2140..52edf05dd 100644 --- a/merlin/systems/dag/ops/tensorflow.py +++ b/merlin/systems/dag/ops/tensorflow.py @@ -65,7 +65,6 @@ def __init__(self, model_or_path, custom_objects: dict = None): signatures = getattr(self.model, "signatures", {}) or {} default_signature = signatures.get("serving_default") - if not default_signature: # roundtrip saved self.model to disk to generate signature if it doesn't exist @@ -78,8 +77,8 @@ def __init__(self, model_or_path, custom_objects: dict = None): inputs = list(default_signature.structured_input_signature[1].values()) outputs = list(default_signature.structured_outputs.values()) - input_col_names = [col.name.split("/")[0] for col in inputs] - output_col_names = [col.name for col in outputs] + input_col_names = [*default_signature.structured_input_signature[1].keys()] + output_col_names = [*default_signature.structured_outputs.keys()] self.input_schema = Schema() for col, input_col in zip(input_col_names, inputs): @@ -145,35 +144,37 @@ def _export_model(self, model, name, output_path, version=1): name=name, backend="tensorflow", platform="tensorflow_savedmodel" ) - inputs, outputs = model.inputs, model.outputs + # inputs, outputs = model.inputs, [model.outputs] - if not inputs or not outputs: - signatures = getattr(model, "signatures", {}) or {} - default_signature = signatures.get("serving_default") - if not default_signature: - # roundtrip saved model to disk to generate signature if it doesn't exist + signatures = getattr(model, "signatures", {}) or {} + default_signature = signatures.get("serving_default") + if not default_signature: + # roundtrip saved model to disk to generate signature if it doesn't exist - reloaded = tf.keras.models.load_model(tf_model_path) - default_signature = reloaded.signatures["serving_default"] + reloaded = tf.keras.models.load_model(tf_model_path) + default_signature = reloaded.signatures["serving_default"] + + inputs = list(default_signature.structured_input_signature[1].values()) + outputs = list(default_signature.structured_outputs.values()) - inputs = list(default_signature.structured_input_signature[1].values()) - outputs = list(default_signature.structured_outputs.values()) + input_col_names = [*default_signature.structured_input_signature[1].keys()] + output_col_names = [*default_signature.structured_outputs.keys()] config.parameters["TF_GRAPH_TAG"].string_value = "serve" config.parameters["TF_SIGNATURE_DEF"].string_value = "serving_default" - for col in inputs: + for col, col_name in zip(inputs, input_col_names): config.input.append( model_config.ModelInput( - name=f"{col.name}", data_type=_convert_dtype(col.dtype), dims=[-1, col.shape[1]] + name=col_name, data_type=_convert_dtype(col.dtype), dims=[-1, col.shape[1]] ) ) - for col in outputs: + for col, col_name in zip(outputs, output_col_names): # this assumes the list columns are 1D tensors both for cats and conts config.output.append( model_config.ModelOutput( - name=col.name, + name=col_name, data_type=_convert_dtype(col.dtype), dims=[-1, col.shape[1]], ) From 361c9d4923f43b485283585e65332c7b97cabb5c Mon Sep 17 00:00:00 2001 From: Julio Perez Date: Thu, 9 Jun 2022 00:22:47 -0400 Subject: [PATCH 3/5] clean up code --- merlin/systems/dag/ops/tensorflow.py | 37 +++++++++++----------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/merlin/systems/dag/ops/tensorflow.py b/merlin/systems/dag/ops/tensorflow.py index 52edf05dd..b936d2b65 100644 --- a/merlin/systems/dag/ops/tensorflow.py +++ b/merlin/systems/dag/ops/tensorflow.py @@ -74,22 +74,16 @@ def __init__(self, model_or_path, custom_objects: dict = None): reloaded = tf.keras.models.load_model(tf_model_path) default_signature = reloaded.signatures["serving_default"] - inputs = list(default_signature.structured_input_signature[1].values()) - outputs = list(default_signature.structured_outputs.values()) - - input_col_names = [*default_signature.structured_input_signature[1].keys()] - output_col_names = [*default_signature.structured_outputs.keys()] - self.input_schema = Schema() - for col, input_col in zip(input_col_names, inputs): - self.input_schema.column_schemas[col] = ColumnSchema( - col, dtype=input_col.dtype.as_numpy_dtype + for col_name, col in default_signature.structured_input_signature[1].items(): + self.input_schema.column_schemas[col_name] = ColumnSchema( + col_name, dtype=col.dtype.as_numpy_dtype ) self.output_schema = Schema() - for col, output_col in zip(output_col_names, outputs): - self.output_schema.column_schemas[col] = ColumnSchema( - col, dtype=output_col.dtype.as_numpy_dtype + for col_name, col in default_signature.structured_outputs.items(): + self.output_schema.column_schemas[col_name] = ColumnSchema( + col_name, dtype=col.dtype.as_numpy_dtype ) super().__init__() @@ -120,11 +114,17 @@ def compute_input_schema( deps_schema: Schema, selector: ColumnSelector, ) -> Schema: + """ + Use the input schema supplied during object creation. + """ return self.input_schema def compute_output_schema( self, input_schema: Schema, col_selector: ColumnSelector, prev_output_schema: Schema = None ) -> Schema: + """ + Use the output schema supplied during object creation. + """ return self.output_schema def _export_model(self, model, name, output_path, version=1): @@ -144,33 +144,24 @@ def _export_model(self, model, name, output_path, version=1): name=name, backend="tensorflow", platform="tensorflow_savedmodel" ) - # inputs, outputs = model.inputs, [model.outputs] - signatures = getattr(model, "signatures", {}) or {} default_signature = signatures.get("serving_default") if not default_signature: # roundtrip saved model to disk to generate signature if it doesn't exist - reloaded = tf.keras.models.load_model(tf_model_path) default_signature = reloaded.signatures["serving_default"] - inputs = list(default_signature.structured_input_signature[1].values()) - outputs = list(default_signature.structured_outputs.values()) - - input_col_names = [*default_signature.structured_input_signature[1].keys()] - output_col_names = [*default_signature.structured_outputs.keys()] - config.parameters["TF_GRAPH_TAG"].string_value = "serve" config.parameters["TF_SIGNATURE_DEF"].string_value = "serving_default" - for col, col_name in zip(inputs, input_col_names): + for col_name, col in default_signature.structured_input_signature[1].items(): config.input.append( model_config.ModelInput( name=col_name, data_type=_convert_dtype(col.dtype), dims=[-1, col.shape[1]] ) ) - for col, col_name in zip(outputs, output_col_names): + for col_name, col in default_signature.structured_outputs.items(): # this assumes the list columns are 1D tensors both for cats and conts config.output.append( model_config.ModelOutput( From b15b745a7b2b7734f4a5a2128bab39edcd85c5f9 Mon Sep 17 00:00:00 2001 From: Julio Perez Date: Thu, 9 Jun 2022 11:38:10 -0400 Subject: [PATCH 4/5] scrubbing nvt backend until fixed --- merlin/systems/dag/ops/workflow.py | 2 +- merlin/systems/triton/export.py | 2 +- tests/unit/systems/test_inference_ops.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/merlin/systems/dag/ops/workflow.py b/merlin/systems/dag/ops/workflow.py index 9bc4ed8ac..b67cc0261 100644 --- a/merlin/systems/dag/ops/workflow.py +++ b/merlin/systems/dag/ops/workflow.py @@ -91,7 +91,7 @@ def export(self, path, input_schema, output_schema, node_id=None, version=1): modified_workflow, node_name, node_export_path, - backend="nvtabular", + backend="python", sparse_max=self.sparse_max, max_batch_size=self.max_batch_size, cats=self.cats, diff --git a/merlin/systems/triton/export.py b/merlin/systems/triton/export.py index 0bc88cf4b..062242bbc 100644 --- a/merlin/systems/triton/export.py +++ b/merlin/systems/triton/export.py @@ -39,7 +39,7 @@ def export_tensorflow_ensemble( label_columns=None, sparse_max=None, version=1, - nvtabular_backend="nvtabular", + nvtabular_backend="python", cats=None, conts=None, ): diff --git a/tests/unit/systems/test_inference_ops.py b/tests/unit/systems/test_inference_ops.py index 6769abce4..a1187f016 100644 --- a/tests/unit/systems/test_inference_ops.py +++ b/tests/unit/systems/test_inference_ops.py @@ -79,4 +79,4 @@ def test_workflow_op_exports_own_config(tmpdir, dataset, engine): # The config file contents are correct assert parsed.name == triton_op.export_name - assert parsed.backend == "nvtabular" + assert parsed.backend == "python" From 604d04916ee4f523233dc0031a2417b7b6460a8b Mon Sep 17 00:00:00 2001 From: Julio Perez Date: Thu, 9 Jun 2022 13:14:38 -0400 Subject: [PATCH 5/5] revert backend change --- merlin/systems/dag/ops/workflow.py | 2 +- merlin/systems/triton/export.py | 2 +- tests/unit/systems/test_inference_ops.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/merlin/systems/dag/ops/workflow.py b/merlin/systems/dag/ops/workflow.py index b67cc0261..9bc4ed8ac 100644 --- a/merlin/systems/dag/ops/workflow.py +++ b/merlin/systems/dag/ops/workflow.py @@ -91,7 +91,7 @@ def export(self, path, input_schema, output_schema, node_id=None, version=1): modified_workflow, node_name, node_export_path, - backend="python", + backend="nvtabular", sparse_max=self.sparse_max, max_batch_size=self.max_batch_size, cats=self.cats, diff --git a/merlin/systems/triton/export.py b/merlin/systems/triton/export.py index 062242bbc..0bc88cf4b 100644 --- a/merlin/systems/triton/export.py +++ b/merlin/systems/triton/export.py @@ -39,7 +39,7 @@ def export_tensorflow_ensemble( label_columns=None, sparse_max=None, version=1, - nvtabular_backend="python", + nvtabular_backend="nvtabular", cats=None, conts=None, ): diff --git a/tests/unit/systems/test_inference_ops.py b/tests/unit/systems/test_inference_ops.py index a1187f016..6769abce4 100644 --- a/tests/unit/systems/test_inference_ops.py +++ b/tests/unit/systems/test_inference_ops.py @@ -79,4 +79,4 @@ def test_workflow_op_exports_own_config(tmpdir, dataset, engine): # The config file contents are correct assert parsed.name == triton_op.export_name - assert parsed.backend == "python" + assert parsed.backend == "nvtabular"