Skip to content
This repository has been archived by the owner on Apr 18, 2023. It is now read-only.

Commit

Permalink
use stats.handler instead of interceptor and add message size monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
tonywang committed Apr 17, 2020
1 parent b7dd0c7 commit eba288c
Show file tree
Hide file tree
Showing 14 changed files with 813 additions and 13 deletions.
21 changes: 21 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ var (

// StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
StreamClientInterceptor = DefaultClientMetrics.StreamClientInterceptor()

// ClientStatsHandler is a gRPC client-side stats.Handler that provides Prometheus monitoring for RPCs.
ClientStatsHandler = DefaultClientMetrics.NewClientStatsHandler()
)

func init() {
Expand Down Expand Up @@ -55,3 +58,21 @@ func EnableClientStreamSendTimeHistogram(opts ...HistogramOption) {
DefaultClientMetrics.EnableClientStreamSendTimeHistogram(opts...)
prom.Register(DefaultClientMetrics.clientStreamSendHistogram)
}

// EnableClientMsgSizeReceivedBytesHistogram turns on recording of
// single message send time of streaming RPCs.
// This function acts on the DefaultClientMetrics variable and the
// default Prometheus metrics registry.
func EnableClientMsgSizeReceivedBytesHistogram(opts ...HistogramOption) {
DefaultClientMetrics.EnableMsgSizeReceivedBytesHistogram(opts...)
prom.Register(DefaultClientMetrics.clientMsgSizeReceivedHistogram)
}

// EnableClientMsgSizeSentBytesHistogram turns on recording of
// single message send time of streaming RPCs.
// This function acts on the DefaultClientMetrics variable and the
// default Prometheus metrics registry.
func EnableClientMsgSizeSentBytesHistogram(opts ...HistogramOption) {
DefaultClientMetrics.EnableMsgSizeSentBytesHistogram(opts...)
prom.Register(DefaultClientMetrics.clientMsgSizeSentHistogram)
}
85 changes: 80 additions & 5 deletions client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ import (
prom "github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)

// ClientMetrics represents a collection of metrics to be registered on a
// Prometheus metrics registry for a gRPC client.
type ClientMetrics struct {
clientStartedCounter *prom.CounterVec
clientHandledCounter *prom.CounterVec
clientStreamMsgReceived *prom.CounterVec
clientStreamMsgSent *prom.CounterVec
clientStartedCounter *prom.CounterVec
clientStartedCounterOpts prom.CounterOpts
clientHandledCounter *prom.CounterVec
clientStreamMsgReceived *prom.CounterVec
clientStreamMsgSent *prom.CounterVec

clientHandledHistogramEnabled bool
clientHandledHistogramOpts prom.HistogramOpts
Expand All @@ -29,6 +31,14 @@ type ClientMetrics struct {
clientStreamSendHistogramEnabled bool
clientStreamSendHistogramOpts prom.HistogramOpts
clientStreamSendHistogram *prom.HistogramVec

clientMsgSizeReceivedHistogramEnabled bool
clientMsgSizeReceivedHistogramOpts prom.HistogramOpts
clientMsgSizeReceivedHistogram *prom.HistogramVec

clientMsgSizeSentHistogramEnabled bool
clientMsgSizeSentHistogramOpts prom.HistogramOpts
clientMsgSizeSentHistogram *prom.HistogramVec
}

// NewClientMetrics returns a ClientMetrics object. Use a new instance of
Expand Down Expand Up @@ -82,7 +92,21 @@ func NewClientMetrics(counterOpts ...CounterOption) *ClientMetrics {
Help: "Histogram of response latency (seconds) of the gRPC single message send.",
Buckets: prom.DefBuckets,
},
clientStreamSendHistogram: nil,
clientStreamSendHistogram: nil,
clientMsgSizeReceivedHistogramEnabled: false,
clientMsgSizeReceivedHistogramOpts: prom.HistogramOpts{
Name: "grpc_client_msg_size_received_bytes",
Help: "Histogram of message sizes received by the client.",
Buckets: defMsgBytesBuckets,
},
clientMsgSizeReceivedHistogram: nil,
clientMsgSizeSentHistogramEnabled: false,
clientMsgSizeSentHistogramOpts: prom.HistogramOpts{
Name: "grpc_client_msg_size_sent_bytes",
Help: "Histogram of message sizes sent by the client.",
Buckets: defMsgBytesBuckets,
},
clientMsgSizeSentHistogram: nil,
}
}

Expand All @@ -103,6 +127,12 @@ func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) {
if m.clientStreamSendHistogramEnabled {
m.clientStreamSendHistogram.Describe(ch)
}
if m.clientMsgSizeReceivedHistogramEnabled {
m.clientMsgSizeReceivedHistogram.Describe(ch)
}
if m.clientMsgSizeSentHistogramEnabled {
m.clientMsgSizeSentHistogram.Describe(ch)
}
}

