Skip to content

Commit

Permalink
feat: Add http endpoint to the Go feature server
Browse files Browse the repository at this point in the history
Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>
  • Loading branch information
Tsotne Tabidze committed May 10, 2022
1 parent cff0133 commit f3367f2
Show file tree
Hide file tree
Showing 5 changed files with 299 additions and 6 deletions.
20 changes: 19 additions & 1 deletion go/embedded/online_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ func (s *OnlineFeatureService) GetOnlineFeatures(

func (s *OnlineFeatureService) StartGprcServer(host string, port int) error {
// TODO(oleksii): enable logging
// Disable logging for now
var loggingService *logging.LoggingService = nil
ser := server.NewGrpcServingServiceServer(s.fs, loggingService)
log.Printf("Starting a gRPC server on host %s port %d\n", host, port)
Expand All @@ -243,6 +242,25 @@ func (s *OnlineFeatureService) StartGprcServer(host string, port int) error {
return nil
}

func (s *OnlineFeatureService) StartHttpServer(host string, port int) error {
// TODO(oleksii): enable logging
var loggingService *logging.LoggingService = nil
ser := server.NewHttpServer(s.fs, loggingService)
log.Printf("Starting a HTTP server on host %s port %d\n", host, port)

go func() {
// As soon as these signals are received from OS, try to gracefully stop the gRPC server
<-s.grpcStopCh
fmt.Println("Stopping the HTTP server...")
err := ser.Stop()
if err != nil {
fmt.Printf("Error when stopping the HTTP server: %v\n", err)
}
}()

return ser.Serve(host, port)
}

func (s *OnlineFeatureService) Stop() {
s.grpcStopCh <- syscall.SIGINT
}
Expand Down
248 changes: 248 additions & 0 deletions go/internal/feast/server/http_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
package server

import (
"context"
"encoding/json"
"fmt"
"github.com/feast-dev/feast/go/internal/feast"
"github.com/feast-dev/feast/go/internal/feast/model"
"github.com/feast-dev/feast/go/internal/feast/server/logging"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"net/http"
)

type httpServer struct {
fs *feast.FeatureStore
loggingService *logging.LoggingService
server *http.Server
}

// Some Feast types aren't supported during JSON conversion
type repeatedValue struct {
stringVal []string
int64Val []int64
doubleVal []float64
boolVal []bool
stringListVal [][]string
int64ListVal [][]int64
doubleListVal [][]float64
boolListVal [][]bool
}

func (u *repeatedValue) UnmarshalJSON(data []byte) error {
isString := false
isDouble := false
isInt64 := false
isArray := false
openBraketCounter := 0
for _, b := range data {
if b == '"' {
isString = true
}
if b == '.' {
isDouble = true
}
if b >= '0' && b <= '9' {
isInt64 = true
}
if b == '[' {
openBraketCounter++
if openBraketCounter > 1 {
isArray = true
}
}
}
var err error
if !isArray {
if isString {
err = json.Unmarshal(data, &u.stringVal)
} else if isDouble {
err = json.Unmarshal(data, &u.doubleVal)
} else if isInt64 {
err = json.Unmarshal(data, &u.int64Val)
} else {
err = json.Unmarshal(data, &u.boolVal)
}
} else {
if isString {
err = json.Unmarshal(data, &u.stringListVal)
} else if isDouble {
err = json.Unmarshal(data, &u.doubleListVal)
} else if isInt64 {
err = json.Unmarshal(data, &u.int64ListVal)
} else {
err = json.Unmarshal(data, &u.boolListVal)
}
}
return err
}

func (u *repeatedValue) ToProto() *prototypes.RepeatedValue {
proto := new(prototypes.RepeatedValue)
if u.stringVal != nil {
for _, val := range u.stringVal {
proto.Val = append(proto.Val, &prototypes.Value{Val: &prototypes.Value_StringVal{StringVal: val}})
}
}
if u.int64Val != nil {
for _, val := range u.int64Val {
proto.Val = append(proto.Val, &prototypes.Value{Val: &prototypes.Value_Int64Val{Int64Val: val}})
}
}
if u.doubleVal != nil {
for _, val := range u.doubleVal {
proto.Val = append(proto.Val, &prototypes.Value{Val: &prototypes.Value_DoubleVal{DoubleVal: val}})
}
}
if u.boolVal != nil {
for _, val := range u.boolVal {
proto.Val = append(proto.Val, &prototypes.Value{Val: &prototypes.Value_BoolVal{BoolVal: val}})
}
}
if u.stringListVal != nil {
for _, val := range u.stringListVal {
proto.Val = append(proto.Val, &prototypes.Value{Val: &prototypes.Value_StringListVal{StringListVal: &prototypes.StringList{Val: val}}})
}
}
if u.int64ListVal != nil {
for _, val := range u.int64ListVal {
proto.Val = append(proto.Val, &prototypes.Value{Val: &prototypes.Value_Int64ListVal{Int64ListVal: &prototypes.Int64List{Val: val}}})
}
}
if u.doubleListVal != nil {
for _, val := range u.doubleListVal {
proto.Val = append(proto.Val, &prototypes.Value{Val: &prototypes.Value_DoubleListVal{DoubleListVal: &prototypes.DoubleList{Val: val}}})
}
}
if u.boolListVal != nil {
for _, val := range u.boolListVal {
proto.Val = append(proto.Val, &prototypes.Value{Val: &prototypes.Value_BoolListVal{BoolListVal: &prototypes.BoolList{Val: val}}})
}
}
return proto
}

type getOnlineFeaturesRequest struct {
FeatureService *string `json:"feature_service"`
Features []string `json:"features"`
Entities map[string]repeatedValue `json:"entities"`
FullFeatureNames bool `json:"full_feature_names"`
RequestContext map[string]repeatedValue `json:"request_context"`
}

func NewHttpServer(fs *feast.FeatureStore, loggingService *logging.LoggingService) *httpServer {
return &httpServer{fs: fs, loggingService: loggingService}
}

func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.NotFound(w, r)
return
}

decoder := json.NewDecoder(r.Body)
var request getOnlineFeaturesRequest
err := decoder.Decode(&request)
if err != nil {
http.Error(w, fmt.Sprintf("Error decoding JSON request data: %+v", err), http.StatusInternalServerError)
return
}
var featureService *model.FeatureService
if request.FeatureService != nil {
featureService, err = s.fs.GetFeatureService(*request.FeatureService)
if err != nil {
http.Error(w, fmt.Sprintf("Error getting feature service from registry: %+v", err), http.StatusInternalServerError)
}
}
entitiesProto := make(map[string]*prototypes.RepeatedValue)
for key, value := range request.Entities {
entitiesProto[key] = value.ToProto()
}
requestContextProto := make(map[string]*prototypes.RepeatedValue)
for key, value := range request.RequestContext {
requestContextProto[key] = value.ToProto()
}

fmt.Printf("features: %+v\n", request.Features)
for _, feature := range request.Features {
fmt.Printf(" feature %+v (type %T)\n", feature, feature)
}
fmt.Printf("feature_service: %+v\n", featureService)
for key, value := range entitiesProto {
fmt.Printf(" entity %s | repeatedValue %+v\n", key, value)
}
for key, value := range requestContextProto {
fmt.Printf(" requestKey %s | repeatedValue %+v\n", key, value)
}
fmt.Printf("full_feature_names: %+v\n", request.FullFeatureNames)
fmt.Println()

featureVectors, err := s.fs.GetOnlineFeatures(
r.Context(),
request.Features,
featureService,
entitiesProto,
requestContextProto,
request.FullFeatureNames)

if err != nil {
http.Error(w, fmt.Sprintf("Error getting feature vector: %+v", err), http.StatusInternalServerError)
}

fmt.Printf("featureVectors: %+v\n", featureVectors)
var featureNames []string
var results []map[string]interface{}
for _, vector := range featureVectors {
fmt.Printf(" featureVector: %+v\n", *vector)
featureNames = append(featureNames, vector.Name)
result := make(map[string]interface{})
var statuses []string
for _, status := range vector.Statuses {
statuses = append(statuses, status.String())
}
var timestamps []string
for _, timestamp := range vector.Timestamps {
timestamps = append(timestamps, timestamp.String())
}

result["statuses"] = statuses
result["event_timestamps"] = timestamps
// Note, that vector.Values is an Arrow Array, but this type implements JSON Marshaller.
// So, it's not necessary to pre-process it in any way.
result["values"] = vector.Values

results = append(results, result)
}

response := map[string]interface{}{
"metadata": map[string]interface{}{
"feature_names": featureNames,
},
"results": results,
}

err = json.NewEncoder(w).Encode(response)

if err != nil {
http.Error(w, fmt.Sprintf("Error encoding response: %+v", err), http.StatusInternalServerError)
}

w.Header().Set("Content-Type", "application/json")
}

