diff --git a/examples/centralised-logging/fluentd-values.yaml b/examples/centralised-logging/fluentd-values.yaml index 7eaa4fff40..68b05438e6 100644 --- a/examples/centralised-logging/fluentd-values.yaml +++ b/examples/centralised-logging/fluentd-values.yaml @@ -1,5 +1,5 @@ elasticsearch: - host: 'elasticsearch-master' + hosts: ['elasticsearch-master'] logstash_prefix: 'kubernetes_cluster' configMaps: @@ -95,4 +95,4 @@ extraConfigMaps: key $.kubernetes.container_name pattern istio-proxy - \ No newline at end of file + diff --git a/examples/centralised-logging/full-setup-existing-kubeflow.sh b/examples/centralised-logging/full-setup-existing-kubeflow.sh index 0c4234fbda..d3fd6f0251 100755 --- a/examples/centralised-logging/full-setup-existing-kubeflow.sh +++ b/examples/centralised-logging/full-setup-existing-kubeflow.sh @@ -49,7 +49,7 @@ kubectl create namespace logs || echo "namespace logs exists" helm upgrade --install elasticsearch elasticsearch --version 7.6.0 --namespace=logs --set service.type=ClusterIP --set antiAffinity="soft" --repo https://helm.elastic.co --set image=docker.elastic.co/elasticsearch/elasticsearch-oss kubectl rollout status statefulset/elasticsearch-master -n logs -helm upgrade --install fluentd fluentd-elasticsearch --namespace=logs -f fluentd-values.yaml --repo https://kiwigrid.github.io +helm upgrade --install fluentd fluentd-elasticsearch --version 8.0.0 --namespace=logs -f fluentd-values.yaml --repo https://kiwigrid.github.io helm upgrade --install kibana kibana --version 7.6.0 --namespace=logs --set service.type=ClusterIP -f ./kubeflow/kibana-values.yaml --repo https://helm.elastic.co --set image=docker.elastic.co/kibana/kibana-oss kubectl apply -f ./kubeflow/virtualservice-kibana.yaml diff --git a/examples/centralised-logging/full-setup.sh b/examples/centralised-logging/full-setup.sh index 1bf58e51e9..d384d1d5f2 100755 --- a/examples/centralised-logging/full-setup.sh +++ b/examples/centralised-logging/full-setup.sh @@ -23,7 +23,7 @@ helm install --name elasticsearch elasticsearch --version 7.6.0 --namespace=logs kubectl rollout status statefulset/elasticsearch-master -n logs kubectl patch svc elasticsearch-master -n logs -p '{"spec": {"type": "LoadBalancer"}}' -helm install fluentd-elasticsearch --name fluentd --namespace=logs -f fluentd-values.yaml --repo https://kiwigrid.github.io +helm install fluentd-elasticsearch --version 8.0.0 --name fluentd --namespace=logs -f fluentd-values.yaml --repo https://kiwigrid.github.io helm install kibana --version 7.6.0 --name=kibana --namespace=logs --set service.type=NodePort --repo https://helm.elastic.co --set image=docker.elastic.co/kibana/kibana-oss kubectl rollout status deployment/kibana-kibana -n logs diff --git a/executor/api/grpc/seldon/server.go b/executor/api/grpc/seldon/server.go index 891042fe36..201eb08dc0 100644 --- a/executor/api/grpc/seldon/server.go +++ b/executor/api/grpc/seldon/server.go @@ -2,14 +2,15 @@ package seldon import ( "context" + "net/url" + "github.com/go-logr/logr" "github.com/seldonio/seldon-core/executor/api/client" "github.com/seldonio/seldon-core/executor/api/grpc" "github.com/seldonio/seldon-core/executor/api/grpc/seldon/proto" "github.com/seldonio/seldon-core/executor/api/payload" "github.com/seldonio/seldon-core/executor/predictor" - "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" - "net/url" + v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" ) @@ -32,7 +33,9 @@ func NewGrpcSeldonServer(predictor *v1.PredictorSpec, client client.SeldonApiCli } func (g GrpcSeldonServer) Predict(ctx context.Context, req *proto.SeldonMessage) (*proto.SeldonMessage, error) { - seldonPredictorProcess := predictor.NewPredictorProcess(ctx, g.Client, logf.Log.WithName("SeldonMessageRestClient"), g.ServerUrl, g.Namespace, grpc.CollectMetadata(ctx)) + md := grpc.CollectMetadata(ctx) + ctx = context.WithValue(ctx, payload.SeldonPUIDHeader, md[payload.SeldonPUIDHeader]) + seldonPredictorProcess := predictor.NewPredictorProcess(ctx, g.Client, logf.Log.WithName("SeldonMessageRestClient"), g.ServerUrl, g.Namespace, md) reqPayload := payload.ProtoPayload{Msg: req} resPayload, err := seldonPredictorProcess.Predict(g.predictor.Graph, &reqPayload) if err != nil { diff --git a/executor/api/grpc/server.go b/executor/api/grpc/server.go index c7461fb195..f72d040021 100644 --- a/executor/api/grpc/server.go +++ b/executor/api/grpc/server.go @@ -2,10 +2,13 @@ package grpc import ( "context" + "math" + "strconv" + "github.com/go-logr/logr" guuid "github.com/google/uuid" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/opentracing/opentracing-go" "github.com/seldonio/seldon-core/executor/api/metric" "github.com/seldonio/seldon-core/executor/api/payload" @@ -13,8 +16,6 @@ import ( v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" "google.golang.org/grpc" "google.golang.org/grpc/metadata" - "math" - "strconv" ) const ( diff --git a/executor/predictor/predictor_process.go b/executor/predictor/predictor_process.go index d31053bfbf..4b01cf22e3 100644 --- a/executor/predictor/predictor_process.go +++ b/executor/predictor/predictor_process.go @@ -3,14 +3,15 @@ package predictor import ( "context" "fmt" + "net/url" + "sync" + "github.com/go-logr/logr" guuid "github.com/google/uuid" "github.com/seldonio/seldon-core/executor/api/client" "github.com/seldonio/seldon-core/executor/api/payload" payloadLogger "github.com/seldonio/seldon-core/executor/logger" - "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" - "net/url" - "sync" + v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" ) type PredictorProcess struct { @@ -251,7 +252,12 @@ func (p *PredictorProcess) logPayload(nodeName string, logger *v1.Logger, reqTyp if err != nil { return err } - + var reqId string + if r, ok := p.Ctx.Value(payload.SeldonPUIDHeader).(string); ok { + reqId = r + } else { + return fmt.Errorf("context value Seldon PUID Header is nil: interface to string conversion failed") + } payloadLogger.QueueLogRequest(payloadLogger.LogRequest{ Url: logUrl, Bytes: &data, @@ -260,7 +266,7 @@ func (p *PredictorProcess) logPayload(nodeName string, logger *v1.Logger, reqTyp Id: guuid.New().String(), SourceUri: p.ServerUrl, ModelId: nodeName, - RequestId: p.Ctx.Value(payload.SeldonPUIDHeader).(string), + RequestId: reqId, }) return nil } @@ -268,7 +274,10 @@ func (p *PredictorProcess) logPayload(nodeName string, logger *v1.Logger, reqTyp func (p *PredictorProcess) Predict(node *v1.PredictiveUnit, msg payload.SeldonPayload) (payload.SeldonPayload, error) { //Log Request if node.Logger != nil && (node.Logger.Mode == v1.LogRequest || node.Logger.Mode == v1.LogAll) { - p.logPayload(node.Name, node.Logger, payloadLogger.InferenceRequest, msg) + err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceRequest, msg) + if err != nil { + return nil, err + } } tmsg, err := p.transformInput(node, msg) if err != nil { @@ -281,7 +290,10 @@ func (p *PredictorProcess) Predict(node *v1.PredictiveUnit, msg payload.SeldonPa response, err := p.transformOutput(node, cmsg) // Log Response if err == nil && node.Logger != nil && (node.Logger.Mode == v1.LogResponse || node.Logger.Mode == v1.LogAll) { - p.logPayload(node.Name, node.Logger, payloadLogger.InferenceResponse, response) + err := p.logPayload(node.Name, node.Logger, payloadLogger.InferenceResponse, response) + if err != nil { + return nil, err + } } return response, err } diff --git a/executor/predictor/predictor_process_test.go b/executor/predictor/predictor_process_test.go index 27ae9212f6..0be1c18184 100644 --- a/executor/predictor/predictor_process_test.go +++ b/executor/predictor/predictor_process_test.go @@ -4,6 +4,11 @@ import ( "context" "errors" "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + "github.com/golang/protobuf/jsonpb" . "github.com/onsi/gomega" "github.com/seldonio/seldon-core/executor/api/grpc" @@ -12,11 +17,7 @@ import ( "github.com/seldonio/seldon-core/executor/api/test" "github.com/seldonio/seldon-core/executor/logger" v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" - "net/http" - "net/http/httptest" - "net/url" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" - "testing" ) const ( @@ -50,6 +51,13 @@ func createPredictorProcessWithError(t *testing.T, errMethod *v1.PredictiveUnitM return &pp } +func createPredictorProcessWithoutPUID(t *testing.T) *PredictorProcess { + url, _ := url.Parse(testSourceUrl) + ctx := context.WithValue(context.TODO(), payload.SeldonPUIDHeader, nil) + pp := NewPredictorProcess(ctx, &test.SeldonMessageTestClient{}, logf.Log.WithName("SeldonMessageRestClient"), url, "default", map[string][]string{testCustomMetaKey: []string{testCustomMetaValue}}) + return &pp +} + func createPredictPayload(g *GomegaWithT) payload.SeldonPayload { var sm proto.SeldonMessage var data = ` {"data":{"ndarray":[1.1,2.0]}}` @@ -455,3 +463,85 @@ func TestModelWithLogResponses(t *testing.T) { g.Expect(smRes.GetData().GetNdarray().Values[1].GetNumberValue()).Should(Equal(2.0)) g.Eventually(func() bool { return logged }).Should(Equal(true)) } + +func TestModelWithLogRequestsNilPUIDError(t *testing.T) { + t.Logf("Started") + g := NewGomegaWithT(t) + modelName := "foo" + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + //g.Expect(r.Header.Get(logger.CloudEventsIdHeader)).Should(Equal(testEventId)) + g.Expect(r.Header.Get(logger.CloudEventsTypeHeader)).To(Equal(logger.CEInferenceRequest)) + g.Expect(r.Header.Get(logger.CloudEventsTypeSource)).To(Equal(testSourceUrl)) + g.Expect(r.Header.Get(modelIdHeaderName)).To(Equal(modelName)) + g.Expect(r.Header.Get(contentTypeHeaderName)).To(Equal(grpc.ProtobufContentType)) + g.Expect(r.Header.Get(requestIdHeaderName)).To(Equal(testSeldonPuid)) + w.Write([]byte("")) + fmt.Printf("%+v\n", r.Header) + fmt.Printf("%+v\n", r.Body) + }) + server := httptest.NewServer(handler) + defer server.Close() + + logf.SetLogger(logf.ZapLogger(false)) + log := logf.Log.WithName("entrypoint") + logger.StartDispatcher(1, log, "", "", "") + + model := v1.MODEL + graph := &v1.PredictiveUnit{ + Name: modelName, + Type: &model, + Endpoint: &v1.Endpoint{ + ServiceHost: "foo", + ServicePort: 9000, + Type: v1.REST, + }, + Logger: &v1.Logger{ + Mode: v1.LogRequest, + Url: &server.URL, + }, + } + + _, err := createPredictorProcessWithoutPUID(t).Predict(graph, createPredictPayload(g)) + g.Expect(err).NotTo(BeNil()) + g.Expect(err.Error()).Should(Equal("context value Seldon PUID Header is nil: interface to string conversion failed")) +} + +func TestModelWithLogResponsesNilPUIDError(t *testing.T) { + t.Logf("Started") + g := NewGomegaWithT(t) + modelName := "foo" + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + //g.Expect(r.Header.Get(logger.CloudEventsIdHeader)).Should(Equal(testEventId)) + g.Expect(r.Header.Get(logger.CloudEventsTypeHeader)).To(Equal(logger.CEInferenceResponse)) + g.Expect(r.Header.Get(logger.CloudEventsTypeSource)).To(Equal(testSourceUrl)) + g.Expect(r.Header.Get(modelIdHeaderName)).To(Equal(modelName)) + g.Expect(r.Header.Get(contentTypeHeaderName)).To(Equal(grpc.ProtobufContentType)) + g.Expect(r.Header.Get(requestIdHeaderName)).To(Equal(testSeldonPuid)) + w.Write([]byte("")) + }) + server := httptest.NewServer(handler) + defer server.Close() + + logf.SetLogger(logf.ZapLogger(false)) + log := logf.Log.WithName("entrypoint") + logger.StartDispatcher(1, log, "", "", "") + + model := v1.MODEL + graph := &v1.PredictiveUnit{ + Name: modelName, + Type: &model, + Endpoint: &v1.Endpoint{ + ServiceHost: "foo", + ServicePort: 9000, + Type: v1.REST, + }, + Logger: &v1.Logger{ + Mode: v1.LogResponse, + Url: &server.URL, + }, + } + + _, err := createPredictorProcessWithoutPUID(t).Predict(graph, createPredictPayload(g)) + g.Expect(err).NotTo(BeNil()) + g.Expect(err.Error()).Should(Equal("context value Seldon PUID Header is nil: interface to string conversion failed")) +}