Commit

feat: Updating protos for Projections to include more info
Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
franciscojavierarceo committed Sep 17, 2024
1 parent 7535b40 commit c961c8a
Showing 4 changed files with 165 additions and 44 deletions.
10 changes: 10 additions & 0 deletions protos/feast/core/FeatureViewProjection.proto
@@ -6,6 +6,7 @@ option java_outer_classname = "FeatureReferenceProto";
option java_package = "feast.proto.core";

import "feast/core/Feature.proto";
import "feast/core/DataSource.proto";


// A projection to be applied on top of a FeatureView.
@@ -22,4 +23,13 @@ message FeatureViewProjection {

// Map for entity join_key overrides of feature data entity join_key to entity data join_key
map<string,string> join_key_map = 4;

string timestamp_field = 5;
string date_partition_column = 6;
string created_timestamp_column = 7;
// Batch/Offline DataSource where this view can retrieve offline feature data.
DataSource batch_source = 8;
// Streaming DataSource from which this view can consume "online" feature data.
DataSource stream_source = 9;

}
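
The five new fields let a projection carry its own timestamp metadata and data sources instead of relying on the parent view. As a rough sketch (not part of this commit), the generated Python bindings can populate them as below; the pb2 module path follows the SDK imports shown later in this diff, and all column and view names are hypothetical:

    from feast.protos.feast.core.FeatureViewProjection_pb2 import (
        FeatureViewProjection as FeatureViewProjectionProto,
    )

    proto = FeatureViewProjectionProto(
        feature_view_name="driver_hourly_stats",  # hypothetical view name
        join_key_map={"driver_id": "driver"},
        timestamp_field="event_timestamp",        # new field 5
        date_partition_column="created_date",     # new field 6
        created_timestamp_column="created",       # new field 7
    )
    # batch_source (field 8) and stream_source (field 9) take
    # feast.core.DataSource messages, e.g. a DataSource's to_proto() output.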
6 changes: 6 additions & 0 deletions protos/feast/core/OnDemandFeatureView.proto
@@ -63,6 +63,12 @@ message OnDemandFeatureViewSpec {
// Owner of the on demand feature view.
string owner = 8;
string mode = 11;
bool write_to_online_store = 12;

// List of names of entities associated with this feature view.
repeated string entities = 13;
// List of specifications for each entity defined as part of this feature view.
repeated FeatureSpecV2 entity_columns = 14;
}

message OnDemandFeatureViewMeta {
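The new spec fields mirror what regular feature views already expose: a write_to_online_store flag, the entity names, and per-entity column specs (FeatureSpecV2 is defined in feast/core/Feature.proto). A minimal sketch, not from this commit and with the pb2 module path assumed from the SDK layout, of setting and reading them:

    from feast.protos.feast.core.OnDemandFeatureView_pb2 import OnDemandFeatureViewSpec

    spec = OnDemandFeatureViewSpec(
        name="transformed_conv_rate",  # hypothetical view name
        write_to_online_store=True,    # new field 12
        entities=["driver"],           # new field 13
    )
    # entity_columns (field 14) holds FeatureSpecV2 messages describing
    # each entity column's name and value type.
    print(spec.entities)
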
35 changes: 35 additions & 0 deletions sdk/python/feast/feature_view_projection.py
@@ -2,6 +2,7 @@

from attr import dataclass

from feast.data_source import DataSource
from feast.field import Field
from feast.protos.feast.core.FeatureViewProjection_pb2 import (
FeatureViewProjection as FeatureViewProjectionProto,
@@ -27,13 +28,24 @@ class FeatureViewProjection:
is not ready to be projected, i.e. still needs to go through feature inference.
join_key_map: A map to modify join key columns during retrieval of this feature
view projection.
timestamp_field: The timestamp field of the feature view projection.
date_partition_column: The date partition column of the feature view projection.
created_timestamp_column: The created timestamp column of the feature view projection.
batch_source: The batch source of data where this group of features
is stored. This is optional ONLY if a push source is specified as the
stream_source, since push sources contain their own batch sources.
"""

name: str
name_alias: Optional[str]
desired_features: List[str]
features: List[Field]
join_key_map: Dict[str, str] = {}
timestamp_field: Optional[str] = None
date_partition_column: Optional[str] = None
created_timestamp_column: Optional[str] = None
batch_source: Optional[DataSource] = None

def name_to_use(self):
return self.name_alias or self.name
@@ -43,6 +55,10 @@ def to_proto(self) -> FeatureViewProjectionProto:
feature_view_name=self.name,
feature_view_name_alias=self.name_alias or "",
join_key_map=self.join_key_map,
timestamp_field=self.timestamp_field,
date_partition_column=self.date_partition_column,
created_timestamp_column=self.created_timestamp_column,
            batch_source=(
                self.batch_source.to_proto() if self.batch_source else None
            ),
)
for feature in self.features:
feature_reference_proto.feature_columns.append(feature.to_proto())
@@ -57,6 +73,10 @@ def from_proto(proto: FeatureViewProjectionProto):
features=[],
join_key_map=dict(proto.join_key_map),
desired_features=[],
timestamp_field=proto.timestamp_field or None,
date_partition_column=proto.date_partition_column or None,
created_timestamp_column=proto.created_timestamp_column or None,
            batch_source=(
                DataSource.from_proto(proto.batch_source)
                if proto.HasField("batch_source")
                else None
            ),
)
for feature_column in proto.feature_columns:
feature_view_projection.features.append(Field.from_proto(feature_column))
@@ -65,6 +85,21 @@ def from_proto(proto: FeatureViewProjectionProto):

@staticmethod
def from_definition(base_feature_view: "BaseFeatureView"):
# TODO need to implement this for StreamFeatureViews
if getattr(base_feature_view, "batch_source", None):
return FeatureViewProjection(
name=base_feature_view.name,
name_alias=None,
features=base_feature_view.features,
desired_features=[],
                timestamp_field=base_feature_view.batch_source.timestamp_field
                or None,
created_timestamp_column=base_feature_view.batch_source.created_timestamp_column
or None,
date_partition_column=base_feature_view.batch_source.date_partition_column
or None,
batch_source=base_feature_view.batch_source or None,
)
return FeatureViewProjection(
name=base_feature_view.name,
name_alias=None,
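A quick round-trip sketch (not part of the commit) exercising the new attributes on the Python class; the constructor keywords follow the attrs fields above, and the view and feature names are hypothetical:

    from feast.feature_view_projection import FeatureViewProjection
    from feast.field import Field
    from feast.types import Float32

    projection = FeatureViewProjection(
        name="driver_hourly_stats",
        name_alias=None,
        desired_features=[],
        features=[Field(name="conv_rate", dtype=Float32)],
        timestamp_field="event_timestamp",
        created_timestamp_column="created",
    )
    # With batch_source left as None, to_proto() leaves proto field 8 unset.
    restored = FeatureViewProjection.from_proto(projection.to_proto())
    assert restored.timestamp_field == "event_timestamp"
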
158 changes: 114 additions & 44 deletions sdk/python/feast/inference.py
@@ -13,6 +13,7 @@
from feast.infra.offline_stores.file_source import FileSource
from feast.infra.offline_stores.redshift_source import RedshiftSource
from feast.infra.offline_stores.snowflake_source import SnowflakeSource
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.repo_config import RepoConfig
from feast.stream_feature_view import StreamFeatureView
from feast.types import String
@@ -94,7 +95,7 @@ def update_data_sources_with_inferred_event_timestamp_col(


def update_feature_views_with_inferred_features_and_entities(
    fvs: Union[List[FeatureView], List[StreamFeatureView], List[OnDemandFeatureView]],
entities: List[Entity],
config: RepoConfig,
) -> None:
@@ -127,13 +128,14 @@ def update_feature_views_with_inferred_features_and_entities(

# Fields whose names match a join key are considered to be entity columns; all
# other fields are considered to be feature columns.
entity_columns = fv.entity_columns if fv.entity_columns else []
for field in fv.schema:
if field.name in join_keys:
# Do not override a preexisting field with the same name.
if field.name not in [
                entity_column.name for entity_column in entity_columns
]:
                entity_columns.append(field)
else:
if field.name not in [feature.name for feature in fv.features]:
fv.features.append(field)
@@ -146,10 +148,10 @@
continue
if (
entity.join_key
            not in [entity_column.name for entity_column in entity_columns]
and entity.value_type != ValueType.UNKNOWN
):
            entity_columns.append(
Field(
name=entity.join_key,
dtype=from_value_type(entity.value_type),
@@ -160,10 +162,11 @@
if (
len(fv.entities) == 1
and fv.entities[0] == DUMMY_ENTITY_NAME
        and not entity_columns
):
        entity_columns.append(Field(name=DUMMY_ENTITY_ID, dtype=String))

fv.entity_columns = entity_columns
# Run inference for entity columns if there are fewer entity fields than expected.
run_inference_for_entities = len(fv.entity_columns) < len(join_keys)

@@ -200,49 +203,116 @@ def _infer_features_and_entities(
        run_inference_for_features: Whether to run inference for features.
        config: The config for the current feature store.
    """
    entity_columns = []
    if isinstance(fv, OnDemandFeatureView):
        columns_to_exclude = set()
        for (
            source_feature_view_name,
            source_feature_view,
        ) in fv.source_feature_view_projections.items():
            columns_to_exclude.add(source_feature_view.timestamp_field)
            columns_to_exclude.add(source_feature_view.created_timestamp_column)

            for (
                original_col,
                mapped_col,
            ) in source_feature_view.batch_source.field_mapping.items():
                if mapped_col in columns_to_exclude:
                    columns_to_exclude.remove(mapped_col)
                    columns_to_exclude.add(original_col)

            table_column_names_and_types = (
                source_feature_view.batch_source.get_table_column_names_and_types(
                    config
                )
            )

            for col_name, col_datatype in table_column_names_and_types:
                if col_name in columns_to_exclude:
                    continue
                elif col_name in join_keys:
                    field = Field(
                        name=col_name,
                        dtype=from_value_type(
                            source_feature_view.batch_source.source_datatype_to_feast_value_type()(
                                col_datatype
                            )
                        ),
                    )
                    if field.name not in [
                        entity_column.name for entity_column in entity_columns
                    ]:
                        entity_columns.append(field)
                elif not re.match(
                    "^__|__$", col_name
                ):  # double underscores often signal an internal-use column
                    if run_inference_for_features:
                        feature_name = (
                            source_feature_view.batch_source.field_mapping[col_name]
                            if col_name in source_feature_view.batch_source.field_mapping
                            else col_name
                        )
                        field = Field(
                            name=feature_name,
                            dtype=from_value_type(
                                source_feature_view.batch_source.source_datatype_to_feast_value_type()(
                                    col_datatype
                                )
                            ),
                        )
                        if field.name not in [
                            feature.name for feature in source_feature_view.features
                        ]:
                            source_feature_view.features.append(field)

    else:
        columns_to_exclude = {
            fv.batch_source.timestamp_field,
            fv.batch_source.created_timestamp_column,
        }
        for original_col, mapped_col in fv.batch_source.field_mapping.items():
            if mapped_col in columns_to_exclude:
                columns_to_exclude.remove(mapped_col)
                columns_to_exclude.add(original_col)

        table_column_names_and_types = fv.batch_source.get_table_column_names_and_types(
            config
        )

        for col_name, col_datatype in table_column_names_and_types:
            if col_name in columns_to_exclude:
                continue
            elif col_name in join_keys:
                field = Field(
                    name=col_name,
                    dtype=from_value_type(
                        fv.batch_source.source_datatype_to_feast_value_type()(
                            col_datatype
                        )
                    ),
                )
                if field.name not in [
                    entity_column.name for entity_column in entity_columns
                ]:
                    entity_columns.append(field)
            elif not re.match(
                "^__|__$", col_name
            ):  # double underscores often signal an internal-use column
                if run_inference_for_features:
                    feature_name = (
                        fv.batch_source.field_mapping[col_name]
                        if col_name in fv.batch_source.field_mapping
                        else col_name
                    )
                    field = Field(
                        name=feature_name,
                        dtype=from_value_type(
                            fv.batch_source.source_datatype_to_feast_value_type()(
                                col_datatype
                            )
                        ),
                    )
                    if field.name not in [feature.name for feature in fv.features]:
                        fv.features.append(field)

    fv.entity_columns = entity_columns
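
The field-mapping handling above is easy to miss: when a timestamp column is renamed via field_mapping, the exclusion set must track the original (pre-mapping) column name, since that is the name that actually appears in the source table. A standalone illustration, not from the commit, with hypothetical column names:

    # Columns that should never be inferred as features.
    columns_to_exclude = {"event_timestamp", "created"}
    # Source-level rename: raw column "ts_raw" is exposed as "event_timestamp".
    field_mapping = {"ts_raw": "event_timestamp"}

    for original_col, mapped_col in field_mapping.items():
        if mapped_col in columns_to_exclude:
            columns_to_exclude.remove(mapped_col)
            columns_to_exclude.add(original_col)

    print(columns_to_exclude)  # -> {'ts_raw', 'created'} (set order may vary)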
