From cbdeb71fd3130c91d760f857c2f459fbd8138f2c Mon Sep 17 00:00:00 2001 From: Zach Reyes <39203661+zasweq@users.noreply.github.com> Date: Wed, 24 Jul 2024 20:06:57 -0400 Subject: [PATCH] stats: Add optional locality label in cluster_impl picker (#7434) --- stats/opentelemetry/client_metrics.go | 9 ++++++- stats/opentelemetry/csm/observability.go | 4 +++ stats/opentelemetry/csm/observability_test.go | 16 ++++++++---- test/xds/xds_telemetry_labels_test.go | 19 ++++++++------ xds/internal/balancer/clusterimpl/picker.go | 25 +++++++++++++++---- 5 files changed, 55 insertions(+), 18 deletions(-) diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index 3fbae43bf11b..d5b9aa46b1ee 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -150,7 +150,12 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) var labels *istats.Labels if labels = istats.GetLabels(ctx); labels == nil { labels = &istats.Labels{ - TelemetryLabels: make(map[string]string), + // The defaults for all the per call labels from a plugin that + // executes on the callpath that this OpenTelemetry component + // currently supports. + TelemetryLabels: map[string]string{ + "grpc.lb.locality": "", + }, } ctx = istats.SetLabels(ctx, labels) } @@ -232,6 +237,8 @@ func (h *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, } for _, o := range h.options.MetricsOptions.OptionalLabels { + // TODO: Add a filter for converting to unknown if not present in the + // CSM Plugin Option layer by adding an optional labels API. if val, ok := ai.xdsLabels[o]; ok { attributes = append(attributes, otelattribute.String(o, val)) } diff --git a/stats/opentelemetry/csm/observability.go b/stats/opentelemetry/csm/observability.go index 466e7b52cd96..4db5d215ee94 100644 --- a/stats/opentelemetry/csm/observability.go +++ b/stats/opentelemetry/csm/observability.go @@ -70,6 +70,10 @@ func (o *perTargetDialOption) DialOptionForTarget(parsedTarget url.URL) grpc.Dia func dialOptionWithCSMPluginOption(options opentelemetry.Options, po otelinternal.PluginOption) grpc.DialOption { options.MetricsOptions.OptionalLabels = []string{"csm.service_name", "csm.service_namespace_name"} // Attach the two xDS Optional Labels for this component to not filter out. + return dialOptionSetCSM(options, po) +} + +func dialOptionSetCSM(options opentelemetry.Options, po otelinternal.PluginOption) grpc.DialOption { otelinternal.SetPluginOption.(func(options *opentelemetry.Options, po otelinternal.PluginOption))(&options, po) return opentelemetry.DialOption(options) } diff --git a/stats/opentelemetry/csm/observability_test.go b/stats/opentelemetry/csm/observability_test.go index d2b2884c20a2..7bc1e3cee565 100644 --- a/stats/opentelemetry/csm/observability_test.go +++ b/stats/opentelemetry/csm/observability_test.go @@ -425,9 +425,12 @@ func (s) TestCSMPluginOptionStreaming(t *testing.T) { func unaryInterceptorAttachXDSLabels(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { ctx = istats.SetLabels(ctx, &istats.Labels{ TelemetryLabels: map[string]string{ - // mock what the cluster impl would write here ("csm." xDS Labels) + // mock what the cluster impl would write here ("csm." xDS Labels + // and locality label) "csm.service_name": "service_name_val", "csm.service_namespace_name": "service_namespace_val", + + "grpc.lb.locality": "grpc.lb.locality_val", }, }) @@ -441,8 +444,9 @@ func unaryInterceptorAttachXDSLabels(ctx context.Context, method string, req, re // Optional Labels turned on. It then configures an interceptor to attach // labels, representing the cluster_impl picker. It then makes a unary RPC, and // expects xDS Labels labels to be attached to emitted relevant metrics. Full -// xDS System alongside OpenTelemetry will be tested with interop. (there is -// a test for xDS -> Stats handler and this tests -> OTel -> emission). +// xDS System alongside OpenTelemetry will be tested with interop. (there is a +// test for xDS -> Stats handler and this tests -> OTel -> emission). It also +// tests the optional per call locality label in the same manner. func (s) TestXDSLabels(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -457,11 +461,11 @@ func (s) TestXDSLabels(t *testing.T) { } po := newPluginOption(ctx) - dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{ + dopts := []grpc.DialOption{dialOptionSetCSM(opentelemetry.Options{ MetricsOptions: opentelemetry.MetricsOptions{ MeterProvider: provider, Metrics: opentelemetry.DefaultMetrics(), - OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"}, + OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name", "grpc.lb.locality"}, }, }, po), grpc.WithUnaryInterceptor(unaryInterceptorAttachXDSLabels)} if err := ss.Start(nil, dopts...); err != nil { @@ -489,6 +493,7 @@ func (s) TestXDSLabels(t *testing.T) { serviceNameAttr := attribute.String("csm.service_name", "service_name_val") serviceNamespaceAttr := attribute.String("csm.service_namespace_name", "service_namespace_val") + localityAttr := attribute.String("grpc.lb.locality", "grpc.lb.locality_val") meshIDAttr := attribute.String("csm.mesh_id", "unknown") workloadCanonicalServiceAttr := attribute.String("csm.workload_canonical_service", "unknown") remoteWorkloadTypeAttr := attribute.String("csm.remote_workload_type", "unknown") @@ -500,6 +505,7 @@ func (s) TestXDSLabels(t *testing.T) { unaryStatusAttr, serviceNameAttr, serviceNamespaceAttr, + localityAttr, meshIDAttr, workloadCanonicalServiceAttr, remoteWorkloadTypeAttr, diff --git a/test/xds/xds_telemetry_labels_test.go b/test/xds/xds_telemetry_labels_test.go index 544b6878a6e1..9068273628ee 100644 --- a/test/xds/xds_telemetry_labels_test.go +++ b/test/xds/xds_telemetry_labels_test.go @@ -28,11 +28,12 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/grpc/stats" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - testgrpc "google.golang.org/grpc/interop/grpc_testing" - testpb "google.golang.org/grpc/interop/grpc_testing" + "github.com/google/go-cmp/cmp" "google.golang.org/protobuf/types/known/structpb" ) @@ -43,6 +44,9 @@ const serviceNamespaceKeyCSM = "csm.service_namespace_name" const serviceNameValue = "grpc-service" const serviceNamespaceValue = "grpc-service-namespace" +const localityKey = "grpc.lb.locality" +const localityValue = `{"region":"region-1","zone":"zone-1","subZone":"subzone-1"}` + // TestTelemetryLabels tests that telemetry labels from CDS make their way to // the stats handler. The stats handler sets the mutable context value that the // cluster impl picker will write telemetry labels to, and then the stats @@ -126,13 +130,14 @@ func (fsh *fakeStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { // aren't started. All of these should have access to the desired telemetry // labels. case *stats.OutPayload, *stats.InPayload, *stats.End: - if label, ok := fsh.labels.TelemetryLabels[serviceNameKeyCSM]; !ok || label != serviceNameValue { - fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNameKeyCSM, serviceNameValue, label) + want := map[string]string{ + serviceNameKeyCSM: serviceNameValue, + serviceNamespaceKeyCSM: serviceNamespaceValue, + localityKey: localityValue, } - if label, ok := fsh.labels.TelemetryLabels[serviceNamespaceKeyCSM]; !ok || label != serviceNamespaceValue { - fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNamespaceKeyCSM, serviceNamespaceValue, label) + if diff := cmp.Diff(fsh.labels.TelemetryLabels, want); diff != "" { + fsh.t.Fatalf("fsh.labels.TelemetryLabels (-got +want): %v", diff) } - default: // Nothing to assert for the other stats.Handler callouts. } diff --git a/xds/internal/balancer/clusterimpl/picker.go b/xds/internal/balancer/clusterimpl/picker.go index d8cb8df1a81c..fbadbb92ba39 100644 --- a/xds/internal/balancer/clusterimpl/picker.go +++ b/xds/internal/balancer/clusterimpl/picker.go @@ -19,6 +19,8 @@ package clusterimpl import ( + "context" + v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" @@ -96,14 +98,23 @@ func (b *clusterImplBalancer) newPicker(config *dropConfigs) *picker { } } +func telemetryLabels(ctx context.Context) map[string]string { + if ctx == nil { + return nil + } + labels := stats.GetLabels(ctx) + if labels == nil { + return nil + } + return labels.TelemetryLabels +} + func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { // Unconditionally set labels if present, even dropped or queued RPC's can // use these labels. - if info.Ctx != nil { - if labels := stats.GetLabels(info.Ctx); labels != nil && labels.TelemetryLabels != nil { - for key, value := range d.telemetryLabels { - labels.TelemetryLabels[key] = value - } + if labels := telemetryLabels(info.Ctx); labels != nil { + for key, value := range d.telemetryLabels { + labels[key] = value } } @@ -156,6 +167,10 @@ func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { return pr, err } + if labels := telemetryLabels(info.Ctx); labels != nil { + labels["grpc.lb.locality"] = lIDStr + } + if d.loadStore != nil { d.loadStore.CallStarted(lIDStr) oldDone := pr.Done