From 61c9f4d44b8b2b3b46b4ec47e074dd1d68f7099a Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Wed, 1 Sep 2021 18:39:35 -0400 Subject: [PATCH] Refactor logic into odfv Signed-off-by: Danny Chiao --- sdk/python/feast/feature_store.py | 20 ++++++----------- sdk/python/feast/on_demand_feature_view.py | 25 ++++++++++++++++++++++ 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 11757ae31f..c4f1987572 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -787,29 +787,21 @@ def _augment_response_with_on_demand_transforms( return initial_response initial_response_df = initial_response.to_df() # Apply on demand transformations - # TODO(adchia): Include only the feature values from the specified input FVs in the ODFV. for odfv in all_on_demand_feature_views: feature_ref = odfv.name if feature_ref in feature_refs: - # Copy over un-prefixed features even if not requested since transform may need it - if full_feature_names: - for input_fv in odfv.inputs.values(): - for feature in input_fv.features: - full_feature_ref = f"{input_fv.name}__{feature.name}" - if full_feature_ref in initial_response_df.keys(): - initial_response_df[feature.name] = initial_response_df[ - full_feature_ref - ] - - # Compute transformed values and apply to each result row - ret_value = odfv.udf.__call__(initial_response_df) + transformed_features_df = odfv.get_transformed_features_df( + full_feature_names, initial_response_df + ) for row_idx in range(len(result_rows)): result_row = result_rows[row_idx] # TODO(adchia): support multiple output features in an ODFV, which requires different naming # conventions result_row.fields[odfv.name].CopyFrom( python_value_to_proto_value( - ret_value[odfv.features[0].name].values[row_idx] + transformed_features_df[odfv.features[0].name].values[ + row_idx + ] ) ) result_row.statuses[ diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 6b3f812df5..b5b71c164c 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -3,6 +3,7 @@ from typing import Dict, List import dill +import pandas as pd from feast.feature import Feature from feast.feature_view import FeatureView @@ -100,6 +101,30 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): return on_demand_feature_view_obj + def get_transformed_features_df( + self, full_feature_names: bool, df_with_features: pd.DataFrame + ) -> pd.DataFrame: + # Apply on demand transformations + # TODO(adchia): Include only the feature values from the specified input FVs in the ODFV. + # Copy over un-prefixed features even if not requested since transform may need it + columns_to_cleanup = [] + if full_feature_names: + for input_fv in self.inputs.values(): + for feature in input_fv.features: + full_feature_ref = f"{input_fv.name}__{feature.name}" + if full_feature_ref in df_with_features.keys(): + df_with_features[feature.name] = df_with_features[ + full_feature_ref + ] + columns_to_cleanup.append(feature.name) + + # Compute transformed values and apply to each result row + df_with_transformed_features = self.udf.__call__(df_with_features) + + # Cleanup extra columns used for transformation + df_with_features.drop(columns=columns_to_cleanup, inplace=True) + return df_with_transformed_features + def on_demand_feature_view(features: List[Feature], inputs: Dict[str, FeatureView]): """