feat: Update stream fcos to have watermark and sliding interval (#2765)
* Add sliding window to aggregations

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* update apis

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Lint

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix lint

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
kevjumba authored Jun 8, 2022
1 parent d25e8d4 commit 3256952
Showing 6 changed files with 85 additions and 3 deletions.
1 change: 1 addition & 0 deletions protos/feast/core/Aggregation.proto
@@ -11,4 +11,5 @@ message Aggregation {
string column = 1;
string function = 2;
google.protobuf.Duration time_window = 3;
google.protobuf.Duration slide_interval = 4;
}
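For illustration, a minimal sketch of populating the extended message from Python — assuming the regenerated bindings under feast.protos; the column name and window sizes are placeholders:

```python
from datetime import timedelta

from google.protobuf.duration_pb2 import Duration

from feast.protos.feast.core.Aggregation_pb2 import Aggregation as AggregationProto

# A one-day window that advances every hour.
window = Duration()
window.FromTimedelta(timedelta(days=1))
slide = Duration()
slide.FromTimedelta(timedelta(hours=1))

agg = AggregationProto(
    column="transaction_amount",  # placeholder column name
    function="sum",
    time_window=window,
    slide_interval=slide,
)
```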
2 changes: 2 additions & 0 deletions protos/feast/core/DataSource.proto
@@ -22,6 +22,7 @@ option go_package = "github.com/feast-dev/feast/go/protos/feast/core";
option java_outer_classname = "DataSourceProto";
option java_package = "feast.proto.core";

import "google/protobuf/duration.proto";
import "feast/core/DataFormat.proto";
import "feast/types/Value.proto";
import "feast/core/Feature.proto";
@@ -135,6 +136,7 @@ message DataSource {
// Defines the stream data format encoding feature/entity data in Kafka messages.
StreamFormat message_format = 3;

google.protobuf.Duration watermark = 4;
}

// Defines options for DataSource that sources features from Kinesis records.
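Similarly, a hedged sketch of setting the new watermark on the generated KafkaOptions proto (bindings path assumed as above; server and topic are placeholders):

```python
from datetime import timedelta

from google.protobuf.duration_pb2 import Duration

from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto

# Tolerate events arriving up to ten minutes late before they are discarded.
watermark = Duration()
watermark.FromTimedelta(timedelta(minutes=10))

kafka_options = DataSourceProto.KafkaOptions(
    bootstrap_servers="localhost:9092",  # placeholder broker address
    topic="events",                      # placeholder topic
    watermark=watermark,
)
```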
24 changes: 23 additions & 1 deletion sdk/python/feast/aggregation.py
@@ -14,30 +14,45 @@ class Aggregation:
column: str # Column name of the feature we are aggregating.
function: str # One of the provided built-in aggregations: sum, max, min, count, mean.
time_window: timedelta # The time window for this aggregation.
slide_interval: timedelta # The interval by which the time window slides for this aggregation.
"""

column: str
function: str
time_window: Optional[timedelta]
slide_interval: Optional[timedelta]

def __init__(
self,
column: Optional[str] = "",
function: Optional[str] = "",
time_window: Optional[timedelta] = None,
slide_interval: Optional[timedelta] = None,
):
self.column = column or ""
self.function = function or ""
self.time_window = time_window
if not slide_interval:
self.slide_interval = self.time_window
else:
self.slide_interval = slide_interval

def to_proto(self) -> AggregationProto:
window_duration = None
if self.time_window is not None:
window_duration = Duration()
window_duration.FromTimedelta(self.time_window)

slide_interval_duration = None
if self.slide_interval is not None:
slide_interval_duration = Duration()
slide_interval_duration.FromTimedelta(self.slide_interval)

return AggregationProto(
column=self.column, function=self.function, time_window=window_duration
column=self.column,
function=self.function,
time_window=window_duration,
slide_interval=slide_interval_duration,
)

@classmethod
@@ -48,10 +63,16 @@ def from_proto(cls, agg_proto: AggregationProto):
else agg_proto.time_window.ToTimedelta()
)

slide_interval = (
timedelta(days=0)
if agg_proto.slide_interval.ToNanoseconds() == 0
else agg_proto.slide_interval.ToTimedelta()
)
aggregation = cls(
column=agg_proto.column,
function=agg_proto.function,
time_window=time_window,
slide_interval=slide_interval,
)
return aggregation

@@ -63,6 +84,7 @@ def __eq__(self, other):
self.column != other.column
or self.function != other.function
or self.time_window != other.time_window
or self.slide_interval != other.slide_interval
):
return False

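At the SDK level the new field round-trips through the proto layer; a minimal sketch (the column name is a placeholder, and slide_interval falls back to time_window when omitted):

```python
from datetime import timedelta

from feast.aggregation import Aggregation

# Hourly sliding sum over a one-day window.
agg = Aggregation(
    column="transaction_amount",
    function="sum",
    time_window=timedelta(days=1),
    slide_interval=timedelta(hours=1),
)

# to_proto/from_proto preserve both the window and the slide interval.
assert Aggregation.from_proto(agg.to_proto()) == agg
```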
59 changes: 57 additions & 2 deletions sdk/python/feast/data_source.py
@@ -15,8 +15,10 @@
import enum
import warnings
from abc import ABC, abstractmethod
from datetime import timedelta
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

from google.protobuf.duration_pb2 import Duration
from google.protobuf.json_format import MessageToJson

from feast import type_map
@@ -47,11 +49,16 @@ class KafkaOptions:
"""

def __init__(
self, bootstrap_servers: str, message_format: StreamFormat, topic: str,
self,
bootstrap_servers: str,
message_format: StreamFormat,
topic: str,
watermark: Optional[timedelta] = None,
):
self.bootstrap_servers = bootstrap_servers
self.message_format = message_format
self.topic = topic
self.watermark = watermark or None

@classmethod
def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
@@ -64,11 +71,18 @@ def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
Returns:
Returns a KafkaOptions object based on the kafka_options protobuf
"""

watermark = None
if kafka_options_proto.HasField("watermark"):
watermark = (
timedelta(days=0)
if kafka_options_proto.watermark.ToNanoseconds() == 0
else kafka_options_proto.watermark.ToTimedelta()
)
kafka_options = cls(
bootstrap_servers=kafka_options_proto.bootstrap_servers,
message_format=StreamFormat.from_proto(kafka_options_proto.message_format),
topic=kafka_options_proto.topic,
watermark=watermark,
)

return kafka_options
@@ -80,11 +94,16 @@ def to_proto(self) -> DataSourceProto.KafkaOptions:
Returns:
KafkaOptionsProto protobuf
"""
watermark_duration = None
if self.watermark is not None:
watermark_duration = Duration()
watermark_duration.FromTimedelta(self.watermark)

kafka_options_proto = DataSourceProto.KafkaOptions(
bootstrap_servers=self.bootstrap_servers,
message_format=self.message_format.to_proto(),
topic=self.topic,
watermark=watermark_duration,
)

return kafka_options_proto
@@ -369,7 +388,32 @@ def __init__(
owner: Optional[str] = "",
timestamp_field: Optional[str] = "",
batch_source: Optional[DataSource] = None,
watermark: Optional[timedelta] = None,
):
"""
Creates a KafkaSource stream source object.
Args:
name: str. Name of data source, which should be unique within a project
event_timestamp_column (optional): str. (Deprecated) Event timestamp column used for point in time
joins of feature values.
bootstrap_servers: str. The servers of the kafka broker in the form "localhost:9092".
message_format: StreamFormat. StreamFormat of serialized messages.
topic: str. The name of the topic to read from in the kafka source.
created_timestamp_column (optional): str. Timestamp column indicating when the row
was created, used for deduplicating rows.
field_mapping (optional): dict(str, str). A dictionary mapping of column names in this data
source to feature names in a feature table or view. Only used for feature
columns, not entity or timestamp columns.
date_partition_column (optional): str. Timestamp column used for partitioning.
description (optional): str. A human-readable description.
tags (optional): dict(str, str). A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): str. The owner of the data source, typically the email of the primary
maintainer.
timestamp_field (optional): str. Event timestamp field used for point
in time joins of feature values.
batch_source: DataSource. The datasource that acts as a batch source.
watermark: timedelta. The watermark for stream data; specifically, how late stream data can arrive without being discarded.
"""
positional_attributes = [
"name",
"event_timestamp_column",
@@ -425,10 +469,12 @@ def __init__(
timestamp_field=timestamp_field,
)
self.batch_source = batch_source

self.kafka_options = KafkaOptions(
bootstrap_servers=_bootstrap_servers,
message_format=_message_format,
topic=_topic,
watermark=watermark,
)

def __eq__(self, other):
@@ -445,6 +491,7 @@ def __eq__(self, other):
!= other.kafka_options.bootstrap_servers
or self.kafka_options.message_format != other.kafka_options.message_format
or self.kafka_options.topic != other.kafka_options.topic
or self.kafka_options.watermark != other.kafka_options.watermark
):
return False

@@ -455,6 +502,13 @@ def __hash__(self):

@staticmethod
def from_proto(data_source: DataSourceProto):
watermark = None
if data_source.kafka_options.HasField("watermark"):
watermark = (
timedelta(days=0)
if data_source.kafka_options.watermark.ToNanoseconds() == 0
else data_source.kafka_options.watermark.ToTimedelta()
)
return KafkaSource(
name=data_source.name,
event_timestamp_column=data_source.timestamp_field,
@@ -463,6 +517,7 @@ def from_proto(data_source: DataSourceProto):
message_format=StreamFormat.from_proto(
data_source.kafka_options.message_format
),
watermark=watermark,
topic=data_source.kafka_options.topic,
created_timestamp_column=data_source.created_timestamp_column,
timestamp_field=data_source.timestamp_field,
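End to end, the watermark surfaces as a KafkaSource parameter; a usage sketch along the lines of the tests below (names, servers, and paths are placeholders):

```python
from datetime import timedelta

from feast import FileSource
from feast.data_format import AvroFormat
from feast.data_source import KafkaSource

source = KafkaSource(
    name="transactions_stream",
    bootstrap_servers="localhost:9092",
    message_format=AvroFormat(""),
    topic="transactions",
    timestamp_field="event_timestamp",
    batch_source=FileSource(path="transactions.parquet", timestamp_field="event_timestamp"),
    watermark=timedelta(hours=1),  # drop events arriving more than one hour late
)
```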
1 change: 1 addition & 0 deletions sdk/python/tests/integration/registration/test_registry.py
@@ -319,6 +319,7 @@ def simple_udf(x: int):
message_format=AvroFormat(""),
topic="topic",
batch_source=FileSource(path="some path"),
watermark=timedelta(days=1),
)

sfv = StreamFeatureView(
@@ -27,6 +27,7 @@ def test_apply_stream_feature_view(environment) -> None:
message_format=AvroFormat(""),
topic="topic",
batch_source=FileSource(path="test_path", timestamp_field="event_timestamp"),
watermark=timedelta(days=1),
)

@stream_feature_view(
