Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats: Various CSM Observability bug fixes #7278

Merged
merged 2 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,33 @@ import (
)

type clientStatsHandler struct {
o Options
options Options

clientMetrics clientMetrics
}

func (h *clientStatsHandler) initializeMetrics() {
// Will set no metrics to record, logically making this stats handler a
// no-op.
if h.o.MetricsOptions.MeterProvider == nil {
if h.options.MetricsOptions.MeterProvider == nil {
return
}

meter := h.o.MetricsOptions.MeterProvider.Meter("grpc-go " + grpc.Version)
meter := h.options.MetricsOptions.MeterProvider.Meter("grpc-go", otelmetric.WithInstrumentationVersion(grpc.Version))
if meter == nil {
return
}

setOfMetrics := h.o.MetricsOptions.Metrics.metrics
dfawley marked this conversation as resolved.
Show resolved Hide resolved
metrics := h.options.MetricsOptions.Metrics
if metrics == nil {
metrics = DefaultMetrics
}

h.clientMetrics.attemptStarted = createInt64Counter(setOfMetrics, "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started."))
h.clientMetrics.attemptDuration = createFloat64Histogram(setOfMetrics, "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
h.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.clientMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
h.clientMetrics.attemptStarted = createInt64Counter(metrics.metrics, "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started."))
h.clientMetrics.attemptDuration = createFloat64Histogram(metrics.metrics, "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
h.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(metrics.metrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.metrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.clientMetrics.callDuration = createFloat64Histogram(metrics.metrics, "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
}

func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
Expand All @@ -65,8 +68,8 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string
}
ctx = setCallInfo(ctx, ci)

if h.o.MetricsOptions.pluginOption != nil {
md := h.o.MetricsOptions.pluginOption.GetMetadata()
if h.options.MetricsOptions.pluginOption != nil {
md := h.options.MetricsOptions.pluginOption.GetMetadata()
for k, vs := range md {
for _, v := range vs {
ctx = metadata.AppendToOutgoingContext(ctx, k, v)
Expand All @@ -85,7 +88,7 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string
// otherwise.
func (h *clientStatsHandler) determineTarget(cc *grpc.ClientConn) string {
target := cc.CanonicalTarget()
if f := h.o.MetricsOptions.TargetAttributeFilter; f != nil && !f(target) {
if f := h.options.MetricsOptions.TargetAttributeFilter; f != nil && !f(target) {
target = "other"
}
return target
Expand All @@ -110,8 +113,8 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S
}
ctx = setCallInfo(ctx, ci)

if h.o.MetricsOptions.pluginOption != nil {
md := h.o.MetricsOptions.pluginOption.GetMetadata()
if h.options.MetricsOptions.pluginOption != nil {
md := h.options.MetricsOptions.pluginOption.GetMetadata()
for k, vs := range md {
for _, v := range vs {
ctx = metadata.AppendToOutgoingContext(ctx, k, v)
Expand Down Expand Up @@ -200,8 +203,8 @@ func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCSta
}

func (h *clientStatsHandler) setLabelsFromPluginOption(ai *attemptInfo, incomingMetadata metadata.MD) {
if ai.pluginOptionLabels == nil && h.o.MetricsOptions.pluginOption != nil {
labels := h.o.MetricsOptions.pluginOption.GetLabels(incomingMetadata)
if ai.pluginOptionLabels == nil && h.options.MetricsOptions.pluginOption != nil {
labels := h.options.MetricsOptions.pluginOption.GetLabels(incomingMetadata)
if labels == nil {
labels = map[string]string{} // Shouldn't return a nil map. Make it empty if so to ignore future Get Calls for this Attempt.
}
Expand Down Expand Up @@ -232,7 +235,7 @@ func (h *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo,
attributes = append(attributes, otelattribute.String(k, v))
}

for _, o := range h.o.MetricsOptions.OptionalLabels {
for _, o := range h.options.MetricsOptions.OptionalLabels {
if val, ok := ai.xdsLabels[o]; ok {
attributes = append(attributes, otelattribute.String(o, val))
}
Expand Down
4 changes: 2 additions & 2 deletions stats/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ type MetricsOptions struct {
// configured for an individual metric turned on, the API call in this component
// will create a default view for that metric.
func DialOption(o Options) grpc.DialOption {
csh := &clientStatsHandler{o: o}
csh := &clientStatsHandler{options: o}
csh.initializeMetrics()
return joinDialOptions(grpc.WithChainUnaryInterceptor(csh.unaryInterceptor), grpc.WithChainStreamInterceptor(csh.streamInterceptor), grpc.WithStatsHandler(csh))
}
Expand All @@ -169,7 +169,7 @@ var joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) g
// configured for an individual metric turned on, the API call in this component
// will create a default view for that metric.
func ServerOption(o Options) grpc.ServerOption {
ssh := &serverStatsHandler{o: o}
ssh := &serverStatsHandler{options: o}
ssh.initializeMetrics()
return joinServerOptions(grpc.ChainUnaryInterceptor(ssh.unaryInterceptor), grpc.ChainStreamInterceptor(ssh.streamInterceptor), grpc.StatsHandler(ssh))
}
Expand Down
35 changes: 19 additions & 16 deletions stats/opentelemetry/server_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,31 @@ import (
)

type serverStatsHandler struct {
o Options
options Options

serverMetrics serverMetrics
}

func (h *serverStatsHandler) initializeMetrics() {
// Will set no metrics to record, logically making this stats handler a
// no-op.
if h.o.MetricsOptions.MeterProvider == nil {
if h.options.MetricsOptions.MeterProvider == nil {
return
}

meter := h.o.MetricsOptions.MeterProvider.Meter("grpc-go " + grpc.Version)
meter := h.options.MetricsOptions.MeterProvider.Meter("grpc-go", otelmetric.WithInstrumentationVersion(grpc.Version))
if meter == nil {
return
}
setOfMetrics := h.o.MetricsOptions.Metrics.metrics
metrics := h.options.MetricsOptions.Metrics
if metrics == nil {
metrics = DefaultMetrics
}

h.serverMetrics.callStarted = createInt64Counter(setOfMetrics, "grpc.server.call.started", meter, otelmetric.WithUnit("call"), otelmetric.WithDescription("Number of server calls started."))
h.serverMetrics.callSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.server.call.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per server call."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.serverMetrics.callRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.server.call.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per server call."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.serverMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.server.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a call from server transport's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
h.serverMetrics.callStarted = createInt64Counter(metrics.metrics, "grpc.server.call.started", meter, otelmetric.WithUnit("call"), otelmetric.WithDescription("Number of server calls started."))
h.serverMetrics.callSentTotalCompressedMessageSize = createInt64Histogram(metrics.metrics, "grpc.server.call.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per server call."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.serverMetrics.callRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.metrics, "grpc.server.call.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per server call."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.serverMetrics.callDuration = createFloat64Histogram(metrics.metrics, "grpc.server.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a call from server transport's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
}

// attachLabelsTransportStream intercepts SetHeader and SendHeader calls of the
Expand Down Expand Up @@ -82,8 +85,8 @@ func (s *attachLabelsTransportStream) SendHeader(md metadata.MD) error {

func (h *serverStatsHandler) unaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
var metadataExchangeLabels metadata.MD
if h.o.MetricsOptions.pluginOption != nil {
metadataExchangeLabels = h.o.MetricsOptions.pluginOption.GetMetadata()
if h.options.MetricsOptions.pluginOption != nil {
metadataExchangeLabels = h.options.MetricsOptions.pluginOption.GetMetadata()
}

sts := grpc.ServerTransportStreamFromContext(ctx)
Expand Down Expand Up @@ -143,8 +146,8 @@ func (s *attachLabelsStream) SendMsg(m any) error {

func (h *serverStatsHandler) streamInterceptor(srv any, ss grpc.ServerStream, ssi *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
var metadataExchangeLabels metadata.MD
if h.o.MetricsOptions.pluginOption != nil {
metadataExchangeLabels = h.o.MetricsOptions.pluginOption.GetMetadata()
if h.options.MetricsOptions.pluginOption != nil {
metadataExchangeLabels = h.options.MetricsOptions.pluginOption.GetMetadata()
}
als := &attachLabelsStream{
ServerStream: ss,
Expand All @@ -171,8 +174,8 @@ func (h *serverStatsHandler) HandleConn(context.Context, stats.ConnStats) {}
// TagRPC implements per RPC context management.
func (h *serverStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
method := info.FullMethodName
if h.o.MetricsOptions.MethodAttributeFilter != nil {
if !h.o.MetricsOptions.MethodAttributeFilter(method) {
if h.options.MetricsOptions.MethodAttributeFilter != nil {
if !h.options.MetricsOptions.MethodAttributeFilter(method) {
method = "other"
}
}
Expand Down Expand Up @@ -210,8 +213,8 @@ func (h *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
func (h *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
switch st := s.(type) {
case *stats.InHeader:
if ai.pluginOptionLabels == nil && h.o.MetricsOptions.pluginOption != nil {
labels := h.o.MetricsOptions.pluginOption.GetLabels(st.Header)
if ai.pluginOptionLabels == nil && h.options.MetricsOptions.pluginOption != nil {
labels := h.options.MetricsOptions.pluginOption.GetLabels(st.Header)
if labels == nil {
labels = map[string]string{} // Shouldn't return a nil map. Make it empty if so to ignore future Get Calls for this Attempt.
}
Expand Down
2 changes: 1 addition & 1 deletion test/xds/xds_telemetry_labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
const serviceNameKey = "service_name"
const serviceNameKeyCSM = "csm.service_name"
const serviceNamespaceKey = "service_namespace"
const serviceNamespaceKeyCSM = "csm.service_namespace"
const serviceNamespaceKeyCSM = "csm.service_namespace_name"
const serviceNameValue = "grpc-service"
const serviceNamespaceValue = "grpc-service-namespace"

Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/xdsresource/unmarshal_cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster, serv
}
if val, ok := fields["service_namespace"]; ok {
if _, ok := val.GetKind().(*structpb.Value_StringValue); ok {
telemetryLabels["csm.service_namespace"] = val.GetStringValue()
telemetryLabels["csm.service_namespace_name"] = val.GetStringValue()
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1363,8 +1363,8 @@ func (s) TestUnmarshalCluster(t *testing.T) {
LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
Raw: v3ClusterAnyWithTelemetryLabels,
TelemetryLabels: map[string]string{
"csm.service_name": "grpc-service",
"csm.service_namespace": "grpc-service-namespace",
"csm.service_name": "grpc-service",
"csm.service_namespace_name": "grpc-service-namespace",
},
},
},
Expand Down
Loading