Skip to content

Commit

Permalink
otelgrpc: add custom attributes to the stats handler
Browse files Browse the repository at this point in the history
Fixes #3894
  • Loading branch information
inigohu committed Feb 20, 2024
1 parent 65f3667 commit ce0e777
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 70 deletions.
58 changes: 58 additions & 0 deletions instrumentation/google.golang.org/grpc/otelgrpc/grpccontext.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"

import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// traceInfo contains tracing information for an RPC.
type traceInfo struct {
name string
kind trace.SpanKind
}

// gRPCContext contains all the information needed to record metrics and traces.
type gRPCContext struct {
traceInfo *traceInfo
attrs []attribute.KeyValue
}

// Add attributes to a gRPCContext.
func (g *gRPCContext) AddAttrs(attrs ...attribute.KeyValue) {
g.attrs = append(g.attrs, attrs...)
}

type gRPCContextKey struct{}

// ContextWithGRPCContext returns a new context with the provided gRPCContext attached.
func ContextWithGRPCContext(ctx context.Context, gctx *gRPCContext) context.Context {
return context.WithValue(ctx, gRPCContextKey{}, gctx)
}

// GRPCContextFromContext retrieves a GRPCContext instance from the provided context if
// one is available. If no GRPCContext was found in the provided context a new, empty
// GRPCContext is returned and the second return value is false. In this case it is
// safe to use the GRPCContext but any attributes added to it will not be used.
func GRPCContextFromContext(ctx context.Context) (*gRPCContext, bool) { // nolint: revive
l, ok := ctx.Value(gRPCContextKey{}).(*gRPCContext)
if !ok {
l = &gRPCContext{}
}
return l, ok
}
119 changes: 49 additions & 70 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,12 @@ import (
"google.golang.org/grpc/status"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)

type gRPCContextKey struct{}

type gRPCContext struct {
messagesReceived int64
messagesSent int64
metricAttrs []attribute.KeyValue
}

type serverHandler struct {
*config
}
Expand All @@ -66,25 +57,17 @@ func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
ctx = extract(ctx, h.config.Propagators)

name, attrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, RPCSystemGRPC)
ctx, _ = h.tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attrs...),
)
gctx, _ := GRPCContextFromContext(ctx)
gctx.traceInfo.kind = trace.SpanKindServer

gctx := gRPCContext{
metricAttrs: attrs,
}
return context.WithValue(ctx, gRPCContextKey{}, &gctx)
ctx = h.tagRPC(ctx, info)

return ctx
}

// HandleRPC processes the RPC stats.
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
isServer := true
h.handleRPC(ctx, rs, isServer)
h.handleRPC(ctx, rs)
}

type clientHandler struct {
Expand All @@ -102,26 +85,17 @@ func NewClientHandler(opts ...Option) stats.Handler {

// TagRPC can attach some information to the given context.
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
name, attrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, RPCSystemGRPC)
ctx, _ = h.tracer.Start(
ctx,
name,
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(attrs...),
)
gctx, _ := GRPCContextFromContext(ctx)
gctx.traceInfo.kind = trace.SpanKindClient

gctx := gRPCContext{
metricAttrs: attrs,
}
ctx = h.tagRPC(ContextWithGRPCContext(ctx, gctx), info)

return inject(context.WithValue(ctx, gRPCContextKey{}, &gctx), h.config.Propagators)
return inject(ctx, h.config.Propagators)
}

// HandleRPC processes the RPC stats.
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
isServer := false
h.handleRPC(ctx, rs, isServer)
h.handleRPC(ctx, rs)
}

// TagConn can attach some information to the given context.
Expand All @@ -134,46 +108,54 @@ func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
// no-op
}

func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool) { // nolint: revive // isServer is not a control flag.
span := trace.SpanFromContext(ctx)
var metricAttrs []attribute.KeyValue
var messageId int64
func (c *config) tagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
name, attrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, RPCSystemGRPC)

gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
if gctx != nil {
metricAttrs = make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1)
metricAttrs = append(metricAttrs, gctx.metricAttrs...)
gctx, _ := GRPCContextFromContext(ctx)
gctx.traceInfo.name = name
gctx.attrs = append(gctx.attrs, attrs...)

return ContextWithGRPCContext(ctx, gctx)
}

func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats) {
gctx, _ := GRPCContextFromContext(ctx)
if gctx.traceInfo.kind == trace.SpanKindServer {
ctx = trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx))
}

ctx, span := c.tracer.Start(
ctx,
gctx.traceInfo.name,
trace.WithSpanKind(gctx.traceInfo.kind),
)

// access these counts atomically for hedging in the future
// number of messages sent from side (client || server)
var sentMsgs int64
// number of messages received on side (client || server)
var recvMsgs int64

switch rs := rs.(type) {
case *stats.Begin:
case *stats.InPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
}

if c.ReceivedEvent {
span.AddEvent("message",
trace.WithAttributes(
semconv.MessageTypeReceived,
semconv.MessageIDKey.Int64(messageId),
semconv.MessageIDKey.Int64(atomic.AddInt64(&recvMsgs, 1)),
semconv.MessageCompressedSizeKey.Int(rs.CompressedLength),
semconv.MessageUncompressedSizeKey.Int(rs.Length),
),
)
}
case *stats.OutPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
}

if c.SentEvent {
span.AddEvent("message",
trace.WithAttributes(
semconv.MessageTypeSent,
semconv.MessageIDKey.Int64(messageId),
semconv.MessageIDKey.Int64(atomic.AddInt64(&sentMsgs, 1)),
semconv.MessageCompressedSizeKey.Int(rs.CompressedLength),
semconv.MessageUncompressedSizeKey.Int(rs.Length),
),
Expand All @@ -185,33 +167,30 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
span.SetAttributes(peerAttr(p.Addr.String())...)
}
case *stats.End:
var rpcStatusAttr attribute.KeyValue

if rs.Error != nil {
s, _ := status.FromError(rs.Error)
if isServer {
if gctx.traceInfo.kind == trace.SpanKindServer {
statusCode, msg := serverStatus(s)
span.SetStatus(statusCode, msg)
} else {
span.SetStatus(codes.Error, s.Message())
}
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(s.Code()))
gctx.AddAttrs(semconv.RPCGRPCStatusCodeKey.Int(int(s.Code())))
} else {
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK))
gctx.AddAttrs(semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK)))
}
span.SetAttributes(rpcStatusAttr)
span.End()

metricAttrs = append(metricAttrs, rpcStatusAttr)
span.SetAttributes(gctx.attrs...)
span.End()

// Use floating point division here for higher precision (instead of Millisecond method).
elapsedTime := float64(rs.EndTime.Sub(rs.BeginTime)) / float64(time.Millisecond)

c.rpcDuration.Record(ctx, elapsedTime, metric.WithAttributes(metricAttrs...))
if gctx != nil {
c.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesReceived), metric.WithAttributes(metricAttrs...))
c.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesSent), metric.WithAttributes(metricAttrs...))
}
c.rpcDuration.Record(ctx, elapsedTime, metric.WithAttributes(gctx.attrs...))
c.rpcRequestSize.Record(ctx, atomic.LoadInt64(&recvMsgs), metric.WithAttributes(gctx.attrs...))
c.rpcResponseSize.Record(ctx, atomic.LoadInt64(&sentMsgs), metric.WithAttributes(gctx.attrs...))
c.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&recvMsgs), metric.WithAttributes(gctx.attrs...))
c.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&sentMsgs), metric.WithAttributes(gctx.attrs...))
default:
return
}
Expand Down

0 comments on commit ce0e777

Please sign in to comment.