diff --git a/README.md b/README.md index b4a4e83..68c8c46 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ [![GoDoc](http://img.shields.io/badge/GoDoc-Reference-blue.svg)](https://godoc.org/github.com/grpc-ecosystem/go-grpc-prometheus) [![Apache 2.0 License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE) -[Prometheus](https://prometheus.io/) monitoring for your [gRPC Go](https://github.com/grpc/grpc-go) servers. +[Prometheus](https://prometheus.io/) monitoring for your [gRPC Go](https://github.com/grpc/grpc-go) servers and clients. A sister implementation for [gRPC Java](https://github.com/grpc/grpc-java) (same metrics, same semantics) is in [grpc-ecosystem/java-grpc-prometheus](https://github.com/grpc-ecosystem/java-grpc-prometheus). @@ -19,6 +19,10 @@ To use Interceptors in chains, please see [`go-grpc-middleware`](https://github. ## Usage +There are two types of interceptors: client-side and server-side. This package provides monitoring Interceptors for both. + +### Server-side + ```go import "github.com/grpc-ecosystem/go-grpc-prometheus" ... @@ -27,17 +31,35 @@ import "github.com/grpc-ecosystem/go-grpc-prometheus" grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), ) + // Register your gRPC service implementations. + myservice.RegisterMyServiceServer(s.server, &myServiceImpl{}) + // After all your registrations, make sure all of the Prometheus metrics are initialized. + grpc_prometheus.Register(myServer) // Register Prometheus metrics handler. http.Handle("/metrics", prometheus.Handler()) ... ``` -# Metrics +### Client-side +```go +import "github.com/grpc-ecosystem/go-grpc-prometheus" +... 
+ clientConn, err = grpc.Dial( + address, + grpc.WithUnaryInterceptor(UnaryClientInterceptor), + grpc.WithStreamInterceptor(StreamClientInterceptor) + ) + client = pb_testproto.NewTestServiceClient(clientConn) + resp, err := client.PingEmpty(s.ctx, &myservice.Request{Msg: "hello"}) +... +``` + +# Metrics ## Labels -All server-side metrics start with `grpc_server` as Prometheus subsystem name. Similarly all methods +All server-side metrics start with `grpc_server` as Prometheus subsystem name. All client-side metrics start with `grpc_client`. Both of them have mirror-concepts. Similarly all methods contain the same rich labels: * `grpc_service` - the [gRPC service](http://www.grpc.io/docs/#defining-a-service) name, which is the combination of protobuf `package` and @@ -65,9 +87,11 @@ Additionally for completed RPCs, the following labels are used: ## Counters -The counters and their up to date documentation is in [server_reporter.go](server_reporter.go) and +The counters and their up to date documentation is in [server_reporter.go](server_reporter.go) and [client_reporter.go](client_reporter.go) the respective Prometheus handler (usually `/metrics`). +For the purpose of this documentation we will only discuss `grpc_server` metrics. The `grpc_client` ones contain mirror concepts. + For simplicity, let's assume we're tracking a single server-side RPC call of [`mwitkow.testproto.TestService`](examples/testproto/test.proto), calling the method `PingList`. The call succeeds and returns 20 messages in the stream. @@ -214,8 +238,7 @@ e.g. "less than 1% of requests are slower than 250ms". ## Status -This code has been in an upstream [pull request](https://github.com/grpc/grpc-go/pull/299) since August 2015. It has -served as the basis for monitoring of production gRPC micro services at [Improbable](https://improbable.io) since then. 
+This code has been used since August 2015 as the basis for monitoring of *production* gRPC micro services at [Improbable](https://improbable.io).
 
 ## License
 
diff --git a/client.go b/client.go
new file mode 100644
index 0000000..d9e87b2
--- /dev/null
+++ b/client.go
@@ -0,0 +1,72 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+// gRPC Prometheus monitoring interceptors for client-side gRPC.
+
+package grpc_prometheus
+
+import (
+	"io"
+
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+)
+
+// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
+func UnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+	monitor := newClientReporter(Unary, method)
+	monitor.SentMessage()
+	err := invoker(ctx, method, req, reply, cc, opts...)
+	if err == nil {
+		monitor.ReceivedMessage()
+	}
+	monitor.Handled(grpc.Code(err))
+	return err
+}
+
+// StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
+func StreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+	monitor := newClientReporter(clientStreamType(desc), method)
+	clientStream, err := streamer(ctx, desc, cc, method, opts...)
+ if err != nil { + monitor.Handled(grpc.Code(err)) + return nil, err + } + return &monitoredClientStream{clientStream, monitor}, nil +} + +func clientStreamType(desc *grpc.StreamDesc) grpcType { + if desc.ClientStreams && !desc.ServerStreams { + return ClientStream + } else if !desc.ClientStreams && desc.ServerStreams { + return ServerStream + } + return BidiStream +} + +// monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to increment counters. +type monitoredClientStream struct { + grpc.ClientStream + monitor *clientReporter +} + +func (s *monitoredClientStream) SendMsg(m interface{}) error { + err := s.ClientStream.SendMsg(m) + if err == nil { + s.monitor.SentMessage() + } + return err +} + +func (s *monitoredClientStream) RecvMsg(m interface{}) error { + err := s.ClientStream.RecvMsg(m) + if err == nil { + s.monitor.ReceivedMessage() + } else if err == io.EOF { + s.monitor.Handled(codes.OK) + } else { + s.monitor.Handled(grpc.Code(err)) + } + return err +} diff --git a/client_reporter.go b/client_reporter.go new file mode 100644 index 0000000..16b7615 --- /dev/null +++ b/client_reporter.go @@ -0,0 +1,111 @@ +// Copyright 2016 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. 
+ +package grpc_prometheus + +import ( + "time" + + "google.golang.org/grpc/codes" + + prom "github.com/prometheus/client_golang/prometheus" +) + +var ( + clientStartedCounter = prom.NewCounterVec( + prom.CounterOpts{ + Namespace: "grpc", + Subsystem: "client", + Name: "started_total", + Help: "Total number of RPCs started on the client.", + }, []string{"grpc_type", "grpc_service", "grpc_method"}) + + clientHandledCounter = prom.NewCounterVec( + prom.CounterOpts{ + Namespace: "grpc", + Subsystem: "client", + Name: "handled_total", + Help: "Total number of RPCs completed by the client, regardless of success or failure.", + }, []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}) + + clientStreamMsgReceived = prom.NewCounterVec( + prom.CounterOpts{ + Namespace: "grpc", + Subsystem: "client", + Name: "msg_received_total", + Help: "Total number of RPC stream messages received by the client.", + }, []string{"grpc_type", "grpc_service", "grpc_method"}) + + clientStreamMsgSent = prom.NewCounterVec( + prom.CounterOpts{ + Namespace: "grpc", + Subsystem: "client", + Name: "msg_sent_total", + Help: "Total number of gRPC stream messages sent by the client.", + }, []string{"grpc_type", "grpc_service", "grpc_method"}) + + clientHandledHistogramEnabled = false + clientHandledHistogramOpts = prom.HistogramOpts{ + Namespace: "grpc", + Subsystem: "client", + Name: "handling_seconds", + Help: "Histogram of response latency (seconds) of the gRPC until it is finished by the application.", + Buckets: prom.DefBuckets, + } + clientHandledHistogram *prom.HistogramVec +) + +func init() { + prom.MustRegister(clientStartedCounter) + prom.MustRegister(clientHandledCounter) + prom.MustRegister(clientStreamMsgReceived) + prom.MustRegister(clientStreamMsgSent) +} + +// EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs. +// Histogram metrics can be very expensive for Prometheus to retain and query. 
+func EnableClientHandlingTimeHistogram(opts ...HistogramOption) { + for _, o := range opts { + o(&clientHandledHistogramOpts) + } + if !clientHandledHistogramEnabled { + clientHandledHistogram = prom.NewHistogramVec( + clientHandledHistogramOpts, + []string{"grpc_type", "grpc_service", "grpc_method"}, + ) + prom.Register(clientHandledHistogram) + } + clientHandledHistogramEnabled = true +} + +type clientReporter struct { + rpcType grpcType + serviceName string + methodName string + startTime time.Time +} + +func newClientReporter(rpcType grpcType, fullMethod string) *clientReporter { + r := &clientReporter{rpcType: rpcType} + if clientHandledHistogramEnabled { + r.startTime = time.Now() + } + r.serviceName, r.methodName = splitMethodName(fullMethod) + clientStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc() + return r +} + +func (r *clientReporter) ReceivedMessage() { + clientStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc() +} + +func (r *clientReporter) SentMessage() { + clientStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc() +} + +func (r *clientReporter) Handled(code codes.Code) { + clientHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc() + if clientHandledHistogramEnabled { + clientHandledHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Observe(time.Since(r.startTime).Seconds()) + } +} diff --git a/client_test.go b/client_test.go new file mode 100644 index 0000000..b2ebda4 --- /dev/null +++ b/client_test.go @@ -0,0 +1,212 @@ +// Copyright 2016 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. 
+ +package grpc_prometheus + +import ( + "net" + "testing" + + "time" + + "io" + + pb_testproto "github.com/grpc-ecosystem/go-grpc-prometheus/examples/testproto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" +) + +func TestClientInterceptorSuite(t *testing.T) { + suite.Run(t, &ClientInterceptorTestSuite{}) +} + +type ClientInterceptorTestSuite struct { + suite.Suite + + serverListener net.Listener + server *grpc.Server + clientConn *grpc.ClientConn + testClient pb_testproto.TestServiceClient + ctx context.Context +} + +func (s *ClientInterceptorTestSuite) SetupSuite() { + var err error + + EnableClientHandlingTimeHistogram() + + s.serverListener, err = net.Listen("tcp", "127.0.0.1:0") + require.NoError(s.T(), err, "must be able to allocate a port for serverListener") + + // This is the point where we hook up the interceptor + s.server = grpc.NewServer() + pb_testproto.RegisterTestServiceServer(s.server, &testService{t: s.T()}) + + go func() { + s.server.Serve(s.serverListener) + }() + + s.clientConn, err = grpc.Dial( + s.serverListener.Addr().String(), + grpc.WithInsecure(), + grpc.WithBlock(), + grpc.WithUnaryInterceptor(UnaryClientInterceptor), + grpc.WithStreamInterceptor(StreamClientInterceptor), + grpc.WithTimeout(2*time.Second)) + require.NoError(s.T(), err, "must not error on client Dial") + s.testClient = pb_testproto.NewTestServiceClient(s.clientConn) +} + +func (s *ClientInterceptorTestSuite) SetupTest() { + // Make all RPC calls last at most 2 sec, meaning all async issues or deadlock will not kill tests. 
+ s.ctx, _ = context.WithTimeout(context.TODO(), 2*time.Second) +} + +func (s *ClientInterceptorTestSuite) TearDownSuite() { + if s.serverListener != nil { + s.server.Stop() + s.T().Logf("stopped grpc.Server at: %v", s.serverListener.Addr().String()) + s.serverListener.Close() + + } + if s.clientConn != nil { + s.clientConn.Close() + } +} + +func (s *ClientInterceptorTestSuite) TestUnaryIncrementsStarted() { + var before int + var after int + + before = sumCountersForMetricAndLabels(s.T(), "grpc_client_started_total", "PingEmpty", "unary") + s.testClient.PingEmpty(s.ctx, &pb_testproto.Empty{}) + after = sumCountersForMetricAndLabels(s.T(), "grpc_client_started_total", "PingEmpty", "unary") + assert.EqualValues(s.T(), before+1, after, "grpc_client_started_total should be incremented for PingEmpty") + + before = sumCountersForMetricAndLabels(s.T(), "grpc_client_started_total", "PingError", "unary") + s.testClient.PingError(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.Unavailable)}) + after = sumCountersForMetricAndLabels(s.T(), "grpc_client_started_total", "PingError", "unary") + assert.EqualValues(s.T(), before+1, after, "grpc_client_started_total should be incremented for PingError") +} + +func (s *ClientInterceptorTestSuite) TestUnaryIncrementsHandled() { + var before int + var after int + + before = sumCountersForMetricAndLabels(s.T(), "grpc_client_handled_total", "PingEmpty", "unary", "OK") + s.testClient.PingEmpty(s.ctx, &pb_testproto.Empty{}) // should return with code=OK + after = sumCountersForMetricAndLabels(s.T(), "grpc_client_handled_total", "PingEmpty", "unary", "OK") + assert.EqualValues(s.T(), before+1, after, "grpc_client_handled_count should be incremented for PingEmpty") + + before = sumCountersForMetricAndLabels(s.T(), "grpc_client_handled_total", "PingError", "unary", "FailedPrecondition") + s.testClient.PingError(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with 
code=FailedPrecondition + after = sumCountersForMetricAndLabels(s.T(), "grpc_client_handled_total", "PingError", "unary", "FailedPrecondition") + assert.EqualValues(s.T(), before+1, after, "grpc_client_handled_total should be incremented for PingError") +} + +func (s *ClientInterceptorTestSuite) TestUnaryIncrementsHistograms() { + var before int + var after int + + before = sumCountersForMetricAndLabels(s.T(), "grpc_client_handling_seconds_count", "PingEmpty", "unary") + s.testClient.PingEmpty(s.ctx, &pb_testproto.Empty{}) // should return with code=OK + after = sumCountersForMetricAndLabels(s.T(), "grpc_client_handling_seconds_count", "PingEmpty", "unary") + assert.EqualValues(s.T(), before+1, after, "grpc_client_handled_count should be incremented for PingEmpty") + + before = sumCountersForMetricAndLabels(s.T(), "grpc_client_handling_seconds_count", "PingError", "unary") + s.testClient.PingError(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition + after = sumCountersForMetricAndLabels(s.T(), "grpc_client_handling_seconds_count", "PingError", "unary") + assert.EqualValues(s.T(), before+1, after, "grpc_client_handling_seconds_count should be incremented for PingError") +} + +func (s *ClientInterceptorTestSuite) TestStreamingIncrementsStarted() { + var before int + var after int + + before = sumCountersForMetricAndLabels(s.T(), "grpc_client_started_total", "PingList", "server_stream") + s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{}) + after = sumCountersForMetricAndLabels(s.T(), "grpc_client_started_total", "PingList", "server_stream") + assert.EqualValues(s.T(), before+1, after, "grpc_client_started_total should be incremented for PingList") +} + +func (s *ClientInterceptorTestSuite) TestStreamingIncrementsHistograms() { + var before int + var after int + + before = sumCountersForMetricAndLabels(s.T(), "grpc_client_handling_seconds_count", "PingList", "server_stream") + ss, 
_ := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{}) // should return with code=OK
+	// Do a read, just for kicks.
+	for {
+		_, err := ss.Recv()
+		if err == io.EOF {
+			break
+		}
+		require.NoError(s.T(), err, "reading pingList shouldn't fail")
+	}
+	after = sumCountersForMetricAndLabels(s.T(), "grpc_client_handling_seconds_count", "PingList", "server_stream")
+	assert.EqualValues(s.T(), before+1, after, "grpc_client_handling_seconds_count should be incremented for PingList OK")
+
+	before = sumCountersForMetricAndLabels(s.T(), "grpc_client_handling_seconds_count", "PingList", "server_stream")
+	ss, err := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition
+	require.NoError(s.T(), err, "PingList must not fail immediately")
+	// Do a read, just to propagate errors.
+	_, err = ss.Recv()
+	require.Equal(s.T(), codes.FailedPrecondition, grpc.Code(err), "Recv must return FailedPrecondition, otherwise the test is wrong")
+
+	after = sumCountersForMetricAndLabels(s.T(), "grpc_client_handling_seconds_count", "PingList", "server_stream")
+	assert.EqualValues(s.T(), before+1, after, "grpc_client_handling_seconds_count should be incremented for PingList FailedPrecondition")
+}
+
+func (s *ClientInterceptorTestSuite) TestStreamingIncrementsHandled() {
+	var before int
+	var after int
+
+	before = sumCountersForMetricAndLabels(s.T(), "grpc_client_handled_total", "PingList", "server_stream", "OK")
+	ss, _ := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{}) // should return with code=OK
+	// Do a read, just for kicks.
+	for {
+		_, err := ss.Recv()
+		if err == io.EOF {
+			break
+		}
+		require.NoError(s.T(), err, "reading pingList shouldn't fail")
+	}
+	after = sumCountersForMetricAndLabels(s.T(), "grpc_client_handled_total", "PingList", "server_stream", "OK")
+	assert.EqualValues(s.T(), before+1, after, "grpc_client_handled_total should be incremented for PingList OK")
+
+	before = sumCountersForMetricAndLabels(s.T(), "grpc_client_handled_total", "PingList", "server_stream", "FailedPrecondition")
+	ss, err := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition
+	require.NoError(s.T(), err, "PingList must not fail immediately")
+	// Do a read, just to propagate errors.
+	_, err = ss.Recv()
+	require.Equal(s.T(), codes.FailedPrecondition, grpc.Code(err), "Recv must return FailedPrecondition, otherwise the test is wrong")
+
+	after = sumCountersForMetricAndLabels(s.T(), "grpc_client_handled_total", "PingList", "server_stream", "FailedPrecondition")
+	assert.EqualValues(s.T(), before+1, after, "grpc_client_handled_total should be incremented for PingList FailedPrecondition")
+}
+
+func (s *ClientInterceptorTestSuite) TestStreamingIncrementsMessageCounts() {
+	beforeRecv := sumCountersForMetricAndLabels(s.T(), "grpc_client_msg_received_total", "PingList", "server_stream")
+	beforeSent := sumCountersForMetricAndLabels(s.T(), "grpc_client_msg_sent_total", "PingList", "server_stream")
+	ss, _ := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{}) // should return with code=OK
+	// Do a read, just for kicks.
+	count := 0
+	for {
+		_, err := ss.Recv()
+		if err == io.EOF {
+			break
+		}
+		require.NoError(s.T(), err, "reading pingList shouldn't fail")
+		count += 1
+	}
+	require.EqualValues(s.T(), countListResponses, count, "Number of received msg on the wire must match")
+	afterSent := sumCountersForMetricAndLabels(s.T(), "grpc_client_msg_sent_total", "PingList", "server_stream")
+	afterRecv := sumCountersForMetricAndLabels(s.T(), "grpc_client_msg_received_total", "PingList", "server_stream")
+
+	assert.EqualValues(s.T(), beforeSent+1, afterSent, "grpc_client_msg_sent_total should be incremented once for PingList")
+	assert.EqualValues(s.T(), beforeRecv+countListResponses, afterRecv, "grpc_client_msg_received_total should be incremented 20 times for PingList")
+}
diff --git a/server_reporter.go b/server_reporter.go
index 6b32d23..628a890 100644
--- a/server_reporter.go
+++ b/server_reporter.go
@@ -79,7 +79,7 @@ func WithHistogramBuckets(buckets []float64) HistogramOption {
 	return func(o *prom.HistogramOpts) { o.Buckets = buckets }
 }
 
-// EnableHandlingTimeHistogram turns on recording of handling time of RPCs.
+// EnableHandlingTimeHistogram turns on recording of handling time of RPCs for server-side interceptors.
 // Histogram metrics can be very expensive for Prometheus to retain and query.
func EnableHandlingTimeHistogram(opts ...HistogramOption) { for _, o := range opts { diff --git a/server_test.go b/server_test.go index e5d1e2e..f6944f0 100644 --- a/server_test.go +++ b/server_test.go @@ -4,17 +4,15 @@ package grpc_prometheus import ( - "net" - "testing" - - "time" - "bufio" "io" + "net" "net/http" "net/http/httptest" "strconv" "strings" + "testing" + "time" pb_testproto "github.com/grpc-ecosystem/go-grpc-prometheus/examples/testproto" "github.com/prometheus/client_golang/prometheus" @@ -31,7 +29,7 @@ const ( countListResponses = 20 ) -func TestProxyHappySuite(t *testing.T) { +func TestServerInterceptorSuite(t *testing.T) { suite.Run(t, &ServerInterceptorTestSuite{}) } @@ -61,7 +59,6 @@ func (s *ServerInterceptorTestSuite) SetupSuite() { pb_testproto.RegisterTestServiceServer(s.server, &testService{t: s.T()}) go func() { - s.T().Logf("starting grpc.Server at: %v", s.serverListener.Addr().String()) s.server.Serve(s.serverListener) }() @@ -71,7 +68,6 @@ func (s *ServerInterceptorTestSuite) SetupSuite() { // Important! Pre-register stuff here. 
Register(s.server) - } func (s *ServerInterceptorTestSuite) SetupTest() { @@ -107,8 +103,8 @@ func (s *ServerInterceptorTestSuite) TestRegisterPresetsStuff() { {"grpc_server_handled_total", []string{"mwitkow.testproto.TestService", "PingEmpty", "unary", "FailedPrecondition"}}, {"grpc_server_handled_total", []string{"mwitkow.testproto.TestService", "PingEmpty", "unary", "ResourceExhausted"}}, } { - assert.Equal(s.T(), 0, s.sumCountersForMetricAndLabels(testCase.metricName, testCase.existingLabels...), - "metrics must be pre-set for test case %d", testId) + lineCount := len(fetchPrometheusLines(s.T(), testCase.metricName, testCase.existingLabels...)) + assert.NotEqual(s.T(), 0, lineCount, "metrics must exist for test case %d", testId) } } @@ -116,14 +112,14 @@ func (s *ServerInterceptorTestSuite) TestUnaryIncrementsStarted() { var before int var after int - before = s.sumCountersForMetricAndLabels("grpc_server_started_total", "PingEmpty", "unary") + before = sumCountersForMetricAndLabels(s.T(), "grpc_server_started_total", "PingEmpty", "unary") s.testClient.PingEmpty(s.ctx, &pb_testproto.Empty{}) - after = s.sumCountersForMetricAndLabels("grpc_server_started_total", "PingEmpty", "unary") + after = sumCountersForMetricAndLabels(s.T(), "grpc_server_started_total", "PingEmpty", "unary") assert.EqualValues(s.T(), before+1, after, "grpc_server_started_total should be incremented for PingEmpty") - before = s.sumCountersForMetricAndLabels("grpc_server_started_total", "PingError", "unary") + before = sumCountersForMetricAndLabels(s.T(), "grpc_server_started_total", "PingError", "unary") s.testClient.PingError(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.Unavailable)}) - after = s.sumCountersForMetricAndLabels("grpc_server_started_total", "PingError", "unary") + after = sumCountersForMetricAndLabels(s.T(), "grpc_server_started_total", "PingError", "unary") assert.EqualValues(s.T(), before+1, after, "grpc_server_started_total should be incremented for 
PingError") } @@ -131,14 +127,14 @@ func (s *ServerInterceptorTestSuite) TestUnaryIncrementsHandled() { var before int var after int - before = s.sumCountersForMetricAndLabels("grpc_server_handled_total", "PingEmpty", "unary", "OK") + before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingEmpty", "unary", "OK") s.testClient.PingEmpty(s.ctx, &pb_testproto.Empty{}) // should return with code=OK - after = s.sumCountersForMetricAndLabels("grpc_server_handled_total", "PingEmpty", "unary", "OK") + after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingEmpty", "unary", "OK") assert.EqualValues(s.T(), before+1, after, "grpc_server_handled_count should be incremented for PingEmpty") - before = s.sumCountersForMetricAndLabels("grpc_server_handled_total", "PingError", "unary", "FailedPrecondition") + before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingError", "unary", "FailedPrecondition") s.testClient.PingError(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition - after = s.sumCountersForMetricAndLabels("grpc_server_handled_total", "PingError", "unary", "FailedPrecondition") + after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingError", "unary", "FailedPrecondition") assert.EqualValues(s.T(), before+1, after, "grpc_server_handled_total should be incremented for PingError") } @@ -146,14 +142,14 @@ func (s *ServerInterceptorTestSuite) TestUnaryIncrementsHistograms() { var before int var after int - before = s.sumCountersForMetricAndLabels("grpc_server_handling_seconds_count", "PingEmpty", "unary") + before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingEmpty", "unary") s.testClient.PingEmpty(s.ctx, &pb_testproto.Empty{}) // should return with code=OK - after = s.sumCountersForMetricAndLabels("grpc_server_handling_seconds_count", "PingEmpty", "unary") + after = 
sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingEmpty", "unary") assert.EqualValues(s.T(), before+1, after, "grpc_server_handled_count should be incremented for PingEmpty") - before = s.sumCountersForMetricAndLabels("grpc_server_handling_seconds_count", "PingError", "unary") + before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingError", "unary") s.testClient.PingError(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition - after = s.sumCountersForMetricAndLabels("grpc_server_handling_seconds_count", "PingError", "unary") + after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingError", "unary") assert.EqualValues(s.T(), before+1, after, "grpc_server_handling_seconds_count should be incremented for PingError") } @@ -161,9 +157,9 @@ func (s *ServerInterceptorTestSuite) TestStreamingIncrementsStarted() { var before int var after int - before = s.sumCountersForMetricAndLabels("grpc_server_started_total", "PingList", "server_stream") + before = sumCountersForMetricAndLabels(s.T(), "grpc_server_started_total", "PingList", "server_stream") s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{}) - after = s.sumCountersForMetricAndLabels("grpc_server_started_total", "PingList", "server_stream") + after = sumCountersForMetricAndLabels(s.T(), "grpc_server_started_total", "PingList", "server_stream") assert.EqualValues(s.T(), before+1, after, "grpc_server_started_total should be incremented for PingList") } @@ -171,7 +167,7 @@ func (s *ServerInterceptorTestSuite) TestStreamingIncrementsHistograms() { var before int var after int - before = s.sumCountersForMetricAndLabels("grpc_server_handling_seconds_count", "PingList", "server_stream") + before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingList", "server_stream") ss, _ := s.testClient.PingList(s.ctx, 
&pb_testproto.PingRequest{}) // should return with code=OK // Do a read, just for kicks. for { @@ -181,14 +177,14 @@ func (s *ServerInterceptorTestSuite) TestStreamingIncrementsHistograms() { } require.NoError(s.T(), err, "reading pingList shouldn't fail") } - after = s.sumCountersForMetricAndLabels("grpc_server_handling_seconds_count", "PingList", "server_stream") + after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingList", "server_stream") assert.EqualValues(s.T(), before+1, after, "grpc_server_handling_seconds_count should be incremented for PingList OK") - before = s.sumCountersForMetricAndLabels("grpc_server_handling_seconds_count", "PingList", "server_stream") + before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingList", "server_stream") _, err := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition require.NoError(s.T(), err, "PingList must not fail immedietely") - after = s.sumCountersForMetricAndLabels("grpc_server_handling_seconds_count", "PingList", "server_stream") + after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handling_seconds_count", "PingList", "server_stream") assert.EqualValues(s.T(), before+1, after, "grpc_server_handling_seconds_count should be incremented for PingList FailedPrecondition") } @@ -196,7 +192,7 @@ func (s *ServerInterceptorTestSuite) TestStreamingIncrementsHandled() { var before int var after int - before = s.sumCountersForMetricAndLabels("grpc_server_handled_total", "PingList", "server_stream", "OK") + before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingList", "server_stream", "OK") ss, _ := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{}) // should return with code=OK // Do a read, just for kicks. 
for { @@ -206,20 +202,20 @@ func (s *ServerInterceptorTestSuite) TestStreamingIncrementsHandled() { } require.NoError(s.T(), err, "reading pingList shouldn't fail") } - after = s.sumCountersForMetricAndLabels("grpc_server_handled_total", "PingList", "server_stream", "OK") + after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingList", "server_stream", "OK") assert.EqualValues(s.T(), before+1, after, "grpc_server_handled_total should be incremented for PingList OK") - before = s.sumCountersForMetricAndLabels("grpc_server_handled_total", "PingList", "server_stream", "FailedPrecondition") + before = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingList", "server_stream", "FailedPrecondition") _, err := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition require.NoError(s.T(), err, "PingList must not fail immedietely") - after = s.sumCountersForMetricAndLabels("grpc_server_handled_total", "PingList", "server_stream", "FailedPrecondition") + after = sumCountersForMetricAndLabels(s.T(), "grpc_server_handled_total", "PingList", "server_stream", "FailedPrecondition") assert.EqualValues(s.T(), before+1, after, "grpc_server_handled_total should be incremented for PingList FailedPrecondition") } func (s *ServerInterceptorTestSuite) TestStreamingIncrementsMessageCounts() { - beforeRecv := s.sumCountersForMetricAndLabels("grpc_server_msg_received_total", "PingList", "server_stream") - beforeSent := s.sumCountersForMetricAndLabels("grpc_server_msg_sent_total", "PingList", "server_stream") + beforeRecv := sumCountersForMetricAndLabels(s.T(), "grpc_server_msg_received_total", "PingList", "server_stream") + beforeSent := sumCountersForMetricAndLabels(s.T(), "grpc_server_msg_sent_total", "PingList", "server_stream") ss, _ := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{}) // should return with code=OK // Do a read, just for 
kicks. count := 0 @@ -232,17 +228,17 @@ func (s *ServerInterceptorTestSuite) TestStreamingIncrementsMessageCounts() { count += 1 } require.EqualValues(s.T(), countListResponses, count, "Number of received msg on the wire must match") - afterSent := s.sumCountersForMetricAndLabels("grpc_server_msg_sent_total", "PingList", "server_stream") - afterRecv := s.sumCountersForMetricAndLabels("grpc_server_msg_received_total", "PingList", "server_stream") + afterSent := sumCountersForMetricAndLabels(s.T(), "grpc_server_msg_sent_total", "PingList", "server_stream") + afterRecv := sumCountersForMetricAndLabels(s.T(), "grpc_server_msg_received_total", "PingList", "server_stream") assert.EqualValues(s.T(), beforeSent+countListResponses, afterSent, "grpc_server_msg_sent_total should be incremented 20 times for PingList") assert.EqualValues(s.T(), beforeRecv+1, afterRecv, "grpc_server_msg_sent_total should be incremented ones for PingList ") } -func (s *ServerInterceptorTestSuite) fetchPrometheusLines(metricName string, matchingLabelValues ...string) []string { +func fetchPrometheusLines(t *testing.T, metricName string, matchingLabelValues ...string) []string { resp := httptest.NewRecorder() req, err := http.NewRequest("GET", "/", nil) - require.NoError(s.T(), err, "failed creating request for Prometheus handler") + require.NoError(t, err, "failed creating request for Prometheus handler") prometheus.Handler().ServeHTTP(resp, req) reader := bufio.NewReader(resp.Body) ret := []string{} @@ -251,7 +247,7 @@ func (s *ServerInterceptorTestSuite) fetchPrometheusLines(metricName string, mat if err == io.EOF { break } else { - require.NoError(s.T(), err, "error reading stuff") + require.NoError(t, err, "error reading stuff") } if !strings.HasPrefix(line, metricName) { continue @@ -270,18 +266,13 @@ func (s *ServerInterceptorTestSuite) fetchPrometheusLines(metricName string, mat return ret } -func (s *ServerInterceptorTestSuite) sumCountersForMetricAndLabels(metricName string, 
matchingLabelValues ...string) int { +func sumCountersForMetricAndLabels(t *testing.T, metricName string, matchingLabelValues ...string) int { count := 0 - found := false - for _, line := range s.fetchPrometheusLines(metricName, matchingLabelValues...) { + for _, line := range fetchPrometheusLines(t, metricName, matchingLabelValues...) { valueString := line[strings.LastIndex(line, " ")+1 : len(line)-1] valueFloat, err := strconv.ParseFloat(valueString, 32) - require.NoError(s.T(), err, "failed parsing value for line: %v", line) + require.NoError(t, err, "failed parsing value for line: %v", line) count += int(valueFloat) - found = true - } - if !found { - return -1 } return count }