Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Stream Feature View FCOS #2750

Merged
merged 22 commits into from
Jun 1, 2022
14 changes: 14 additions & 0 deletions protos/feast/core/Aggregation.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";
package feast.core;

option go_package = "github.com/feast-dev/feast/go/protos/feast/core";
option java_outer_classname = "AggregationProto";
option java_package = "feast.proto.core";

import "google/protobuf/duration.proto";

// Definition of a single aggregation over one column.
message Aggregation {
  // Column name of the feature being aggregated.
  string column = 1;

  // Name of the aggregation function (e.g. sum, max, min, count, mean).
  string function = 2;

  // The time window for this aggregation.
  google.protobuf.Duration time_window = 3;
}
9 changes: 7 additions & 2 deletions protos/feast/core/DataFormat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ option java_package = "feast.proto.core";
message FileFormat {
// Defines options for the Parquet data format
message ParquetFormat {}

oneof format {
ParquetFormat parquet_format = 1;
}
Expand All @@ -40,17 +40,22 @@ message StreamFormat {
// Feature data from the obtained stream message
string class_path = 1;
}

// Options for the Avro data format.
message AvroFormat {
  // Schema of the Avro message, given as a JSON string.
  // Optional when used in a File DataSource, because the schema is
  // embedded in the avro file.
  string schema_json = 1;
}

// Options for the Json streaming data format.
message JsonFormat {
  // Json schema definition. For spark, the Python SDK documents this as a
  // pyspark ddl string.
  string schema_json = 1;
}

// Selects the data format together with its format-specific options.
// At most one of the members below is set.
oneof format {
  AvroFormat avro_format = 1;
  ProtoFormat proto_format = 2;
  JsonFormat json_format = 3;
}
}
1 change: 0 additions & 1 deletion protos/feast/core/FeatureService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ option go_package = "github.com/feast-dev/feast/go/protos/feast/core";
option java_outer_classname = "FeatureServiceProto";
option java_package = "feast.proto.core";

import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "feast/core/FeatureViewProjection.proto";

Expand Down
4 changes: 3 additions & 1 deletion protos/feast/core/Registry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,21 @@ import "feast/core/FeatureView.proto";
import "feast/core/InfraObject.proto";
import "feast/core/OnDemandFeatureView.proto";
import "feast/core/RequestFeatureView.proto";
import "feast/core/StreamFeatureView.proto";
import "feast/core/DataSource.proto";
import "feast/core/SavedDataset.proto";
import "feast/core/ValidationProfile.proto";
import "google/protobuf/timestamp.proto";

