Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adding support for Native Python feature transformations for On Demand Feature Views #4045

Merged
merged 16 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sdk/python/feast/embedded_go/online_features_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ def transformation_callback(
# the typeguard requirement.
full_feature_names = bool(full_feature_names)

if odfv.mode != "pandas":
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The string 'pandas' could be replaced by an enum to avoid typos, but we don't have many "modes", so I think it's fine for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was going to refactor later and do some cleanup to centralize this.

raise Exception(
f"OnDemandFeatureView mode '{odfv.mode} not supported by EmbeddedOnlineFeatureServer."
)

output = odfv.get_transformed_features_df(
input_record.to_pandas(), full_feature_names=full_feature_names
)
Expand Down
55 changes: 41 additions & 14 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2096,26 +2096,53 @@ def _augment_response_with_on_demand_transforms(
)

initial_response = OnlineResponse(online_features_response)
initial_response_df = initial_response.to_df()
initial_response_df: Optional[pd.DataFrame] = None
initial_response_dict: Optional[Dict[str, List[Any]]] = None

# Apply on demand transformations and augment the result rows
odfv_result_names = set()
for odfv_name, _feature_refs in odfv_feature_refs.items():
odfv = requested_odfv_map[odfv_name]
transformed_features_df = odfv.get_transformed_features_df(
initial_response_df,
full_feature_names,
)
selected_subset = [
f for f in transformed_features_df.columns if f in _feature_refs
]

proto_values = [
python_values_to_proto_values(
transformed_features_df[feature].values, ValueType.UNKNOWN
if odfv.mode == "python":
if initial_response_dict is None:
initial_response_dict = initial_response.to_dict()
transformed_features_dict: Dict[
str, List[Any]
] = odfv.get_transformed_features(
initial_response_dict,
full_feature_names,
)
for feature in selected_subset
]
elif odfv.mode in {"pandas", "substrait"}:
if initial_response_df is None:
initial_response_df = initial_response.to_df()
transformed_features_df: pd.DataFrame = odfv.get_transformed_features(
initial_response_df,
full_feature_names,
)
else:
raise Exception(
f"Invalid OnDemandFeatureMode: {odfv.mode}. Expected one of 'pandas', 'python', or 'substrait'."
)

transformed_features = (
transformed_features_dict
if odfv.mode == "python"
else transformed_features_df
)
transformed_columns = (
transformed_features.columns
if isinstance(transformed_features, pd.DataFrame)
else transformed_features
)
selected_subset = [f for f in transformed_columns if f in _feature_refs]

proto_values = []
for selected_feature in selected_subset:
if odfv.mode in ["python", "pandas"]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This kind of looks like it's breaking substrait, but I guess it's fine. It seems the online path isn't tested for substrait in CI; I'll add tests and a fix (if necessary) for this later.

feature_vector = transformed_features[selected_feature]
proto_values.append(
python_values_to_proto_values(feature_vector, ValueType.UNKNOWN)
)

odfv_result_names |= set(selected_subset)

Expand Down
8 changes: 8 additions & 0 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ def to_df(
if self.on_demand_feature_views:
# TODO(adchia): Fix requirement to specify dependent feature views in feature_refs
for odfv in self.on_demand_feature_views:
if odfv.mode not in {"pandas", "substrait"}:
raise Exception(
f'OnDemandFeatureView mode "{odfv.mode}" not supported for offline processing.'
)
features_df = features_df.join(
odfv.get_transformed_features_df(
features_df,
Expand Down Expand Up @@ -124,6 +128,10 @@ def to_arrow(
features_df = self._to_df_internal(timeout=timeout)
if self.on_demand_feature_views:
for odfv in self.on_demand_feature_views:
if odfv.mode != "pandas":
raise Exception(
f'OnDemandFeatureView mode "{odfv.mode}" not supported for offline processing.'
)
features_df = features_df.join(
odfv.get_transformed_features_df(
features_df,
Expand Down
Loading
Loading