From 0756c0d67ebeaccc7320e1f86955fd0071311a3f Mon Sep 17 00:00:00 2001 From: Zach Reyes <39203661+zasweq@users.noreply.github.com> Date: Wed, 29 May 2024 19:23:32 -0400 Subject: [PATCH] stats: Various CSM Observability bug fixes (#7278) --- stats/opentelemetry/client_metrics.go | 37 ++++++++++--------- stats/opentelemetry/opentelemetry.go | 4 +- stats/opentelemetry/server_metrics.go | 35 ++++++++++-------- test/xds/xds_telemetry_labels_test.go | 2 +- .../xdsclient/xdsresource/unmarshal_cds.go | 2 +- .../xdsresource/unmarshal_cds_test.go | 4 +- 6 files changed, 45 insertions(+), 39 deletions(-) diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index 05d2d8cac799..cbc4e36fdcbe 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -32,7 +32,7 @@ import ( ) type clientStatsHandler struct { - o Options + options Options clientMetrics clientMetrics } @@ -40,22 +40,25 @@ type clientStatsHandler struct { func (h *clientStatsHandler) initializeMetrics() { // Will set no metrics to record, logically making this stats handler a // no-op. - if h.o.MetricsOptions.MeterProvider == nil { + if h.options.MetricsOptions.MeterProvider == nil { return } - meter := h.o.MetricsOptions.MeterProvider.Meter("grpc-go " + grpc.Version) + meter := h.options.MetricsOptions.MeterProvider.Meter("grpc-go", otelmetric.WithInstrumentationVersion(grpc.Version)) if meter == nil { return } - setOfMetrics := h.o.MetricsOptions.Metrics.metrics + metrics := h.options.MetricsOptions.Metrics + if metrics == nil { + metrics = DefaultMetrics + } - h.clientMetrics.attemptStarted = createInt64Counter(setOfMetrics, "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started.")) - h.clientMetrics.attemptDuration = createFloat64Histogram(setOfMetrics, "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) - h.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) - h.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) - h.clientMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) + h.clientMetrics.attemptStarted = createInt64Counter(metrics.metrics, "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started.")) + h.clientMetrics.attemptDuration = createFloat64Histogram(metrics.metrics, "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) + h.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(metrics.metrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) + h.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.metrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) + h.clientMetrics.callDuration = createFloat64Histogram(metrics.metrics, "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) } func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { @@ -65,8 +68,8 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string } ctx = setCallInfo(ctx, ci) - if h.o.MetricsOptions.pluginOption != nil { - md := h.o.MetricsOptions.pluginOption.GetMetadata() + if h.options.MetricsOptions.pluginOption != nil { + md := h.options.MetricsOptions.pluginOption.GetMetadata() for k, vs := range md { for _, v := range vs { ctx = metadata.AppendToOutgoingContext(ctx, k, v) @@ -85,7 +88,7 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string // otherwise. func (h *clientStatsHandler) determineTarget(cc *grpc.ClientConn) string { target := cc.CanonicalTarget() - if f := h.o.MetricsOptions.TargetAttributeFilter; f != nil && !f(target) { + if f := h.options.MetricsOptions.TargetAttributeFilter; f != nil && !f(target) { target = "other" } return target @@ -110,8 +113,8 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S } ctx = setCallInfo(ctx, ci) - if h.o.MetricsOptions.pluginOption != nil { - md := h.o.MetricsOptions.pluginOption.GetMetadata() + if h.options.MetricsOptions.pluginOption != nil { + md := h.options.MetricsOptions.pluginOption.GetMetadata() for k, vs := range md { for _, v := range vs { ctx = metadata.AppendToOutgoingContext(ctx, k, v) @@ -200,8 +203,8 @@ func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCSta } func (h *clientStatsHandler) setLabelsFromPluginOption(ai *attemptInfo, incomingMetadata metadata.MD) { - if ai.pluginOptionLabels == nil && h.o.MetricsOptions.pluginOption != nil { - labels := h.o.MetricsOptions.pluginOption.GetLabels(incomingMetadata) + if ai.pluginOptionLabels == nil && h.options.MetricsOptions.pluginOption != nil { + labels := h.options.MetricsOptions.pluginOption.GetLabels(incomingMetadata) if labels == nil { labels = map[string]string{} // Shouldn't return a nil map. Make it empty if so to ignore future Get Calls for this Attempt. } @@ -232,7 +235,7 @@ func (h *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, attributes = append(attributes, otelattribute.String(k, v)) } - for _, o := range h.o.MetricsOptions.OptionalLabels { + for _, o := range h.options.MetricsOptions.OptionalLabels { if val, ok := ai.xdsLabels[o]; ok { attributes = append(attributes, otelattribute.String(o, val)) } diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go index 7aed64999929..6a6bcd1627da 100644 --- a/stats/opentelemetry/opentelemetry.go +++ b/stats/opentelemetry/opentelemetry.go @@ -155,7 +155,7 @@ type MetricsOptions struct { // configured for an individual metric turned on, the API call in this component // will create a default view for that metric. func DialOption(o Options) grpc.DialOption { - csh := &clientStatsHandler{o: o} + csh := &clientStatsHandler{options: o} csh.initializeMetrics() return joinDialOptions(grpc.WithChainUnaryInterceptor(csh.unaryInterceptor), grpc.WithChainStreamInterceptor(csh.streamInterceptor), grpc.WithStatsHandler(csh)) } @@ -175,7 +175,7 @@ var joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) g // configured for an individual metric turned on, the API call in this component // will create a default view for that metric. func ServerOption(o Options) grpc.ServerOption { - ssh := &serverStatsHandler{o: o} + ssh := &serverStatsHandler{options: o} ssh.initializeMetrics() return joinServerOptions(grpc.ChainUnaryInterceptor(ssh.unaryInterceptor), grpc.ChainStreamInterceptor(ssh.streamInterceptor), grpc.StatsHandler(ssh)) } diff --git a/stats/opentelemetry/server_metrics.go b/stats/opentelemetry/server_metrics.go index b10c97cc6635..a20d6a261fc6 100644 --- a/stats/opentelemetry/server_metrics.go +++ b/stats/opentelemetry/server_metrics.go @@ -32,7 +32,7 @@ import ( ) type serverStatsHandler struct { - o Options + options Options serverMetrics serverMetrics } @@ -40,20 +40,23 @@ type serverStatsHandler struct { func (h *serverStatsHandler) initializeMetrics() { // Will set no metrics to record, logically making this stats handler a // no-op. - if h.o.MetricsOptions.MeterProvider == nil { + if h.options.MetricsOptions.MeterProvider == nil { return } - meter := h.o.MetricsOptions.MeterProvider.Meter("grpc-go " + grpc.Version) + meter := h.options.MetricsOptions.MeterProvider.Meter("grpc-go", otelmetric.WithInstrumentationVersion(grpc.Version)) if meter == nil { return } - setOfMetrics := h.o.MetricsOptions.Metrics.metrics + metrics := h.options.MetricsOptions.Metrics + if metrics == nil { + metrics = DefaultMetrics + } - h.serverMetrics.callStarted = createInt64Counter(setOfMetrics, "grpc.server.call.started", meter, otelmetric.WithUnit("call"), otelmetric.WithDescription("Number of server calls started.")) - h.serverMetrics.callSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.server.call.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per server call."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) - h.serverMetrics.callRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.server.call.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per server call."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) - h.serverMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.server.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a call from server transport's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) + h.serverMetrics.callStarted = createInt64Counter(metrics.metrics, "grpc.server.call.started", meter, otelmetric.WithUnit("call"), otelmetric.WithDescription("Number of server calls started.")) + h.serverMetrics.callSentTotalCompressedMessageSize = createInt64Histogram(metrics.metrics, "grpc.server.call.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per server call."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) + h.serverMetrics.callRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.metrics, "grpc.server.call.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per server call."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) + h.serverMetrics.callDuration = createFloat64Histogram(metrics.metrics, "grpc.server.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a call from server transport's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) } // attachLabelsTransportStream intercepts SetHeader and SendHeader calls of the @@ -82,8 +85,8 @@ func (s *attachLabelsTransportStream) SendHeader(md metadata.MD) error { func (h *serverStatsHandler) unaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { var metadataExchangeLabels metadata.MD - if h.o.MetricsOptions.pluginOption != nil { - metadataExchangeLabels = h.o.MetricsOptions.pluginOption.GetMetadata() + if h.options.MetricsOptions.pluginOption != nil { + metadataExchangeLabels = h.options.MetricsOptions.pluginOption.GetMetadata() } sts := grpc.ServerTransportStreamFromContext(ctx) @@ -143,8 +146,8 @@ func (s *attachLabelsStream) SendMsg(m any) error { func (h *serverStatsHandler) streamInterceptor(srv any, ss grpc.ServerStream, ssi *grpc.StreamServerInfo, handler grpc.StreamHandler) error { var metadataExchangeLabels metadata.MD - if h.o.MetricsOptions.pluginOption != nil { - metadataExchangeLabels = h.o.MetricsOptions.pluginOption.GetMetadata() + if h.options.MetricsOptions.pluginOption != nil { + metadataExchangeLabels = h.options.MetricsOptions.pluginOption.GetMetadata() } als := &attachLabelsStream{ ServerStream: ss, @@ -171,8 +174,8 @@ func (h *serverStatsHandler) HandleConn(context.Context, stats.ConnStats) {} // TagRPC implements per RPC context management. func (h *serverStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { method := info.FullMethodName - if h.o.MetricsOptions.MethodAttributeFilter != nil { - if !h.o.MetricsOptions.MethodAttributeFilter(method) { + if h.options.MetricsOptions.MethodAttributeFilter != nil { + if !h.options.MetricsOptions.MethodAttributeFilter(method) { method = "other" } } @@ -210,8 +213,8 @@ func (h *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { func (h *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, ai *attemptInfo) { switch st := s.(type) { case *stats.InHeader: - if ai.pluginOptionLabels == nil && h.o.MetricsOptions.pluginOption != nil { - labels := h.o.MetricsOptions.pluginOption.GetLabels(st.Header) + if ai.pluginOptionLabels == nil && h.options.MetricsOptions.pluginOption != nil { + labels := h.options.MetricsOptions.pluginOption.GetLabels(st.Header) if labels == nil { labels = map[string]string{} // Shouldn't return a nil map. Make it empty if so to ignore future Get Calls for this Attempt. } diff --git a/test/xds/xds_telemetry_labels_test.go b/test/xds/xds_telemetry_labels_test.go index 11858d8fbce4..d67f260f5401 100644 --- a/test/xds/xds_telemetry_labels_test.go +++ b/test/xds/xds_telemetry_labels_test.go @@ -39,7 +39,7 @@ import ( const serviceNameKey = "service_name" const serviceNameKeyCSM = "csm.service_name" const serviceNamespaceKey = "service_namespace" -const serviceNamespaceKeyCSM = "csm.service_namespace" +const serviceNamespaceKeyCSM = "csm.service_namespace_name" const serviceNameValue = "grpc-service" const serviceNamespaceValue = "grpc-service-namespace" diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_cds.go b/xds/internal/xdsclient/xdsresource/unmarshal_cds.go index 58ca9d25c2a7..9eaef4d9b4f8 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_cds.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_cds.go @@ -94,7 +94,7 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster, serv } if val, ok := fields["service_namespace"]; ok { if _, ok := val.GetKind().(*structpb.Value_StringValue); ok { - telemetryLabels["csm.service_namespace"] = val.GetStringValue() + telemetryLabels["csm.service_namespace_name"] = val.GetStringValue() } } } diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go b/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go index 10af6eab7381..ce7846ce5aac 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go @@ -1363,8 +1363,8 @@ func (s) TestUnmarshalCluster(t *testing.T) { LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"}, Raw: v3ClusterAnyWithTelemetryLabels, TelemetryLabels: map[string]string{ - "csm.service_name": "grpc-service", - "csm.service_namespace": "grpc-service-namespace", + "csm.service_name": "grpc-service", + "csm.service_namespace_name": "grpc-service-namespace", }, }, },