diff --git a/.chloggen/revert-10931.yaml b/.chloggen/revert-10931.yaml new file mode 100644 index 00000000000..fb01b3dc9f8 --- /dev/null +++ b/.chloggen/revert-10931.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: component + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecate `TelemetrySettings.LeveledMeterProvider` and undo deprecation of `TelemetrySettings.MeterProvider` + +# One or more tracking issues or pull requests related to the change +issues: [11061] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/cmd/mdatagen/internal/command_test.go b/cmd/mdatagen/internal/command_test.go index 27358666ed4..a143da7ec02 100644 --- a/cmd/mdatagen/internal/command_test.go +++ b/cmd/mdatagen/internal/command_test.go @@ -604,11 +604,11 @@ import ( "go.opentelemetry.io/collector/config/configtelemetry" ) -// Deprecated: [v0.108.0] use LeveledMeter instead. func Meter(settings component.TelemetrySettings) metric.Meter { return settings.MeterProvider.Meter("") } +// Deprecated: [v0.114.0] use Meter instead. func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter { return settings.LeveledMeterProvider(level).Meter("") } @@ -642,11 +642,11 @@ import ( "go.opentelemetry.io/collector/config/configtelemetry" ) -// Deprecated: [v0.108.0] use LeveledMeter instead. func Meter(settings component.TelemetrySettings) metric.Meter { return settings.MeterProvider.Meter("") } +// Deprecated: [v0.114.0] use Meter instead. func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter { return settings.LeveledMeterProvider(level).Meter("") } diff --git a/cmd/mdatagen/internal/samplereceiver/generated_component_telemetry_test.go b/cmd/mdatagen/internal/samplereceiver/generated_component_telemetry_test.go index ea344d16f51..64fd8e37ebf 100644 --- a/cmd/mdatagen/internal/samplereceiver/generated_component_telemetry_test.go +++ b/cmd/mdatagen/internal/samplereceiver/generated_component_telemetry_test.go @@ -26,14 +26,15 @@ type componentTestTelemetry struct { func (tt *componentTestTelemetry) NewSettings() receiver.Settings { set := receivertest.NewNopSettings() - set.TelemetrySettings = tt.newTelemetrySettings() set.ID = component.NewID(component.MustNewType("sample")) + set.TelemetrySettings = tt.newTelemetrySettings() return set } func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings { set := componenttest.NewNopTelemetrySettings() set.MeterProvider = tt.meterProvider + set.MetricsLevel = configtelemetry.LevelDetailed set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return tt.meterProvider } diff --git a/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry.go b/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry.go index 8b64677d4d4..0ec05adee17 100644 --- a/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry.go +++ b/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry.go @@ -7,17 +7,18 @@ import ( "errors" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" ) -// Deprecated: [v0.108.0] use LeveledMeter instead. func Meter(settings component.TelemetrySettings) metric.Meter { return settings.MeterProvider.Meter("go.opentelemetry.io/collector/internal/receiver/samplereceiver") } +// Deprecated: [v0.114.0] use Meter instead. func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter { return settings.LeveledMeterProvider(level).Meter("go.opentelemetry.io/collector/internal/receiver/samplereceiver") } @@ -35,7 +36,6 @@ type TelemetryBuilder struct { observeProcessRuntimeTotalAllocBytes func(context.Context, metric.Observer) error QueueLength metric.Int64ObservableGauge RequestDuration metric.Float64Histogram - meters map[configtelemetry.Level]metric.Meter } // TelemetryBuilderOption applies changes to default builder. @@ -62,7 +62,7 @@ func WithProcessRuntimeTotalAllocBytesCallback(cb func() int64, opts ...metric.O // InitQueueLength configures the QueueLength metric. func (builder *TelemetryBuilder) InitQueueLength(cb func() int64, opts ...metric.ObserveOption) error { var err error - builder.QueueLength, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableGauge( + builder.QueueLength, err = builder.meter.Int64ObservableGauge( "otelcol_queue_length", metric.WithDescription("This metric is optional and therefore not initialized in NewTelemetryBuilder."), metric.WithUnit("{items}"), @@ -70,7 +70,7 @@ func (builder *TelemetryBuilder) InitQueueLength(cb func() int64, opts ...metric if err != nil { return err } - _, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(func(_ context.Context, o metric.Observer) error { + _, err = builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { o.ObserveInt64(builder.QueueLength, cb(), opts...) return nil }, builder.QueueLength) @@ -80,27 +80,27 @@ func (builder *TelemetryBuilder) InitQueueLength(cb func() int64, opts ...metric // NewTelemetryBuilder provides a struct with methods to update all internal telemetry // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { - builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}} + builder := TelemetryBuilder{} for _, op := range options { op.apply(&builder) } - builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic) + builder.meter = Meter(settings) var err, errs error - builder.BatchSizeTriggerSend, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.BatchSizeTriggerSend, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_batch_size_trigger_send", metric.WithDescription("Number of times the batch was sent due to a size trigger [deprecated since v0.110.0]"), metric.WithUnit("{times}"), ) errs = errors.Join(errs, err) - builder.ProcessRuntimeTotalAllocBytes, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableCounter( + builder.ProcessRuntimeTotalAllocBytes, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64ObservableCounter( "otelcol_process_runtime_total_alloc_bytes", metric.WithDescription("Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')"), metric.WithUnit("By"), ) errs = errors.Join(errs, err) - _, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeProcessRuntimeTotalAllocBytes, builder.ProcessRuntimeTotalAllocBytes) + _, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).RegisterCallback(builder.observeProcessRuntimeTotalAllocBytes, builder.ProcessRuntimeTotalAllocBytes) errs = errors.Join(errs, err) - builder.RequestDuration, err = builder.meters[configtelemetry.LevelBasic].Float64Histogram( + builder.RequestDuration, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Float64Histogram( "otelcol_request_duration", metric.WithDescription("Duration of request [alpha]"), metric.WithUnit("s"), @@ -109,3 +109,10 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme errs = errors.Join(errs, err) return &builder, errs } + +func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter { + if cfgLevel <= srvLevel { + return meter + } + return noop.Meter{} +} diff --git a/cmd/mdatagen/internal/templates/component_telemetry_test.go.tmpl b/cmd/mdatagen/internal/templates/component_telemetry_test.go.tmpl index 405dd6b4f02..3aa5db213aa 100644 --- a/cmd/mdatagen/internal/templates/component_telemetry_test.go.tmpl +++ b/cmd/mdatagen/internal/templates/component_telemetry_test.go.tmpl @@ -31,8 +31,8 @@ type componentTestTelemetry struct { {{- if or isConnector isExporter isExtension isProcessor isReceiver }} func (tt *componentTestTelemetry) NewSettings() {{ .Status.Class }}.Settings { set := {{ .Status.Class }}test.NewNopSettings() - set.TelemetrySettings = tt.newTelemetrySettings() set.ID = component.NewID(component.MustNewType("{{ .Type }}")) + set.TelemetrySettings = tt.newTelemetrySettings() return set } {{- end }} @@ -40,6 +40,7 @@ func (tt *componentTestTelemetry) NewSettings() {{ .Status.Class }}.Settings { func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings { set := componenttest.NewNopTelemetrySettings() set.MeterProvider = tt.meterProvider + set.MetricsLevel = configtelemetry.LevelDetailed set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return tt.meterProvider } diff --git a/cmd/mdatagen/internal/templates/telemetry.go.tmpl b/cmd/mdatagen/internal/templates/telemetry.go.tmpl index b98122fa1d1..235d4bea59f 100644 --- a/cmd/mdatagen/internal/templates/telemetry.go.tmpl +++ b/cmd/mdatagen/internal/templates/telemetry.go.tmpl @@ -20,11 +20,11 @@ import ( "go.opentelemetry.io/collector/config/configtelemetry" ) -// Deprecated: [v0.108.0] use LeveledMeter instead. func Meter(settings component.TelemetrySettings) metric.Meter { return settings.MeterProvider.Meter("{{ .ScopeName }}") } +// Deprecated: [v0.114.0] use Meter instead. func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter { return settings.LeveledMeterProvider(level).Meter("{{ .ScopeName }}") } @@ -44,7 +44,6 @@ type TelemetryBuilder struct { observe{{ $name.Render }} func(context.Context, metric.Observer) error {{- end }} {{- end }} - meters map[configtelemetry.Level]metric.Meter } // TelemetryBuilderOption applies changes to default builder. @@ -64,7 +63,7 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { // Init{{ $name.Render }} configures the {{ $name.Render }} metric. func (builder *TelemetryBuilder) Init{{ $name.Render }}({{ if $metric.Data.Async -}}cb func() {{ $metric.Data.BasicType }}{{- end }}, opts ...metric.ObserveOption) error { var err error - builder.{{ $name.Render }}, err = builder.meters[configtelemetry.Level{{ casesTitle $metric.Level.String }}].{{ $metric.Data.Instrument }}( + builder.{{ $name.Render }}, err = builder.meter.{{ $metric.Data.Instrument }}( "otelcol_{{ $name }}", metric.WithDescription("{{ $metric.Description }}"), metric.WithUnit("{{ $metric.Unit }}"), @@ -76,7 +75,7 @@ func (builder *TelemetryBuilder) Init{{ $name.Render }}({{ if $metric.Data.Async if err != nil { return err } - _, err = builder.meters[configtelemetry.Level{{ casesTitle $metric.Level.String }}].RegisterCallback(func(_ context.Context, o metric.Observer) error { + _, err = builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { o.Observe{{ casesTitle $metric.Data.BasicType }}(builder.{{ $name.Render }}, cb(), opts...) return nil }, builder.{{ $name.Render }}) @@ -103,18 +102,16 @@ func With{{ $name.Render }}Callback(cb func() {{ $metric.Data.BasicType }}, opts // NewTelemetryBuilder provides a struct with methods to update all internal telemetry // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { - builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}} + builder := TelemetryBuilder{} for _, op := range options { op.apply(&builder) } - {{- range $level, $val := .Telemetry.Levels }} - builder.meters[configtelemetry.Level{{ casesTitle $level }}] = LeveledMeter(settings, configtelemetry.Level{{ casesTitle $level }}) - {{- end }} + builder.meter = Meter(settings) var err, errs error {{- range $name, $metric := .Telemetry.Metrics }} {{- if not $metric.Optional }} - builder.{{ $name.Render }}, err = builder.meters[configtelemetry.Level{{ casesTitle $metric.Level.String }}].{{ $metric.Data.Instrument }}( + builder.{{ $name.Render }}, err = getLeveledMeter(builder.meter, configtelemetry.Level{{ $metric.Level }}, settings.MetricsLevel).{{ $metric.Data.Instrument }}( "otelcol_{{ $name }}", metric.WithDescription("{{ $metric.Description }}{{ $metric.Stability }}"), metric.WithUnit("{{ $metric.Unit }}"), @@ -124,7 +121,7 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme ) errs = errors.Join(errs, err) {{- if $metric.Data.Async }} - _, err = builder.meters[configtelemetry.Level{{ casesTitle $metric.Level.String }}].RegisterCallback(builder.observe{{ $name.Render }}, builder.{{ $name.Render }}) + _, err = getLeveledMeter(builder.meter, configtelemetry.Level{{ $metric.Level }}, settings.MetricsLevel).RegisterCallback(builder.observe{{ $name.Render }}, builder.{{ $name.Render }}) errs = errors.Join(errs, err) {{- end }} {{- end }} @@ -132,4 +129,11 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme return &builder, errs } +func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter { + if cfgLevel <= srvLevel { + return meter + } + return noop.Meter{} +} + {{- end }} diff --git a/component/componenttest/obsreporttest.go b/component/componenttest/obsreporttest.go index 353b039d13e..f666e69fd97 100644 --- a/component/componenttest/obsreporttest.go +++ b/component/componenttest/obsreporttest.go @@ -5,6 +5,7 @@ package componenttest // import "go.opentelemetry.io/collector/component/compone import ( "context" + "errors" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -12,10 +13,11 @@ import ( "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" - "go.uber.org/multierr" + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/pdata/pcommon" ) const ( @@ -32,12 +34,10 @@ const ( ) type TestTelemetry struct { - ts component.TelemetrySettings id component.ID + ts component.TelemetrySettings SpanRecorder *tracetest.SpanRecorder - - reader *sdkmetric.ManualReader - meterProvider *sdkmetric.MeterProvider + reader *sdkmetric.ManualReader } // CheckExporterTraces checks that for the current exported values for trace exporter metrics match given values. @@ -101,12 +101,9 @@ func (tts *TestTelemetry) CheckScraperMetrics(receiver component.ID, scraper com // Shutdown unregisters any views and shuts down the SpanRecorder func (tts *TestTelemetry) Shutdown(ctx context.Context) error { - var errs error - errs = multierr.Append(errs, tts.SpanRecorder.Shutdown(ctx)) - if tts.meterProvider != nil { - errs = multierr.Append(errs, tts.meterProvider.Shutdown(ctx)) - } - return errs + return errors.Join( + tts.ts.TracerProvider.(*sdktrace.TracerProvider).Shutdown(ctx), + tts.ts.MeterProvider.(*sdkmetric.MeterProvider).Shutdown(ctx)) } // TelemetrySettings returns the TestTelemetry's TelemetrySettings @@ -118,23 +115,26 @@ func (tts *TestTelemetry) TelemetrySettings() component.TelemetrySettings { // The caller must pass the ID of the component being tested. The ID will be used by the CreateSettings and Check methods. // The caller must defer a call to `Shutdown` on the returned TestTelemetry. func SetupTelemetry(id component.ID) (TestTelemetry, error) { - sr := new(tracetest.SpanRecorder) - tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) - settings := TestTelemetry{ - ts: NewNopTelemetrySettings(), id: id, - SpanRecorder: sr, + reader: sdkmetric.NewManualReader(), + SpanRecorder: new(tracetest.SpanRecorder), } - settings.ts.TracerProvider = tp - settings.reader = sdkmetric.NewManualReader() - settings.meterProvider = sdkmetric.NewMeterProvider( + mp := sdkmetric.NewMeterProvider( sdkmetric.WithResource(resource.Empty()), sdkmetric.WithReader(settings.reader), ) - settings.ts.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { - return settings.meterProvider + + settings.ts = component.TelemetrySettings{ + Logger: zap.NewNop(), + TracerProvider: sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(settings.SpanRecorder)), + MeterProvider: mp, + LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider { + return mp + }, + MetricsLevel: configtelemetry.LevelDetailed, + Resource: pcommon.NewResource(), } return settings, nil diff --git a/component/telemetry.go b/component/telemetry.go index ffb3ac25411..81fb0afb42a 100644 --- a/component/telemetry.go +++ b/component/telemetry.go @@ -22,12 +22,11 @@ type TelemetrySettings struct { TracerProvider trace.TracerProvider // MeterProvider that the factory can pass to other instrumented third-party libraries. - // - // Deprecated [v0.109.0]: use LeveledMeterProvider instead. MeterProvider metric.MeterProvider // LeveledMeterProvider returns a MeterProvider for a Level that the factory can // pass to other instrumented third-party libraries. + // Deprecated [v0.114.0]: use MeterProvider instead. LeveledMeterProvider func(level configtelemetry.Level) metric.MeterProvider // MetricsLevel represents the configuration value set when the collector diff --git a/config/configgrpc/configgrpc.go b/config/configgrpc/configgrpc.go index 1b1b15963ca..5def5bbb0a1 100644 --- a/config/configgrpc/configgrpc.go +++ b/config/configgrpc/configgrpc.go @@ -16,6 +16,8 @@ import ( "github.com/mostynb/go-grpc-compression/nonclobbering/zstd" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "google.golang.org/grpc" "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" @@ -337,7 +339,7 @@ func (gcs *ClientConfig) getGrpcDialOptions( otelOpts := []otelgrpc.Option{ otelgrpc.WithTracerProvider(settings.TracerProvider), otelgrpc.WithPropagators(otel.GetTextMapPropagator()), - otelgrpc.WithMeterProvider(settings.LeveledMeterProvider(configtelemetry.LevelDetailed)), + otelgrpc.WithMeterProvider(getLeveledMeterProvider(settings)), } // Enable OpenTelemetry observability plugin. @@ -481,7 +483,7 @@ func (gss *ServerConfig) getGrpcServerOptions( otelOpts := []otelgrpc.Option{ otelgrpc.WithTracerProvider(settings.TracerProvider), otelgrpc.WithPropagators(otel.GetTextMapPropagator()), - otelgrpc.WithMeterProvider(settings.LeveledMeterProvider(configtelemetry.LevelDetailed)), + otelgrpc.WithMeterProvider(getLeveledMeterProvider(settings)), } // Enable OpenTelemetry observability plugin. @@ -575,3 +577,10 @@ func authStreamServerInterceptor(srv any, stream grpc.ServerStream, _ *grpc.Stre return handler(srv, wrapServerStream(ctx, stream)) } + +func getLeveledMeterProvider(settings component.TelemetrySettings) metric.MeterProvider { + if configtelemetry.LevelDetailed <= settings.MetricsLevel { + return settings.MeterProvider + } + return noop.MeterProvider{} +} diff --git a/config/configgrpc/go.mod b/config/configgrpc/go.mod index c2e8bcf1269..47e79155d58 100644 --- a/config/configgrpc/go.mod +++ b/config/configgrpc/go.mod @@ -20,6 +20,7 @@ require ( go.opentelemetry.io/collector/pdata/testdata v0.113.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 go.opentelemetry.io/otel v1.32.0 + go.opentelemetry.io/otel/metric v1.32.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 google.golang.org/grpc v1.67.1 @@ -40,7 +41,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/collector/extension v0.113.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.113.0 // indirect - go.opentelemetry.io/otel/metric v1.32.0 // indirect go.opentelemetry.io/otel/sdk v1.32.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect go.opentelemetry.io/otel/trace v1.32.0 // indirect diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index 3f70b64bf5e..69ac3900fe5 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -18,6 +18,8 @@ import ( "github.com/rs/cors" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "golang.org/x/net/http2" "golang.org/x/net/publicsuffix" @@ -226,7 +228,7 @@ func (hcs *ClientConfig) ToClient(ctx context.Context, host component.Host, sett otelOpts := []otelhttp.Option{ otelhttp.WithTracerProvider(settings.TracerProvider), otelhttp.WithPropagators(otel.GetTextMapPropagator()), - otelhttp.WithMeterProvider(settings.LeveledMeterProvider(configtelemetry.LevelDetailed)), + otelhttp.WithMeterProvider(getLeveledMeterProvider(settings)), } // wrapping http transport with otelhttp transport to enable otel instrumentation if settings.TracerProvider != nil && settings.MeterProvider != nil { @@ -465,7 +467,7 @@ func (hss *ServerConfig) ToServer(_ context.Context, host component.Host, settin otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string { return r.URL.Path }), - otelhttp.WithMeterProvider(settings.LeveledMeterProvider(configtelemetry.LevelDetailed)), + otelhttp.WithMeterProvider(getLeveledMeterProvider(settings)), } // Enable OpenTelemetry observability plugin. @@ -553,3 +555,10 @@ func maxRequestBodySizeInterceptor(next http.Handler, maxRecvSize int64) http.Ha next.ServeHTTP(w, r) }) } + +func getLeveledMeterProvider(settings component.TelemetrySettings) metric.MeterProvider { + if configtelemetry.LevelDetailed <= settings.MetricsLevel { + return settings.MeterProvider + } + return noop.MeterProvider{} +} diff --git a/exporter/exporterhelper/generated_component_telemetry_test.go b/exporter/exporterhelper/generated_component_telemetry_test.go index ee107fbb7f4..86c7b77619a 100644 --- a/exporter/exporterhelper/generated_component_telemetry_test.go +++ b/exporter/exporterhelper/generated_component_telemetry_test.go @@ -26,14 +26,15 @@ type componentTestTelemetry struct { func (tt *componentTestTelemetry) NewSettings() exporter.Settings { set := exportertest.NewNopSettings() - set.TelemetrySettings = tt.newTelemetrySettings() set.ID = component.NewID(component.MustNewType("exporterhelper")) + set.TelemetrySettings = tt.newTelemetrySettings() return set } func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings { set := componenttest.NewNopTelemetrySettings() set.MeterProvider = tt.meterProvider + set.MetricsLevel = configtelemetry.LevelDetailed set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return tt.meterProvider } diff --git a/exporter/exporterhelper/internal/metadata/generated_telemetry.go b/exporter/exporterhelper/internal/metadata/generated_telemetry.go index b61d6cda6f2..800e4a759ca 100644 --- a/exporter/exporterhelper/internal/metadata/generated_telemetry.go +++ b/exporter/exporterhelper/internal/metadata/generated_telemetry.go @@ -7,17 +7,18 @@ import ( "errors" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" ) -// Deprecated: [v0.108.0] use LeveledMeter instead. func Meter(settings component.TelemetrySettings) metric.Meter { return settings.MeterProvider.Meter("go.opentelemetry.io/collector/exporter/exporterhelper") } +// Deprecated: [v0.114.0] use Meter instead. func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter { return settings.LeveledMeterProvider(level).Meter("go.opentelemetry.io/collector/exporter/exporterhelper") } @@ -41,7 +42,6 @@ type TelemetryBuilder struct { ExporterSentLogRecords metric.Int64Counter ExporterSentMetricPoints metric.Int64Counter ExporterSentSpans metric.Int64Counter - meters map[configtelemetry.Level]metric.Meter } // TelemetryBuilderOption applies changes to default builder. @@ -58,7 +58,7 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { // InitExporterQueueCapacity configures the ExporterQueueCapacity metric. func (builder *TelemetryBuilder) InitExporterQueueCapacity(cb func() int64, opts ...metric.ObserveOption) error { var err error - builder.ExporterQueueCapacity, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableGauge( + builder.ExporterQueueCapacity, err = builder.meter.Int64ObservableGauge( "otelcol_exporter_queue_capacity", metric.WithDescription("Fixed capacity of the retry queue (in batches)"), metric.WithUnit("{batches}"), @@ -66,7 +66,7 @@ func (builder *TelemetryBuilder) InitExporterQueueCapacity(cb func() int64, opts if err != nil { return err } - _, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(func(_ context.Context, o metric.Observer) error { + _, err = builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { o.ObserveInt64(builder.ExporterQueueCapacity, cb(), opts...) return nil }, builder.ExporterQueueCapacity) @@ -76,7 +76,7 @@ func (builder *TelemetryBuilder) InitExporterQueueCapacity(cb func() int64, opts // InitExporterQueueSize configures the ExporterQueueSize metric. func (builder *TelemetryBuilder) InitExporterQueueSize(cb func() int64, opts ...metric.ObserveOption) error { var err error - builder.ExporterQueueSize, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableGauge( + builder.ExporterQueueSize, err = builder.meter.Int64ObservableGauge( "otelcol_exporter_queue_size", metric.WithDescription("Current size of the retry queue (in batches)"), metric.WithUnit("{batches}"), @@ -84,7 +84,7 @@ func (builder *TelemetryBuilder) InitExporterQueueSize(cb func() int64, opts ... if err != nil { return err } - _, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(func(_ context.Context, o metric.Observer) error { + _, err = builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { o.ObserveInt64(builder.ExporterQueueSize, cb(), opts...) return nil }, builder.ExporterQueueSize) @@ -94,61 +94,61 @@ func (builder *TelemetryBuilder) InitExporterQueueSize(cb func() int64, opts ... // NewTelemetryBuilder provides a struct with methods to update all internal telemetry // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { - builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}} + builder := TelemetryBuilder{} for _, op := range options { op.apply(&builder) } - builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic) + builder.meter = Meter(settings) var err, errs error - builder.ExporterEnqueueFailedLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ExporterEnqueueFailedLogRecords, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_exporter_enqueue_failed_log_records", metric.WithDescription("Number of log records failed to be added to the sending queue."), metric.WithUnit("{records}"), ) errs = errors.Join(errs, err) - builder.ExporterEnqueueFailedMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ExporterEnqueueFailedMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_exporter_enqueue_failed_metric_points", metric.WithDescription("Number of metric points failed to be added to the sending queue."), metric.WithUnit("{datapoints}"), ) errs = errors.Join(errs, err) - builder.ExporterEnqueueFailedSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ExporterEnqueueFailedSpans, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_exporter_enqueue_failed_spans", metric.WithDescription("Number of spans failed to be added to the sending queue."), metric.WithUnit("{spans}"), ) errs = errors.Join(errs, err) - builder.ExporterSendFailedLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ExporterSendFailedLogRecords, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_exporter_send_failed_log_records", metric.WithDescription("Number of log records in failed attempts to send to destination."), metric.WithUnit("{records}"), ) errs = errors.Join(errs, err) - builder.ExporterSendFailedMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ExporterSendFailedMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_exporter_send_failed_metric_points", metric.WithDescription("Number of metric points in failed attempts to send to destination."), metric.WithUnit("{datapoints}"), ) errs = errors.Join(errs, err) - builder.ExporterSendFailedSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ExporterSendFailedSpans, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_exporter_send_failed_spans", metric.WithDescription("Number of spans in failed attempts to send to destination."), metric.WithUnit("{spans}"), ) errs = errors.Join(errs, err) - builder.ExporterSentLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ExporterSentLogRecords, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_exporter_sent_log_records", metric.WithDescription("Number of log record successfully sent to destination."), metric.WithUnit("{records}"), ) errs = errors.Join(errs, err) - builder.ExporterSentMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ExporterSentMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_exporter_sent_metric_points", metric.WithDescription("Number of metric points successfully sent to destination."), metric.WithUnit("{datapoints}"), ) errs = errors.Join(errs, err) - builder.ExporterSentSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ExporterSentSpans, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_exporter_sent_spans", metric.WithDescription("Number of spans successfully sent to destination."), metric.WithUnit("{spans}"), @@ -156,3 +156,10 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme errs = errors.Join(errs, err) return &builder, errs } + +func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter { + if cfgLevel <= srvLevel { + return meter + } + return noop.Meter{} +} diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index db80f457a50..5791ddbd694 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -19,7 +19,6 @@ import ( "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" @@ -85,11 +84,9 @@ func TestBatchProcessorSpansDelivered(t *testing.T) { sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 - creationSet := processortest.NewNopSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newTracesBatchProcessor(creationSet, sink, cfg) + traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(), cfg, sink) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) requestCount := 1000 spansPerRequest := 100 @@ -101,14 +98,14 @@ func TestBatchProcessorSpansDelivered(t *testing.T) { spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex)) } td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty()) - require.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + require.NoError(t, traces.ConsumeTraces(context.Background(), td)) } // Added to test logic that check for empty resources. td := ptrace.NewTraces() - assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + assert.NoError(t, traces.ConsumeTraces(context.Background(), td)) - require.NoError(t, batcher.Shutdown(context.Background())) + require.NoError(t, traces.Shutdown(context.Background())) require.Equal(t, requestCount*spansPerRequest, sink.SpanCount()) receivedTraces := sink.AllTraces() @@ -128,11 +125,9 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 cfg.SendBatchMaxSize = 130 - creationSet := processortest.NewNopSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newTracesBatchProcessor(creationSet, sink, cfg) + traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(), cfg, sink) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) requestCount := 1000 spansPerRequest := 150 @@ -142,12 +137,12 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex)) } - require.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + require.NoError(t, traces.ConsumeTraces(context.Background(), td)) } // Added to test logic that check for empty resources. td := ptrace.NewTraces() - require.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + require.NoError(t, traces.ConsumeTraces(context.Background(), td)) // wait for all spans to be reported for { @@ -157,7 +152,7 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { <-time.After(cfg.Timeout) } - require.NoError(t, batcher.Shutdown(context.Background())) + require.NoError(t, traces.Shutdown(context.Background())) require.Equal(t, requestCount*spansPerRequest, sink.SpanCount()) for i := 0; i < len(sink.AllTraces())-1; i++ { @@ -182,21 +177,20 @@ func TestBatchProcessorSentBySize(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = sendBatchSize cfg.Timeout = 500 * time.Millisecond - creationSet := tel.NewSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newTracesBatchProcessor(creationSet, sink, cfg) + + traces, err := NewFactory().CreateTraces(context.Background(), tel.NewSettings(), cfg, sink) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) start := time.Now() sizeSum := 0 for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) - require.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + require.NoError(t, traces.ConsumeTraces(context.Background(), td)) } - require.NoError(t, batcher.Shutdown(context.Background())) + require.NoError(t, traces.Shutdown(context.Background())) elapsed := time.Since(start) require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds()) @@ -285,6 +279,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { }, }, }) + require.NoError(t, tel.Shutdown(context.Background())) } func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { @@ -303,21 +298,20 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { cfg.SendBatchSize = uint32(sendBatchSize) cfg.SendBatchMaxSize = uint32(sendBatchMaxSize) cfg.Timeout = 500 * time.Millisecond - creationSet := tel.NewSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newTracesBatchProcessor(creationSet, sink, cfg) + + traces, err := NewFactory().CreateTraces(context.Background(), tel.NewSettings(), cfg, sink) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) start := time.Now() sizeSum := 0 for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) - require.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + require.NoError(t, traces.ConsumeTraces(context.Background(), td)) } - require.NoError(t, batcher.Shutdown(context.Background())) + require.NoError(t, traces.Shutdown(context.Background())) elapsed := time.Since(start) require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds()) @@ -425,6 +419,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { }, }, }) + require.NoError(t, tel.Shutdown(context.Background())) } func TestBatchProcessorSentByTimeout(t *testing.T) { @@ -438,15 +433,13 @@ func TestBatchProcessorSentByTimeout(t *testing.T) { spansPerRequest := 10 start := time.Now() - creationSet := processortest.NewNopSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newTracesBatchProcessor(creationSet, sink, cfg) + traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(), cfg, sink) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) - require.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + require.NoError(t, traces.ConsumeTraces(context.Background(), td)) } // Wait for at least one batch to be sent. @@ -461,7 +454,7 @@ func TestBatchProcessorSentByTimeout(t *testing.T) { require.LessOrEqual(t, cfg.Timeout.Nanoseconds(), elapsed.Nanoseconds()) // This should not change the results in the sink, verified by the expectedBatchesNum - require.NoError(t, batcher.Shutdown(context.Background())) + require.NoError(t, traces.Shutdown(context.Background())) expectedBatchesNum := 1 expectedBatchingFactor := 5 @@ -479,26 +472,24 @@ func TestBatchProcessorSentByTimeout(t *testing.T) { } func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { - cfg := Config{ + cfg := &Config{ Timeout: 3 * time.Second, SendBatchSize: 1000, } sink := new(consumertest.TracesSink) - creationSet := processortest.NewNopSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newTracesBatchProcessor(creationSet, sink, &cfg) + traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(), cfg, sink) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) requestCount := 10 spansPerRequest := 10 for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) - require.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + require.NoError(t, traces.ConsumeTraces(context.Background(), td)) } - require.NoError(t, batcher.Shutdown(context.Background())) + require.NoError(t, traces.Shutdown(context.Background())) require.Equal(t, requestCount*spansPerRequest, sink.SpanCount()) require.Len(t, sink.AllTraces(), 1) @@ -507,7 +498,7 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { func TestBatchMetricProcessor_ReceivingData(t *testing.T) { // Instantiate the batch processor with low config values to test data // gets sent through the processor. - cfg := Config{ + cfg := &Config{ Timeout: 200 * time.Millisecond, SendBatchSize: 50, } @@ -516,38 +507,36 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { metricsPerRequest := 5 sink := new(consumertest.MetricsSink) - creationSet := processortest.NewNopSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg) + metrics, err := NewFactory().CreateMetrics(context.Background(), processortest.NewNopSettings(), cfg, sink) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost())) sentResourceMetrics := pmetric.NewMetrics().ResourceMetrics() for requestNum := 0; requestNum < requestCount; requestNum++ { md := testdata.GenerateMetrics(metricsPerRequest) - metrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + ms := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() for metricIndex := 0; metricIndex < metricsPerRequest; metricIndex++ { - metrics.At(metricIndex).SetName(getTestMetricName(requestNum, metricIndex)) + ms.At(metricIndex).SetName(getTestMetricName(requestNum, metricIndex)) } md.ResourceMetrics().At(0).CopyTo(sentResourceMetrics.AppendEmpty()) - require.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) + require.NoError(t, metrics.ConsumeMetrics(context.Background(), md)) } // Added to test case with empty resources sent. md := pmetric.NewMetrics() - assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) + assert.NoError(t, metrics.ConsumeMetrics(context.Background(), md)) - require.NoError(t, batcher.Shutdown(context.Background())) + require.NoError(t, metrics.Shutdown(context.Background())) require.Equal(t, requestCount*2*metricsPerRequest, sink.DataPointCount()) receivedMds := sink.AllMetrics() metricsReceivedByName := metricsReceivedByName(receivedMds) for requestNum := 0; requestNum < requestCount; requestNum++ { - metrics := sentResourceMetrics.At(requestNum).ScopeMetrics().At(0).Metrics() + ms := sentResourceMetrics.At(requestNum).ScopeMetrics().At(0).Metrics() for metricIndex := 0; metricIndex < metricsPerRequest; metricIndex++ { require.EqualValues(t, - metrics.At(metricIndex), + ms.At(metricIndex), metricsReceivedByName[getTestMetricName(requestNum, metricIndex)]) } } @@ -559,7 +548,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { // Instantiate the batch processor with low config values to test data // gets sent through the processor. - cfg := Config{ + cfg := &Config{ Timeout: 100 * time.Millisecond, SendBatchSize: 50, } @@ -572,20 +561,18 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { ) sink := new(consumertest.MetricsSink) - creationSet := tel.NewSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg) + metrics, err := NewFactory().CreateMetrics(context.Background(), tel.NewSettings(), cfg, sink) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost())) start := time.Now() size := 0 for requestNum := 0; requestNum < requestCount; requestNum++ { md := testdata.GenerateMetrics(metricsPerRequest) size += sizer.MetricsSize(md) - require.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) + require.NoError(t, metrics.ConsumeMetrics(context.Background(), md)) } - require.NoError(t, batcher.Shutdown(context.Background())) + require.NoError(t, metrics.Shutdown(context.Background())) elapsed := time.Since(start) require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds()) @@ -675,6 +662,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { }, }, }) + require.NoError(t, tel.Shutdown(context.Background())) } func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) { @@ -698,7 +686,7 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) { } func TestBatchMetricsProcessor_Timeout(t *testing.T) { - cfg := Config{ + cfg := &Config{ Timeout: 100 * time.Millisecond, SendBatchSize: 101, } @@ -706,16 +694,14 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { metricsPerRequest := 10 sink := new(consumertest.MetricsSink) - creationSet := processortest.NewNopSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg) + metrics, err := NewFactory().CreateMetrics(context.Background(), processortest.NewNopSettings(), cfg, sink) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost())) start := time.Now() for requestNum := 0; requestNum < requestCount; requestNum++ { md := testdata.GenerateMetrics(metricsPerRequest) - require.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) + require.NoError(t, metrics.ConsumeMetrics(context.Background(), md)) } // Wait for at least one batch to be sent. @@ -730,7 +716,7 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { require.LessOrEqual(t, cfg.Timeout.Nanoseconds(), elapsed.Nanoseconds()) // This should not change the results in the sink, verified by the expectedBatchesNum - require.NoError(t, batcher.Shutdown(context.Background())) + require.NoError(t, metrics.Shutdown(context.Background())) expectedBatchesNum := 1 expectedBatchingFactor := 5 @@ -747,7 +733,7 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { } func TestBatchMetricProcessor_Shutdown(t *testing.T) { - cfg := Config{ + cfg := &Config{ Timeout: 3 * time.Second, SendBatchSize: 1000, } @@ -755,18 +741,16 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) { metricsPerRequest := 10 sink := new(consumertest.MetricsSink) - creationSet := processortest.NewNopSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg) + metrics, err := NewFactory().CreateMetrics(context.Background(), processortest.NewNopSettings(), cfg, sink) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost())) for requestNum := 0; requestNum < requestCount; requestNum++ { md := testdata.GenerateMetrics(metricsPerRequest) - require.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) + require.NoError(t, metrics.ConsumeMetrics(context.Background(), md)) } - require.NoError(t, batcher.Shutdown(context.Background())) + require.NoError(t, metrics.Shutdown(context.Background())) require.Equal(t, requestCount*2*metricsPerRequest, sink.DataPointCount()) require.Len(t, sink.AllMetrics(), 1) @@ -833,7 +817,7 @@ func BenchmarkTraceSizeSpanCount(b *testing.B) { func BenchmarkBatchMetricProcessor(b *testing.B) { b.StopTimer() - cfg := Config{ + cfg := &Config{ Timeout: 100 * time.Millisecond, SendBatchSize: 2000, } @@ -842,7 +826,7 @@ func BenchmarkBatchMetricProcessor(b *testing.B) { func BenchmarkMultiBatchMetricProcessor(b *testing.B) { b.StopTimer() - cfg := Config{ + cfg := &Config{ Timeout: 100 * time.Millisecond, SendBatchSize: 2000, MetadataKeys: []string{"test", "test2"}, @@ -850,24 +834,22 @@ func BenchmarkMultiBatchMetricProcessor(b *testing.B) { runMetricsProcessorBenchmark(b, cfg) } -func runMetricsProcessorBenchmark(b *testing.B, cfg Config) { +func runMetricsProcessorBenchmark(b *testing.B, cfg *Config) { ctx := context.Background() sink := new(metricsSink) - creationSet := processortest.NewNopSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - metricsPerRequest := 1000 - batcher, err := newMetricsBatchProcessor(creationSet, sink, &cfg) + metrics, err := NewFactory().CreateMetrics(context.Background(), processortest.NewNopSettings(), cfg, sink) require.NoError(b, err) - require.NoError(b, batcher.Start(ctx, componenttest.NewNopHost())) + require.NoError(b, metrics.Start(ctx, componenttest.NewNopHost())) + const metricsPerRequest = 1000 b.StartTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - require.NoError(b, batcher.ConsumeMetrics(ctx, testdata.GenerateMetrics(metricsPerRequest))) + require.NoError(b, metrics.ConsumeMetrics(ctx, testdata.GenerateMetrics(metricsPerRequest))) } }) b.StopTimer() - require.NoError(b, batcher.Shutdown(ctx)) + require.NoError(b, metrics.Shutdown(ctx)) require.Equal(b, b.N*metricsPerRequest, sink.metricsCount) } @@ -892,7 +874,7 @@ func (sme *metricsSink) ConsumeMetrics(_ context.Context, md pmetric.Metrics) er func TestBatchLogProcessor_ReceivingData(t *testing.T) { // Instantiate the batch processor with low config values to test data // gets sent through the processor. - cfg := Config{ + cfg := &Config{ Timeout: 200 * time.Millisecond, SendBatchSize: 50, } @@ -901,38 +883,36 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { logsPerRequest := 5 sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg) + logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(), cfg, sink) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) sentResourceLogs := plog.NewLogs().ResourceLogs() for requestNum := 0; requestNum < requestCount; requestNum++ { ld := testdata.GenerateLogs(logsPerRequest) - logs := ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords() + lrs := ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords() for logIndex := 0; logIndex < logsPerRequest; logIndex++ { - logs.At(logIndex).SetSeverityText(getTestLogSeverityText(requestNum, logIndex)) + lrs.At(logIndex).SetSeverityText(getTestLogSeverityText(requestNum, logIndex)) } ld.ResourceLogs().At(0).CopyTo(sentResourceLogs.AppendEmpty()) - require.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) + require.NoError(t, logs.ConsumeLogs(context.Background(), ld)) } // Added to test case with empty resources sent. ld := plog.NewLogs() - assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) + assert.NoError(t, logs.ConsumeLogs(context.Background(), ld)) - require.NoError(t, batcher.Shutdown(context.Background())) + require.NoError(t, logs.Shutdown(context.Background())) require.Equal(t, requestCount*logsPerRequest, sink.LogRecordCount()) receivedMds := sink.AllLogs() logsReceivedBySeverityText := logsReceivedBySeverityText(receivedMds) for requestNum := 0; requestNum < requestCount; requestNum++ { - logs := sentResourceLogs.At(requestNum).ScopeLogs().At(0).LogRecords() + lrs := sentResourceLogs.At(requestNum).ScopeLogs().At(0).LogRecords() for logIndex := 0; logIndex < logsPerRequest; logIndex++ { require.EqualValues(t, - logs.At(logIndex), + lrs.At(logIndex), logsReceivedBySeverityText[getTestLogSeverityText(requestNum, logIndex)]) } } @@ -944,7 +924,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { // Instantiate the batch processor with low config values to test data // gets sent through the processor. - cfg := Config{ + cfg := &Config{ Timeout: 100 * time.Millisecond, SendBatchSize: 50, } @@ -955,20 +935,18 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { ) sink := new(consumertest.LogsSink) - creationSet := tel.NewSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg) + logs, err := NewFactory().CreateLogs(context.Background(), tel.NewSettings(), cfg, sink) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) start := time.Now() size := 0 for requestNum := 0; requestNum < requestCount; requestNum++ { ld := testdata.GenerateLogs(logsPerRequest) size += sizer.LogsSize(ld) - require.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) + require.NoError(t, logs.ConsumeLogs(context.Background(), ld)) } - require.NoError(t, batcher.Shutdown(context.Background())) + require.NoError(t, logs.Shutdown(context.Background())) elapsed := time.Since(start) require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds()) @@ -1058,10 +1036,11 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { }, }, }) + require.NoError(t, tel.Shutdown(context.Background())) } func TestBatchLogsProcessor_Timeout(t *testing.T) { - cfg := Config{ + cfg := &Config{ Timeout: 100 * time.Millisecond, SendBatchSize: 100, } @@ -1069,16 +1048,14 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) { logsPerRequest := 10 sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg) + logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(), cfg, sink) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) start := time.Now() for requestNum := 0; requestNum < requestCount; requestNum++ { ld := testdata.GenerateLogs(logsPerRequest) - require.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) + require.NoError(t, logs.ConsumeLogs(context.Background(), ld)) } // Wait for at least one batch to be sent. @@ -1093,7 +1070,7 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) { require.LessOrEqual(t, cfg.Timeout.Nanoseconds(), elapsed.Nanoseconds()) // This should not change the results in the sink, verified by the expectedBatchesNum - require.NoError(t, batcher.Shutdown(context.Background())) + require.NoError(t, logs.Shutdown(context.Background())) expectedBatchesNum := 1 expectedBatchingFactor := 5 @@ -1110,7 +1087,7 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) { } func TestBatchLogProcessor_Shutdown(t *testing.T) { - cfg := Config{ + cfg := &Config{ Timeout: 3 * time.Second, SendBatchSize: 1000, } @@ -1118,18 +1095,16 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) { logsPerRequest := 10 sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg) + logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(), cfg, sink) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) for requestNum := 0; requestNum < requestCount; requestNum++ { ld := testdata.GenerateLogs(logsPerRequest) - require.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) + require.NoError(t, logs.ConsumeLogs(context.Background(), ld)) } - require.NoError(t, batcher.Shutdown(context.Background())) + require.NoError(t, logs.Shutdown(context.Background())) require.Equal(t, requestCount*logsPerRequest, sink.LogRecordCount()) require.Len(t, sink.AllLogs(), 1) @@ -1197,11 +1172,9 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) { cfg.SendBatchSize = 1000 cfg.Timeout = 10 * time.Minute cfg.MetadataKeys = []string{"token1", "token2"} - creationSet := processortest.NewNopSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newTracesBatchProcessor(creationSet, sink, cfg) + traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(), cfg, sink) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) bg := context.Background() callCtxs := []context.Context{ @@ -1248,10 +1221,10 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) { // use round-robin to assign context. num := requestNum % len(callCtxs) expectByContext[num] += spansPerRequest - require.NoError(t, batcher.ConsumeTraces(callCtxs[num], td)) + require.NoError(t, traces.ConsumeTraces(callCtxs[num], td)) } - require.NoError(t, batcher.Shutdown(context.Background())) + require.NoError(t, traces.Shutdown(context.Background())) // The following tests are the same as TestBatchProcessorSpansDelivered(). require.Equal(t, requestCount*spansPerRequest, sink.SpanCount()) @@ -1290,10 +1263,9 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.MetadataKeys = []string{"token"} cfg.MetadataCardinalityLimit = cardLimit - creationSet := processortest.NewNopSettings() - batcher, err := newTracesBatchProcessor(creationSet, sink, cfg) + traces, err := NewFactory().CreateTraces(context.Background(), processortest.NewNopSettings(), cfg, sink) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) bg := context.Background() for requestNum := 0; requestNum < cardLimit; requestNum++ { @@ -1304,7 +1276,7 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { }), }) - require.NoError(t, batcher.ConsumeTraces(ctx, td)) + require.NoError(t, traces.ConsumeTraces(ctx, td)) } td := testdata.GenerateTraces(1) @@ -1313,38 +1285,36 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { "token": {"limit_exceeded"}, }), }) - err = batcher.ConsumeTraces(ctx, td) + err = traces.ConsumeTraces(ctx, td) require.Error(t, err) assert.True(t, consumererror.IsPermanent(err)) require.ErrorContains(t, err, "too many") - require.NoError(t, batcher.Shutdown(context.Background())) + require.NoError(t, traces.Shutdown(context.Background())) } func TestBatchZeroConfig(t *testing.T) { // This is a no-op configuration. No need for a timer, no // minimum, no maximum, just a pass through. - cfg := Config{} + cfg := &Config{} require.NoError(t, cfg.Validate()) const requestCount = 5 const logsPerRequest = 10 sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg) + logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(), cfg, sink) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) - defer func() { require.NoError(t, batcher.Shutdown(context.Background())) }() + require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) + defer func() { require.NoError(t, logs.Shutdown(context.Background())) }() expect := 0 for requestNum := 0; requestNum < requestCount; requestNum++ { cnt := logsPerRequest + requestNum expect += cnt ld := testdata.GenerateLogs(cnt) - require.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) + require.NoError(t, logs.ConsumeLogs(context.Background(), ld)) } // Wait for all batches. @@ -1366,23 +1336,21 @@ func TestBatchSplitOnly(t *testing.T) { const requestCount = 5 const logsPerRequest = 100 - cfg := Config{ + cfg := &Config{ SendBatchMaxSize: maxBatch, } require.NoError(t, cfg.Validate()) sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newLogsBatchProcessor(creationSet, sink, &cfg) + logs, err := NewFactory().CreateLogs(context.Background(), processortest.NewNopSettings(), cfg, sink) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) - defer func() { require.NoError(t, batcher.Shutdown(context.Background())) }() + require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) + defer func() { require.NoError(t, logs.Shutdown(context.Background())) }() for requestNum := 0; requestNum < requestCount; requestNum++ { ld := testdata.GenerateLogs(logsPerRequest) - require.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) + require.NoError(t, logs.ConsumeLogs(context.Background(), ld)) } // Wait for all batches. diff --git a/processor/batchprocessor/generated_component_telemetry_test.go b/processor/batchprocessor/generated_component_telemetry_test.go index cad0f0fff51..8dabcd5d3db 100644 --- a/processor/batchprocessor/generated_component_telemetry_test.go +++ b/processor/batchprocessor/generated_component_telemetry_test.go @@ -26,14 +26,15 @@ type componentTestTelemetry struct { func (tt *componentTestTelemetry) NewSettings() processor.Settings { set := processortest.NewNopSettings() - set.TelemetrySettings = tt.newTelemetrySettings() set.ID = component.NewID(component.MustNewType("batch")) + set.TelemetrySettings = tt.newTelemetrySettings() return set } func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings { set := componenttest.NewNopTelemetrySettings() set.MeterProvider = tt.meterProvider + set.MetricsLevel = configtelemetry.LevelDetailed set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return tt.meterProvider } diff --git a/processor/batchprocessor/internal/metadata/generated_telemetry.go b/processor/batchprocessor/internal/metadata/generated_telemetry.go index b04fc7694fc..01294294f87 100644 --- a/processor/batchprocessor/internal/metadata/generated_telemetry.go +++ b/processor/batchprocessor/internal/metadata/generated_telemetry.go @@ -7,17 +7,18 @@ import ( "errors" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" ) -// Deprecated: [v0.108.0] use LeveledMeter instead. func Meter(settings component.TelemetrySettings) metric.Meter { return settings.MeterProvider.Meter("go.opentelemetry.io/collector/processor/batchprocessor") } +// Deprecated: [v0.114.0] use Meter instead. func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter { return settings.LeveledMeterProvider(level).Meter("go.opentelemetry.io/collector/processor/batchprocessor") } @@ -36,7 +37,6 @@ type TelemetryBuilder struct { ProcessorBatchMetadataCardinality metric.Int64ObservableUpDownCounter observeProcessorBatchMetadataCardinality func(context.Context, metric.Observer) error ProcessorBatchTimeoutTriggerSend metric.Int64Counter - meters map[configtelemetry.Level]metric.Meter } // TelemetryBuilderOption applies changes to default builder. @@ -63,42 +63,41 @@ func WithProcessorBatchMetadataCardinalityCallback(cb func() int64, opts ...metr // NewTelemetryBuilder provides a struct with methods to update all internal telemetry // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { - builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}} + builder := TelemetryBuilder{} for _, op := range options { op.apply(&builder) } - builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic) - builder.meters[configtelemetry.LevelDetailed] = LeveledMeter(settings, configtelemetry.LevelDetailed) + builder.meter = Meter(settings) var err, errs error - builder.ProcessorBatchBatchSendSize, err = builder.meters[configtelemetry.LevelBasic].Int64Histogram( + builder.ProcessorBatchBatchSendSize, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Histogram( "otelcol_processor_batch_batch_send_size", metric.WithDescription("Number of units in the batch"), metric.WithUnit("{units}"), metric.WithExplicitBucketBoundaries([]float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}...), ) errs = errors.Join(errs, err) - builder.ProcessorBatchBatchSendSizeBytes, err = builder.meters[configtelemetry.LevelDetailed].Int64Histogram( + builder.ProcessorBatchBatchSendSizeBytes, err = getLeveledMeter(builder.meter, configtelemetry.LevelDetailed, settings.MetricsLevel).Int64Histogram( "otelcol_processor_batch_batch_send_size_bytes", metric.WithDescription("Number of bytes in batch that was sent"), metric.WithUnit("By"), metric.WithExplicitBucketBoundaries([]float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000, 200000, 300000, 400000, 500000, 600000, 700000, 800000, 900000, 1e+06, 2e+06, 3e+06, 4e+06, 5e+06, 6e+06, 7e+06, 8e+06, 9e+06}...), ) errs = errors.Join(errs, err) - builder.ProcessorBatchBatchSizeTriggerSend, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ProcessorBatchBatchSizeTriggerSend, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_processor_batch_batch_size_trigger_send", metric.WithDescription("Number of times the batch was sent due to a size trigger"), metric.WithUnit("{times}"), ) errs = errors.Join(errs, err) - builder.ProcessorBatchMetadataCardinality, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableUpDownCounter( + builder.ProcessorBatchMetadataCardinality, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64ObservableUpDownCounter( "otelcol_processor_batch_metadata_cardinality", metric.WithDescription("Number of distinct metadata value combinations being processed"), metric.WithUnit("{combinations}"), ) errs = errors.Join(errs, err) - _, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeProcessorBatchMetadataCardinality, builder.ProcessorBatchMetadataCardinality) + _, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).RegisterCallback(builder.observeProcessorBatchMetadataCardinality, builder.ProcessorBatchMetadataCardinality) errs = errors.Join(errs, err) - builder.ProcessorBatchTimeoutTriggerSend, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ProcessorBatchTimeoutTriggerSend, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_processor_batch_timeout_trigger_send", metric.WithDescription("Number of times the batch was sent due to a timeout trigger"), metric.WithUnit("{times}"), @@ -106,3 +105,10 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme errs = errors.Join(errs, err) return &builder, errs } + +func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter { + if cfgLevel <= srvLevel { + return meter + } + return noop.Meter{} +} diff --git a/processor/batchprocessor/metadata.yaml b/processor/batchprocessor/metadata.yaml index 6f72f5d4d79..8196e44769b 100644 --- a/processor/batchprocessor/metadata.yaml +++ b/processor/batchprocessor/metadata.yaml @@ -4,8 +4,8 @@ github_project: open-telemetry/opentelemetry-collector status: class: processor stability: - beta: [traces, metrics, logs] - distributions: [core, contrib, k8s] + beta: [ traces, metrics, logs ] + distributions: [ core, contrib, k8s ] tests: @@ -32,7 +32,7 @@ telemetry: unit: "{units}" histogram: value_type: int - bucket_boundaries: [10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000] + bucket_boundaries: [ 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000 ] processor_batch_batch_send_size_bytes: level: detailed enabled: true @@ -40,7 +40,7 @@ telemetry: unit: By histogram: value_type: int - bucket_boundaries: [10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000] + bucket_boundaries: [ 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000 ] processor_batch_metadata_cardinality: enabled: true description: Number of distinct metadata value combinations being processed diff --git a/processor/memorylimiterprocessor/generated_component_telemetry_test.go b/processor/memorylimiterprocessor/generated_component_telemetry_test.go index 97ac806a5c9..61358049d6c 100644 --- a/processor/memorylimiterprocessor/generated_component_telemetry_test.go +++ b/processor/memorylimiterprocessor/generated_component_telemetry_test.go @@ -26,14 +26,15 @@ type componentTestTelemetry struct { func (tt *componentTestTelemetry) NewSettings() processor.Settings { set := processortest.NewNopSettings() - set.TelemetrySettings = tt.newTelemetrySettings() set.ID = component.NewID(component.MustNewType("memory_limiter")) + set.TelemetrySettings = tt.newTelemetrySettings() return set } func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings { set := componenttest.NewNopTelemetrySettings() set.MeterProvider = tt.meterProvider + set.MetricsLevel = configtelemetry.LevelDetailed set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return tt.meterProvider } diff --git a/processor/memorylimiterprocessor/internal/metadata/generated_telemetry.go b/processor/memorylimiterprocessor/internal/metadata/generated_telemetry.go index b3aa5a31053..c2220b34325 100644 --- a/processor/memorylimiterprocessor/internal/metadata/generated_telemetry.go +++ b/processor/memorylimiterprocessor/internal/metadata/generated_telemetry.go @@ -6,17 +6,18 @@ import ( "errors" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" ) -// Deprecated: [v0.108.0] use LeveledMeter instead. func Meter(settings component.TelemetrySettings) metric.Meter { return settings.MeterProvider.Meter("go.opentelemetry.io/collector/processor/memorylimiterprocessor") } +// Deprecated: [v0.114.0] use Meter instead. func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter { return settings.LeveledMeterProvider(level).Meter("go.opentelemetry.io/collector/processor/memorylimiterprocessor") } @@ -35,7 +36,6 @@ type TelemetryBuilder struct { ProcessorRefusedLogRecords metric.Int64Counter ProcessorRefusedMetricPoints metric.Int64Counter ProcessorRefusedSpans metric.Int64Counter - meters map[configtelemetry.Level]metric.Meter } // TelemetryBuilderOption applies changes to default builder. @@ -52,43 +52,43 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { // NewTelemetryBuilder provides a struct with methods to update all internal telemetry // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { - builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}} + builder := TelemetryBuilder{} for _, op := range options { op.apply(&builder) } - builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic) + builder.meter = Meter(settings) var err, errs error - builder.ProcessorAcceptedLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ProcessorAcceptedLogRecords, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_processor_accepted_log_records", metric.WithDescription("Number of log records successfully pushed into the next component in the pipeline. [deprecated since v0.110.0]"), metric.WithUnit("{records}"), ) errs = errors.Join(errs, err) - builder.ProcessorAcceptedMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ProcessorAcceptedMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_processor_accepted_metric_points", metric.WithDescription("Number of metric points successfully pushed into the next component in the pipeline. [deprecated since v0.110.0]"), metric.WithUnit("{datapoints}"), ) errs = errors.Join(errs, err) - builder.ProcessorAcceptedSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ProcessorAcceptedSpans, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_processor_accepted_spans", metric.WithDescription("Number of spans successfully pushed into the next component in the pipeline. [deprecated since v0.110.0]"), metric.WithUnit("{spans}"), ) errs = errors.Join(errs, err) - builder.ProcessorRefusedLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ProcessorRefusedLogRecords, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_processor_refused_log_records", metric.WithDescription("Number of log records that were rejected by the next component in the pipeline. [deprecated since v0.110.0]"), metric.WithUnit("{records}"), ) errs = errors.Join(errs, err) - builder.ProcessorRefusedMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ProcessorRefusedMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_processor_refused_metric_points", metric.WithDescription("Number of metric points that were rejected by the next component in the pipeline. [deprecated since v0.110.0]"), metric.WithUnit("{datapoints}"), ) errs = errors.Join(errs, err) - builder.ProcessorRefusedSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ProcessorRefusedSpans, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_processor_refused_spans", metric.WithDescription("Number of spans that were rejected by the next component in the pipeline. [deprecated since v0.110.0]"), metric.WithUnit("{spans}"), @@ -96,3 +96,10 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme errs = errors.Join(errs, err) return &builder, errs } + +func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter { + if cfgLevel <= srvLevel { + return meter + } + return noop.Meter{} +} diff --git a/processor/memorylimiterprocessor/memorylimiter_test.go b/processor/memorylimiterprocessor/memorylimiter_test.go index 73fcfdd4d9d..055efce43b1 100644 --- a/processor/memorylimiterprocessor/memorylimiter_test.go +++ b/processor/memorylimiterprocessor/memorylimiter_test.go @@ -11,6 +11,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -205,6 +207,74 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { }) } +func TestMetricsTelemetry(t *testing.T) { + tel := setupTestTelemetry() + cfg := &Config{ + CheckInterval: time.Second, + MemoryLimitPercentage: 50, + MemorySpikePercentage: 10, + } + metrics, err := NewFactory().CreateMetrics(context.Background(), tel.NewSettings(), cfg, consumertest.NewNop()) + require.NoError(t, err) + require.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost())) + + md := pmetric.NewMetrics() + md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty() + for requestNum := 0; requestNum < 10; requestNum++ { + require.NoError(t, metrics.ConsumeMetrics(context.Background(), md)) + } + require.NoError(t, metrics.Shutdown(context.Background())) + + tel.assertMetrics(t, []metricdata.Metrics{ + { + Name: "otelcol_processor_accepted_metric_points", + Description: "Number of metric points successfully pushed into the next component in the pipeline. [deprecated since v0.110.0]", + Unit: "{datapoints}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 10, + Attributes: attribute.NewSet(attribute.String("processor", "memory_limiter")), + }, + }, + }, + }, + { + Name: "otelcol_processor_incoming_items", + Description: "Number of items passed to the processor. [alpha]", + Unit: "{items}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 10, + Attributes: attribute.NewSet(attribute.String("processor", "memory_limiter"), attribute.String("otel.signal", "metrics")), + }, + }, + }, + }, + { + Name: "otelcol_processor_outgoing_items", + Description: "Number of items emitted from the processor. [alpha]", + Unit: "{items}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 10, + Attributes: attribute.NewSet(attribute.String("processor", "memory_limiter"), attribute.String("otel.signal", "metrics")), + }, + }, + }, + }, + }) + require.NoError(t, tel.Shutdown(context.Background())) +} + // TestTraceMemoryPressureResponse manipulates results from querying memory and // check expected side effects. func TestTraceMemoryPressureResponse(t *testing.T) { diff --git a/processor/processorhelper/generated_component_telemetry_test.go b/processor/processorhelper/generated_component_telemetry_test.go index 2424d1ce6e9..fa517fc19d6 100644 --- a/processor/processorhelper/generated_component_telemetry_test.go +++ b/processor/processorhelper/generated_component_telemetry_test.go @@ -26,14 +26,15 @@ type componentTestTelemetry struct { func (tt *componentTestTelemetry) NewSettings() processor.Settings { set := processortest.NewNopSettings() - set.TelemetrySettings = tt.newTelemetrySettings() set.ID = component.NewID(component.MustNewType("processorhelper")) + set.TelemetrySettings = tt.newTelemetrySettings() return set } func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings { set := componenttest.NewNopTelemetrySettings() set.MeterProvider = tt.meterProvider + set.MetricsLevel = configtelemetry.LevelDetailed set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return tt.meterProvider } diff --git a/processor/processorhelper/internal/metadata/generated_telemetry.go b/processor/processorhelper/internal/metadata/generated_telemetry.go index cb5d9fb7ae6..e008730be1b 100644 --- a/processor/processorhelper/internal/metadata/generated_telemetry.go +++ b/processor/processorhelper/internal/metadata/generated_telemetry.go @@ -6,17 +6,18 @@ import ( "errors" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" ) -// Deprecated: [v0.108.0] use LeveledMeter instead. func Meter(settings component.TelemetrySettings) metric.Meter { return settings.MeterProvider.Meter("go.opentelemetry.io/collector/processor/processorhelper") } +// Deprecated: [v0.114.0] use Meter instead. func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter { return settings.LeveledMeterProvider(level).Meter("go.opentelemetry.io/collector/processor/processorhelper") } @@ -31,7 +32,6 @@ type TelemetryBuilder struct { meter metric.Meter ProcessorIncomingItems metric.Int64Counter ProcessorOutgoingItems metric.Int64Counter - meters map[configtelemetry.Level]metric.Meter } // TelemetryBuilderOption applies changes to default builder. @@ -48,19 +48,19 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { // NewTelemetryBuilder provides a struct with methods to update all internal telemetry // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { - builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}} + builder := TelemetryBuilder{} for _, op := range options { op.apply(&builder) } - builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic) + builder.meter = Meter(settings) var err, errs error - builder.ProcessorIncomingItems, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ProcessorIncomingItems, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_processor_incoming_items", metric.WithDescription("Number of items passed to the processor. [alpha]"), metric.WithUnit("{items}"), ) errs = errors.Join(errs, err) - builder.ProcessorOutgoingItems, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ProcessorOutgoingItems, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_processor_outgoing_items", metric.WithDescription("Number of items emitted from the processor. [alpha]"), metric.WithUnit("{items}"), @@ -68,3 +68,10 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme errs = errors.Join(errs, err) return &builder, errs } + +func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter { + if cfgLevel <= srvLevel { + return meter + } + return noop.Meter{} +} diff --git a/receiver/receiverhelper/generated_component_telemetry_test.go b/receiver/receiverhelper/generated_component_telemetry_test.go index d7ca234a19d..0f49a1f7af3 100644 --- a/receiver/receiverhelper/generated_component_telemetry_test.go +++ b/receiver/receiverhelper/generated_component_telemetry_test.go @@ -26,14 +26,15 @@ type componentTestTelemetry struct { func (tt *componentTestTelemetry) NewSettings() receiver.Settings { set := receivertest.NewNopSettings() - set.TelemetrySettings = tt.newTelemetrySettings() set.ID = component.NewID(component.MustNewType("receiverhelper")) + set.TelemetrySettings = tt.newTelemetrySettings() return set } func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings { set := componenttest.NewNopTelemetrySettings() set.MeterProvider = tt.meterProvider + set.MetricsLevel = configtelemetry.LevelDetailed set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return tt.meterProvider } diff --git a/receiver/receiverhelper/internal/metadata/generated_telemetry.go b/receiver/receiverhelper/internal/metadata/generated_telemetry.go index 86c263792cb..4a848a50f84 100644 --- a/receiver/receiverhelper/internal/metadata/generated_telemetry.go +++ b/receiver/receiverhelper/internal/metadata/generated_telemetry.go @@ -6,17 +6,18 @@ import ( "errors" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" ) -// Deprecated: [v0.108.0] use LeveledMeter instead. func Meter(settings component.TelemetrySettings) metric.Meter { return settings.MeterProvider.Meter("go.opentelemetry.io/collector/receiver/receiverhelper") } +// Deprecated: [v0.114.0] use Meter instead. func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter { return settings.LeveledMeterProvider(level).Meter("go.opentelemetry.io/collector/receiver/receiverhelper") } @@ -35,7 +36,6 @@ type TelemetryBuilder struct { ReceiverRefusedLogRecords metric.Int64Counter ReceiverRefusedMetricPoints metric.Int64Counter ReceiverRefusedSpans metric.Int64Counter - meters map[configtelemetry.Level]metric.Meter } // TelemetryBuilderOption applies changes to default builder. @@ -52,43 +52,43 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { // NewTelemetryBuilder provides a struct with methods to update all internal telemetry // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { - builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}} + builder := TelemetryBuilder{} for _, op := range options { op.apply(&builder) } - builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic) + builder.meter = Meter(settings) var err, errs error - builder.ReceiverAcceptedLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ReceiverAcceptedLogRecords, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_receiver_accepted_log_records", metric.WithDescription("Number of log records successfully pushed into the pipeline."), metric.WithUnit("{records}"), ) errs = errors.Join(errs, err) - builder.ReceiverAcceptedMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ReceiverAcceptedMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_receiver_accepted_metric_points", metric.WithDescription("Number of metric points successfully pushed into the pipeline."), metric.WithUnit("{datapoints}"), ) errs = errors.Join(errs, err) - builder.ReceiverAcceptedSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ReceiverAcceptedSpans, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_receiver_accepted_spans", metric.WithDescription("Number of spans successfully pushed into the pipeline."), metric.WithUnit("{spans}"), ) errs = errors.Join(errs, err) - builder.ReceiverRefusedLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ReceiverRefusedLogRecords, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_receiver_refused_log_records", metric.WithDescription("Number of log records that could not be pushed into the pipeline."), metric.WithUnit("{records}"), ) errs = errors.Join(errs, err) - builder.ReceiverRefusedMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ReceiverRefusedMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_receiver_refused_metric_points", metric.WithDescription("Number of metric points that could not be pushed into the pipeline."), metric.WithUnit("{datapoints}"), ) errs = errors.Join(errs, err) - builder.ReceiverRefusedSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ReceiverRefusedSpans, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_receiver_refused_spans", metric.WithDescription("Number of spans that could not be pushed into the pipeline."), metric.WithUnit("{spans}"), @@ -96,3 +96,10 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme errs = errors.Join(errs, err) return &builder, errs } + +func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter { + if cfgLevel <= srvLevel { + return meter + } + return noop.Meter{} +} diff --git a/receiver/scraperhelper/generated_component_telemetry_test.go b/receiver/scraperhelper/generated_component_telemetry_test.go index d2ee87bf3bc..fca2fa07ae1 100644 --- a/receiver/scraperhelper/generated_component_telemetry_test.go +++ b/receiver/scraperhelper/generated_component_telemetry_test.go @@ -26,14 +26,15 @@ type componentTestTelemetry struct { func (tt *componentTestTelemetry) NewSettings() receiver.Settings { set := receivertest.NewNopSettings() - set.TelemetrySettings = tt.newTelemetrySettings() set.ID = component.NewID(component.MustNewType("scraperhelper")) + set.TelemetrySettings = tt.newTelemetrySettings() return set } func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings { set := componenttest.NewNopTelemetrySettings() set.MeterProvider = tt.meterProvider + set.MetricsLevel = configtelemetry.LevelDetailed set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return tt.meterProvider } diff --git a/receiver/scraperhelper/internal/metadata/generated_telemetry.go b/receiver/scraperhelper/internal/metadata/generated_telemetry.go index 5bcbf186364..b0b257345a7 100644 --- a/receiver/scraperhelper/internal/metadata/generated_telemetry.go +++ b/receiver/scraperhelper/internal/metadata/generated_telemetry.go @@ -6,17 +6,18 @@ import ( "errors" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" ) -// Deprecated: [v0.108.0] use LeveledMeter instead. func Meter(settings component.TelemetrySettings) metric.Meter { return settings.MeterProvider.Meter("go.opentelemetry.io/collector/receiver/scraperhelper") } +// Deprecated: [v0.114.0] use Meter instead. func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter { return settings.LeveledMeterProvider(level).Meter("go.opentelemetry.io/collector/receiver/scraperhelper") } @@ -31,7 +32,6 @@ type TelemetryBuilder struct { meter metric.Meter ScraperErroredMetricPoints metric.Int64Counter ScraperScrapedMetricPoints metric.Int64Counter - meters map[configtelemetry.Level]metric.Meter } // TelemetryBuilderOption applies changes to default builder. @@ -48,19 +48,19 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { // NewTelemetryBuilder provides a struct with methods to update all internal telemetry // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { - builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}} + builder := TelemetryBuilder{} for _, op := range options { op.apply(&builder) } - builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic) + builder.meter = Meter(settings) var err, errs error - builder.ScraperErroredMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ScraperErroredMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_scraper_errored_metric_points", metric.WithDescription("Number of metric points that were unable to be scraped."), metric.WithUnit("{datapoints}"), ) errs = errors.Join(errs, err) - builder.ScraperScrapedMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + builder.ScraperScrapedMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_scraper_scraped_metric_points", metric.WithDescription("Number of metric points successfully scraped."), metric.WithUnit("{datapoints}"), @@ -68,3 +68,10 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme errs = errors.Join(errs, err) return &builder, errs } + +func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter { + if cfgLevel <= srvLevel { + return meter + } + return noop.Meter{} +} diff --git a/service/generated_component_telemetry_test.go b/service/generated_component_telemetry_test.go index ef527afcaee..fe23dfe934b 100644 --- a/service/generated_component_telemetry_test.go +++ b/service/generated_component_telemetry_test.go @@ -25,6 +25,7 @@ type componentTestTelemetry struct { func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings { set := componenttest.NewNopTelemetrySettings() set.MeterProvider = tt.meterProvider + set.MetricsLevel = configtelemetry.LevelDetailed set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return tt.meterProvider } diff --git a/service/internal/metadata/generated_telemetry.go b/service/internal/metadata/generated_telemetry.go index 5c60998cb83..c66271e550d 100644 --- a/service/internal/metadata/generated_telemetry.go +++ b/service/internal/metadata/generated_telemetry.go @@ -7,17 +7,18 @@ import ( "errors" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" ) -// Deprecated: [v0.108.0] use LeveledMeter instead. func Meter(settings component.TelemetrySettings) metric.Meter { return settings.MeterProvider.Meter("go.opentelemetry.io/collector/service") } +// Deprecated: [v0.114.0] use Meter instead. func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter { return settings.LeveledMeterProvider(level).Meter("go.opentelemetry.io/collector/service") } @@ -42,7 +43,6 @@ type TelemetryBuilder struct { observeProcessRuntimeTotalSysMemoryBytes func(context.Context, metric.Observer) error ProcessUptime metric.Float64ObservableCounter observeProcessUptime func(context.Context, metric.Observer) error - meters map[configtelemetry.Level]metric.Meter } // TelemetryBuilderOption applies changes to default builder. @@ -119,59 +119,66 @@ func WithProcessUptimeCallback(cb func() float64, opts ...metric.ObserveOption) // NewTelemetryBuilder provides a struct with methods to update all internal telemetry // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { - builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}} + builder := TelemetryBuilder{} for _, op := range options { op.apply(&builder) } - builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic) + builder.meter = Meter(settings) var err, errs error - builder.ProcessCPUSeconds, err = builder.meters[configtelemetry.LevelBasic].Float64ObservableCounter( + builder.ProcessCPUSeconds, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Float64ObservableCounter( "otelcol_process_cpu_seconds", metric.WithDescription("Total CPU user and system time in seconds"), metric.WithUnit("s"), ) errs = errors.Join(errs, err) - _, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeProcessCPUSeconds, builder.ProcessCPUSeconds) + _, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).RegisterCallback(builder.observeProcessCPUSeconds, builder.ProcessCPUSeconds) errs = errors.Join(errs, err) - builder.ProcessMemoryRss, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableGauge( + builder.ProcessMemoryRss, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64ObservableGauge( "otelcol_process_memory_rss", metric.WithDescription("Total physical memory (resident set size)"), metric.WithUnit("By"), ) errs = errors.Join(errs, err) - _, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeProcessMemoryRss, builder.ProcessMemoryRss) + _, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).RegisterCallback(builder.observeProcessMemoryRss, builder.ProcessMemoryRss) errs = errors.Join(errs, err) - builder.ProcessRuntimeHeapAllocBytes, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableGauge( + builder.ProcessRuntimeHeapAllocBytes, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64ObservableGauge( "otelcol_process_runtime_heap_alloc_bytes", metric.WithDescription("Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc')"), metric.WithUnit("By"), ) errs = errors.Join(errs, err) - _, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeProcessRuntimeHeapAllocBytes, builder.ProcessRuntimeHeapAllocBytes) + _, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).RegisterCallback(builder.observeProcessRuntimeHeapAllocBytes, builder.ProcessRuntimeHeapAllocBytes) errs = errors.Join(errs, err) - builder.ProcessRuntimeTotalAllocBytes, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableCounter( + builder.ProcessRuntimeTotalAllocBytes, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64ObservableCounter( "otelcol_process_runtime_total_alloc_bytes", metric.WithDescription("Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')"), metric.WithUnit("By"), ) errs = errors.Join(errs, err) - _, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeProcessRuntimeTotalAllocBytes, builder.ProcessRuntimeTotalAllocBytes) + _, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).RegisterCallback(builder.observeProcessRuntimeTotalAllocBytes, builder.ProcessRuntimeTotalAllocBytes) errs = errors.Join(errs, err) - builder.ProcessRuntimeTotalSysMemoryBytes, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableGauge( + builder.ProcessRuntimeTotalSysMemoryBytes, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64ObservableGauge( "otelcol_process_runtime_total_sys_memory_bytes", metric.WithDescription("Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys')"), metric.WithUnit("By"), ) errs = errors.Join(errs, err) - _, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeProcessRuntimeTotalSysMemoryBytes, builder.ProcessRuntimeTotalSysMemoryBytes) + _, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).RegisterCallback(builder.observeProcessRuntimeTotalSysMemoryBytes, builder.ProcessRuntimeTotalSysMemoryBytes) errs = errors.Join(errs, err) - builder.ProcessUptime, err = builder.meters[configtelemetry.LevelBasic].Float64ObservableCounter( + builder.ProcessUptime, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Float64ObservableCounter( "otelcol_process_uptime", metric.WithDescription("Uptime of the process"), metric.WithUnit("s"), ) errs = errors.Join(errs, err) - _, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeProcessUptime, builder.ProcessUptime) + _, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).RegisterCallback(builder.observeProcessUptime, builder.ProcessUptime) errs = errors.Join(errs, err) return &builder, errs } + +func getLeveledMeter(meter metric.Meter, cfgLevel, srvLevel configtelemetry.Level) metric.Meter { + if cfgLevel <= srvLevel { + return meter + } + return noop.Meter{} +} diff --git a/service/internal/proctelemetry/process_telemetry_test.go b/service/internal/proctelemetry/process_telemetry_test.go index 9411e7bb398..ed6f45decca 100644 --- a/service/internal/proctelemetry/process_telemetry_test.go +++ b/service/internal/proctelemetry/process_telemetry_test.go @@ -12,7 +12,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - io_prometheus_client "github.com/prometheus/client_model/go" + promclient "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -27,8 +27,8 @@ import ( ) type testTelemetry struct { - component.TelemetrySettings - promHandler http.Handler + TelemetrySettings component.TelemetrySettings + promHandler http.Handler } var expectedMetrics = []string{ @@ -54,10 +54,8 @@ func setupTelemetry(t *testing.T) testTelemetry { sdkmetric.WithReader(exporter), ) - settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { - return meterProvider - } - + settings.TelemetrySettings.MetricsLevel = configtelemetry.LevelDetailed + settings.TelemetrySettings.MeterProvider = meterProvider settings.TelemetrySettings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return meterProvider } @@ -69,7 +67,7 @@ func setupTelemetry(t *testing.T) testTelemetry { return settings } -func fetchPrometheusMetrics(handler http.Handler) (map[string]*io_prometheus_client.MetricFamily, error) { +func fetchPrometheusMetrics(handler http.Handler) (map[string]*promclient.MetricFamily, error) { req, err := http.NewRequest(http.MethodGet, "/metrics", nil) if err != nil { return nil, err @@ -95,7 +93,7 @@ func TestProcessTelemetry(t *testing.T) { require.True(t, ok) require.Len(t, metric.Metric, 1) var metricValue float64 - if metric.GetType() == io_prometheus_client.MetricType_COUNTER { + if metric.GetType() == promclient.MetricType_COUNTER { metricValue = metric.Metric[0].GetCounter().GetValue() } else { metricValue = metric.Metric[0].GetGauge().GetValue()