From 7704414027d82f2f8ae4034c4cab886d2df5cf66 Mon Sep 17 00:00:00 2001 From: Alex Boten <223565+codeboten@users.noreply.github.com> Date: Mon, 10 Jun 2024 12:25:57 -0700 Subject: [PATCH] [exporterhelper] move queue metrics to mdatagen (#10318) This uses mdatagen to generate the queue metrics. This will allow users to see the metric in the documentation for exporter helper --------- Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- exporter/exporterhelper/common.go | 4 +- exporter/exporterhelper/documentation.md | 16 ++++++ .../internal/metadata/generated_telemetry.go | 42 +++++++++++++++ exporter/exporterhelper/metadata.yaml | 18 +++++++ exporter/exporterhelper/obsexporter.go | 4 +- exporter/exporterhelper/queue_sender.go | 52 ++++--------------- exporter/exporterhelper/queue_sender_test.go | 6 ++- 7 files changed, 96 insertions(+), 46 deletions(-) diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index e07a1a37189..4ed1dd3cb39 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -110,7 +110,7 @@ func WithQueue(config QueueSettings) Option { NumConsumers: config.NumConsumers, QueueSize: config.QueueSize, }) - o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage) + o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep.telemetryBuilder) return nil } } @@ -132,7 +132,7 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto DataType: o.signal, ExporterSettings: o.set, } - o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage) + o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage, o.obsrep.telemetryBuilder) return nil } } diff --git a/exporter/exporterhelper/documentation.md b/exporter/exporterhelper/documentation.md index ac974e01dd2..df11c1af81e 100644 --- a/exporter/exporterhelper/documentation.md +++ b/exporter/exporterhelper/documentation.md @@ -30,6 +30,22 @@ Number of spans failed to be added to the sending queue. | ---- | ----------- | ---------- | --------- | | 1 | Sum | Int | true | +### exporter_queue_capacity + +Fixed capacity of the retry queue (in batches) + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### exporter_queue_size + +Current size of the retry queue (in batches) + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + ### exporter_send_failed_log_records Number of log records in failed attempts to send to destination. diff --git a/exporter/exporterhelper/internal/metadata/generated_telemetry.go b/exporter/exporterhelper/internal/metadata/generated_telemetry.go index 076b84bf9a8..4383809885e 100644 --- a/exporter/exporterhelper/internal/metadata/generated_telemetry.go +++ b/exporter/exporterhelper/internal/metadata/generated_telemetry.go @@ -3,8 +3,10 @@ package metadata import ( + "context" "errors" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" @@ -28,6 +30,8 @@ type TelemetryBuilder struct { ExporterEnqueueFailedLogRecords metric.Int64Counter ExporterEnqueueFailedMetricPoints metric.Int64Counter ExporterEnqueueFailedSpans metric.Int64Counter + ExporterQueueCapacity metric.Int64ObservableGauge + ExporterQueueSize metric.Int64ObservableGauge ExporterSendFailedLogRecords metric.Int64Counter ExporterSendFailedMetricPoints metric.Int64Counter ExporterSendFailedSpans metric.Int64Counter @@ -35,6 +39,7 @@ type TelemetryBuilder struct { ExporterSentMetricPoints metric.Int64Counter ExporterSentSpans metric.Int64Counter level configtelemetry.Level + attributeSet attribute.Set } // telemetryBuilderOption applies changes to default builder. @@ -47,6 +52,43 @@ func WithLevel(lvl configtelemetry.Level) telemetryBuilderOption { } } +// WithAttributeSet applies a set of attributes for asynchronous instruments. +func WithAttributeSet(set attribute.Set) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.attributeSet = set + } +} + +// InitExporterQueueCapacity configures the ExporterQueueCapacity metric. +func (builder *TelemetryBuilder) InitExporterQueueCapacity(cb func() int64) error { + var err error + builder.ExporterQueueCapacity, err = builder.meter.Int64ObservableGauge( + "exporter_queue_capacity", + metric.WithDescription("Fixed capacity of the retry queue (in batches)"), + metric.WithUnit("1"), + metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(cb(), metric.WithAttributeSet(builder.attributeSet)) + return nil + }), + ) + return err +} + +// InitExporterQueueSize configures the ExporterQueueSize metric. +func (builder *TelemetryBuilder) InitExporterQueueSize(cb func() int64) error { + var err error + builder.ExporterQueueSize, err = builder.meter.Int64ObservableGauge( + "exporter_queue_size", + metric.WithDescription("Current size of the retry queue (in batches)"), + metric.WithUnit("1"), + metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(cb(), metric.WithAttributeSet(builder.attributeSet)) + return nil + }), + ) + return err +} + // NewTelemetryBuilder provides a struct with methods to update all internal telemetry // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { diff --git a/exporter/exporterhelper/metadata.yaml b/exporter/exporterhelper/metadata.yaml index 26b06a37536..0703902f6f4 100644 --- a/exporter/exporterhelper/metadata.yaml +++ b/exporter/exporterhelper/metadata.yaml @@ -80,3 +80,21 @@ telemetry: sum: value_type: int monotonic: true + + exporter_queue_size: + enabled: true + description: Current size of the retry queue (in batches) + unit: 1 + optional: true + gauge: + value_type: int + async: true + + exporter_queue_capacity: + enabled: true + description: Fixed capacity of the retry queue (in batches) + unit: 1 + optional: true + gauge: + value_type: int + async: true diff --git a/exporter/exporterhelper/obsexporter.go b/exporter/exporterhelper/obsexporter.go index 0730c394667..8093bd1f834 100644 --- a/exporter/exporterhelper/obsexporter.go +++ b/exporter/exporterhelper/obsexporter.go @@ -40,7 +40,9 @@ func NewObsReport(cfg ObsReportSettings) (*ObsReport, error) { } func newExporter(cfg ObsReportSettings) (*ObsReport, error) { - telemetryBuilder, err := metadata.NewTelemetryBuilder(cfg.ExporterCreateSettings.TelemetrySettings) + telemetryBuilder, err := metadata.NewTelemetryBuilder(cfg.ExporterCreateSettings.TelemetrySettings, + metadata.WithAttributeSet(attribute.NewSet(attribute.String(obsmetrics.ExporterKey, cfg.ExporterID.String()))), + ) if err != nil { return nil, err } diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index 48dd0476572..dbbf71f7bc5 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -9,13 +9,13 @@ import ( "time" "go.opentelemetry.io/otel/attribute" - otelmetric "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" "go.uber.org/multierr" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata" "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/internal/queue" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" @@ -23,10 +23,6 @@ import ( const defaultQueueSize = 1000 -var ( - scopeName = "go.opentelemetry.io/collector/exporterhelper" -) - // QueueSettings defines configuration for queueing batches before sending to the consumerSender. type QueueSettings struct { // Enabled indicates whether to not enqueue batches before sending to the consumerSender. @@ -73,27 +69,21 @@ func (qCfg *QueueSettings) Validate() error { type queueSender struct { baseRequestSender - fullName string queue exporterqueue.Queue[Request] numConsumers int traceAttribute attribute.KeyValue - logger *zap.Logger - meter otelmetric.Meter consumers *queue.Consumers[Request] - metricCapacity otelmetric.Int64ObservableGauge - metricSize otelmetric.Int64ObservableGauge + telemetryBuilder *metadata.TelemetryBuilder } func newQueueSender(q exporterqueue.Queue[Request], set exporter.Settings, numConsumers int, - exportFailureMessage string) *queueSender { + exportFailureMessage string, telemetryBuilder *metadata.TelemetryBuilder) *queueSender { qs := &queueSender{ - fullName: set.ID.String(), - queue: q, - numConsumers: numConsumers, - traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()), - logger: set.TelemetrySettings.Logger, - meter: set.TelemetrySettings.MeterProvider.Meter(scopeName), + queue: q, + numConsumers: numConsumers, + traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()), + telemetryBuilder: telemetryBuilder, } consumeFunc := func(ctx context.Context, req Request) error { err := qs.nextSender.send(ctx, req) @@ -113,32 +103,10 @@ func (qs *queueSender) Start(ctx context.Context, host component.Host) error { return err } - var err, errs error - - attrs := otelmetric.WithAttributeSet(attribute.NewSet(attribute.String(obsmetrics.ExporterKey, qs.fullName))) - - qs.metricSize, err = qs.meter.Int64ObservableGauge( - obsmetrics.ExporterKey+"/queue_size", - otelmetric.WithDescription("Current size of the retry queue (in batches)"), - otelmetric.WithUnit("1"), - otelmetric.WithInt64Callback(func(_ context.Context, o otelmetric.Int64Observer) error { - o.Observe(int64(qs.queue.Size()), attrs) - return nil - }), + return multierr.Append( + qs.telemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) }), + qs.telemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) }), ) - errs = multierr.Append(errs, err) - - qs.metricCapacity, err = qs.meter.Int64ObservableGauge( - obsmetrics.ExporterKey+"/queue_capacity", - otelmetric.WithDescription("Fixed capacity of the retry queue (in batches)"), - otelmetric.WithUnit("1"), - otelmetric.WithInt64Callback(func(_ context.Context, o otelmetric.Int64Observer) error { - o.Observe(int64(qs.queue.Capacity()), attrs) - return nil - })) - - errs = multierr.Append(errs, err) - return errs } // Shutdown is invoked during service shutdown. diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index e04f8b3c2bc..c76b136a4e4 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -18,6 +18,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata" "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/exporter/internal/queue" @@ -424,7 +425,10 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { func TestQueueSenderNoStartShutdown(t *testing.T) { queue := queue.NewBoundedMemoryQueue[Request](queue.MemoryQueueSettings[Request]{}) - qs := newQueueSender(queue, exportertest.NewNopSettings(), 1, "") + set := exportertest.NewNopSettings() + builder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + assert.NoError(t, err) + qs := newQueueSender(queue, set, 1, "", builder) assert.NoError(t, qs.Shutdown(context.Background())) }