Skip to content

Commit

Permalink
[exporterhelper] Add data_type attribute to internal queue metrics (#…
Browse files Browse the repository at this point in the history
…10593)

Add `data_type` attribute to the internal otelcol_exporter_queue_size
metric to report the type of data being processed.

All other metrics have the data type reported as part of their names. We
could've done the same for queue metrics, but that would introduce a
significant breaking change. We want to avoid that until we have all the
metrics standardized with OpenTelemetry semantic conventions.

Fixes
#9943
  • Loading branch information
dmitryax authored Jul 23, 2024
1 parent 6171720 commit c239e73
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 45 deletions.
18 changes: 18 additions & 0 deletions .chloggen/componenttest-extra-attributes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: component/componenttest

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add optional ...attribute.KeyValue argument to TestTelemetry.CheckExporterMetricGauge.

# One or more tracking issues or pull requests related to the change
issues: [10593]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
20 changes: 20 additions & 0 deletions .chloggen/exporterhelper-report-data-type-in-queue-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add data_type attribute to `otelcol_exporter_queue_size` metric to report the type of data being processed.

# One or more tracking issues or pull requests related to the change
issues: [9943]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
7 changes: 5 additions & 2 deletions component/componenttest/obsreporttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel/attribute"
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
Expand Down Expand Up @@ -72,8 +73,10 @@ func (tts *TestTelemetry) CheckExporterLogs(sentLogRecords, sendFailedLogRecords
return tts.prometheusChecker.checkExporterLogs(tts.id, sentLogRecords, sendFailedLogRecords)
}

func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64) error {
return tts.prometheusChecker.checkExporterMetricGauge(tts.id, metric, val)
func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64, extraAttrs ...attribute.KeyValue) error {
attrs := attributesForExporterMetrics(tts.id)
attrs = append(attrs, extraAttrs...)
return tts.prometheusChecker.checkGauge(metric, val, attrs)
}

// CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values.
Expand Down
6 changes: 2 additions & 4 deletions component/componenttest/otelprometheuschecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,8 @@ func (pc *prometheusChecker) checkExporterEnqueueFailed(exporter component.ID, d
return pc.checkCounter(fmt.Sprintf("exporter_enqueue_failed_%s", datatype), enqueueFailed, exporterAttrs)
}

func (pc *prometheusChecker) checkExporterMetricGauge(exporter component.ID, metric string, val int64) error {
exporterAttrs := attributesForExporterMetrics(exporter)

ts, err := pc.getMetric(metric, io_prometheus_client.MetricType_GAUGE, exporterAttrs)
func (pc *prometheusChecker) checkGauge(metric string, val int64, attrs []attribute.KeyValue) error {
ts, err := pc.getMetric(metric, io_prometheus_client.MetricType_GAUGE, attrs)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func WithQueue(config QueueSettings) Option {
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
})
o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep.telemetryBuilder)
o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep)
return nil
}
}
Expand All @@ -132,7 +132,7 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto
DataType: o.signal,
ExporterSettings: o.set,
}
o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage, o.obsrep.telemetryBuilder)
o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage, o.obsrep)
return nil
}
}
Expand Down Expand Up @@ -250,7 +250,7 @@ type baseExporter struct {
}

func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {
obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set})
obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set, DataType: signal})
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion exporter/exporterhelper/obsexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ObsReport struct {
level configtelemetry.Level
spanNamePrefix string
tracer trace.Tracer
dataType component.DataType

otelAttrs []attribute.KeyValue
telemetryBuilder *metadata.TelemetryBuilder
Expand All @@ -38,6 +39,7 @@ type ObsReport struct {
type ObsReportSettings struct {
ExporterID component.ID
ExporterCreateSettings exporter.Settings
DataType component.DataType
}

// NewObsReport creates a new Exporter.
Expand All @@ -58,7 +60,7 @@ func newExporter(cfg ObsReportSettings) (*ObsReport, error) {
level: cfg.ExporterCreateSettings.TelemetrySettings.MetricsLevel,
spanNamePrefix: obsmetrics.ExporterPrefix + cfg.ExporterID.String(),
tracer: cfg.ExporterCreateSettings.TracerProvider.Tracer(cfg.ExporterID.String()),

dataType: cfg.DataType,
otelAttrs: []attribute.KeyValue{
attribute.String(obsmetrics.ExporterKey, cfg.ExporterID.String()),
},
Expand Down
1 change: 0 additions & 1 deletion exporter/exporterhelper/obsreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
)

func TestExportEnqueueFailure(t *testing.T) {
exporterID := component.MustNewID("fakeExporter")
tt, err := componenttest.SetupTelemetry(exporterID)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
Expand Down
25 changes: 13 additions & 12 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
Expand Down Expand Up @@ -74,18 +73,18 @@ type queueSender struct {
traceAttribute attribute.KeyValue
consumers *queue.Consumers[Request]

telemetryBuilder *metadata.TelemetryBuilder
exporterID component.ID
obsrep *ObsReport
exporterID component.ID
}

func newQueueSender(q exporterqueue.Queue[Request], set exporter.Settings, numConsumers int,
exportFailureMessage string, telemetryBuilder *metadata.TelemetryBuilder) *queueSender {
exportFailureMessage string, obsrep *ObsReport) *queueSender {
qs := &queueSender{
queue: q,
numConsumers: numConsumers,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
telemetryBuilder: telemetryBuilder,
exporterID: set.ID,
queue: q,
numConsumers: numConsumers,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
obsrep: obsrep,
exporterID: set.ID,
}
consumeFunc := func(ctx context.Context, req Request) error {
err := qs.nextSender.send(ctx, req)
Expand All @@ -105,10 +104,12 @@ func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
return err
}

opts := metric.WithAttributeSet(attribute.NewSet(attribute.String(obsmetrics.ExporterKey, qs.exporterID.String())))
dataTypeAttr := attribute.String(obsmetrics.DataTypeKey, qs.obsrep.dataType.String())
return multierr.Append(
qs.telemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) }, opts),
qs.telemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) }, opts),
qs.obsrep.telemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) },
metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute, dataTypeAttr))),
qs.obsrep.telemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) },
metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute))),
)
}

Expand Down
53 changes: 31 additions & 22 deletions exporter/exporterhelper/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
Expand Down Expand Up @@ -202,28 +203,33 @@ func TestQueuedRetryHappyPath(t *testing.T) {
})
}
}
func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
tt, err := componenttest.SetupTelemetry(defaultID)
require.NoError(t, err)

qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 0 // to make every request go straight to the queue
rCfg := configretry.NewDefaultBackOffConfig()
set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
be, err := newBaseExporter(set, defaultDataType, newObservabilityConsumerSender,
withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize)))
func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
dataTypes := []component.DataType{component.DataTypeLogs, component.DataTypeTraces, component.DataTypeMetrics}
for _, dataType := range dataTypes {
tt, err := componenttest.SetupTelemetry(defaultID)
require.NoError(t, err)

qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 0 // to make every request go straight to the queue
rCfg := configretry.NewDefaultBackOffConfig()
set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
be, err := newBaseExporter(set, dataType, newObservabilityConsumerSender,
withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize)))

for i := 0; i < 7; i++ {
require.NoError(t, be.send(context.Background(), newErrorRequest()))
}
require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7),
attribute.String(obsmetrics.DataTypeKey, dataType.String())))

for i := 0; i < 7; i++ {
require.NoError(t, be.send(context.Background(), newErrorRequest()))
assert.NoError(t, be.Shutdown(context.Background()))
}
require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7)))

assert.NoError(t, be.Shutdown(context.Background()))
}

func TestNoCancellationContext(t *testing.T) {
Expand Down Expand Up @@ -426,9 +432,12 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
func TestQueueSenderNoStartShutdown(t *testing.T) {
queue := queue.NewBoundedMemoryQueue[Request](queue.MemoryQueueSettings[Request]{})
set := exportertest.NewNopSettings()
builder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
obsrep, err := NewObsReport(ObsReportSettings{
ExporterID: exporterID,
ExporterCreateSettings: exportertest.NewNopSettings(),
})
assert.NoError(t, err)
qs := newQueueSender(queue, set, 1, "", builder)
qs := newQueueSender(queue, set, 1, "", obsrep)
assert.NoError(t, qs.Shutdown(context.Background()))
}

Expand Down
3 changes: 3 additions & 0 deletions internal/obsreportconfig/obsmetrics/obs_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ const (
// ExporterKey used to identify exporters in metrics and traces.
ExporterKey = "exporter"

// DataTypeKey used to identify the data type in the queue size metric.
DataTypeKey = "data_type"

// SentSpansKey used to track spans sent by exporters.
SentSpansKey = "sent_spans"
// FailedToSendSpansKey used to track spans that failed to be sent by exporters.
Expand Down

0 comments on commit c239e73

Please sign in to comment.