Skip to content

Commit

Permalink
Respect full_feature_names for ODFVs (#2144)
Browse files Browse the repository at this point in the history
* Respect `full_feature_names` for ODFVs

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>

* Add test for ODFV `full_feature_names`

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>

* Correct feature names in tests

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
  • Loading branch information
judahrand authored Dec 17, 2021
1 parent 5ce0fdb commit 97fbd3e
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 26 deletions.
19 changes: 9 additions & 10 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1347,7 +1347,11 @@ def _augment_response_with_on_demand_transforms(
for feature_ref in feature_refs:
view_name, feature_name = feature_ref.split(":")
if view_name in requested_odfv_feature_names:
odfv_feature_refs[view_name].append(feature_name)
odfv_feature_refs[view_name].append(
f"{requested_odfv_map[view_name].projection.name_to_use()}__{feature_name}"
if full_feature_names
else feature_name
)

initial_response = OnlineResponse(
GetOnlineFeaturesResponse(field_values=result_rows)
Expand All @@ -1359,7 +1363,7 @@ def _augment_response_with_on_demand_transforms(
for odfv_name, _feature_refs in odfv_feature_refs.items():
odfv = requested_odfv_map[odfv_name]
transformed_features_df = odfv.get_transformed_features_df(
initial_response_df
initial_response_df, full_feature_names,
)
for row_idx in range(len(result_rows)):
result_row = result_rows[row_idx]
Expand All @@ -1369,18 +1373,13 @@ def _augment_response_with_on_demand_transforms(
]

for transformed_feature in selected_subset:
transformed_feature_name = (
f"{odfv.projection.name_to_use()}__{transformed_feature}"
if full_feature_names
else transformed_feature
)
odfv_result_names.add(transformed_feature_name)
odfv_result_names.add(transformed_feature)
proto_value = python_value_to_proto_value(
transformed_features_df[transformed_feature].values[row_idx]
)
result_row.fields[transformed_feature_name].CopyFrom(proto_value)
result_row.fields[transformed_feature].CopyFrom(proto_value)
result_row.statuses[
transformed_feature_name
transformed_feature
] = GetOnlineFeaturesResponse.FieldStatus.PRESENT

# Drop values that aren't needed
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def to_df(self) -> pd.DataFrame:
# TODO(adchia): Fix requirement to specify dependent feature views in feature_refs
for odfv in self.on_demand_feature_views:
features_df = features_df.join(
odfv.get_transformed_features_df(features_df)
odfv.get_transformed_features_df(features_df, self.full_feature_names,)
)
return features_df

Expand All @@ -69,7 +69,7 @@ def to_arrow(self) -> pyarrow.Table:
features_df = self._to_df_internal()
for odfv in self.on_demand_feature_views:
features_df = features_df.join(
odfv.get_transformed_features_df(features_df)
odfv.get_transformed_features_df(features_df, self.full_feature_names,)
)
return pyarrow.Table.from_pandas(features_df)

Expand Down
18 changes: 16 additions & 2 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def get_request_data_schema(self) -> Dict[str, ValueType]:
return schema

def get_transformed_features_df(
self, df_with_features: pd.DataFrame
self, df_with_features: pd.DataFrame, full_feature_names: bool = False,
) -> pd.DataFrame:
# Apply on demand transformations
columns_to_cleanup = []
Expand All @@ -183,9 +183,23 @@ def get_transformed_features_df(
# Compute transformed values and apply to each result row
df_with_transformed_features = self.udf.__call__(df_with_features)

# Work out whether the correct column names are used, and record any renames needed.
rename_columns: Dict[str, str] = {}
for feature in self.features:
short_name = feature.name
long_name = f"{self.projection.name_to_use()}__{feature.name}"
if (
short_name in df_with_transformed_features.columns
and full_feature_names
):
rename_columns[short_name] = long_name
elif not full_feature_names:
# Short names were requested: map the long name back to the short name if it is in the dataframe.
rename_columns[long_name] = short_name

# Cleanup extra columns used for transformation
df_with_features.drop(columns=columns_to_cleanup, inplace=True)
return df_with_transformed_features
return df_with_transformed_features.rename(columns=rename_columns)

def infer_features(self):
"""
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/transformation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def TransformFeatures(self, request, context):

df = pa.ipc.open_file(request.transformation_input.arrow_value).read_pandas()

result_df = odfv.get_transformed_features_df(df)
result_df = odfv.get_transformed_features_df(df, True)
result_arrow = pa.Table.from_pandas(result_df)
sink = pa.BufferOutputStream()
writer = pa.ipc.new_file(sink, result_arrow.schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,21 @@ def get_expected_training_df(
expected_df[col] = expected_df[col].astype(typ)

conv_feature_name = "driver_stats__conv_rate" if full_feature_names else "conv_rate"
expected_df["conv_rate_plus_100"] = expected_df[conv_feature_name] + 100
expected_df["conv_rate_plus_100_rounded"] = (
expected_df["conv_rate_plus_100"]
conv_plus_feature_name = response_feature_name(
"conv_rate_plus_100", full_feature_names
)
expected_df[conv_plus_feature_name] = expected_df[conv_feature_name] + 100
expected_df[
response_feature_name("conv_rate_plus_100_rounded", full_feature_names)
] = (
expected_df[conv_plus_feature_name]
.astype("float")
.round()
.astype(pd.Int32Dtype())
)
expected_df["conv_rate_plus_val_to_add"] = (
expected_df[conv_feature_name] + expected_df["val_to_add"]
)
expected_df[
response_feature_name("conv_rate_plus_val_to_add", full_feature_names)
] = (expected_df[conv_feature_name] + expected_df["val_to_add"])

return expected_df

Expand Down Expand Up @@ -380,10 +385,10 @@ def test_historical_features(environment, universal_data_sources, full_feature_n
# Not requesting the on-demand transform with an entity_df query (request data can't be added to such queries)
expected_df_query = expected_df.drop(
columns=[
"conv_rate_plus_100",
"conv_rate_plus_100_rounded",
response_feature_name("conv_rate_plus_100", full_feature_names),
response_feature_name("conv_rate_plus_100_rounded", full_feature_names),
response_feature_name("conv_rate_plus_val_to_add", full_feature_names),
"val_to_add",
"conv_rate_plus_val_to_add",
"driver_age",
]
)
Expand Down Expand Up @@ -638,7 +643,15 @@ def response_feature_name(feature: str, full_feature_names: bool) -> str:
if feature in {"conv_rate", "avg_daily_trips"} and full_feature_names:
return f"driver_stats__{feature}"

if feature in {"conv_rate_plus_100"} and full_feature_names:
if (
feature
in {
"conv_rate_plus_100",
"conv_rate_plus_100_rounded",
"conv_rate_plus_val_to_add",
}
and full_feature_names
):
return f"conv_rate_plus_100__{feature}"

return feature
Expand Down Expand Up @@ -670,7 +683,7 @@ def assert_feature_service_correctness(
"driver_id",
"customer_id",
response_feature_name("conv_rate", full_feature_names),
"conv_rate_plus_100",
response_feature_name("conv_rate_plus_100", full_feature_names),
"driver_age",
]
]
Expand Down

0 comments on commit 97fbd3e

Please sign in to comment.