diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index ce8125520e..0141b8f8bc 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1144,6 +1144,9 @@ def get_online_features( # Also create entity values to append to the result result_rows.append(_entity_row_to_field_values(entity_row_proto)) + # Keep track of what has been requested from the OnlineStore + # to avoid requesting the same thing twice for ODFVs. + retrieved_feature_refs: Set[str] = set() for table, requested_features in grouped_refs: table_join_keys = [ entity_name_to_join_key_map[entity_name] @@ -1158,6 +1161,11 @@ def get_online_features( table, union_of_entity_keys, ) + table_feature_names = {feature.name for feature in table.features} + retrieved_feature_refs |= { + f"{table.name}:{feature}" if feature in table_feature_names else feature + for feature in requested_features + } requested_result_row_names = self._get_requested_result_fields( result_rows, needed_request_fv_features @@ -1170,6 +1178,7 @@ def get_online_features( request_data_features, result_rows, union_of_entity_keys, + retrieved_feature_refs, ) self._augment_response_with_on_demand_transforms( @@ -1205,6 +1214,7 @@ def _populate_odfv_dependencies( request_data_features: Dict[str, List[Any]], result_rows: List[GetOnlineFeaturesResponse.FieldValues], union_of_entity_keys: List[EntityKeyProto], + retrieved_feature_refs: Set[str], ): # Add more feature values to the existing result rows for the request data features for feature_name, feature_values in request_data_features.items(): @@ -1223,19 +1233,32 @@ def _populate_odfv_dependencies( if len(grouped_odfv_refs) > 0: for odfv, _ in grouped_odfv_refs: for fv in odfv.input_feature_views.values(): - table_join_keys = [ - entity_name_to_join_key_map[entity_name] - for entity_name in fv.entities - ] - self._populate_result_rows_from_feature_view( - table_join_keys, - full_feature_names, - provider, - [feature.name for feature in fv.features], - result_rows, - fv, - union_of_entity_keys, - ) + # Find the set of required Features which have not yet + # been retrieved. + not_yet_retrieved = { + feature.name + for feature in fv.projection.features + if f"{fv.name}:{feature.name}" not in retrieved_feature_refs + } + # If there are required Features which have not yet been retrieved + # retrieve them. + if not_yet_retrieved: + table_join_keys = [ + entity_name_to_join_key_map[entity_name] + for entity_name in fv.entities + ] + self._populate_result_rows_from_feature_view( + table_join_keys, + full_feature_names, + provider, + list(not_yet_retrieved), + result_rows, + fv, + union_of_entity_keys, + ) + # Update the set of retrieved Features with any newly retrieved + # Features. + retrieved_feature_refs |= not_yet_retrieved def get_needed_request_data( self,