diff --git a/protos/feast/core/FeatureService.proto b/protos/feast/core/FeatureService.proto index 061e65cfd9..952b30eb0a 100644 --- a/protos/feast/core/FeatureService.proto +++ b/protos/feast/core/FeatureService.proto @@ -23,8 +23,8 @@ message FeatureServiceSpec { // Name of Feast project that this Feature Service belongs to. string project = 2; - // List of features that this feature service encapsulates. - // Stored as a list of references to other features views and the features from those views. + // Represents a projection that's to be applied on top of the FeatureView. + // Contains data such as the features to use from a FeatureView. repeated FeatureViewProjection features = 3; // User defined metadata diff --git a/protos/feast/core/FeatureViewProjection.proto b/protos/feast/core/FeatureViewProjection.proto index a7b9ae9a89..d9c80db0b8 100644 --- a/protos/feast/core/FeatureViewProjection.proto +++ b/protos/feast/core/FeatureViewProjection.proto @@ -8,7 +8,8 @@ option java_package = "feast.proto.core"; import "feast/core/Feature.proto"; -// A reference to features in a feature view +// A projection to be applied on top of a FeatureView. +// Contains the modifications to a FeatureView such as the features subset to use. message FeatureViewProjection { // The feature view name string feature_view_name = 1; diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 2701b6d4ed..12be1e356d 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -175,7 +175,7 @@ def feature_service_list(ctx: click.Context): feature_services = [] for feature_service in store.list_feature_services(): feature_names = [] - for projection in feature_service.features: + for projection in feature_service.feature_view_projections: feature_names.extend( [f"{projection.name}:{feature.name}" for feature in projection.features] ) diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 594e2bdb1e..9f8e9af1fb 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -31,7 +31,7 @@ class FeatureService: """ name: str - features: List[FeatureViewProjection] + feature_view_projections: List[FeatureViewProjection] tags: Dict[str, str] description: Optional[str] = None created_timestamp: Optional[datetime] = None @@ -41,9 +41,7 @@ class FeatureService: def __init__( self, name: str, - features: List[ - Union[FeatureTable, FeatureView, OnDemandFeatureView, FeatureViewProjection] - ], + features: List[Union[FeatureTable, FeatureView, OnDemandFeatureView]], tags: Optional[Dict[str, str]] = None, description: Optional[str] = None, ): @@ -54,18 +52,23 @@ def __init__( ValueError: If one of the specified features is not a valid type. """ self.name = name - self.features = [] - for feature in features: - if ( - isinstance(feature, FeatureTable) - or isinstance(feature, FeatureView) - or isinstance(feature, OnDemandFeatureView) + self.feature_view_projections = [] + + for feature_grouping in features: + if isinstance(feature_grouping, FeatureTable): + self.feature_view_projections.append( + FeatureViewProjection.from_definition(feature_grouping) + ) + elif isinstance(feature_grouping, FeatureView) or isinstance( + feature_grouping, OnDemandFeatureView ): - self.features.append(FeatureViewProjection.from_definition(feature)) - elif isinstance(feature, FeatureViewProjection): - self.features.append(feature) + self.feature_view_projections.append(feature_grouping.projection) else: - raise ValueError(f"Unexpected type: {type(feature)}") + raise ValueError( + "The FeatureService {fs_name} has been provided with an invalid type" + f'{type(feature_grouping)} as part of the "features" argument.)' + ) + self.tags = tags or {} self.description = description self.created_timestamp = None @@ -89,7 +92,9 @@ def __eq__(self, other): if self.tags != other.tags or self.name != other.name: return False - if sorted(self.features) != sorted(other.features): + if sorted(self.feature_view_projections) != sorted( + other.feature_view_projections + ): return False return True @@ -104,10 +109,7 @@ def from_proto(feature_service_proto: FeatureServiceProto): """ fs = FeatureService( name=feature_service_proto.spec.name, - features=[ - FeatureViewProjection.from_proto(fp) - for fp in feature_service_proto.spec.features - ], + features=[], tags=dict(feature_service_proto.spec.tags), description=( feature_service_proto.spec.description @@ -115,6 +117,12 @@ def from_proto(feature_service_proto: FeatureServiceProto): else None ), ) + fs.feature_view_projections.extend( + [ + FeatureViewProjection.from_proto(projection) + for projection in feature_service_proto.spec.features + ] + ) if feature_service_proto.meta.HasField("created_timestamp"): fs.created_timestamp = ( @@ -138,19 +146,12 @@ def to_proto(self) -> FeatureServiceProto: if self.created_timestamp: meta.created_timestamp.FromDatetime(self.created_timestamp) - spec = FeatureServiceSpec() - spec.name = self.name - for definition in self.features: - if isinstance(definition, FeatureTable) or isinstance( - definition, FeatureView - ): - feature_ref = FeatureViewProjection( - definition.name, definition.features - ) - else: - feature_ref = definition - - spec.features.append(feature_ref.to_proto()) + spec = FeatureServiceSpec( + name=self.name, + features=[ + projection.to_proto() for projection in self.feature_view_projections + ], + ) if self.tags: spec.tags.update(self.tags) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 68627220cb..8c93a39767 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -316,12 +316,19 @@ def _get_features( if not _features: raise ValueError("No features specified for retrieval") - _feature_refs: List[str] + _feature_refs = [] if isinstance(_features, FeatureService): - # Get the latest value of the feature service, in case the object passed in has been updated underneath us. - _feature_refs = _get_feature_refs_from_feature_services( - self.get_feature_service(_features.name) - ) + feature_service_from_registry = self.get_feature_service(_features.name) + if feature_service_from_registry != _features: + warnings.warn( + "The FeatureService object that has been passed in as an argument is" + "inconsistent with the version from Registry. Potentially a newer version" + "of the FeatureService has been applied to the registry." + ) + for projection in feature_service_from_registry.feature_view_projections: + _feature_refs.extend( + [f"{projection.name}:{f.name}" for f in projection.features] + ) else: assert isinstance(_features, list) _feature_refs = _features @@ -542,10 +549,8 @@ def get_historical_features( ) _feature_refs = self._get_features(features, feature_refs) - - all_feature_views = self.list_feature_views() - all_on_demand_feature_views = self._registry.list_on_demand_feature_views( - project=self.project + all_feature_views, all_on_demand_feature_views = self._get_feature_views_to_use( + features ) # TODO(achal): _group_feature_refs returns the on demand feature views, but it's no passed into the provider. @@ -805,11 +810,8 @@ def get_online_features( >>> online_response_dict = online_response.to_dict() """ _feature_refs = self._get_features(features, feature_refs) - all_feature_views = self._list_feature_views( - allow_cache=True, hide_dummy_entity=False - ) - all_on_demand_feature_views = self._registry.list_on_demand_feature_views( - project=self.project, allow_cache=True + all_feature_views, all_on_demand_feature_views = self._get_feature_views_to_use( + features=features, allow_cache=True, hide_dummy_entity=False ) _validate_feature_refs(_feature_refs, full_feature_names) @@ -1018,6 +1020,43 @@ def _augment_response_with_on_demand_transforms( ] = GetOnlineFeaturesResponse.FieldStatus.PRESENT return OnlineResponse(GetOnlineFeaturesResponse(field_values=result_rows)) + def _get_feature_views_to_use( + self, + features: Optional[Union[List[str], FeatureService]], + allow_cache=False, + hide_dummy_entity: bool = True, + ) -> Tuple[List[FeatureView], List[OnDemandFeatureView]]: + + fvs = { + fv.name: fv + for fv in self._list_feature_views(allow_cache, hide_dummy_entity) + } + + od_fvs = { + fv.name: fv + for fv in self._registry.list_on_demand_feature_views( + project=self.project, allow_cache=allow_cache + ) + } + + if isinstance(features, FeatureService): + for fv_name, projection in { + projection.name: projection + for projection in features.feature_view_projections + }.items(): + if fv_name in fvs: + fvs[fv_name].set_projection(projection) + elif fv_name in od_fvs: + od_fvs[fv_name].set_projection(projection) + else: + raise ValueError( + f"The provided feature service {features.name} contains a reference to a feature view" + f"{fv_name} which doesn't exist. Please make sure that you have created the feature view" + f'{fv_name} and that you have registered it by running "apply".' + ) + + return [*fvs.values()], [*od_fvs.values()] + @log_exceptions_and_usage def serve(self, port: int) -> None: """Start the feature consumption server locally on a given port.""" @@ -1070,7 +1109,7 @@ def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = F def _group_feature_refs( - features: Union[List[str], FeatureService], + features: List[str], all_feature_views: List[FeatureView], all_on_demand_feature_views: List[OnDemandFeatureView], ) -> Tuple[ @@ -1090,21 +1129,14 @@ def _group_feature_refs( # on demand view name to feature names on_demand_view_features = defaultdict(list) - if isinstance(features, list) and isinstance(features[0], str): - for ref in features: - view_name, feat_name = ref.split(":") - if view_name in view_index: - views_features[view_name].append(feat_name) - elif view_name in on_demand_view_index: - on_demand_view_features[view_name].append(feat_name) - else: - raise FeatureViewNotFoundException(view_name) - elif isinstance(features, FeatureService): - for feature_projection in features.features: - projected_features = feature_projection.features - views_features[feature_projection.name].extend( - [f.name for f in projected_features] - ) + for ref in features: + view_name, feat_name = ref.split(":") + if view_name in view_index: + views_features[view_name].append(feat_name) + elif view_name in on_demand_view_index: + on_demand_view_features[view_name].append(feat_name) + else: + raise FeatureViewNotFoundException(view_name) fvs_result: List[Tuple[FeatureView, List[str]]] = [] odfvs_result: List[Tuple[OnDemandFeatureView, List[str]]] = [] @@ -1116,17 +1148,6 @@ def _group_feature_refs( return fvs_result, odfvs_result -def _get_feature_refs_from_feature_services( - feature_service: FeatureService, -) -> List[str]: - feature_refs = [] - for projection in feature_service.features: - feature_refs.extend( - [f"{projection.name}:{f.name}" for f in projection.features] - ) - return feature_refs - - def _get_table_entity_keys( table: FeatureView, entity_keys: List[EntityKeyProto], join_key_map: Dict[str, str], ) -> List[EntityKeyProto]: diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 54c5032c6d..9752449c59 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -78,6 +78,7 @@ class FeatureView: created_timestamp: Optional[datetime] = None last_updated_timestamp: Optional[datetime] = None materialization_intervals: List[Tuple[datetime, datetime]] + projection: FeatureViewProjection @log_exceptions def __init__( @@ -141,6 +142,8 @@ def __init__( self.created_timestamp: Optional[datetime] = None self.last_updated_timestamp: Optional[datetime] = None + self.projection = FeatureViewProjection.from_definition(self) + def __repr__(self): items = (f"{k} = {v}" for k, v in self.__dict__.items()) return f"<{self.__class__.__name__}({', '.join(items)})>" @@ -151,7 +154,7 @@ def __str__(self): def __hash__(self): return hash((id(self), self.name)) - def __getitem__(self, item) -> FeatureViewProjection: + def __getitem__(self, item): assert isinstance(item, list) referenced_features = [] @@ -159,7 +162,9 @@ def __getitem__(self, item) -> FeatureViewProjection: if feature.name in item: referenced_features.append(feature) - return FeatureViewProjection(self.name, referenced_features) + self.projection.features = referenced_features + + return self def __eq__(self, other): if not isinstance(other, FeatureView): @@ -283,6 +288,10 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): stream_source=stream_source, ) + # FeatureViewProjections are not saved in the FeatureView proto. + # Create the default projection. + feature_view.projection = FeatureViewProjection.from_definition(feature_view) + if feature_view_proto.meta.HasField("created_timestamp"): feature_view.created_timestamp = ( feature_view_proto.meta.created_timestamp.ToDatetime() @@ -379,3 +388,31 @@ def infer_features_from_batch_source(self, config: RepoConfig): "FeatureView", f"Could not infer Features for the FeatureView named {self.name}.", ) + + def set_projection(self, feature_view_projection: FeatureViewProjection) -> None: + """ + Setter for the projection object held by this FeatureView. A projection is an + object that stores the modifications to a FeatureView that is applied to the FeatureView + when the FeatureView is used such as during feature_store.get_historical_features. + This method also performs checks to ensure the projection is consistent with this + FeatureView before doing the set. + + Args: + feature_view_projection: The FeatureViewProjection object to set this FeatureView's + 'projection' field to. + """ + if feature_view_projection.name != self.name: + raise ValueError( + f"The projection for the {self.name} FeatureView cannot be applied because it differs in name. " + f"The projection is named {feature_view_projection.name} and the name indicates which " + "FeatureView the projection is for." + ) + + for feature in feature_view_projection.features: + if feature not in self.features: + raise ValueError( + f"The projection for {self.name} cannot be applied because it contains {feature.name} which the " + "FeatureView doesn't have." + ) + + self.projection = feature_view_projection diff --git a/sdk/python/feast/feature_view_projection.py b/sdk/python/feast/feature_view_projection.py index 15b24889da..1b2961302a 100644 --- a/sdk/python/feast/feature_view_projection.py +++ b/sdk/python/feast/feature_view_projection.py @@ -31,7 +31,7 @@ def from_proto(proto: FeatureViewProjectionProto): return ref @staticmethod - def from_definition(feature_definition): + def from_definition(feature_grouping): return FeatureViewProjection( - name=feature_definition.name, features=feature_definition.features + name=feature_grouping.name, features=feature_grouping.features ) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 9cb274eb0d..d97da20781 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -45,6 +45,7 @@ class OnDemandFeatureView: features: List[Feature] inputs: Dict[str, Union[FeatureView, RequestDataSource]] udf: MethodType + projection: FeatureViewProjection @log_exceptions def __init__( @@ -62,6 +63,7 @@ def __init__( self.features = features self.inputs = inputs self.udf = udf + self.projection = FeatureViewProjection.from_definition(self) def __hash__(self) -> int: return hash((id(self), self.name)) @@ -135,6 +137,12 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): ), ) + # FeatureViewProjections are not saved in the OnDemandFeatureView proto. + # Create the default projection. + on_demand_feature_view_obj.projection = FeatureViewProjection.from_definition( + on_demand_feature_view_obj + ) + return on_demand_feature_view_obj def get_transformed_features_df( @@ -164,7 +172,7 @@ def get_transformed_features_df( df_with_features.drop(columns=columns_to_cleanup, inplace=True) return df_with_transformed_features - def __getitem__(self, item) -> FeatureViewProjection: + def __getitem__(self, item): assert isinstance(item, list) referenced_features = [] @@ -172,7 +180,9 @@ def __getitem__(self, item) -> FeatureViewProjection: if feature.name in item: referenced_features.append(feature) - return FeatureViewProjection(self.name, referenced_features) + self.projection.features = referenced_features + + return self def infer_features(self): """ @@ -237,6 +247,26 @@ def get_requested_odfvs(feature_refs, project, registry): break return requested_on_demand_feature_views + def set_projection(self, feature_view_projection: FeatureViewProjection) -> None: + """ + Setter for the projection object held by this FeatureView. Performs checks to ensure + the projection is consistent with this FeatureView before doing the set. + + Args: + feature_view_projection: The FeatureViewProjection object to set this FeatureView's + 'projection' field to. + """ + assert feature_view_projection.name == self.name + + for feature in feature_view_projection.features: + if feature not in self.features: + raise ValueError( + f"The projection for {self.name} cannot be applied because it contains {feature.name} which the" + "FeatureView doesn't have." + ) + + self.projection = feature_view_projection + def on_demand_feature_view(features: List[Feature], inputs: Dict[str, FeatureView]): """ diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 3f124632e7..aeab91e5f8 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -253,7 +253,12 @@ def assert_feature_service_correctness( assert ( len(feature_service_keys) - == sum([len(projection.features) for projection in feature_service.features]) + == sum( + [ + len(projection.features) + for projection in feature_service.feature_view_projections + ] + ) + 3 ) # Add two for the driver id and the customer id entity keys and val_to_add request data