Skip to content

Commit

Permalink
[exporterhelper] move queue metrics to mdatagen (open-telemetry#10318)
Browse files Browse the repository at this point in the history
This uses mdatagen to generate the queue metrics. This will allow users
to see the metric in the documentation for exporter helper

---------

Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com>
  • Loading branch information
codeboten authored Jun 10, 2024
1 parent be8ca39 commit 7704414
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 46 deletions.
4 changes: 2 additions & 2 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func WithQueue(config QueueSettings) Option {
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
})
o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage)
o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep.telemetryBuilder)
return nil
}
}
Expand All @@ -132,7 +132,7 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto
DataType: o.signal,
ExporterSettings: o.set,
}
o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage)
o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage, o.obsrep.telemetryBuilder)
return nil
}
}
Expand Down
16 changes: 16 additions & 0 deletions exporter/exporterhelper/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@ Number of spans failed to be added to the sending queue.
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |

### exporter_queue_capacity

Fixed capacity of the retry queue (in batches)

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Int |

### exporter_queue_size

Current size of the retry queue (in batches)

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Int |

### exporter_send_failed_log_records

Number of log records in failed attempts to send to destination.
Expand Down
42 changes: 42 additions & 0 deletions exporter/exporterhelper/internal/metadata/generated_telemetry.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions exporter/exporterhelper/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,21 @@ telemetry:
sum:
value_type: int
monotonic: true

exporter_queue_size:
enabled: true
description: Current size of the retry queue (in batches)
unit: 1
optional: true
gauge:
value_type: int
async: true

exporter_queue_capacity:
enabled: true
description: Fixed capacity of the retry queue (in batches)
unit: 1
optional: true
gauge:
value_type: int
async: true
4 changes: 3 additions & 1 deletion exporter/exporterhelper/obsexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ func NewObsReport(cfg ObsReportSettings) (*ObsReport, error) {
}

func newExporter(cfg ObsReportSettings) (*ObsReport, error) {
telemetryBuilder, err := metadata.NewTelemetryBuilder(cfg.ExporterCreateSettings.TelemetrySettings)
telemetryBuilder, err := metadata.NewTelemetryBuilder(cfg.ExporterCreateSettings.TelemetrySettings,
metadata.WithAttributeSet(attribute.NewSet(attribute.String(obsmetrics.ExporterKey, cfg.ExporterID.String()))),
)
if err != nil {
return nil, err
}
Expand Down
52 changes: 10 additions & 42 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,20 @@ import (
"time"

"go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

const defaultQueueSize = 1000

var (
scopeName = "go.opentelemetry.io/collector/exporterhelper"
)

// QueueSettings defines configuration for queueing batches before sending to the consumerSender.
type QueueSettings struct {
// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
Expand Down Expand Up @@ -73,27 +69,21 @@ func (qCfg *QueueSettings) Validate() error {

type queueSender struct {
baseRequestSender
fullName string
queue exporterqueue.Queue[Request]
numConsumers int
traceAttribute attribute.KeyValue
logger *zap.Logger
meter otelmetric.Meter
consumers *queue.Consumers[Request]

metricCapacity otelmetric.Int64ObservableGauge
metricSize otelmetric.Int64ObservableGauge
telemetryBuilder *metadata.TelemetryBuilder
}

func newQueueSender(q exporterqueue.Queue[Request], set exporter.Settings, numConsumers int,
exportFailureMessage string) *queueSender {
exportFailureMessage string, telemetryBuilder *metadata.TelemetryBuilder) *queueSender {
qs := &queueSender{
fullName: set.ID.String(),
queue: q,
numConsumers: numConsumers,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
logger: set.TelemetrySettings.Logger,
meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
queue: q,
numConsumers: numConsumers,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
telemetryBuilder: telemetryBuilder,
}
consumeFunc := func(ctx context.Context, req Request) error {
err := qs.nextSender.send(ctx, req)
Expand All @@ -113,32 +103,10 @@ func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
return err
}

var err, errs error

attrs := otelmetric.WithAttributeSet(attribute.NewSet(attribute.String(obsmetrics.ExporterKey, qs.fullName)))

qs.metricSize, err = qs.meter.Int64ObservableGauge(
obsmetrics.ExporterKey+"/queue_size",
otelmetric.WithDescription("Current size of the retry queue (in batches)"),
otelmetric.WithUnit("1"),
otelmetric.WithInt64Callback(func(_ context.Context, o otelmetric.Int64Observer) error {
o.Observe(int64(qs.queue.Size()), attrs)
return nil
}),
return multierr.Append(
qs.telemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) }),
qs.telemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) }),
)
errs = multierr.Append(errs, err)

qs.metricCapacity, err = qs.meter.Int64ObservableGauge(
obsmetrics.ExporterKey+"/queue_capacity",
otelmetric.WithDescription("Fixed capacity of the retry queue (in batches)"),
otelmetric.WithUnit("1"),
otelmetric.WithInt64Callback(func(_ context.Context, o otelmetric.Int64Observer) error {
o.Observe(int64(qs.queue.Capacity()), attrs)
return nil
}))

errs = multierr.Append(errs, err)
return errs
}

// Shutdown is invoked during service shutdown.
Expand Down
6 changes: 5 additions & 1 deletion exporter/exporterhelper/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal/queue"
Expand Down Expand Up @@ -424,7 +425,10 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {

func TestQueueSenderNoStartShutdown(t *testing.T) {
queue := queue.NewBoundedMemoryQueue[Request](queue.MemoryQueueSettings[Request]{})
qs := newQueueSender(queue, exportertest.NewNopSettings(), 1, "")
set := exportertest.NewNopSettings()
builder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
assert.NoError(t, err)
qs := newQueueSender(queue, set, 1, "", builder)
assert.NoError(t, qs.Shutdown(context.Background()))
}

Expand Down

0 comments on commit 7704414

Please sign in to comment.