From 28015bb9aeba5d065cbd06f3924245e9e1466069 Mon Sep 17 00:00:00 2001 From: Piotr Kowalczuk Date: Thu, 15 Nov 2018 17:17:22 +0100 Subject: [PATCH 1/2] draft v2.0.0 (separate stats handlers) - initial commit --- draft.go | 228 ++++++++++++++++++++++++++++++++++++++++++++++++++ draft_test.go | 100 ++++++++++++++++++++++ 2 files changed, 328 insertions(+) create mode 100644 draft.go create mode 100644 draft_test.go diff --git a/draft.go b/draft.go new file mode 100644 index 0000000..5c3ff35 --- /dev/null +++ b/draft.go @@ -0,0 +1,228 @@ +package grpc_prometheus + +import ( + "fmt" + "strconv" + "strings" + + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/stats" +) + +const ( + namespace = "grpc" + labelService = "grpc_service" + labelMethod = "grpc_method" + labelType = "grpc_type" + labelCode = "grpc_code" + labelUserAgent = "grpc_user_agent" + labelFailFast = "grpc_fail_fast" +) + +type ctxKey int + +var ( + tagRPCKey ctxKey = 1 + tagConnKey ctxKey = 2 +) + +type Subsystem int + +var ( + Server Subsystem = 1 + Client Subsystem = 2 +) + +func (s Subsystem) String() string { + switch s { + case Server: + return "server" + case Client: + return "client" + default: + return "unknown" + } +} + +// NewRequestsTotalGaugeVecV1 exists for backward compatibility. +func NewRequestsTotalCounterVecV1(sub Subsystem) *prometheus.CounterVec { + name := "started_total" + help := fmt.Sprintf("Total number of RPCs started on the %s.", sub.String()) + switch sub { + case Server: + return newRequestsTotalCounterVec(sub.String(), name, help) + case Client: + return newRequestsTotalCounterVec(sub.String(), name, help) + default: + // TODO: panic? + panic("unknown subsystem") + } +} + +func NewRequestsTotalCounterVec(sub Subsystem) *prometheus.CounterVec { + switch sub { + case Server: + return newRequestsTotalCounterVec(sub.String(), "received_requests_total", "A total number of RPC requests received by the server.") + case Client: + return newRequestsTotalCounterVec(sub.String(), "sent_requests_total", "A total number of RPC requests sent by the client.") + default: + // TODO: panic? + panic("unknown subsystem") + } +} + +func newRequestsTotalCounterVec(sub, name, help string) *prometheus.CounterVec { + return prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: sub, + Name: name, + Help: help, + }, + []string{labelFailFast, labelService, labelMethod}, + //[]string{labelType, labelService, labelMethod}, TODO: IsServerStream and IsClientStream not available outside interceptors. Type label cannot be used. + ) +} + +type StatsHandlerCollector interface { + // Init reallocates possible dimensions for given metric. + Init(map[string]grpc.ServiceInfo) error + + stats.Handler + prometheus.Collector +} + +type StatsHandler struct { + handlers []StatsHandlerCollector +} + +var _ StatsHandlerCollector = &StatsHandler{} + +// NewStatsHandler allows to pass various number of handlers. +func NewStatsHandler(handlers ...StatsHandlerCollector) *StatsHandler { + return &StatsHandler{ + handlers: handlers, + } +} + +// Init implements StatsHandlerCollector interface. +// TODO: implement +func (h *StatsHandler) Init(info map[string]grpc.ServiceInfo) error { + return nil +} + +func (h *StatsHandler) TagRPC(ctx context.Context, inf *stats.RPCTagInfo) context.Context { + service, method := split(inf.FullMethodName) + + ctx = context.WithValue(ctx, tagRPCKey, prometheus.Labels{ + labelFailFast: strconv.FormatBool(inf.FailFast), + labelService: service, + labelMethod: method, + }) + + for _, c := range h.handlers { + ctx = c.TagRPC(ctx, inf) + } + return ctx +} + +// HandleRPC processes the RPC stats. +func (h *StatsHandler) HandleRPC(ctx context.Context, sts stats.RPCStats) { + for _, c := range h.handlers { + c.HandleRPC(ctx, sts) + } +} + +func (h *StatsHandler) TagConn(ctx context.Context, inf *stats.ConnTagInfo) context.Context { + for _, c := range h.handlers { + ctx = c.TagConn(ctx, inf) + } + return ctx +} + +// HandleConn processes the Conn stats. +func (h *StatsHandler) HandleConn(ctx context.Context, sts stats.ConnStats) { + for _, c := range h.handlers { + c.HandleConn(ctx, sts) + } +} + +// Describe implements prometheus Collector interface. +func (h *StatsHandler) Describe(in chan<- *prometheus.Desc) { + for _, c := range h.handlers { + c.Describe(in) + } +} + +// Collect implements prometheus Collector interface. +func (h *StatsHandler) Collect(in chan<- prometheus.Metric) { + for _, c := range h.handlers { + c.Collect(in) + } +} + +type RequestsTotalStatsHandler struct { + sub Subsystem + vec *prometheus.CounterVec +} + +// NewRequestsTotalStatsHandler ... +// The GaugeVec must have zero, one, two, three or four non-const non-curried labels. +// For those, the only allowed label names are "fail_fast", "handler", "service" and "user_agent". +func NewRequestsTotalStatsHandler(sub Subsystem, vec *prometheus.CounterVec) *RequestsTotalStatsHandler { + return &RequestsTotalStatsHandler{ + sub: sub, + vec: vec, + } +} + +// Init implements StatsHandlerCollector interface. +// TODO: implement +func (h *RequestsTotalStatsHandler) Init(info map[string]grpc.ServiceInfo) error { + return nil +} + +func (h *RequestsTotalStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { + return ctx +} + +// HandleRPC processes the RPC stats. +func (h *RequestsTotalStatsHandler) HandleRPC(ctx context.Context, stat stats.RPCStats) { + lab, _ := ctx.Value(tagRPCKey).(prometheus.Labels) + + if beg, ok := stat.(*stats.Begin); ok { + switch { + case beg.IsClient() && h.sub == Client: + h.vec.With(lab).Inc() + case !beg.IsClient() && h.sub == Server: + h.vec.With(lab).Inc() + } + } +} + +func (h *RequestsTotalStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { + return ctx +} + +// HandleConn processes the Conn stats. +func (h *RequestsTotalStatsHandler) HandleConn(ctx context.Context, stat stats.ConnStats) { +} + +// Describe implements prometheus Collector interface. +func (h *RequestsTotalStatsHandler) Describe(in chan<- *prometheus.Desc) { + h.vec.Describe(in) +} + +// Collect implements prometheus Collector interface. +func (h *RequestsTotalStatsHandler) Collect(in chan<- prometheus.Metric) { + h.vec.Collect(in) +} + +func split(name string) (string, string) { + if i := strings.LastIndex(name, "/"); i >= 0 { + return name[1:i], name[i+1:] + } + return "unknown", "unknown" +} diff --git a/draft_test.go b/draft_test.go new file mode 100644 index 0000000..faf3c98 --- /dev/null +++ b/draft_test.go @@ -0,0 +1,100 @@ +package grpc_prometheus_test + +import ( + "fmt" + "log" + "net" + "os" + "time" + + "google.golang.org/grpc" + + "github.com/grpc-ecosystem/go-grpc-prometheus" + pb "github.com/grpc-ecosystem/go-grpc-prometheus/examples/grpc-server-with-prometheus/protobuf" + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/net/context" +) + +func ExampleDraft() { + // Listen an actual port. + lis, err := net.Listen("tcp", "127.0.0.1:0") + assert(err) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + reg := prometheus.NewRegistry() + sts := grpc_prometheus.NewStatsHandler( + grpc_prometheus.NewRequestsTotalStatsHandler( + grpc_prometheus.Server, + grpc_prometheus.NewRequestsTotalCounterVec(grpc_prometheus.Server), + ), + grpc_prometheus.NewRequestsTotalStatsHandler( + grpc_prometheus.Server, + grpc_prometheus.NewRequestsTotalCounterVecV1(grpc_prometheus.Server), + ), + ) + srv := grpc.NewServer(grpc.StatsHandler(sts)) + imp := newDemoServer() + + pb.RegisterDemoServiceServer(srv, imp) + reg.MustRegister(sts) + + go func() { + if err := srv.Serve(lis); err != grpc.ErrServerStopped { + assert(err) + } + }() + + con, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock()) + assert(err) + + pb.NewDemoServiceClient(con).SayHello(ctx, &pb.HelloRequest{Name: "example"}) + + srv.GracefulStop() + + mf, err := reg.Gather() + assert(err) + + for _, m := range mf { + fmt.Println(m.GetName()) + } + + // Output: + // grpc_server_received_requests_total + // grpc_server_started_total +} + +func assert(err error) { + if err != nil { + log.Println("ERR:", err) + os.Exit(1) + } +} + +// demoServiceServer defines a Server. +type demoServiceServer struct{} + +func newDemoServer() *demoServiceServer { + return &demoServiceServer{} +} + +// SayHello implements a interface defined by protobuf. +func (s *demoServiceServer) SayHello(ctx context.Context, request *pb.HelloRequest) (*pb.HelloResponse, error) { + customizedCounterMetric.WithLabelValues(request.Name).Inc() + return &pb.HelloResponse{Message: fmt.Sprintf("Hello %s", request.Name)}, nil +} + +var ( + // Create a metrics registry. + reg = prometheus.NewRegistry() + + // Create some standard server metrics. + grpcMetrics = grpc_prometheus.NewServerMetrics() + + // Create a customized counter metric. + customizedCounterMetric = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "demo_server_say_hello_method_handle_count", + Help: "Total number of RPCs handled on the server.", + }, []string{"name"}) +) From c4453c21e4f760886cc89fb43ec3c3a9dec16470 Mon Sep 17 00:00:00 2001 From: Piotr Kowalczuk Date: Thu, 15 Nov 2018 17:33:19 +0100 Subject: [PATCH 2/2] draft v2.0.0 (separate stats handlers) - go vet, megacheck fixes --- draft.go | 18 +++++++++--------- draft_test.go | 17 +---------------- 2 files changed, 10 insertions(+), 25 deletions(-) diff --git a/draft.go b/draft.go index 5c3ff35..2504961 100644 --- a/draft.go +++ b/draft.go @@ -12,20 +12,20 @@ import ( ) const ( - namespace = "grpc" - labelService = "grpc_service" - labelMethod = "grpc_method" - labelType = "grpc_type" - labelCode = "grpc_code" - labelUserAgent = "grpc_user_agent" - labelFailFast = "grpc_fail_fast" + namespace = "grpc" + labelService = "grpc_service" + labelMethod = "grpc_method" + //labelType = "grpc_type" + //labelCode = "grpc_code" + //labelUserAgent = "grpc_user_agent" + labelFailFast = "grpc_fail_fast" ) type ctxKey int var ( - tagRPCKey ctxKey = 1 - tagConnKey ctxKey = 2 + tagRPCKey ctxKey = 1 + //tagConnKey ctxKey = 2 ) type Subsystem int diff --git a/draft_test.go b/draft_test.go index faf3c98..50274e6 100644 --- a/draft_test.go +++ b/draft_test.go @@ -15,7 +15,7 @@ import ( "golang.org/x/net/context" ) -func ExampleDraft() { +func ExampleStatsHandler() { // Listen an actual port. lis, err := net.Listen("tcp", "127.0.0.1:0") assert(err) @@ -81,20 +81,5 @@ func newDemoServer() *demoServiceServer { // SayHello implements a interface defined by protobuf. func (s *demoServiceServer) SayHello(ctx context.Context, request *pb.HelloRequest) (*pb.HelloResponse, error) { - customizedCounterMetric.WithLabelValues(request.Name).Inc() return &pb.HelloResponse{Message: fmt.Sprintf("Hello %s", request.Name)}, nil } - -var ( - // Create a metrics registry. - reg = prometheus.NewRegistry() - - // Create some standard server metrics. - grpcMetrics = grpc_prometheus.NewServerMetrics() - - // Create a customized counter metric. - customizedCounterMetric = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "demo_server_say_hello_method_handle_count", - Help: "Total number of RPCs handled on the server.", - }, []string{"name"}) -)