diff --git a/protos/feast/core/FeatureView.proto b/protos/feast/core/FeatureView.proto index d98f54825a..f39fcf5e73 100644 --- a/protos/feast/core/FeatureView.proto +++ b/protos/feast/core/FeatureView.proto @@ -59,7 +59,9 @@ message FeatureViewSpec { google.protobuf.Duration ttl = 6; // Batch/Offline DataSource where this view can retrieve offline feature data. - DataSource input = 7; + DataSource batch_source = 7; + // Streaming DataSource from where this view can consume "online" feature data. + DataSource stream_source = 9; // Whether these features should be served online or not bool online = 8; diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index a5a620b55b..7480d7fd4f 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -869,6 +869,9 @@ def __init__( ) def __eq__(self, other): + if other is None: + return False + if not isinstance(other, KinesisSource): raise TypeError( "Comparisons should only involve KinesisSource class objects." diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index db22fd7e4a..3d20b9334f 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -50,7 +50,8 @@ class FeatureView: ttl: Optional[timedelta] online: bool input: DataSource - + batch_source: Optional[DataSource] = None + stream_source: Optional[DataSource] = None created_timestamp: Optional[Timestamp] = None last_updated_timestamp: Optional[Timestamp] = None materialization_intervals: List[Tuple[datetime, datetime]] @@ -62,15 +63,22 @@ def __init__( entities: List[str], ttl: Optional[Union[Duration, timedelta]], input: DataSource, + batch_source: Optional[DataSource] = None, + stream_source: Optional[DataSource] = None, features: List[Feature] = [], tags: Optional[Dict[str, str]] = None, online: bool = True, ): + _input = input or batch_source + assert _input is not None + cols = [entity for entity in entities] + [feat.name for feat in features] for col in cols: - if input.field_mapping is not None and col in input.field_mapping.keys(): + if _input.field_mapping is not None and col in _input.field_mapping.keys(): raise ValueError( - f"The field {col} is mapped to {input.field_mapping[col]} for this data source. Please either remove this field mapping or use {input.field_mapping[col]} as the Entity or Feature name." + f"The field {col} is mapped to {_input.field_mapping[col]} for this data source. " + f"Please either remove this field mapping or use {_input.field_mapping[col]} as the " + f"Entity or Feature name." ) self.name = name @@ -84,7 +92,9 @@ def __init__( self.ttl = ttl self.online = online - self.input = input + self.input = _input + self.batch_source = _input + self.stream_source = stream_source self.materialization_intervals = [] @@ -118,6 +128,8 @@ def __eq__(self, other): return False if self.input != other.input: return False + if self.stream_source != other.stream_source: + return False return True @@ -157,6 +169,8 @@ def to_proto(self) -> FeatureViewProto: ttl_duration = Duration() ttl_duration.FromTimedelta(self.ttl) + print(f"Stream soruce: {self.stream_source}, {type(self.stream_source)}") + spec = FeatureViewSpecProto( name=self.name, entities=self.entities, @@ -164,7 +178,12 @@ def to_proto(self) -> FeatureViewProto: tags=self.tags, ttl=(ttl_duration if ttl_duration is not None else None), online=self.online, - input=self.input.to_proto(), + batch_source=self.input.to_proto(), + stream_source=( + self.stream_source.to_proto() + if self.stream_source is not None + else None + ), ) return FeatureViewProto(spec=spec, meta=meta) @@ -181,6 +200,12 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): Returns a FeatureViewProto object based on the feature view protobuf """ + _input = DataSource.from_proto(feature_view_proto.spec.batch_source) + stream_source = ( + DataSource.from_proto(feature_view_proto.spec.stream_source) + if feature_view_proto.spec.HasField("stream_source") + else None + ) feature_view = cls( name=feature_view_proto.spec.name, entities=[entity for entity in feature_view_proto.spec.entities], @@ -200,7 +225,9 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): and feature_view_proto.spec.ttl.nanos == 0 else feature_view_proto.spec.ttl ), - input=DataSource.from_proto(feature_view_proto.spec.input), + input=_input, + batch_source=_input, + stream_source=stream_source, ) feature_view.created_timestamp = feature_view_proto.meta.created_timestamp diff --git a/sdk/python/tensorflow_metadata/proto/v0/path_pb2.py b/sdk/python/tensorflow_metadata/proto/v0/path_pb2.py index d732119ead..4b6dec828c 100644 --- a/sdk/python/tensorflow_metadata/proto/v0/path_pb2.py +++ b/sdk/python/tensorflow_metadata/proto/v0/path_pb2.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: tensorflow_metadata/proto/v0/path.proto - +"""Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from google.protobuf import reflection as _reflection diff --git a/sdk/python/tensorflow_metadata/proto/v0/schema_pb2.py b/sdk/python/tensorflow_metadata/proto/v0/schema_pb2.py index 78fda8003d..d3bfc50616 100644 --- a/sdk/python/tensorflow_metadata/proto/v0/schema_pb2.py +++ b/sdk/python/tensorflow_metadata/proto/v0/schema_pb2.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: tensorflow_metadata/proto/v0/schema.proto - +"""Generated protocol buffer code.""" from google.protobuf.internal import enum_type_wrapper from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message diff --git a/sdk/python/tensorflow_metadata/proto/v0/statistics_pb2.py b/sdk/python/tensorflow_metadata/proto/v0/statistics_pb2.py index d8e12bd120..21473adc75 100644 --- a/sdk/python/tensorflow_metadata/proto/v0/statistics_pb2.py +++ b/sdk/python/tensorflow_metadata/proto/v0/statistics_pb2.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: tensorflow_metadata/proto/v0/statistics.proto - +"""Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from google.protobuf import reflection as _reflection