From c961c8a33d883047c432df15d8ff47ae1be7e49b Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 17 Sep 2024 15:17:29 -0400 Subject: [PATCH] feat: Updating protos for Projections to include more info Signed-off-by: Francisco Javier Arceo --- protos/feast/core/FeatureViewProjection.proto | 10 ++ protos/feast/core/OnDemandFeatureView.proto | 6 + sdk/python/feast/feature_view_projection.py | 35 ++++ sdk/python/feast/inference.py | 158 +++++++++++++----- 4 files changed, 165 insertions(+), 44 deletions(-) diff --git a/protos/feast/core/FeatureViewProjection.proto b/protos/feast/core/FeatureViewProjection.proto index 36d17632e7d..b0e697b656f 100644 --- a/protos/feast/core/FeatureViewProjection.proto +++ b/protos/feast/core/FeatureViewProjection.proto @@ -6,6 +6,7 @@ option java_outer_classname = "FeatureReferenceProto"; option java_package = "feast.proto.core"; import "feast/core/Feature.proto"; +import "feast/core/DataSource.proto"; // A projection to be applied on top of a FeatureView. @@ -22,4 +23,13 @@ message FeatureViewProjection { // Map for entity join_key overrides of feature data entity join_key to entity data join_key map join_key_map = 4; + + string timestamp_field = 5; + string date_partition_column = 6; + string created_timestamp_column = 7; + // Batch/Offline DataSource where this view can retrieve offline feature data. + DataSource batch_source = 8; + // Streaming DataSource from where this view can consume "online" feature data. + DataSource stream_source = 9; + } diff --git a/protos/feast/core/OnDemandFeatureView.proto b/protos/feast/core/OnDemandFeatureView.proto index 7a5fec16504..c915e32e16a 100644 --- a/protos/feast/core/OnDemandFeatureView.proto +++ b/protos/feast/core/OnDemandFeatureView.proto @@ -63,6 +63,12 @@ message OnDemandFeatureViewSpec { // Owner of the on demand feature view. string owner = 8; string mode = 11; + bool write_to_online_store = 12; + + // List of names of entities associated with this feature view. + repeated string entities = 13; + // List of specifications for each entity defined as part of this feature view. + repeated FeatureSpecV2 entity_columns = 14; } message OnDemandFeatureViewMeta { diff --git a/sdk/python/feast/feature_view_projection.py b/sdk/python/feast/feature_view_projection.py index ff5b1b6e063..75e49d0b16c 100644 --- a/sdk/python/feast/feature_view_projection.py +++ b/sdk/python/feast/feature_view_projection.py @@ -2,6 +2,7 @@ from attr import dataclass +from feast.data_source import DataSource from feast.field import Field from feast.protos.feast.core.FeatureViewProjection_pb2 import ( FeatureViewProjection as FeatureViewProjectionProto, @@ -27,6 +28,13 @@ class FeatureViewProjection: is not ready to be projected, i.e. still needs to go through feature inference. join_key_map: A map to modify join key columns during retrieval of this feature view projection. + timestamp_field: The timestamp field of the feature view projection. + date_partition_column: The date partition column of the feature view projection. + created_timestamp_column: The created timestamp column of the feature view projection. + batch_source: The batch source of data where this group of features + is stored. This is optional ONLY if a push source is specified as the + stream_source, since push sources contain their own batch sources. + """ name: str @@ -34,6 +42,10 @@ class FeatureViewProjection: desired_features: List[str] features: List[Field] join_key_map: Dict[str, str] = {} + timestamp_field: Optional[str] = None + date_partition_column: Optional[str] = None + created_timestamp_column: Optional[str] = None + batch_source: Optional[DataSource] = None def name_to_use(self): return self.name_alias or self.name @@ -43,6 +55,10 @@ def to_proto(self) -> FeatureViewProjectionProto: feature_view_name=self.name, feature_view_name_alias=self.name_alias or "", join_key_map=self.join_key_map, + timestamp_field=self.timestamp_field, + date_partition_column=self.date_partition_column, + created_timestamp_column=self.created_timestamp_column, + batch_source=self.batch_source.to_proto() or None, ) for feature in self.features: feature_reference_proto.feature_columns.append(feature.to_proto()) @@ -57,6 +73,10 @@ def from_proto(proto: FeatureViewProjectionProto): features=[], join_key_map=dict(proto.join_key_map), desired_features=[], + timestamp_field=proto.timestamp_field or None, + date_partition_column=proto.date_partition_column or None, + created_timestamp_column=proto.created_timestamp_column or None, + batch_source=proto.batch_source or None, ) for feature_column in proto.feature_columns: feature_view_projection.features.append(Field.from_proto(feature_column)) @@ -65,6 +85,21 @@ def from_proto(proto: FeatureViewProjectionProto): @staticmethod def from_definition(base_feature_view: "BaseFeatureView"): + # TODO need to implement this for StreamFeatureViews + if getattr(base_feature_view, "batch_source", None): + return FeatureViewProjection( + name=base_feature_view.name, + name_alias=None, + features=base_feature_view.features, + desired_features=[], + timestamp_field=base_feature_view.batch_source.created_timestamp_column + or None, + created_timestamp_column=base_feature_view.batch_source.created_timestamp_column + or None, + date_partition_column=base_feature_view.batch_source.date_partition_column + or None, + batch_source=base_feature_view.batch_source or None, + ) return FeatureViewProjection( name=base_feature_view.name, name_alias=None, diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 28a170172c8..7466edffe8c 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -13,6 +13,7 @@ from feast.infra.offline_stores.file_source import FileSource from feast.infra.offline_stores.redshift_source import RedshiftSource from feast.infra.offline_stores.snowflake_source import SnowflakeSource +from feast.on_demand_feature_view import OnDemandFeatureView from feast.repo_config import RepoConfig from feast.stream_feature_view import StreamFeatureView from feast.types import String @@ -94,7 +95,7 @@ def update_data_sources_with_inferred_event_timestamp_col( def update_feature_views_with_inferred_features_and_entities( - fvs: Union[List[FeatureView], List[StreamFeatureView]], + fvs: Union[List[FeatureView], List[StreamFeatureView], List[OnDemandFeatureView]], entities: List[Entity], config: RepoConfig, ) -> None: @@ -127,13 +128,14 @@ def update_feature_views_with_inferred_features_and_entities( # Fields whose names match a join key are considered to be entity columns; all # other fields are considered to be feature columns. + entity_columns = fv.entity_columns if fv.entity_columns else [] for field in fv.schema: if field.name in join_keys: # Do not override a preexisting field with the same name. if field.name not in [ - entity_column.name for entity_column in fv.entity_columns + entity_column.name for entity_column in entity_columns ]: - fv.entity_columns.append(field) + entity_columns.append(field) else: if field.name not in [feature.name for feature in fv.features]: fv.features.append(field) @@ -146,10 +148,10 @@ def update_feature_views_with_inferred_features_and_entities( continue if ( entity.join_key - not in [entity_column.name for entity_column in fv.entity_columns] + not in [entity_column.name for entity_column in entity_columns] and entity.value_type != ValueType.UNKNOWN ): - fv.entity_columns.append( + entity_columns.append( Field( name=entity.join_key, dtype=from_value_type(entity.value_type), @@ -160,10 +162,11 @@ def update_feature_views_with_inferred_features_and_entities( if ( len(fv.entities) == 1 and fv.entities[0] == DUMMY_ENTITY_NAME - and not fv.entity_columns + and not entity_columns ): - fv.entity_columns.append(Field(name=DUMMY_ENTITY_ID, dtype=String)) + entity_columns.append(Field(name=DUMMY_ENTITY_ID, dtype=String)) + fv.entity_columns = entity_columns # Run inference for entity columns if there are fewer entity fields than expected. run_inference_for_entities = len(fv.entity_columns) < len(join_keys) @@ -200,49 +203,116 @@ def _infer_features_and_entities( run_inference_for_features: Whether to run inference for features. config: The config for the current feature store. """ - columns_to_exclude = { - fv.batch_source.timestamp_field, - fv.batch_source.created_timestamp_column, - } - for original_col, mapped_col in fv.batch_source.field_mapping.items(): - if mapped_col in columns_to_exclude: - columns_to_exclude.remove(mapped_col) - columns_to_exclude.add(original_col) - - table_column_names_and_types = fv.batch_source.get_table_column_names_and_types( - config - ) - - for col_name, col_datatype in table_column_names_and_types: - if col_name in columns_to_exclude: - continue - elif col_name in join_keys: - field = Field( - name=col_name, - dtype=from_value_type( - fv.batch_source.source_datatype_to_feast_value_type()(col_datatype) - ), + entity_columns = [] + if isinstance(fv, OnDemandFeatureView): + columns_to_exclude = set() + for ( + source_feature_view_name, + source_feature_view, + ) in fv.source_feature_view_projections.items(): + columns_to_exclude.add(source_feature_view.timestamp_field) + columns_to_exclude.add(source_feature_view.created_timestamp_column) + + for ( + original_col, + mapped_col, + ) in source_feature_view.batch_source.field_mapping.items(): + if mapped_col in columns_to_exclude: + columns_to_exclude.remove(mapped_col) + columns_to_exclude.add(original_col) + + table_column_names_and_types = ( + source_feature_view.batch_source.get_table_column_names_and_types( + config + ) ) - if field.name not in [ - entity_column.name for entity_column in fv.entity_columns - ]: - fv.entity_columns.append(field) - elif not re.match( - "^__|__$", col_name - ): # double underscores often signal an internal-use column - if run_inference_for_features: - feature_name = ( - fv.batch_source.field_mapping[col_name] - if col_name in fv.batch_source.field_mapping - else col_name + + for col_name, col_datatype in table_column_names_and_types: + if col_name in columns_to_exclude: + continue + elif col_name in join_keys: + field = Field( + name=col_name, + dtype=from_value_type( + source_feature_view.batch_source.source_datatype_to_feast_value_type()( + col_datatype + ) + ), ) + if field.name not in [ + entity_column.name for entity_column in entity_columns + ]: + entity_columns.append(field) + elif not re.match( + "^__|__$", col_name + ): # double underscores often signal an internal-use column + if run_inference_for_features: + feature_name = ( + source_feature_view.batch_source.field_mapping[col_name] + if col_name in source_feature_view.batch_source.field_mapping + else col_name + ) + field = Field( + name=feature_name, + dtype=from_value_type( + source_feature_view.batch_source.source_datatype_to_feast_value_type()( + col_datatype + ) + ), + ) + if field.name not in [ + feature.name for feature in source_feature_view.features + ]: + source_feature_view.features.append(field) + + else: + columns_to_exclude = { + fv.batch_source.timestamp_field, + fv.batch_source.created_timestamp_column, + } + for original_col, mapped_col in fv.batch_source.field_mapping.items(): + if mapped_col in columns_to_exclude: + columns_to_exclude.remove(mapped_col) + columns_to_exclude.add(original_col) + + table_column_names_and_types = fv.batch_source.get_table_column_names_and_types( + config + ) + + for col_name, col_datatype in table_column_names_and_types: + if col_name in columns_to_exclude: + continue + elif col_name in join_keys: field = Field( - name=feature_name, + name=col_name, dtype=from_value_type( fv.batch_source.source_datatype_to_feast_value_type()( col_datatype ) ), ) - if field.name not in [feature.name for feature in fv.features]: - fv.features.append(field) + if field.name not in [ + entity_column.name for entity_column in entity_columns + ]: + entity_columns.append(field) + elif not re.match( + "^__|__$", col_name + ): # double underscores often signal an internal-use column + if run_inference_for_features: + feature_name = ( + fv.batch_source.field_mapping[col_name] + if col_name in fv.batch_source.field_mapping + else col_name + ) + field = Field( + name=feature_name, + dtype=from_value_type( + fv.batch_source.source_datatype_to_feast_value_type()( + col_datatype + ) + ), + ) + if field.name not in [feature.name for feature in fv.features]: + fv.features.append(field) + + fv.entity_columns = entity_columns