Skip to content

Commit

Permalink
stats: Various CSM Observability bug fixes (#7278)
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq authored May 29, 2024
1 parent fe82db4 commit 0756c0d
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 39 deletions.
37 changes: 20 additions & 17 deletions stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,33 @@ import (
)

type clientStatsHandler struct {
o Options
options Options

clientMetrics clientMetrics
}

func (h *clientStatsHandler) initializeMetrics() {
// Will set no metrics to record, logically making this stats handler a
// no-op.
if h.o.MetricsOptions.MeterProvider == nil {
if h.options.MetricsOptions.MeterProvider == nil {
return
}

meter := h.o.MetricsOptions.MeterProvider.Meter("grpc-go " + grpc.Version)
meter := h.options.MetricsOptions.MeterProvider.Meter("grpc-go", otelmetric.WithInstrumentationVersion(grpc.Version))
if meter == nil {
return
}

setOfMetrics := h.o.MetricsOptions.Metrics.metrics
metrics := h.options.MetricsOptions.Metrics
if metrics == nil {
metrics = DefaultMetrics
}

h.clientMetrics.attemptStarted = createInt64Counter(setOfMetrics, "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started."))
h.clientMetrics.attemptDuration = createFloat64Histogram(setOfMetrics, "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
h.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.clientMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
h.clientMetrics.attemptStarted = createInt64Counter(metrics.metrics, "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started."))
h.clientMetrics.attemptDuration = createFloat64Histogram(metrics.metrics, "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
h.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(metrics.metrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.metrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.clientMetrics.callDuration = createFloat64Histogram(metrics.metrics, "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
}

func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
Expand All @@ -65,8 +68,8 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string
}
ctx = setCallInfo(ctx, ci)

if h.o.MetricsOptions.pluginOption != nil {
md := h.o.MetricsOptions.pluginOption.GetMetadata()
if h.options.MetricsOptions.pluginOption != nil {
md := h.options.MetricsOptions.pluginOption.GetMetadata()
for k, vs := range md {
for _, v := range vs {
ctx = metadata.AppendToOutgoingContext(ctx, k, v)
Expand All @@ -85,7 +88,7 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string
// otherwise.
func (h *clientStatsHandler) determineTarget(cc *grpc.ClientConn) string {
target := cc.CanonicalTarget()
if f := h.o.MetricsOptions.TargetAttributeFilter; f != nil && !f(target) {
if f := h.options.MetricsOptions.TargetAttributeFilter; f != nil && !f(target) {
target = "other"
}
return target
Expand All @@ -110,8 +113,8 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S
}
ctx = setCallInfo(ctx, ci)

if h.o.MetricsOptions.pluginOption != nil {
md := h.o.MetricsOptions.pluginOption.GetMetadata()
if h.options.MetricsOptions.pluginOption != nil {
md := h.options.MetricsOptions.pluginOption.GetMetadata()
for k, vs := range md {
for _, v := range vs {
ctx = metadata.AppendToOutgoingContext(ctx, k, v)
Expand Down Expand Up @@ -200,8 +203,8 @@ func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCSta
}

func (h *clientStatsHandler) setLabelsFromPluginOption(ai *attemptInfo, incomingMetadata metadata.MD) {
if ai.pluginOptionLabels == nil && h.o.MetricsOptions.pluginOption != nil {
labels := h.o.MetricsOptions.pluginOption.GetLabels(incomingMetadata)
if ai.pluginOptionLabels == nil && h.options.MetricsOptions.pluginOption != nil {
labels := h.options.MetricsOptions.pluginOption.GetLabels(incomingMetadata)
if labels == nil {
labels = map[string]string{} // Shouldn't return a nil map. Make it empty if so to ignore future Get Calls for this Attempt.
}
Expand Down Expand Up @@ -232,7 +235,7 @@ func (h *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo,
attributes = append(attributes, otelattribute.String(k, v))
}

for _, o := range h.o.MetricsOptions.OptionalLabels {
for _, o := range h.options.MetricsOptions.OptionalLabels {
if val, ok := ai.xdsLabels[o]; ok {
attributes = append(attributes, otelattribute.String(o, val))
}
Expand Down
4 changes: 2 additions & 2 deletions stats/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ type MetricsOptions struct {
// configured for an individual metric turned on, the API call in this component
// will create a default view for that metric.
func DialOption(o Options) grpc.DialOption {
csh := &clientStatsHandler{o: o}
csh := &clientStatsHandler{options: o}
csh.initializeMetrics()
return joinDialOptions(grpc.WithChainUnaryInterceptor(csh.unaryInterceptor), grpc.WithChainStreamInterceptor(csh.streamInterceptor), grpc.WithStatsHandler(csh))
}
Expand All @@ -175,7 +175,7 @@ var joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) g
// configured for an individual metric turned on, the API call in this component
// will create a default view for that metric.
func ServerOption(o Options) grpc.ServerOption {
ssh := &serverStatsHandler{o: o}
ssh := &serverStatsHandler{options: o}
ssh.initializeMetrics()
return joinServerOptions(grpc.ChainUnaryInterceptor(ssh.unaryInterceptor), grpc.ChainStreamInterceptor(ssh.streamInterceptor), grpc.StatsHandler(ssh))
}
Expand Down
35 changes: 19 additions & 16 deletions stats/opentelemetry/server_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,31 @@ import (
)

type serverStatsHandler struct {
o Options
options Options

serverMetrics serverMetrics
}

func (h *serverStatsHandler) initializeMetrics() {
// Will set no metrics to record, logically making this stats handler a
// no-op.
if h.o.MetricsOptions.MeterProvider == nil {
if h.options.MetricsOptions.MeterProvider == nil {
return
}

meter := h.o.MetricsOptions.MeterProvider.Meter("grpc-go " + grpc.Version)
meter := h.options.MetricsOptions.MeterProvider.Meter("grpc-go", otelmetric.WithInstrumentationVersion(grpc.Version))
if meter == nil {
return
}
setOfMetrics := h.o.MetricsOptions.Metrics.metrics
metrics := h.options.MetricsOptions.Metrics
if metrics == nil {
metrics = DefaultMetrics
}

h.serverMetrics.callStarted = createInt64Counter(setOfMetrics, "grpc.server.call.started", meter, otelmetric.WithUnit("call"), otelmetric.WithDescription("Number of server calls started."))
h.serverMetrics.callSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.server.call.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per server call."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.serverMetrics.callRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.server.call.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per server call."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.serverMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.server.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a call from server transport's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
h.serverMetrics.callStarted = createInt64Counter(metrics.metrics, "grpc.server.call.started", meter, otelmetric.WithUnit("call"), otelmetric.WithDescription("Number of server calls started."))
h.serverMetrics.callSentTotalCompressedMessageSize = createInt64Histogram(metrics.metrics, "grpc.server.call.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per server call."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.serverMetrics.callRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.metrics, "grpc.server.call.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per server call."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.serverMetrics.callDuration = createFloat64Histogram(metrics.metrics, "grpc.server.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a call from server transport's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
}

// attachLabelsTransportStream intercepts SetHeader and SendHeader calls of the
Expand Down Expand Up @@ -82,8 +85,8 @@ func (s *attachLabelsTransportStream) SendHeader(md metadata.MD) error {

func (h *serverStatsHandler) unaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
var metadataExchangeLabels metadata.MD
if h.o.MetricsOptions.pluginOption != nil {
metadataExchangeLabels = h.o.MetricsOptions.pluginOption.GetMetadata()
if h.options.MetricsOptions.pluginOption != nil {
metadataExchangeLabels = h.options.MetricsOptions.pluginOption.GetMetadata()
}

sts := grpc.ServerTransportStreamFromContext(ctx)
Expand Down Expand Up @@ -143,8 +146,8 @@ func (s *attachLabelsStream) SendMsg(m any) error {

func (h *serverStatsHandler) streamInterceptor(srv any, ss grpc.ServerStream, ssi *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
var metadataExchangeLabels metadata.MD
if h.o.MetricsOptions.pluginOption != nil {
metadataExchangeLabels = h.o.MetricsOptions.pluginOption.GetMetadata()
if h.options.MetricsOptions.pluginOption != nil {
metadataExchangeLabels = h.options.MetricsOptions.pluginOption.GetMetadata()
}
als := &attachLabelsStream{
ServerStream: ss,
Expand All @@ -171,8 +174,8 @@ func (h *serverStatsHandler) HandleConn(context.Context, stats.ConnStats) {}
// TagRPC implements per RPC context management.
func (h *serverStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
method := info.FullMethodName
if h.o.MetricsOptions.MethodAttributeFilter != nil {
if !h.o.MetricsOptions.MethodAttributeFilter(method) {
if h.options.MetricsOptions.MethodAttributeFilter != nil {
if !h.options.MetricsOptions.MethodAttributeFilter(method) {
method = "other"
}
}
Expand Down Expand Up @@ -210,8 +213,8 @@ func (h *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
func (h *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
switch st := s.(type) {
case *stats.InHeader:
if ai.pluginOptionLabels == nil && h.o.MetricsOptions.pluginOption != nil {
labels := h.o.MetricsOptions.pluginOption.GetLabels(st.Header)
if ai.pluginOptionLabels == nil && h.options.MetricsOptions.pluginOption != nil {
labels := h.options.MetricsOptions.pluginOption.GetLabels(st.Header)
if labels == nil {
labels = map[string]string{} // Shouldn't return a nil map. Make it empty if so to ignore future Get Calls for this Attempt.
}
Expand Down
2 changes: 1 addition & 1 deletion test/xds/xds_telemetry_labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
const serviceNameKey = "service_name"
const serviceNameKeyCSM = "csm.service_name"
const serviceNamespaceKey = "service_namespace"
const serviceNamespaceKeyCSM = "csm.service_namespace"
const serviceNamespaceKeyCSM = "csm.service_namespace_name"
const serviceNameValue = "grpc-service"
const serviceNamespaceValue = "grpc-service-namespace"

Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/xdsresource/unmarshal_cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster, serv
}
if val, ok := fields["service_namespace"]; ok {
if _, ok := val.GetKind().(*structpb.Value_StringValue); ok {
telemetryLabels["csm.service_namespace"] = val.GetStringValue()
telemetryLabels["csm.service_namespace_name"] = val.GetStringValue()
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1363,8 +1363,8 @@ func (s) TestUnmarshalCluster(t *testing.T) {
LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
Raw: v3ClusterAnyWithTelemetryLabels,
TelemetryLabels: map[string]string{
"csm.service_name": "grpc-service",
"csm.service_namespace": "grpc-service-namespace",
"csm.service_name": "grpc-service",
"csm.service_namespace_name": "grpc-service-namespace",
},
},
},
Expand Down

0 comments on commit 0756c0d

Please sign in to comment.