Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add streaming sources to the FeatureView API #1664

Merged
merged 9 commits into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion protos/feast/core/FeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ message FeatureViewSpec {
google.protobuf.Duration ttl = 6;

// Batch/Offline DataSource where this view can retrieve offline feature data.
DataSource input = 7;
DataSource batch_source = 7;
// Streaming DataSource from where this view can consume "online" feature data.
DataSource stream_source = 9;
woop marked this conversation as resolved.
Show resolved Hide resolved

// Whether these features should be served online or not
bool online = 8;
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,9 @@ def __init__(
)

def __eq__(self, other):
if other is None:
return False

if not isinstance(other, KinesisSource):
raise TypeError(
"Comparisons should only involve KinesisSource class objects."
Expand Down
39 changes: 33 additions & 6 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class FeatureView:
ttl: Optional[timedelta]
online: bool
input: DataSource

batch_source: Optional[DataSource] = None
woop marked this conversation as resolved.
Show resolved Hide resolved
stream_source: Optional[DataSource] = None
created_timestamp: Optional[Timestamp] = None
last_updated_timestamp: Optional[Timestamp] = None
materialization_intervals: List[Tuple[datetime, datetime]]
Expand All @@ -62,15 +63,22 @@ def __init__(
entities: List[str],
ttl: Optional[Union[Duration, timedelta]],
input: DataSource,
batch_source: Optional[DataSource] = None,
stream_source: Optional[DataSource] = None,
features: List[Feature] = [],
tags: Optional[Dict[str, str]] = None,
online: bool = True,
):
_input = input or batch_source
assert _input is not None

cols = [entity for entity in entities] + [feat.name for feat in features]
for col in cols:
if input.field_mapping is not None and col in input.field_mapping.keys():
if _input.field_mapping is not None and col in _input.field_mapping.keys():
raise ValueError(
f"The field {col} is mapped to {input.field_mapping[col]} for this data source. Please either remove this field mapping or use {input.field_mapping[col]} as the Entity or Feature name."
f"The field {col} is mapped to {_input.field_mapping[col]} for this data source. "
f"Please either remove this field mapping or use {_input.field_mapping[col]} as the "
f"Entity or Feature name."
)

self.name = name
Expand All @@ -84,7 +92,9 @@ def __init__(
self.ttl = ttl

self.online = online
self.input = input
self.input = _input
self.batch_source = _input
self.stream_source = stream_source
woop marked this conversation as resolved.
Show resolved Hide resolved

self.materialization_intervals = []

Expand Down Expand Up @@ -118,6 +128,8 @@ def __eq__(self, other):
return False
if self.input != other.input:
return False
if self.stream_source != other.stream_source:
return False

return True

Expand Down Expand Up @@ -157,14 +169,21 @@ def to_proto(self) -> FeatureViewProto:
ttl_duration = Duration()
ttl_duration.FromTimedelta(self.ttl)

print(f"Stream soruce: {self.stream_source}, {type(self.stream_source)}")

spec = FeatureViewSpecProto(
name=self.name,
entities=self.entities,
features=[feature.to_proto() for feature in self.features],
tags=self.tags,
ttl=(ttl_duration if ttl_duration is not None else None),
online=self.online,
input=self.input.to_proto(),
batch_source=self.input.to_proto(),
stream_source=(
self.stream_source.to_proto()
if self.stream_source is not None
else None
),
)

return FeatureViewProto(spec=spec, meta=meta)
Expand All @@ -181,6 +200,12 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
Returns a FeatureViewProto object based on the feature view protobuf
"""

_input = DataSource.from_proto(feature_view_proto.spec.batch_source)
stream_source = (
DataSource.from_proto(feature_view_proto.spec.stream_source)
if feature_view_proto.spec.HasField("stream_source")
else None
)
feature_view = cls(
name=feature_view_proto.spec.name,
entities=[entity for entity in feature_view_proto.spec.entities],
Expand All @@ -200,7 +225,9 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
and feature_view_proto.spec.ttl.nanos == 0
else feature_view_proto.spec.ttl
),
input=DataSource.from_proto(feature_view_proto.spec.input),
input=_input,
batch_source=_input,
stream_source=stream_source,
)

feature_view.created_timestamp = feature_view_proto.meta.created_timestamp
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tensorflow_metadata/proto/v0/path_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/python/tensorflow_metadata/proto/v0/schema_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.