Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes Merlin e2e example #117

Merged
merged 9 commits into from
Jun 9, 2022
4 changes: 2 additions & 2 deletions merlin/systems/dag/ops/feast.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ def from_feature_view(
entity_dtype = np.int64
ent_is_list = False
ent_is_ragged = False
for entity in store.list_entities():
for idx, entity in enumerate(store.list_entities()):
if entity.name == entity_id:
entity_dtype, ent_is_list, ent_is_ragged = feast_2_numpy[
store.list_entities()[0].value_type
store.list_entities()[idx].value_type
]

features = []
Expand Down
4 changes: 2 additions & 2 deletions merlin/systems/dag/ops/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def export(
node_export_path = pathlib.Path(path) / node_name
node_export_path.mkdir(parents=True, exist_ok=True)

config = model_config.ModelConfig(name=node_name, backend="nvtabular", platform="op_runner")
config = model_config.ModelConfig(name=node_name, backend="python", platform="op_runner")

config.parameters["operator_names"].string_value = json.dumps([node_name])

Expand All @@ -225,7 +225,7 @@ def export(
# this assumes the list columns are 1D tensors both for cats and conts
config.output.append(
model_config.ModelOutput(
name=col_name.split("/")[0],
name=col_name,
data_type=_convert_dtype(col_dict["dtype"]),
dims=[-1, -1],
)
Expand Down
52 changes: 22 additions & 30 deletions merlin/systems/dag/ops/tensorflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ def __init__(self, model_or_path, custom_objects: dict = None):

signatures = getattr(self.model, "signatures", {}) or {}
default_signature = signatures.get("serving_default")

if not default_signature:
# roundtrip saved self.model to disk to generate signature if it doesn't exist

Expand All @@ -75,22 +74,16 @@ def __init__(self, model_or_path, custom_objects: dict = None):
reloaded = tf.keras.models.load_model(tf_model_path)
default_signature = reloaded.signatures["serving_default"]

inputs = list(default_signature.structured_input_signature[1].values())
outputs = list(default_signature.structured_outputs.values())

input_col_names = [col.name.split("/")[0] for col in inputs]
output_col_names = [col.name.split("/")[0] for col in outputs]

self.input_schema = Schema()
for col, input_col in zip(input_col_names, inputs):
self.input_schema.column_schemas[col] = ColumnSchema(
col, dtype=input_col.dtype.as_numpy_dtype
for col_name, col in default_signature.structured_input_signature[1].items():
self.input_schema.column_schemas[col_name] = ColumnSchema(
col_name, dtype=col.dtype.as_numpy_dtype
)

self.output_schema = Schema()
for col, output_col in zip(output_col_names, outputs):
self.output_schema.column_schemas[col] = ColumnSchema(
col, dtype=output_col.dtype.as_numpy_dtype
for col_name, col in default_signature.structured_outputs.items():
self.output_schema.column_schemas[col_name] = ColumnSchema(
col_name, dtype=col.dtype.as_numpy_dtype
)
super().__init__()

Expand Down Expand Up @@ -121,11 +114,17 @@ def compute_input_schema(
deps_schema: Schema,
selector: ColumnSelector,
) -> Schema:
"""
Use the input schema supplied during object creation.
"""
return self.input_schema

def compute_output_schema(
self, input_schema: Schema, col_selector: ColumnSelector, prev_output_schema: Schema = None
) -> Schema:
"""
Use the output schema supplied during object creation.
"""
return self.output_schema

def _export_model(self, model, name, output_path, version=1):
Expand All @@ -145,35 +144,28 @@ def _export_model(self, model, name, output_path, version=1):
name=name, backend="tensorflow", platform="tensorflow_savedmodel"
)

inputs, outputs = model.inputs, model.outputs

if not inputs or not outputs:
signatures = getattr(model, "signatures", {}) or {}
default_signature = signatures.get("serving_default")
if not default_signature:
# roundtrip saved model to disk to generate signature if it doesn't exist

reloaded = tf.keras.models.load_model(tf_model_path)
default_signature = reloaded.signatures["serving_default"]

inputs = list(default_signature.structured_input_signature[1].values())
outputs = list(default_signature.structured_outputs.values())
signatures = getattr(model, "signatures", {}) or {}
default_signature = signatures.get("serving_default")
if not default_signature:
# roundtrip saved model to disk to generate signature if it doesn't exist
reloaded = tf.keras.models.load_model(tf_model_path)
default_signature = reloaded.signatures["serving_default"]

config.parameters["TF_GRAPH_TAG"].string_value = "serve"
config.parameters["TF_SIGNATURE_DEF"].string_value = "serving_default"

for col in inputs:
for col_name, col in default_signature.structured_input_signature[1].items():
config.input.append(
model_config.ModelInput(
name=f"{col.name}", data_type=_convert_dtype(col.dtype), dims=[-1, col.shape[1]]
name=col_name, data_type=_convert_dtype(col.dtype), dims=[-1, col.shape[1]]
)
)

for col in outputs:
for col_name, col in default_signature.structured_outputs.items():
# this assumes the list columns are 1D tensors both for cats and conts
config.output.append(
model_config.ModelOutput(
name=col.name.split("/")[0],
name=col_name,
data_type=_convert_dtype(col.dtype),
dims=[-1, col.shape[1]],
)
Expand Down