diff --git a/protos/feast/core/Registry.proto b/protos/feast/core/Registry.proto index b8570301e9..035e87a49f 100644 --- a/protos/feast/core/Registry.proto +++ b/protos/feast/core/Registry.proto @@ -26,6 +26,7 @@ import "feast/core/FeatureService.proto"; import "feast/core/FeatureTable.proto"; import "feast/core/FeatureView.proto"; import "feast/core/OnDemandFeatureView.proto"; +import "feast/core/RequestFeatureView.proto"; import "google/protobuf/timestamp.proto"; message Registry { @@ -33,6 +34,7 @@ message Registry { repeated FeatureTable feature_tables = 2; repeated FeatureView feature_views = 6; repeated OnDemandFeatureView on_demand_feature_views = 8; + repeated RequestFeatureView request_feature_views = 9; repeated FeatureService feature_services = 7; string registry_schema_version = 3; // to support migrations; incremented when schema is changed diff --git a/protos/feast/core/RequestFeatureView.proto b/protos/feast/core/RequestFeatureView.proto new file mode 100644 index 0000000000..c9ee540e6f --- /dev/null +++ b/protos/feast/core/RequestFeatureView.proto @@ -0,0 +1,43 @@ +// +// Copyright 2021 The Feast Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + + +syntax = "proto3"; +package feast.core; + +option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core"; +option java_outer_classname = "RequestFeatureViewProto"; +option java_package = "feast.proto.core"; + +import "feast/core/FeatureView.proto"; +import "feast/core/Feature.proto"; +import "feast/core/DataSource.proto"; + +message RequestFeatureView { + // User-specified specifications of this feature view. + RequestFeatureViewSpec spec = 1; +} + +message RequestFeatureViewSpec { + // Name of the feature view. Must be unique. Not updated. + string name = 1; + + // Name of Feast project that this feature view belongs to. + string project = 2; + + // Request data which contains the underlying data schema and list of associated features + DataSource request_data_source = 3; +} diff --git a/sdk/python/feast/base_feature_view.py b/sdk/python/feast/base_feature_view.py new file mode 100644 index 0000000000..10f949d9a1 --- /dev/null +++ b/sdk/python/feast/base_feature_view.py @@ -0,0 +1,204 @@ +# Copyright 2021 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import warnings +from abc import ABC, abstractmethod +from typing import List, Type + +from google.protobuf.json_format import MessageToJson +from proto import Message + +from feast.feature import Feature +from feast.feature_view_projection import FeatureViewProjection + +warnings.simplefilter("once", DeprecationWarning) + + +class BaseFeatureView(ABC): + """A FeatureView defines a logical grouping of features to be served.""" + + @abstractmethod + def __init__(self, name: str, features: List[Feature]): + self._name = name + self._features = features + self._projection = FeatureViewProjection.from_definition(self) + + @property + def name(self) -> str: + return self._name + + @property + def features(self) -> List[Feature]: + return self._features + + @features.setter + def features(self, value): + self._features = value + + @property + def projection(self) -> FeatureViewProjection: + return self._projection + + @projection.setter + def projection(self, value): + self._projection = value + + @property + @abstractmethod + def proto_class(self) -> Type[Message]: + pass + + @abstractmethod + def to_proto(self) -> Message: + pass + + @classmethod + @abstractmethod + def from_proto(cls, feature_view_proto): + pass + + @abstractmethod + def __copy__(self): + """ + Generates a deep copy of this feature view + + Returns: + A copy of this FeatureView + """ + pass + + def __repr__(self): + items = (f"{k} = {v}" for k, v in self.__dict__.items()) + return f"<{self.__class__.__name__}({', '.join(items)})>" + + def __str__(self): + return str(MessageToJson(self.to_proto())) + + def __hash__(self): + return hash((id(self), self.name)) + + def __getitem__(self, item): + assert isinstance(item, list) + + referenced_features = [] + for feature in self.features: + if feature.name in item: + referenced_features.append(feature) + + cp = self.__copy__() + cp.projection.features = referenced_features + + return cp + + def __eq__(self, other): + if not isinstance(other, BaseFeatureView): + raise TypeError( + "Comparisons should only involve BaseFeatureView class objects." + ) + + if self.name != other.name: + return False + + if sorted(self.features) != sorted(other.features): + return False + + return True + + def ensure_valid(self): + """ + Validates the state of this feature view locally. + + Raises: + ValueError: The feature view is invalid. + """ + if not self.name: + raise ValueError("Feature view needs a name.") + + def with_name(self, name: str): + """ + Renames this feature view by returning a copy of this feature view with an alias + set for the feature view name. This rename operation is only used as part of query + operations and will not modify the underlying FeatureView. + + Args: + name: Name to assign to the FeatureView copy. + + Returns: + A copy of this FeatureView with the name replaced with the 'name' input. + """ + cp = self.__copy__() + cp.projection.name_alias = name + + return cp + + def set_projection(self, feature_view_projection: FeatureViewProjection) -> None: + """ + Setter for the projection object held by this FeatureView. A projection is an + object that stores the modifications to a FeatureView that is applied to the FeatureView + when the FeatureView is used such as during feature_store.get_historical_features. + This method also performs checks to ensure the projection is consistent with this + FeatureView before doing the set. + + Args: + feature_view_projection: The FeatureViewProjection object to set this FeatureView's + 'projection' field to. + """ + if feature_view_projection.name != self.name: + raise ValueError( + f"The projection for the {self.name} FeatureView cannot be applied because it differs in name. " + f"The projection is named {feature_view_projection.name} and the name indicates which " + "FeatureView the projection is for." + ) + + for feature in feature_view_projection.features: + if feature not in self.features: + raise ValueError( + f"The projection for {self.name} cannot be applied because it contains {feature.name} which the " + "FeatureView doesn't have." + ) + + self.projection = feature_view_projection + + def with_projection(self, feature_view_projection: FeatureViewProjection): + """ + Sets the feature view projection by returning a copy of this on-demand feature view + with its projection set to the given projection. A projection is an + object that stores the modifications to a feature view that is used during + query operations. + + Args: + feature_view_projection: The FeatureViewProjection object to link to this + OnDemandFeatureView. + + Returns: + A copy of this OnDemandFeatureView with its projection replaced with the + 'feature_view_projection' argument. + """ + if feature_view_projection.name != self.name: + raise ValueError( + f"The projection for the {self.name} FeatureView cannot be applied because it differs in name. " + f"The projection is named {feature_view_projection.name} and the name indicates which " + "FeatureView the projection is for." + ) + + for feature in feature_view_projection.features: + if feature not in self.features: + raise ValueError( + f"The projection for {self.name} cannot be applied because it contains {feature.name} which the " + "FeatureView doesn't have." + ) + + cp = self.__copy__() + cp.projection = feature_view_projection + + return cp diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index a01d387a22..71d7ceab64 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -25,6 +25,8 @@ from feast import flags, flags_helper, utils from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError from feast.feature_store import FeatureStore +from feast.feature_view import FeatureView +from feast.on_demand_feature_view import OnDemandFeatureView from feast.repo_config import load_repo_config from feast.repo_operations import ( apply_total, @@ -261,12 +263,29 @@ def feature_view_list(ctx: click.Context): cli_check_repo(repo) store = FeatureStore(repo_path=str(repo)) table = [] - for feature_view in store.list_feature_views(): - table.append([feature_view.name, feature_view.entities]) + for feature_view in [ + *store.list_feature_views(), + *store.list_request_feature_views(), + *store.list_on_demand_feature_views(), + ]: + entities = set() + if isinstance(feature_view, FeatureView): + entities.update(feature_view.entities) + elif isinstance(feature_view, OnDemandFeatureView): + for backing_fv in feature_view.inputs.values(): + if isinstance(backing_fv, FeatureView): + entities.update(backing_fv.entities) + table.append( + [ + feature_view.name, + entities if len(entities) > 0 else "n/a", + type(feature_view).__name__, + ] + ) from tabulate import tabulate - print(tabulate(table, headers=["NAME", "ENTITIES"], tablefmt="plain")) + print(tabulate(table, headers=["NAME", "ENTITIES", "TYPE"], tablefmt="plain")) @cli.group(name="on-demand-feature-views") diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index f95e199105..03dd2409a7 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -53,14 +53,14 @@ def __init__(self, name, project=None): class RequestDataNotFoundInEntityDfException(FeastObjectNotFoundException): def __init__(self, feature_name, feature_view_name): super().__init__( - f"Feature {feature_name} not found in the entity dataframe, but required by on demand feature view {feature_view_name}" + f"Feature {feature_name} not found in the entity dataframe, but required by feature view {feature_view_name}" ) class RequestDataNotFoundInEntityRowsException(FeastObjectNotFoundException): def __init__(self, feature_names): super().__init__( - f"Required request data source features {feature_names} not found in the entity rows, but required by on demand feature views" + f"Required request data source features {feature_names} not found in the entity rows, but required by feature views" ) @@ -263,9 +263,10 @@ def __init__(self, entity_type: type): class ConflictingFeatureViewNames(Exception): + # TODO: print file location of conflicting feature views def __init__(self, feature_view_name: str): super().__init__( - f"The feature view name: {feature_view_name} refers to both an on-demand feature view and a feature view" + f"The feature view name: {feature_view_name} refers to feature views of different types." ) diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 9f8e9af1fb..46afdfff1e 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -3,6 +3,7 @@ from google.protobuf.json_format import MessageToJson +from feast.base_feature_view import BaseFeatureView from feast.feature_table import FeatureTable from feast.feature_view import FeatureView from feast.feature_view_projection import FeatureViewProjection @@ -59,9 +60,7 @@ def __init__( self.feature_view_projections.append( FeatureViewProjection.from_definition(feature_grouping) ) - elif isinstance(feature_grouping, FeatureView) or isinstance( - feature_grouping, OnDemandFeatureView - ): + elif isinstance(feature_grouping, BaseFeatureView): self.feature_view_projections.append(feature_grouping.projection) else: raise ValueError( diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 3fc510eb38..2af349cd3e 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import copy +import itertools import os import warnings from collections import Counter, OrderedDict, defaultdict @@ -24,7 +25,7 @@ from tqdm import tqdm from feast import feature_server, flags, flags_helper, utils -from feast.data_source import RequestDataSource +from feast.base_feature_view import BaseFeatureView from feast.entity import Entity from feast.errors import ( EntityNotFoundException, @@ -56,6 +57,7 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.registry import Registry from feast.repo_config import RepoConfig, load_repo_config +from feast.request_feature_view import RequestFeatureView from feast.type_map import python_value_to_proto_value from feast.usage import UsageEvent, log_event, log_exceptions, log_exceptions_and_usage from feast.value_type import ValueType @@ -190,8 +192,25 @@ def list_feature_views(self, allow_cache: bool = False) -> List[FeatureView]: """ return self._list_feature_views(allow_cache) + @log_exceptions_and_usage + def list_request_feature_views( + self, allow_cache: bool = False + ) -> List[RequestFeatureView]: + """ + Retrieves the list of feature views from the registry. + + Args: + allow_cache: Whether to allow returning entities from a cached registry. + + Returns: + A list of feature views. + """ + return self._registry.list_request_feature_views( + self.project, allow_cache=allow_cache + ) + def _list_feature_views( - self, allow_cache: bool = False, hide_dummy_entity: bool = True + self, allow_cache: bool = False, hide_dummy_entity: bool = True, ) -> List[FeatureView]: feature_views = [] for fv in self._registry.list_feature_views( @@ -347,8 +366,17 @@ def apply( Entity, FeatureView, OnDemandFeatureView, + RequestFeatureView, FeatureService, - List[Union[FeatureView, OnDemandFeatureView, Entity, FeatureService]], + List[ + Union[ + FeatureView, + OnDemandFeatureView, + RequestFeatureView, + Entity, + FeatureService, + ] + ], ], commit: bool = True, ): @@ -394,6 +422,9 @@ def apply( assert isinstance(objects, list) views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)] + request_views_to_update = [ + ob for ob in objects if isinstance(ob, RequestFeatureView) + ] odfvs_to_update = [ob for ob in objects if isinstance(ob, OnDemandFeatureView)] if ( not flags_helper.enable_on_demand_feature_views(self.config) @@ -404,7 +435,9 @@ def apply( if len(odfvs_to_update) > 0: log_event(UsageEvent.APPLY_WITH_ODFV) - _validate_feature_views(views_to_update) + _validate_feature_views( + [*views_to_update, *odfvs_to_update, *request_views_to_update] + ) entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] services_to_update = [ob for ob in objects if isinstance(ob, FeatureService)] @@ -425,7 +458,7 @@ def apply( if len(views_to_update) + len(entities_to_update) + len( services_to_update - ) + len(odfvs_to_update) != len(objects): + ) + len(odfvs_to_update) + len(request_views_to_update) != len(objects): raise ValueError("Unknown object type provided as part of apply() call") # DUMMY_ENTITY is a placeholder entity used in entityless FeatureViews @@ -436,12 +469,10 @@ def apply( ) entities_to_update.append(DUMMY_ENTITY) - for view in views_to_update: + for view in itertools.chain( + views_to_update, odfvs_to_update, request_views_to_update + ): self._registry.apply_feature_view(view, project=self.project, commit=False) - for odfv in odfvs_to_update: - self._registry.apply_on_demand_feature_view( - odfv, project=self.project, commit=False - ) for ent in entities_to_update: self._registry.apply_entity(ent, project=self.project, commit=False) for feature_service in services_to_update: @@ -555,38 +586,50 @@ def get_historical_features( ) _feature_refs = self._get_features(features, feature_refs) - all_feature_views, all_on_demand_feature_views = self._get_feature_views_to_use( - features - ) + ( + all_feature_views, + all_request_feature_views, + all_on_demand_feature_views, + ) = self._get_feature_views_to_use(features) # TODO(achal): _group_feature_refs returns the on demand feature views, but it's no passed into the provider. # This is a weird interface quirk - we should revisit the `get_historical_features` to # pass in the on demand feature views as well. - fvs, odfvs = _group_feature_refs( - _feature_refs, all_feature_views, all_on_demand_feature_views + fvs, odfvs, request_fvs, request_fv_refs = _group_feature_refs( + _feature_refs, + all_feature_views, + all_request_feature_views, + all_on_demand_feature_views, ) feature_views = list(view for view, _ in fvs) on_demand_feature_views = list(view for view, _ in odfvs) + request_feature_views = list(view for view, _ in request_fvs) if len(on_demand_feature_views) > 0: log_event(UsageEvent.GET_HISTORICAL_FEATURES_WITH_ODFV) + if len(request_feature_views) > 0: + log_event(UsageEvent.GET_HISTORICAL_FEATURES_WITH_REQUEST_FV) # Check that the right request data is present in the entity_df if type(entity_df) == pd.DataFrame: entity_pd_df = cast(pd.DataFrame, entity_df) + for fv in request_feature_views: + for feature in fv.features: + if feature.name not in entity_pd_df.columns: + raise RequestDataNotFoundInEntityDfException( + feature_name=feature.name, feature_view_name=fv.name + ) for odfv in on_demand_feature_views: - odfv_inputs = odfv.inputs.values() - for odfv_input in odfv_inputs: - if type(odfv_input) == RequestDataSource: - request_data_source = cast(RequestDataSource, odfv_input) - for feature_name in request_data_source.schema.keys(): - if feature_name not in entity_pd_df.columns: - raise RequestDataNotFoundInEntityDfException( - feature_name=feature_name, - feature_view_name=odfv.name, - ) + odfv_request_data_schema = odfv.get_request_data_schema() + for feature_name in odfv_request_data_schema.keys(): + if feature_name not in entity_pd_df.columns: + raise RequestDataNotFoundInEntityDfException( + feature_name=feature_name, feature_view_name=odfv.name, + ) _validate_feature_refs(_feature_refs, full_feature_names) - + # Drop refs that refer to RequestFeatureViews since they don't need to be fetched and + # already exist in the entity_df + _feature_refs = [ref for ref in _feature_refs if ref not in request_fv_refs] provider = self._get_provider() job = provider.get_historical_features( @@ -633,7 +676,7 @@ def materialize_incremental( ... """ - feature_views_to_materialize = [] + feature_views_to_materialize: List[FeatureView] = [] if feature_views is None: feature_views_to_materialize = self._list_feature_views( hide_dummy_entity=False @@ -725,7 +768,7 @@ def materialize( f"The given start_date {start_date} is greater than the given end_date {end_date}." ) - feature_views_to_materialize = [] + feature_views_to_materialize: List[FeatureView] = [] if feature_views is None: feature_views_to_materialize = self._list_feature_views( hide_dummy_entity=False @@ -816,16 +859,30 @@ def get_online_features( >>> online_response_dict = online_response.to_dict() """ _feature_refs = self._get_features(features, feature_refs) - all_feature_views, all_on_demand_feature_views = self._get_feature_views_to_use( + ( + all_feature_views, + all_request_feature_views, + all_on_demand_feature_views, + ) = self._get_feature_views_to_use( features=features, allow_cache=True, hide_dummy_entity=False ) _validate_feature_refs(_feature_refs, full_feature_names) - grouped_refs, grouped_odfv_refs = _group_feature_refs( - _feature_refs, all_feature_views, all_on_demand_feature_views + ( + grouped_refs, + grouped_odfv_refs, + grouped_request_fv_refs, + _, + ) = _group_feature_refs( + _feature_refs, + all_feature_views, + all_request_feature_views, + all_on_demand_feature_views, ) if len(grouped_odfv_refs) > 0: log_event(UsageEvent.GET_ONLINE_FEATURES_WITH_ODFV) + if len(grouped_request_fv_refs) > 0: + log_event(UsageEvent.GET_ONLINE_FEATURES_WITH_REQUEST_FV) feature_views = list(view for view, _ in grouped_refs) entityless_case = DUMMY_ENTITY_NAME in [ @@ -854,8 +911,8 @@ def get_online_features( ) entity_name_to_join_key_map[entity_name] = join_key - needed_request_data_features = self._get_needed_request_data_features( - grouped_odfv_refs + needed_request_data, needed_request_fv_features = self.get_needed_request_data( + grouped_odfv_refs, grouped_request_fv_refs ) join_key_rows = [] @@ -865,7 +922,10 @@ def get_online_features( join_key_row = {} for entity_name, entity_value in row.items(): # Found request data - if entity_name in needed_request_data_features: + if ( + entity_name in needed_request_data + or entity_name in needed_request_fv_features + ): if entity_name not in request_data_features: request_data_features[entity_name] = [] request_data_features[entity_name].append(entity_value) @@ -881,10 +941,9 @@ def get_online_features( # May be empty if this entity row was request data join_key_rows.append(join_key_row) - if len(needed_request_data_features) != len(request_data_features.keys()): - raise RequestDataNotFoundInEntityRowsException( - feature_names=needed_request_data_features - ) + self.ensure_request_data_values_exist( + needed_request_data, needed_request_fv_features, request_data_features + ) entity_row_proto_list = _infer_online_entity_rows(join_key_rows) @@ -934,6 +993,41 @@ def get_online_features( result_rows, ) + def get_needed_request_data( + self, + grouped_odfv_refs: List[Tuple[OnDemandFeatureView, List[str]]], + grouped_request_fv_refs: List[Tuple[RequestFeatureView, List[str]]], + ) -> Tuple[Set[str], Set[str]]: + needed_request_data: Set[str] = set() + needed_request_fv_features: Set[str] = set() + for odfv, _ in grouped_odfv_refs: + odfv_request_data_schema = odfv.get_request_data_schema() + needed_request_data.update(odfv_request_data_schema.keys()) + for request_fv, _ in grouped_request_fv_refs: + for feature in request_fv.features: + needed_request_fv_features.add(feature.name) + return needed_request_data, needed_request_fv_features + + def ensure_request_data_values_exist( + self, + needed_request_data: Set[str], + needed_request_fv_features: Set[str], + request_data_features: Dict[str, List[Any]], + ): + if len(needed_request_data) + len(needed_request_fv_features) != len( + request_data_features.keys() + ): + missing_features = [ + x + for x in itertools.chain( + needed_request_data, needed_request_fv_features + ) + if x not in request_data_features + ] + raise RequestDataNotFoundInEntityRowsException( + feature_names=missing_features + ) + def _populate_result_rows_from_feature_view( self, table_join_keys: List[str], @@ -983,18 +1077,24 @@ def _populate_result_rows_from_feature_view( feature_ref ] = GetOnlineFeaturesResponse.FieldStatus.PRESENT - def _get_needed_request_data_features(self, grouped_odfv_refs) -> Set[str]: + def _get_needed_request_data_features( + self, + grouped_odfv_refs: List[Tuple[OnDemandFeatureView, List[str]]], + grouped_request_fv_refs: List[Tuple[RequestFeatureView, List[str]]], + ) -> Set[str]: needed_request_data_features = set() for odfv_to_feature_names in grouped_odfv_refs: odfv, requested_feature_names = odfv_to_feature_names - odfv_inputs = odfv.inputs.values() - for odfv_input in odfv_inputs: - if type(odfv_input) == RequestDataSource: - request_data_source = cast(RequestDataSource, odfv_input) - for feature_name in request_data_source.schema.keys(): - needed_request_data_features.add(feature_name) + odfv_request_data_schema = odfv.get_request_data_schema() + for feature_name in odfv_request_data_schema.keys(): + needed_request_data_features.add(feature_name) + for request_fv_to_feature_names in grouped_request_fv_refs: + request_fv, requested_feature_names = request_fv_to_feature_names + for fv in request_fv.features: + needed_request_data_features.add(fv.name) return needed_request_data_features + # TODO(adchia): remove request data, which isn't part of the feature_refs def _augment_response_with_on_demand_transforms( self, feature_refs: List[str], @@ -1049,13 +1149,20 @@ def _get_feature_views_to_use( features: Optional[Union[List[str], FeatureService]], allow_cache=False, hide_dummy_entity: bool = True, - ) -> Tuple[List[FeatureView], List[OnDemandFeatureView]]: + ) -> Tuple[List[FeatureView], List[RequestFeatureView], List[OnDemandFeatureView]]: fvs = { fv.name: fv for fv in self._list_feature_views(allow_cache, hide_dummy_entity) } + request_fvs = { + fv.name: fv + for fv in self._registry.list_request_feature_views( + project=self.project, allow_cache=allow_cache + ) + } + od_fvs = { fv.name: fv for fv in self._registry.list_on_demand_feature_views( @@ -1064,7 +1171,7 @@ def _get_feature_views_to_use( } if isinstance(features, FeatureService): - fvs_to_use, od_fvs_to_use = [], [] + fvs_to_use, request_fvs_to_use, od_fvs_to_use = [], [], [] for fv_name, projection in [ (projection.name, projection) for projection in features.feature_view_projections @@ -1073,6 +1180,10 @@ def _get_feature_views_to_use( fvs_to_use.append( fvs[fv_name].with_projection(copy.copy(projection)) ) + elif fv_name in request_fvs: + request_fvs_to_use.append( + request_fvs[fv_name].with_projection(copy.copy(projection)) + ) elif fv_name in od_fvs: od_fvs_to_use.append( od_fvs[fv_name].with_projection(copy.copy(projection)) @@ -1083,9 +1194,13 @@ def _get_feature_views_to_use( f"{fv_name} which doesn't exist. Please make sure that you have created the feature view" f'{fv_name} and that you have registered it by running "apply".' ) - views_to_use = (fvs_to_use, od_fvs_to_use) + views_to_use = (fvs_to_use, request_fvs_to_use, od_fvs_to_use) else: - views_to_use = ([*fvs.values()], [*od_fvs.values()]) + views_to_use = ( + [*fvs.values()], + [*request_fvs.values()], + [*od_fvs.values()], + ) return views_to_use @@ -1160,15 +1275,24 @@ def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = F def _group_feature_refs( features: List[str], all_feature_views: List[FeatureView], + all_request_feature_views: List[RequestFeatureView], all_on_demand_feature_views: List[OnDemandFeatureView], ) -> Tuple[ - List[Tuple[FeatureView, List[str]]], List[Tuple[OnDemandFeatureView, List[str]]] + List[Tuple[FeatureView, List[str]]], + List[Tuple[OnDemandFeatureView, List[str]]], + List[Tuple[RequestFeatureView, List[str]]], + Set[str], ]: """ Get list of feature views and corresponding feature names based on feature references""" # view name to view proto view_index = {view.projection.name_to_use(): view for view in all_feature_views} + # request view name to proto + request_view_index = { + view.projection.name_to_use(): view for view in all_request_feature_views + } + # on demand view to on demand view proto on_demand_view_index = { view.projection.name_to_use(): view for view in all_on_demand_feature_views @@ -1176,6 +1300,8 @@ def _group_feature_refs( # view name to feature names views_features = defaultdict(list) + request_views_features = defaultdict(list) + request_view_refs = set() # on demand view name to feature names on_demand_view_features = defaultdict(list) @@ -1186,17 +1312,23 @@ def _group_feature_refs( views_features[view_name].append(feat_name) elif view_name in on_demand_view_index: on_demand_view_features[view_name].append(feat_name) + elif view_name in request_view_index: + request_views_features[view_name].append(feat_name) + request_view_refs.add(ref) else: raise FeatureViewNotFoundException(view_name) fvs_result: List[Tuple[FeatureView, List[str]]] = [] odfvs_result: List[Tuple[OnDemandFeatureView, List[str]]] = [] + request_fvs_result: List[Tuple[RequestFeatureView, List[str]]] = [] for view_name, feature_names in views_features.items(): fvs_result.append((view_index[view_name], feature_names)) + for view_name, feature_names in request_views_features.items(): + request_fvs_result.append((request_view_index[view_name], feature_names)) for view_name, feature_names in on_demand_view_features.items(): odfvs_result.append((on_demand_view_index[view_name], feature_names)) - return fvs_result, odfvs_result + return fvs_result, odfvs_result, request_fvs_result, request_view_refs def _get_table_entity_keys( @@ -1257,7 +1389,7 @@ def _print_materialization_log( ) -def _validate_feature_views(feature_views: List[FeatureView]): +def _validate_feature_views(feature_views: List[BaseFeatureView]): """ Verify feature views have case-insensitively unique names""" fv_names = set() for fv in feature_views: diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 90a80a9233..a8186991cd 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -15,12 +15,12 @@ import re import warnings from datetime import datetime, timedelta -from typing import Dict, List, Optional, Tuple, Union +from typing import Dict, List, Optional, Tuple, Type, Union from google.protobuf.duration_pb2 import Duration -from google.protobuf.json_format import MessageToJson from feast import utils +from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource from feast.errors import RegistryInferenceFailure from feast.feature import Feature @@ -47,7 +47,7 @@ DUMMY_ENTITY_VAL = "" -class FeatureView: +class FeatureView(BaseFeatureView): """ A FeatureView defines a logical grouping of serveable features. @@ -67,9 +67,7 @@ class FeatureView: FeatureViews. """ - name: str entities: List[str] - features: List[Feature] tags: Optional[Dict[str, str]] ttl: timedelta online: bool @@ -79,7 +77,6 @@ class FeatureView: created_timestamp: Optional[datetime] = None last_updated_timestamp: Optional[datetime] = None materialization_intervals: List[Tuple[datetime, datetime]] - projection: FeatureViewProjection @log_exceptions def __init__( @@ -123,10 +120,9 @@ def __init__( f"Entity or Feature name." ) - self.name = name + super().__init__(name, _features) self.entities = entities if entities else [DUMMY_ENTITY_NAME] - self.features = _features - self.tags = tags if tags else {} + self.tags = tags if tags is not None else {} if isinstance(ttl, Duration): self.ttl = timedelta(seconds=int(ttl.seconds)) @@ -143,17 +139,9 @@ def __init__( self.created_timestamp: Optional[datetime] = None self.last_updated_timestamp: Optional[datetime] = None - self.projection = FeatureViewProjection.from_definition(self) - - def __repr__(self): - items = (f"{k} = {v}" for k, v in self.__dict__.items()) - return f"<{self.__class__.__name__}({', '.join(items)})>" - - def __str__(self): - return str(MessageToJson(self.to_proto())) - + # Note: Python requires redefining hash in child classes that override __eq__ def __hash__(self): - return hash((id(self), self.name)) + return super().__hash__() def __copy__(self): fv = FeatureView( @@ -170,28 +158,17 @@ def __copy__(self): fv.projection = copy.copy(self.projection) return fv - def __getitem__(self, item): - assert isinstance(item, list) - - referenced_features = [] - for feature in self.features: - if feature.name in item: - referenced_features.append(feature) - - cp = self.__copy__() - cp.projection.features = referenced_features - - return cp - def __eq__(self, other): if not isinstance(other, FeatureView): raise TypeError( "Comparisons should only involve FeatureView class objects." ) + if not super().__eq__(other): + return False + if ( self.tags != other.tags - or self.name != other.name or self.ttl != other.ttl or self.online != other.online ): @@ -199,8 +176,6 @@ def __eq__(self, other): if sorted(self.entities) != sorted(other.entities): return False - if sorted(self.features) != sorted(other.features): - return False if self.batch_source != other.batch_source: return False if self.stream_source != other.stream_source: @@ -208,19 +183,22 @@ def __eq__(self, other): return True - def is_valid(self): + def ensure_valid(self): """ Validates the state of this feature view locally. Raises: ValueError: The feature view does not have a name or does not have entities. """ - if not self.name: - raise ValueError("Feature view needs a name.") + super().ensure_valid() if not self.entities: raise ValueError("Feature view has no entities.") + @property + def proto_class(self) -> Type[FeatureViewProto]: + return FeatureViewProto + def with_name(self, name: str): """ Renames this feature view by returning a copy of this feature view with an alias diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 4fa108d86b..48c70f9d6c 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -44,6 +44,7 @@ def to_df(self) -> pd.DataFrame: if self.on_demand_feature_views is None: return features_df + # TODO(adchia): Fix requirement to specify dependent feature views in feature_refs for odfv in self.on_demand_feature_views: features_df = features_df.join( odfv.get_transformed_features_df(self.full_feature_names, features_df) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 0516346986..95832fa143 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -1,12 +1,13 @@ import copy import functools from types import MethodType -from typing import Dict, List, Union, cast +from typing import Dict, List, Type, Union import dill import pandas as pd from feast import errors +from feast.base_feature_view import BaseFeatureView from feast.data_source import RequestDataSource from feast.errors import RegistryInferenceFailure from feast.feature import Feature @@ -30,7 +31,7 @@ from feast.value_type import ValueType -class OnDemandFeatureView: +class OnDemandFeatureView(BaseFeatureView): """ [Experimental] An OnDemandFeatureView defines on demand transformations on existing feature view values and request data. @@ -42,11 +43,11 @@ class OnDemandFeatureView: udf: User defined transformation function that takes as input pandas dataframes """ - name: str - features: List[Feature] + # TODO(adchia): remove inputs from proto and declaration inputs: Dict[str, Union[FeatureView, RequestDataSource]] + input_feature_views: Dict[str, FeatureView] + input_request_data_sources: Dict[str, RequestDataSource] udf: MethodType - projection: FeatureViewProjection @log_exceptions def __init__( @@ -59,15 +60,21 @@ def __init__( """ Creates an OnDemandFeatureView object. """ - - self.name = name - self.features = features + super().__init__(name, features) self.inputs = inputs + self.input_feature_views = {} + self.input_request_data_sources = {} + for input_ref, odfv_input in inputs.items(): + if isinstance(odfv_input, RequestDataSource): + self.input_request_data_sources[input_ref] = odfv_input + else: + self.input_feature_views[input_ref] = odfv_input + self.udf = udf - self.projection = FeatureViewProjection.from_definition(self) - def __hash__(self) -> int: - return hash((id(self), self.name)) + @property + def proto_class(self) -> Type[OnDemandFeatureViewProto]: + return OnDemandFeatureViewProto def __copy__(self): fv = OnDemandFeatureView( @@ -76,70 +83,6 @@ def __copy__(self): fv.projection = copy.copy(self.projection) return fv - def __getitem__(self, item): - assert isinstance(item, list) - - referenced_features = [] - for feature in self.features: - if feature.name in item: - referenced_features.append(feature) - - cp = self.__copy__() - cp.projection.features = referenced_features - - return cp - - def with_name(self, name: str): - """ - Renames this on-demand feature view by returning a copy of this feature view with an alias - set for the feature view name. This rename operation is only used as part of query - operations and will not modify the underlying OnDemandFeatureView. - - Args: - name: Name to assign to the OnDemandFeatureView copy. - - Returns: - A copy of this OnDemandFeatureView with the name replaced with the 'name' input. - """ - cp = self.__copy__() - cp.projection.name_alias = name - - return cp - - def with_projection(self, feature_view_projection: FeatureViewProjection): - """ - Sets the feature view projection by returning a copy of this on-demand feature view - with its projection set to the given projection. A projection is an - object that stores the modifications to a feature view that is used during - query operations. - - Args: - feature_view_projection: The FeatureViewProjection object to link to this - OnDemandFeatureView. - - Returns: - A copy of this OnDemandFeatureView with its projection replaced with the - 'feature_view_projection' argument. - """ - if feature_view_projection.name != self.name: - raise ValueError( - f"The projection for the {self.name} FeatureView cannot be applied because it differs in name. " - f"The projection is named {feature_view_projection.name} and the name indicates which " - "FeatureView the projection is for." - ) - - for feature in feature_view_projection.features: - if feature not in self.features: - raise ValueError( - f"The projection for {self.name} cannot be applied because it contains {feature.name} which the " - "FeatureView doesn't have." - ) - - cp = self.__copy__() - cp.projection = feature_view_projection - - return cp - def to_proto(self) -> OnDemandFeatureViewProto: """ Converts an on demand feature view object to its protobuf representation. @@ -148,15 +91,12 @@ def to_proto(self) -> OnDemandFeatureViewProto: A OnDemandFeatureViewProto protobuf. """ inputs = {} - for feature_ref, input in self.inputs.items(): - if type(input) == FeatureView: - fv = cast(FeatureView, input) - inputs[feature_ref] = OnDemandInput(feature_view=fv.to_proto()) - else: - request_data_source = cast(RequestDataSource, input) - inputs[feature_ref] = OnDemandInput( - request_data_source=request_data_source.to_proto() - ) + for input_ref, fv in self.input_feature_views.items(): + inputs[input_ref] = OnDemandInput(feature_view=fv.to_proto()) + for input_ref, request_data_source in self.input_request_data_sources.items(): + inputs[input_ref] = OnDemandInput( + request_data_source=request_data_source.to_proto() + ) spec = OnDemandFeatureViewSpec( name=self.name, @@ -217,6 +157,12 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): return on_demand_feature_view_obj + def get_request_data_schema(self) -> Dict[str, ValueType]: + schema: Dict[str, ValueType] = {} + for request_data_source in self.input_request_data_sources.values(): + schema.update(request_data_source.schema) + return schema + def get_transformed_features_df( self, full_feature_names: bool, df_with_features: pd.DataFrame ) -> pd.DataFrame: @@ -225,10 +171,7 @@ def get_transformed_features_df( # Copy over un-prefixed features even if not requested since transform may need it columns_to_cleanup = [] if full_feature_names: - for input in self.inputs.values(): - if type(input) != FeatureView: - continue - input_fv = cast(FeatureView, input) + for input_fv in self.input_feature_views.values(): for feature in input_fv.features: full_feature_ref = f"{input_fv.name}__{feature.name}" if full_feature_ref in df_with_features.keys(): @@ -248,25 +191,19 @@ def infer_features(self): """ Infers the set of features associated to this feature view from the input source. - Args: - config: Configuration object used to configure the feature store. - Raises: RegistryInferenceFailure: The set of features could not be inferred. """ df = pd.DataFrame() - for input in self.inputs.values(): - if type(input) == FeatureView: - feature_view = cast(FeatureView, input) - for feature in feature_view.features: - dtype = feast_value_type_to_pandas_type(feature.dtype) - df[f"{feature_view.name}__{feature.name}"] = pd.Series(dtype=dtype) - df[f"{feature.name}"] = pd.Series(dtype=dtype) - else: - request_data = cast(RequestDataSource, input) - for feature_name, feature_type in request_data.schema.items(): - dtype = feast_value_type_to_pandas_type(feature_type) - df[f"{feature_name}"] = pd.Series(dtype=dtype) + for feature_view in self.input_feature_views.values(): + for feature in feature_view.features: + dtype = feast_value_type_to_pandas_type(feature.dtype) + df[f"{feature_view.name}__{feature.name}"] = pd.Series(dtype=dtype) + df[f"{feature.name}"] = pd.Series(dtype=dtype) + for request_data in self.input_request_data_sources.values(): + for feature_name, feature_type in request_data.schema.items(): + dtype = feast_value_type_to_pandas_type(feature_type) + df[f"{feature_name}"] = pd.Series(dtype=dtype) output_df: pd.DataFrame = self.udf.__call__(df) inferred_features = [] for f, dt in zip(output_df.columns, output_df.dtypes): diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 0681ecb679..ba1397a44e 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -14,10 +14,14 @@ from datetime import datetime, timedelta from pathlib import Path -from typing import List, Optional, Set +from typing import Dict, List, Optional from urllib.parse import urlparse +from google.protobuf.internal.containers import RepeatedCompositeFieldContainer +from proto import Message + from feast import importer +from feast.base_feature_view import BaseFeatureView from feast.entity import Entity from feast.errors import ( ConflictingFeatureViewNames, @@ -33,6 +37,7 @@ from feast.on_demand_feature_view import OnDemandFeatureView from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.repo_config import RegistryConfig +from feast.request_feature_view import RequestFeatureView REGISTRY_SCHEMA_VERSION = "1" @@ -290,7 +295,7 @@ def apply_feature_table( self.commit() def apply_feature_view( - self, feature_view: FeatureView, project: str, commit: bool = True + self, feature_view: BaseFeatureView, project: str, commit: bool = True ): """ Registers a single feature view with Feast @@ -300,74 +305,46 @@ def apply_feature_view( project: Feast project that this feature view belongs to commit: Whether the change should be persisted immediately """ - feature_view.is_valid() + feature_view.ensure_valid() feature_view_proto = feature_view.to_proto() feature_view_proto.spec.project = project self._prepare_registry_for_changes() assert self.cached_registry_proto - if feature_view.name in self._get_existing_on_demand_feature_view_names(): - raise ConflictingFeatureViewNames(feature_view.name) + self._check_conflicting_feature_view_names(feature_view) + existing_feature_views_of_same_type: RepeatedCompositeFieldContainer + if isinstance(feature_view, FeatureView): + existing_feature_views_of_same_type = ( + self.cached_registry_proto.feature_views + ) + elif isinstance(feature_view, OnDemandFeatureView): + existing_feature_views_of_same_type = ( + self.cached_registry_proto.on_demand_feature_views + ) + elif isinstance(feature_view, RequestFeatureView): + existing_feature_views_of_same_type = ( + self.cached_registry_proto.request_feature_views + ) + else: + raise ValueError(f"Unexpected feature view type: {type(feature_view)}") for idx, existing_feature_view_proto in enumerate( - self.cached_registry_proto.feature_views + existing_feature_views_of_same_type ): if ( existing_feature_view_proto.spec.name == feature_view_proto.spec.name and existing_feature_view_proto.spec.project == project - ): - if FeatureView.from_proto(existing_feature_view_proto) == feature_view: - return - else: - del self.cached_registry_proto.feature_views[idx] - break - - self.cached_registry_proto.feature_views.append(feature_view_proto) - if commit: - self.commit() - - def apply_on_demand_feature_view( - self, - on_demand_feature_view: OnDemandFeatureView, - project: str, - commit: bool = True, - ): - """ - Registers a single on demand feature view with Feast - - Args: - on_demand_feature_view: Feature view that will be registered - project: Feast project that this feature view belongs to - commit: Whether the change should be persisted immediately - """ - on_demand_feature_view_proto = on_demand_feature_view.to_proto() - on_demand_feature_view_proto.spec.project = project - self._prepare_registry_for_changes() - assert self.cached_registry_proto - - if on_demand_feature_view.name in self._get_existing_feature_view_names(): - raise ConflictingFeatureViewNames(on_demand_feature_view.name) - - for idx, existing_feature_view_proto in enumerate( - self.cached_registry_proto.on_demand_feature_views - ): - if ( - existing_feature_view_proto.spec.name - == on_demand_feature_view_proto.spec.name - and existing_feature_view_proto.spec.project == project ): if ( - OnDemandFeatureView.from_proto(existing_feature_view_proto) - == on_demand_feature_view + feature_view.__class__.from_proto(existing_feature_view_proto) + == feature_view ): return else: - del self.cached_registry_proto.on_demand_feature_views[idx] + del self.cached_registry_proto.feature_views[idx] break - self.cached_registry_proto.on_demand_feature_views.append( - on_demand_feature_view_proto - ) + existing_feature_views_of_same_type.append(feature_view_proto) if commit: self.commit() @@ -487,18 +464,40 @@ def list_feature_views( Args: allow_cache: Allow returning feature views from the cached registry - project: Filter feature tables based on project name + project: Filter feature views based on project name Returns: List of feature views """ registry_proto = self._get_registry_proto(allow_cache=allow_cache) - feature_views = [] + feature_views: List[FeatureView] = [] for feature_view_proto in registry_proto.feature_views: if feature_view_proto.spec.project == project: feature_views.append(FeatureView.from_proto(feature_view_proto)) return feature_views + def list_request_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[RequestFeatureView]: + """ + Retrieve a list of request feature views from the registry + + Args: + allow_cache: Allow returning feature views from the cached registry + project: Filter feature views based on project name + + Returns: + List of feature views + """ + registry_proto = self._get_registry_proto(allow_cache=allow_cache) + feature_views: List[RequestFeatureView] = [] + for request_feature_view_proto in registry_proto.request_feature_views: + if request_feature_view_proto.spec.project == project: + feature_views.append( + RequestFeatureView.from_proto(request_feature_view_proto) + ) + return feature_views + def get_feature_table(self, name: str, project: str) -> FeatureTable: """ Retrieves a feature table. @@ -616,6 +615,18 @@ def delete_feature_view(self, name: str, project: str, commit: bool = True): self.commit() return + for idx, existing_request_feature_view_proto in enumerate( + self.cached_registry_proto.request_feature_views + ): + if ( + existing_request_feature_view_proto.spec.name == name + and existing_request_feature_view_proto.spec.project == project + ): + del self.cached_registry_proto.request_feature_views[idx] + if commit: + self.commit() + return + raise FeatureViewNotFoundException(name, project) def commit(self): @@ -675,15 +686,22 @@ def _get_registry_proto(self, allow_cache: bool = False) -> RegistryProto: self.cache_being_updated = False return registry_proto - def _get_existing_feature_view_names(self) -> Set[str]: - assert self.cached_registry_proto - return set([fv.spec.name for fv in self.cached_registry_proto.feature_views]) + def _check_conflicting_feature_view_names(self, feature_view: BaseFeatureView): + name_to_fv_protos = self._existing_feature_view_names_to_fvs() + if feature_view.name in name_to_fv_protos: + if not isinstance( + name_to_fv_protos.get(feature_view.name), feature_view.proto_class + ): + raise ConflictingFeatureViewNames(feature_view.name) - def _get_existing_on_demand_feature_view_names(self) -> Set[str]: + def _existing_feature_view_names_to_fvs(self) -> Dict[str, Message]: assert self.cached_registry_proto - return set( - [ - odfv.spec.name - for odfv in self.cached_registry_proto.on_demand_feature_views - ] - ) + odfvs = { + fv.spec.name: fv + for fv in self.cached_registry_proto.on_demand_feature_views + } + fvs = {fv.spec.name: fv for fv in self.cached_registry_proto.feature_views} + request_fvs = { + fv.spec.name: fv for fv in self.cached_registry_proto.request_feature_views + } + return {**odfvs, **fvs, **request_fvs} diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 9378cbdf31..39ab8f21f4 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -5,12 +5,13 @@ import sys from importlib.abc import Loader from pathlib import Path -from typing import List, NamedTuple, Set, Tuple, Union +from typing import List, NamedTuple, Set, Tuple, Union, cast import click from click.exceptions import BadParameter from feast import Entity, FeatureTable +from feast.base_feature_view import BaseFeatureView from feast.feature_service import FeatureService from feast.feature_store import FeatureStore, _validate_feature_views from feast.feature_view import FeatureView @@ -19,6 +20,7 @@ from feast.on_demand_feature_view import OnDemandFeatureView from feast.registry import Registry from feast.repo_config import RepoConfig +from feast.request_feature_view import RequestFeatureView from feast.usage import log_exceptions_and_usage @@ -34,6 +36,7 @@ class ParsedRepo(NamedTuple): feature_tables: Set[FeatureTable] feature_views: Set[FeatureView] on_demand_feature_views: Set[OnDemandFeatureView] + request_feature_views: Set[RequestFeatureView] entities: Set[Entity] feature_services: Set[FeatureService] @@ -99,6 +102,7 @@ def parse_repo(repo_root: Path) -> ParsedRepo: feature_views=set(), feature_services=set(), on_demand_feature_views=set(), + request_feature_views=set(), ) for repo_file in get_repo_files(repo_root): @@ -116,6 +120,8 @@ def parse_repo(repo_root: Path) -> ParsedRepo: res.feature_services.add(obj) elif isinstance(obj, OnDemandFeatureView): res.on_demand_feature_views.add(obj) + elif isinstance(obj, RequestFeatureView): + res.request_feature_views.add(obj) return res @@ -136,7 +142,13 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation registry._initialize_registry() sys.dont_write_bytecode = True repo = parse_repo(repo_path) - _validate_feature_views(list(repo.feature_views)) + _validate_feature_views( + [ + *list(repo.feature_views), + *list(repo.on_demand_feature_views), + *list(repo.request_feature_views), + ] + ) if not skip_source_validation: data_sources = [t.batch_source for t in repo.feature_views] @@ -193,7 +205,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation # Add / update views + entities + services all_to_apply: List[ - Union[Entity, FeatureView, FeatureService, OnDemandFeatureView] + Union[Entity, BaseFeatureView, FeatureService, OnDemandFeatureView] ] = [] all_to_apply.extend(entities_to_keep) all_to_apply.extend(views_to_keep) @@ -225,13 +237,19 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation ) infra_provider = get_provider(repo_config, repo_path) + views_to_keep_in_infra = [ + view for view in views_to_keep if isinstance(view, FeatureView) + ] for name in [view.name for view in repo.feature_tables] + [ - table.name for table in repo.feature_views + table.name for table in views_to_keep_in_infra ]: click.echo( f"Deploying infrastructure for {Style.BRIGHT + Fore.GREEN}{name}{Style.RESET_ALL}" ) - for name in [view.name for view in views_to_delete] + [ + views_to_delete_from_infra = [ + view for view in views_to_delete if isinstance(view, FeatureView) + ] + for name in [view.name for view in views_to_delete_from_infra] + [ table.name for table in tables_to_delete ]: click.echo( @@ -241,10 +259,10 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation all_to_delete: List[Union[FeatureTable, FeatureView]] = [] all_to_delete.extend(tables_to_delete) - all_to_delete.extend(views_to_delete) + all_to_delete.extend(views_to_delete_from_infra) all_to_keep: List[Union[FeatureTable, FeatureView]] = [] all_to_keep.extend(tables_to_keep) - all_to_keep.extend(views_to_delete) + all_to_keep.extend(views_to_keep_in_infra) infra_provider.update_infra( project, tables_to_delete=all_to_delete, @@ -272,9 +290,11 @@ def _tag_registry_entities_for_keep_delete( def _tag_registry_views_for_keep_delete( project: str, registry: Registry, repo: ParsedRepo -) -> Tuple[Set[FeatureView], Set[FeatureView]]: - views_to_keep: Set[FeatureView] = repo.feature_views - views_to_delete: Set[FeatureView] = set() +) -> Tuple[Set[BaseFeatureView], Set[BaseFeatureView]]: + views_to_keep: Set[BaseFeatureView] = cast(Set[BaseFeatureView], repo.feature_views) + for request_fv in repo.request_feature_views: + views_to_keep.add(request_fv) + views_to_delete: Set[BaseFeatureView] = set() repo_feature_view_names = set(t.name for t in repo.feature_views) for registry_view in registry.list_feature_views(project=project): if registry_view.name not in repo_feature_view_names: diff --git a/sdk/python/feast/request_feature_view.py b/sdk/python/feast/request_feature_view.py new file mode 100644 index 0000000000..5d716f2f8d --- /dev/null +++ b/sdk/python/feast/request_feature_view.py @@ -0,0 +1,91 @@ +import copy +from typing import Type + +from feast.base_feature_view import BaseFeatureView +from feast.data_source import RequestDataSource +from feast.feature import Feature +from feast.feature_view_projection import FeatureViewProjection +from feast.protos.feast.core.RequestFeatureView_pb2 import ( + RequestFeatureView as RequestFeatureViewProto, +) +from feast.protos.feast.core.RequestFeatureView_pb2 import RequestFeatureViewSpec +from feast.usage import log_exceptions + + +class RequestFeatureView(BaseFeatureView): + """ + [Experimental] An RequestFeatureView defines a feature that is available from the inference request. + + Args: + name: Name of the group of features. + request_data_source: Request data source that specifies the schema and features + """ + + request_data_source: RequestDataSource + + @log_exceptions + def __init__( + self, name: str, request_data_source: RequestDataSource, + ): + """ + Creates an RequestFeatureView object. + """ + super().__init__( + name=name, + features=[ + Feature(name=name, dtype=dtype) + for name, dtype in request_data_source.schema.items() + ], + ) + self.request_data_source = request_data_source + + @property + def proto_class(self) -> Type[RequestFeatureViewProto]: + return RequestFeatureViewProto + + def to_proto(self) -> RequestFeatureViewProto: + """ + Converts an request feature view object to its protobuf representation. + + Returns: + A RequestFeatureViewProto protobuf. + """ + spec = RequestFeatureViewSpec( + name=self.name, request_data_source=self.request_data_source.to_proto() + ) + + return RequestFeatureViewProto(spec=spec) + + @classmethod + def from_proto(cls, request_feature_view_proto: RequestFeatureViewProto): + """ + Creates a request feature view from a protobuf representation. + + Args: + request_feature_view_proto: A protobuf representation of an request feature view. + + Returns: + A RequestFeatureView object based on the request feature view protobuf. + """ + + request_feature_view_obj = cls( + name=request_feature_view_proto.spec.name, + request_data_source=RequestDataSource.from_proto( + request_feature_view_proto.spec.request_data_source + ), + ) + + # FeatureViewProjections are not saved in the RequestFeatureView proto. + # Create the default projection. + request_feature_view_obj.projection = FeatureViewProjection.from_definition( + request_feature_view_obj + ) + + return request_feature_view_obj + + def __copy__(self): + fv = RequestFeatureView( + name=self.name, request_data_source=self.request_data_source + ) + fv.projection = copy.copy(self.projection) + return fv diff --git a/sdk/python/feast/usage.py b/sdk/python/feast/usage.py index 855f622fa7..1a0a89471d 100644 --- a/sdk/python/feast/usage.py +++ b/sdk/python/feast/usage.py @@ -47,6 +47,8 @@ class UsageEvent(enum.Enum): APPLY_WITH_ODFV = 1 GET_HISTORICAL_FEATURES_WITH_ODFV = 2 GET_ONLINE_FEATURES_WITH_ODFV = 3 + GET_HISTORICAL_FEATURES_WITH_REQUEST_FV = 4 + GET_ONLINE_FEATURES_WITH_REQUEST_FV = 5 def __str__(self): return self.name.lower() diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index b98de8a142..c51585823f 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -29,6 +29,7 @@ conv_rate_plus_100_feature_view, create_conv_rate_request_data_source, create_customer_daily_profile_feature_view, + create_driver_age_request_feature_view, create_driver_hourly_stats_feature_view, create_global_stats_feature_view, create_location_stats_feature_view, @@ -222,6 +223,7 @@ def construct_universal_feature_views( "input_request": create_conv_rate_request_data_source(), } ), + "driver_age_request_fv": create_driver_age_request_feature_view(), "order": create_order_feature_view(data_sources["orders"]), "location": create_location_stats_feature_view(data_sources["location"]), } diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index f68d1a6555..b566fa4609 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -5,6 +5,7 @@ from feast import Feature, FeatureView, OnDemandFeatureView, ValueType from feast.data_source import DataSource, RequestDataSource +from feast.request_feature_view import RequestFeatureView def driver_feature_view( @@ -63,6 +64,15 @@ def conv_rate_plus_100_feature_view( ) +def create_driver_age_request_feature_view(): + return RequestFeatureView( + name="driver_age", + request_data_source=RequestDataSource( + name="driver_age_source", schema={"driver_age": ValueType.INT32} + ), + ) + + def create_conv_rate_request_data_source(): return RequestDataSource( name="conv_rate_input", schema={"val_to_add": ValueType.INT32} diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index e6194cb012..b558bbe3e8 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -256,19 +256,35 @@ def test_historical_features(environment, universal_data_sources, full_feature_n entity_df_with_request_data["val_to_add"] = [ i for i in range(len(entity_df_with_request_data)) ] + entity_df_with_request_data["driver_age"] = [ + i + 100 for i in range(len(entity_df_with_request_data)) + ] - customer_fv, driver_fv, driver_odfv, location_fv, order_fv, global_fv = ( + ( + customer_fv, + driver_fv, + driver_odfv, + location_fv, + order_fv, + global_fv, + driver_age_request_fv, + ) = ( feature_views["customer"], feature_views["driver"], feature_views["driver_odfv"], feature_views["location"], feature_views["order"], feature_views["global"], + feature_views["driver_age_request_fv"], ) feature_service = FeatureService( name="convrate_plus100", - features=[feature_views["driver"][["conv_rate"]], feature_views["driver_odfv"]], + features=[ + feature_views["driver"][["conv_rate"]], + driver_odfv, + driver_age_request_fv, + ], ) feature_service_entity_mapping = FeatureService( name="entity_mapping", @@ -291,6 +307,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n location_fv, order_fv, global_fv, + driver_age_request_fv, driver(), customer(), location(), @@ -356,7 +373,12 @@ def test_historical_features(environment, universal_data_sources, full_feature_n # Not requesting the on demand transform with an entity_df query (can't add request data in them) expected_df_query = expected_df.drop( - columns=["conv_rate_plus_100", "val_to_add", "conv_rate_plus_val_to_add"] + columns=[ + "conv_rate_plus_100", + "val_to_add", + "conv_rate_plus_val_to_add", + "driver_age", + ] ) assert sorted(expected_df_query.columns) == sorted( actual_df_from_sql_entities.columns @@ -408,6 +430,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n "order:order_is_success", "global_stats:num_rides", "global_stats:avg_ride_length", + "driver_age:driver_age", ], full_feature_names=full_feature_names, ) @@ -483,6 +506,22 @@ def test_historical_features(environment, universal_data_sources, full_feature_n ], full_feature_names=full_feature_names, ) + # If request data is missing that's needed for a request feature view, throw an error + with pytest.raises(RequestDataNotFoundInEntityDfException): + store.get_historical_features( + entity_df=entity_df, + features=[ + "driver_stats:conv_rate", + "driver_stats:avg_daily_trips", + "customer_profile:current_balance", + "customer_profile:avg_passenger_count", + "customer_profile:lifetime_trip_count", + "driver_age:driver_age", + "global_stats:num_rides", + "global_stats:avg_ride_length", + ], + full_feature_names=full_feature_names, + ) @pytest.mark.integration @@ -624,6 +663,7 @@ def assert_feature_service_correctness( "customer_id", response_feature_name("conv_rate", full_feature_names), "conv_rate_plus_100", + "driver_age", ] ] actual_df_from_df_entities = ( diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 381635061e..b9824ae163 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -32,7 +32,11 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name feature_service = FeatureService( "convrate_plus100", - features=[feature_views["driver"][["conv_rate"]], feature_views["driver_odfv"]], + features=[ + feature_views["driver"][["conv_rate"]], + feature_views["driver_odfv"], + feature_views["driver_age_request_fv"], + ], ) feature_service_entity_mapping = FeatureService( name="entity_mapping", @@ -95,7 +99,7 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name global_df = datasets["global"] entity_rows = [ - {"driver": d, "customer_id": c, "val_to_add": 50} + {"driver": d, "customer_id": c, "val_to_add": 50, "driver_age": 25} for (d, c) in zip(sample_drivers, sample_customers) ] @@ -110,6 +114,7 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name "order:order_is_success", "global_stats:num_rides", "global_stats:avg_ride_length", + "driver_age:driver_age", ] unprefixed_feature_refs = [f.rsplit(":", 1)[-1] for f in feature_refs if ":" in f] # Remove the on demand feature view output features, since they're not present in the source dataframe @@ -129,7 +134,8 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name len(keys) == len(feature_refs) + 3 ) # Add three for the driver id and the customer id entity keys + val_to_add request data. for feature in feature_refs: - if full_feature_names: + # full_feature_names does not apply to request feature views + if full_feature_names and feature != "driver_age:driver_age": assert feature.replace(":", "__") in keys else: assert feature.rsplit(":", 1)[-1] in keys @@ -178,12 +184,14 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name # Check what happens for missing values missing_responses_dict = fs.get_online_features( features=feature_refs, - entity_rows=[{"driver": 0, "customer_id": 0, "val_to_add": 100}], + entity_rows=[ + {"driver": 0, "customer_id": 0, "val_to_add": 100, "driver_age": 125} + ], full_feature_names=full_feature_names, ).to_dict() assert missing_responses_dict is not None for unprefixed_feature_ref in unprefixed_feature_refs: - if unprefixed_feature_ref not in {"num_rides", "avg_ride_length"}: + if unprefixed_feature_ref not in {"num_rides", "avg_ride_length", "driver_age"}: tc.assertIsNone( missing_responses_dict[ response_feature_name(unprefixed_feature_ref, full_feature_names) @@ -198,6 +206,14 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name full_feature_names=full_feature_names, ).to_dict() + # Also with request data + with pytest.raises(RequestDataNotFoundInEntityRowsException): + fs.get_online_features( + features=feature_refs, + entity_rows=[{"driver": 0, "customer_id": 0, "val_to_add": 20}], + full_feature_names=full_feature_names, + ).to_dict() + assert_feature_service_correctness( fs, feature_service,