From 936924832b71f9f8c519c6c098f5ac0a166430f6 Mon Sep 17 00:00:00 2001 From: rahul yadav Date: Sat, 9 Nov 2024 11:11:22 +0530 Subject: [PATCH 1/5] fix(spanner): use spanner options when initializing monitoring exporter --- spanner/client.go | 2 +- spanner/metrics.go | 16 +++++++++++----- spanner/metrics_test.go | 16 ++++++++++------ 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/spanner/client.go b/spanner/client.go index a795e0f7a32c..6f048643f6df 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -508,7 +508,7 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf } // Create a OpenTelemetry metrics configuration - metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, database, metricsProvider) + metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, database, metricsProvider, opts...) if err != nil { return nil, err } diff --git a/spanner/metrics.go b/spanner/metrics.go index cdc4b3df3d79..ad9005afeadf 100644 --- a/spanner/metrics.go +++ b/spanner/metrics.go @@ -167,7 +167,12 @@ var ( return "global" } - exporterOpts = []option.ClientOption{} + // GCM exporter should use the same options as Bigtable client + // createExporterOptions takes Spanner client options and returns exporter options + // Overwritten in tests + createExporterOptions = func(spannerOpts ...option.ClientOption) []option.ClientOption { + return spannerOpts + } ) type metricInfo struct { @@ -193,7 +198,7 @@ type builtinMetricsTracerFactory struct { attemptCount metric.Int64Counter // Counter for the number of attempts. } -func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath string, metricsProvider metric.MeterProvider) (*builtinMetricsTracerFactory, error) { +func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath string, metricsProvider metric.MeterProvider, opts ...option.ClientOption) (*builtinMetricsTracerFactory, error) { clientUID, err := generateClientUID() if err != nil { log.Printf("built-in metrics: generateClientUID failed: %v. Using empty string in the %v metric atteribute", err, metricLabelKeyClientUID) @@ -224,7 +229,7 @@ func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath string, metricsP var meterProvider *sdkmetric.MeterProvider if metricsProvider == nil { // Create default meter provider - mpOptions, err := builtInMeterProviderOptions(project) + mpOptions, err := builtInMeterProviderOptions(project, opts...) if err != nil { return tracerFactory, err } @@ -247,8 +252,9 @@ func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath string, metricsP return tracerFactory, err } -func builtInMeterProviderOptions(project string) ([]sdkmetric.Option, error) { - defaultExporter, err := newMonitoringExporter(context.Background(), project, exporterOpts...) +func builtInMeterProviderOptions(project string, opts ...option.ClientOption) ([]sdkmetric.Option, error) { + allOpts := createExporterOptions(opts...) + defaultExporter, err := newMonitoringExporter(context.Background(), project, allOpts...) if err != nil { return nil, err } diff --git a/spanner/metrics_test.go b/spanner/metrics_test.go index eb6512c7ab02..2fc60e1682ec 100644 --- a/spanner/metrics_test.go +++ b/spanner/metrics_test.go @@ -89,14 +89,18 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) { } go monitoringServer.Serve() defer monitoringServer.Shutdown() - origExporterOpts := exporterOpts - exporterOpts = []option.ClientOption{ - option.WithEndpoint(monitoringServer.Endpoint), - option.WithoutAuthentication(), - option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + + // Override exporter options + origCreateExporterOptions := createExporterOptions + createExporterOptions = func(opts ...option.ClientOption) []option.ClientOption { + return []option.ClientOption{ + option.WithEndpoint(monitoringServer.Endpoint), // Connect to mock + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + } } defer func() { - exporterOpts = origExporterOpts + createExporterOptions = origCreateExporterOptions }() tests := []struct { From 2415d713943918f2ed04d5ecbbc8a476f901e282 Mon Sep 17 00:00:00 2001 From: rahul yadav Date: Sat, 9 Nov 2024 12:43:53 +0530 Subject: [PATCH 2/5] fix test --- spanner/metrics_monitoring_exporter.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/spanner/metrics_monitoring_exporter.go b/spanner/metrics_monitoring_exporter.go index b0d098274c51..bbf5d62d37ce 100644 --- a/spanner/metrics_monitoring_exporter.go +++ b/spanner/metrics_monitoring_exporter.go @@ -29,6 +29,7 @@ import ( monitoring "cloud.google.com/go/monitoring/apiv3/v2" "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + "cloud.google.com/go/spanner/internal" "go.opentelemetry.io/otel/attribute" otelmetric "go.opentelemetry.io/otel/sdk/metric" otelmetricdata "go.opentelemetry.io/otel/sdk/metric/metricdata" @@ -37,6 +38,7 @@ import ( googlemetricpb "google.golang.org/genproto/googleapis/api/metric" monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -147,7 +149,7 @@ func (me *monitoringExporter) exportTimeSeries(ctx context.Context, rm *otelmetr Name: name, TimeSeries: tss[i:j], } - err = me.client.CreateServiceTimeSeries(ctx, req) + err = me.client.CreateServiceTimeSeries(metadata.NewOutgoingContext(ctx, metadata.Pairs("x-goog-api-client", "gccl/"+internal.Version)), req) if err != nil { if status.Code(err) == codes.PermissionDenied { err = fmt.Errorf("%w Need monitoring metric writer permission on project=%s. Follow https://cloud.google.com/spanner/docs/view-manage-client-side-metrics#access-client-side-metrics to set up permissions", From 8aa36eaea702d51ed2da3c7751c8c0249372d3f6 Mon Sep 17 00:00:00 2001 From: rahul yadav Date: Tue, 12 Nov 2024 12:13:41 +0530 Subject: [PATCH 3/5] incorporate changes --- spanner/client_test.go | 2 +- spanner/metrics.go | 2 +- spanner/metrics_monitoring_exporter.go | 4 ++-- spanner/spannertest/integration_test.go | 5 +---- 4 files changed, 5 insertions(+), 8 deletions(-) diff --git a/spanner/client_test.go b/spanner/client_test.go index 82b2a9a2d493..29b3323fbb32 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -76,7 +76,7 @@ func setupMockedTestServerWithConfigAndGCPMultiendpointPool(t *testing.T, config if len(token) != 1 { return status.Errorf(codes.Internal, "unexpected number of api client token headers: %v", len(token)) } - if !strings.HasPrefix(token[0], "gl-go/") { + if !strings.Contains(token[0], "gl-go/") { return status.Errorf(codes.Internal, "unexpected api client token: %v", token[0]) } if !strings.Contains(token[0], "gccl/") { diff --git a/spanner/metrics.go b/spanner/metrics.go index ad9005afeadf..ffa57aa39d3e 100644 --- a/spanner/metrics.go +++ b/spanner/metrics.go @@ -167,7 +167,7 @@ var ( return "global" } - // GCM exporter should use the same options as Bigtable client + // GCM exporter should use the same options as Spanner client // createExporterOptions takes Spanner client options and returns exporter options // Overwritten in tests createExporterOptions = func(spannerOpts ...option.ClientOption) []option.ClientOption { diff --git a/spanner/metrics_monitoring_exporter.go b/spanner/metrics_monitoring_exporter.go index bbf5d62d37ce..d70365935f6e 100644 --- a/spanner/metrics_monitoring_exporter.go +++ b/spanner/metrics_monitoring_exporter.go @@ -30,6 +30,7 @@ import ( monitoring "cloud.google.com/go/monitoring/apiv3/v2" "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" "cloud.google.com/go/spanner/internal" + "github.com/googleapis/gax-go/v2/callctx" "go.opentelemetry.io/otel/attribute" otelmetric "go.opentelemetry.io/otel/sdk/metric" otelmetricdata "go.opentelemetry.io/otel/sdk/metric/metricdata" @@ -38,7 +39,6 @@ import ( googlemetricpb "google.golang.org/genproto/googleapis/api/metric" monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -149,7 +149,7 @@ func (me *monitoringExporter) exportTimeSeries(ctx context.Context, rm *otelmetr Name: name, TimeSeries: tss[i:j], } - err = me.client.CreateServiceTimeSeries(metadata.NewOutgoingContext(ctx, metadata.Pairs("x-goog-api-client", "gccl/"+internal.Version)), req) + err = me.client.CreateServiceTimeSeries(callctx.SetHeaders(ctx, "x-goog-api-client", "gccl/"+internal.Version), req) if err != nil { if status.Code(err) == codes.PermissionDenied { err = fmt.Errorf("%w Need monitoring metric writer permission on project=%s. Follow https://cloud.google.com/spanner/docs/view-manage-client-side-metrics#access-client-side-metrics to set up permissions", diff --git a/spanner/spannertest/integration_test.go b/spanner/spannertest/integration_test.go index 22165c0ba34d..dbdfa3ac8aeb 100644 --- a/spanner/spannertest/integration_test.go +++ b/spanner/spannertest/integration_test.go @@ -142,10 +142,7 @@ func makeClient(t *testing.T) (*spanner.Client, *dbadmin.DatabaseAdminClient, *v client, _, err = spanner.NewMultiEndpointClient(ctx, dbName(), gmeCfg, opts...) os.Setenv("SPANNER_EMULATOR_HOST", old) } else { - opts = append(opts, - option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), - option.WithoutAuthentication(), - option.WithEndpoint(srv.Addr)) + opts = append(opts, option.WithGRPCConn(conn)) client, err = spanner.NewClient(ctx, dbName(), opts...) } if err != nil { From 0903a4a6651675f9e9034e771245440feffe0568 Mon Sep 17 00:00:00 2001 From: rahul yadav Date: Tue, 12 Nov 2024 12:34:16 +0530 Subject: [PATCH 4/5] fix tests --- spanner/client.go | 3 +-- spanner/metrics.go | 11 +++++------ spanner/metrics_monitoring_exporter.go | 26 +++++++++++++++----------- spanner/read_test.go | 2 +- 4 files changed, 22 insertions(+), 20 deletions(-) diff --git a/spanner/client.go b/spanner/client.go index 6f048643f6df..1e21f5934029 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -507,8 +507,7 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf metricsProvider = noop.NewMeterProvider() } - // Create a OpenTelemetry metrics configuration - metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, database, metricsProvider, opts...) + metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, database, metricsProvider, config.Compression, opts...) if err != nil { return nil, err } diff --git a/spanner/metrics.go b/spanner/metrics.go index ffa57aa39d3e..a765e004f15a 100644 --- a/spanner/metrics.go +++ b/spanner/metrics.go @@ -21,11 +21,10 @@ import ( "errors" "fmt" "hash/fnv" - "strings" - "log" "os" "strconv" + "strings" "time" "github.com/google/uuid" @@ -198,7 +197,7 @@ type builtinMetricsTracerFactory struct { attemptCount metric.Int64Counter // Counter for the number of attempts. } -func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath string, metricsProvider metric.MeterProvider, opts ...option.ClientOption) (*builtinMetricsTracerFactory, error) { +func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath string, metricsProvider metric.MeterProvider, compression string, opts ...option.ClientOption) (*builtinMetricsTracerFactory, error) { clientUID, err := generateClientUID() if err != nil { log.Printf("built-in metrics: generateClientUID failed: %v. Using empty string in the %v metric atteribute", err, metricLabelKeyClientUID) @@ -229,7 +228,7 @@ func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath string, metricsP var meterProvider *sdkmetric.MeterProvider if metricsProvider == nil { // Create default meter provider - mpOptions, err := builtInMeterProviderOptions(project, opts...) + mpOptions, err := builtInMeterProviderOptions(project, compression, opts...) if err != nil { return tracerFactory, err } @@ -252,9 +251,9 @@ func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath string, metricsP return tracerFactory, err } -func builtInMeterProviderOptions(project string, opts ...option.ClientOption) ([]sdkmetric.Option, error) { +func builtInMeterProviderOptions(project, compression string, opts ...option.ClientOption) ([]sdkmetric.Option, error) { allOpts := createExporterOptions(opts...) - defaultExporter, err := newMonitoringExporter(context.Background(), project, allOpts...) + defaultExporter, err := newMonitoringExporter(context.Background(), project, compression, allOpts...) if err != nil { return nil, err } diff --git a/spanner/metrics_monitoring_exporter.go b/spanner/metrics_monitoring_exporter.go index d70365935f6e..d776ae62bf86 100644 --- a/spanner/metrics_monitoring_exporter.go +++ b/spanner/metrics_monitoring_exporter.go @@ -39,6 +39,7 @@ import ( googlemetricpb "google.golang.org/genproto/googleapis/api/metric" monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" "google.golang.org/grpc/codes" + "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -76,21 +77,22 @@ func (e errUnexpectedAggregationKind) Error() string { // Google Cloud Monitoring. // Default exporter for built-in metrics type monitoringExporter struct { - shutdown chan struct{} - client *monitoring.MetricClient - shutdownOnce sync.Once - projectID string + shutdown chan struct{} + client *monitoring.MetricClient + shutdownOnce sync.Once + projectID, compression string } -func newMonitoringExporter(ctx context.Context, project string, opts ...option.ClientOption) (*monitoringExporter, error) { +func newMonitoringExporter(ctx context.Context, project, compression string, opts ...option.ClientOption) (*monitoringExporter, error) { client, err := monitoring.NewMetricClient(ctx, opts...) if err != nil { return nil, err } return &monitoringExporter{ - client: client, - shutdown: make(chan struct{}), - projectID: project, + client: client, + shutdown: make(chan struct{}), + projectID: project, + compression: compression, }, nil } @@ -137,19 +139,21 @@ func (me *monitoringExporter) exportTimeSeries(ctx context.Context, rm *otelmetr } name := fmt.Sprintf("projects/%s", me.projectID) - + ctx = callctx.SetHeaders(ctx, "x-goog-api-client", "gccl/"+internal.Version) + if me.compression == gzip.Name { + ctx = callctx.SetHeaders(ctx, requestsCompressionHeader, gzip.Name) + } errs := []error{err} for i := 0; i < len(tss); i += sendBatchSize { j := i + sendBatchSize if j >= len(tss) { j = len(tss) } - req := &monitoringpb.CreateTimeSeriesRequest{ Name: name, TimeSeries: tss[i:j], } - err = me.client.CreateServiceTimeSeries(callctx.SetHeaders(ctx, "x-goog-api-client", "gccl/"+internal.Version), req) + err = me.client.CreateServiceTimeSeries(ctx, req) if err != nil { if status.Code(err) == codes.PermissionDenied { err = fmt.Errorf("%w Need monitoring metric writer permission on project=%s. Follow https://cloud.google.com/spanner/docs/view-manage-client-side-metrics#access-client-side-metrics to set up permissions", diff --git a/spanner/read_test.go b/spanner/read_test.go index 99ab2362a1ee..9f6f3602595b 100644 --- a/spanner/read_test.go +++ b/spanner/read_test.go @@ -1778,7 +1778,7 @@ func TestIteratorStopEarly(t *testing.T) { } func TestIteratorWithError(t *testing.T) { - metricsTracerFactory, err := newBuiltinMetricsTracerFactory(context.Background(), "projects/my-project/instances/my-instance/databases/my-database", noop.NewMeterProvider()) + metricsTracerFactory, err := newBuiltinMetricsTracerFactory(context.Background(), "projects/my-project/instances/my-instance/databases/my-database", noop.NewMeterProvider(), "identity") if err != nil { t.Fatalf("failed to create metrics tracer factory: %v", err) } From 3b31f223da4a668efc125bcc2328851f5b09a7a3 Mon Sep 17 00:00:00 2001 From: rahul yadav Date: Tue, 12 Nov 2024 22:14:13 +0530 Subject: [PATCH 5/5] format struct --- spanner/metrics_monitoring_exporter.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/spanner/metrics_monitoring_exporter.go b/spanner/metrics_monitoring_exporter.go index d776ae62bf86..8eb79323fdd0 100644 --- a/spanner/metrics_monitoring_exporter.go +++ b/spanner/metrics_monitoring_exporter.go @@ -77,10 +77,11 @@ func (e errUnexpectedAggregationKind) Error() string { // Google Cloud Monitoring. // Default exporter for built-in metrics type monitoringExporter struct { - shutdown chan struct{} - client *monitoring.MetricClient - shutdownOnce sync.Once - projectID, compression string + shutdown chan struct{} + client *monitoring.MetricClient + shutdownOnce sync.Once + projectID string + compression string } func newMonitoringExporter(ctx context.Context, project, compression string, opts ...option.ClientOption) (*monitoringExporter, error) {