From 54a33f1ea3e902303e06050391fc7a205bfc78a9 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Thu, 2 Jun 2022 14:55:47 -0700 Subject: [PATCH 1/6] Fix apply workflow Signed-off-by: Kevin Zhang --- protos/feast/core/StreamFeatureView.proto | 4 + sdk/python/feast/diff/registry_diff.py | 6 + sdk/python/feast/feature_store.py | 67 ++++-- .../feast/infra/online_stores/sqlite.py | 2 +- sdk/python/feast/registry.py | 29 +++ sdk/python/feast/repo_operations.py | 19 +- sdk/python/feast/stream_feature_view.py | 29 ++- sdk/python/feast/stream_processor.py | 201 ++++++++++++++++++ 8 files changed, 328 insertions(+), 29 deletions(-) create mode 100644 sdk/python/feast/stream_processor.py diff --git a/protos/feast/core/StreamFeatureView.proto b/protos/feast/core/StreamFeatureView.proto index 3be9dc866a..d217b86a3f 100644 --- a/protos/feast/core/StreamFeatureView.proto +++ b/protos/feast/core/StreamFeatureView.proto @@ -26,6 +26,7 @@ option java_package = "feast.proto.core"; import "google/protobuf/duration.proto"; import "google/protobuf/timestamp.proto"; import "feast/core/OnDemandFeatureView.proto"; +import "feast/core/FeatureView.proto"; import "feast/core/Feature.proto"; import "feast/core/DataSource.proto"; import "feast/core/Aggregation.proto"; @@ -95,4 +96,7 @@ message StreamFeatureViewMeta { // Time where this Feature View is last updated google.protobuf.Timestamp last_updated_timestamp = 2; + + // List of pairs (start_time, end_time) for which this feature view has been materialized. + repeated MaterializationInterval materialization_intervals = 3; } diff --git a/sdk/python/feast/diff/registry_diff.py b/sdk/python/feast/diff/registry_diff.py index 33cd3df0ed..9473aeff3b 100644 --- a/sdk/python/feast/diff/registry_diff.py +++ b/sdk/python/feast/diff/registry_diff.py @@ -20,6 +20,9 @@ from feast.protos.feast.core.RequestFeatureView_pb2 import ( RequestFeatureView as RequestFeatureViewProto, ) +from feast.protos.feast.core.StreamFeatureView_pb2 import ( + StreamFeatureView as StreamFeatureViewProto +) from feast.protos.feast.core.ValidationProfile_pb2 import ( ValidationReference as ValidationReferenceProto, ) @@ -106,6 +109,7 @@ def tag_objects_for_keep_delete_update_add( FeatureServiceProto, OnDemandFeatureViewProto, RequestFeatureViewProto, + StreamFeatureViewProto, ValidationReferenceProto, ) @@ -292,6 +296,7 @@ def apply_diff_to_registry( FeastObjectType.FEATURE_VIEW, FeastObjectType.ON_DEMAND_FEATURE_VIEW, FeastObjectType.REQUEST_FEATURE_VIEW, + FeastObjectType.STREAM_FEATURE_VIEW, ]: feature_view_obj = cast( BaseFeatureView, feast_object_diff.current_feast_object @@ -331,6 +336,7 @@ def apply_diff_to_registry( FeastObjectType.FEATURE_VIEW, FeastObjectType.ON_DEMAND_FEATURE_VIEW, FeastObjectType.REQUEST_FEATURE_VIEW, + FeastObjectType.STREAM_FEATURE_VIEW, ]: registry.apply_feature_view( cast(BaseFeatureView, feast_object_diff.new_feast_object), diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index f959504826..56c91a8059 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -93,7 +93,6 @@ from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute from feast.value_type import ValueType from feast.version import get_version - warnings.simplefilter("once", DeprecationWarning) if TYPE_CHECKING: @@ -372,6 +371,38 @@ def _get_feature_view( feature_view.entities = [] return feature_view + @log_exceptions_and_usage + def get_stream_feature_view( + self, name: str, allow_registry_cache: bool = False + ) 
-> StreamFeatureView: + """ + Retrieves a stream feature view. + + Args: + name: Name of stream feature view. + allow_registry_cache: (Optional) Whether to allow returning this entity from a cached registry + + Returns: + The specified stream feature view. + + Raises: + FeatureViewNotFoundException: The feature view could not be found. + """ + return self._get_stream_feature_view(name, allow_registry_cache=allow_registry_cache) + + def _get_stream_feature_view( + self, + name: str, + hide_dummy_entity: bool = True, + allow_registry_cache: bool = False, + ) -> StreamFeatureView: + stream_feature_view = self._registry.get_stream_feature_view( + name, self.project, allow_cache=allow_registry_cache + ) + if hide_dummy_entity and stream_feature_view.entities[0] == DUMMY_ENTITY_NAME: + stream_feature_view.entities = [] + return stream_feature_view + @log_exceptions_and_usage def get_on_demand_feature_view(self, name: str) -> OnDemandFeatureView: """ @@ -1321,9 +1352,14 @@ def write_to_online_store( ingests data directly into the Online store """ # TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type - feature_view = self.get_feature_view( - feature_view_name, allow_registry_cache=allow_registry_cache - ) + try: + feature_view = self.get_stream_feature_view( + feature_view_name, allow_registry_cache=allow_registry_cache + ) + except FeatureViewNotFoundException: + feature_view = self.get_feature_view( + feature_view_name, allow_registry_cache=allow_registry_cache + ) entities = [] for entity_name in feature_view.entities: entities.append( @@ -1456,7 +1492,6 @@ def _get_online_features( requested_feature_views, requested_request_feature_views, requested_on_demand_feature_views, - request_stream_feature_views, ) = self._get_feature_views_to_use( features=features, allow_cache=True, hide_dummy_entity=False ) @@ -1997,12 +2032,14 @@ def _get_feature_views_to_use( List[FeatureView], List[RequestFeatureView], List[OnDemandFeatureView], - List[StreamFeatureView], ]: fvs = { fv.name: fv - for fv in self._list_feature_views(allow_cache, hide_dummy_entity) + for fv in [ + *self._list_feature_views(allow_cache, hide_dummy_entity), + *self._registry.list_stream_feature_views(project=self.project, allow_cache=allow_cache + )] } request_fvs = { @@ -2019,15 +2056,8 @@ def _get_feature_views_to_use( ) } - sfvs = { - fv.name: fv - for fv in self._registry.list_stream_feature_views( - project=self.project, allow_cache=allow_cache - ) - } - if isinstance(features, FeatureService): - fvs_to_use, request_fvs_to_use, od_fvs_to_use, sfvs_to_use = [], [], [], [] + fvs_to_use, request_fvs_to_use, od_fvs_to_use = [], [], [] for fv_name, projection in [ (projection.name, projection) for projection in features.feature_view_projections @@ -2048,23 +2078,18 @@ def _get_feature_views_to_use( fv = fvs[projection.name].with_projection(copy.copy(projection)) if fv not in fvs_to_use: fvs_to_use.append(fv) - elif fv_name in sfvs: - sfvs_to_use.append( - sfvs[fv_name].with_projection(copy.copy(projection)) - ) else: raise ValueError( f"The provided feature service {features.name} contains a reference to a feature view" f"{fv_name} which doesn't exist. Please make sure that you have created the feature view" f'{fv_name} and that you have registered it by running "apply".' 
) - views_to_use = (fvs_to_use, request_fvs_to_use, od_fvs_to_use, sfvs_to_use) + views_to_use = (fvs_to_use, request_fvs_to_use, od_fvs_to_use) else: views_to_use = ( [*fvs.values()], [*request_fvs.values()], [*od_fvs.values()], - [*sfvs.values()], ) return views_to_use diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 5657fbe372..adb26f479a 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -211,7 +211,7 @@ def plan( path=self._get_db_path(config), name=_table_id(project, FeatureView.from_proto(view)), ) - for view in desired_registry_proto.feature_views + for view in [*desired_registry_proto.feature_views, *desired_registry_proto.stream_feature_views] ] return infra_objects diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 7f298b19b8..dc52013bff 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -77,6 +77,7 @@ class FeastObjectType(Enum): FEATURE_VIEW = "feature view" ON_DEMAND_FEATURE_VIEW = "on demand feature view" REQUEST_FEATURE_VIEW = "request feature view" + STREAM_FEATURE_VIEW = "stream feature view" FEATURE_SERVICE = "feature service" @staticmethod @@ -93,6 +94,9 @@ def get_objects_from_registry( FeastObjectType.REQUEST_FEATURE_VIEW: registry.list_request_feature_views( project=project ), + FeastObjectType.STREAM_FEATURE_VIEW: registry.list_stream_feature_views( + project=project, + ), FeastObjectType.FEATURE_SERVICE: registry.list_feature_services( project=project ), @@ -108,6 +112,7 @@ def get_objects_from_repo_contents( FeastObjectType.FEATURE_VIEW: repo_contents.feature_views, FeastObjectType.ON_DEMAND_FEATURE_VIEW: repo_contents.on_demand_feature_views, FeastObjectType.REQUEST_FEATURE_VIEW: repo_contents.request_feature_views, + FeastObjectType.STREAM_FEATURE_VIEW: repo_contents.stream_feature_views, FeastObjectType.FEATURE_SERVICE: repo_contents.feature_services, } @@ -717,6 +722,30 @@ def get_feature_view( return FeatureView.from_proto(feature_view_proto) raise FeatureViewNotFoundException(name, project) + def get_stream_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> StreamFeatureView: + """ + Retrieves a stream feature view. + + Args: + name: Name of stream feature view + project: Feast project that this stream feature view belongs to + allow_cache: Allow returning feature view from the cached registry + + Returns: + Returns either the specified feature view, or raises an exception if + none is found + """ + registry_proto = self._get_registry_proto(allow_cache=allow_cache) + for feature_view_proto in registry_proto.stream_feature_views: + if ( + feature_view_proto.spec.name == name + and feature_view_proto.spec.project == project + ): + return StreamFeatureView.from_proto(feature_view_proto) + raise FeatureViewNotFoundException(name, project) + def delete_feature_service(self, name: str, project: str, commit: bool = True): """ Deletes a feature service or raises an exception if not found. 
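The FeatureStore and Registry changes above give stream feature views a dedicated lookup path, and write_to_online_store() now tries that path first before falling back to a plain feature view. A minimal sketch of the intended call pattern follows — the view name "driver_stats_stream" and the DataFrame columns are illustrative assumptions, not names defined by this patch:

    import pandas as pd
    from datetime import datetime

    from feast import FeatureStore
    from feast.errors import FeatureViewNotFoundException

    store = FeatureStore(repo_path=".")

    # New in this patch: stream feature views resolve through their own registry call.
    try:
        sfv = store.get_stream_feature_view("driver_stats_stream")
    except FeatureViewNotFoundException:
        # Not a stream feature view; fall back to the batch feature view lookup.
        sfv = store.get_feature_view("driver_stats_stream")

    # write_to_online_store() now performs the same stream-first resolution internally.
    df = pd.DataFrame(
        {
            "driver_id": [1001],
            "event_timestamp": [datetime.utcnow()],
            "conv_rate": [0.85],
        }
    )
    store.write_to_online_store("driver_stats_stream", df)
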
diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 8b81a71bae..96f73c3701 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -13,7 +13,7 @@ from click.exceptions import BadParameter from feast import PushSource -from feast.data_source import DataSource +from feast.data_source import DataSource, KafkaSource from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add from feast.entity import Entity from feast.feature_service import FeatureService @@ -25,6 +25,8 @@ from feast.repo_config import RepoConfig from feast.repo_contents import RepoContents from feast.request_feature_view import RequestFeatureView +from feast.stream_feature_view import StreamFeatureView +from feast.batch_feature_view import BatchFeatureView from feast.usage import log_exceptions_and_usage @@ -124,7 +126,7 @@ def parse_repo(repo_root: Path) -> RepoContents: data_sources_set.add(obj) if isinstance(obj, FeatureView) and not any( (obj is fv) for fv in res.feature_views - ): + ) and not isinstance(obj, StreamFeatureView) and not isinstance(obj, BatchFeatureView): res.feature_views.append(obj) if isinstance(obj.stream_source, PushSource) and not any( (obj is ds) for ds in res.data_sources @@ -133,6 +135,15 @@ def parse_repo(repo_root: Path) -> RepoContents: # Don't add if the push source's batch source is a duplicate of an existing batch source if push_source_dep not in data_sources_set: res.data_sources.append(push_source_dep) + elif isinstance(obj, StreamFeatureView): + res.stream_feature_views.append(obj) + if isinstance(obj.stream_source, PushSource) or isinstance(obj.stream_source, KafkaSource) and not any( + (obj is ds) for ds in res.data_sources + ): + batch_source_dep = obj.stream_source.batch_source + # Don't add if the push source's batch source is a duplicate of an existing batch source + if batch_source_dep not in data_sources_set: + res.data_sources.append(batch_source_dep) elif isinstance(obj, Entity) and not any( (obj is entity) for entity in res.entities ): @@ -196,7 +207,7 @@ def extract_objects_for_apply_delete(project, registry, repo): all_to_apply: List[ Union[ - Entity, FeatureView, RequestFeatureView, OnDemandFeatureView, FeatureService + Entity, FeatureView, RequestFeatureView, OnDemandFeatureView, StreamFeatureView, FeatureService ] ] = [] for object_type in FEAST_OBJECT_TYPES: @@ -205,7 +216,7 @@ def extract_objects_for_apply_delete(project, registry, repo): all_to_delete: List[ Union[ - Entity, FeatureView, RequestFeatureView, OnDemandFeatureView, FeatureService + Entity, FeatureView, RequestFeatureView, OnDemandFeatureView, StreamFeatureView, FeatureService ] ] = [] for object_type in FEAST_OBJECT_TYPES: diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index bba16e2627..12d7f9b74b 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -7,12 +7,16 @@ import dill from google.protobuf.duration_pb2 import Duration +from feast import utils from feast.aggregation import Aggregation from feast.data_source import DataSource, KafkaSource from feast.entity import Entity from feast.feature_view import FeatureView from feast.field import Field from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.core.FeatureView_pb2 import ( + MaterializationInterval as MaterializationIntervalProto, +) from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( 
UserDefinedFunction as UserDefinedFunctionProto, ) @@ -70,7 +74,7 @@ def __init__( f"Stream feature views need a stream source, expected one of {SUPPORTED_STREAM_SOURCES} " f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead " ) - self.aggregations = aggregations + self.aggregations = aggregations or [] self.mode = mode self.timestamp_field = timestamp_field self.udf = udf @@ -113,12 +117,18 @@ def __hash__(self): return super().__hash__() def to_proto(self): - meta = StreamFeatureViewMetaProto() + meta = StreamFeatureViewMetaProto(materialization_intervals=[]) if self.created_timestamp: meta.created_timestamp.FromDatetime(self.created_timestamp) if self.last_updated_timestamp: meta.last_updated_timestamp.FromDatetime(self.last_updated_timestamp) + for interval in self.materialization_intervals: + interval_proto = MaterializationIntervalProto() + interval_proto.start_time.FromDatetime(interval[0]) + interval_proto.end_time.FromDatetime(interval[1]) + meta.materialization_intervals.append(interval_proto) + ttl_duration = None if self.ttl is not None: ttl_duration = Duration() @@ -169,6 +179,11 @@ def from_proto(cls, sfv_proto): if sfv_proto.spec.HasField("stream_source") else None ) + udf = ( + dill.loads(sfv_proto.spec.user_defined_function.body) + if sfv_proto.spec.HasField("user_defined_function") + else None + ) sfv_feature_view = cls( name=sfv_proto.spec.name, description=sfv_proto.spec.description, @@ -185,7 +200,7 @@ def from_proto(cls, sfv_proto): ), source=stream_source, mode=sfv_proto.spec.mode, - udf=dill.loads(sfv_proto.spec.user_defined_function.body), + udf=udf, aggregations=[ Aggregation.from_proto(agg_proto) for agg_proto in sfv_proto.spec.aggregations @@ -214,6 +229,14 @@ def from_proto(cls, sfv_proto): sfv_proto.meta.last_updated_timestamp.ToDatetime() ) + for interval in sfv_proto.meta.materialization_intervals: + stream_feature_view.materialization_intervals.append( + ( + utils.make_tzaware(interval.start_time.ToDatetime()), + utils.make_tzaware(interval.end_time.ToDatetime()), + ) + ) + return sfv_feature_view diff --git a/sdk/python/feast/stream_processor.py b/sdk/python/feast/stream_processor.py new file mode 100644 index 0000000000..bd1328afb9 --- /dev/null +++ b/sdk/python/feast/stream_processor.py @@ -0,0 +1,201 @@ +import abc +from datetime import timedelta +from types import MethodType +from typing import ( + TYPE_CHECKING, + Callable, + List, + Any, +) + +from pyspark.sql import DataFrame, SparkSession +from pyspark.sql.avro.functions import from_avro +from pyspark.sql.functions import col, from_json + +from feast.data_format import AvroFormat, JsonFormat +from feast.data_source import DataSource, KafkaSource +from feast.stream_feature_view import StreamFeatureView +from feast.repo_config import FeastConfigBaseModel + +StreamTable = DataFrame # Can add more to this later(change to union). 
+ +class ProcessorConfig(FeastConfigBaseModel): + # Processor mode(spark, etc) + mode: str + # Ingestion source(kafka, kinesis, etc) + source: str + +class SparkProcessorConfig(ProcessorConfig): + spark_session: SparkSession + +class StreamProcessor(abc.ABC): + data_source: DataSource + sfv: StreamFeatureView + + def __init__(self, sfv: StreamFeatureView, data_source: DataSource): + self.sfv = sfv + self.data_source = data_source + + def _ingest_stream_data(self) -> StreamTable: + """ + Ingests data into StreamTable depending on what type of data it is + """ + pass + + def _construct_transformation_plan(self, table: StreamTable) -> StreamTable: + """ + Applies transformations on top of StreamTable object. Since stream engines use lazy + evaluation, the StreamTable will not be materialized until it is actually evaluated. + For example: df.collect() in spark or tbl.execute() in Flink. + """ + pass + + def _write_to_online_store(self, table: StreamTable): + """ + Returns query for writing stream. + """ + pass + + def transform_stream_data(self) -> StreamTable: + pass + + def ingest_stream_feature_view(self): + pass + + def transform_and_write(self, table: StreamTable): + pass + + +class SparkStreamKafkaProcessor(StreamProcessor): + # TODO: wrap spark data in some kind of config + # includes session, format, checkpoint location etc. + spark: SparkSession + format: str + write_function: MethodType + join_keys: List[str] + + def __init__( + self, + sfv: StreamFeatureView, + spark_session: SparkSession, + write_function: MethodType, + ): + if not isinstance(sfv.stream_source, KafkaSource): + raise ValueError("data source is not kafka source") + if not isinstance( + sfv.stream_source.kafka_options.message_format, AvroFormat + ) and not isinstance( + sfv.stream_source.kafka_options.message_format, JsonFormat + ): + raise ValueError( + "spark streaming currently only supports json or avro format for kafka source schema" + ) + # if not sfv.mode == "spark": + # raise ValueError(f"stream feature view mode is {sfv.mode}, but only supports spark") + self.format = ( + "json" + if isinstance(sfv.stream_source.kafka_options.message_format, JsonFormat) + else "avro" + ) + self.spark = spark_session + self.write_function = write_function + super().__init__(sfv=sfv, data_source=sfv.stream_source) + + def _ingest_stream_data(self) -> StreamTable: + """ + Ingests data into StreamTable depending on what type of data format it is in. + Only supports json and avro formats currently. 
+ """ + if self.format == "json": + streamingDF = ( + self.spark.readStream.format("kafka") + .option( + "kafka.bootstrap.servers", + self.data_source.kafka_options.bootstrap_servers, + ) + .option("subscribe", self.data_source.kafka_options.topic) + .option("startingOffsets", "latest") # Query start + .load() + .selectExpr("CAST(value AS STRING)") + .select( + from_json( + col("value"), + self.data_source.kafka_options.message_format.schema_json, + ).alias("table") + ) + .select("table.*") + ) + else: + streamingDF = ( + self.spark.readStream.format("kafka") + .option( + "kafka.bootstrap.servers", + self.data_source.kafka_options.bootstrap_servers, + ) + .option("subscribe", self.data_source.kafka_options.topic) + .option("startingOffsets", "latest") # Query start + .load() + .selectExpr("CAST(value AS STRING)") + .select( + from_avro( + col("value"), + self.data_source.kafka_options.message_format.schema_json, + ).alias("table") + ) + .select("table.*") + ) + return streamingDF + + def _construct_transformation_plan(self, df: StreamTable) -> StreamTable: + """ + Applies transformations on top of StreamTable object. Since stream engines use lazy + evaluation, the StreamTable will not be materialized until it is actually evaluated. + For example: df.collect() in spark or tbl.execute() in Flink. + """ + if not self.sfv.udf: + return df + else: + return self.sfv.udf(df) + + def _write_to_online_store(self, df: StreamTable): + """ + Returns query for writing stream. + """ + # Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema. + def batch_write(row: DataFrame, batch_id: int): + pd_row = row.toPandas() + self.write_function(pd_row, input_timestamp="event_timestamp", output_timestamp="") + query = ( + df.writeStream.outputMode("update") + .option("checkpointLocation", "/tmp/checkpoint/") + .trigger(processingTime="30 seconds") + .foreachBatch(batch_write) + .start() + ) + query.awaitTermination(timeout=30) + return query + + def transform_stream_data(self) -> StreamTable: + df = self._ingest_stream_data() + return self._construct_transformation_plan(df) + + def ingest_stream_feature_view(self): + ingested_stream_df = self._ingest_stream_data() + transformed_df = self._construct_transformation_plan(ingested_stream_df) + online_store_query = self._write_to_online_store(transformed_df) + return online_store_query + + def transform_and_write(self, table: StreamTable): + pass + +def get_stream_processor(config: ProcessorConfig, sfv: StreamFeatureView, write_function): + if config.mode == "spark" and config.source == "kafka": + if not isinstance(config, SparkProcessorConfig): + raise ValueError(f"spark processor config required, got {type(config)}") + return SparkStreamKafkaProcessor( + sfv=sfv, + spark_session = config.spark_session, + write_function=write_function, + ) + else: + raise ValueError("other processors besides spark-kafka not supported") From 562b854d2ac0ee509c914e7c5e8cd4abed323847 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Thu, 2 Jun 2022 15:02:55 -0700 Subject: [PATCH 2/6] Fix Signed-off-by: Kevin Zhang --- sdk/python/feast/stream_processor.py | 201 ------------------ .../test_stream_feature_view_apply.py | 18 +- 2 files changed, 17 insertions(+), 202 deletions(-) delete mode 100644 sdk/python/feast/stream_processor.py diff --git a/sdk/python/feast/stream_processor.py b/sdk/python/feast/stream_processor.py deleted file mode 100644 index bd1328afb9..0000000000 --- a/sdk/python/feast/stream_processor.py +++ /dev/null @@ -1,201 +0,0 @@ 
-import abc -from datetime import timedelta -from types import MethodType -from typing import ( - TYPE_CHECKING, - Callable, - List, - Any, -) - -from pyspark.sql import DataFrame, SparkSession -from pyspark.sql.avro.functions import from_avro -from pyspark.sql.functions import col, from_json - -from feast.data_format import AvroFormat, JsonFormat -from feast.data_source import DataSource, KafkaSource -from feast.stream_feature_view import StreamFeatureView -from feast.repo_config import FeastConfigBaseModel - -StreamTable = DataFrame # Can add more to this later(change to union). - -class ProcessorConfig(FeastConfigBaseModel): - # Processor mode(spark, etc) - mode: str - # Ingestion source(kafka, kinesis, etc) - source: str - -class SparkProcessorConfig(ProcessorConfig): - spark_session: SparkSession - -class StreamProcessor(abc.ABC): - data_source: DataSource - sfv: StreamFeatureView - - def __init__(self, sfv: StreamFeatureView, data_source: DataSource): - self.sfv = sfv - self.data_source = data_source - - def _ingest_stream_data(self) -> StreamTable: - """ - Ingests data into StreamTable depending on what type of data it is - """ - pass - - def _construct_transformation_plan(self, table: StreamTable) -> StreamTable: - """ - Applies transformations on top of StreamTable object. Since stream engines use lazy - evaluation, the StreamTable will not be materialized until it is actually evaluated. - For example: df.collect() in spark or tbl.execute() in Flink. - """ - pass - - def _write_to_online_store(self, table: StreamTable): - """ - Returns query for writing stream. - """ - pass - - def transform_stream_data(self) -> StreamTable: - pass - - def ingest_stream_feature_view(self): - pass - - def transform_and_write(self, table: StreamTable): - pass - - -class SparkStreamKafkaProcessor(StreamProcessor): - # TODO: wrap spark data in some kind of config - # includes session, format, checkpoint location etc. - spark: SparkSession - format: str - write_function: MethodType - join_keys: List[str] - - def __init__( - self, - sfv: StreamFeatureView, - spark_session: SparkSession, - write_function: MethodType, - ): - if not isinstance(sfv.stream_source, KafkaSource): - raise ValueError("data source is not kafka source") - if not isinstance( - sfv.stream_source.kafka_options.message_format, AvroFormat - ) and not isinstance( - sfv.stream_source.kafka_options.message_format, JsonFormat - ): - raise ValueError( - "spark streaming currently only supports json or avro format for kafka source schema" - ) - # if not sfv.mode == "spark": - # raise ValueError(f"stream feature view mode is {sfv.mode}, but only supports spark") - self.format = ( - "json" - if isinstance(sfv.stream_source.kafka_options.message_format, JsonFormat) - else "avro" - ) - self.spark = spark_session - self.write_function = write_function - super().__init__(sfv=sfv, data_source=sfv.stream_source) - - def _ingest_stream_data(self) -> StreamTable: - """ - Ingests data into StreamTable depending on what type of data format it is in. - Only supports json and avro formats currently. 
- """ - if self.format == "json": - streamingDF = ( - self.spark.readStream.format("kafka") - .option( - "kafka.bootstrap.servers", - self.data_source.kafka_options.bootstrap_servers, - ) - .option("subscribe", self.data_source.kafka_options.topic) - .option("startingOffsets", "latest") # Query start - .load() - .selectExpr("CAST(value AS STRING)") - .select( - from_json( - col("value"), - self.data_source.kafka_options.message_format.schema_json, - ).alias("table") - ) - .select("table.*") - ) - else: - streamingDF = ( - self.spark.readStream.format("kafka") - .option( - "kafka.bootstrap.servers", - self.data_source.kafka_options.bootstrap_servers, - ) - .option("subscribe", self.data_source.kafka_options.topic) - .option("startingOffsets", "latest") # Query start - .load() - .selectExpr("CAST(value AS STRING)") - .select( - from_avro( - col("value"), - self.data_source.kafka_options.message_format.schema_json, - ).alias("table") - ) - .select("table.*") - ) - return streamingDF - - def _construct_transformation_plan(self, df: StreamTable) -> StreamTable: - """ - Applies transformations on top of StreamTable object. Since stream engines use lazy - evaluation, the StreamTable will not be materialized until it is actually evaluated. - For example: df.collect() in spark or tbl.execute() in Flink. - """ - if not self.sfv.udf: - return df - else: - return self.sfv.udf(df) - - def _write_to_online_store(self, df: StreamTable): - """ - Returns query for writing stream. - """ - # Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema. - def batch_write(row: DataFrame, batch_id: int): - pd_row = row.toPandas() - self.write_function(pd_row, input_timestamp="event_timestamp", output_timestamp="") - query = ( - df.writeStream.outputMode("update") - .option("checkpointLocation", "/tmp/checkpoint/") - .trigger(processingTime="30 seconds") - .foreachBatch(batch_write) - .start() - ) - query.awaitTermination(timeout=30) - return query - - def transform_stream_data(self) -> StreamTable: - df = self._ingest_stream_data() - return self._construct_transformation_plan(df) - - def ingest_stream_feature_view(self): - ingested_stream_df = self._ingest_stream_data() - transformed_df = self._construct_transformation_plan(ingested_stream_df) - online_store_query = self._write_to_online_store(transformed_df) - return online_store_query - - def transform_and_write(self, table: StreamTable): - pass - -def get_stream_processor(config: ProcessorConfig, sfv: StreamFeatureView, write_function): - if config.mode == "spark" and config.source == "kafka": - if not isinstance(config, SparkProcessorConfig): - raise ValueError(f"spark processor config required, got {type(config)}") - return SparkStreamKafkaProcessor( - sfv=sfv, - spark_session = config.spark_session, - write_function=write_function, - ) - else: - raise ValueError("other processors besides spark-kafka not supported") diff --git a/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py b/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py index b01ca434fa..89c4976d33 100644 --- a/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py +++ b/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py @@ -11,7 +11,7 @@ @pytest.mark.integration -def test_read_pre_applied(environment) -> None: +def test_apply_stream_feature_view(environment) -> None: """ Test apply of StreamFeatureView. 
""" @@ -60,3 +60,19 @@ def simple_sfv(df): entities = fs.list_entities() assert len(entities) == 1 assert entities[0] == entity + + features = fs.get_online_features( + features=[ + "simple_sfv:dummy_field", + ], + entity_rows=[ + { + "test_key": 1001 + } + ], + ).to_dict(include_event_timestamps=True) + + assert 'test_key' in features + assert features['test_key'] == [1001] + assert 'dummy_field' in features + assert features['dummy_field'] == [None] From cdc3b0b9db83a010c520a50a280305e12b76ec60 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Thu, 2 Jun 2022 15:06:07 -0700 Subject: [PATCH 3/6] Fix Signed-off-by: Kevin Zhang --- sdk/python/feast/diff/registry_diff.py | 2 +- sdk/python/feast/feature_store.py | 16 +++++----- .../feast/infra/online_stores/sqlite.py | 5 ++- sdk/python/feast/repo_operations.py | 31 ++++++++++++++----- .../test_stream_feature_view_apply.py | 17 +++------- 5 files changed, 42 insertions(+), 29 deletions(-) diff --git a/sdk/python/feast/diff/registry_diff.py b/sdk/python/feast/diff/registry_diff.py index 9473aeff3b..6b38a190fe 100644 --- a/sdk/python/feast/diff/registry_diff.py +++ b/sdk/python/feast/diff/registry_diff.py @@ -21,7 +21,7 @@ RequestFeatureView as RequestFeatureViewProto, ) from feast.protos.feast.core.StreamFeatureView_pb2 import ( - StreamFeatureView as StreamFeatureViewProto + StreamFeatureView as StreamFeatureViewProto, ) from feast.protos.feast.core.ValidationProfile_pb2 import ( ValidationReference as ValidationReferenceProto, diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 56c91a8059..9530b66d5e 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -93,6 +93,7 @@ from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute from feast.value_type import ValueType from feast.version import get_version + warnings.simplefilter("once", DeprecationWarning) if TYPE_CHECKING: @@ -388,7 +389,9 @@ def get_stream_feature_view( Raises: FeatureViewNotFoundException: The feature view could not be found. 
""" - return self._get_stream_feature_view(name, allow_registry_cache=allow_registry_cache) + return self._get_stream_feature_view( + name, allow_registry_cache=allow_registry_cache + ) def _get_stream_feature_view( self, @@ -966,7 +969,6 @@ def get_historical_features( all_feature_views, all_request_feature_views, all_on_demand_feature_views, - all_stream_feature_views, ) = self._get_feature_views_to_use(features) if all_request_feature_views: @@ -2029,17 +2031,17 @@ def _get_feature_views_to_use( allow_cache=False, hide_dummy_entity: bool = True, ) -> Tuple[ - List[FeatureView], - List[RequestFeatureView], - List[OnDemandFeatureView], + List[FeatureView], List[RequestFeatureView], List[OnDemandFeatureView], ]: fvs = { fv.name: fv for fv in [ *self._list_feature_views(allow_cache, hide_dummy_entity), - *self._registry.list_stream_feature_views(project=self.project, allow_cache=allow_cache - )] + *self._registry.list_stream_feature_views( + project=self.project, allow_cache=allow_cache + ), + ] } request_fvs = { diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index adb26f479a..2f0e902942 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -211,7 +211,10 @@ def plan( path=self._get_db_path(config), name=_table_id(project, FeatureView.from_proto(view)), ) - for view in [*desired_registry_proto.feature_views, *desired_registry_proto.stream_feature_views] + for view in [ + *desired_registry_proto.feature_views, + *desired_registry_proto.stream_feature_views, + ] ] return infra_objects diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 96f73c3701..b0cd04a193 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -13,6 +13,7 @@ from click.exceptions import BadParameter from feast import PushSource +from feast.batch_feature_view import BatchFeatureView from feast.data_source import DataSource, KafkaSource from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add from feast.entity import Entity @@ -26,7 +27,6 @@ from feast.repo_contents import RepoContents from feast.request_feature_view import RequestFeatureView from feast.stream_feature_view import StreamFeatureView -from feast.batch_feature_view import BatchFeatureView from feast.usage import log_exceptions_and_usage @@ -124,9 +124,12 @@ def parse_repo(repo_root: Path) -> RepoContents: ): res.data_sources.append(obj) data_sources_set.add(obj) - if isinstance(obj, FeatureView) and not any( - (obj is fv) for fv in res.feature_views - ) and not isinstance(obj, StreamFeatureView) and not isinstance(obj, BatchFeatureView): + if ( + isinstance(obj, FeatureView) + and not any((obj is fv) for fv in res.feature_views) + and not isinstance(obj, StreamFeatureView) + and not isinstance(obj, BatchFeatureView) + ): res.feature_views.append(obj) if isinstance(obj.stream_source, PushSource) and not any( (obj is ds) for ds in res.data_sources @@ -137,8 +140,10 @@ def parse_repo(repo_root: Path) -> RepoContents: res.data_sources.append(push_source_dep) elif isinstance(obj, StreamFeatureView): res.stream_feature_views.append(obj) - if isinstance(obj.stream_source, PushSource) or isinstance(obj.stream_source, KafkaSource) and not any( - (obj is ds) for ds in res.data_sources + if ( + isinstance(obj.stream_source, PushSource) + or isinstance(obj.stream_source, KafkaSource) + and not any((obj is ds) for ds in res.data_sources) ): 
batch_source_dep = obj.stream_source.batch_source # Don't add if the push source's batch source is a duplicate of an existing batch source @@ -207,7 +212,12 @@ def extract_objects_for_apply_delete(project, registry, repo): all_to_apply: List[ Union[ - Entity, FeatureView, RequestFeatureView, OnDemandFeatureView, StreamFeatureView, FeatureService + Entity, + FeatureView, + RequestFeatureView, + OnDemandFeatureView, + StreamFeatureView, + FeatureService, ] ] = [] for object_type in FEAST_OBJECT_TYPES: @@ -216,7 +226,12 @@ def extract_objects_for_apply_delete(project, registry, repo): all_to_delete: List[ Union[ - Entity, FeatureView, RequestFeatureView, OnDemandFeatureView, StreamFeatureView, FeatureService + Entity, + FeatureView, + RequestFeatureView, + OnDemandFeatureView, + StreamFeatureView, + FeatureService, ] ] = [] for object_type in FEAST_OBJECT_TYPES: diff --git a/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py b/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py index 89c4976d33..dbf1ea6b64 100644 --- a/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py +++ b/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py @@ -62,17 +62,10 @@ def simple_sfv(df): assert entities[0] == entity features = fs.get_online_features( - features=[ - "simple_sfv:dummy_field", - ], - entity_rows=[ - { - "test_key": 1001 - } - ], + features=["simple_sfv:dummy_field",], entity_rows=[{"test_key": 1001}], ).to_dict(include_event_timestamps=True) - assert 'test_key' in features - assert features['test_key'] == [1001] - assert 'dummy_field' in features - assert features['dummy_field'] == [None] + assert "test_key" in features + assert features["test_key"] == [1001] + assert "dummy_field" in features + assert features["dummy_field"] == [None] From 710169d4d097fc39c526eed210d485286aa823a2 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Thu, 2 Jun 2022 15:09:05 -0700 Subject: [PATCH 4/6] Fix issues Signed-off-by: Kevin Zhang --- sdk/python/feast/repo_operations.py | 2 +- .../integration/registration/test_stream_feature_view_apply.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index b0cd04a193..353d49ece1 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -147,7 +147,7 @@ def parse_repo(repo_root: Path) -> RepoContents: ): batch_source_dep = obj.stream_source.batch_source # Don't add if the push source's batch source is a duplicate of an existing batch source - if batch_source_dep not in data_sources_set: + if batch_source_dep and batch_source_dep not in data_sources_set: res.data_sources.append(batch_source_dep) elif isinstance(obj, Entity) and not any( (obj is entity) for entity in res.entities diff --git a/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py b/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py index dbf1ea6b64..d24618b270 100644 --- a/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py +++ b/sdk/python/tests/integration/registration/test_stream_feature_view_apply.py @@ -62,7 +62,7 @@ def simple_sfv(df): assert entities[0] == entity features = fs.get_online_features( - features=["simple_sfv:dummy_field",], entity_rows=[{"test_key": 1001}], + features=["simple_sfv:dummy_field"], entity_rows=[{"test_key": 1001}], ).to_dict(include_event_timestamps=True) assert "test_key" in features From 
24f5ae7e3eff4e6cfb9577c27bb0e5f9b40cb2c7 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Thu, 2 Jun 2022 15:59:06 -0700 Subject: [PATCH 5/6] Fix Signed-off-by: Kevin Zhang --- sdk/python/feast/repo_operations.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 353d49ece1..c0f28a74bd 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -138,7 +138,10 @@ def parse_repo(repo_root: Path) -> RepoContents: # Don't add if the push source's batch source is a duplicate of an existing batch source if push_source_dep not in data_sources_set: res.data_sources.append(push_source_dep) - elif isinstance(obj, StreamFeatureView): + elif ( + isinstance(obj, StreamFeatureView) + and not any((obj is sfv) for sfv in res.stream_feature_views) + ): res.stream_feature_views.append(obj) if ( isinstance(obj.stream_source, PushSource) From a2a67a0b45c5c82b5bb04721464644690f94e773 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Thu, 2 Jun 2022 16:00:14 -0700 Subject: [PATCH 6/6] Reformat Signed-off-by: Kevin Zhang --- sdk/python/feast/repo_operations.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index c0f28a74bd..37daa6500e 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -138,9 +138,8 @@ def parse_repo(repo_root: Path) -> RepoContents: # Don't add if the push source's batch source is a duplicate of an existing batch source if push_source_dep not in data_sources_set: res.data_sources.append(push_source_dep) - elif ( - isinstance(obj, StreamFeatureView) - and not any((obj is sfv) for sfv in res.stream_feature_views) + elif isinstance(obj, StreamFeatureView) and not any( + (obj is sfv) for sfv in res.stream_feature_views ): res.stream_feature_views.append(obj) if (
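Net effect of the parse_repo() changes in patches 3 through 6: a StreamFeatureView defined in a feature repo is appended to repo_contents.stream_feature_views exactly once (no longer double-counted as a plain FeatureView), and its stream source's batch source is registered as a data source when it is not already present. A sketch of a repo definition that would exercise that branch — the Kafka and file source parameters, field names, and constructor keywords are assumptions for illustration, not taken from this patch series:

    from datetime import timedelta

    from feast import Field, FileSource, KafkaSource
    from feast.data_format import JsonFormat
    from feast.stream_feature_view import StreamFeatureView
    from feast.types import Float32

    driver_stats_batch = FileSource(
        path="data/driver_stats.parquet",
        timestamp_field="event_timestamp",
    )

    driver_stats_stream = KafkaSource(
        name="driver_stats_stream",
        kafka_bootstrap_servers="localhost:9092",
        topic="drivers",
        timestamp_field="event_timestamp",
        batch_source=driver_stats_batch,  # pulled in by the new parse_repo() branch
        message_format=JsonFormat(
            schema_json="driver_id integer, conv_rate double, event_timestamp timestamp"
        ),
    )

    # `feast apply` now registers this view via repo_contents.stream_feature_views,
    # and adds driver_stats_batch as a data source if it is not already listed.
    driver_stats_sfv = StreamFeatureView(
        name="driver_stats_sfv",
        entities=[],  # empty entities resolve to the dummy entity, as handled in the patch
        ttl=timedelta(days=1),
        mode="spark",
        schema=[Field(name="conv_rate", dtype=Float32)],
        timestamp_field="event_timestamp",
        source=driver_stats_stream,
    )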