Skip to content

Commit

Permalink
Refactor logic into odfv
Browse files Browse the repository at this point in the history
Signed-off-by: Danny Chiao <danny@tecton.ai>
  • Loading branch information
adchia committed Sep 1, 2021
1 parent d34e227 commit 61c9f4d
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 14 deletions.
20 changes: 6 additions & 14 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,29 +787,21 @@ def _augment_response_with_on_demand_transforms(
return initial_response
initial_response_df = initial_response.to_df()
# Apply on demand transformations
# TODO(adchia): Include only the feature values from the specified input FVs in the ODFV.
for odfv in all_on_demand_feature_views:
feature_ref = odfv.name
if feature_ref in feature_refs:
# Copy over un-prefixed features even if not requested since transform may need it
if full_feature_names:
for input_fv in odfv.inputs.values():
for feature in input_fv.features:
full_feature_ref = f"{input_fv.name}__{feature.name}"
if full_feature_ref in initial_response_df.keys():
initial_response_df[feature.name] = initial_response_df[
full_feature_ref
]

# Compute transformed values and apply to each result row
ret_value = odfv.udf.__call__(initial_response_df)
transformed_features_df = odfv.get_transformed_features_df(
full_feature_names, initial_response_df
)
for row_idx in range(len(result_rows)):
result_row = result_rows[row_idx]
# TODO(adchia): support multiple output features in an ODFV, which requires different naming
# conventions
result_row.fields[odfv.name].CopyFrom(
python_value_to_proto_value(
ret_value[odfv.features[0].name].values[row_idx]
transformed_features_df[odfv.features[0].name].values[
row_idx
]
)
)
result_row.statuses[
Expand Down
25 changes: 25 additions & 0 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Dict, List

import dill
import pandas as pd

from feast.feature import Feature
from feast.feature_view import FeatureView
Expand Down Expand Up @@ -100,6 +101,30 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):

return on_demand_feature_view_obj

def get_transformed_features_df(
self, full_feature_names: bool, df_with_features: pd.DataFrame
) -> pd.DataFrame:
# Apply on demand transformations
# TODO(adchia): Include only the feature values from the specified input FVs in the ODFV.
# Copy over un-prefixed features even if not requested since transform may need it
columns_to_cleanup = []
if full_feature_names:
for input_fv in self.inputs.values():
for feature in input_fv.features:
full_feature_ref = f"{input_fv.name}__{feature.name}"
if full_feature_ref in df_with_features.keys():
df_with_features[feature.name] = df_with_features[
full_feature_ref
]
columns_to_cleanup.append(feature.name)

# Compute transformed values and apply to each result row
df_with_transformed_features = self.udf.__call__(df_with_features)

# Cleanup extra columns used for transformation
df_with_features.drop(columns=columns_to_cleanup, inplace=True)
return df_with_transformed_features


def on_demand_feature_view(features: List[Feature], inputs: Dict[str, FeatureView]):
"""
Expand Down

0 comments on commit 61c9f4d

Please sign in to comment.