Skip to content

Commit

Permalink
stats/opentelemetry: Add usage of metrics registry (grpc#7410)
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq authored and printchard committed Jul 30, 2024
1 parent 192dc41 commit f7c3afa
Show file tree
Hide file tree
Showing 9 changed files with 508 additions and 41 deletions.
9 changes: 9 additions & 0 deletions experimental/stats/metricregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ import (
"testing"

"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
)

func init() {
internal.SnapshotMetricRegistryForTesting = snapshotMetricsRegistryForTesting
}

var logger = grpclog.Component("metrics-registry")

// DefaultMetrics are the default metrics registered through global metrics
Expand Down Expand Up @@ -54,6 +59,10 @@ type MetricDescriptor struct {
// The type of metric. This is set by the metric registry, and not intended
// to be set by a component registering a metric.
Type MetricType
// Bounds are the bounds of this metric. This only applies to histogram
// metrics. If unset or set with length 0, stats handlers will fall back to
// default bounds.
Bounds []float64
}

// MetricType is the type of metric.
Expand Down
6 changes: 6 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,12 @@ var (

// SetConnectedAddress sets the connected address for a SubConnState.
SetConnectedAddress any // func(scs *SubConnState, addr resolver.Address)

// SnapshotMetricRegistryForTesting snapshots the global data of the metric
// registry. Registers a cleanup function on the provided testing.T that
// sets the metric registry to its original state. Only called in testing
// functions.
SnapshotMetricRegistryForTesting any // func(t *testing.T)
)

// HealthChecker defines the signature of the client-side LB channel health
Expand Down
12 changes: 9 additions & 3 deletions stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
)

type clientStatsHandler struct {
options Options

estats.MetricsRecorder
options Options
clientMetrics clientMetrics
}

Expand All @@ -52,14 +52,20 @@ func (h *clientStatsHandler) initializeMetrics() {

metrics := h.options.MetricsOptions.Metrics
if metrics == nil {
metrics = DefaultMetrics
metrics = DefaultMetrics()
}

h.clientMetrics.attemptStarted = createInt64Counter(metrics.Metrics(), "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started."))
h.clientMetrics.attemptDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
h.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
h.clientMetrics.callDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))

rm := &registryMetrics{
optionalLabels: h.options.MetricsOptions.OptionalLabels,
}
h.MetricsRecorder = rm
rm.registerMetrics(metrics, meter)
}

func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
Expand Down
10 changes: 5 additions & 5 deletions stats/opentelemetry/csm/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,13 @@ func (s) TestCSMPluginOptionUnary(t *testing.T) {
serverOptionWithCSMPluginOption(opentelemetry.Options{
MetricsOptions: opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics,
Metrics: opentelemetry.DefaultMetrics(),
}}, po),
}
dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{
MetricsOptions: opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics,
Metrics: opentelemetry.DefaultMetrics(),
OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"},
},
}, po)}
Expand Down Expand Up @@ -368,13 +368,13 @@ func (s) TestCSMPluginOptionStreaming(t *testing.T) {
serverOptionWithCSMPluginOption(opentelemetry.Options{
MetricsOptions: opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics,
Metrics: opentelemetry.DefaultMetrics(),
}}, po),
}
dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{
MetricsOptions: opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics,
Metrics: opentelemetry.DefaultMetrics(),
OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"},
},
}, po)}
Expand Down Expand Up @@ -460,7 +460,7 @@ func (s) TestXDSLabels(t *testing.T) {
dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{
MetricsOptions: opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics,
Metrics: opentelemetry.DefaultMetrics(),
OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"},
},
}, po), grpc.WithUnaryInterceptor(unaryInterceptorAttachXDSLabels)}
Expand Down
8 changes: 3 additions & 5 deletions stats/opentelemetry/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ func Test(t *testing.T) {
// component and the server.
func setup(t *testing.T, methodAttributeFilter func(string) bool) (*metric.ManualReader, *stubserver.StubServer) {
reader := metric.NewManualReader()
provider := metric.NewMeterProvider(
metric.WithReader(reader),
)
provider := metric.NewMeterProvider(metric.WithReader(reader))
ss := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{Payload: &testpb.Payload{
Expand All @@ -74,12 +72,12 @@ func setup(t *testing.T, methodAttributeFilter func(string) bool) (*metric.Manua
if err := ss.Start([]grpc.ServerOption{opentelemetry.ServerOption(opentelemetry.Options{
MetricsOptions: opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics,
Metrics: opentelemetry.DefaultMetrics(),
MethodAttributeFilter: methodAttributeFilter,
}})}, opentelemetry.DialOption(opentelemetry.Options{
MetricsOptions: opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics,
Metrics: opentelemetry.DefaultMetrics(),
},
})); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
Expand Down
10 changes: 5 additions & 5 deletions stats/opentelemetry/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package opentelemetry_test
import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
stats2 "google.golang.org/grpc/experimental/stats"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/stats/opentelemetry"

"go.opentelemetry.io/otel/sdk/metric"
Expand Down Expand Up @@ -53,7 +53,7 @@ func Example_dialOption() {
opts := opentelemetry.Options{
MetricsOptions: opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics, // equivalent to unset - distinct from empty
Metrics: opentelemetry.DefaultMetrics(), // equivalent to unset - distinct from empty
},
}
do := opentelemetry.DialOption(opts)
Expand Down Expand Up @@ -88,7 +88,7 @@ func ExampleMetrics_excludeSome() {
// To exclude specific metrics, initialize Options as follows:
opts := opentelemetry.Options{
MetricsOptions: opentelemetry.MetricsOptions{
Metrics: opentelemetry.DefaultMetrics.Remove(opentelemetry.ClientAttemptDuration, opentelemetry.ClientAttemptRcvdCompressedTotalMessageSize),
Metrics: opentelemetry.DefaultMetrics().Remove(opentelemetry.ClientAttemptDuration, opentelemetry.ClientAttemptRcvdCompressedTotalMessageSize),
},
}
do := opentelemetry.DialOption(opts)
Expand All @@ -103,7 +103,7 @@ func ExampleMetrics_disableAll() {
// To disable all metrics, initialize Options as follows:
opts := opentelemetry.Options{
MetricsOptions: opentelemetry.MetricsOptions{
Metrics: stats2.NewMetrics(), // Distinct to nil, which creates default metrics. This empty set creates no metrics.
Metrics: estats.NewMetrics(), // Distinct to nil, which creates default metrics. This empty set creates no metrics.
},
}
do := opentelemetry.DialOption(opts)
Expand All @@ -118,7 +118,7 @@ func ExampleMetrics_enableSome() {
// To only create specific metrics, initialize Options as follows:
opts := opentelemetry.Options{
MetricsOptions: opentelemetry.MetricsOptions{
Metrics: stats2.NewMetrics(opentelemetry.ClientAttemptDuration, opentelemetry.ClientAttemptRcvdCompressedTotalMessageSize), // only create these metrics
Metrics: estats.NewMetrics(opentelemetry.ClientAttemptDuration, opentelemetry.ClientAttemptRcvdCompressedTotalMessageSize), // only create these metrics
},
}
do := opentelemetry.DialOption(opts)
Expand Down
Loading

0 comments on commit f7c3afa

Please sign in to comment.