// Collect is called by the Prometheus registry when collecting
Expand All @@ -122,6 +152,12 @@ func (m *ClientMetrics) Collect(ch chan<- prom.Metric) {
if m.clientStreamSendHistogramEnabled {
m.clientStreamSendHistogram.Collect(ch)
}
if m.clientMsgSizeReceivedHistogramEnabled {
m.clientMsgSizeReceivedHistogram.Collect(ch)
}
if m.clientMsgSizeSentHistogramEnabled {
m.clientMsgSizeSentHistogram.Collect(ch)
}
}

// EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
Expand Down Expand Up @@ -173,6 +209,38 @@ func (m *ClientMetrics) EnableClientStreamSendTimeHistogram(opts ...HistogramOpt
m.clientStreamSendHistogramEnabled = true
}

// EnableMsgSizeReceivedBytesHistogram turns on recording of received message size of RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query. It takes
// options to configure histogram options such as the defined buckets.
func (m *ClientMetrics) EnableMsgSizeReceivedBytesHistogram(opts ...HistogramOption) {
for _, o := range opts {
o(&m.clientMsgSizeReceivedHistogramOpts)
}
if !m.clientMsgSizeReceivedHistogramEnabled {
m.clientMsgSizeReceivedHistogram = prom.NewHistogramVec(
m.clientMsgSizeReceivedHistogramOpts,
[]string{"grpc_service", "grpc_method", "grpc_stats"},
)
}
m.clientMsgSizeReceivedHistogramEnabled = true
}

// EnableMsgSizeSentBytesHistogram turns on recording of sent message size of RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query. It
// takes options to configure histogram options such as the defined buckets.
func (m *ClientMetrics) EnableMsgSizeSentBytesHistogram(opts ...HistogramOption) {
for _, o := range opts {
o(&m.clientMsgSizeSentHistogramOpts)
}
if !m.clientMsgSizeSentHistogramEnabled {
m.clientMsgSizeSentHistogram = prom.NewHistogramVec(
m.clientMsgSizeSentHistogramOpts,
[]string{"grpc_service", "grpc_method", "grpc_stats"},
)
}
m.clientMsgSizeSentHistogramEnabled = true
}

// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
Expand Down Expand Up @@ -202,6 +270,13 @@ func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc
}
}

// NewClientStatsHandler is a gRPC client-side stats.Handler that providers Prometheus monitoring for RPCs.
func (m *ClientMetrics) NewClientStatsHandler() stats.Handler {
return &clientStatsHandler{
clientMetrics: m,
}
}