func (s *httpServer) Serve(host string, port int) error {
s.server = &http.Server{Addr: fmt.Sprintf("%s:%d", host, port), Handler: nil}
http.HandleFunc("/get-online-features", s.getOnlineFeatures)
err := s.server.ListenAndServe()
// Don't return the error if it's caused by graceful shutdown using Stop()
if err == http.ErrServerClosed {
return nil
}
return err
}
func (s *httpServer) Stop() error {
if s.server != nil {
return s.server.Shutdown(context.Background())
}
return nil
}
14 changes: 12 additions & 2 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,17 +571,27 @@ def init_command(project_directory, minimal: bool, template: str):
default=6566,
help="Specify a port for the server [default: 6566]",
)
@click.option(
"--type",
"-t",
"type_",
type=click.STRING,
default="http",
help="Specify a server type: 'http' or 'grpc' [default: http]",
)
@click.option(
"--no-access-log", is_flag=True, help="Disable the Uvicorn access log.",
)
@click.pass_context
def serve_command(ctx: click.Context, host: str, port: int, no_access_log: bool):
def serve_command(
ctx: click.Context, host: str, port: int, type_: str, no_access_log: bool
):
"""Start a feature server locally on a given port."""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))

store.serve(host, port, no_access_log)
store.serve(host, port, type_, no_access_log)


