Skip to content
This repository has been archived by the owner on Apr 18, 2023. It is now read-only.

Commit

Permalink
draft v2.0.0 (separate stats handlers) - initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrkowalczuk committed Nov 15, 2018
1 parent 68e3a13 commit ce2a716
Show file tree
Hide file tree
Showing 2 changed files with 306 additions and 0 deletions.
212 changes: 212 additions & 0 deletions draft.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package grpc_prometheus

import (
"fmt"
"strconv"
"strings"

"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
"google.golang.org/grpc/stats"
)

const (
namespace = "grpc"
labelService = "grpc_service"
labelMethod = "grpc_method"
labelType = "grpc_type"
labelCode = "grpc_code"
labelUserAgent = "grpc_user_agent"
labelFailFast = "grpc_fail_fast"
)

type ctxKey int

var (
tagRPCKey ctxKey = 1
tagConnKey ctxKey = 2
)

type Subsystem int

var (
Server Subsystem = 1
Client Subsystem = 2
)

func (s Subsystem) String() string {
switch s {
case Server:
return "server"
case Client:
return "client"
default:
return "unknown"
}
}

// NewRequestsTotalGaugeVecV1 exists for backward compatibility.
func NewRequestsTotalCounterVecV1(sub Subsystem) *prometheus.CounterVec {
name := "started_total"
help := fmt.Sprintf("Total number of RPCs started on the %s.", sub.String())
switch sub {
case Server:
return newRequestsTotalCounterVec(sub.String(), name, help)
case Client:
return newRequestsTotalCounterVec(sub.String(), name, help)
default:
// TODO: panic?
panic("unknown subsystem")
}
}

func NewRequestsTotalCounterVec(sub Subsystem) *prometheus.CounterVec {
switch sub {
case Server:
return newRequestsTotalCounterVec(sub.String(), "received_requests_total", "A total number of RPC requests received by the server.")
case Client:
return newRequestsTotalCounterVec(sub.String(), "sent_requests_total", "A total number of RPC requests sent by the client.")
default:
// TODO: panic?
panic("unknown subsystem")
}
}

func newRequestsTotalCounterVec(sub, name, help string) *prometheus.CounterVec {
return prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: sub,
Name: name,
Help: help,
},
[]string{labelFailFast, labelService, labelMethod},
//[]string{labelType, labelService, labelMethod}, TODO: IsServerStream and IsClientStream not available outside interceptors. Type label cannot be used.
)
}

type StatsHandlerCollector interface {
stats.Handler
prometheus.Collector
}

type StatsHandler struct {
handlers []StatsHandlerCollector
}

var _ StatsHandlerCollector = &StatsHandler{}

// NewStatsHandler allows to pass various number of handlers.
func NewStatsHandler(handlers ...StatsHandlerCollector) *StatsHandler {
return &StatsHandler{
handlers: handlers,
}
}

func (h *StatsHandler) TagRPC(ctx context.Context, inf *stats.RPCTagInfo) context.Context {
service, method := split(inf.FullMethodName)

ctx = context.WithValue(ctx, tagRPCKey, prometheus.Labels{
labelFailFast: strconv.FormatBool(inf.FailFast),
labelService: service,
labelMethod: method,
})

for _, c := range h.handlers {
ctx = c.TagRPC(ctx, inf)
}
return ctx
}

// HandleRPC processes the RPC stats.
func (h *StatsHandler) HandleRPC(ctx context.Context, sts stats.RPCStats) {
for _, c := range h.handlers {
c.HandleRPC(ctx, sts)
}
}

func (h *StatsHandler) TagConn(ctx context.Context, inf *stats.ConnTagInfo) context.Context {
for _, c := range h.handlers {
ctx = c.TagConn(ctx, inf)
}
return ctx
}

// HandleConn processes the Conn stats.
func (h *StatsHandler) HandleConn(ctx context.Context, sts stats.ConnStats) {
for _, c := range h.handlers {
c.HandleConn(ctx, sts)
}
}

// Describe implements prometheus Collector interface.
func (h *StatsHandler) Describe(in chan<- *prometheus.Desc) {
for _, c := range h.handlers {
c.Describe(in)
}
}

// Collect implements prometheus Collector interface.
func (h *StatsHandler) Collect(in chan<- prometheus.Metric) {
for _, c := range h.handlers {
c.Collect(in)
}
}

type RequestsTotalStatsHandler struct {
sub Subsystem
vec *prometheus.CounterVec
}

// NewRequestsTotalStatsHandler ...
// The GaugeVec must have zero, one, two, three or four non-const non-curried labels.
// For those, the only allowed label names are "fail_fast", "handler", "service" and "user_agent".
func NewRequestsTotalStatsHandler(sub Subsystem, vec *prometheus.CounterVec) *RequestsTotalStatsHandler {
return &RequestsTotalStatsHandler{
sub: sub,
vec: vec,
}
}

func (h *RequestsTotalStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
return ctx
}

// HandleRPC processes the RPC stats.
func (h *RequestsTotalStatsHandler) HandleRPC(ctx context.Context, stat stats.RPCStats) {
lab, _ := ctx.Value(tagRPCKey).(prometheus.Labels)

if beg, ok := stat.(*stats.Begin); ok {
switch {
case beg.IsClient() && h.sub == Client:
h.vec.With(lab).Inc()
case !beg.IsClient() && h.sub == Server:
h.vec.With(lab).Inc()
}
}
}

func (h *RequestsTotalStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}

// HandleConn processes the Conn stats.
func (h *RequestsTotalStatsHandler) HandleConn(ctx context.Context, stat stats.ConnStats) {
}

// Describe implements prometheus Collector interface.
func (h *RequestsTotalStatsHandler) Describe(in chan<- *prometheus.Desc) {
h.vec.Describe(in)
}

// Collect implements prometheus Collector interface.
func (h *RequestsTotalStatsHandler) Collect(in chan<- prometheus.Metric) {
h.vec.Collect(in)
}

func split(name string) (string, string) {
if i := strings.LastIndex(name, "/"); i >= 0 {
return name[1:i], name[i+1:]
}
return "unknown", "unknown"
}
94 changes: 94 additions & 0 deletions draft_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package grpc_prometheus_test

import (
"fmt"
"log"
"net"
"os"
"time"

"google.golang.org/grpc"

"github.com/grpc-ecosystem/go-grpc-prometheus"
pb "github.com/grpc-ecosystem/go-grpc-prometheus/examples/grpc-server-with-prometheus/protobuf"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
)

func ExampleDraft() {
// Listen an actual port.
lis, err := net.Listen("tcp", "127.0.0.1:0")
assert(err)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

reg := prometheus.NewRegistry()
sts := grpc_prometheus.NewStatsHandler(
grpc_prometheus.NewRequestsTotalStatsHandler(
grpc_prometheus.Server,
grpc_prometheus.NewRequestsTotalCounterVec(grpc_prometheus.Server),
),
)
srv := grpc.NewServer(grpc.StatsHandler(sts))
imp := newDemoServer()

pb.RegisterDemoServiceServer(srv, imp)
reg.MustRegister(sts)

go func() {
if err := srv.Serve(lis); err != grpc.ErrServerStopped {
assert(err)
}
}()

con, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock())
assert(err)

pb.NewDemoServiceClient(con).SayHello(ctx, &pb.HelloRequest{Name: "example"})

srv.GracefulStop()

mf, err := reg.Gather()
assert(err)

for _, m := range mf {
fmt.Println(m.GetName())
}

// Output: grpc_server_received_requests_total
}

func assert(err error) {
if err != nil {
log.Println("ERR:", err)
os.Exit(1)
}
}

// demoServiceServer defines a Server.
type demoServiceServer struct{}

func newDemoServer() *demoServiceServer {
return &demoServiceServer{}
}

// SayHello implements a interface defined by protobuf.
func (s *demoServiceServer) SayHello(ctx context.Context, request *pb.HelloRequest) (*pb.HelloResponse, error) {
customizedCounterMetric.WithLabelValues(request.Name).Inc()
return &pb.HelloResponse{Message: fmt.Sprintf("Hello %s", request.Name)}, nil
}

var (
// Create a metrics registry.
reg = prometheus.NewRegistry()

// Create some standard server metrics.
grpcMetrics = grpc_prometheus.NewServerMetrics()

// Create a customized counter metric.
customizedCounterMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "demo_server_say_hello_method_handle_count",
Help: "Total number of RPCs handled on the server.",
}, []string{"name"})
)

0 comments on commit ce2a716

Please sign in to comment.