Skip to content

Commit

Permalink
include seldon-puid in grpc headers (#3390)
Browse files Browse the repository at this point in the history
* include seldon-puid in grpc headers

* include seldon-puid also in SendFeedback calls

* fix argo -> argo-workflows

* extend seldon-puid headers to kfserving grpc

* fix flakes in test_notebooks

* fix kfserving local example for executor

* make python linter happy
  • Loading branch information
RafalSkolasinski authored Jul 12, 2021
1 parent 287bfcf commit 78fbe31
Show file tree
Hide file tree
Showing 12 changed files with 38 additions and 30 deletions.
2 changes: 1 addition & 1 deletion examples/batch/argo-workflows-batch/README.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"#### Install Seldon Core\n",
"Use the notebook to [set-up Seldon Core with Ambassador or Istio Ingress](https://docs.seldon.io/projects/seldon-core/en/latest/examples/seldon_core_setup.html).\n",
"\n",
"Note: If running with KIND you need to make sure do follow [these steps](https://github.com/argoproj/argo/issues/2376#issuecomment-595593237) as workaround to the `/.../docker.sock` known issue.\n",
"Note: If running with KIND you need to make sure to follow [these steps](https://github.com/argoproj/argo-workflows/issues/2376#issuecomment-595593237) as a workaround to the `/.../docker.sock` known issue.\n",
"\n",
"#### Set up Minio in your cluster\n",
"Use the notebook to [set-up Minio in your cluster](https://docs.seldon.io/projects/seldon-core/en/latest/examples/minio_setup.html).\n",
Expand Down
3 changes: 1 addition & 2 deletions examples/batch/argo-workflows-batch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Dependencies:
#### Install Seldon Core
Use the notebook to [set-up Seldon Core with Ambassador or Istio Ingress](https://docs.seldon.io/projects/seldon-core/en/latest/examples/seldon_core_setup.html).

Note: If running with KIND you need to make sure do follow [these steps](https://github.com/argoproj/argo/issues/2376#issuecomment-595593237) as workaround to the `/.../docker.sock` known issue.
Note: If running with KIND you need to make sure to follow [these steps](https://github.com/argoproj/argo-workflows/issues/2376#issuecomment-595593237) as a workaround to the `/.../docker.sock` known issue.

#### Set up Minio in your cluster
Use the notebook to [set-up Minio in your cluster](https://docs.seldon.io/projects/seldon-core/en/latest/examples/minio_setup.html).
Expand Down Expand Up @@ -427,4 +427,3 @@ Now we can output the contents of the file created using the `mc head` command.
```

Workflow 'seldon-batch-process' deleted

2 changes: 1 addition & 1 deletion examples/batch/hdfs-argo-workflows/hdfs-batch.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"### Install Seldon Core\n",
"Use the notebook to [set-up Seldon Core with Ambassador or Istio Ingress](https://docs.seldon.io/projects/seldon-core/en/latest/examples/seldon_core_setup.html).\n",
"\n",
"Note: If running with KIND you need to make sure do follow [these steps](https://github.com/argoproj/argo/issues/2376#issuecomment-595593237) as workaround to the `/.../docker.sock` known issue:\n",
"Note: If running with KIND you need to make sure to follow [these steps](https://github.com/argoproj/argo-workflows/issues/2376#issuecomment-595593237) as a workaround to the `/.../docker.sock` known issue:\n",
"```bash\n",
"kubectl patch -n argo configmap workflow-controller-configmap \\\n",
" --type merge -p '{\"data\": {\"config\": \"containerRuntimeExecutor: k8sapi\"}}'\n",
Expand Down
8 changes: 8 additions & 0 deletions executor/api/grpc/kfserving/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/seldonio/seldon-core/executor/api/payload"
"github.com/seldonio/seldon-core/executor/predictor"
v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1"
protoGrpc "google.golang.org/grpc"
protoGrpcMetadata "google.golang.org/grpc/metadata"
"net/url"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
)
Expand Down Expand Up @@ -41,6 +43,8 @@ func (g GrpcKFServingServer) ServerReady(ctx context.Context, request *inference

func (g GrpcKFServingServer) ModelReady(ctx context.Context, request *inference.ModelReadyRequest) (*inference.ModelReadyResponse, error) {
md := grpc.CollectMetadata(ctx)
header := protoGrpcMetadata.Pairs(payload.SeldonPUIDHeader, md.Get(payload.SeldonPUIDHeader)[0])
protoGrpc.SetHeader(ctx, header)
ctx = context.WithValue(ctx, payload.SeldonPUIDHeader, md.Get(payload.SeldonPUIDHeader)[0])
seldonPredictorProcess := predictor.NewPredictorProcess(ctx, g.Client, logf.Log.WithName("infer"), g.ServerUrl, g.Namespace, md, request.GetName())
reqPayload := payload.ProtoPayload{Msg: request}
Expand All @@ -57,6 +61,8 @@ func (g GrpcKFServingServer) ServerMetadata(ctx context.Context, request *infere

func (g GrpcKFServingServer) ModelMetadata(ctx context.Context, request *inference.ModelMetadataRequest) (*inference.ModelMetadataResponse, error) {
md := grpc.CollectMetadata(ctx)
header := protoGrpcMetadata.Pairs(payload.SeldonPUIDHeader, md.Get(payload.SeldonPUIDHeader)[0])
protoGrpc.SetHeader(ctx, header)
ctx = context.WithValue(ctx, payload.SeldonPUIDHeader, md.Get(payload.SeldonPUIDHeader)[0])
seldonPredictorProcess := predictor.NewPredictorProcess(ctx, g.Client, logf.Log.WithName("infer"), g.ServerUrl, g.Namespace, md, request.GetName())
reqPayload := payload.ProtoPayload{Msg: request}
Expand All @@ -69,6 +75,8 @@ func (g GrpcKFServingServer) ModelMetadata(ctx context.Context, request *inferen

func (g GrpcKFServingServer) ModelInfer(ctx context.Context, request *inference.ModelInferRequest) (*inference.ModelInferResponse, error) {
md := grpc.CollectMetadata(ctx)
header := protoGrpcMetadata.Pairs(payload.SeldonPUIDHeader, md.Get(payload.SeldonPUIDHeader)[0])
protoGrpc.SetHeader(ctx, header)
ctx = context.WithValue(ctx, payload.SeldonPUIDHeader, md.Get(payload.SeldonPUIDHeader)[0])
seldonPredictorProcess := predictor.NewPredictorProcess(ctx, g.Client, logf.Log.WithName("infer"), g.ServerUrl, g.Namespace, md, request.GetModelName())
reqPayload := payload.ProtoPayload{Msg: request}
Expand Down
9 changes: 8 additions & 1 deletion executor/api/grpc/seldon/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/seldonio/seldon-core/executor/api/payload"
"github.com/seldonio/seldon-core/executor/predictor"
v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1"
protoGrpc "google.golang.org/grpc"
protoGrpcMetadata "google.golang.org/grpc/metadata"
"net/url"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
)
Expand All @@ -34,6 +36,8 @@ func NewGrpcSeldonServer(predictor *v1.PredictorSpec, client client.SeldonApiCli

func (g GrpcSeldonServer) Predict(ctx context.Context, req *proto.SeldonMessage) (*proto.SeldonMessage, error) {
md := grpc.CollectMetadata(ctx)
header := protoGrpcMetadata.Pairs(payload.SeldonPUIDHeader, md.Get(payload.SeldonPUIDHeader)[0])
protoGrpc.SetHeader(ctx, header)
ctx = context.WithValue(ctx, payload.SeldonPUIDHeader, md.Get(payload.SeldonPUIDHeader)[0])
seldonPredictorProcess := predictor.NewPredictorProcess(ctx, g.Client, logf.Log.WithName("SeldonMessageRestClient"), g.ServerUrl, g.Namespace, md, "")
reqPayload := payload.ProtoPayload{Msg: req}
Expand All @@ -46,7 +50,10 @@ func (g GrpcSeldonServer) Predict(ctx context.Context, req *proto.SeldonMessage)
}

func (g GrpcSeldonServer) SendFeedback(ctx context.Context, req *proto.Feedback) (*proto.SeldonMessage, error) {
seldonPredictorProcess := predictor.NewPredictorProcess(ctx, g.Client, logf.Log.WithName("SeldonMessageRestClient"), g.ServerUrl, g.Namespace, grpc.CollectMetadata(ctx), "")
md := grpc.CollectMetadata(ctx)
header := protoGrpcMetadata.Pairs(payload.SeldonPUIDHeader, md.Get(payload.SeldonPUIDHeader)[0])
protoGrpc.SetHeader(ctx, header)
seldonPredictorProcess := predictor.NewPredictorProcess(ctx, g.Client, logf.Log.WithName("SeldonMessageRestClient"), g.ServerUrl, g.Namespace, md, "")
reqPayload := payload.ProtoPayload{Msg: req}
resPayload, err := seldonPredictorProcess.Feedback(&g.predictor.Graph, &reqPayload)
if err != nil {
Expand Down
13 changes: 5 additions & 8 deletions executor/samples/local/graph/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,18 @@ run_executor:


run_model:
export PREDICTIVE_UNIT_HTTP_SERVICE_PORT=9001 && export PREDICTIVE_UNIT_GRPC_SERVICE_PORT=9501 && export PREDICTIVE_UNIT_METRICS_SERVICE_PORT=6001 && seldon-core-microservice --service-type MODEL Model GRPC
export PREDICTIVE_UNIT_HTTP_SERVICE_PORT=9001 && export PREDICTIVE_UNIT_GRPC_SERVICE_PORT=9501 && export PREDICTIVE_UNIT_METRICS_SERVICE_PORT=6001 && seldon-core-microservice --service-type MODEL Model

run_transformer:
export PREDICTIVE_UNIT_HTTP_SERVICE_PORT=9000 && export PREDICTIVE_UNIT_GRPC_SERVICE_PORT=9500 && seldon-core-microservice --service-type TRANSFORMER Transformer GRPC
export PREDICTIVE_UNIT_HTTP_SERVICE_PORT=9000 && export PREDICTIVE_UNIT_GRPC_SERVICE_PORT=9500 && seldon-core-microservice --service-type TRANSFORMER Transformer


curl_rest:
curl -v localhost:8000/api/v0.1/predictions -H "Accept: application/json" -H "Content-Type: application/json" -d '{"data":{"ndarray":[[1.0,2.0]]}}'
curl -v localhost:8000/api/v1.0/predictions -H "Accept: application/json" -H "Content-Type: application/json" -d '{"data":{"ndarray":[[1.0,2.0]]}}'

curl_rest_multipart:
curl -v localhost:8000/api/v0.1/predictions -H "Accept: application/json" -F jsonData=@input.json
curl -v localhost:8000/api/v1.0/predictions -H "Accept: application/json" -F jsonData=@input.json


grpc_test:
cd ${BASE}/proto && grpcurl -d '{"data":{"ndarray":[[1.0,2.0]]}}' -plaintext -proto ./prediction.proto 0.0.0.0:5000 seldon.protos.Seldon/Predict



cd ${BASE}/proto && grpcurl -v -d '{"data":{"ndarray":[[1.0,2.0]]}}' -plaintext -proto ./prediction.proto 0.0.0.0:5000 seldon.protos.Seldon/Predict
3 changes: 0 additions & 3 deletions executor/samples/local/graph/Model.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,3 @@ class Model(object):
def predict(self, X, feature_names):
print(X)
return X



8 changes: 4 additions & 4 deletions executor/samples/local/graph/model.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ spec:
type: REST
service_host: 0.0.0.0
service_port: 9000
http_port: 9000
grpc_port: 9500
httpPort: 9000
grpcPort: 9500
name: transformer
type: TRANSFORMER
children:
Expand All @@ -31,8 +31,8 @@ spec:
service_host: 0.0.0.0
service_port: 9001
name: classifier
http_port: 9001
grpc_port: 9501
httpPort: 9001
grpcPort: 9501
type: MODEL
labels:
version: v1
Expand Down
4 changes: 1 addition & 3 deletions executor/samples/local/kfserving/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ curl_metadata:


grpc_test:
cd ${BASE}/api/grpc/kfserving/inference && grpcurl -d '{"model_name":"simple","inputs":[{"name":"INPUT0","contents":{"int_contents":[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]},"datatype":"INT32","shape":[1,16]},{"name":"INPUT1","contents":{"int_contents":[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]},"datatype":"INT32","shape":[1,16]}]}' -plaintext -proto ./grpc_service.proto 0.0.0.0:5000 inference.GRPCInferenceService/ModelInfer
cd ${BASE}/api/grpc/kfserving/inference && grpcurl -v -d '{"model_name":"simple","inputs":[{"name":"INPUT0","contents":{"int_contents":[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]},"datatype":"INT32","shape":[1,16]},{"name":"INPUT1","contents":{"int_contents":[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]},"datatype":"INT32","shape":[1,16]}]}' -plaintext -proto ./grpc_service.proto 0.0.0.0:5000 inference.GRPCInferenceService/ModelInfer

grpc_status:
cd ${BASE}/api/grpc/kfserving/inference && grpcurl -d '{"name":"simple"}' -plaintext -proto ./grpc_service.proto 0.0.0.0:5000 inference.GRPCInferenceService/ModelReady
Expand All @@ -41,5 +41,3 @@ grpc_status_triton:

grpc_metadata:
cd ${BASE}/api/grpc/kfserving/inference && grpcurl -d '{"name":"simple"}' -plaintext -proto ./grpc_service.proto 0.0.0.0:5000 inference.GRPCInferenceService/ModelMetadata


4 changes: 2 additions & 2 deletions executor/samples/local/kfserving/model.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ spec:
children: []
endpoint:
service_host: 0.0.0.0
http_port: 9000
grpc_port: 5001
httpPort: 9000
grpcPort: 5001
implementation: TRITON_SERVER
modelUri: gs://seldon-models/trtis/simple-model
name: simple
Expand Down
2 changes: 1 addition & 1 deletion testing/benchmarking/automated-benchmark/README.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"### Install Seldon Core\n",
"Use the notebook to [set-up Seldon Core with Ambassador or Istio Ingress](https://docs.seldon.io/projects/seldon-core/en/latest/examples/seldon_core_setup.html).\n",
"\n",
"Note: If running with KIND you need to make sure do follow [these steps](https://github.com/argoproj/argo/issues/2376#issuecomment-595593237) as workaround to the `/.../docker.sock` known issue.\n"
"Note: If running with KIND you need to make sure to follow [these steps](https://github.com/argoproj/argo-workflows/issues/2376#issuecomment-595593237) as a workaround to the `/.../docker.sock` known issue.\n"
]
},
{
Expand Down
10 changes: 6 additions & 4 deletions testing/scripts/test_notebooks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from subprocess import CalledProcessError, run

import pytest

from seldon_e2e_utils import create_and_run_script
Expand Down Expand Up @@ -60,7 +62,7 @@ def test_keda_prom_auto_scale(self):
create_and_run_script("../../examples/keda", "keda_prom_auto_scale")
except CalledProcessError as e:
run(
f"helm delete seldon-core-analytics --namespace seldon-system",
"helm delete seldon-core-analytics --namespace seldon-system",
shell=True,
check=False,
)
Expand All @@ -79,7 +81,7 @@ def test_metrics(self):
create_and_run_script("../../examples/models/metrics", "general_metrics")
except CalledProcessError as e:
run(
f"helm delete seldon-core-analytics --namespace seldon-system",
"helm delete seldon-core-analytics --namespace seldon-system",
shell=True,
check=False,
)
Expand All @@ -106,7 +108,7 @@ def test_custom_metrics(self):
)
except CalledProcessError as e:
run(
f"helm delete seldon-core-analytics --namespace seldon-system",
"helm delete seldon-core-analytics --namespace seldon-system",
shell=True,
check=False,
)
Expand All @@ -118,7 +120,7 @@ def test_autoscaling(self):
"../../examples/models/autoscaling", "autoscaling_example"
)
except CalledProcessError as e:
run(f"helm delete loadtester --namespace seldon", shell=True, check=False)
run("helm delete loadtester --namespace seldon", shell=True, check=False)
raise e

def test_scaling(self):
Expand Down

0 comments on commit 78fbe31

Please sign in to comment.