func clientStreamType(desc *grpc.StreamDesc) grpcType {
if desc.ClientStreams && !desc.ServerStreams {
return ClientStream
Expand Down
32 changes: 32 additions & 0 deletions client_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ func newClientReporter(m *ClientMetrics, rpcType grpcType, fullMethod string) *c
return r
}

func newClientReporterForStatsHanlder(m *ClientMetrics, fullMethod string) *clientReporter {
r := &clientReporter{
metrics: m,
rpcType: Unary,
}
r.serviceName, r.methodName = splitMethodName(fullMethod)
return r
}

// timer is a helper interface to time functions.
type timer interface {
ObserveDuration() time.Duration
Expand All @@ -54,10 +63,21 @@ func (r *clientReporter) ReceiveMessageTimer() timer {
return emptyTimer
}

func (r *clientReporter) StartedConn() {
r.metrics.clientStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}

func (r *clientReporter) ReceivedMessage() {
r.metrics.clientStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}

// ReceivedMessageSize counts the size of received messages on client-side
func (r *clientReporter) ReceivedMessageSize(rpcStats grpcStats, size float64) {
if r.metrics.clientMsgSizeReceivedHistogramEnabled {
r.metrics.clientMsgSizeReceivedHistogram.WithLabelValues(r.serviceName, r.methodName, string(rpcStats)).Observe(size)
}
}

func (r *clientReporter) SendMessageTimer() timer {
if r.metrics.clientStreamSendHistogramEnabled {
hist := r.metrics.clientStreamSendHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName)
Expand All @@ -71,6 +91,18 @@ func (r *clientReporter) SentMessage() {
r.metrics.clientStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}

// SentMessageSize counts the size of sent messages on client-side
func (r *clientReporter) SentMessageSize(rpcStats grpcStats, size float64) {
if r.metrics.clientMsgSizeSentHistogramEnabled {
r.metrics.clientMsgSizeSentHistogram.WithLabelValues(r.serviceName, r.methodName, string(rpcStats)).Observe(size)
}
}

// StartTime is used to reset the value of the startTime
func (r *clientReporter) StartTime(t time.Time) {
r.startTime = t
}

func (r *clientReporter) Handled(code codes.Code) {
r.metrics.clientHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc()
if r.metrics.clientHandledHistogramEnabled {
Expand Down
61 changes: 61 additions & 0 deletions client_stats_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package grpc_prometheus

import (
"context"

"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)

type clientStatsHandler struct {
clientMetrics *ClientMetrics
}

// TagRPC implements the stats.Hanlder interface.
func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
rpcInfo := newRPCInfo(info.FullMethodName)
return context.WithValue(ctx, &rpcInfoKey, rpcInfo)
}

// HandleRPC implements the stats.Hanlder interface.
func (h *clientStatsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
v, ok := ctx.Value(&rpcInfoKey).(*rpcInfo)
if !ok {
return
}
monitor := newClientReporterForStatsHanlder(h.clientMetrics, v.fullMethodName)
switch s := s.(type) {
case *stats.Begin:
v.startTime = s.BeginTime
monitor.StartedConn()
case *stats.End:
code := status.Code(s.Error)
monitor.StartTime(v.startTime)
monitor.Handled(code)
case *stats.InHeader:
monitor.ReceivedMessageSize(Header, float64(s.WireLength))
case *stats.InPayload:
monitor.ReceivedMessage()
// TODO: remove the +5 offset on wire length here, which is a temporary stand-in for the missing grpc framing offset
// See: https://github.com/grpc/grpc-go/issues/1647
monitor.ReceivedMessageSize(Payload, float64(s.WireLength+5))
case *stats.InTrailer:
monitor.ReceivedMessageSize(Tailer, float64(s.WireLength))
case *stats.OutHeader:
// TODO: Add the sent header message size stats, if the wire length of the send header is provided
case *stats.OutPayload:
monitor.SentMessage()
monitor.SentMessageSize(Payload, float64(s.WireLength))
case *stats.OutTrailer:
monitor.SentMessageSize(Tailer, float64(s.WireLength))
}
}

// TagConn implements the stats.Hanlder interface.
func (h *clientStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
return ctx
}

// HandleConn implements the stats.Hanlder interface.
func (h *clientStatsHandler) HandleConn(ctx context.Context, s stats.ConnStats) {
}
Loading

0 comments on commit eba288c

Please sign in to comment.