From 0a9064e24c12094cd34f86eb984b97edfd13513e Mon Sep 17 00:00:00 2001 From: glindsell Date: Thu, 30 Apr 2020 17:16:35 +0100 Subject: [PATCH 01/13] Add executor changes for HTTP/gRPC multiplexing Signed-off-by: glindsell --- executor/go.mod | 6 +- executor/go.sum | 20 ++++++ executor/main.go | 95 +++++++++++++------------ executor/samples/local/logger/Makefile | 6 +- executor/samples/local/metrics/Makefile | 4 +- 5 files changed, 79 insertions(+), 52 deletions(-) diff --git a/executor/go.mod b/executor/go.mod index 5a240fd081..5a1ca0fbff 100644 --- a/executor/go.mod +++ b/executor/go.mod @@ -4,9 +4,10 @@ go 1.12 require ( github.com/cloudevents/sdk-go v0.10.2 + github.com/fullstorydev/grpcurl v1.5.1 // indirect github.com/ghodss/yaml v1.0.0 github.com/go-logr/logr v0.1.0 - github.com/golang/protobuf v1.3.2 + github.com/golang/protobuf v1.3.5 github.com/google/uuid v1.1.1 github.com/gorilla/mux v1.7.3 github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 @@ -17,12 +18,13 @@ require ( github.com/prometheus/client_golang v1.3.0 github.com/prometheus/common v0.7.0 github.com/seldonio/seldon-core/operator v0.0.0-20200401123312-d4c435ea5217 + github.com/soheilhy/cmux v0.1.4 github.com/tensorflow/tensorflow v1.14.0 // indirect github.com/tensorflow/tensorflow/tensorflow/go/core v0.0.0-00010101000000-000000000000 github.com/uber/jaeger-client-go v2.21.1+incompatible github.com/uber/jaeger-lib v2.2.0+incompatible // indirect golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 - google.golang.org/grpc v1.27.0 + google.golang.org/grpc v1.28.0 gotest.tools v2.2.0+incompatible k8s.io/api v0.17.2 k8s.io/apimachinery v0.17.2 diff --git a/executor/go.sum b/executor/go.sum index fcba37e7c1..c440bc5eee 100644 --- a/executor/go.sum +++ b/executor/go.sum @@ -62,6 +62,8 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudevents/sdk-go v0.10.2 h1:CAqHqDHmBkCG4OUeUBt7q2Ql8KV25U+bgPUtlcJelZ4= github.com/cloudevents/sdk-go v0.10.2/go.mod h1:EHG6NmU3XkIeuueER6+vbnhYfWlgVlfUQVzPC+UK7ao= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f h1:WBZRG4aNOuI15bLRrCgN8fCq8E5Xuty6jGbmSNEvSsU= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= @@ -92,7 +94,11 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4 h1:rEvIZUSZ3fx39WIi3JkQqQBitGwpELBIYWeBVh6wn+E= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.5.0+incompatible h1:ouOWdg56aJriqS0huScTkVXPC5IcNrDCXZ6OoTAWu7M= @@ -101,6 +107,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fullstorydev/grpcurl v1.5.1 h1:MyN3rJxuk1etiBeNpXzjSM7GjhUndohhkgnRQ5LlPh8= +github.com/fullstorydev/grpcurl v1.5.1/go.mod h1:gfDWAYMHBN+GjNeO30K9mAfpbYWtWd/bOebWSV0l6kY= github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -182,6 +190,9 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -239,6 +250,8 @@ github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJ github.com/imdario/mergo v0.3.7 h1:Y+UAYTZ7gDEuOfhxKWy+dvb5dRQ6rJjFSdX2HZY1/gI= github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jhump/protoreflect v1.5.0 h1:NgpVT+dX71c8hZnxHof2M7QDK7QtohIJ7DYycjnkyfc= +github.com/jhump/protoreflect v1.5.0/go.mod h1:eaTn3RZAmMBcV0fifFvlm6VHNz3wSkYyXYWUh7ymB74= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.5 h1:gL2yXlmiIo4+t+y32d4WGwOjKGYcGOuyrg46vadswDE= @@ -387,6 +400,7 @@ github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAm github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= @@ -470,6 +484,7 @@ golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -583,6 +598,7 @@ google.golang.org/appengine v1.5.0 h1:KxkO13IPW4Lslp2bz+KHP2E3gtFlrIGNThxkZQ3g+4 google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.5 h1:tycE03LOZYQNhDpS27tcQdAzLCVMaj7QT2SXxebnpCM= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/genproto v0.0.0-20170818010345-ee236bd376b0/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= @@ -595,6 +611,7 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2El google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190916214212-f660b8655731 h1:Phvl0+G5t5k/EUFUi0wPdUUeTL2HydMQUXHnunWgSb0= google.golang.org/genproto v0.0.0-20190916214212-f660b8655731/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8= +google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.19.1/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -603,8 +620,11 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.24.0 h1:vb/1TCsVn3DcJlQ0Gs1yB1pKI6Do2/QNwxdKqmc/b0s= google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.28.0 h1:bO/TA4OxCOummhSf10siHuG7vJOiwh7SpRpFZDkOgl4= +google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/executor/main.go b/executor/main.go index 043fd26093..1919369ad6 100644 --- a/executor/main.go +++ b/executor/main.go @@ -6,6 +6,15 @@ import ( "encoding/json" "flag" "fmt" + "io/ioutil" + "net" + "net/url" + "os" + "os/signal" + "strings" + "syscall" + "time" + "github.com/ghodss/yaml" "github.com/go-logr/logr" "github.com/prometheus/common/log" @@ -20,16 +29,9 @@ import ( "github.com/seldonio/seldon-core/executor/k8s" loghandler "github.com/seldonio/seldon-core/executor/logger" "github.com/seldonio/seldon-core/executor/proto/tensorflow/serving" - "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" - "io/ioutil" - "net" - "net/url" - "os" - "os/signal" + v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" + "github.com/soheilhy/cmux" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" - "strings" - "syscall" - "time" ) var ( @@ -37,11 +39,9 @@ var ( sdepName = flag.String("sdep", "", "Seldon deployment name") namespace = flag.String("namespace", "", "Namespace") predictorName = flag.String("predictor", "", "Name of the predictor inside the SeldonDeployment") - httpPort = flag.Int("http_port", 8080, "Executor port") - grpcPort = flag.Int("grpc_port", 8000, "Executor port") + port = flag.Int("port", 8080, "Executor port") wait = flag.Duration("graceful_timeout", time.Second*15, "Graceful shutdown secs") protocol = flag.String("protocol", "seldon", "The payload protocol") - transport = flag.String("transport", "rest", "The network transport http or grpc") filename = flag.String("file", "", "Load graph from file") hostname = flag.String("hostname", "localhost", "The hostname of the running server") logWorkers = flag.Int("logger_workers", 5, "Number of workers handling payload logging") @@ -92,7 +92,7 @@ func getServerUrl(hostname string, port int) (*url.URL, error) { return url.Parse(fmt.Sprintf("http://%s:%d/", hostname, port)) } -func runHttpServer(logger logr.Logger, predictor *v1.PredictorSpec, client seldonclient.SeldonApiClient, port int, +func runHttpServer(lis net.Listener, logger logr.Logger, predictor *v1.PredictorSpec, client seldonclient.SeldonApiClient, port int, probesOnly bool, serverUrl *url.URL, namespace string, protocol string, deploymentName string, prometheusPath string) { // Create REST API @@ -101,7 +101,8 @@ func runHttpServer(logger logr.Logger, predictor *v1.PredictorSpec, client seldo srv := seldonRest.CreateHttpServer(port) go func() { - if err := srv.ListenAndServe(); err != nil { + logger.Info("server started") + if err := srv.Serve(lis); err != nil { logger.Error(err, "Server error") } }() @@ -129,11 +130,7 @@ func runHttpServer(logger logr.Logger, predictor *v1.PredictorSpec, client seldo } -func runGrpcServer(logger logr.Logger, predictor *v1.PredictorSpec, client seldonclient.SeldonApiClient, port int, serverUrl *url.URL, namespace string, protocol string, deploymentName string, annotations map[string]string) { - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) - if err != nil { - log.Fatalf("failed to listen: %v", err) - } +func runGrpcServer(lis net.Listener, logger logr.Logger, predictor *v1.PredictorSpec, client seldonclient.SeldonApiClient, port int, serverUrl *url.URL, namespace string, protocol string, deploymentName string, annotations map[string]string) { grpcServer, err := grpc.CreateGrpcServer(predictor, deploymentName, annotations, logger) if err != nil { log.Fatalf("Failed to create grpc server: %v", err) @@ -171,20 +168,14 @@ func main() { log.Fatal("Protocol must be seldon or tensorflow") } - if !(*transport == "rest" || *transport == "grpc") { - log.Fatal("Only rest and grpc supported") - } - - serverUrl, err := getServerUrl(*hostname, *httpPort) + serverUrl, err := getServerUrl(*hostname, *port) if err != nil { - log.Fatal("Failed to create server url from", *hostname, *httpPort) + log.Fatal("Failed to create server url from", *hostname, *port) } logf.SetLogger(logf.ZapLogger(false)) logger := logf.Log.WithName("entrypoint") - logger.Info("Flags", "transport", *transport) - var predictor *v1.PredictorSpec if *filename != "" { logger.Info("Trying to get predictor from file") @@ -225,25 +216,39 @@ func main() { } defer closer.Close() - if *transport == "rest" { - clientRest, err := rest.NewJSONRestClient(*protocol, *sdepName, predictor, annotations) - if err != nil { - log.Fatalf("Failed to create http client: %v", err) - } - logger.Info("Running http server ", "port", *httpPort) - runHttpServer(logger, predictor, clientRest, *httpPort, false, serverUrl, *namespace, *protocol, *sdepName, *prometheusPath) - } else { - logger.Info("Running http probes only server ", "port", *httpPort) - go runHttpServer(logger, predictor, nil, *httpPort, true, serverUrl, *namespace, *protocol, *sdepName, *prometheusPath) - logger.Info("Running grpc server ", "port", *grpcPort) - var clientGrpc seldonclient.SeldonApiClient - if *protocol == "seldon" { - clientGrpc = seldon.NewSeldonGrpcClient(predictor, *sdepName, annotations) - } else { - clientGrpc = tensorflow.NewTensorflowGrpcClient(predictor, *sdepName, annotations) - } - runGrpcServer(logger, predictor, clientGrpc, *grpcPort, serverUrl, *namespace, *protocol, *sdepName, annotations) + // Create a listener at the desired port. + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) + if err != nil { + log.Fatalf("failed to create listener: %v", err) + } + + // Create a cmux object. + tcpm := cmux.New(lis) + // Declare the match for different services required. + httpl := tcpm.Match(cmux.HTTP1Fast()) + grpcl := tcpm.MatchWithWriters( + cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) + + clientRest, err := rest.NewJSONRestClient(*protocol, *sdepName, predictor, annotations) + if err != nil { + log.Fatalf("Failed to create http client: %v", err) } + logger.Info("Running HTTP server ", "port", *port) + go runHttpServer(httpl, logger, predictor, clientRest, *port, false, serverUrl, *namespace, *protocol, *sdepName, *prometheusPath) + var clientGrpc seldonclient.SeldonApiClient + if *protocol == "seldon" { + clientGrpc = seldon.NewSeldonGrpcClient(predictor, *sdepName, annotations) + } else { + clientGrpc = tensorflow.NewTensorflowGrpcClient(predictor, *sdepName, annotations) + } + logger.Info("Running gRPC server ", "port", *port) + go runGrpcServer(grpcl, logger, predictor, clientGrpc, *port, serverUrl, *namespace, *protocol, *sdepName, annotations) + + // Start cmux serving. + if err := tcpm.Serve(); !strings.Contains(err.Error(), + "use of closed network connection") { + log.Fatal(err) + } } diff --git a/executor/samples/local/logger/Makefile b/executor/samples/local/logger/Makefile index cf36309844..281166e4ca 100644 --- a/executor/samples/local/logger/Makefile +++ b/executor/samples/local/logger/Makefile @@ -3,7 +3,7 @@ BASE=../../.. ## REST run_rest_executor: - ${BASE}/executor --sdep seldon-model --namespace default --predictor example --file ./model_rest.yaml --http_port 8000 + ${BASE}/executor --sdep seldon-model --namespace default --predictor example --file ./model_rest.yaml --port 8000 run_dummy_rest_model: @@ -23,12 +23,12 @@ curl_rest_big: ## GRPC run_grpc_executor: - ${BASE}/executor --sdep seldon-model --namespace default --predictor example --file ./model_grpc.yaml --http_port 8000 --grpc_port 5000 --transport grpc + ${BASE}/executor --sdep seldon-model --namespace default --predictor example --file ./model_grpc.yaml --port 8000 run_dummy_grpc_model: cd ${BASE}/../examples/models/mean_classifier && make run_grpc_local grpc_test: - cd ${BASE}/proto && grpcurl -d '{"data":{"ndarray":[[1.0,2.0]]}}' -plaintext -proto ./prediction.proto 0.0.0.0:5000 seldon.protos.Seldon/Predict + cd ${BASE}/proto && grpcurl -d '{"data":{"ndarray":[[1.0,2.0]]}}' -plaintext -proto ./prediction.proto 0.0.0.0:8000 seldon.protos.Seldon/Predict diff --git a/executor/samples/local/metrics/Makefile b/executor/samples/local/metrics/Makefile index c9678ac241..36dd82bda0 100644 --- a/executor/samples/local/metrics/Makefile +++ b/executor/samples/local/metrics/Makefile @@ -3,7 +3,7 @@ BASE=../../.. ## REST run_rest_executor: - ${BASE}/executor --sdep seldon-model --namespace default --predictor example --file ./model_rest.yaml --http_port 8000 + ${BASE}/executor --sdep seldon-model --namespace default --predictor example --file ./model_rest.yaml --port 8000 run_dummy_rest_model: @@ -16,7 +16,7 @@ curl_rest: ## GRPC run_grpc_executor: - ${BASE}/executor --sdep seldon-model --namespace default --predictor example --file ./model_grpc.yaml --http_port 8000 --grpc_port 5000 --transport grpc + ${BASE}/executor --sdep seldon-model --namespace default --predictor example --file ./model_grpc.yaml --port 8000 run_dummy_grpc_model: cd ${BASE}/../examples/models/mean_classifier && make run_grpc_local From 2e766a264d6e23d41508dda085333e1950c5b10c Mon Sep 17 00:00:00 2001 From: glindsell Date: Fri, 1 May 2020 14:55:08 +0100 Subject: [PATCH 02/13] Remove gRPC port for executor container creation in operator Signed-off-by: glindsell --- .gitignore | 3 + .../controllers/seldondeployment_engine.go | 56 ++++++------------- .../seldondeployment_engine_test.go | 9 +-- 3 files changed, 26 insertions(+), 42 deletions(-) diff --git a/.gitignore b/.gitignore index be0aafb97e..0284c2f08f 100644 --- a/.gitignore +++ b/.gitignore @@ -211,3 +211,6 @@ examples/models/metrics/metrics.py examples/models/custom_metrics/customMetrics.py examples/models/tracing/tracing.py examples/models/autoscaling/autoscaling_example.py + +# testing +*.coverprofile diff --git a/operator/controllers/seldondeployment_engine.go b/operator/controllers/seldondeployment_engine.go index 591c3a4710..0c344aa455 100644 --- a/operator/controllers/seldondeployment_engine.go +++ b/operator/controllers/seldondeployment_engine.go @@ -18,6 +18,10 @@ package controllers import ( "fmt" + "os" + "strconv" + "strings" + machinelearningv1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" "github.com/seldonio/seldon-core/operator/constants" appsv1 "k8s.io/api/apps/v1" @@ -25,22 +29,17 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" - "os" - "strconv" - "strings" ) const ( - ENV_DEFAULT_EXECUTOR_SERVER_PORT = "EXECUTOR_SERVER_PORT" - ENV_DEFAULT_EXECUTOR_SERVER_GRPC_PORT = "EXECUTOR_SERVER_GRPC_PORT" - ENV_EXECUTOR_PROMETHEUS_PATH = "EXECUTOR_PROMETHEUS_PATH" - ENV_ENGINE_PROMETHEUS_PATH = "ENGINE_PROMETHEUS_PATH" - ENV_EXECUTOR_USER = "EXECUTOR_CONTAINER_USER" - ENV_ENGINE_USER = "ENGINE_CONTAINER_USER" - ENV_USE_EXECUTOR = "USE_EXECUTOR" + ENV_DEFAULT_EXECUTOR_SERVER_PORT = "EXECUTOR_SERVER_PORT" + ENV_EXECUTOR_PROMETHEUS_PATH = "EXECUTOR_PROMETHEUS_PATH" + ENV_ENGINE_PROMETHEUS_PATH = "ENGINE_PROMETHEUS_PATH" + ENV_EXECUTOR_USER = "EXECUTOR_CONTAINER_USER" + ENV_ENGINE_USER = "ENGINE_CONTAINER_USER" + ENV_USE_EXECUTOR = "USE_EXECUTOR" DEFAULT_EXECUTOR_CONTAINER_PORT = 8000 - DEFAULT_EXECUTOR_GRPC_PORT = 5001 ENV_EXECUTOR_IMAGE = "EXECUTOR_CONTAINER_IMAGE_AND_VERSION" ENV_EXECUTOR_IMAGE_RELATED = "RELATED_IMAGE_EXECUTOR" //RedHat specific @@ -131,19 +130,6 @@ func getExecutorHttpPort() (engine_http_port int, err error) { return engine_http_port, nil } -func getExecutorGrpcPort() (engine_grpc_port int, err error) { - // Get engine grpc port from environment or use default - engine_grpc_port = DEFAULT_EXECUTOR_GRPC_PORT - var env_engine_grpc_port = GetEnv(ENV_DEFAULT_EXECUTOR_SERVER_GRPC_PORT, "") - if env_engine_grpc_port != "" { - engine_grpc_port, err = strconv.Atoi(env_engine_grpc_port) - if err != nil { - return 0, err - } - } - return engine_grpc_port, nil -} - func isExecutorEnabled(mlDep *machinelearningv1.SeldonDeployment) bool { useExecutor := getAnnotation(mlDep, machinelearningv1.ANNOTATION_EXECUTOR, "false") return useExecutor == "true" || envUseExecutor == "true" @@ -200,7 +186,7 @@ func getSvcOrchUser(mlDep *machinelearningv1.SeldonDeployment) (*int64, error) { return nil, nil } -func createExecutorContainer(mlDep *machinelearningv1.SeldonDeployment, p *machinelearningv1.PredictorSpec, predictorB64 string, http_port int, grpc_port int, resources *corev1.ResourceRequirements) (*corev1.Container, error) { +func createExecutorContainer(mlDep *machinelearningv1.SeldonDeployment, p *machinelearningv1.PredictorSpec, predictorB64 string, port int, resources *corev1.ResourceRequirements) (*corev1.Container, error) { transport := mlDep.Spec.Transport //Backwards compatible with older resources if transport == "" { @@ -231,8 +217,7 @@ func createExecutorContainer(mlDep *machinelearningv1.SeldonDeployment, p *machi "--sdep", mlDep.Name, "--namespace", mlDep.Namespace, "--predictor", p.Name, - "--http_port", strconv.Itoa(http_port), - "--grpc_port", strconv.Itoa(grpc_port), + "--port", strconv.Itoa(port), "--transport", string(transport), "--protocol", string(protocol), "--prometheus_path", getPrometheusPath(mlDep), @@ -251,17 +236,16 @@ func createExecutorContainer(mlDep *machinelearningv1.SeldonDeployment, p *machi {Name: "REQUEST_LOGGER_DEFAULT_ENDPOINT_PREFIX", Value: GetEnv("EXECUTOR_REQUEST_LOGGER_DEFAULT_ENDPOINT_PREFIX", "")}, }, Ports: []corev1.ContainerPort{ - {ContainerPort: int32(http_port), Protocol: corev1.ProtocolTCP}, - {ContainerPort: int32(http_port), Protocol: corev1.ProtocolTCP, Name: constants.MetricsPortName}, - {ContainerPort: int32(grpc_port), Protocol: corev1.ProtocolTCP}, + {ContainerPort: int32(port), Protocol: corev1.ProtocolTCP}, + {ContainerPort: int32(port), Protocol: corev1.ProtocolTCP, Name: constants.MetricsPortName}, }, - ReadinessProbe: &corev1.Probe{Handler: corev1.Handler{HTTPGet: &corev1.HTTPGetAction{Port: intstr.FromInt(http_port), Path: "/ready", Scheme: corev1.URISchemeHTTP}}, + ReadinessProbe: &corev1.Probe{Handler: corev1.Handler{HTTPGet: &corev1.HTTPGetAction{Port: intstr.FromInt(port), Path: "/ready", Scheme: corev1.URISchemeHTTP}}, InitialDelaySeconds: 20, PeriodSeconds: 5, FailureThreshold: 3, SuccessThreshold: 1, TimeoutSeconds: 60}, - LivenessProbe: &corev1.Probe{Handler: corev1.Handler{HTTPGet: &corev1.HTTPGetAction{Port: intstr.FromInt(http_port), Path: "/live", Scheme: corev1.URISchemeHTTP}}, + LivenessProbe: &corev1.Probe{Handler: corev1.Handler{HTTPGet: &corev1.HTTPGetAction{Port: intstr.FromInt(port), Path: "/live", Scheme: corev1.URISchemeHTTP}}, InitialDelaySeconds: 20, PeriodSeconds: 5, FailureThreshold: 3, @@ -358,15 +342,11 @@ func createEngineContainer(mlDep *machinelearningv1.SeldonDeployment, p *machine var c *corev1.Container if isExecutorEnabled(mlDep) { - executor_http_port, err := getExecutorHttpPort() - if err != nil { - return nil, err - } - executor_grpc_port, err := getExecutorGrpcPort() + executor_port, err := getExecutorHttpPort() if err != nil { return nil, err } - c, err = createExecutorContainer(mlDep, p, predictorB64, executor_http_port, executor_grpc_port, engineResources) + c, err = createExecutorContainer(mlDep, p, predictorB64, executor_port, engineResources) if err != nil { return nil, err } diff --git a/operator/controllers/seldondeployment_engine_test.go b/operator/controllers/seldondeployment_engine_test.go index 5bf20b9b58..dda8b38f2a 100644 --- a/operator/controllers/seldondeployment_engine_test.go +++ b/operator/controllers/seldondeployment_engine_test.go @@ -1,12 +1,13 @@ package controllers import ( + "testing" + . "github.com/onsi/gomega" machinelearningv1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "testing" ) func createTestSeldonDeployment() *machinelearningv1.SeldonDeployment { @@ -64,7 +65,7 @@ func TestExecutorCreateNoEnv(t *testing.T) { envExecutorImage = "" envExecutorImageRelated = "" mlDep := createTestSeldonDeployment() - _, err := createExecutorContainer(mlDep, &mlDep.Spec.Predictors[0], "", 1, 2, &v1.ResourceRequirements{}) + _, err := createExecutorContainer(mlDep, &mlDep.Spec.Predictors[0], "", 1, &v1.ResourceRequirements{}) g.Expect(err).ToNot(BeNil()) cleanEnvImages() } @@ -76,7 +77,7 @@ func TestExecutorCreateEnv(t *testing.T) { envExecutorImage = imageName envExecutorImageRelated = "" mlDep := createTestSeldonDeployment() - con, err := createExecutorContainer(mlDep, &mlDep.Spec.Predictors[0], "", 1, 2, &v1.ResourceRequirements{}) + con, err := createExecutorContainer(mlDep, &mlDep.Spec.Predictors[0], "", 1, &v1.ResourceRequirements{}) g.Expect(err).To(BeNil()) g.Expect(con.Image).To(Equal(imageName)) cleanEnvImages() @@ -90,7 +91,7 @@ func TestExecutorCreateEnvRelated(t *testing.T) { envExecutorImage = imageName envExecutorImageRelated = imageNameRelated mlDep := createTestSeldonDeployment() - con, err := createExecutorContainer(mlDep, &mlDep.Spec.Predictors[0], "", 1, 2, &v1.ResourceRequirements{}) + con, err := createExecutorContainer(mlDep, &mlDep.Spec.Predictors[0], "", 1, &v1.ResourceRequirements{}) g.Expect(err).To(BeNil()) g.Expect(con.Image).To(Equal(imageNameRelated)) cleanEnvImages() From 2b7f3989004c7e39c3620adffc6742eef4c6a164 Mon Sep 17 00:00:00 2001 From: glindsell Date: Wed, 6 May 2020 09:56:56 +0100 Subject: [PATCH 03/13] Close executor listener Signed-off-by: glindsell --- executor/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/executor/main.go b/executor/main.go index 1919369ad6..968b4e05ba 100644 --- a/executor/main.go +++ b/executor/main.go @@ -221,6 +221,7 @@ func main() { if err != nil { log.Fatalf("failed to create listener: %v", err) } + defer lis.Close() // Create a cmux object. tcpm := cmux.New(lis) From 3c474ad0e1166416fdb920d9c915bece44b23746 Mon Sep 17 00:00:00 2001 From: glindsell Date: Mon, 11 May 2020 15:03:48 +0100 Subject: [PATCH 04/13] Remove transport flag from operator controller Signed-off-by: glindsell --- .gitignore | 1 + operator/controllers/seldondeployment_engine.go | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 0284c2f08f..40fd440a0f 100644 --- a/.gitignore +++ b/.gitignore @@ -214,3 +214,4 @@ examples/models/autoscaling/autoscaling_example.py # testing *.coverprofile +*.log diff --git a/operator/controllers/seldondeployment_engine.go b/operator/controllers/seldondeployment_engine.go index 0c344aa455..704eb43771 100644 --- a/operator/controllers/seldondeployment_engine.go +++ b/operator/controllers/seldondeployment_engine.go @@ -218,7 +218,6 @@ func createExecutorContainer(mlDep *machinelearningv1.SeldonDeployment, p *machi "--namespace", mlDep.Namespace, "--predictor", p.Name, "--port", strconv.Itoa(port), - "--transport", string(transport), "--protocol", string(protocol), "--prometheus_path", getPrometheusPath(mlDep), }, From 7524b4aeb007739fa71b20172f3336ad9ecaec59 Mon Sep 17 00:00:00 2001 From: glindsell Date: Wed, 13 May 2020 10:53:13 +0100 Subject: [PATCH 05/13] Revert changes in Seldon Deployment Engine Signed-off-by: glindsell --- .../controllers/seldondeployment_engine.go | 51 ++++++++++++++----- .../seldondeployment_engine_test.go | 6 +-- 2 files changed, 41 insertions(+), 16 deletions(-) diff --git a/operator/controllers/seldondeployment_engine.go b/operator/controllers/seldondeployment_engine.go index 704eb43771..054da33e4e 100644 --- a/operator/controllers/seldondeployment_engine.go +++ b/operator/controllers/seldondeployment_engine.go @@ -32,14 +32,16 @@ import ( ) const ( - ENV_DEFAULT_EXECUTOR_SERVER_PORT = "EXECUTOR_SERVER_PORT" - ENV_EXECUTOR_PROMETHEUS_PATH = "EXECUTOR_PROMETHEUS_PATH" - ENV_ENGINE_PROMETHEUS_PATH = "ENGINE_PROMETHEUS_PATH" - ENV_EXECUTOR_USER = "EXECUTOR_CONTAINER_USER" - ENV_ENGINE_USER = "ENGINE_CONTAINER_USER" - ENV_USE_EXECUTOR = "USE_EXECUTOR" + ENV_DEFAULT_EXECUTOR_SERVER_PORT = "EXECUTOR_SERVER_PORT" + ENV_DEFAULT_EXECUTOR_SERVER_GRPC_PORT = "EXECUTOR_SERVER_GRPC_PORT" + ENV_EXECUTOR_PROMETHEUS_PATH = "EXECUTOR_PROMETHEUS_PATH" + ENV_ENGINE_PROMETHEUS_PATH = "ENGINE_PROMETHEUS_PATH" + ENV_EXECUTOR_USER = "EXECUTOR_CONTAINER_USER" + ENV_ENGINE_USER = "ENGINE_CONTAINER_USER" + ENV_USE_EXECUTOR = "USE_EXECUTOR" DEFAULT_EXECUTOR_CONTAINER_PORT = 8000 + DEFAULT_EXECUTOR_GRPC_PORT = 5001 ENV_EXECUTOR_IMAGE = "EXECUTOR_CONTAINER_IMAGE_AND_VERSION" ENV_EXECUTOR_IMAGE_RELATED = "RELATED_IMAGE_EXECUTOR" //RedHat specific @@ -130,6 +132,19 @@ func getExecutorHttpPort() (engine_http_port int, err error) { return engine_http_port, nil } +func getExecutorGrpcPort() (engine_grpc_port int, err error) { + // Get engine grpc port from environment or use default + engine_grpc_port = DEFAULT_EXECUTOR_GRPC_PORT + var env_engine_grpc_port = GetEnv(ENV_DEFAULT_EXECUTOR_SERVER_GRPC_PORT, "") + if env_engine_grpc_port != "" { + engine_grpc_port, err = strconv.Atoi(env_engine_grpc_port) + if err != nil { + return 0, err + } + } + return engine_grpc_port, nil +} + func isExecutorEnabled(mlDep *machinelearningv1.SeldonDeployment) bool { useExecutor := getAnnotation(mlDep, machinelearningv1.ANNOTATION_EXECUTOR, "false") return useExecutor == "true" || envUseExecutor == "true" @@ -186,14 +201,20 @@ func getSvcOrchUser(mlDep *machinelearningv1.SeldonDeployment) (*int64, error) { return nil, nil } -func createExecutorContainer(mlDep *machinelearningv1.SeldonDeployment, p *machinelearningv1.PredictorSpec, predictorB64 string, port int, resources *corev1.ResourceRequirements) (*corev1.Container, error) { +func createExecutorContainer(mlDep *machinelearningv1.SeldonDeployment, p *machinelearningv1.PredictorSpec, predictorB64 string, http_port int, grpc_port int, resources *corev1.ResourceRequirements) (*corev1.Container, error) { + port := 0 transport := mlDep.Spec.Transport - //Backwards compatible with older resources - if transport == "" { + switch transport { + case machinelearningv1.TransportGrpc: + port = grpc_port + case machinelearningv1.TransportRest: + port = http_port + default: + //Backwards compatible with older resources if p.Graph.Endpoint.Type == machinelearningv1.GRPC { - transport = machinelearningv1.TransportGrpc + port = grpc_port } else { - transport = machinelearningv1.TransportRest + port = http_port } } protocol := mlDep.Spec.Protocol @@ -341,11 +362,15 @@ func createEngineContainer(mlDep *machinelearningv1.SeldonDeployment, p *machine var c *corev1.Container if isExecutorEnabled(mlDep) { - executor_port, err := getExecutorHttpPort() + executor_http_port, err := getExecutorHttpPort() + if err != nil { + return nil, err + } + executor_grpc_port, err := getExecutorGrpcPort() if err != nil { return nil, err } - c, err = createExecutorContainer(mlDep, p, predictorB64, executor_port, engineResources) + c, err = createExecutorContainer(mlDep, p, predictorB64, executor_http_port, executor_grpc_port, engineResources) if err != nil { return nil, err } diff --git a/operator/controllers/seldondeployment_engine_test.go b/operator/controllers/seldondeployment_engine_test.go index dda8b38f2a..935a937466 100644 --- a/operator/controllers/seldondeployment_engine_test.go +++ b/operator/controllers/seldondeployment_engine_test.go @@ -65,7 +65,7 @@ func TestExecutorCreateNoEnv(t *testing.T) { envExecutorImage = "" envExecutorImageRelated = "" mlDep := createTestSeldonDeployment() - _, err := createExecutorContainer(mlDep, &mlDep.Spec.Predictors[0], "", 1, &v1.ResourceRequirements{}) + _, err := createExecutorContainer(mlDep, &mlDep.Spec.Predictors[0], "", 1, 2, &v1.ResourceRequirements{}) g.Expect(err).ToNot(BeNil()) cleanEnvImages() } @@ -77,7 +77,7 @@ func TestExecutorCreateEnv(t *testing.T) { envExecutorImage = imageName envExecutorImageRelated = "" mlDep := createTestSeldonDeployment() - con, err := createExecutorContainer(mlDep, &mlDep.Spec.Predictors[0], "", 1, &v1.ResourceRequirements{}) + con, err := createExecutorContainer(mlDep, &mlDep.Spec.Predictors[0], "", 1, 2, &v1.ResourceRequirements{}) g.Expect(err).To(BeNil()) g.Expect(con.Image).To(Equal(imageName)) cleanEnvImages() @@ -91,7 +91,7 @@ func TestExecutorCreateEnvRelated(t *testing.T) { envExecutorImage = imageName envExecutorImageRelated = imageNameRelated mlDep := createTestSeldonDeployment() - con, err := createExecutorContainer(mlDep, &mlDep.Spec.Predictors[0], "", 1, &v1.ResourceRequirements{}) + con, err := createExecutorContainer(mlDep, &mlDep.Spec.Predictors[0], "", 1, 2, &v1.ResourceRequirements{}) g.Expect(err).To(BeNil()) g.Expect(con.Image).To(Equal(imageNameRelated)) cleanEnvImages() From 21a0cff9dcb60ba1ea4d6c87ec90bdecb27a646b Mon Sep 17 00:00:00 2001 From: glindsell Date: Wed, 13 May 2020 14:35:26 +0100 Subject: [PATCH 06/13] Add Metrics Port Name removed by merge Signed-off-by: glindsell --- operator/controllers/seldondeployment_engine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator/controllers/seldondeployment_engine.go b/operator/controllers/seldondeployment_engine.go index 6ba865ff70..ae56857985 100644 --- a/operator/controllers/seldondeployment_engine.go +++ b/operator/controllers/seldondeployment_engine.go @@ -260,7 +260,7 @@ func createExecutorContainer(mlDep *machinelearningv1.SeldonDeployment, p *machi }, Ports: []corev1.ContainerPort{ {ContainerPort: int32(port), Protocol: corev1.ProtocolTCP}, - {ContainerPort: int32(port), Protocol: corev1.ProtocolTCP, Name: constants.MetricsPortName}, + {ContainerPort: int32(port), Protocol: corev1.ProtocolTCP, Name: executorMetricsPortName}, }, ReadinessProbe: &corev1.Probe{Handler: corev1.Handler{HTTPGet: &corev1.HTTPGetAction{Port: intstr.FromInt(port), Path: "/ready", Scheme: corev1.URISchemeHTTP}}, InitialDelaySeconds: 20, From 144659b47992dd2c2e455ee32c3f4274838c8036 Mon Sep 17 00:00:00 2001 From: glindsell Date: Mon, 18 May 2020 15:06:33 +0100 Subject: [PATCH 07/13] remove ProbesOnly from SeldonRestApi Signed-off-by: glindsell --- executor/api/rest/server.go | 17 +++++++---------- executor/main.go | 4 ++-- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/executor/api/rest/server.go b/executor/api/rest/server.go index 5eef51aaf7..57801fa5b9 100644 --- a/executor/api/rest/server.go +++ b/executor/api/rest/server.go @@ -3,6 +3,10 @@ package rest import ( "context" "fmt" + "io/ioutil" + "net/http" + "net/url" + "github.com/go-logr/logr" guuid "github.com/google/uuid" "github.com/gorilla/mux" @@ -15,10 +19,7 @@ import ( "github.com/seldonio/seldon-core/executor/api/metric" "github.com/seldonio/seldon-core/executor/api/payload" "github.com/seldonio/seldon-core/executor/predictor" - "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" - "io/ioutil" - "net/http" - "net/url" + v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" ) @@ -36,7 +37,6 @@ type SeldonRestApi struct { Client client.SeldonApiClient predictor *v1.PredictorSpec Log logr.Logger - ProbesOnly bool ServerUrl *url.URL Namespace string Protocol string @@ -45,17 +45,14 @@ type SeldonRestApi struct { prometheusPath string } -func NewServerRestApi(predictor *v1.PredictorSpec, client client.SeldonApiClient, probesOnly bool, serverUrl *url.URL, namespace string, protocol string, deploymentName string, prometheusPath string) *SeldonRestApi { +func NewServerRestApi(predictor *v1.PredictorSpec, client client.SeldonApiClient, serverUrl *url.URL, namespace string, protocol string, deploymentName string, prometheusPath string) *SeldonRestApi { var serverMetrics *metric.ServerMetrics - if !probesOnly { - serverMetrics = metric.NewServerMetrics(predictor, deploymentName) - } + serverMetrics = metric.NewServerMetrics(predictor, deploymentName) return &SeldonRestApi{ mux.NewRouter(), client, predictor, logf.Log.WithName("SeldonRestApi"), - probesOnly, serverUrl, namespace, protocol, diff --git a/executor/main.go b/executor/main.go index 36ea50bb83..a8ac8a2fe9 100644 --- a/executor/main.go +++ b/executor/main.go @@ -93,10 +93,10 @@ func getServerUrl(hostname string, port int) (*url.URL, error) { } func runHttpServer(lis net.Listener, logger logr.Logger, predictor *v1.PredictorSpec, client seldonclient.SeldonApiClient, port int, - probesOnly bool, serverUrl *url.URL, namespace string, protocol string, deploymentName string, prometheusPath string) { + serverUrl *url.URL, namespace string, protocol string, deploymentName string, prometheusPath string) { // Create REST API - seldonRest := rest.NewServerRestApi(predictor, client, probesOnly, serverUrl, namespace, protocol, deploymentName, prometheusPath) + seldonRest := rest.NewServerRestApi(predictor, client, serverUrl, namespace, protocol, deploymentName, prometheusPath) seldonRest.Initialise() srv := seldonRest.CreateHttpServer(port) From 6c1bc89a89cc1232b4fc77e8e0fd6e1e6f68cd9d Mon Sep 17 00:00:00 2001 From: glindsell Date: Mon, 18 May 2020 15:54:13 +0100 Subject: [PATCH 08/13] Remove probesOnly from Executor tests Signed-off-by: glindsell --- executor/api/rest/server.go | 48 +++++++++++++++----------------- executor/api/rest/server_test.go | 45 +++++++++++++++--------------- executor/main.go | 2 +- 3 files changed, 47 insertions(+), 48 deletions(-) diff --git a/executor/api/rest/server.go b/executor/api/rest/server.go index 57801fa5b9..0db2240934 100644 --- a/executor/api/rest/server.go +++ b/executor/api/rest/server.go @@ -127,31 +127,29 @@ func (r *SeldonRestApi) Initialise() { r.Router.HandleFunc("/ready", r.checkReady) r.Router.HandleFunc("/live", r.alive) r.Router.Handle(r.prometheusPath, promhttp.Handler()) - if !r.ProbesOnly { - cloudeventHeaderMiddleware := CloudeventHeaderMiddleware{deploymentName: r.DeploymentName, namespace: r.Namespace} - r.Router.Use(puidHeader) - r.Router.Use(cloudeventHeaderMiddleware.Middleware) - switch r.Protocol { - case api.ProtocolSeldon: - //v0.1 API - api01 := r.Router.PathPrefix("/api/v0.1").Methods("POST").Subrouter() - api01.Handle("/predictions", r.wrapMetrics(metric.PredictionHttpServiceName, r.predictions)) - r.Router.NewRoute().Path("/api/v0.1/status/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.status)) - r.Router.NewRoute().Path("/api/v0.1/metadata/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.metadata)) - r.Router.NewRoute().PathPrefix("/api/v0.1/doc/").Handler(http.StripPrefix("/api/v0.1/doc/", http.FileServer(http.Dir("./openapi/")))) - //v1.0 API - api1 := r.Router.PathPrefix("/api/v1.0").Methods("POST").Subrouter() - api1.Handle("/predictions", r.wrapMetrics(metric.PredictionServiceMetricName, r.predictions)) - r.Router.NewRoute().Path("/api/v1.0/status/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.status)) - r.Router.NewRoute().Path("/api/v1.0/metadata/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.metadata)) - r.Router.NewRoute().PathPrefix("/api/v1.0/doc/").Handler(http.StripPrefix("/api/v1.0/doc/", http.FileServer(http.Dir("./openapi/")))) - - case api.ProtocolTensorflow: - r.Router.NewRoute().Path("/v1/models/{" + ModelHttpPathVariable + "}/:predict").Methods("POST").HandlerFunc(r.wrapMetrics(metric.PredictionHttpServiceName, r.predictions)) - r.Router.NewRoute().Path("/v1/models/:predict").Methods("POST").HandlerFunc(r.wrapMetrics(metric.PredictionHttpServiceName, r.predictions)) // Nonstandard path - Seldon extension - r.Router.NewRoute().Path("/v1/models/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.status)) - r.Router.NewRoute().Path("/v1/models/{" + ModelHttpPathVariable + "}/metadata").Methods("GET").HandlerFunc(r.wrapMetrics(metric.MetadataHttpServiceName, r.metadata)) - } + cloudeventHeaderMiddleware := CloudeventHeaderMiddleware{deploymentName: r.DeploymentName, namespace: r.Namespace} + r.Router.Use(puidHeader) + r.Router.Use(cloudeventHeaderMiddleware.Middleware) + switch r.Protocol { + case api.ProtocolSeldon: + //v0.1 API + api01 := r.Router.PathPrefix("/api/v0.1").Methods("POST").Subrouter() + api01.Handle("/predictions", r.wrapMetrics(metric.PredictionHttpServiceName, r.predictions)) + r.Router.NewRoute().Path("/api/v0.1/status/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.status)) + r.Router.NewRoute().Path("/api/v0.1/metadata/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.metadata)) + r.Router.NewRoute().PathPrefix("/api/v0.1/doc/").Handler(http.StripPrefix("/api/v0.1/doc/", http.FileServer(http.Dir("./openapi/")))) + //v1.0 API + api1 := r.Router.PathPrefix("/api/v1.0").Methods("POST").Subrouter() + api1.Handle("/predictions", r.wrapMetrics(metric.PredictionServiceMetricName, r.predictions)) + r.Router.NewRoute().Path("/api/v1.0/status/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.status)) + r.Router.NewRoute().Path("/api/v1.0/metadata/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.metadata)) + r.Router.NewRoute().PathPrefix("/api/v1.0/doc/").Handler(http.StripPrefix("/api/v1.0/doc/", http.FileServer(http.Dir("./openapi/")))) + + case api.ProtocolTensorflow: + r.Router.NewRoute().Path("/v1/models/{" + ModelHttpPathVariable + "}/:predict").Methods("POST").HandlerFunc(r.wrapMetrics(metric.PredictionHttpServiceName, r.predictions)) + r.Router.NewRoute().Path("/v1/models/:predict").Methods("POST").HandlerFunc(r.wrapMetrics(metric.PredictionHttpServiceName, r.predictions)) // Nonstandard path - Seldon extension + r.Router.NewRoute().Path("/v1/models/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.status)) + r.Router.NewRoute().Path("/v1/models/{" + ModelHttpPathVariable + "}/metadata").Methods("GET").HandlerFunc(r.wrapMetrics(metric.MetadataHttpServiceName, r.metadata)) } } diff --git a/executor/api/rest/server_test.go b/executor/api/rest/server_test.go index 9e364814b4..4671dfcdaa 100644 --- a/executor/api/rest/server_test.go +++ b/executor/api/rest/server_test.go @@ -1,6 +1,14 @@ package rest import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "strings" + "testing" + guuid "github.com/google/uuid" . "github.com/onsi/gomega" "github.com/prometheus/common/expfmt" @@ -9,13 +17,6 @@ import ( "github.com/seldonio/seldon-core/executor/api/payload" "github.com/seldonio/seldon-core/executor/api/test" v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" - "io/ioutil" - "net/http" - "net/http/httptest" - "net/url" - "strconv" - "strings" - "testing" ) const ( @@ -27,7 +28,7 @@ func TestAliveEndpoint(t *testing.T) { g := NewGomegaWithT(t) url, _ := url.Parse("http://localhost") - r := NewServerRestApi(nil, nil, true, url, "default", api.ProtocolSeldon, "test", "/metrics") + r := NewServerRestApi(nil, nil, url, "default", api.ProtocolSeldon, "test", "/metrics") r.Initialise() req, _ := http.NewRequest("GET", "/live", nil) @@ -55,7 +56,7 @@ func TestSimpleModel(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolSeldon, "test", "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, "default", api.ProtocolSeldon, "test", "/metrics") r.Initialise() var data = ` {"data":{"ndarray":[1.1,2.0]}}` @@ -87,7 +88,7 @@ func TestCloudeventHeaderIsSet(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, testNamespace, api.ProtocolSeldon, testDepName, "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, testNamespace, api.ProtocolSeldon, testDepName, "/metrics") r.Initialise() var data = ` {"data":{"ndarray":[1.1,2.0]}}` @@ -126,7 +127,7 @@ func TestCloudeventHeaderIsNotSet(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, testNamespace, api.ProtocolSeldon, testDepName, "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, testNamespace, api.ProtocolSeldon, testDepName, "/metrics") r.Initialise() var data = ` {"data":{"ndarray":[1.1,2.0]}}` @@ -160,7 +161,7 @@ func TestReponsePuuidHeaderIsSet(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolSeldon, "test", "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, "default", api.ProtocolSeldon, "test", "/metrics") r.Initialise() var data = ` {"data":{"ndarray":[1.1,2.0]}}` @@ -209,7 +210,7 @@ func TestRequestPuuidHeaderIsSet(t *testing.T) { client, err := NewJSONRestClient(api.ProtocolSeldon, "dep", &p, nil) g.Expect(err).To(BeNil()) - r := NewServerRestApi(&p, client, false, url, "default", api.ProtocolSeldon, "test", "/metrics") + r := NewServerRestApi(&p, client, url, "default", api.ProtocolSeldon, "test", "/metrics") r.Initialise() var data = ` {"data":{"ndarray":[1.1,2.0]}}` @@ -256,7 +257,7 @@ func TestModelWithServer(t *testing.T) { client, err := NewJSONRestClient(api.ProtocolSeldon, "dep", &p, nil) g.Expect(err).To(BeNil()) - r := NewServerRestApi(&p, client, false, url, "default", api.ProtocolSeldon, "test", "/metrics") + r := NewServerRestApi(&p, client, url, "default", api.ProtocolSeldon, "test", "/metrics") r.Initialise() var data = ` {"data":{"ndarray":[1.1,2.0]}}` @@ -287,7 +288,7 @@ func TestServerMetrics(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolSeldon, "test", "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, "default", api.ProtocolSeldon, "test", "/metrics") r.Initialise() var data = ` {"data":{"ndarray":[1.1,2.0]}}` @@ -328,7 +329,7 @@ func TestTensorflowStatus(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolTensorflow, "test", "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, "default", api.ProtocolTensorflow, "test", "/metrics") r.Initialise() req, _ := http.NewRequest("GET", "/v1/models/mymodel", nil) @@ -357,7 +358,7 @@ func TestSeldonStatus(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolSeldon, "test", "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, "default", api.ProtocolSeldon, "test", "/metrics") r.Initialise() req, _ := http.NewRequest("GET", "/api/v1.0/status/mymodel", nil) @@ -386,7 +387,7 @@ func TestSeldonMetadata(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolSeldon, "test", "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, "default", api.ProtocolSeldon, "test", "/metrics") r.Initialise() req, _ := http.NewRequest("GET", "/api/v1.0/metadata/mymodel", nil) @@ -415,7 +416,7 @@ func TestTensorflowMetadata(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolTensorflow, "test", "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, "default", api.ProtocolTensorflow, "test", "/metrics") r.Initialise() req, _ := http.NewRequest("GET", "/v1/models/mymodel/metadata", nil) @@ -461,7 +462,7 @@ func TestPredictErrorWithServer(t *testing.T) { } client, err := NewJSONRestClient(api.ProtocolSeldon, "dep", &p, nil) g.Expect(err).Should(BeNil()) - r := NewServerRestApi(&p, client, false, url, "default", api.ProtocolSeldon, "test", "/metrics") + r := NewServerRestApi(&p, client, url, "default", api.ProtocolSeldon, "test", "/metrics") r.Initialise() var data = ` {"data":{"ndarray":[1.1,2.0]}}` @@ -496,7 +497,7 @@ func TestTensorflowModel(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolTensorflow, "test", "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, "default", api.ProtocolTensorflow, "test", "/metrics") r.Initialise() var data = ` {"instances":[[1,2,3]]}` @@ -526,7 +527,7 @@ func TestTensorflowModelBadModelName(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolTensorflow, "test", "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, "default", api.ProtocolTensorflow, "test", "/metrics") r.Initialise() var data = ` {"instances":[[1,2,3]]}` diff --git a/executor/main.go b/executor/main.go index a8ac8a2fe9..7e4da5a37d 100644 --- a/executor/main.go +++ b/executor/main.go @@ -242,7 +242,7 @@ func main() { log.Fatalf("Failed to create http client: %v", err) } logger.Info("Running HTTP server ", "port", *port) - go runHttpServer(httpl, logger, predictor, clientRest, *port, false, serverUrl, *namespace, *protocol, *sdepName, *prometheusPath) + go runHttpServer(httpl, logger, predictor, clientRest, *port, serverUrl, *namespace, *protocol, *sdepName, *prometheusPath) var clientGrpc seldonclient.SeldonApiClient if *protocol == "seldon" { From 5a01b78527137e13d0654a01ba97db6cab608eeb Mon Sep 17 00:00:00 2001 From: glindsell Date: Mon, 18 May 2020 16:20:11 +0100 Subject: [PATCH 09/13] Revert "fix after rebase" This reverts commit dc6e7ea222afaaf861c9b960247c70842a590076. --- .../v1/seldondeployment_webhook_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/operator/apis/machinelearning.seldon.io/v1/seldondeployment_webhook_test.go b/operator/apis/machinelearning.seldon.io/v1/seldondeployment_webhook_test.go index 2aa41fd30c..2c1de09b86 100644 --- a/operator/apis/machinelearning.seldon.io/v1/seldondeployment_webhook_test.go +++ b/operator/apis/machinelearning.seldon.io/v1/seldondeployment_webhook_test.go @@ -399,7 +399,7 @@ func TestOverrideMetricsPortName(t *testing.T) { Predictors: []PredictorSpec{ { Name: "p1", - Graph: PredictiveUnit{ + Graph: &PredictiveUnit{ Name: "classifier", Implementation: &impl, }, @@ -418,7 +418,7 @@ func TestOverrideMetricsPortName(t *testing.T) { g.Expect(defaultMetricsPort).To(BeNil()) // Graph - pu := GetPredictiveUnit(&spec.Predictors[0].Graph, "classifier") + pu := GetPredictiveUnit(spec.Predictors[0].Graph, "classifier") g.Expect(pu).ToNot(BeNil()) g.Expect(pu.Endpoint.Type).To(Equal(REST)) g.Expect(*pu.Type).To(Equal(MODEL)) @@ -933,4 +933,4 @@ func TestPredictorNoGraph(t *testing.T) { spec.DefaultSeldonDeployment("mydep", "default") err := spec.ValidateSeldonDeployment() g.Expect(err).ToNot(BeNil()) -} +} \ No newline at end of file From 5abd007984fbd46807cfc7392ebb6dbdd4c1fce5 Mon Sep 17 00:00:00 2001 From: glindsell Date: Mon, 18 May 2020 16:27:13 +0100 Subject: [PATCH 10/13] Undo revert Signed-off-by: glindsell --- .../v1/seldondeployment_webhook_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/operator/apis/machinelearning.seldon.io/v1/seldondeployment_webhook_test.go b/operator/apis/machinelearning.seldon.io/v1/seldondeployment_webhook_test.go index 2c1de09b86..2aa41fd30c 100644 --- a/operator/apis/machinelearning.seldon.io/v1/seldondeployment_webhook_test.go +++ b/operator/apis/machinelearning.seldon.io/v1/seldondeployment_webhook_test.go @@ -399,7 +399,7 @@ func TestOverrideMetricsPortName(t *testing.T) { Predictors: []PredictorSpec{ { Name: "p1", - Graph: &PredictiveUnit{ + Graph: PredictiveUnit{ Name: "classifier", Implementation: &impl, }, @@ -418,7 +418,7 @@ func TestOverrideMetricsPortName(t *testing.T) { g.Expect(defaultMetricsPort).To(BeNil()) // Graph - pu := GetPredictiveUnit(spec.Predictors[0].Graph, "classifier") + pu := GetPredictiveUnit(&spec.Predictors[0].Graph, "classifier") g.Expect(pu).ToNot(BeNil()) g.Expect(pu.Endpoint.Type).To(Equal(REST)) g.Expect(*pu.Type).To(Equal(MODEL)) @@ -933,4 +933,4 @@ func TestPredictorNoGraph(t *testing.T) { spec.DefaultSeldonDeployment("mydep", "default") err := spec.ValidateSeldonDeployment() g.Expect(err).ToNot(BeNil()) -} \ No newline at end of file +} From 1898e2a62d5c4548ce4d0d7b096f668c5e22ab6a Mon Sep 17 00:00:00 2001 From: glindsell Date: Mon, 18 May 2020 16:30:21 +0100 Subject: [PATCH 11/13] Revert "Remove probesOnly from Executor tests" This reverts commit 6c1bc89a89cc1232b4fc77e8e0fd6e1e6f68cd9d. --- executor/api/rest/server.go | 50 ++++++++++++++++---------------- executor/api/rest/server_test.go | 45 ++++++++++++++-------------- executor/main.go | 2 +- 3 files changed, 48 insertions(+), 49 deletions(-) diff --git a/executor/api/rest/server.go b/executor/api/rest/server.go index e3f9fbfd57..1d9c39225f 100644 --- a/executor/api/rest/server.go +++ b/executor/api/rest/server.go @@ -127,31 +127,31 @@ func (r *SeldonRestApi) Initialise() { r.Router.HandleFunc("/ready", r.checkReady) r.Router.HandleFunc("/live", r.alive) r.Router.Handle(r.prometheusPath, promhttp.Handler()) - cloudeventHeaderMiddleware := CloudeventHeaderMiddleware{deploymentName: r.DeploymentName, namespace: r.Namespace} - r.Router.Use(puidHeader) - r.Router.Use(cloudeventHeaderMiddleware.Middleware) - switch r.Protocol { - case api.ProtocolSeldon: - //v0.1 API - api01 := r.Router.PathPrefix("/api/v0.1").Methods("POST").Subrouter() - api01.Handle("/predictions", r.wrapMetrics(metric.PredictionHttpServiceName, r.predictions)) - api01.Handle("/feedback", r.wrapMetrics(metric.FeedbackHttpServiceName, r.feedback)) - r.Router.NewRoute().Path("/api/v0.1/status/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.status)) - r.Router.NewRoute().Path("/api/v0.1/metadata/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.MetadataHttpServiceName, r.metadata)) - r.Router.NewRoute().PathPrefix("/api/v0.1/doc/").Handler(http.StripPrefix("/api/v0.1/doc/", http.FileServer(http.Dir("./openapi/")))) - //v1.0 API - api10 := r.Router.PathPrefix("/api/v1.0").Methods("POST").Subrouter() - api10.Handle("/predictions", r.wrapMetrics(metric.PredictionHttpServiceName, r.predictions)) - api10.Handle("/feedback", r.wrapMetrics(metric.FeedbackHttpServiceName, r.feedback)) - r.Router.NewRoute().Path("/api/v1.0/status/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.status)) - r.Router.NewRoute().Path("/api/v1.0/metadata/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.MetadataHttpServiceName, r.metadata)) - r.Router.NewRoute().PathPrefix("/api/v1.0/doc/").Handler(http.StripPrefix("/api/v1.0/doc/", http.FileServer(http.Dir("./openapi/")))) - - case api.ProtocolTensorflow: - r.Router.NewRoute().Path("/v1/models/{" + ModelHttpPathVariable + "}/:predict").Methods("POST").HandlerFunc(r.wrapMetrics(metric.PredictionHttpServiceName, r.predictions)) - r.Router.NewRoute().Path("/v1/models/:predict").Methods("POST").HandlerFunc(r.wrapMetrics(metric.PredictionHttpServiceName, r.predictions)) // Nonstandard path - Seldon extension - r.Router.NewRoute().Path("/v1/models/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.status)) - r.Router.NewRoute().Path("/v1/models/{" + ModelHttpPathVariable + "}/metadata").Methods("GET").HandlerFunc(r.wrapMetrics(metric.MetadataHttpServiceName, r.metadata)) + if !r.ProbesOnly { + cloudeventHeaderMiddleware := CloudeventHeaderMiddleware{deploymentName: r.DeploymentName, namespace: r.Namespace} + r.Router.Use(puidHeader) + r.Router.Use(cloudeventHeaderMiddleware.Middleware) + switch r.Protocol { + case api.ProtocolSeldon: + //v0.1 API + api01 := r.Router.PathPrefix("/api/v0.1").Methods("POST").Subrouter() + api01.Handle("/predictions", r.wrapMetrics(metric.PredictionHttpServiceName, r.predictions)) + r.Router.NewRoute().Path("/api/v0.1/status/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.status)) + r.Router.NewRoute().Path("/api/v0.1/metadata/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.metadata)) + r.Router.NewRoute().PathPrefix("/api/v0.1/doc/").Handler(http.StripPrefix("/api/v0.1/doc/", http.FileServer(http.Dir("./openapi/")))) + //v1.0 API + api1 := r.Router.PathPrefix("/api/v1.0").Methods("POST").Subrouter() + api1.Handle("/predictions", r.wrapMetrics(metric.PredictionServiceMetricName, r.predictions)) + r.Router.NewRoute().Path("/api/v1.0/status/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.status)) + r.Router.NewRoute().Path("/api/v1.0/metadata/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.metadata)) + r.Router.NewRoute().PathPrefix("/api/v1.0/doc/").Handler(http.StripPrefix("/api/v1.0/doc/", http.FileServer(http.Dir("./openapi/")))) + + case api.ProtocolTensorflow: + r.Router.NewRoute().Path("/v1/models/{" + ModelHttpPathVariable + "}/:predict").Methods("POST").HandlerFunc(r.wrapMetrics(metric.PredictionHttpServiceName, r.predictions)) + r.Router.NewRoute().Path("/v1/models/:predict").Methods("POST").HandlerFunc(r.wrapMetrics(metric.PredictionHttpServiceName, r.predictions)) // Nonstandard path - Seldon extension + r.Router.NewRoute().Path("/v1/models/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.status)) + r.Router.NewRoute().Path("/v1/models/{" + ModelHttpPathVariable + "}/metadata").Methods("GET").HandlerFunc(r.wrapMetrics(metric.MetadataHttpServiceName, r.metadata)) + } } } diff --git a/executor/api/rest/server_test.go b/executor/api/rest/server_test.go index 27e0396073..8f76599e93 100644 --- a/executor/api/rest/server_test.go +++ b/executor/api/rest/server_test.go @@ -1,14 +1,6 @@ package rest import ( - "io/ioutil" - "net/http" - "net/http/httptest" - "net/url" - "strconv" - "strings" - "testing" - guuid "github.com/google/uuid" . "github.com/onsi/gomega" "github.com/prometheus/common/expfmt" @@ -17,6 +9,13 @@ import ( "github.com/seldonio/seldon-core/executor/api/payload" "github.com/seldonio/seldon-core/executor/api/test" v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "strings" + "testing" ) const ( @@ -28,7 +27,7 @@ func TestAliveEndpoint(t *testing.T) { g := NewGomegaWithT(t) url, _ := url.Parse("http://localhost") - r := NewServerRestApi(nil, nil, url, "default", api.ProtocolSeldon, "test", "/metrics") + r := NewServerRestApi(nil, nil, true, url, "default", api.ProtocolSeldon, "test", "/metrics") r.Initialise() req, _ := http.NewRequest("GET", "/live", nil) @@ -56,7 +55,7 @@ func TestSimpleModel(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, "default", api.ProtocolSeldon, "test", "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolSeldon, "test", "/metrics") r.Initialise() var data = ` {"data":{"ndarray":[1.1,2.0]}}` @@ -88,7 +87,7 @@ func TestCloudeventHeaderIsSet(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, testNamespace, api.ProtocolSeldon, testDepName, "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, testNamespace, api.ProtocolSeldon, testDepName, "/metrics") r.Initialise() var data = ` {"data":{"ndarray":[1.1,2.0]}}` @@ -127,7 +126,7 @@ func TestCloudeventHeaderIsNotSet(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, testNamespace, api.ProtocolSeldon, testDepName, "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, testNamespace, api.ProtocolSeldon, testDepName, "/metrics") r.Initialise() var data = ` {"data":{"ndarray":[1.1,2.0]}}` @@ -161,7 +160,7 @@ func TestReponsePuuidHeaderIsSet(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, "default", api.ProtocolSeldon, "test", "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolSeldon, "test", "/metrics") r.Initialise() var data = ` {"data":{"ndarray":[1.1,2.0]}}` @@ -210,7 +209,7 @@ func TestRequestPuuidHeaderIsSet(t *testing.T) { client, err := NewJSONRestClient(api.ProtocolSeldon, "dep", &p, nil) g.Expect(err).To(BeNil()) - r := NewServerRestApi(&p, client, url, "default", api.ProtocolSeldon, "test", "/metrics") + r := NewServerRestApi(&p, client, false, url, "default", api.ProtocolSeldon, "test", "/metrics") r.Initialise() var data = ` {"data":{"ndarray":[1.1,2.0]}}` @@ -257,7 +256,7 @@ func TestModelWithServer(t *testing.T) { client, err := NewJSONRestClient(api.ProtocolSeldon, "dep", &p, nil) g.Expect(err).To(BeNil()) - r := NewServerRestApi(&p, client, url, "default", api.ProtocolSeldon, "test", "/metrics") + r := NewServerRestApi(&p, client, false, url, "default", api.ProtocolSeldon, "test", "/metrics") r.Initialise() var data = ` {"data":{"ndarray":[1.1,2.0]}}` @@ -288,7 +287,7 @@ func TestServerMetrics(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, "default", api.ProtocolSeldon, "test", "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolSeldon, "test", "/metrics") r.Initialise() var data = ` {"data":{"ndarray":[1.1,2.0]}}` @@ -329,7 +328,7 @@ func TestTensorflowStatus(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, "default", api.ProtocolTensorflow, "test", "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolTensorflow, "test", "/metrics") r.Initialise() req, _ := http.NewRequest("GET", "/v1/models/mymodel", nil) @@ -358,7 +357,7 @@ func TestSeldonStatus(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, "default", api.ProtocolSeldon, "test", "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolSeldon, "test", "/metrics") r.Initialise() req, _ := http.NewRequest("GET", "/api/v1.0/status/mymodel", nil) @@ -387,7 +386,7 @@ func TestSeldonMetadata(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, "default", api.ProtocolSeldon, "test", "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolSeldon, "test", "/metrics") r.Initialise() req, _ := http.NewRequest("GET", "/api/v1.0/metadata/mymodel", nil) @@ -444,7 +443,7 @@ func TestTensorflowMetadata(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, "default", api.ProtocolTensorflow, "test", "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolTensorflow, "test", "/metrics") r.Initialise() req, _ := http.NewRequest("GET", "/v1/models/mymodel/metadata", nil) @@ -490,7 +489,7 @@ func TestPredictErrorWithServer(t *testing.T) { } client, err := NewJSONRestClient(api.ProtocolSeldon, "dep", &p, nil) g.Expect(err).Should(BeNil()) - r := NewServerRestApi(&p, client, url, "default", api.ProtocolSeldon, "test", "/metrics") + r := NewServerRestApi(&p, client, false, url, "default", api.ProtocolSeldon, "test", "/metrics") r.Initialise() var data = ` {"data":{"ndarray":[1.1,2.0]}}` @@ -525,7 +524,7 @@ func TestTensorflowModel(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, "default", api.ProtocolTensorflow, "test", "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolTensorflow, "test", "/metrics") r.Initialise() var data = ` {"instances":[[1,2,3]]}` @@ -555,7 +554,7 @@ func TestTensorflowModelBadModelName(t *testing.T) { } url, _ := url.Parse("http://localhost") - r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, url, "default", api.ProtocolTensorflow, "test", "/metrics") + r := NewServerRestApi(&p, &test.SeldonMessageTestClient{}, false, url, "default", api.ProtocolTensorflow, "test", "/metrics") r.Initialise() var data = ` {"instances":[[1,2,3]]}` diff --git a/executor/main.go b/executor/main.go index 7e4da5a37d..a8ac8a2fe9 100644 --- a/executor/main.go +++ b/executor/main.go @@ -242,7 +242,7 @@ func main() { log.Fatalf("Failed to create http client: %v", err) } logger.Info("Running HTTP server ", "port", *port) - go runHttpServer(httpl, logger, predictor, clientRest, *port, serverUrl, *namespace, *protocol, *sdepName, *prometheusPath) + go runHttpServer(httpl, logger, predictor, clientRest, *port, false, serverUrl, *namespace, *protocol, *sdepName, *prometheusPath) var clientGrpc seldonclient.SeldonApiClient if *protocol == "seldon" { From bfd0ab51e4c5df0bc1178e8b7141cf8644576e26 Mon Sep 17 00:00:00 2001 From: glindsell Date: Mon, 18 May 2020 16:31:10 +0100 Subject: [PATCH 12/13] Revert "remove ProbesOnly from SeldonRestApi" This reverts commit 144659b47992dd2c2e455ee32c3f4274838c8036. --- executor/api/rest/server.go | 17 ++++++++++------- executor/main.go | 4 ++-- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/executor/api/rest/server.go b/executor/api/rest/server.go index 1d9c39225f..d076991771 100644 --- a/executor/api/rest/server.go +++ b/executor/api/rest/server.go @@ -3,10 +3,6 @@ package rest import ( "context" "fmt" - "io/ioutil" - "net/http" - "net/url" - "github.com/go-logr/logr" guuid "github.com/google/uuid" "github.com/gorilla/mux" @@ -19,7 +15,10 @@ import ( "github.com/seldonio/seldon-core/executor/api/metric" "github.com/seldonio/seldon-core/executor/api/payload" "github.com/seldonio/seldon-core/executor/predictor" - v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" + "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" + "io/ioutil" + "net/http" + "net/url" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" ) @@ -37,6 +36,7 @@ type SeldonRestApi struct { Client client.SeldonApiClient predictor *v1.PredictorSpec Log logr.Logger + ProbesOnly bool ServerUrl *url.URL Namespace string Protocol string @@ -45,14 +45,17 @@ type SeldonRestApi struct { prometheusPath string } -func NewServerRestApi(predictor *v1.PredictorSpec, client client.SeldonApiClient, serverUrl *url.URL, namespace string, protocol string, deploymentName string, prometheusPath string) *SeldonRestApi { +func NewServerRestApi(predictor *v1.PredictorSpec, client client.SeldonApiClient, probesOnly bool, serverUrl *url.URL, namespace string, protocol string, deploymentName string, prometheusPath string) *SeldonRestApi { var serverMetrics *metric.ServerMetrics - serverMetrics = metric.NewServerMetrics(predictor, deploymentName) + if !probesOnly { + serverMetrics = metric.NewServerMetrics(predictor, deploymentName) + } return &SeldonRestApi{ mux.NewRouter(), client, predictor, logf.Log.WithName("SeldonRestApi"), + probesOnly, serverUrl, namespace, protocol, diff --git a/executor/main.go b/executor/main.go index a8ac8a2fe9..36ea50bb83 100644 --- a/executor/main.go +++ b/executor/main.go @@ -93,10 +93,10 @@ func getServerUrl(hostname string, port int) (*url.URL, error) { } func runHttpServer(lis net.Listener, logger logr.Logger, predictor *v1.PredictorSpec, client seldonclient.SeldonApiClient, port int, - serverUrl *url.URL, namespace string, protocol string, deploymentName string, prometheusPath string) { + probesOnly bool, serverUrl *url.URL, namespace string, protocol string, deploymentName string, prometheusPath string) { // Create REST API - seldonRest := rest.NewServerRestApi(predictor, client, serverUrl, namespace, protocol, deploymentName, prometheusPath) + seldonRest := rest.NewServerRestApi(predictor, client, probesOnly, serverUrl, namespace, protocol, deploymentName, prometheusPath) seldonRest.Initialise() srv := seldonRest.CreateHttpServer(port) From 82eac9486d1066227bc05d0c6b298d833dc5b280 Mon Sep 17 00:00:00 2001 From: glindsell Date: Mon, 18 May 2020 16:43:56 +0100 Subject: [PATCH 13/13] Undo changes to REST server Signed-off-by: glindsell --- executor/api/rest/server.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/executor/api/rest/server.go b/executor/api/rest/server.go index d076991771..87fea80640 100644 --- a/executor/api/rest/server.go +++ b/executor/api/rest/server.go @@ -3,6 +3,10 @@ package rest import ( "context" "fmt" + "io/ioutil" + "net/http" + "net/url" + "github.com/go-logr/logr" guuid "github.com/google/uuid" "github.com/gorilla/mux" @@ -15,10 +19,7 @@ import ( "github.com/seldonio/seldon-core/executor/api/metric" "github.com/seldonio/seldon-core/executor/api/payload" "github.com/seldonio/seldon-core/executor/predictor" - "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" - "io/ioutil" - "net/http" - "net/url" + v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" ) @@ -139,14 +140,16 @@ func (r *SeldonRestApi) Initialise() { //v0.1 API api01 := r.Router.PathPrefix("/api/v0.1").Methods("POST").Subrouter() api01.Handle("/predictions", r.wrapMetrics(metric.PredictionHttpServiceName, r.predictions)) + api01.Handle("/feedback", r.wrapMetrics(metric.FeedbackHttpServiceName, r.feedback)) r.Router.NewRoute().Path("/api/v0.1/status/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.status)) - r.Router.NewRoute().Path("/api/v0.1/metadata/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.metadata)) + r.Router.NewRoute().Path("/api/v0.1/metadata/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.MetadataHttpServiceName, r.metadata)) r.Router.NewRoute().PathPrefix("/api/v0.1/doc/").Handler(http.StripPrefix("/api/v0.1/doc/", http.FileServer(http.Dir("./openapi/")))) //v1.0 API - api1 := r.Router.PathPrefix("/api/v1.0").Methods("POST").Subrouter() - api1.Handle("/predictions", r.wrapMetrics(metric.PredictionServiceMetricName, r.predictions)) + api10 := r.Router.PathPrefix("/api/v1.0").Methods("POST").Subrouter() + api10.Handle("/predictions", r.wrapMetrics(metric.PredictionHttpServiceName, r.predictions)) + api10.Handle("/feedback", r.wrapMetrics(metric.FeedbackHttpServiceName, r.feedback)) r.Router.NewRoute().Path("/api/v1.0/status/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.status)) - r.Router.NewRoute().Path("/api/v1.0/metadata/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.StatusHttpServiceName, r.metadata)) + r.Router.NewRoute().Path("/api/v1.0/metadata/{" + ModelHttpPathVariable + "}").Methods("GET").HandlerFunc(r.wrapMetrics(metric.MetadataHttpServiceName, r.metadata)) r.Router.NewRoute().PathPrefix("/api/v1.0/doc/").Handler(http.StripPrefix("/api/v1.0/doc/", http.FileServer(http.Dir("./openapi/")))) case api.ProtocolTensorflow: