From 1596f2d0840e9282af200e25ad16790f065024a5 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Wed, 29 May 2024 19:32:40 -0400 Subject: [PATCH 1/8] Add e2e testing for CSM Observability --- stats/opentelemetry/client_metrics.go | 15 +- stats/opentelemetry/csm/observability_test.go | 508 ++++++++++++++++++ stats/opentelemetry/e2e_test.go | 385 ++----------- stats/opentelemetry/example_test.go | 5 - .../internal/testingutils/testingutils.go | 430 +++++++++++++++ stats/opentelemetry/opentelemetry.go | 6 - 6 files changed, 986 insertions(+), 363 deletions(-) create mode 100644 stats/opentelemetry/csm/observability_test.go create mode 100644 stats/opentelemetry/internal/testingutils/testingutils.go diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index cbc4e36fdcbe..8654132153fd 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -63,7 +63,7 @@ func (h *clientStatsHandler) initializeMetrics() { func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { ci := &callInfo{ - target: h.determineTarget(cc), + target: cc.CanonicalTarget(), method: h.determineMethod(method, opts...), } ctx = setCallInfo(ctx, ci) @@ -83,17 +83,6 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string return err } -// determineTarget determines the target to record attributes with. This will be -// "other" if target filter is set and specifies, the target name as is -// otherwise. -func (h *clientStatsHandler) determineTarget(cc *grpc.ClientConn) string { - target := cc.CanonicalTarget() - if f := h.options.MetricsOptions.TargetAttributeFilter; f != nil && !f(target) { - target = "other" - } - return target -} - // determineMethod determines the method to record attributes with. 
This will be // "other" if StaticMethod isn't specified or if method filter is set and // specifies, the method name as is otherwise. @@ -108,7 +97,7 @@ func (h *clientStatsHandler) determineMethod(method string, opts ...grpc.CallOpt func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { ci := &callInfo{ - target: h.determineTarget(cc), + target: cc.CanonicalTarget(), method: h.determineMethod(method, opts...), } ctx = setCallInfo(ctx, ci) diff --git a/stats/opentelemetry/csm/observability_test.go b/stats/opentelemetry/csm/observability_test.go new file mode 100644 index 000000000000..173ccef3dd4a --- /dev/null +++ b/stats/opentelemetry/csm/observability_test.go @@ -0,0 +1,508 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package csm + +import ( + "context" + "errors" + "io" + "os" + "testing" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "google.golang.org/grpc" + "google.golang.org/grpc/encoding/gzip" + istats "google.golang.org/grpc/internal/stats" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils/xds/bootstrap" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/stats/opentelemetry" + "google.golang.org/grpc/stats/opentelemetry/internal/testingutils" +) + +// setupEnv configures the environment for CSM Observability Testing. It returns +// a cleanup function to be invoked to clear the environment. +func setupEnv(t *testing.T, resourceDetectorEmissions map[string]string, nodeID string, csmCanonicalServiceName string, csmWorkloadName string) func() { + clearEnv() + + cleanup, err := bootstrap.CreateFile(bootstrap.Options{ + NodeID: nodeID, + ServerURI: "xds_server_uri", + }) + if err != nil { + t.Fatalf("failed to create bootstrap: %v", err) + } + os.Setenv("CSM_CANONICAL_SERVICE_NAME", csmCanonicalServiceName) + os.Setenv("CSM_WORKLOAD_NAME", csmWorkloadName) + + var attributes []attribute.KeyValue + for k, v := range resourceDetectorEmissions { + attributes = append(attributes, attribute.String(k, v)) + } + // Return the attributes configured as part of the test in place + // of reading from resource. + attrSet := attribute.NewSet(attributes...) + origGetAttrSet := getAttrSetFromResourceDetector + getAttrSetFromResourceDetector = func(context.Context) *attribute.Set { + return &attrSet + } + + return func() { + cleanup() + os.Unsetenv("CSM_CANONICAL_SERVICE_NAME") + os.Unsetenv("CSM_WORKLOAD_NAME") + getAttrSetFromResourceDetector = origGetAttrSet + } +} + +// TestCSMPluginOption tests the CSM Plugin Option and labels. 
It configures the +// environment for the CSM Plugin Option to read from. It then configures a +// system with a gRPC Client and gRPC server with the OpenTelemetry Dial and +// Server Option configured with a CSM Plugin Option with certain unary and +// streaming handlers set to induce different ways of setting metadata exchange +// labels, and makes a Unary RPC and a Streaming RPC. These two RPCs should +// cause certain recording for each registered metric observed through a Manual +// Metrics Reader on the provided OpenTelemetry SDK's Meter Provider. The CSM +// Labels emitted from the plugin option should be attached to the relevant +// metrics. +func (s) TestCSMPluginOption(t *testing.T) { + resourceDetectorEmissions := map[string]string{ + "cloud.platform": "gcp_kubernetes_engine", + "cloud.region": "cloud_region_val", // availability_zone isn't present, so this should become location + "cloud.account.id": "cloud_account_id_val", + "k8s.namespace.name": "k8s_namespace_name_val", + "k8s.cluster.name": "k8s_cluster_name_val", + } + nodeID := "projects/12345/networks/mesh:mesh_id/nodes/aaaa-aaaa-aaaa-aaaa" + csmCanonicalServiceName := "csm_canonical_service_name" + csmWorkloadName := "csm_workload_name" + cleanup := setupEnv(t, resourceDetectorEmissions, nodeID, csmCanonicalServiceName, csmWorkloadName) + defer cleanup() + + attributesWant := map[string]string{ + "csm.workload_canonical_service": csmCanonicalServiceName, // from env + "csm.mesh_id": "mesh_id", // from bootstrap env var + + // No xDS Labels - this happens in a test below. 
+ + "csm.remote_workload_type": "gcp_kubernetes_engine", + "csm.remote_workload_canonical_service": csmCanonicalServiceName, + "csm.remote_workload_project_id": "cloud_account_id_val", + "csm.remote_workload_cluster_name": "k8s_cluster_name_val", + "csm.remote_workload_namespace_name": "k8s_namespace_name_val", + "csm.remote_workload_location": "cloud_region_val", + "csm.remote_workload_name": csmWorkloadName, + } + + var csmLabels []attribute.KeyValue + for k, v := range attributesWant { + csmLabels = append(csmLabels, attribute.String(k, v)) + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + tests := []struct { + name string + // To test the different operations for Unary and Streaming RPC's from + // the interceptor level that can plumb metadata exchange header in. + unaryCallFunc func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) + streamingCallFunc func(stream testgrpc.TestService_FullDuplexCallServer) error + opts testingutils.MetricDataOptions + }{ + // Different permutations of operations that should all trigger csm md + // exchange labels to be written on the wire. 
+ { + name: "normal-flow", + unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}, nil + }, + streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error { // this is trailers only - no messages or headers sent + for { + _, err := stream.Recv() + if err == io.EOF { + return nil + } + } + }, + opts: testingutils.MetricDataOptions{ + CSMLabels: csmLabels, + UnaryMessageSent: true, + StreamingMessageSent: false, + }, + }, + { + name: "trailers-only-unary-streaming", + unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return nil, errors.New("some error") // return an error and no message - this triggers trailers only - no messages or headers sent + }, + streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error { + for { + _, err := stream.Recv() + if err == io.EOF { + return nil + } + } + }, + opts: testingutils.MetricDataOptions{ + CSMLabels: csmLabels, + UnaryMessageSent: false, + StreamingMessageSent: false, + UnaryCallFailed: true, + }, + }, + { + name: "set-header-client-server-side", + unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + grpc.SetHeader(ctx, metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) + + return &testpb.SimpleResponse{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}, nil + }, + streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error { + stream.SetHeader(metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) + for { + _, err := stream.Recv() + if err == io.EOF { + return nil + } + } + }, + opts: testingutils.MetricDataOptions{ + CSMLabels: csmLabels, + UnaryMessageSent: true, + StreamingMessageSent: false, + }, + }, + { + name: "send-header-client-server-side", + unaryCallFunc: func(ctx 
context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + grpc.SendHeader(ctx, metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) + + return &testpb.SimpleResponse{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}, nil + }, + streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error { + stream.SendHeader(metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) + for { + _, err := stream.Recv() + if err == io.EOF { + return nil + } + } + }, + opts: testingutils.MetricDataOptions{ + CSMLabels: csmLabels, + UnaryMessageSent: true, + StreamingMessageSent: false, + }, + }, + { + name: "send-msg-client-server-side", + unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}, nil + }, + streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error { + stream.Send(&testpb.StreamingOutputCallResponse{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}) + for { + _, err := stream.Recv() + if err == io.EOF { + return nil + } + } + }, + opts: testingutils.MetricDataOptions{ + CSMLabels: csmLabels, + UnaryMessageSent: true, + StreamingMessageSent: true, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + mr, ss := setup(ctx, t, test.unaryCallFunc, test.streamingCallFunc, true, nil) + defer ss.Stop() + + var request *testpb.SimpleRequest + if test.opts.UnaryMessageSent { + request = &testpb.SimpleRequest{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }} + } + + // Make two RPC's, a unary RPC and a streaming RPC. These should cause + // certain metrics to be emitted, which should be able to be observed + // through the Metric Reader. 
+ ss.Client.UnaryCall(ctx, request, grpc.UseCompressor(gzip.Name)) + stream, err := ss.Client.FullDuplexCall(ctx, grpc.UseCompressor(gzip.Name)) + if err != nil { + t.Fatalf("ss.Client.FullDuplexCall failed: %v", err) + } + + if test.opts.StreamingMessageSent { + if err := stream.Send(&testpb.StreamingOutputCallRequest{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}); err != nil { + t.Fatalf("stream.Send failed: %v", err) + } + if _, err := stream.Recv(); err != nil { + t.Fatalf("stream.Recv failed with error: %v", err) + } + } + + stream.CloseSend() + if _, err = stream.Recv(); err != io.EOF { + t.Fatalf("unexpected error: %v, expected an EOF error", err) + } + + rm := &metricdata.ResourceMetrics{} + mr.Collect(ctx, rm) + + gotMetrics := map[string]metricdata.Metrics{} + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + gotMetrics[m.Name] = m + } + } + + opts := test.opts + opts.Target = ss.Target + wantMetrics := testingutils.MetricData(opts) + testingutils.CompareGotWantMetrics(ctx, t, mr, gotMetrics, wantMetrics) + }) + } +} + +// setup creates a stub server with the provided unary call and full duplex call +// handlers, alongside an OpenTelemetry component with a CSM Plugin Option +// configured on client and server side based off bool. It also takes in a unary +// interceptor to configure. It returns a reader for metrics emitted from the +// OpenTelemetry component and the stub server. 
+func setup(ctx context.Context, t *testing.T, unaryCallFunc func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error), streamingCallFunc func(stream testgrpc.TestService_FullDuplexCallServer) error, serverOTelConfigured bool, clientUnaryInterceptor grpc.UnaryClientInterceptor) (*metric.ManualReader, *stubserver.StubServer) { // specific for plugin option + reader := metric.NewManualReader() + provider := metric.NewMeterProvider( + metric.WithReader(reader), + ) + ss := &stubserver.StubServer{ + UnaryCallF: unaryCallFunc, + FullDuplexCallF: streamingCallFunc, + } + + po := newPluginOption(ctx) + var sopts []grpc.ServerOption + if serverOTelConfigured { + sopts = append(sopts, serverOptionWithCSMPluginOption(opentelemetry.Options{ + MetricsOptions: opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics, + }}, po)) + } + dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{ + MetricsOptions: opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics, + OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"}, // should be a no-op unless receive labels through optional labels mechanism + }, + }, po)} + if clientUnaryInterceptor != nil { + dopts = append(dopts, grpc.WithUnaryInterceptor(clientUnaryInterceptor)) + } + if err := ss.Start(sopts, dopts...); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + return reader, ss +} + +func unaryInterceptorAttachxDSLabels(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + ctx = istats.SetLabels(ctx, &istats.Labels{ + TelemetryLabels: map[string]string{ + // mock what the cluster impl would write here ("csm." 
 xDS Labels) + "csm.service_name": "service_name_val", + "csm.service_namespace_name": "service_namespace_val", + }, + }) + + // TagRPC will just see this in the context and set its xDS Labels to point + // to this map on the heap. + return invoker(ctx, method, req, reply, cc, opts...) +} + +// TestxDSLabels tests that xDS Labels get emitted from OpenTelemetry metrics. +// This test configures OpenTelemetry with the CSM Plugin Option, and xDS +// Optional Labels turned on. It then configures an interceptor to attach +// labels, representing the cluster_impl picker. It then makes a unary RPC, and +// expects xDS labels to be attached to emitted relevant metrics. Full +// xDS System alongside OpenTelemetry will be tested with interop. (there is +// a test for xDS -> Stats handler and this tests -> OTel -> emission). +func (s) TestxDSLabels(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + mr, ss := setup(ctx, t, func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}, nil + }, nil, false, unaryInterceptorAttachxDSLabels) + defer ss.Stop() + ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}, grpc.UseCompressor(gzip.Name)) + + rm := &metricdata.ResourceMetrics{} + mr.Collect(ctx, rm) + + gotMetrics := map[string]metricdata.Metrics{} + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + gotMetrics[m.Name] = m + } + } + + unaryMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall") + targetAttr := attribute.String("grpc.target", ss.Target) + unaryStatusAttr := attribute.String("grpc.status", "OK") + + serviceNameAttr := attribute.String("csm.service_name", "service_name_val") + serviceNamespaceAttr := attribute.String("csm.service_namespace_name", "service_namespace_val") + 