// Next id: 14
// Next id: 15
message Registry {
repeated Entity entities = 1;
repeated FeatureTable feature_tables = 2;
repeated FeatureView feature_views = 6;
repeated DataSource data_sources = 12;
repeated OnDemandFeatureView on_demand_feature_views = 8;
repeated RequestFeatureView request_feature_views = 9;
repeated StreamFeatureView stream_feature_views = 14;
repeated FeatureService feature_services = 7;
repeated SavedDataset saved_datasets = 11;
repeated ValidationReference validation_references = 13;
Expand Down
98 changes: 98 additions & 0 deletions protos/feast/core/StreamFeatureView.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
//
// Copyright 2020 The Feast Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//


syntax = "proto3";
package feast.core;

option go_package = "github.com/feast-dev/feast/go/protos/feast/core";
option java_outer_classname = "StreamFeatureViewProto";
option java_package = "feast.proto.core";


import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "feast/core/OnDemandFeatureView.proto";
import "feast/core/Feature.proto";
import "feast/core/DataSource.proto";
import "feast/core/Aggregation.proto";

// A stream feature view: its user-provided spec plus its metadata.
message StreamFeatureView {
  // User-specified specifications of this feature view.
  StreamFeatureViewSpec spec = 1;

  // Metadata of this feature view (creation and last-update timestamps).
  StreamFeatureViewMeta meta = 2;
}

// User-specified definition of a stream feature view.
// Next available id: 17
message StreamFeatureViewSpec {
  // Name of the feature view. Must be unique. Not updated.
  string name = 1;

  // Name of Feast project that this feature view belongs to.
  string project = 2;

  // List of names of entities associated with this feature view.
  repeated string entities = 3;

  // List of specifications for each feature defined as part of this feature
  // view.
  repeated FeatureSpecV2 features = 4;

  // List of specifications for each entity defined as part of this feature
  // view.
  repeated FeatureSpecV2 entity_columns = 5;

  // Description of the feature view.
  string description = 6;

  // User defined metadata.
  map<string, string> tags = 7;

  // Owner of the feature view.
  string owner = 8;

  // Features in this feature view can only be retrieved from online serving
  // younger than ttl. Ttl is measured as the duration of time between
  // the feature's event timestamp and when the feature is retrieved.
  // Feature values outside ttl will be returned as unset values and indicated
  // to end user.
  google.protobuf.Duration ttl = 9;

  // Batch/Offline DataSource where this view can retrieve offline feature
  // data.
  DataSource batch_source = 10;

  // Streaming DataSource from where this view can consume "online" feature
  // data.
  DataSource stream_source = 11;

  // Whether these features should be served online or not.
  bool online = 12;

  // Serialized function that is encoded in the stream feature view.
  UserDefinedFunction user_defined_function = 13;

  // Mode of execution.
  string mode = 14;

  // Aggregation definitions.
  repeated Aggregation aggregations = 15;

  // Timestamp field for aggregation.
  string timestamp_field = 16;
}

// Timestamps recorded for a stream feature view.
message StreamFeatureViewMeta {
  // Time when this feature view was created.
  google.protobuf.Timestamp created_timestamp = 1;

  // Time when this feature view was last updated.
  google.protobuf.Timestamp last_updated_timestamp = 2;
}
2 changes: 0 additions & 2 deletions protos/feast/core/ValidationProfile.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ option java_package = "feast.proto.core";
option java_outer_classname = "ValidationProfile";
option go_package = "github.com/feast-dev/feast/go/protos/feast/core";

import "feast/core/SavedDataset.proto";

message GEValidationProfiler {
message UserDefinedProfiler {
// The python-syntax function body (serialized by dill)
Expand Down
69 changes: 69 additions & 0 deletions sdk/python/feast/aggregation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from datetime import timedelta
from typing import Optional

from google.protobuf.duration_pb2 import Duration

from feast.protos.feast.core.Aggregation_pb2 import Aggregation as AggregationProto


class Aggregation:
    """
    NOTE: Feast-handled aggregations are not yet supported. This class provides a way to register user-defined aggregations.

    Attributes:
        column: str # Column name of the feature we are aggregating.
        function: str # Provided built in aggregations sum, max, min, count, mean
        time_window: timedelta # The time window for this aggregation; None when no window was given.
    """

    column: str
    function: str
    time_window: Optional[timedelta]

    def __init__(
        self,
        column: Optional[str] = "",
        function: Optional[str] = "",
        time_window: Optional[timedelta] = None,
    ):
        """
        Creates an Aggregation.

        Args:
            column: Column name of the feature to aggregate. None is stored as "".
            function: Name of the aggregation function. None is stored as "".
            time_window: The time window for this aggregation, or None if unspecified.
        """
        # None is normalized to "" so the values map directly onto the proto's string fields.
        self.column = column or ""
        self.function = function or ""
        self.time_window = time_window

    def to_proto(self) -> AggregationProto:
        """
        Converts this aggregation to its protobuf representation.

        Returns:
            An AggregationProto; its time_window field is left unset when
            self.time_window is None.
        """
        window_duration = None
        if self.time_window is not None:
            window_duration = Duration()
            window_duration.FromTimedelta(self.time_window)

        return AggregationProto(
            column=self.column, function=self.function, time_window=window_duration
        )

    @classmethod
    def from_proto(cls, agg_proto: AggregationProto):
        """
        Creates an Aggregation from its protobuf representation.

        Args:
            agg_proto: The protobuf message to convert.

        Returns:
            An Aggregation whose time_window is None when the proto's
            time_window field is unset, so that to_proto()/from_proto()
            round-trips an aggregation without a window unchanged.
        """
        # Message fields track presence: an unset window must come back as None,
        # not as a zero-length timedelta, otherwise __eq__ fails after a round trip.
        time_window = (
            agg_proto.time_window.ToTimedelta()
            if agg_proto.HasField("time_window")
            else None
        )

        return cls(
            column=agg_proto.column,
            function=agg_proto.function,
            time_window=time_window,
        )

    def __eq__(self, other):
        """
        Compares two aggregations by column, function and time_window.

        Raises:
            TypeError: If other is not an Aggregation.
        """
        if not isinstance(other, Aggregation):
            raise TypeError("Comparisons should only involve Aggregations.")

        return (
            self.column == other.column
            and self.function == other.function
            and self.time_window == other.time_window
        )
24 changes: 24 additions & 0 deletions sdk/python/feast/data_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ def from_proto(cls, proto):
fmt = proto.WhichOneof("format")
if fmt == "avro_format":
return AvroFormat(schema_json=proto.avro_format.schema_json)
if fmt == "json_format":
return JsonFormat(schema_json=proto.json_format.schema_json)
if fmt == "proto_format":
return ProtoFormat(class_path=proto.proto_format.class_path)
raise NotImplementedError(f"StreamFormat is unsupported: {fmt}")
Expand All @@ -113,6 +115,28 @@ def to_proto(self):
return StreamFormatProto(avro_format=proto)


class JsonFormat(StreamFormat):
    """
    Stream data format for messages whose data is encoded as Json.
    """

    def __init__(self, schema_json: str):
        """
        Creates a Json stream data format.

        For spark, the schema uses the pyspark ddl string format. An example is shown here:
        https://vincent.doba.fr/posts/20211004_spark_data_description_language_for_defining_spark_schema/

        Args:
            schema_json: Json schema definition
        """
        self.schema_json = schema_json

    def to_proto(self):
        """
        Builds a StreamFormatProto whose json_format option carries this schema.
        """
        return StreamFormatProto(
            json_format=StreamFormatProto.JsonFormat(schema_json=self.schema_json)
        )


class ProtoFormat(StreamFormat):
"""
Defines the Protobuf data format
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,9 @@ def __init__(
if _message_format is None:
raise ValueError("Message format must be specified for Kafka source")

if not timestamp_field and not _event_timestamp_column:
raise ValueError("Timestamp field must be specified for Kafka source")

super().__init__(
event_timestamp_column=_event_timestamp_column,
created_timestamp_column=created_timestamp_column,
Expand Down
Loading