Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid requesting features from OnlineStore twice #2185

Merged
merged 2 commits into from
Jan 4, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 36 additions & 13 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1144,6 +1144,9 @@ def get_online_features(
# Also create entity values to append to the result
result_rows.append(_entity_row_to_field_values(entity_row_proto))

# Keep track of what has been requested from the OnlineStore
# to avoid requesting the same thing twice for ODFVs.
retrieved_feature_refs: Set[str] = set()
for table, requested_features in grouped_refs:
table_join_keys = [
entity_name_to_join_key_map[entity_name]
Expand All @@ -1158,6 +1161,11 @@ def get_online_features(
table,
union_of_entity_keys,
)
table_feature_names = {feature.name for feature in table.features}
retrieved_feature_refs |= {
f"{table.name}:{feature}" if feature in table_feature_names else feature
for feature in requested_features
}

requested_result_row_names = self._get_requested_result_fields(
result_rows, needed_request_fv_features
Expand All @@ -1170,6 +1178,7 @@ def get_online_features(
request_data_features,
result_rows,
union_of_entity_keys,
retrieved_feature_refs,
)

self._augment_response_with_on_demand_transforms(
Expand Down Expand Up @@ -1205,6 +1214,7 @@ def _populate_odfv_dependencies(
request_data_features: Dict[str, List[Any]],
result_rows: List[GetOnlineFeaturesResponse.FieldValues],
union_of_entity_keys: List[EntityKeyProto],
retrieved_feature_refs: Set[str],
):
# Add more feature values to the existing result rows for the request data features
for feature_name, feature_values in request_data_features.items():
Expand All @@ -1223,19 +1233,32 @@ def _populate_odfv_dependencies(
if len(grouped_odfv_refs) > 0:
for odfv, _ in grouped_odfv_refs:
for fv in odfv.input_feature_views.values():
table_join_keys = [
entity_name_to_join_key_map[entity_name]
for entity_name in fv.entities
]
self._populate_result_rows_from_feature_view(
table_join_keys,
full_feature_names,
provider,
[feature.name for feature in fv.features],
result_rows,
fv,
union_of_entity_keys,
)
# Find the set of required Features which have not yet
# been retrieved.
not_yet_retrieved = {
feature.name
for feature in fv.projection.features
if f"{fv.name}:{feature.name}" not in retrieved_feature_refs
}
# If there are required Features which have not yet been retrieved
# retrieve them.
if not_yet_retrieved:
table_join_keys = [
entity_name_to_join_key_map[entity_name]
for entity_name in fv.entities
]
self._populate_result_rows_from_feature_view(
table_join_keys,
full_feature_names,
provider,
list(not_yet_retrieved),
result_rows,
fv,
union_of_entity_keys,
)
# Update the set of retrieved Features with any newly retrieved
# Features.
retrieved_feature_refs |= not_yet_retrieved

def get_needed_request_data(
self,
Expand Down