diff --git a/protos/feast/core/StreamFeatureView.proto b/protos/feast/core/StreamFeatureView.proto index 3be9dc866a..d217b86a3f 100644 --- a/protos/feast/core/StreamFeatureView.proto +++ b/protos/feast/core/StreamFeatureView.proto @@ -26,6 +26,7 @@ option java_package = "feast.proto.core"; import "google/protobuf/duration.proto"; import "google/protobuf/timestamp.proto"; import "feast/core/OnDemandFeatureView.proto"; +import "feast/core/FeatureView.proto"; import "feast/core/Feature.proto"; import "feast/core/DataSource.proto"; import "feast/core/Aggregation.proto"; @@ -95,4 +96,7 @@ message StreamFeatureViewMeta { // Time where this Feature View is last updated google.protobuf.Timestamp last_updated_timestamp = 2; + + // List of pairs (start_time, end_time) for which this feature view has been materialized. + repeated MaterializationInterval materialization_intervals = 3; } diff --git a/sdk/python/feast/diff/registry_diff.py b/sdk/python/feast/diff/registry_diff.py index 33cd3df0ed..6b38a190fe 100644 --- a/sdk/python/feast/diff/registry_diff.py +++ b/sdk/python/feast/diff/registry_diff.py @@ -20,6 +20,9 @@ from feast.protos.feast.core.RequestFeatureView_pb2 import ( RequestFeatureView as RequestFeatureViewProto, ) +from feast.protos.feast.core.StreamFeatureView_pb2 import ( + StreamFeatureView as StreamFeatureViewProto, +) from feast.protos.feast.core.ValidationProfile_pb2 import ( ValidationReference as ValidationReferenceProto, ) @@ -106,6 +109,7 @@ def tag_objects_for_keep_delete_update_add( FeatureServiceProto, OnDemandFeatureViewProto, RequestFeatureViewProto, + StreamFeatureViewProto, ValidationReferenceProto, ) @@ -292,6 +296,7 @@ def apply_diff_to_registry( FeastObjectType.FEATURE_VIEW, FeastObjectType.ON_DEMAND_FEATURE_VIEW, FeastObjectType.REQUEST_FEATURE_VIEW, + FeastObjectType.STREAM_FEATURE_VIEW, ]: feature_view_obj = cast( BaseFeatureView, feast_object_diff.current_feast_object @@ -331,6 +336,7 @@ def apply_diff_to_registry( 
FeastObjectType.FEATURE_VIEW, FeastObjectType.ON_DEMAND_FEATURE_VIEW, FeastObjectType.REQUEST_FEATURE_VIEW, + FeastObjectType.STREAM_FEATURE_VIEW, ]: registry.apply_feature_view( cast(BaseFeatureView, feast_object_diff.new_feast_object), diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index f959504826..9530b66d5e 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -372,6 +372,40 @@ def _get_feature_view( feature_view.entities = [] return feature_view + @log_exceptions_and_usage + def get_stream_feature_view( + self, name: str, allow_registry_cache: bool = False + ) -> StreamFeatureView: + """ + Retrieves a stream feature view. + + Args: + name: Name of stream feature view. + allow_registry_cache: (Optional) Whether to allow returning this entity from a cached registry + + Returns: + The specified stream feature view. + + Raises: + FeatureViewNotFoundException: The feature view could not be found. + """ + return self._get_stream_feature_view( + name, allow_registry_cache=allow_registry_cache + ) + + def _get_stream_feature_view( + self, + name: str, + hide_dummy_entity: bool = True, + allow_registry_cache: bool = False, + ) -> StreamFeatureView: + stream_feature_view = self._registry.get_stream_feature_view( + name, self.project, allow_cache=allow_registry_cache + ) + if hide_dummy_entity and stream_feature_view.entities[:1] == [DUMMY_ENTITY_NAME]: + stream_feature_view.entities = [] + return stream_feature_view + @log_exceptions_and_usage def get_on_demand_feature_view(self, name: str) -> OnDemandFeatureView: """ @@ -935,7 +969,6 @@ def get_historical_features( all_feature_views, all_request_feature_views, all_on_demand_feature_views, - all_stream_feature_views, ) = self._get_feature_views_to_use(features) if all_request_feature_views: @@ -1321,9 +1354,14 @@ def write_to_online_store( ingests data directly into the Online store """ # TODO: restrict this to work with online StreamFeatureViews and 
validate the FeatureView type - feature_view = self.get_feature_view( - feature_view_name, allow_registry_cache=allow_registry_cache - ) + try: + feature_view = self.get_stream_feature_view( + feature_view_name, allow_registry_cache=allow_registry_cache + ) + except FeatureViewNotFoundException: + feature_view = self.get_feature_view( + feature_view_name, allow_registry_cache=allow_registry_cache + ) entities = [] for entity_name in feature_view.entities: entities.append( @@ -1456,7 +1494,6 @@ def _get_online_features( requested_feature_views, requested_request_feature_views, requested_on_demand_feature_views, - request_stream_feature_views, ) = self._get_feature_views_to_use( features=features, allow_cache=True, hide_dummy_entity=False ) @@ -1994,15 +2031,17 @@ def _get_feature_views_to_use( allow_cache=False, hide_dummy_entity: bool = True, ) -> Tuple[ - List[FeatureView], - List[RequestFeatureView], - List[OnDemandFeatureView], - List[StreamFeatureView], + List[FeatureView], List[RequestFeatureView], List[OnDemandFeatureView], ]: fvs = { fv.name: fv - for fv in self._list_feature_views(allow_cache, hide_dummy_entity) + for fv in [ + *self._list_feature_views(allow_cache, hide_dummy_entity), + *self._registry.list_stream_feature_views( + project=self.project, allow_cache=allow_cache + ), + ] } request_fvs = { @@ -2019,15 +2058,8 @@ def _get_feature_views_to_use( ) } - sfvs = { - fv.name: fv - for fv in self._registry.list_stream_feature_views( - project=self.project, allow_cache=allow_cache - ) - } - if isinstance(features, FeatureService): - fvs_to_use, request_fvs_to_use, od_fvs_to_use, sfvs_to_use = [], [], [], [] + fvs_to_use, request_fvs_to_use, od_fvs_to_use = [], [], [] for fv_name, projection in [ (projection.name, projection) for projection in features.feature_view_projections @@ -2048,23 +2080,18 @@ def _get_feature_views_to_use( fv = fvs[projection.name].with_projection(copy.copy(projection)) if fv not in fvs_to_use: fvs_to_use.append(fv) - elif 
fv_name in sfvs: - sfvs_to_use.append( - sfvs[fv_name].with_projection(copy.copy(projection)) - ) else: raise ValueError( f"The provided feature service {features.name} contains a reference to a feature view" f"{fv_name} which doesn't exist. Please make sure that you have created the feature view" f'{fv_name} and that you have registered it by running "apply".' ) - views_to_use = (fvs_to_use, request_fvs_to_use, od_fvs_to_use, sfvs_to_use) + views_to_use = (fvs_to_use, request_fvs_to_use, od_fvs_to_use) else: views_to_use = ( [*fvs.values()], [*request_fvs.values()], [*od_fvs.values()], - [*sfvs.values()], ) return views_to_use diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 5657fbe372..2f0e902942 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -211,7 +211,10 @@ def plan( path=self._get_db_path(config), name=_table_id(project, FeatureView.from_proto(view)), ) - for view in desired_registry_proto.feature_views + for view in [ + *desired_registry_proto.feature_views, + *desired_registry_proto.stream_feature_views, + ] ] return infra_objects diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 7f298b19b8..dc52013bff 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -77,6 +77,7 @@ class FeastObjectType(Enum): FEATURE_VIEW = "feature view" ON_DEMAND_FEATURE_VIEW = "on demand feature view" REQUEST_FEATURE_VIEW = "request feature view" + STREAM_FEATURE_VIEW = "stream feature view" FEATURE_SERVICE = "feature service" @staticmethod @@ -93,6 +94,9 @@ def get_objects_from_registry( FeastObjectType.REQUEST_FEATURE_VIEW: registry.list_request_feature_views( project=project ), + FeastObjectType.STREAM_FEATURE_VIEW: registry.list_stream_feature_views( + project=project, + ), FeastObjectType.FEATURE_SERVICE: registry.list_feature_services( project=project ), @@ -108,6 +112,7 @@ def 
get_objects_from_repo_contents( FeastObjectType.FEATURE_VIEW: repo_contents.feature_views, FeastObjectType.ON_DEMAND_FEATURE_VIEW: repo_contents.on_demand_feature_views, FeastObjectType.REQUEST_FEATURE_VIEW: repo_contents.request_feature_views, + FeastObjectType.STREAM_FEATURE_VIEW: repo_contents.stream_feature_views, FeastObjectType.FEATURE_SERVICE: repo_contents.feature_services, } @@ -717,6 +722,30 @@ def get_feature_view( return FeatureView.from_proto(feature_view_proto) raise FeatureViewNotFoundException(name, project) + def get_stream_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> StreamFeatureView: + """ + Retrieves a stream feature view. + + Args: + name: Name of stream feature view + project: Feast project that this stream feature view belongs to + allow_cache: Allow returning feature view from the cached registry + + Returns: + The stream feature view whose spec name and project both match; raises + FeatureViewNotFoundException when the registry holds no such view + """ + registry_proto = self._get_registry_proto(allow_cache=allow_cache) + for feature_view_proto in registry_proto.stream_feature_views: + if ( + feature_view_proto.spec.name == name + and feature_view_proto.spec.project == project + ): + return StreamFeatureView.from_proto(feature_view_proto) + raise FeatureViewNotFoundException(name, project) + def delete_feature_service(self, name: str, project: str, commit: bool = True): """ Deletes a feature service or raises an exception if not found. 
diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 8b81a71bae..37daa6500e 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -13,7 +13,8 @@ from click.exceptions import BadParameter from feast import PushSource -from feast.data_source import DataSource +from feast.batch_feature_view import BatchFeatureView +from feast.data_source import DataSource, KafkaSource from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add from feast.entity import Entity from feast.feature_service import FeatureService @@ -25,6 +26,7 @@ from feast.repo_config import RepoConfig from feast.repo_contents import RepoContents from feast.request_feature_view import RequestFeatureView +from feast.stream_feature_view import StreamFeatureView from feast.usage import log_exceptions_and_usage @@ -122,8 +124,11 @@ def parse_repo(repo_root: Path) -> RepoContents: ): res.data_sources.append(obj) data_sources_set.add(obj) - if isinstance(obj, FeatureView) and not any( - (obj is fv) for fv in res.feature_views + if ( + isinstance(obj, FeatureView) + and not any((obj is fv) for fv in res.feature_views) + and not isinstance(obj, StreamFeatureView) + and not isinstance(obj, BatchFeatureView) ): res.feature_views.append(obj) if isinstance(obj.stream_source, PushSource) and not any( @@ -133,6 +138,19 @@ def parse_repo(repo_root: Path) -> RepoContents: # Don't add if the push source's batch source is a duplicate of an existing batch source if push_source_dep not in data_sources_set: res.data_sources.append(push_source_dep) + elif isinstance(obj, StreamFeatureView) and not any( + (obj is sfv) for sfv in res.stream_feature_views + ): + res.stream_feature_views.append(obj) + if isinstance( + obj.stream_source, (PushSource, KafkaSource) + ) and not any( + (obj is ds) for ds in res.data_sources + ): + batch_source_dep = obj.stream_source.batch_source + # Don't add if the push 
source's batch source is a duplicate of an existing batch source + if batch_source_dep and batch_source_dep not in data_sources_set: + res.data_sources.append(batch_source_dep) elif isinstance(obj, Entity) and not any( (obj is entity) for entity in res.entities ): @@ -196,7 +214,12 @@ def extract_objects_for_apply_delete(project, registry, repo): all_to_apply: List[ Union[ - Entity, FeatureView, RequestFeatureView, OnDemandFeatureView, FeatureService + Entity, + FeatureView, + RequestFeatureView, + OnDemandFeatureView, + StreamFeatureView, + FeatureService, ] ] = [] for object_type in FEAST_OBJECT_TYPES: @@ -205,7 +228,12 @@ def extract_objects_for_apply_delete(project, registry, repo): all_to_delete: List[ Union[ - Entity, FeatureView, RequestFeatureView, OnDemandFeatureView, FeatureService + Entity, + FeatureView, + RequestFeatureView, + OnDemandFeatureView, + StreamFeatureView, + FeatureService, ] ] = [] for object_type in FEAST_OBJECT_TYPES: diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index bba16e2627..12d7f9b74b 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -7,12 +7,16 @@ import dill from google.protobuf.duration_pb2 import Duration +from feast import utils from feast.aggregation import Aggregation from feast.data_source import DataSource, KafkaSource from feast.entity import Entity from feast.feature_view import FeatureView from feast.field import Field from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.core.FeatureView_pb2 import ( + MaterializationInterval as MaterializationIntervalProto, +) from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( UserDefinedFunction as UserDefinedFunctionProto, ) @@ -70,7 +74,7 @@ def __init__( f"Stream feature views need a stream source, expected one of {SUPPORTED_STREAM_SOURCES} " f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead " ) - 
self.aggregations = aggregations + self.aggregations = aggregations or [] self.mode = mode self.timestamp_field = timestamp_field self.udf = udf @@ -113,12 +117,18 @@ def __hash__(self): return super().__hash__() def to_proto(self): - meta = StreamFeatureViewMetaProto() + meta = StreamFeatureViewMetaProto(materialization_intervals=[]) if self.created_timestamp: meta.created_timestamp.FromDatetime(self.created_timestamp) if self.last_updated_timestamp: meta.last_updated_timestamp.FromDatetime(self.last_updated_timestamp) + for interval in self.materialization_intervals: + interval_proto = MaterializationIntervalProto() + interval_proto.start_time.FromDatetime(interval[0]) + interval_proto.end_time.FromDatetime(interval[1]) + meta.materialization_intervals.append(interval_proto) + ttl_duration = None if self.ttl is not None: ttl_duration = Duration() @@ -169,6 +179,11 @@ def from_proto(cls, sfv_proto): if sfv_proto.spec.HasField("stream_source") else None ) + udf = ( + dill.loads(sfv_proto.spec.user_defined_function.body) + if sfv_proto.spec.HasField("user_defined_function") + else None + ) sfv_feature_view = cls( name=sfv_proto.spec.name, description=sfv_proto.spec.description, @@ -185,7 +200,7 @@ def from_proto(cls, sfv_proto): ), source=stream_source, mode=sfv_proto.spec.mode, - udf=dill.loads(sfv_proto.spec.user_defined_function.body), + udf=udf, aggregations=[ Aggregation.from_proto(agg_proto) for agg_proto in sfv_proto.spec.aggregations @@ -214,6 +229,14 @@ def from_proto(cls, sfv_proto): sfv_proto.meta.last_updated_timestamp.ToDatetime() ) + for interval in sfv_proto.meta.materialization_intervals: + sfv_feature_view.materialization_intervals.append( + ( + utils.make_tzaware(interval.start_time.ToDatetime()), + utils.make_tzaware(interval.end_time.ToDatetime()), + ) + ) + return sfv_feature_view diff --git a/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py 
b/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py index b01ca434fa..d24618b270 100644 --- a/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py +++ b/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py @@ -11,7 +11,7 @@ @pytest.mark.integration -def test_read_pre_applied(environment) -> None: +def test_apply_stream_feature_view(environment) -> None: """ Test apply of StreamFeatureView. """ @@ -60,3 +60,12 @@ def simple_sfv(df): entities = fs.list_entities() assert len(entities) == 1 assert entities[0] == entity + + features = fs.get_online_features( + features=["simple_sfv:dummy_field"], entity_rows=[{"test_key": 1001}], + ).to_dict(include_event_timestamps=True) + + assert "test_key" in features + assert features["test_key"] == [1001] + assert "dummy_field" in features + assert features["dummy_field"] == [None]