@cli.command("serve_transformations")
Expand Down
6 changes: 6 additions & 0 deletions sdk/python/feast/embedded_go/online_features_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,15 @@ def get_online_features(
resp = record_batch_to_online_response(record_batch)
return OnlineResponse(resp)

def start_http_server(self, host: str, port: int):
self._service.StartHttpServer(host, port)

def start_grpc_server(self, host: str, port: int):
self._service.StartGprcServer(host, port)

def stop_http_server(self):
self._service.Stop()

def stop_grpc_server(self):
self._service.Stop()

Expand Down
17 changes: 14 additions & 3 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1959,14 +1959,25 @@ def _get_feature_views_to_use(
return views_to_use

@log_exceptions_and_usage
def serve(self, host: str, port: int, no_access_log: bool) -> None:
def serve(self, host: str, port: int, type_: str, no_access_log: bool) -> None:
"""Start the feature consumption server locally on a given port."""
type_ = type_.lower()
if self.config.go_feature_retrieval:
# Start go server instead of python if the flag is enabled
self._lazy_init_go_server()
# TODO(tsotne) add http/grpc flag in CLI and call appropriate method here depending on that
self._go_server.start_grpc_server(host, port)
if type_ == "http":
self._go_server.start_http_server(host, port)
elif type_ == "grpc":
self._go_server.start_grpc_server(host, port)
else:
raise ValueError(
f"Unsupported server type '{type_}'. Must be one of 'http' or 'grpc'."
)
else:
if type_ != "http":
raise ValueError(
f"Python server only supports 'http'. Got '{type_}' instead."
)
# Start the python server if go server isn't enabled
feature_server.start_server(self, host, port, no_access_log)

Expand Down

0 comments on commit f3367f2

Please sign in to comment.