From 9c7ebce62a819452b74db9027f39f9c498978cfd Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 4 Jan 2022 12:03:07 +0000 Subject: [PATCH 1/2] Avoid requesting features from OnlineStore twice Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> --- sdk/python/feast/feature_store.py | 40 +++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index ce8125520e..4057b73a63 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1144,6 +1144,9 @@ def get_online_features( # Also create entity values to append to the result result_rows.append(_entity_row_to_field_values(entity_row_proto)) + # Keep track of what has been requested from the OnlineStore + # to avoid requesting the same thing twice for ODFVs. + retrieved_feature_refs: Set[str] = set() for table, requested_features in grouped_refs: table_join_keys = [ entity_name_to_join_key_map[entity_name] @@ -1158,6 +1161,11 @@ def get_online_features( table, union_of_entity_keys, ) + table_feature_names = {feature.name for feature in table.features} + retrieved_feature_refs |= { + f"{table.name}:{feature}" if feature in table_feature_names else feature + for feature in requested_features + } requested_result_row_names = self._get_requested_result_fields( result_rows, needed_request_fv_features @@ -1170,6 +1178,7 @@ def get_online_features( request_data_features, result_rows, union_of_entity_keys, + retrieved_feature_refs, ) self._augment_response_with_on_demand_transforms( @@ -1205,6 +1214,7 @@ def _populate_odfv_dependencies( request_data_features: Dict[str, List[Any]], result_rows: List[GetOnlineFeaturesResponse.FieldValues], union_of_entity_keys: List[EntityKeyProto], + retrieved_feature_refs: Set[str], ): # Add more feature values to the existing result rows for the request data features for feature_name, feature_values in request_data_features.items(): @@ -1223,19 +1233,25 @@ def _populate_odfv_dependencies( if len(grouped_odfv_refs) > 0: for odfv, _ in grouped_odfv_refs: for fv in odfv.input_feature_views.values(): - table_join_keys = [ - entity_name_to_join_key_map[entity_name] - for entity_name in fv.entities + feature_deps = [ + feature.name + for feature in fv.features + if f"{fv.name}:{feature.name}" not in retrieved_feature_refs ] - self._populate_result_rows_from_feature_view( - table_join_keys, - full_feature_names, - provider, - [feature.name for feature in fv.features], - result_rows, - fv, - union_of_entity_keys, - ) + if feature_deps: + table_join_keys = [ + entity_name_to_join_key_map[entity_name] + for entity_name in fv.entities + ] + self._populate_result_rows_from_feature_view( + table_join_keys, + full_feature_names, + provider, + feature_deps, + result_rows, + fv, + union_of_entity_keys, + ) def get_needed_request_data( self, From 916c96c9e59aefe3cdc8d37894d17af034d3b8a3 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 4 Jan 2022 18:56:08 +0000 Subject: [PATCH 2/2] Fix edge case where multiple ODFVs reference the same FeatureView. Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> --- sdk/python/feast/feature_store.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 4057b73a63..0141b8f8bc 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1233,12 +1233,16 @@ def _populate_odfv_dependencies( if len(grouped_odfv_refs) > 0: for odfv, _ in grouped_odfv_refs: for fv in odfv.input_feature_views.values(): - feature_deps = [ + # Find the set of required Features which have not yet + # been retrieved. + not_yet_retrieved = { feature.name - for feature in fv.features + for feature in fv.projection.features if f"{fv.name}:{feature.name}" not in retrieved_feature_refs - ] - if feature_deps: + } + # If there are required Features which have not yet been retrieved + # retrieve them. + if not_yet_retrieved: table_join_keys = [ entity_name_to_join_key_map[entity_name] for entity_name in fv.entities @@ -1247,11 +1251,14 @@ def _populate_odfv_dependencies( table_join_keys, full_feature_names, provider, - feature_deps, + list(not_yet_retrieved), result_rows, fv, union_of_entity_keys, ) + # Update the set of retrieved Features with any newly retrieved + # Features. + retrieved_feature_refs |= not_yet_retrieved def get_needed_request_data( self,