diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go
index ca78253e07e5..02852b2abe7c 100644
--- a/stats/opentelemetry/client_metrics.go
+++ b/stats/opentelemetry/client_metrics.go
@@ -22,11 +22,13 @@ import (
 	"time"
 
 	"google.golang.org/grpc"
+	istats "google.golang.org/grpc/internal/stats"
+	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/stats"
 	"google.golang.org/grpc/status"
 
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/metric"
+	otelattribute "go.opentelemetry.io/otel/attribute"
+	otelmetric "go.opentelemetry.io/otel/metric"
 )
 
 type clientStatsHandler struct {
@@ -49,11 +51,11 @@ func (csh *clientStatsHandler) initializeMetrics() {
 
 	setOfMetrics := csh.o.MetricsOptions.Metrics.metrics
 
-	csh.clientMetrics.attemptStarted = createInt64Counter(setOfMetrics, "grpc.client.attempt.started", meter, metric.WithUnit("attempt"), metric.WithDescription("Number of client call attempts started."))
-	csh.clientMetrics.attemptDuration = createFloat64Histogram(setOfMetrics, "grpc.client.attempt.duration", meter, metric.WithUnit("s"), metric.WithDescription("End-to-end time taken to complete a client call attempt."), metric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
-	csh.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, metric.WithUnit("By"), metric.WithDescription("Compressed message bytes sent per client call attempt."), metric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
-	csh.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, metric.WithUnit("By"), metric.WithDescription("Compressed message bytes received per call attempt."), metric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
-	csh.clientMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.client.call.duration", meter, metric.WithUnit("s"), metric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), metric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
+	csh.clientMetrics.attemptStarted = createInt64Counter(setOfMetrics, "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started."))
+	csh.clientMetrics.attemptDuration = createFloat64Histogram(setOfMetrics, "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
+	csh.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
+	csh.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
+	csh.clientMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
 }
 
 func (csh *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
@@ -63,6 +65,15 @@ func (csh *clientStatsHandler) unaryInterceptor(ctx context.Context, method stri
 	}
 	ctx = setCallInfo(ctx, ci)
 
+	if csh.o.MetricsOptions.pluginOption != nil {
+		md := csh.o.MetricsOptions.pluginOption.GetMetadata()
+		for k, vs := range md {
+			for _, v := range vs {
+				ctx = metadata.AppendToOutgoingContext(ctx, k, v)
+			}
+		}
+	}
+
 	startTime := time.Now()
 	err := invoker(ctx, method, req, reply, cc, opts...)
 	csh.perCallMetrics(ctx, err, startTime, ci)
@@ -98,6 +109,16 @@ func (csh *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc
 		method: csh.determineMethod(method, opts...),
 	}
 	ctx = setCallInfo(ctx, ci)
+
+	if csh.o.MetricsOptions.pluginOption != nil {
+		md := csh.o.MetricsOptions.pluginOption.GetMetadata()
+		for k, vs := range md {
+			for _, v := range vs {
+				ctx = metadata.AppendToOutgoingContext(ctx, k, v)
+			}
+		}
+	}
+
 	startTime := time.Now()
 
 	callback := func(err error) {
@@ -110,7 +131,7 @@ func (csh *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc
 func (csh *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
 	s := status.Convert(err)
 	callLatency := float64(time.Since(startTime)) / float64(time.Second)
-	csh.clientMetrics.callDuration.Record(ctx, callLatency, metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target), attribute.String("grpc.status", canonicalString(s.Code()))))
+	csh.clientMetrics.callDuration.Record(ctx, callLatency, otelmetric.WithAttributes(otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target), otelattribute.String("grpc.status", canonicalString(s.Code()))))
 }
 
 // TagConn exists to satisfy stats.Handler.
@@ -123,11 +144,24 @@ func (csh *clientStatsHandler) HandleConn(context.Context, stats.ConnStats) {}
 
 // TagRPC implements per RPC attempt context management.
 func (csh *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
-	mi := &metricsInfo{ // populates information about RPC start.
+	// Numerous stats handlers can be used for the same channel. The cluster
+	// impl balancer which writes to this will only write once, thus have this
+	// stats handler's per attempt scoped context point to the same optional
+	// labels map if set.
+	var labels *istats.Labels
+	if labels = istats.GetLabels(ctx); labels == nil {
+		labels = &istats.Labels{
+			TelemetryLabels: make(map[string]string),
+		}
+		ctx = istats.SetLabels(ctx, labels)
+	}
+	ai := &attemptInfo{ // populates information about RPC start.
 		startTime: time.Now(),
+		xdsLabels: labels.TelemetryLabels,
+		method:    info.FullMethodName,
 	}
 	ri := &rpcInfo{
-		mi: mi,
+		ai: ai,
 	}
 	return setRPCInfo(ctx, ri)
 }
@@ -138,10 +172,10 @@ func (csh *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats)
 		logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present")
 		return
 	}
-	csh.processRPCEvent(ctx, rs, ri.mi)
+	csh.processRPCEvent(ctx, rs, ri.ai)
 }
 
-func (csh *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, mi *metricsInfo) {
+func (csh *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
 	switch st := s.(type) {
 	case *stats.Begin:
 		ci := getCallInfo(ctx)
@@ -150,34 +184,64 @@ func (csh *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCS
 			return
 		}
 
-		csh.clientMetrics.attemptStarted.Add(ctx, 1, metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target)))
+		csh.clientMetrics.attemptStarted.Add(ctx, 1, otelmetric.WithAttributes(otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target)))
 	case *stats.OutPayload:
-		atomic.AddInt64(&mi.sentCompressedBytes, int64(st.CompressedLength))
+		atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength))
 	case *stats.InPayload:
-		atomic.AddInt64(&mi.recvCompressedBytes, int64(st.CompressedLength))
+		atomic.AddInt64(&ai.recvCompressedBytes, int64(st.CompressedLength))
+	case *stats.InHeader:
+		csh.setLabelsFromPluginOption(ai, st.Header)
+	case *stats.InTrailer:
+		csh.setLabelsFromPluginOption(ai, st.Trailer)
 	case *stats.End:
-		csh.processRPCEnd(ctx, mi, st)
+		csh.processRPCEnd(ctx, ai, st)
 	default:
 	}
 }
 
-func (csh *clientStatsHandler) processRPCEnd(ctx context.Context, mi *metricsInfo, e *stats.End) {
+func (csh *clientStatsHandler) setLabelsFromPluginOption(ai *attemptInfo, incomingMetadata metadata.MD) {
+	if ai.pluginOptionLabels == nil && csh.o.MetricsOptions.pluginOption != nil {
+		labels := csh.o.MetricsOptions.pluginOption.GetLabels(incomingMetadata)
+		if labels == nil {
+			labels = map[string]string{} // Shouldn't return a nil map. Make it empty if so to ignore future Get Calls for this Attempt.
+		}
+		ai.pluginOptionLabels = labels
+	}
+}
+
+func (csh *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, e *stats.End) {
 	ci := getCallInfo(ctx)
 	if ci == nil {
 		logger.Error("ctx passed into client side stats handler metrics event handling has no metrics data present")
 		return
 	}
-	latency := float64(time.Since(mi.startTime)) / float64(time.Second)
+	latency := float64(time.Since(ai.startTime)) / float64(time.Second)
 	st := "OK"
 	if e.Error != nil {
 		s, _ := status.FromError(e.Error)
 		st = canonicalString(s.Code())
 	}
-	clientAttributeOption := metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target), attribute.String("grpc.status", st))
+	attributes := []otelattribute.KeyValue{
+		otelattribute.String("grpc.method", ci.method),
+		otelattribute.String("grpc.target", ci.target),
+		otelattribute.String("grpc.status", st),
+	}
+
+	for k, v := range ai.pluginOptionLabels {
+		attributes = append(attributes, otelattribute.String(k, v))
+	}
+
+	for _, o := range csh.o.MetricsOptions.OptionalLabels {
+		if val, ok := ai.xdsLabels[o]; ok {
+			attributes = append(attributes, otelattribute.String(o, val))
+		}
+	}
+
+	clientAttributeOption := otelmetric.WithAttributes(attributes...)
 
 	csh.clientMetrics.attemptDuration.Record(ctx, latency, clientAttributeOption)
-	csh.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.sentCompressedBytes), clientAttributeOption)
-	csh.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.recvCompressedBytes), clientAttributeOption)
+	csh.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), clientAttributeOption)
+	csh.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), clientAttributeOption)
 }
 
 const (
diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go
index 07fbac3cd929..bd50b12d1056 100644
--- a/stats/opentelemetry/opentelemetry.go
+++ b/stats/opentelemetry/opentelemetry.go
@@ -27,6 +27,7 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/internal"
+	otelinternal "google.golang.org/grpc/stats/opentelemetry/internal"
 
 	"go.opentelemetry.io/otel/metric"
 	"go.opentelemetry.io/otel/metric/noop"
@@ -126,6 +127,13 @@ type MetricsOptions struct {
 	// grpc.StaticMethodCallOption as a call option into Invoke or NewStream.
 	// This only applies for server side metrics.
 	MethodAttributeFilter func(string) bool
+
+	// OptionalLabels are labels received from LB Policies that this component
+	// should add to metrics that record after receiving incoming metadata.
+	OptionalLabels []string
+
+	// pluginOption is used to get labels to attach to certain metrics, if set.
+	pluginOption otelinternal.PluginOption
 }
 
 // DialOption returns a dial option which enables OpenTelemetry instrumentation
@@ -187,7 +195,7 @@ func getCallInfo(ctx context.Context) *callInfo {
 // rpcInfo is RPC information scoped to the RPC attempt life span client side,
 // and the RPC life span server side.
 type rpcInfo struct {
-	mi *metricsInfo
+	ai *attemptInfo
 }
 
 type rpcInfoKey struct{}
@@ -207,9 +215,9 @@ func removeLeadingSlash(mn string) string {
 	return strings.TrimLeft(mn, "/")
 }
 
-// metricsInfo is RPC information scoped to the RPC attempt life span client
+// attemptInfo is RPC information scoped to the RPC attempt life span client
 // side, and the RPC life span server side.
-type metricsInfo struct {
+type attemptInfo struct {
 	// access these counts atomically for hedging in the future:
 	// number of bytes after compression (within each message) from side (client
 	// || server).
@@ -220,6 +228,9 @@ type metricsInfo struct {
 
 	startTime time.Time
 	method    string
+
+	pluginOptionLabels map[string]string // pluginOptionLabels to attach to metrics emitted
+	xdsLabels          map[string]string
 }
 
 type clientMetrics struct {
diff --git a/stats/opentelemetry/server_metrics.go b/stats/opentelemetry/server_metrics.go
index 13e8dfc94340..1507e1cfd20e 100644
--- a/stats/opentelemetry/server_metrics.go
+++ b/stats/opentelemetry/server_metrics.go
@@ -82,12 +82,12 @@ func (ssh *serverStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInf
 		}
 	}
 
-	mi := &metricsInfo{
+	ai := &attemptInfo{
 		startTime: time.Now(),
 		method:    removeLeadingSlash(method),
 	}
 	ri := &rpcInfo{
-		mi: mi,
+		ai: ai,
 	}
 	return setRPCInfo(ctx, ri)
 }
@@ -99,35 +99,35 @@ func (ssh *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats)
 		logger.Error("ctx passed into server side stats handler metrics event handling has no server call data present")
 		return
 	}
-	ssh.processRPCData(ctx, rs, ri.mi)
+	ssh.processRPCData(ctx, rs, ri.ai)
 }
 
-func (ssh *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, mi *metricsInfo) {
+func (ssh *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
 	switch st := s.(type) {
 	case *stats.InHeader:
-		ssh.serverMetrics.callStarted.Add(ctx, 1, metric.WithAttributes(attribute.String("grpc.method", mi.method)))
+		ssh.serverMetrics.callStarted.Add(ctx, 1, metric.WithAttributes(attribute.String("grpc.method", ai.method)))
 	case *stats.OutPayload:
-		atomic.AddInt64(&mi.sentCompressedBytes, int64(st.CompressedLength))
+		atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength))
 	case *stats.InPayload:
-		atomic.AddInt64(&mi.recvCompressedBytes, int64(st.CompressedLength))
+		atomic.AddInt64(&ai.recvCompressedBytes, int64(st.CompressedLength))
 	case *stats.End:
-		ssh.processRPCEnd(ctx, mi, st)
+		ssh.processRPCEnd(ctx, ai, st)
 	default:
 	}
 }
 
-func (ssh *serverStatsHandler) processRPCEnd(ctx context.Context, mi *metricsInfo, e *stats.End) {
-	latency := float64(time.Since(mi.startTime)) / float64(time.Second)
+func (ssh *serverStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, e *stats.End) {
+	latency := float64(time.Since(ai.startTime)) / float64(time.Second)
 	st := "OK"
 	if e.Error != nil {
 		s, _ := status.FromError(e.Error)
 		st = canonicalString(s.Code())
 	}
-	serverAttributeOption := metric.WithAttributes(attribute.String("grpc.method", mi.method), attribute.String("grpc.status", st))
+	serverAttributeOption := metric.WithAttributes(attribute.String("grpc.method", ai.method), attribute.String("grpc.status", st))
 
 	ssh.serverMetrics.callDuration.Record(ctx, latency, serverAttributeOption)
-	ssh.serverMetrics.callSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.sentCompressedBytes), serverAttributeOption)
-	ssh.serverMetrics.callRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.recvCompressedBytes), serverAttributeOption)
+	ssh.serverMetrics.callSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), serverAttributeOption)
+	ssh.serverMetrics.callRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), serverAttributeOption)
 }
 
 const (
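For context, a minimal usage sketch of the new MetricsOptions.OptionalLabels field. This assumes the package's existing Options/DialOption surface (only the DialOption doc comment and MetricsOptions struct appear in this diff), and "grpc.lb.locality" is used purely as an illustrative label key: only keys listed in OptionalLabels are copied from an attempt's xdsLabels map into the attributes recorded by processRPCEnd.

package main

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/stats/opentelemetry"
)

// newInstrumentedClient creates a channel with OpenTelemetry instrumentation
// and opts in to the "grpc.lb.locality" optional label on per-attempt metrics.
// Illustrative only; a MeterProvider would normally also be supplied on
// MetricsOptions so the recorded instruments are actually exported.
func newInstrumentedClient(target string) (*grpc.ClientConn, error) {
	do := opentelemetry.DialOption(opentelemetry.Options{
		MetricsOptions: opentelemetry.MetricsOptions{
			// Labels not listed here are dropped even if an LB policy
			// populated them in the attempt's labels map.
			OptionalLabels: []string{"grpc.lb.locality"},
		},
	})
	return grpc.NewClient(target, do)
}

By contrast, the pluginOption field stays unexported, so plugin-option labels are wired up from within the module through the otelinternal.PluginOption type rather than through this public surface.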