meshIDAttr := attribute.String("csm.mesh_id", "unknown") + workloadCanonicalServiceAttr := attribute.String("csm.workload_canonical_service", "unknown") + remoteWorkloadTypeAttr := attribute.String("csm.remote_workload_type", "unknown") + remoteWorkloadCanonicalServiceAttr := attribute.String("csm.remote_workload_canonical_service", "unknown") + + unaryMethodClientSideEnd := []attribute.KeyValue{ + unaryMethodAttr, + targetAttr, + unaryStatusAttr, + serviceNameAttr, + serviceNamespaceAttr, + meshIDAttr, + workloadCanonicalServiceAttr, + remoteWorkloadTypeAttr, + remoteWorkloadCanonicalServiceAttr, + } + + unaryCompressedBytesSentRecv := int64(57) // Fixed 10000 bytes with gzip assumption. + unaryBucketCounts := []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0} + unaryExtrema := metricdata.NewExtrema(int64(57)) + wantMetrics := []metricdata.Metrics{ + { + Name: "grpc.client.attempt.started", + Description: "Number of client call attempts started.", + Unit: "attempt", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodAttr, targetAttr), + Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, // Doesn't have xDS Labels, CSM Labels start from header or trailer from server, whichever comes first, so this doesn't need it + { + Name: "grpc.client.attempt.duration", + Description: "End-to-end time taken to complete a client call attempt.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(unaryMethodClientSideEnd...), + Count: 1, + Bounds: testingutils.DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.attempt.sent_total_compressed_message_size", + Description: "Compressed message bytes sent per client call attempt.", + Unit: "By", + Data: metricdata.Histogram[int64]{ + 
DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodClientSideEnd...), + Count: 1, + Bounds: testingutils.DefaultSizeBounds, + BucketCounts: unaryBucketCounts, + Min: unaryExtrema, + Max: unaryExtrema, + Sum: unaryCompressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.attempt.rcvd_total_compressed_message_size", + Description: "Compressed message bytes received per call attempt.", + Unit: "By", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodClientSideEnd...), + Count: 1, + Bounds: testingutils.DefaultSizeBounds, + BucketCounts: unaryBucketCounts, + Min: unaryExtrema, + Max: unaryExtrema, + Sum: unaryCompressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.call.duration", + Description: "Time taken by gRPC to complete an RPC from application's perspective.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, unaryStatusAttr), + Count: 1, + Bounds: testingutils.DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + } + + testingutils.CompareGotWantMetrics(ctx, t, mr, gotMetrics, wantMetrics) +} + +// TestObservability tests that Observability global function compiles and runs +// without error. The actual functionality of this function will be verified in +// interop tests. 
+func (s) TestObservability(t *testing.T) { + cleanup := EnableObservability(context.Background(), opentelemetry.Options{}) + defer cleanup() +} diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index c0850d6219b7..e0ca869fb1ed 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -18,7 +18,6 @@ package opentelemetry import ( "context" - "fmt" "io" "testing" "time" @@ -29,6 +28,7 @@ import ( "google.golang.org/grpc/internal/stubserver" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" + "google.golang.org/grpc/stats/opentelemetry/internal/testingutils" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric" @@ -46,35 +46,10 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } -// waitForServerCompletedRPCs waits until the unary and streaming stats.End -// calls are finished processing. It does this by waiting for the expected -// metric triggered by stats.End to appear through the passed in metrics reader. -func waitForServerCompletedRPCs(ctx context.Context, reader metric.Reader, wantMetric metricdata.Metrics, t *testing.T) (map[string]metricdata.Metrics, error) { - for ; ctx.Err() == nil; <-time.After(time.Millisecond) { - rm := &metricdata.ResourceMetrics{} - reader.Collect(ctx, rm) - gotMetrics := map[string]metricdata.Metrics{} - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - gotMetrics[m.Name] = m - } - } - val, ok := gotMetrics[wantMetric.Name] - if !ok { - continue - } - if !metricdatatest.AssertEqual(t, wantMetric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { - continue - } - return gotMetrics, nil - } - return nil, fmt.Errorf("error waiting for metric %v: %v", wantMetric, ctx.Err()) -} - // setup creates a stub server with OpenTelemetry component configured on client // and server side. 
It returns a reader for metrics emitted from OpenTelemetry // component and the server. -func setup(t *testing.T, tafOn bool, maf func(string) bool) (*metric.ManualReader, *stubserver.StubServer) { +func setup(t *testing.T, maf func(string) bool) (*metric.ManualReader, *stubserver.StubServer) { reader := metric.NewManualReader() provider := metric.NewMeterProvider( metric.WithReader(reader), @@ -94,23 +69,16 @@ func setup(t *testing.T, tafOn bool, maf func(string) bool) (*metric.ManualReade } }, } - var taf func(string) bool - if tafOn { - taf = func(str string) bool { - return str != ss.Target - } - } + if err := ss.Start([]grpc.ServerOption{ServerOption(Options{ MetricsOptions: MetricsOptions{ MeterProvider: provider, Metrics: DefaultMetrics, - TargetAttributeFilter: taf, MethodAttributeFilter: maf, }})}, DialOption(Options{ MetricsOptions: MetricsOptions{ MeterProvider: provider, Metrics: DefaultMetrics, - TargetAttributeFilter: taf, MethodAttributeFilter: maf, }, })); err != nil { @@ -127,12 +95,11 @@ func (s) TestMethodTargetAttributeFilter(t *testing.T) { // Will allow duplex/any other type of RPC. return str != "/grpc.testing.TestService/UnaryCall" } - // pull out setup into a helper - reader, ss := setup(t, true, maf) + reader, ss := setup(t, maf) defer ss.Stop() - // make a single RPC (unary rpc), and filter out the target and method - // that would correspond. + // make a single RPC (unary rpc), and filter out the method that would + // correspond. 
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{ @@ -151,6 +118,12 @@ func (s) TestMethodTargetAttributeFilter(t *testing.T) { } rm := &metricdata.ResourceMetrics{} reader.Collect(ctx, rm) + gotMetrics := map[string]metricdata.Metrics{} + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + gotMetrics[m.Name] = m + } + } wantMetrics := []metricdata.Metrics{ { @@ -160,11 +133,11 @@ func (s) TestMethodTargetAttributeFilter(t *testing.T) { Data: metricdata.Sum[int64]{ DataPoints: []metricdata.DataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall"), attribute.String("grpc.target", "other")), + Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall"), attribute.String("grpc.target", ss.Target)), Value: 1, }, { - Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall"), attribute.String("grpc.target", "other")), + Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall"), attribute.String("grpc.target", ss.Target)), Value: 1, }, }, @@ -172,54 +145,29 @@ func (s) TestMethodTargetAttributeFilter(t *testing.T) { IsMonotonic: true, }, }, + { + Name: "grpc.server.call.duration", + Description: "End-to-end time taken to complete a call from server transport's perspective.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { // Method should go to "other" due to the method attribute filter. 
+ Attributes: attribute.NewSet(attribute.String("grpc.method", "other"), attribute.String("grpc.status", "OK")), + Count: 1, + Bounds: testingutils.DefaultLatencyBounds, + }, + { + Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall"), attribute.String("grpc.status", "OK")), + Count: 1, + Bounds: testingutils.DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, } - gotMetrics := map[string]metricdata.Metrics{} - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - gotMetrics[m.Name] = m - } - } - - for _, metric := range wantMetrics { - val, ok := gotMetrics[metric.Name] - if !ok { - t.Fatalf("metric %v not present in recorded metrics", metric.Name) - } - if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { - t.Fatalf("metrics data type not equal for metric: %v", metric.Name) - } - } -} -// assertDataPointWithinFiveSeconds asserts the metric passed in contains -// a histogram with dataPoints that fall within buckets that are <=5. 
-func assertDataPointWithinFiveSeconds(metric metricdata.Metrics) error { - histo, ok := metric.Data.(metricdata.Histogram[float64]) - if !ok { - return fmt.Errorf("metric data is not histogram") - } - for _, dataPoint := range histo.DataPoints { - var boundWithFive int - for i, bucket := range dataPoint.Bounds { - if bucket >= 5 { - boundWithFive = i - } - } - foundPoint := false - for i, bucket := range dataPoint.BucketCounts { - if i >= boundWithFive { - return fmt.Errorf("data point not found in bucket <=5 seconds") - } - if bucket == 1 { - foundPoint = true - break - } - } - if !foundPoint { - return fmt.Errorf("no data point found for metric") - } - } - return nil + testingutils.CompareGotWantMetrics(ctx, t, reader, gotMetrics, wantMetrics) } // TestAllMetricsOneFunction tests emitted metrics from OpenTelemetry @@ -232,7 +180,7 @@ func assertDataPointWithinFiveSeconds(metric metricdata.Metrics) error { // on the Client (no StaticMethodCallOption set) and Server. The method // attribute on subsequent metrics should be bucketed in "other". 
func (s) TestAllMetricsOneFunction(t *testing.T) { - reader, ss := setup(t, false, nil) + reader, ss := setup(t, nil) defer ss.Stop() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -264,257 +212,12 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { } } - unaryMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall") - duplexMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall") - - targetAttr := attribute.String("grpc.target", ss.Target) - statusAttr := attribute.String("grpc.status", "OK") - - wantMetrics := []metricdata.Metrics{ - { - Name: "grpc.client.attempt.started", - Description: "Number of client call attempts started.", - Unit: "attempt", - Data: metricdata.Sum[int64]{ - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(unaryMethodAttr, targetAttr), - Value: 1, - }, - { - Attributes: attribute.NewSet(duplexMethodAttr, targetAttr), - Value: 1, - }, - }, - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - }, - }, - { - Name: "grpc.client.attempt.duration", - Description: "End-to-end time taken to complete a client call attempt.", - Unit: "s", - Data: metricdata.Histogram[float64]{ - DataPoints: []metricdata.HistogramDataPoint[float64]{ - { - Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr), - Count: 1, - Bounds: DefaultLatencyBounds, - }, - { - Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr), - Count: 1, - Bounds: DefaultLatencyBounds, - }, - }, - Temporality: metricdata.CumulativeTemporality, - }, - }, - { - Name: "grpc.client.attempt.sent_total_compressed_message_size", - Description: "Compressed message bytes sent per client call attempt.", - Unit: "By", - Data: metricdata.Histogram[int64]{ - DataPoints: []metricdata.HistogramDataPoint[int64]{ - { - Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr), - Count: 1, - Bounds: 
DefaultSizeBounds, - BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - Min: metricdata.NewExtrema(int64(57)), - Max: metricdata.NewExtrema(int64(57)), - Sum: 57, - }, - { - Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr), - Count: 1, - Bounds: DefaultSizeBounds, - BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - Min: metricdata.NewExtrema(int64(0)), - Max: metricdata.NewExtrema(int64(0)), - Sum: 0, - }, - }, - Temporality: metricdata.CumulativeTemporality, - }, - }, - { - Name: "grpc.client.attempt.rcvd_total_compressed_message_size", - Description: "Compressed message bytes received per call attempt.", - Unit: "By", - Data: metricdata.Histogram[int64]{ - DataPoints: []metricdata.HistogramDataPoint[int64]{ - { - Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr), - Count: 1, - Bounds: DefaultSizeBounds, - BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - Min: metricdata.NewExtrema(int64(57)), - Max: metricdata.NewExtrema(int64(57)), - Sum: 57, - }, - { - Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr), - Count: 1, - Bounds: DefaultSizeBounds, - BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - Min: metricdata.NewExtrema(int64(0)), - Max: metricdata.NewExtrema(int64(0)), - Sum: 0, - }, - }, - Temporality: metricdata.CumulativeTemporality, - }, - }, - { - Name: "grpc.client.call.duration", - Description: "Time taken by gRPC to complete an RPC from application's perspective.", - Unit: "s", - Data: metricdata.Histogram[float64]{ - DataPoints: []metricdata.HistogramDataPoint[float64]{ - { - Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr), - Count: 1, - Bounds: DefaultLatencyBounds, - }, - { - Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr), - Count: 1, - 
Bounds: DefaultLatencyBounds, - }, - }, - Temporality: metricdata.CumulativeTemporality, - }, - }, - { - Name: "grpc.server.call.started", - Description: "Number of server calls started.", - Unit: "call", - Data: metricdata.Sum[int64]{ - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(unaryMethodAttr), - Value: 1, - }, - { - Attributes: attribute.NewSet(duplexMethodAttr), - Value: 1, - }, - }, - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - }, - }, - { - Name: "grpc.server.call.sent_total_compressed_message_size", - Unit: "By", - Description: "Compressed message bytes sent per server call.", - Data: metricdata.Histogram[int64]{ - DataPoints: []metricdata.HistogramDataPoint[int64]{ - { - Attributes: attribute.NewSet(unaryMethodAttr, statusAttr), - Count: 1, - Bounds: DefaultSizeBounds, - BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - Min: metricdata.NewExtrema(int64(57)), - Max: metricdata.NewExtrema(int64(57)), - Sum: 57, - }, - { - Attributes: attribute.NewSet(duplexMethodAttr, statusAttr), - Count: 1, - Bounds: DefaultSizeBounds, - BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - Min: metricdata.NewExtrema(int64(0)), - Max: metricdata.NewExtrema(int64(0)), - Sum: 0, - }, - }, - Temporality: metricdata.CumulativeTemporality, - }, - }, - { - Name: "grpc.server.call.rcvd_total_compressed_message_size", - Unit: "By", - Description: "Compressed message bytes received per server call.", - Data: metricdata.Histogram[int64]{ - DataPoints: []metricdata.HistogramDataPoint[int64]{ - { - Attributes: attribute.NewSet(unaryMethodAttr, statusAttr), - Count: 1, - Bounds: DefaultSizeBounds, - BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - Min: metricdata.NewExtrema(int64(57)), - Max: metricdata.NewExtrema(int64(57)), - Sum: 57, - }, - { - Attributes: 
attribute.NewSet(duplexMethodAttr, statusAttr), - Count: 1, - Bounds: DefaultSizeBounds, - BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - Min: metricdata.NewExtrema(int64(0)), - Max: metricdata.NewExtrema(int64(0)), - Sum: 0, - }, - }, - Temporality: metricdata.CumulativeTemporality, - }, - }, - { - Name: "grpc.server.call.duration", - Description: "End-to-end time taken to complete a call from server transport's perspective.", - Unit: "s", - Data: metricdata.Histogram[float64]{ - DataPoints: []metricdata.HistogramDataPoint[float64]{ - { - Attributes: attribute.NewSet(unaryMethodAttr, statusAttr), - Count: 1, - Bounds: DefaultLatencyBounds, - }, - { - Attributes: attribute.NewSet(duplexMethodAttr, statusAttr), - Count: 1, - Bounds: DefaultLatencyBounds, - }, - }, - Temporality: metricdata.CumulativeTemporality, - }, - }, - } - - for _, metric := range wantMetrics { - if metric.Name == "grpc.server.call.sent_total_compressed_message_size" || metric.Name == "grpc.server.call.rcvd_total_compressed_message_size" { - // Sync the metric reader to see the event because stats.End is - // handled async server side. Thus, poll until metrics created from - // stats.End show up. - if gotMetrics, err = waitForServerCompletedRPCs(ctx, reader, metric, t); err != nil { - t.Fatalf("error waiting for sent total compressed message size for metric: %v", metric.Name) - } - continue - } - - // If one of the duration metrics, ignore the bucket counts, and make - // sure it count falls within a bucket <= 5 seconds (maximum duration of - // test due to context). 
- val, ok := gotMetrics[metric.Name] - if !ok { - t.Fatalf("metric %v not present in recorded metrics", metric.Name) - } - if metric.Name == "grpc.client.attempt.duration" || metric.Name == "grpc.client.call.duration" || metric.Name == "grpc.server.call.duration" { - if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars(), metricdatatest.IgnoreValue()) { - t.Fatalf("metrics data type not equal for metric: %v", metric.Name) - } - if err := assertDataPointWithinFiveSeconds(val); err != nil { - t.Fatalf("Data point not within five seconds for metric %v: %v", metric.Name, err) - } - continue - } - - if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { - t.Fatalf("metrics data type not equal for metric: %v", metric.Name) - } - } + wantMetrics := testingutils.MetricData(testingutils.MetricDataOptions{ + Target: ss.Target, + UnaryMessageSent: true, + StreamingMessageSent: false, + }) + testingutils.CompareGotWantMetrics(ctx, t, reader, gotMetrics, wantMetrics) stream, err = ss.Client.FullDuplexCall(ctx) if err != nil { @@ -541,6 +244,10 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { gotMetrics[m.Name] = m } } + unaryMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall") + duplexMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall") + + targetAttr := attribute.String("grpc.target", ss.Target) otherMethodAttr := attribute.String("grpc.method", "other") wantMetrics = []metricdata.Metrics{ { diff --git a/stats/opentelemetry/example_test.go b/stats/opentelemetry/example_test.go index 607bcf55e0a0..b4d6755b2299 100644 --- a/stats/opentelemetry/example_test.go +++ b/stats/opentelemetry/example_test.go @@ -17,8 +17,6 @@ package opentelemetry_test import ( - "strings" - "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/stats/opentelemetry" @@ -55,9 
+53,6 @@ func Example_dialOption() { MetricsOptions: opentelemetry.MetricsOptions{ MeterProvider: provider, Metrics: opentelemetry.DefaultMetrics, // equivalent to unset - distinct from empty - TargetAttributeFilter: func(str string) bool { - return !strings.HasPrefix(str, "dns") // Filter out DNS targets. - }, }, } do := opentelemetry.DialOption(opts) diff --git a/stats/opentelemetry/internal/testingutils/testingutils.go b/stats/opentelemetry/internal/testingutils/testingutils.go new file mode 100644 index 000000000000..e9716a35ef18 --- /dev/null +++ b/stats/opentelemetry/internal/testingutils/testingutils.go @@ -0,0 +1,430 @@ +/* + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package testingutils contains helpers for OpenTelemetry tests. +package testingutils + +import ( + "context" + "fmt" + "testing" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" +) + +var ( + // DefaultLatencyBounds are the default bounds for latency metrics. 
+ DefaultLatencyBounds = []float64{0, 0.00001, 0.00005, 0.0001, 0.0003, 0.0006, 0.0008, 0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.008, 0.01, 0.013, 0.016, 0.02, 0.025, 0.03, 0.04, 0.05, 0.065, 0.08, 0.1, 0.13, 0.16, 0.2, 0.25, 0.3, 0.4, 0.5, 0.65, 0.8, 1, 2, 5, 10, 20, 50, 100} // provide "advice" through API, SDK should set this too + // DefaultSizeBounds are the default bounds for metrics which record size. + DefaultSizeBounds = []float64{0, 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296} +) + +// waitForServerCompletedRPCs waits until the unary and streaming stats.End +// calls are finished processing. It does this by waiting for the expected +// metric triggered by stats.End to appear through the passed in metrics reader. +func waitForServerCompletedRPCs(ctx context.Context, reader metric.Reader, wantMetric metricdata.Metrics, t *testing.T) (map[string]metricdata.Metrics, error) { + for ; ctx.Err() == nil; <-time.After(time.Millisecond) { + rm := &metricdata.ResourceMetrics{} + reader.Collect(ctx, rm) + gotMetrics := map[string]metricdata.Metrics{} + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + gotMetrics[m.Name] = m + } + } + val, ok := gotMetrics[wantMetric.Name] + if !ok { + continue + } + if !metricdatatest.AssertEqual(t, wantMetric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { + continue + } + return gotMetrics, nil + } + return nil, fmt.Errorf("error waiting for metric %v: %v", wantMetric, ctx.Err()) +} + +// assertDataPointWithinFiveSeconds asserts the metric passed in contains +// a histogram with dataPoints that fall within buckets that are <=5. 
+func assertDataPointWithinFiveSeconds(metric metricdata.Metrics) error {
+	histo, ok := metric.Data.(metricdata.Histogram[float64])
+	if !ok {
+		return fmt.Errorf("metric data is not histogram")
+	}
+	for _, dataPoint := range histo.DataPoints {
+		var boundWithFive int
+		for i, bucket := range dataPoint.Bounds {
+			if bucket >= 5 {
+				boundWithFive = i
+			}
+		}
+		foundPoint := false
+		for i, bucket := range dataPoint.BucketCounts {
+			if i >= boundWithFive {
+				return fmt.Errorf("data point not found in bucket <=5 seconds")
+			}
+			if bucket == 1 {
+				foundPoint = true
+				break
+			}
+		}
+		if !foundPoint {
+			return fmt.Errorf("no data point found for metric")
+		}
+	}
+	return nil
+}
+
+// MetricDataOptions are the options used to configure the expected metrics
+// data emissions returned from MetricData for a client and server with a
+// unary RPC and a streaming RPC.
+type MetricDataOptions struct {
+	// CSMLabels are the csm labels to attach to metrics which receive csm
+	// labels (all A66 metrics except client call and started RPCs, client and
+	// server side).
+	CSMLabels []attribute.KeyValue
+	// Target is the target of the client and server.
+	Target string
+	// UnaryMessageSent is whether a message was sent for the unary RPC or not.
+	// This unary message is assumed to be 10000 bytes and the RPC is assumed to
+	// have a gzip compressor call option set. This assumes both client and peer
+	// sent a message.
+	UnaryMessageSent bool
+	// StreamingMessageSent is whether a message was sent for the streaming RPC
+	// or not. This streaming message is assumed to be 10000 bytes and the RPC
+	// is assumed to have a gzip compressor call option set. This assumes both
+	// client and peer sent a message.
+	StreamingMessageSent bool
+	// UnaryCallFailed is whether the Unary Call failed, which would trigger
+	// trailers only.
+	UnaryCallFailed bool
+}
+
+// MetricData returns a slice of A66 metrics data for a client and server with
+// a unary RPC and a streaming RPC, with certain compression and message flow
+// sent. If CSMLabels is set, those labels are attached to all metrics except
+// client side call metrics and started metrics on client and server side.
+func MetricData(options MetricDataOptions) []metricdata.Metrics {
+	unaryMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall")
+	duplexMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall")
+
+	targetAttr := attribute.String("grpc.target", options.Target)
+
+	unaryStatusAttr := attribute.String("grpc.status", "OK")
+	streamingStatusAttr := attribute.String("grpc.status", "OK")
+	if options.UnaryCallFailed {
+		unaryStatusAttr = attribute.String("grpc.status", "UNKNOWN")
+	}
+
+	unaryMethodClientSideEnd := []attribute.KeyValue{
+		unaryMethodAttr,
+		targetAttr,
+		unaryStatusAttr,
+	}
+	streamingMethodClientSideEnd := []attribute.KeyValue{
+		duplexMethodAttr,
+		targetAttr,
+		streamingStatusAttr,
+	}
+	unaryMethodServerSideEnd := []attribute.KeyValue{
+		unaryMethodAttr,
+		unaryStatusAttr,
+	}
+
+	streamingMethodServerSideEnd := []attribute.KeyValue{
+		duplexMethodAttr,
+		streamingStatusAttr,
+	}
+
+	unaryMethodClientSideEnd = append(unaryMethodClientSideEnd, options.CSMLabels...)
+	streamingMethodClientSideEnd = append(streamingMethodClientSideEnd, options.CSMLabels...)
+	unaryMethodServerSideEnd = append(unaryMethodServerSideEnd, options.CSMLabels...)
+	streamingMethodServerSideEnd = append(streamingMethodServerSideEnd, options.CSMLabels...)
+	unaryCompressedBytesSentRecv := int64(0)
+	unaryBucketCounts := []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}
+	unaryExtrema := metricdata.NewExtrema(int64(0))
+	if options.UnaryMessageSent {
+		unaryCompressedBytesSentRecv = 57 // Fixed 10000 bytes with gzip assumption.
+ unaryBucketCounts = []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0} + unaryExtrema = metricdata.NewExtrema(int64(57)) + } + + var streamingCompressedBytesSentRecv int64 + streamingBucketCounts := []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0} + streamingExtrema := metricdata.NewExtrema(int64(0)) + if options.StreamingMessageSent { + streamingCompressedBytesSentRecv = 57 // Fixed 10000 bytes with gzip assumption. + streamingBucketCounts = []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0} + streamingExtrema = metricdata.NewExtrema(int64(57)) + } + + return []metricdata.Metrics{ + { + Name: "grpc.client.attempt.started", + Description: "Number of client call attempts started.", + Unit: "attempt", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodAttr, targetAttr), + Value: 1, + }, + { + Attributes: attribute.NewSet(duplexMethodAttr, targetAttr), + Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "grpc.client.attempt.duration", + Description: "End-to-end time taken to complete a client call attempt.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(unaryMethodClientSideEnd...), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + { + Attributes: attribute.NewSet(streamingMethodClientSideEnd...), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.attempt.sent_total_compressed_message_size", + Description: "Compressed message bytes sent per client call attempt.", + Unit: "By", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodClientSideEnd...), + Count: 1, + Bounds: 
DefaultSizeBounds, + BucketCounts: unaryBucketCounts, + Min: unaryExtrema, + Max: unaryExtrema, + Sum: unaryCompressedBytesSentRecv, + }, + { + Attributes: attribute.NewSet(streamingMethodClientSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: streamingBucketCounts, + Min: streamingExtrema, + Max: streamingExtrema, + Sum: streamingCompressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.attempt.rcvd_total_compressed_message_size", + Description: "Compressed message bytes received per call attempt.", + Unit: "By", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodClientSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: unaryBucketCounts, + Min: unaryExtrema, + Max: unaryExtrema, + Sum: unaryCompressedBytesSentRecv, + }, + { + Attributes: attribute.NewSet(streamingMethodClientSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: streamingBucketCounts, + Min: streamingExtrema, + Max: streamingExtrema, + Sum: streamingCompressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.call.duration", + Description: "Time taken by gRPC to complete an RPC from application's perspective.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, unaryStatusAttr), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + { + Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, streamingStatusAttr), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.server.call.started", + Description: "Number of server calls started.", + Unit: "call", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: 
attribute.NewSet(unaryMethodAttr), + Value: 1, + }, + { + Attributes: attribute.NewSet(duplexMethodAttr), + Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "grpc.server.call.sent_total_compressed_message_size", + Unit: "By", + Description: "Compressed message bytes sent per server call.", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodServerSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: unaryBucketCounts, + Min: unaryExtrema, + Max: unaryExtrema, + Sum: unaryCompressedBytesSentRecv, + }, + { + Attributes: attribute.NewSet(streamingMethodServerSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: streamingBucketCounts, + Min: streamingExtrema, + Max: streamingExtrema, + Sum: streamingCompressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.server.call.rcvd_total_compressed_message_size", + Unit: "By", + Description: "Compressed message bytes received per server call.", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodServerSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: unaryBucketCounts, + Min: unaryExtrema, + Max: unaryExtrema, + Sum: unaryCompressedBytesSentRecv, + }, + { + Attributes: attribute.NewSet(streamingMethodServerSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: streamingBucketCounts, + Min: streamingExtrema, + Max: streamingExtrema, + Sum: streamingCompressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.server.call.duration", + Description: "End-to-end time taken to complete a call from server transport's perspective.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ 
+					{
+						Attributes: attribute.NewSet(unaryMethodServerSideEnd...),
+						Count:      1,
+						Bounds:     DefaultLatencyBounds,
+					},
+					{
+						Attributes: attribute.NewSet(streamingMethodServerSideEnd...),
+						Count:      1,
+						Bounds:     DefaultLatencyBounds,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+			},
+		},
+	}
+}
+
+// CompareGotWantMetrics asserts wantMetrics are what we expect. It polls for
+// eventual server metrics (not emitted synchronously with client side rpc
+// returning), and for duration metrics makes sure the data point is within
+// possible testing time (five seconds from context timeout).
+func CompareGotWantMetrics(ctx context.Context, t *testing.T, mr *metric.ManualReader, gotMetrics map[string]metricdata.Metrics, wantMetrics []metricdata.Metrics) {
+	for _, metric := range wantMetrics {
+		if metric.Name == "grpc.server.call.sent_total_compressed_message_size" || metric.Name == "grpc.server.call.rcvd_total_compressed_message_size" {
+			// Sync the metric reader to see the event because stats.End is
+			// handled async server side. Thus, poll until metrics created from
+			// stats.End show up.
+			var err error
+			if gotMetrics, err = waitForServerCompletedRPCs(ctx, mr, metric, t); err != nil {
+				t.Fatalf("error waiting for sent total compressed message size for metric %v: %v", metric.Name, err)
+			}
+			continue
+		}
+
+		// If one of the duration metrics, ignore the bucket counts, and make
+		// sure its count falls within a bucket <= 5 seconds (maximum duration
+		// of test due to context).
+ val, ok := gotMetrics[metric.Name] + if !ok { + t.Fatalf("metric %v not present in recorded metrics", metric.Name) + } + if metric.Name == "grpc.client.attempt.duration" || metric.Name == "grpc.client.call.duration" || metric.Name == "grpc.server.call.duration" { + if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars(), metricdatatest.IgnoreValue()) { + t.Fatalf("metrics data type not equal for metric: %v", metric.Name) + } + if err := assertDataPointWithinFiveSeconds(val); err != nil { + t.Fatalf("Data point not within five seconds for metric %v: %v", metric.Name, err) + } + continue + } + + if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { + t.Fatalf("metrics data type not equal for metric: %v", metric.Name) + } + } +} diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go index 6a6bcd1627da..4bc195b4e12c 100644 --- a/stats/opentelemetry/opentelemetry.go +++ b/stats/opentelemetry/opentelemetry.go @@ -120,12 +120,6 @@ type MetricsOptions struct { // will be recorded. Metrics *Metrics - // TargetAttributeFilter is a callback that takes the target string of the - // channel and returns a bool representing whether to use target as a label - // value or use the string "other". If unset, will use the target string as - // is. This only applies for client side metrics. 
- TargetAttributeFilter func(string) bool - // MethodAttributeFilter is to record the method name of RPCs handled by // grpc.UnknownServiceHandler, but take care to limit the values allowed, as // allowing too many will increase cardinality and could cause severe memory From 4a961e4350c4f52b15521a64a035dbb5087a6443 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Wed, 29 May 2024 19:45:15 -0400 Subject: [PATCH 2/8] Vet --- stats/opentelemetry/csm/observability_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stats/opentelemetry/csm/observability_test.go b/stats/opentelemetry/csm/observability_test.go index 173ccef3dd4a..b11e99b60e94 100644 --- a/stats/opentelemetry/csm/observability_test.go +++ b/stats/opentelemetry/csm/observability_test.go @@ -344,7 +344,7 @@ func unaryInterceptorAttachxDSLabels(ctx context.Context, method string, req, re ctx = istats.SetLabels(ctx, &istats.Labels{ TelemetryLabels: map[string]string{ // mock what the cluster impl would write here ("csm." 
xDS Labels) - "csm.service_name": "service_name_val", + "csm.service_name": "service_name_val", "csm.service_namespace_name": "service_namespace_val", }, }) From f48ac82e443bbede734d6276be718ffd78ef1df6 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Fri, 31 May 2024 17:33:36 -0400 Subject: [PATCH 3/8] Responded to Easwar's comments --- stats/opentelemetry/csm/observability_test.go | 28 +++++++++---------- stats/opentelemetry/e2e_test.go | 12 ++++---- .../testutils.go} | 25 +++++++++-------- 3 files changed, 34 insertions(+), 31 deletions(-) rename stats/opentelemetry/internal/{testingutils/testingutils.go => testutils/testutils.go} (94%) diff --git a/stats/opentelemetry/csm/observability_test.go b/stats/opentelemetry/csm/observability_test.go index b11e99b60e94..e3de299700cd 100644 --- a/stats/opentelemetry/csm/observability_test.go +++ b/stats/opentelemetry/csm/observability_test.go @@ -37,7 +37,7 @@ import ( testpb "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/grpc/metadata" "google.golang.org/grpc/stats/opentelemetry" - "google.golang.org/grpc/stats/opentelemetry/internal/testingutils" + "google.golang.org/grpc/stats/opentelemetry/internal/testutils" ) // setupEnv configures the environment for CSM Observability Testing. It returns @@ -126,7 +126,7 @@ func (s) TestCSMPluginOption(t *testing.T) { // the interceptor level that can plumb metadata exchange header in. unaryCallFunc func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) streamingCallFunc func(stream testgrpc.TestService_FullDuplexCallServer) error - opts testingutils.MetricDataOptions + opts testutils.MetricDataOptions }{ // Different permutations of operations that should all trigger csm md // exchange labels to be written on the wire. 
@@ -145,7 +145,7 @@ func (s) TestCSMPluginOption(t *testing.T) { } } }, - opts: testingutils.MetricDataOptions{ + opts: testutils.MetricDataOptions{ CSMLabels: csmLabels, UnaryMessageSent: true, StreamingMessageSent: false, @@ -164,7 +164,7 @@ func (s) TestCSMPluginOption(t *testing.T) { } } }, - opts: testingutils.MetricDataOptions{ + opts: testutils.MetricDataOptions{ CSMLabels: csmLabels, UnaryMessageSent: false, StreamingMessageSent: false, @@ -189,7 +189,7 @@ func (s) TestCSMPluginOption(t *testing.T) { } } }, - opts: testingutils.MetricDataOptions{ + opts: testutils.MetricDataOptions{ CSMLabels: csmLabels, UnaryMessageSent: true, StreamingMessageSent: false, @@ -213,7 +213,7 @@ func (s) TestCSMPluginOption(t *testing.T) { } } }, - opts: testingutils.MetricDataOptions{ + opts: testutils.MetricDataOptions{ CSMLabels: csmLabels, UnaryMessageSent: true, StreamingMessageSent: false, @@ -237,7 +237,7 @@ func (s) TestCSMPluginOption(t *testing.T) { } } }, - opts: testingutils.MetricDataOptions{ + opts: testutils.MetricDataOptions{ CSMLabels: csmLabels, UnaryMessageSent: true, StreamingMessageSent: true, @@ -294,8 +294,8 @@ func (s) TestCSMPluginOption(t *testing.T) { opts := test.opts opts.Target = ss.Target - wantMetrics := testingutils.MetricData(opts) - testingutils.CompareGotWantMetrics(ctx, t, mr, gotMetrics, wantMetrics) + wantMetrics := testutils.MetricData(opts) + testutils.CompareGotWantMetrics(ctx, t, mr, gotMetrics, wantMetrics) }) } } @@ -435,7 +435,7 @@ func (s) TestxDSLabels(t *testing.T) { { Attributes: attribute.NewSet(unaryMethodClientSideEnd...), Count: 1, - Bounds: testingutils.DefaultLatencyBounds, + Bounds: testutils.DefaultLatencyBounds, }, }, Temporality: metricdata.CumulativeTemporality, @@ -450,7 +450,7 @@ func (s) TestxDSLabels(t *testing.T) { { Attributes: attribute.NewSet(unaryMethodClientSideEnd...), Count: 1, - Bounds: testingutils.DefaultSizeBounds, + Bounds: testutils.DefaultSizeBounds, BucketCounts: unaryBucketCounts, Min: 
unaryExtrema, Max: unaryExtrema, @@ -469,7 +469,7 @@ func (s) TestxDSLabels(t *testing.T) { { Attributes: attribute.NewSet(unaryMethodClientSideEnd...), Count: 1, - Bounds: testingutils.DefaultSizeBounds, + Bounds: testutils.DefaultSizeBounds, BucketCounts: unaryBucketCounts, Min: unaryExtrema, Max: unaryExtrema, @@ -488,7 +488,7 @@ func (s) TestxDSLabels(t *testing.T) { { Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, unaryStatusAttr), Count: 1, - Bounds: testingutils.DefaultLatencyBounds, + Bounds: testutils.DefaultLatencyBounds, }, }, Temporality: metricdata.CumulativeTemporality, @@ -496,7 +496,7 @@ func (s) TestxDSLabels(t *testing.T) { }, } - testingutils.CompareGotWantMetrics(ctx, t, mr, gotMetrics, wantMetrics) + testutils.CompareGotWantMetrics(ctx, t, mr, gotMetrics, wantMetrics) } // TestObservability tests that Observability global function compiles and runs diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index e0ca869fb1ed..439598bf4912 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -28,7 +28,7 @@ import ( "google.golang.org/grpc/internal/stubserver" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" - "google.golang.org/grpc/stats/opentelemetry/internal/testingutils" + "google.golang.org/grpc/stats/opentelemetry/internal/testutils" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric" @@ -154,12 +154,12 @@ func (s) TestMethodTargetAttributeFilter(t *testing.T) { { // Method should go to "other" due to the method attribute filter. 
Attributes: attribute.NewSet(attribute.String("grpc.method", "other"), attribute.String("grpc.status", "OK")), Count: 1, - Bounds: testingutils.DefaultLatencyBounds, + Bounds: testutils.DefaultLatencyBounds, }, { Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall"), attribute.String("grpc.status", "OK")), Count: 1, - Bounds: testingutils.DefaultLatencyBounds, + Bounds: testutils.DefaultLatencyBounds, }, }, Temporality: metricdata.CumulativeTemporality, @@ -167,7 +167,7 @@ func (s) TestMethodTargetAttributeFilter(t *testing.T) { }, } - testingutils.CompareGotWantMetrics(ctx, t, reader, gotMetrics, wantMetrics) + testutils.CompareGotWantMetrics(ctx, t, reader, gotMetrics, wantMetrics) } // TestAllMetricsOneFunction tests emitted metrics from OpenTelemetry @@ -212,12 +212,12 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { } } - wantMetrics := testingutils.MetricData(testingutils.MetricDataOptions{ + wantMetrics := testutils.MetricData(testutils.MetricDataOptions{ Target: ss.Target, UnaryMessageSent: true, StreamingMessageSent: false, }) - testingutils.CompareGotWantMetrics(ctx, t, reader, gotMetrics, wantMetrics) + testutils.CompareGotWantMetrics(ctx, t, reader, gotMetrics, wantMetrics) stream, err = ss.Client.FullDuplexCall(ctx) if err != nil { diff --git a/stats/opentelemetry/internal/testingutils/testingutils.go b/stats/opentelemetry/internal/testutils/testutils.go similarity index 94% rename from stats/opentelemetry/internal/testingutils/testingutils.go rename to stats/opentelemetry/internal/testutils/testutils.go index e9716a35ef18..ad0133ace5dd 100644 --- a/stats/opentelemetry/internal/testingutils/testingutils.go +++ b/stats/opentelemetry/internal/testutils/testutils.go @@ -15,7 +15,7 @@ */ // Package testingutils contains helpers for OpenTelemetry tests. 
-package testingutils +package testutils import ( "context" @@ -31,7 +31,7 @@ import ( var ( // DefaultLatencyBounds are the default bounds for latency metrics. - DefaultLatencyBounds = []float64{0, 0.00001, 0.00005, 0.0001, 0.0003, 0.0006, 0.0008, 0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.008, 0.01, 0.013, 0.016, 0.02, 0.025, 0.03, 0.04, 0.05, 0.065, 0.08, 0.1, 0.13, 0.16, 0.2, 0.25, 0.3, 0.4, 0.5, 0.65, 0.8, 1, 2, 5, 10, 20, 50, 100} // provide "advice" through API, SDK should set this too + DefaultLatencyBounds = []float64{0, 0.00001, 0.00005, 0.0001, 0.0003, 0.0006, 0.0008, 0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.008, 0.01, 0.013, 0.016, 0.02, 0.025, 0.03, 0.04, 0.05, 0.065, 0.08, 0.1, 0.13, 0.16, 0.2, 0.25, 0.3, 0.4, 0.5, 0.65, 0.8, 1, 2, 5, 10, 20, 50, 100} // DefaultSizeBounds are the default bounds for metrics which record size. DefaultSizeBounds = []float64{0, 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296} ) @@ -39,7 +39,10 @@ var ( // waitForServerCompletedRPCs waits until the unary and streaming stats.End // calls are finished processing. It does this by waiting for the expected // metric triggered by stats.End to appear through the passed in metrics reader. -func waitForServerCompletedRPCs(ctx context.Context, reader metric.Reader, wantMetric metricdata.Metrics, t *testing.T) (map[string]metricdata.Metrics, error) { +// +// Returns a new gotMetrics map containing the metric data being polled for, or +// an error if failed to wait for metric. 
+func waitForServerCompletedRPCs(ctx context.Context, t *testing.T, reader metric.Reader, wantMetric metricdata.Metrics) (map[string]metricdata.Metrics, error) { for ; ctx.Err() == nil; <-time.After(time.Millisecond) { rm := &metricdata.ResourceMetrics{} reader.Collect(ctx, rm) @@ -61,9 +64,10 @@ func waitForServerCompletedRPCs(ctx context.Context, reader metric.Reader, wantM return nil, fmt.Errorf("error waiting for metric %v: %v", wantMetric, ctx.Err()) } -// assertDataPointWithinFiveSeconds asserts the metric passed in contains -// a histogram with dataPoints that fall within buckets that are <=5. -func assertDataPointWithinFiveSeconds(metric metricdata.Metrics) error { +// checkDataPointWithinFiveSeconds checks if the metric passed in contains a +// histogram with dataPoints that fall within buckets that are <=5. Returns an +// error if check fails. +func checkDataPointWithinFiveSeconds(metric metricdata.Metrics) error { histo, ok := metric.Data.(metricdata.Histogram[float64]) if !ok { return fmt.Errorf("metric data is not histogram") @@ -93,8 +97,7 @@ func assertDataPointWithinFiveSeconds(metric metricdata.Metrics) error { } // MetricDataOptions are the options used to configure the metricData emissions -// of expected metrics data from NewMetricData. (rename function? this feels -// like the different config state spaces for xDS haha). +// of expected metrics data from NewMetricData. type MetricDataOptions struct { // CSMLabels are the csm labels to attach to metrics which receive csm // labels (all A66 expect client call and started RPC's client and server @@ -393,14 +396,14 @@ func MetricData(options MetricDataOptions) []metricdata.Metrics { // eventual server metrics (not emitted synchronously with client side rpc // returning), and for duration metrics makes sure the data point is within // possible testing time (five seconds from context timeout). 
-func CompareGotWantMetrics(ctx context.Context, t *testing.T, mr *metric.ManualReader, gotMetrics map[string]metricdata.Metrics, wantMetrics []metricdata.Metrics) { // return an error instead of t... +func CompareGotWantMetrics(ctx context.Context, t *testing.T, mr *metric.ManualReader, gotMetrics map[string]metricdata.Metrics, wantMetrics []metricdata.Metrics) { for _, metric := range wantMetrics { if metric.Name == "grpc.server.call.sent_total_compressed_message_size" || metric.Name == "grpc.server.call.rcvd_total_compressed_message_size" { // Sync the metric reader to see the event because stats.End is // handled async server side. Thus, poll until metrics created from // stats.End show up. var err error - if gotMetrics, err = waitForServerCompletedRPCs(ctx, mr, metric, t); err != nil { // move to shared helper + if gotMetrics, err = waitForServerCompletedRPCs(ctx, t, mr, metric); err != nil { // move to shared helper t.Fatalf("error waiting for sent total compressed message size for metric %v: %v", metric.Name, err) } continue @@ -417,7 +420,7 @@ func CompareGotWantMetrics(ctx context.Context, t *testing.T, mr *metric.ManualR if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars(), metricdatatest.IgnoreValue()) { t.Fatalf("metrics data type not equal for metric: %v", metric.Name) } - if err := assertDataPointWithinFiveSeconds(val); err != nil { + if err := checkDataPointWithinFiveSeconds(val); err != nil { t.Fatalf("Data point not within five seconds for metric %v: %v", metric.Name, err) } continue From 6695b389a5648033d2144a098fefdfacfb4909b7 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Fri, 31 May 2024 17:38:04 -0400 Subject: [PATCH 4/8] Vet --- stats/opentelemetry/internal/testutils/testutils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stats/opentelemetry/internal/testutils/testutils.go b/stats/opentelemetry/internal/testutils/testutils.go index ad0133ace5dd..3df0b8cda043 
100644 --- a/stats/opentelemetry/internal/testutils/testutils.go +++ b/stats/opentelemetry/internal/testutils/testutils.go @@ -14,7 +14,7 @@ * limitations under the License. */ -// Package testingutils contains helpers for OpenTelemetry tests. +// Package testutils contains helpers for OpenTelemetry tests. package testutils import ( From 6f01d1a46079ab6dcf8093a5db43e1cec701b892 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Mon, 3 Jun 2024 11:44:12 -0400 Subject: [PATCH 5/8] Responded to Easwar's comments --- stats/opentelemetry/csm/observability_test.go | 4 +- stats/opentelemetry/e2e_test.go | 40 +++++++++---------- .../internal/testutils/testutils.go | 13 +++--- 3 files changed, 30 insertions(+), 27 deletions(-) diff --git a/stats/opentelemetry/csm/observability_test.go b/stats/opentelemetry/csm/observability_test.go index e3de299700cd..cc0f7a2cb19a 100644 --- a/stats/opentelemetry/csm/observability_test.go +++ b/stats/opentelemetry/csm/observability_test.go @@ -295,7 +295,7 @@ func (s) TestCSMPluginOption(t *testing.T) { opts := test.opts opts.Target = ss.Target wantMetrics := testutils.MetricData(opts) - testutils.CompareGotWantMetrics(ctx, t, mr, gotMetrics, wantMetrics) + testutils.CompareMetrics(ctx, t, mr, gotMetrics, wantMetrics) }) } } @@ -496,7 +496,7 @@ func (s) TestxDSLabels(t *testing.T) { }, } - testutils.CompareGotWantMetrics(ctx, t, mr, gotMetrics, wantMetrics) + testutils.CompareMetrics(ctx, t, mr, gotMetrics, wantMetrics) } // TestObservability tests that Observability global function compiles and runs diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index 439598bf4912..34f5b8657c98 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package opentelemetry +package opentelemetry_test import ( "context" @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc/internal/stubserver" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" + "google.golang.org/grpc/stats/opentelemetry" "google.golang.org/grpc/stats/opentelemetry/internal/testutils" "go.opentelemetry.io/otel/attribute" @@ -49,7 +50,7 @@ func Test(t *testing.T) { // setup creates a stub server with OpenTelemetry component configured on client // and server side. It returns a reader for metrics emitted from OpenTelemetry // component and the server. -func setup(t *testing.T, maf func(string) bool) (*metric.ManualReader, *stubserver.StubServer) { +func setup(t *testing.T, methodAttributeFilter func(string) bool) (*metric.ManualReader, *stubserver.StubServer) { reader := metric.NewManualReader() provider := metric.NewMeterProvider( metric.WithReader(reader), @@ -70,16 +71,15 @@ func setup(t *testing.T, maf func(string) bool) (*metric.ManualReader, *stubserv }, } - if err := ss.Start([]grpc.ServerOption{ServerOption(Options{ - MetricsOptions: MetricsOptions{ + if err := ss.Start([]grpc.ServerOption{opentelemetry.ServerOption(opentelemetry.Options{ + MetricsOptions: opentelemetry.MetricsOptions{ MeterProvider: provider, - Metrics: DefaultMetrics, - MethodAttributeFilter: maf, - }})}, DialOption(Options{ - MetricsOptions: MetricsOptions{ - MeterProvider: provider, - Metrics: DefaultMetrics, - MethodAttributeFilter: maf, + Metrics: opentelemetry.DefaultMetrics, + MethodAttributeFilter: methodAttributeFilter, + }})}, opentelemetry.DialOption(opentelemetry.Options{ + MetricsOptions: opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics, }, })); err != nil { t.Fatalf("Error starting endpoint server: %v", err) @@ -87,19 +87,19 @@ func setup(t *testing.T, maf func(string) bool) (*metric.ManualReader, *stubserv return reader, ss } -// 
TestMethodTargetAttributeFilter tests the method and target attribute filter. -// The method and target filter set should bucket the grpc.method/grpc.target -// attribute into "other" if filter specifies. -func (s) TestMethodTargetAttributeFilter(t *testing.T) { +// TestMethodAttributeFilter tests the method attribute filter. The method and +// target filter set should bucket the grpc.method attribute into "other" if the +// method attribute filter specifies. +func (s) TestMethodAttributeFilter(t *testing.T) { maf := func(str string) bool { // Will allow duplex/any other type of RPC. - return str != "/grpc.testing.TestService/UnaryCall" + return str != testpb.TestService_UnaryCall_FullMethodName } reader, ss := setup(t, maf) defer ss.Stop() - // make a single RPC (unary rpc), and filter out the method that would - // correspond. + // Make a Unary and Streaming RPC. The Unary RPC should be filtered by the + // method attribute filter, and the Full Duplex (Streaming) RPC should not. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{ @@ -167,7 +167,7 @@ func (s) TestMethodTargetAttributeFilter(t *testing.T) { }, } - testutils.CompareGotWantMetrics(ctx, t, reader, gotMetrics, wantMetrics) + testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) } // TestAllMetricsOneFunction tests emitted metrics from OpenTelemetry @@ -217,7 +217,7 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { UnaryMessageSent: true, StreamingMessageSent: false, }) - testutils.CompareGotWantMetrics(ctx, t, reader, gotMetrics, wantMetrics) + testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) stream, err = ss.Client.FullDuplexCall(ctx) if err != nil { diff --git a/stats/opentelemetry/internal/testutils/testutils.go b/stats/opentelemetry/internal/testutils/testutils.go index 3df0b8cda043..5cb4805200fa 100644 --- 
a/stats/opentelemetry/internal/testutils/testutils.go +++ b/stats/opentelemetry/internal/testutils/testutils.go @@ -29,6 +29,9 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" ) +// Redefine default bounds here to avoid a cyclic dependency with top level +// opentelemetry package. Could define once through internal, but would make +// external opentelemetry godoc less readable. var ( // DefaultLatencyBounds are the default bounds for latency metrics. DefaultLatencyBounds = []float64{0, 0.00001, 0.00005, 0.0001, 0.0003, 0.0006, 0.0008, 0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.008, 0.01, 0.013, 0.016, 0.02, 0.025, 0.03, 0.04, 0.05, 0.065, 0.08, 0.1, 0.13, 0.16, 0.2, 0.25, 0.3, 0.4, 0.5, 0.65, 0.8, 1, 2, 5, 10, 20, 50, 100} @@ -392,11 +395,11 @@ func MetricData(options MetricDataOptions) []metricdata.Metrics { } } -// CompareGotWantMetrics asserts wantMetrics are what we expect. It polls for -// eventual server metrics (not emitted synchronously with client side rpc -// returning), and for duration metrics makes sure the data point is within -// possible testing time (five seconds from context timeout). -func CompareGotWantMetrics(ctx context.Context, t *testing.T, mr *metric.ManualReader, gotMetrics map[string]metricdata.Metrics, wantMetrics []metricdata.Metrics) { +// CompareMetrics asserts wantMetrics are what we expect. It polls for eventual +// server metrics (not emitted synchronously with client side rpc returning), +// and for duration metrics makes sure the data point is within possible testing +// time (five seconds from context timeout). 
+func CompareMetrics(ctx context.Context, t *testing.T, mr *metric.ManualReader, gotMetrics map[string]metricdata.Metrics, wantMetrics []metricdata.Metrics) { for _, metric := range wantMetrics { if metric.Name == "grpc.server.call.sent_total_compressed_message_size" || metric.Name == "grpc.server.call.rcvd_total_compressed_message_size" { // Sync the metric reader to see the event because stats.End is From 7de2e2f6f30281f1c2cebaea584d123729bfc957 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Thu, 6 Jun 2024 12:28:23 -0400 Subject: [PATCH 6/8] Responded to Easwar's comments --- stats/opentelemetry/csm/observability_test.go | 391 ++++++++++------ stats/opentelemetry/e2e_test.go | 11 +- .../internal/testutils/testutils.go | 430 ++++++++++++++++-- 3 files changed, 651 insertions(+), 181 deletions(-) diff --git a/stats/opentelemetry/csm/observability_test.go b/stats/opentelemetry/csm/observability_test.go index cc0f7a2cb19a..b739ab7b77b0 100644 --- a/stats/opentelemetry/csm/observability_test.go +++ b/stats/opentelemetry/csm/observability_test.go @@ -40,9 +40,12 @@ import ( "google.golang.org/grpc/stats/opentelemetry/internal/testutils" ) -// setupEnv configures the environment for CSM Observability Testing. It returns -// a cleanup function to be invoked to clear the environment. -func setupEnv(t *testing.T, resourceDetectorEmissions map[string]string, nodeID string, csmCanonicalServiceName string, csmWorkloadName string) func() { +// setupEnv configures the environment for CSM Observability Testing. It sets +// the bootstrap env var to a bootstrap file with a nodeID provided. It sets CSM +// Env Vars as well, and mocks the resource detector's returned attribute set to +// simulate the environment. It registers a cleanup function on the provided t +// to restore the environment to its original state. 
+func setupEnv(t *testing.T, resourceDetectorEmissions map[string]string, nodeID string, csmCanonicalServiceName string, csmWorkloadName string) { clearEnv() cleanup, err := bootstrap.CreateFile(bootstrap.Options{ @@ -50,8 +53,10 @@ func setupEnv(t *testing.T, resourceDetectorEmissions map[string]string, nodeID ServerURI: "xds_server_uri", }) if err != nil { - t.Fatalf("failed to create bootstrap: %v", err) + t.Fatalf("Failed to create bootstrap: %v", err) } + oldCSMCanonicalServiceName, csmCanonicalServiceNamePresent := os.LookupEnv("CSM_CANONICAL_SERVICE_NAME") + oldCSMWorkloadName, csmWorkloadNamePresent := os.LookupEnv("CSM_WORKLOAD_NAME") os.Setenv("CSM_CANONICAL_SERVICE_NAME", csmCanonicalServiceName) os.Setenv("CSM_WORKLOAD_NAME", csmWorkloadName) @@ -66,26 +71,33 @@ func setupEnv(t *testing.T, resourceDetectorEmissions map[string]string, nodeID getAttrSetFromResourceDetector = func(context.Context) *attribute.Set { return &attrSet } - - return func() { + t.Cleanup(func() { cleanup() - os.Unsetenv("CSM_CANONICAL_SERVICE_NAME") - os.Unsetenv("CSM_WORKLOAD_NAME") + if csmCanonicalServiceNamePresent { + os.Setenv("CSM_CANONICAL_SERVICE_NAME", oldCSMCanonicalServiceName) + } else { + os.Unsetenv("CSM_CANONICAL_SERVICE_NAME") + } + if csmWorkloadNamePresent { + os.Setenv("CSM_WORKLOAD_NAME", oldCSMWorkloadName) + } else { + os.Unsetenv("CSM_WORKLOAD_NAME") + } + getAttrSetFromResourceDetector = origGetAttrSet - } + }) } -// TestCSMPluginOption tests the CSM Plugin Option and labels. It configures the -// environment for the CSM Plugin Option to read from. It then configures a -// system with a gRPC Client and gRPC server with the OpenTelemetry Dial and -// Server Option configured with a CSM Plugin Option with certain unary and -// streaming handlers set to induce different ways of setting metadata exchange -// labels, and makes a Unary RPC and a Streaming RPC. 
These two RPCs should -// cause certain recording for each registered metric observed through a Manual -// Metrics Reader on the provided OpenTelemetry SDK's Meter Provider. The CSM -// Labels emitted from the plugin option should be attached to the relevant -// metrics. -func (s) TestCSMPluginOption(t *testing.T) { +// TestCSMPluginOptionUnary tests the CSM Plugin Option and labels. It +// configures the environment for the CSM Plugin Option to read from. It then +// configures a system with a gRPC Client and gRPC server with the OpenTelemetry +// Dial and Server Option configured with a CSM Plugin Option with a certain +// unary handler set to induce different ways of setting metadata exchange +// labels, and makes a Unary RPC. This RPC should cause certain recording for +// each registered metric observed through a Manual Metrics Reader on the +// provided OpenTelemetry SDK's Meter Provider. The CSM Labels emitted from the +// plugin option should be attached to the relevant metrics. 
+func (s) TestCSMPluginOptionUnary(t *testing.T) { resourceDetectorEmissions := map[string]string{ "cloud.platform": "gcp_kubernetes_engine", "cloud.region": "cloud_region_val", // availability_zone isn't present, so this should become location @@ -96,8 +108,7 @@ func (s) TestCSMPluginOption(t *testing.T) { nodeID := "projects/12345/networks/mesh:mesh_id/nodes/aaaa-aaaa-aaaa-aaaa" csmCanonicalServiceName := "csm_canonical_service_name" csmWorkloadName := "csm_workload_name" - cleanup := setupEnv(t, resourceDetectorEmissions, nodeID, csmCanonicalServiceName, csmWorkloadName) - defer cleanup() + setupEnv(t, resourceDetectorEmissions, nodeID, csmCanonicalServiceName, csmWorkloadName) attributesWant := map[string]string{ "csm.workload_canonical_service": csmCanonicalServiceName, // from env @@ -122,14 +133,11 @@ func (s) TestCSMPluginOption(t *testing.T) { defer cancel() tests := []struct { name string - // To test the different operations for Unary and Streaming RPC's from - // the interceptor level that can plumb metadata exchange header in. - unaryCallFunc func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) - streamingCallFunc func(stream testgrpc.TestService_FullDuplexCallServer) error - opts testutils.MetricDataOptions + // To test the different operations for Unary RPC's from the interceptor + // level that can plumb metadata exchange header in. + unaryCallFunc func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) + opts testutils.MetricDataOptions }{ - // Different permutations of operations that should all trigger csm md - // exchange labels to be written on the wire. 
{ name: "normal-flow", unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { @@ -137,25 +145,168 @@ func (s) TestCSMPluginOption(t *testing.T) { Body: make([]byte, 10000), }}, nil }, - streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error { // this is trailers only - no messages or headers sent - for { - _, err := stream.Recv() - if err == io.EOF { - return nil - } - } - }, opts: testutils.MetricDataOptions{ - CSMLabels: csmLabels, - UnaryMessageSent: true, - StreamingMessageSent: false, + CSMLabels: csmLabels, + UnaryCompressedMessageSize: float64(57), }, }, { - name: "trailers-only-unary-streaming", + name: "trailers-only", unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return nil, errors.New("some error") // return an error and no message - this triggers trailers only - no messages or headers sent }, + opts: testutils.MetricDataOptions{ + CSMLabels: csmLabels, + UnaryCallFailed: true, + }, + }, + { + name: "set-header", + unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + grpc.SetHeader(ctx, metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) + + return &testpb.SimpleResponse{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}, nil + }, + opts: testutils.MetricDataOptions{ + CSMLabels: csmLabels, + UnaryCompressedMessageSize: float64(57), + }, + }, + { + name: "send-header", + unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + grpc.SendHeader(ctx, metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) + + return &testpb.SimpleResponse{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}, nil + }, + opts: testutils.MetricDataOptions{ + CSMLabels: csmLabels, + UnaryCompressedMessageSize: float64(57), + }, + }, + { + name: "send-msg", + unaryCallFunc: func(ctx context.Context, in 
*testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}, nil + }, + opts: testutils.MetricDataOptions{ + CSMLabels: csmLabels, + UnaryCompressedMessageSize: float64(57), + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + ss := &stubserver.StubServer{UnaryCallF: test.unaryCallFunc} + po := newPluginOption(ctx) + sopts := []grpc.ServerOption{ + serverOptionWithCSMPluginOption(opentelemetry.Options{ + MetricsOptions: opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics, + }}, po), + } + dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{ + MetricsOptions: opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics, + OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"}, // should be a no-op unless receive labels through optional labels mechanism + }, + }, po)} + if err := ss.Start(sopts, dopts...); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() + + var request *testpb.SimpleRequest + if test.opts.UnaryCompressedMessageSize != 0 { + request = &testpb.SimpleRequest{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }} + } + // Make two RPC's, a unary RPC and a streaming RPC. These should cause + // certain metrics to be emitted, which should be able to be observed + // through the Metric Reader. 
+ ss.Client.UnaryCall(ctx, request, grpc.UseCompressor(gzip.Name)) + rm := &metricdata.ResourceMetrics{} + reader.Collect(ctx, rm) + + gotMetrics := map[string]metricdata.Metrics{} + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + gotMetrics[m.Name] = m + } + } + + opts := test.opts + opts.Target = ss.Target + wantMetrics := testutils.MetricDataUnary(opts) + testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) + }) + } +} + +// TestCSMPluginOptionStreaming tests the CSM Plugin Option and labels. It +// configures the environment for the CSM Plugin Option to read from. It then +// configures a system with a gRPC Client and gRPC server with the OpenTelemetry +// Dial and Server Option configured with a CSM Plugin Option with a certain +// streaming handler set to induce different ways of setting metadata exchange +// labels, and makes a Streaming RPC. This RPC should cause certain recording +// for each registered metric observed through a Manual Metrics Reader on the +// provided OpenTelemetry SDK's Meter Provider. The CSM Labels emitted from the +// plugin option should be attached to the relevant metrics. 
+func (s) TestCSMPluginOptionStreaming(t *testing.T) { + resourceDetectorEmissions := map[string]string{ + "cloud.platform": "gcp_kubernetes_engine", + "cloud.region": "cloud_region_val", // availability_zone isn't present, so this should become location + "cloud.account.id": "cloud_account_id_val", + "k8s.namespace.name": "k8s_namespace_name_val", + "k8s.cluster.name": "k8s_cluster_name_val", + } + nodeID := "projects/12345/networks/mesh:mesh_id/nodes/aaaa-aaaa-aaaa-aaaa" + csmCanonicalServiceName := "csm_canonical_service_name" + csmWorkloadName := "csm_workload_name" + setupEnv(t, resourceDetectorEmissions, nodeID, csmCanonicalServiceName, csmWorkloadName) + + attributesWant := map[string]string{ + "csm.workload_canonical_service": csmCanonicalServiceName, // from env + "csm.mesh_id": "mesh_id", // from bootstrap env var + + // No xDS Labels - this happens in a test below. + + "csm.remote_workload_type": "gcp_kubernetes_engine", + "csm.remote_workload_canonical_service": csmCanonicalServiceName, + "csm.remote_workload_project_id": "cloud_account_id_val", + "csm.remote_workload_cluster_name": "k8s_cluster_name_val", + "csm.remote_workload_namespace_name": "k8s_namespace_name_val", + "csm.remote_workload_location": "cloud_region_val", + "csm.remote_workload_name": csmWorkloadName, + } + + var csmLabels []attribute.KeyValue + for k, v := range attributesWant { + csmLabels = append(csmLabels, attribute.String(k, v)) + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + tests := []struct { + name string + // To test the different operations for Streaming RPC's from the + // interceptor level that can plumb metadata exchange header in. 
+ streamingCallFunc func(stream testgrpc.TestService_FullDuplexCallServer) error + opts testutils.MetricDataOptions + }{ + { + name: "trailers-only", streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error { for { _, err := stream.Recv() @@ -165,21 +316,11 @@ func (s) TestCSMPluginOption(t *testing.T) { } }, opts: testutils.MetricDataOptions{ - CSMLabels: csmLabels, - UnaryMessageSent: false, - StreamingMessageSent: false, - UnaryCallFailed: true, + CSMLabels: csmLabels, }, }, { - name: "set-header-client-server-side", - unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { - grpc.SetHeader(ctx, metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) - - return &testpb.SimpleResponse{Payload: &testpb.Payload{ - Body: make([]byte, 10000), - }}, nil - }, + name: "set-header", streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error { stream.SetHeader(metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) for { @@ -190,20 +331,11 @@ func (s) TestCSMPluginOption(t *testing.T) { } }, opts: testutils.MetricDataOptions{ - CSMLabels: csmLabels, - UnaryMessageSent: true, - StreamingMessageSent: false, + CSMLabels: csmLabels, }, }, { - name: "send-header-client-server-side", - unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { - grpc.SendHeader(ctx, metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) - - return &testpb.SimpleResponse{Payload: &testpb.Payload{ - Body: make([]byte, 10000), - }}, nil - }, + name: "send-header", streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error { stream.SendHeader(metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) for { @@ -214,18 +346,11 @@ func (s) TestCSMPluginOption(t *testing.T) { } }, opts: testutils.MetricDataOptions{ - CSMLabels: csmLabels, - UnaryMessageSent: true, - StreamingMessageSent: false, + 
CSMLabels: csmLabels, }, }, { - name: "send-msg-client-server-side", - unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { - return &testpb.SimpleResponse{Payload: &testpb.Payload{ - Body: make([]byte, 10000), - }}, nil - }, + name: "send-msg", streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error { stream.Send(&testpb.StreamingOutputCallResponse{Payload: &testpb.Payload{ Body: make([]byte, 10000), @@ -238,35 +363,42 @@ func (s) TestCSMPluginOption(t *testing.T) { } }, opts: testutils.MetricDataOptions{ - CSMLabels: csmLabels, - UnaryMessageSent: true, - StreamingMessageSent: true, + CSMLabels: csmLabels, + StreamingCompressedMessageSize: float64(57), }, }, } - for _, test := range tests { t.Run(test.name, func(t *testing.T) { - mr, ss := setup(ctx, t, test.unaryCallFunc, test.streamingCallFunc, true, nil) - defer ss.Stop() - - var request *testpb.SimpleRequest - if test.opts.UnaryMessageSent { - request = &testpb.SimpleRequest{Payload: &testpb.Payload{ - Body: make([]byte, 10000), - }} + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + ss := &stubserver.StubServer{FullDuplexCallF: test.streamingCallFunc} + po := newPluginOption(ctx) + sopts := []grpc.ServerOption{ + serverOptionWithCSMPluginOption(opentelemetry.Options{ + MetricsOptions: opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics, + }}, po), + } + dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{ + MetricsOptions: opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics, + OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"}, // should be a no-op unless receive labels through optional labels mechanism + }, + }, po)} + if err := ss.Start(sopts, dopts...); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) } + defer ss.Stop() - // 
Make two RPC's, a unary RPC and a streaming RPC. These should cause - // certain metrics to be emitted, which should be able to be observed - // through the Metric Reader. - ss.Client.UnaryCall(ctx, request, grpc.UseCompressor(gzip.Name)) stream, err := ss.Client.FullDuplexCall(ctx, grpc.UseCompressor(gzip.Name)) if err != nil { t.Fatalf("ss.Client.FullDuplexCall failed: %f", err) } - if test.opts.StreamingMessageSent { + if test.opts.StreamingCompressedMessageSize != 0 { if err := stream.Send(&testpb.StreamingOutputCallRequest{Payload: &testpb.Payload{ Body: make([]byte, 10000), }}); err != nil { @@ -283,7 +415,7 @@ func (s) TestCSMPluginOption(t *testing.T) { } rm := &metricdata.ResourceMetrics{} - mr.Collect(ctx, rm) + reader.Collect(ctx, rm) gotMetrics := map[string]metricdata.Metrics{} for _, sm := range rm.ScopeMetrics { @@ -294,52 +426,12 @@ func (s) TestCSMPluginOption(t *testing.T) { opts := test.opts opts.Target = ss.Target - wantMetrics := testutils.MetricData(opts) - testutils.CompareMetrics(ctx, t, mr, gotMetrics, wantMetrics) + wantMetrics := testutils.MetricDataStreaming(opts) + testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) }) } } -// setup creates a stub server with the provided unary call and full duplex call -// handlers, alongside with OpenTelemetry component with a CSM Plugin Option -// configured on client and server side based off bool. It also takes in a unary -// interceptor to configure. It returns a reader for metrics emitted from the -// OpenTelemetry component and the stub server. 
-func setup(ctx context.Context, t *testing.T, unaryCallFunc func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error), streamingCallFunc func(stream testgrpc.TestService_FullDuplexCallServer) error, serverOTelConfigured bool, clientUnaryInterceptor grpc.UnaryClientInterceptor) (*metric.ManualReader, *stubserver.StubServer) { // specific for plugin option - reader := metric.NewManualReader() - provider := metric.NewMeterProvider( - metric.WithReader(reader), - ) - ss := &stubserver.StubServer{ - UnaryCallF: unaryCallFunc, - FullDuplexCallF: streamingCallFunc, - } - - po := newPluginOption(ctx) - var sopts []grpc.ServerOption - if serverOTelConfigured { - sopts = append(sopts, serverOptionWithCSMPluginOption(opentelemetry.Options{ - MetricsOptions: opentelemetry.MetricsOptions{ - MeterProvider: provider, - Metrics: opentelemetry.DefaultMetrics, - }}, po)) - } - dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{ - MetricsOptions: opentelemetry.MetricsOptions{ - MeterProvider: provider, - Metrics: opentelemetry.DefaultMetrics, - OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"}, // should be a no-op unless receive labels through optional labels mechanism - }, - }, po)} - if clientUnaryInterceptor != nil { - dopts = append(dopts, grpc.WithUnaryInterceptor(clientUnaryInterceptor)) - } - if err := ss.Start(sopts, dopts...); err != nil { - t.Fatalf("Error starting endpoint server: %v", err) - } - return reader, ss -} - func unaryInterceptorAttachxDSLabels(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { ctx = istats.SetLabels(ctx, &istats.Labels{ TelemetryLabels: map[string]string{ @@ -364,18 +456,35 @@ func unaryInterceptorAttachxDSLabels(ctx context.Context, method string, req, re func (s) TestxDSLabels(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() 
- mr, ss := setup(ctx, t, func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { - return &testpb.SimpleResponse{Payload: &testpb.Payload{ - Body: make([]byte, 10000), - }}, nil - }, nil, false, unaryInterceptorAttachxDSLabels) + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + ss := &stubserver.StubServer{ + UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}, nil + }, + } + + po := newPluginOption(ctx) + dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{ + MetricsOptions: opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics, + OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"}, // should be a no-op unless receive labels through optional labels mechanism + }, + }, po), grpc.WithUnaryInterceptor(unaryInterceptorAttachxDSLabels)} + if err := ss.Start(nil, dopts...); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{ Body: make([]byte, 10000), }}, grpc.UseCompressor(gzip.Name)) rm := &metricdata.ResourceMetrics{} - mr.Collect(ctx, rm) + reader.Collect(ctx, rm) gotMetrics := map[string]metricdata.Metrics{} for _, sm := range rm.ScopeMetrics { @@ -496,7 +605,7 @@ func (s) TestxDSLabels(t *testing.T) { }, } - testutils.CompareMetrics(ctx, t, mr, gotMetrics, wantMetrics) + testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) } // TestObservability tests that Observability global function compiles and runs @@ -504,5 +613,5 @@ func (s) TestxDSLabels(t *testing.T) { // interop tests. 
func (s) TestObservability(t *testing.T) { cleanup := EnableObservability(context.Background(), opentelemetry.Options{}) - defer cleanup() + cleanup() } diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index 34f5b8657c98..2890881bbd98 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -87,9 +87,9 @@ func setup(t *testing.T, methodAttributeFilter func(string) bool) (*metric.Manua return reader, ss } -// TestMethodAttributeFilter tests the method attribute filter. The method and -// target filter set should bucket the grpc.method attribute into "other" if the -// method attribute filter specifies. +// TestMethodAttributeFilter tests the method attribute filter. The method +// filter set should bucket the grpc.method attribute into "other" if the method +// attribute filter specifies. func (s) TestMethodAttributeFilter(t *testing.T) { maf := func(str string) bool { // Will allow duplex/any other type of RPC. @@ -213,9 +213,8 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { } wantMetrics := testutils.MetricData(testutils.MetricDataOptions{ - Target: ss.Target, - UnaryMessageSent: true, - StreamingMessageSent: false, + Target: ss.Target, + UnaryCompressedMessageSize: float64(57), }) testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) diff --git a/stats/opentelemetry/internal/testutils/testutils.go b/stats/opentelemetry/internal/testutils/testutils.go index 5cb4805200fa..2be3921dae7b 100644 --- a/stats/opentelemetry/internal/testutils/testutils.go +++ b/stats/opentelemetry/internal/testutils/testutils.go @@ -77,17 +77,17 @@ func checkDataPointWithinFiveSeconds(metric metricdata.Metrics) error { } for _, dataPoint := range histo.DataPoints { var boundWithFive int - for i, bucket := range dataPoint.Bounds { - if bucket >= 5 { + for i, bound := range dataPoint.Bounds { + if bound >= 5 { boundWithFive = i } } foundPoint := false - for i, bucket := range dataPoint.BucketCounts { + for i, count 
:= range dataPoint.BucketCounts { if i >= boundWithFive { return fmt.Errorf("data point not found in bucket <=5 seconds") } - if bucket == 1 { + if count == 1 { foundPoint = true break } @@ -108,19 +108,395 @@ type MetricDataOptions struct { CSMLabels []attribute.KeyValue // Target is the target of the client and server. Target string - // UnaryMessageSent is whether a message was sent for the unary RPC or not. - // This unary message is assumed to be 10000 bytes and the RPC is assumed to - // have a gzip compressor call option set. This assumes both client and peer - // sent a message. - UnaryMessageSent bool - // StreamingMessageSent is whether a message was sent for the streaming RPC - // or not. This unary message is assumed to be 10000 bytes and the RPC is - // assumed to have a gzip compressor call option set. This assumes both - // client and peer sent a message. - StreamingMessageSent bool // UnaryCallFailed is whether the Unary Call failed, which would trigger // trailers only. UnaryCallFailed bool + // UnaryCompressedMessageSize is the compressed message size of the Unary + // RPC. This assumes both client and server sent the same message size. + UnaryCompressedMessageSize float64 + // StreamingCompressedMessageSize is the compressed message size of the + // Streaming RPC. This assumes both client and server sent the same message + // size. + StreamingCompressedMessageSize float64 +} + +// createBucketCounts creates a list of bucket counts based off the +// recordingPoints and bounds. Both recordingPoints and bounds are assumed to be +// in order. 
+func createBucketCounts(recordingPoints []float64, bounds []float64) []uint64 {
+	var bucketCounts []uint64
+	var recordingPointIndex int
+	for _, bound := range bounds {
+		var bucketCount uint64
+		if recordingPointIndex >= len(recordingPoints) {
+			bucketCounts = append(bucketCounts, bucketCount)
+			continue
+		}
+		for recordingPoints[recordingPointIndex] <= bound {
+			bucketCount++
+			recordingPointIndex++
+			if recordingPointIndex >= len(recordingPoints) {
+				break
+			}
+		}
+		bucketCounts = append(bucketCounts, bucketCount)
+	}
+	// The rest of the recording points are last bound -> infinity.
+	bucketCounts = append(bucketCounts, uint64(len(recordingPoints)-recordingPointIndex))
+	return bucketCounts
+}
+
+// MetricDataUnary returns a list of expected metrics defined in A66 for a
+// client and server for one unary RPC.
+func MetricDataUnary(options MetricDataOptions) []metricdata.Metrics {
+	methodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall")
+	targetAttr := attribute.String("grpc.target", options.Target)
+	statusAttr := attribute.String("grpc.status", "OK")
+	if options.UnaryCallFailed {
+		statusAttr = attribute.String("grpc.status", "UNKNOWN")
+	}
+	clientSideEnd := []attribute.KeyValue{
+		methodAttr,
+		targetAttr,
+		statusAttr,
+	}
+	serverSideEnd := []attribute.KeyValue{
+		methodAttr,
+		statusAttr,
+	}
+	clientSideEnd = append(clientSideEnd, options.CSMLabels...)
+	serverSideEnd = append(serverSideEnd, options.CSMLabels...)
+ compressedBytesSentRecv := int64(options.UnaryCompressedMessageSize) + bucketCounts := createBucketCounts([]float64{options.UnaryCompressedMessageSize}, DefaultSizeBounds) + extrema := metricdata.NewExtrema(int64(options.UnaryCompressedMessageSize)) + return []metricdata.Metrics{ + { + Name: "grpc.client.attempt.started", + Description: "Number of client call attempts started.", + Unit: "attempt", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(methodAttr, targetAttr), + Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "grpc.client.attempt.duration", + Description: "End-to-end time taken to complete a client call attempt.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(clientSideEnd...), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.attempt.sent_total_compressed_message_size", + Description: "Compressed message bytes sent per client call attempt.", + Unit: "By", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(clientSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: bucketCounts, + Min: extrema, + Max: extrema, + Sum: compressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.attempt.rcvd_total_compressed_message_size", + Description: "Compressed message bytes received per call attempt.", + Unit: "By", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(clientSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: bucketCounts, + Min: extrema, + Max: extrema, + Sum: compressedBytesSentRecv, + }, + }, + Temporality: 
metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.call.duration", + Description: "Time taken by gRPC to complete an RPC from application's perspective.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(methodAttr, targetAttr, statusAttr), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.server.call.started", + Description: "Number of server calls started.", + Unit: "call", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(methodAttr), + Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "grpc.server.call.sent_total_compressed_message_size", + Unit: "By", + Description: "Compressed message bytes sent per server call.", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(serverSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: bucketCounts, + Min: extrema, + Max: extrema, + Sum: compressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.server.call.rcvd_total_compressed_message_size", + Unit: "By", + Description: "Compressed message bytes received per server call.", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(serverSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: bucketCounts, + Min: extrema, + Max: extrema, + Sum: compressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.server.call.duration", + Description: "End-to-end time taken to complete a call from server transport's perspective.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: 
[]metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(serverSideEnd...), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + } +} + +// MetricDataStreaming returns a list of expected metrics defined in A66 for a +// client and server for one streaming RPC. +func MetricDataStreaming(options MetricDataOptions) []metricdata.Metrics { + methodAttr := attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall") + targetAttr := attribute.String("grpc.target", options.Target) + statusAttr := attribute.String("grpc.status", "OK") + clientSideEnd := []attribute.KeyValue{ + methodAttr, + targetAttr, + statusAttr, + } + serverSideEnd := []attribute.KeyValue{ + methodAttr, + statusAttr, + } + clientSideEnd = append(clientSideEnd, options.CSMLabels...) + serverSideEnd = append(serverSideEnd, options.CSMLabels...) + compressedBytesSentRecv := int64(options.StreamingCompressedMessageSize) + bucketCounts := createBucketCounts([]float64{options.StreamingCompressedMessageSize}, DefaultSizeBounds) + extrema := metricdata.NewExtrema(int64(options.StreamingCompressedMessageSize)) + return []metricdata.Metrics{ + { + Name: "grpc.client.attempt.started", + Description: "Number of client call attempts started.", + Unit: "attempt", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(methodAttr, targetAttr), + Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "grpc.client.attempt.duration", + Description: "End-to-end time taken to complete a client call attempt.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(clientSideEnd...), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: 
"grpc.client.attempt.sent_total_compressed_message_size", + Description: "Compressed message bytes sent per client call attempt.", + Unit: "By", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(clientSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: bucketCounts, + Min: extrema, + Max: extrema, + Sum: compressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.attempt.rcvd_total_compressed_message_size", + Description: "Compressed message bytes received per call attempt.", + Unit: "By", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(clientSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: bucketCounts, + Min: extrema, + Max: extrema, + Sum: compressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.client.call.duration", + Description: "Time taken by gRPC to complete an RPC from application's perspective.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(methodAttr, targetAttr, statusAttr), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.server.call.started", + Description: "Number of server calls started.", + Unit: "call", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(methodAttr), + Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "grpc.server.call.sent_total_compressed_message_size", + Unit: "By", + Description: "Compressed message bytes sent per server call.", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + 
Attributes: attribute.NewSet(serverSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: bucketCounts, + Min: extrema, + Max: extrema, + Sum: compressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.server.call.rcvd_total_compressed_message_size", + Unit: "By", + Description: "Compressed message bytes received per server call.", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(serverSideEnd...), + Count: 1, + Bounds: DefaultSizeBounds, + BucketCounts: bucketCounts, + Min: extrema, + Max: extrema, + Sum: compressedBytesSentRecv, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "grpc.server.call.duration", + Description: "End-to-end time taken to complete a call from server transport's perspective.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(serverSideEnd...), + Count: 1, + Bounds: DefaultLatencyBounds, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + } } // MetricData returns a metricsDataSlice for A66 metrics for client and server @@ -130,15 +506,12 @@ type MetricDataOptions struct { func MetricData(options MetricDataOptions) []metricdata.Metrics { unaryMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall") duplexMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall") - targetAttr := attribute.String("grpc.target", options.Target) - unaryStatusAttr := attribute.String("grpc.status", "OK") streamingStatusAttr := attribute.String("grpc.status", "OK") if options.UnaryCallFailed { unaryStatusAttr = attribute.String("grpc.status", "UNKNOWN") } - unaryMethodClientSideEnd := []attribute.KeyValue{ unaryMethodAttr, targetAttr, @@ -153,7 +526,6 @@ func MetricData(options MetricDataOptions) []metricdata.Metrics { 
unaryMethodAttr, unaryStatusAttr, } - streamingMethodServerSideEnd := []attribute.KeyValue{ duplexMethodAttr, streamingStatusAttr, @@ -163,23 +535,13 @@ func MetricData(options MetricDataOptions) []metricdata.Metrics { streamingMethodClientSideEnd = append(streamingMethodClientSideEnd, options.CSMLabels...) unaryMethodServerSideEnd = append(unaryMethodServerSideEnd, options.CSMLabels...) streamingMethodServerSideEnd = append(streamingMethodServerSideEnd, options.CSMLabels...) - unaryCompressedBytesSentRecv := int64(0) - unaryBucketCounts := []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0} - unaryExtrema := metricdata.NewExtrema(int64(0)) - if options.UnaryMessageSent { - unaryCompressedBytesSentRecv = 57 // Fixed 10000 bytes with gzip assumption. - unaryBucketCounts = []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0} - unaryExtrema = metricdata.NewExtrema(int64(57)) - } + unaryCompressedBytesSentRecv := int64(options.UnaryCompressedMessageSize) + unaryBucketCounts := createBucketCounts([]float64{options.UnaryCompressedMessageSize}, DefaultSizeBounds) + unaryExtrema := metricdata.NewExtrema(int64(options.UnaryCompressedMessageSize)) - var streamingCompressedBytesSentRecv int64 - streamingBucketCounts := []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0} - streamingExtrema := metricdata.NewExtrema(int64(0)) - if options.StreamingMessageSent { - streamingCompressedBytesSentRecv = 57 // Fixed 10000 bytes with gzip assumption. 
- streamingBucketCounts = []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0} - streamingExtrema = metricdata.NewExtrema(int64(57)) - } + streamingCompressedBytesSentRecv := int64(options.StreamingCompressedMessageSize) + streamingBucketCounts := createBucketCounts([]float64{options.StreamingCompressedMessageSize}, DefaultSizeBounds) + streamingExtrema := metricdata.NewExtrema(int64(options.StreamingCompressedMessageSize)) return []metricdata.Metrics{ { From 235b84a648847899c8b13d3b557157cef3047e38 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Mon, 10 Jun 2024 11:48:33 -0400 Subject: [PATCH 7/8] Responded to Easwar's comments --- stats/opentelemetry/csm/observability_test.go | 56 +++++++++---------- stats/opentelemetry/e2e_test.go | 12 ++-- .../internal/testutils/testutils.go | 12 ++-- 3 files changed, 36 insertions(+), 44 deletions(-) diff --git a/stats/opentelemetry/csm/observability_test.go b/stats/opentelemetry/csm/observability_test.go index b739ab7b77b0..607df7fb3e88 100644 --- a/stats/opentelemetry/csm/observability_test.go +++ b/stats/opentelemetry/csm/observability_test.go @@ -45,9 +45,7 @@ import ( // Env Vars as well, and mocks the resource detector's returned attribute set to // simulate the environment. It registers a cleanup function on the provided t // to restore the environment to it's original state. 
-func setupEnv(t *testing.T, resourceDetectorEmissions map[string]string, nodeID string, csmCanonicalServiceName string, csmWorkloadName string) { - clearEnv() - +func setupEnv(t *testing.T, resourceDetectorEmissions map[string]string, nodeID, csmCanonicalServiceName, csmWorkloadName string) { cleanup, err := bootstrap.CreateFile(bootstrap.Options{ NodeID: nodeID, ServerURI: "xds_server_uri", @@ -105,9 +103,9 @@ func (s) TestCSMPluginOptionUnary(t *testing.T) { "k8s.namespace.name": "k8s_namespace_name_val", "k8s.cluster.name": "k8s_cluster_name_val", } - nodeID := "projects/12345/networks/mesh:mesh_id/nodes/aaaa-aaaa-aaaa-aaaa" - csmCanonicalServiceName := "csm_canonical_service_name" - csmWorkloadName := "csm_workload_name" + const nodeID = "projects/12345/networks/mesh:mesh_id/nodes/aaaa-aaaa-aaaa-aaaa" + const csmCanonicalServiceName = "csm_canonical_service_name" + const csmWorkloadName = "csm_workload_name" setupEnv(t, resourceDetectorEmissions, nodeID, csmCanonicalServiceName, csmWorkloadName) attributesWant := map[string]string{ @@ -142,7 +140,7 @@ func (s) TestCSMPluginOptionUnary(t *testing.T) { name: "normal-flow", unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{Payload: &testpb.Payload{ - Body: make([]byte, 10000), + Body: make([]byte, len(in.GetPayload().GetBody())), }}, nil }, opts: testutils.MetricDataOptions{ @@ -166,7 +164,7 @@ func (s) TestCSMPluginOptionUnary(t *testing.T) { grpc.SetHeader(ctx, metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) return &testpb.SimpleResponse{Payload: &testpb.Payload{ - Body: make([]byte, 10000), + Body: make([]byte, len(in.GetPayload().GetBody())), }}, nil }, opts: testutils.MetricDataOptions{ @@ -180,7 +178,7 @@ func (s) TestCSMPluginOptionUnary(t *testing.T) { grpc.SendHeader(ctx, metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) return &testpb.SimpleResponse{Payload: &testpb.Payload{ - 
Body: make([]byte, 10000),
+					Body: make([]byte, len(in.GetPayload().GetBody())),
 				}}, nil
 			},
 			opts: testutils.MetricDataOptions{
@@ -192,7 +190,7 @@ func (s) TestCSMPluginOptionUnary(t *testing.T) {
 			name: "send-msg",
 			unaryCallFunc: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
 				return &testpb.SimpleResponse{Payload: &testpb.Payload{
-					Body: make([]byte, 10000),
+					Body: make([]byte, len(in.GetPayload().GetBody())),
 				}}, nil
 			},
 			opts: testutils.MetricDataOptions{
@@ -219,7 +217,7 @@ func (s) TestCSMPluginOptionUnary(t *testing.T) {
 				MetricsOptions: opentelemetry.MetricsOptions{
 					MeterProvider: provider,
 					Metrics:       opentelemetry.DefaultMetrics,
-					OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"}, // should be a no-op unless receive labels through optional labels mechanism
+					OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"},
 				},
 			}, po)}
 			if err := ss.Start(sopts, dopts...); err != nil {
@@ -233,9 +231,9 @@ func (s) TestCSMPluginOptionUnary(t *testing.T) {
 					Body: make([]byte, 10000),
 				}}
 			}
-			// Make two RPC's, a unary RPC and a streaming RPC. These should cause
-			// certain metrics to be emitted, which should be able to be observed
-			// through the Metric Reader.
+			// Make a Unary RPC. This should cause certain metrics to be
+			// emitted, which should be able to be observed through the Metric
+			// Reader.
ss.Client.UnaryCall(ctx, request, grpc.UseCompressor(gzip.Name)) rm := &metricdata.ResourceMetrics{} reader.Collect(ctx, rm) @@ -272,9 +270,9 @@ func (s) TestCSMPluginOptionStreaming(t *testing.T) { "k8s.namespace.name": "k8s_namespace_name_val", "k8s.cluster.name": "k8s_cluster_name_val", } - nodeID := "projects/12345/networks/mesh:mesh_id/nodes/aaaa-aaaa-aaaa-aaaa" - csmCanonicalServiceName := "csm_canonical_service_name" - csmWorkloadName := "csm_workload_name" + const nodeID = "projects/12345/networks/mesh:mesh_id/nodes/aaaa-aaaa-aaaa-aaaa" + const csmCanonicalServiceName = "csm_canonical_service_name" + const csmWorkloadName = "csm_workload_name" setupEnv(t, resourceDetectorEmissions, nodeID, csmCanonicalServiceName, csmWorkloadName) attributesWant := map[string]string{ @@ -309,8 +307,7 @@ func (s) TestCSMPluginOptionStreaming(t *testing.T) { name: "trailers-only", streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error { for { - _, err := stream.Recv() - if err == io.EOF { + if _, err := stream.Recv(); err == io.EOF { return nil } } @@ -324,8 +321,7 @@ func (s) TestCSMPluginOptionStreaming(t *testing.T) { streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error { stream.SetHeader(metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) for { - _, err := stream.Recv() - if err == io.EOF { + if _, err := stream.Recv(); err == io.EOF { return nil } } @@ -339,8 +335,7 @@ func (s) TestCSMPluginOptionStreaming(t *testing.T) { streamingCallFunc: func(stream testgrpc.TestService_FullDuplexCallServer) error { stream.SendHeader(metadata.New(map[string]string{"some-metadata": "some-metadata-val"})) for { - _, err := stream.Recv() - if err == io.EOF { + if _, err := stream.Recv(); err == io.EOF { return nil } } @@ -356,8 +351,7 @@ func (s) TestCSMPluginOptionStreaming(t *testing.T) { Body: make([]byte, 10000), }}) for { - _, err := stream.Recv() - if err == io.EOF { + if _, err := stream.Recv(); err == io.EOF { 
return nil } } @@ -385,7 +379,7 @@ func (s) TestCSMPluginOptionStreaming(t *testing.T) { MetricsOptions: opentelemetry.MetricsOptions{ MeterProvider: provider, Metrics: opentelemetry.DefaultMetrics, - OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"}, // should be a no-op unless receive labels through optional labels mechanism + OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"}, }, }, po)} if err := ss.Start(sopts, dopts...); err != nil { @@ -411,7 +405,7 @@ func (s) TestCSMPluginOptionStreaming(t *testing.T) { stream.CloseSend() if _, err = stream.Recv(); err != io.EOF { - t.Fatalf("unexpected error: %v, expected an EOF error", err) + t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) } rm := &metricdata.ResourceMetrics{} @@ -432,7 +426,7 @@ func (s) TestCSMPluginOptionStreaming(t *testing.T) { } } -func unaryInterceptorAttachxDSLabels(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { +func unaryInterceptorAttachXDSLabels(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { ctx = istats.SetLabels(ctx, &istats.Labels{ TelemetryLabels: map[string]string{ // mock what the cluster impl would write here ("csm." 
xDS Labels) @@ -461,7 +455,7 @@ func (s) TestxDSLabels(t *testing.T) { ss := &stubserver.StubServer{ UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{Payload: &testpb.Payload{ - Body: make([]byte, 10000), + Body: make([]byte, len(in.GetPayload().GetBody())), }}, nil }, } @@ -471,9 +465,9 @@ func (s) TestxDSLabels(t *testing.T) { MetricsOptions: opentelemetry.MetricsOptions{ MeterProvider: provider, Metrics: opentelemetry.DefaultMetrics, - OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"}, // should be a no-op unless receive labels through optional labels mechanism + OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"}, }, - }, po), grpc.WithUnaryInterceptor(unaryInterceptorAttachxDSLabels)} + }, po), grpc.WithUnaryInterceptor(unaryInterceptorAttachXDSLabels)} if err := ss.Start(nil, dopts...); err != nil { t.Fatalf("Error starting endpoint server: %v", err) } diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index 2890881bbd98..aa4734741dd8 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -58,7 +58,7 @@ func setup(t *testing.T, methodAttributeFilter func(string) bool) (*metric.Manua ss := &stubserver.StubServer{ UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{Payload: &testpb.Payload{ - Body: make([]byte, 10000), + Body: make([]byte, len(in.GetPayload().GetBody())), }}, nil }, FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { @@ -114,7 +114,7 @@ func (s) TestMethodAttributeFilter(t *testing.T) { stream.CloseSend() if _, err = stream.Recv(); err != io.EOF { - t.Fatalf("unexpected error: %v, expected an EOF error", err) + t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) } rm := &metricdata.ResourceMetrics{} reader.Collect(ctx, rm) @@ 
-199,7 +199,7 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { stream.CloseSend() if _, err = stream.Recv(); err != io.EOF { - t.Fatalf("unexpected error: %v, expected an EOF error", err) + t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) } rm := &metricdata.ResourceMetrics{} @@ -225,7 +225,7 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { stream.CloseSend() if _, err = stream.Recv(); err != io.EOF { - t.Fatalf("unexpected error: %v, expected an EOF error", err) + t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) } // This Invoke doesn't pass the StaticMethodCallOption. Thus, the method // attribute should become "other" on client side metrics. Since it is also @@ -299,10 +299,10 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { for _, metric := range wantMetrics { val, ok := gotMetrics[metric.Name] if !ok { - t.Fatalf("metric %v not present in recorded metrics", metric.Name) + t.Fatalf("Metric %v not present in recorded metrics", metric.Name) } if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { - t.Fatalf("metrics data type not equal for metric: %v", metric.Name) + t.Fatalf("Metrics data type not equal for metric: %v", metric.Name) } } } diff --git a/stats/opentelemetry/internal/testutils/testutils.go b/stats/opentelemetry/internal/testutils/testutils.go index 2be3921dae7b..10856409eeaf 100644 --- a/stats/opentelemetry/internal/testutils/testutils.go +++ b/stats/opentelemetry/internal/testutils/testutils.go @@ -59,9 +59,7 @@ func waitForServerCompletedRPCs(ctx context.Context, t *testing.T, reader metric if !ok { continue } - if !metricdatatest.AssertEqual(t, wantMetric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { - continue - } + metricdatatest.AssertEqual(t, wantMetric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) return gotMetrics, nil } return nil, 
fmt.Errorf("error waiting for metric %v: %v", wantMetric, ctx.Err()) @@ -769,7 +767,7 @@ func CompareMetrics(ctx context.Context, t *testing.T, mr *metric.ManualReader, // stats.End show up. var err error if gotMetrics, err = waitForServerCompletedRPCs(ctx, t, mr, metric); err != nil { // move to shared helper - t.Fatalf("error waiting for sent total compressed message size for metric %v: %v", metric.Name, err) + t.Fatal(err) } continue } @@ -779,11 +777,11 @@ func CompareMetrics(ctx context.Context, t *testing.T, mr *metric.ManualReader, // test due to context). val, ok := gotMetrics[metric.Name] if !ok { - t.Fatalf("metric %v not present in recorded metrics", metric.Name) + t.Fatalf("Metric %v not present in recorded metrics", metric.Name) } if metric.Name == "grpc.client.attempt.duration" || metric.Name == "grpc.client.call.duration" || metric.Name == "grpc.server.call.duration" { if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars(), metricdatatest.IgnoreValue()) { - t.Fatalf("metrics data type not equal for metric: %v", metric.Name) + t.Fatalf("Metrics data type not equal for metric: %v", metric.Name) } if err := checkDataPointWithinFiveSeconds(val); err != nil { t.Fatalf("Data point not within five seconds for metric %v: %v", metric.Name, err) @@ -792,7 +790,7 @@ func CompareMetrics(ctx context.Context, t *testing.T, mr *metric.ManualReader, } if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { - t.Fatalf("metrics data type not equal for metric: %v", metric.Name) + t.Fatalf("Metrics data type not equal for metric: %v", metric.Name) } } } From aa1ed029861654069fa0e6e57391a03cabe16190 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Mon, 10 Jun 2024 14:12:18 -0400 Subject: [PATCH 8/8] Capitalize xDS --- stats/opentelemetry/csm/observability_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/stats/opentelemetry/csm/observability_test.go b/stats/opentelemetry/csm/observability_test.go index 607df7fb3e88..efa97199f530 100644 --- a/stats/opentelemetry/csm/observability_test.go +++ b/stats/opentelemetry/csm/observability_test.go @@ -440,14 +440,14 @@ func unaryInterceptorAttachXDSLabels(ctx context.Context, method string, req, re return invoker(ctx, method, req, reply, cc, opts...) } -// TestxDSLabels tests that xDS Labels get emitted from OpenTelemetry metrics. +// TestXDSLabels tests that xDS Labels get emitted from OpenTelemetry metrics. // This test configures OpenTelemetry with the CSM Plugin Option, and xDS // Optional Labels turned on. It then configures an interceptor to attach // labels, representing the cluster_impl picker. It then makes a unary RPC, and // expects xDS Labels labels to be attached to emitted relevant metrics. Full // xDS System alongside OpenTelemetry will be tested with interop. (there is // a test for xDS -> Stats handler and this tests -> OTel -> emission). -func (s) TestxDSLabels(t *testing.T) { +func (s) TestXDSLabels(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() reader := metric.NewManualReader()