Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement gRPC server to ingest streaming features #3687

Merged
merged 15 commits into from
Sep 7, 2023
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ kill-trino-locally:
cd ${ROOT_DIR}; docker stop trino

install-protoc-dependencies:
pip install --ignore-installed protobuf==4.23.4 grpcio-tools==1.47.0 mypy-protobuf==3.1.0
pip install --ignore-installed protobuf==4.23.4 "grpcio-tools>=1.56.2,<2" mypy-protobuf==3.1.0

install-feast-ci-locally:
pip install -e ".[ci]"
Expand Down
27 changes: 27 additions & 0 deletions protos/feast/serving/GrpcServer.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
syntax = "proto3";

message PushRequest {
map<string, string> features = 1;
string stream_feature_view = 2;
bool allow_registry_cache = 3;
string to = 4;
}

message PushResponse {
bool status = 1;
}

message WriteToOnlineStoreRequest {
map<string, string> features = 1;
string feature_view_name = 2;
bool allow_registry_cache = 3;
}

message WriteToOnlineStoreResponse {
bool status = 1;
}

service GrpcFeatureServer {
rpc Push (PushRequest) returns (PushResponse) {};
rpc WriteToOnlineStore (WriteToOnlineStoreRequest) returns (WriteToOnlineStoreResponse);
}
31 changes: 31 additions & 0 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from feast.constants import DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT
from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError
from feast.feature_view import FeatureView
from feast.infra.contrib.grpc_server import get_grpc_server
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.repo_config import load_repo_config
from feast.repo_operations import (
Expand Down Expand Up @@ -689,6 +690,36 @@ def serve_command(
)


@cli.command("listen")
@click.option(
"--address",
"-a",
type=click.STRING,
default="localhost:50051",
show_default=True,
help="Address of the gRPC server",
)
@click.option(
"--max_workers",
"-w",
type=click.INT,
default=10,
show_default=False,
help="The maximum number of threads that can be used to execute the gRPC calls",
)
@click.pass_context
def listen_command(
ctx: click.Context,
address: str,
max_workers: int,
):
"""Start a gRPC feature server to ingest streaming features on given address"""
store = create_feature_store(ctx)
server = get_grpc_server(address, store, max_workers)
server.start()
server.wait_for_termination()


@cli.command("serve_transformations")
@click.option(
"--port",
Expand Down
95 changes: 95 additions & 0 deletions sdk/python/feast/infra/contrib/grpc_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import logging
from concurrent import futures
adchia marked this conversation as resolved.
Show resolved Hide resolved

import grpc
import pandas as pd
from grpc_health.v1 import health, health_pb2_grpc

from feast.data_source import PushMode
from feast.errors import PushSourceNotFoundException
from feast.feature_store import FeatureStore
from feast.protos.feast.serving.GrpcServer_pb2 import (
PushResponse,
WriteToOnlineStoreResponse,
)
from feast.protos.feast.serving.GrpcServer_pb2_grpc import (
GrpcFeatureServerServicer,
add_GrpcFeatureServerServicer_to_server,
)


def parse(features):
df = {}
for i in features.keys():
df[i] = [features.get(i)]
return pd.DataFrame.from_dict(df)


class GrpcFeatureServer(GrpcFeatureServerServicer):
fs: FeatureStore

def __init__(self, fs: FeatureStore):
self.fs = fs
super().__init__()

def Push(self, request, context):
try:
df = parse(request.features)
if request.to == "offline":
to = PushMode.OFFLINE
elif request.to == "online":
to = PushMode.ONLINE
elif request.to == "online_and_offline":
to = PushMode.ONLINE_AND_OFFLINE
else:
raise ValueError(
f"{request.to} is not a supported push format. Please specify one of these ['online', 'offline', "
f"'online_and_offline']."
)
self.fs.push(
push_source_name=request.push_source_name,
df=df,
allow_registry_cache=request.allow_registry_cache,
to=to,
)
except PushSourceNotFoundException as e:
logging.exception(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details(str(e))
return PushResponse(status=False)
except Exception as e:
logging.exception(str(e))
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(e))
return PushResponse(status=False)
return PushResponse(status=True)

def WriteToOnlineStore(self, request, context):
logging.warning(
"write_to_online_store is deprecated. Please consider using Push instead"
)
try:
df = parse(request.features)
self.fs.write_to_online_store(
feature_view_name=request.feature_view_name,
df=df,
allow_registry_cache=request.allow_registry_cache,
)
except Exception as e:
logging.exception(str(e))
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(e))
return PushResponse(status=False)
return WriteToOnlineStoreResponse(status=True)


def get_grpc_server(address: str, fs: FeatureStore, max_workers: int):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
add_GrpcFeatureServerServicer_to_server(GrpcFeatureServer(fs), server)
health_servicer = health.HealthServicer(
experimental_non_blocking=True,
experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=max_workers),
)
health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
server.add_insecure_port(address)
return server
Loading
Loading