
Commit

[exporterhelper] fix bug with queue size and capacity metrics (open-telemetry#8716)

This change ensures the queue_size and queue_capacity metrics are
available when using OpenTelemetry for the collector's internal metrics.

Fixes open-telemetry#8682

---------

Signed-off-by: Alex Boten <aboten@lightstep.com>
Co-authored-by: Dmitrii Anoshin <anoshindx@gmail.com>
Alex Boten and dmitryax authored Oct 26, 2023
1 parent 5c225d9 commit f0bba0e
Showing 5 changed files with 135 additions and 3 deletions.
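
For context: these metrics only switch over to OpenTelemetry when the internal-metrics feature gate is enabled; otherwise the existing OpenCensus path is kept. The sketch below shows the featuregate registry mechanics the diff relies on (see setFeatureGateForTest in queue_sender_test.go further down). The gate ID and description used here are illustrative stand-ins, not the collector's real gate, which is registered by the internal obsreportconfig package.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/featuregate"
)

func main() {
	// In the collector the real gate is registered by the internal
	// obsreportconfig package (UseOtelForInternalMetricsfeatureGate);
	// here we register an illustrative stand-in to show the mechanism.
	gate := featuregate.GlobalRegistry().MustRegister(
		"example.useOtelForInternalMetrics",
		featuregate.StageAlpha,
		featuregate.WithRegisterDescription("route internal metrics through OpenTelemetry"),
	)

	fmt.Println("enabled by default?", gate.IsEnabled())

	// Flip the gate via the registry, the same call the test helper
	// setFeatureGateForTest in this change uses.
	if err := featuregate.GlobalRegistry().Set(gate.ID(), true); err != nil {
		fmt.Println("could not enable feature gate:", err)
		return
	}
	fmt.Println("enabled now?", gate.IsEnabled())
}
```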
25 changes: 25 additions & 0 deletions .chloggen/codeboten_fix-queue-size-metric.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: fix bug with queue size and capacity metrics

# One or more tracking issues or pull requests related to the change
issues: [8682]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
49 changes: 47 additions & 2 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,24 @@ import (

"go.opencensus.io/metric/metricdata"
"go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

const defaultQueueSize = 1000

var errSendingQueueIsFull = errors.New("sending_queue is full")
var (
errSendingQueueIsFull = errors.New("sending_queue is full")
scopeName = "go.opentelemetry.io/collector/exporterhelper"
)

// QueueSettings defines configuration for queueing batches before sending to the consumerSender.
type QueueSettings struct {
Expand Down Expand Up @@ -74,6 +80,9 @@ type queueSender struct {
traceAttribute attribute.KeyValue
logger *zap.Logger
requeuingEnabled bool

metricCapacity otelmetric.Int64ObservableGauge
metricSize otelmetric.Int64ObservableGauge
}

func newQueueSender(id component.ID, signal component.DataType, queue internal.ProducerConsumerQueue, logger *zap.Logger) *queueSender {
Expand Down Expand Up @@ -131,8 +140,44 @@ func (qs *queueSender) start(ctx context.Context, host component.Host, set expor
return err
}

if obsreportconfig.UseOtelForInternalMetricsfeatureGate.IsEnabled() {
return qs.recordWithOtel(set.MeterProvider.Meter(scopeName))
}
return qs.recordWithOC()
}

func (qs *queueSender) recordWithOtel(meter otelmetric.Meter) error {
var err, errs error

attrs := otelmetric.WithAttributeSet(attribute.NewSet(attribute.String(obsmetrics.ExporterKey, qs.fullName)))

qs.metricSize, err = meter.Int64ObservableGauge(
obsmetrics.ExporterKey+"/queue_size",
otelmetric.WithDescription("Current size of the retry queue (in batches)"),
otelmetric.WithUnit("1"),
otelmetric.WithInt64Callback(func(_ context.Context, o otelmetric.Int64Observer) error {
o.Observe(int64(qs.queue.Size()), attrs)
return nil
}),
)
errs = multierr.Append(errs, err)

qs.metricCapacity, err = meter.Int64ObservableGauge(
obsmetrics.ExporterKey+"/queue_capacity",
otelmetric.WithDescription("Fixed capacity of the retry queue (in batches)"),
otelmetric.WithUnit("1"),
otelmetric.WithInt64Callback(func(_ context.Context, o otelmetric.Int64Observer) error {
o.Observe(int64(qs.queue.Capacity()), attrs)
return nil
}))

errs = multierr.Append(errs, err)
return errs
}

func (qs *queueSender) recordWithOC() error {
// Start reporting queue length metric
err = globalInstruments.queueSize.UpsertEntry(func() int64 {
err := globalInstruments.queueSize.UpsertEntry(func() int64 {
return int64(qs.queue.Size())
}, metricdata.NewLabelValue(qs.fullName))
if err != nil {
Expand Down
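
The core of the fix is recordWithOtel above: queue depth and capacity are exported as asynchronous (observable) gauges, so the current value is read by a callback at collection time instead of being pushed on every enqueue or dequeue. Here is a self-contained sketch of the same pattern against the OTel Go SDK, using a manual reader so the callback can be exercised directly; the meter name, metric name, and exporter attribute are illustrative, not the collector's exact wiring.

```go
package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/attribute"
	otelmetric "go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	// Manual reader: collection happens only when we ask for it.
	reader := sdkmetric.NewManualReader()
	meter := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)).Meter("example/queue")

	queueSize := 7 // stand-in for qs.queue.Size()
	attrs := otelmetric.WithAttributeSet(attribute.NewSet(attribute.String("exporter", "otlp")))

	// Same shape as recordWithOtel: the gauge value is produced by a callback
	// that runs at collection time, so no code path has to push updates.
	_, err := meter.Int64ObservableGauge(
		"exporter/queue_size",
		otelmetric.WithDescription("Current size of the retry queue (in batches)"),
		otelmetric.WithUnit("1"),
		otelmetric.WithInt64Callback(func(_ context.Context, o otelmetric.Int64Observer) error {
			o.Observe(int64(queueSize), attrs)
			return nil
		}),
	)
	if err != nil {
		panic(err)
	}

	// Trigger a collection and print what the callback reported.
	var rm metricdata.ResourceMetrics
	if err := reader.Collect(context.Background(), &rm); err != nil {
		panic(err)
	}
	g := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Gauge[int64])
	fmt.Println("queue_size =", g.DataPoints[0].Value) // queue_size = 7
}
```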
42 changes: 41 additions & 1 deletion exporter/exporterhelper/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
)
Expand Down Expand Up @@ -132,11 +134,24 @@ func TestQueuedRetryHappyPath(t *testing.T) {
ocs.checkDroppedItemsCount(t, 0)
}

// Force the state of feature gate for a test
func setFeatureGateForTest(t testing.TB, gate *featuregate.Gate, enabled bool) func() {
originalValue := gate.IsEnabled()
require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), enabled))
return func() {
require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), originalValue))
}
}

func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(defaultID)
require.NoError(t, err)

qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 0 // to make every request go straight to the queue
rCfg := NewDefaultRetrySettings()
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings, BuildInfo: component.NewDefaultBuildInfo()}
be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

Expand All @@ -150,6 +165,31 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
checkValueForGlobalManager(t, defaultExporterTags, int64(0), "exporter/queue_size")
}

func TestQueuedRetry_QueueMetricsReportedUsingOTel(t *testing.T) {
resetFlag := setFeatureGateForTest(t, obsreportconfig.UseOtelForInternalMetricsfeatureGate, true)
defer resetFlag()

tt, err := obsreporttest.SetupTelemetry(defaultID)
require.NoError(t, err)

qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 0 // to make every request go straight to the queue
rCfg := NewDefaultRetrySettings()
set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings, BuildInfo: component.NewDefaultBuildInfo()}
be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

require.NoError(t, tt.CheckExporterMetricGauge("exporter_queue_capacity", int64(defaultQueueSize)))

for i := 0; i < 7; i++ {
require.NoError(t, be.send(newErrorRequest(context.Background())))
}
require.NoError(t, tt.CheckExporterMetricGauge("exporter_queue_size", int64(7)))

assert.NoError(t, be.Shutdown(context.Background()))
}

func TestNoCancellationContext(t *testing.T) {
deadline := time.Now().Add(1 * time.Second)
ctx, cancelFunc := context.WithDeadline(context.Background(), deadline)
Expand Down
4 changes: 4 additions & 0 deletions obsreport/obsreporttest/obsreporttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func (tts *TestTelemetry) CheckExporterLogs(sentLogRecords, sendFailedLogRecords
return tts.prometheusChecker.checkExporterLogs(tts.id, sentLogRecords, sendFailedLogRecords)
}

func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64) error {
return tts.prometheusChecker.checkExporterMetricGauge(tts.id, metric, val)
}

// CheckProcessorTraces checks that the current exported values for the processor's trace metrics match the given values.
// SetupTelemetry must have been called before this function is used.
func (tts *TestTelemetry) CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans int64) error {
Expand Down
18 changes: 18 additions & 0 deletions obsreport/obsreporttest/otelprometheuschecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,24 @@ func (pc *prometheusChecker) checkExporterEnqueueFailed(exporter component.ID, d
return pc.checkCounter(fmt.Sprintf("exporter_enqueue_failed_%s", datatype), enqueueFailed, exporterAttrs)
}

func (pc *prometheusChecker) checkExporterMetricGauge(exporter component.ID, metric string, val int64) error {
exporterAttrs := attributesForExporterMetrics(exporter)
// Forces a flush for the opencensus view data.
_, _ = view.RetrieveData(metric)

ts, err := pc.getMetric(metric, io_prometheus_client.MetricType_GAUGE, exporterAttrs)
if err != nil {
return err
}

expected := float64(val)
if math.Abs(ts.GetGauge().GetValue()-expected) > 0.0001 {
return fmt.Errorf("values for metric '%s' did not match, expected '%f' got '%f'", metric, expected, ts.GetGauge().GetValue())
}

return nil
}

func (pc *prometheusChecker) checkCounter(expectedMetric string, value int64, attrs []attribute.KeyValue) error {
// Forces a flush for the opencensus view data.
_, _ = view.RetrieveData(expectedMetric)
Expand Down
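
The new checkExporterMetricGauge helper reads the gauge back through the test's Prometheus endpoint and compares it within a small tolerance, since Prometheus exposes values as float64. Below is a rough sketch of that comparison against Prometheus text-format output, under the assumption of an otelcol_-prefixed metric name and a single exporter label (both illustrative; the real names come from the obsreporttest telemetry setup).

```go
package main

import (
	"fmt"
	"math"
	"strings"

	"github.com/prometheus/common/expfmt"
)

func main() {
	// Illustrative scrape payload; the real checker pulls from the
	// obsreporttest Prometheus endpoint.
	payload := `# TYPE otelcol_exporter_queue_size gauge
otelcol_exporter_queue_size{exporter="otlp"} 7
`
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(strings.NewReader(payload))
	if err != nil {
		panic(err)
	}

	got := families["otelcol_exporter_queue_size"].GetMetric()[0].GetGauge().GetValue()

	// Tolerance-based comparison, mirroring checkExporterMetricGauge.
	want := float64(7)
	if math.Abs(got-want) > 0.0001 {
		fmt.Printf("values did not match, expected %f got %f\n", want, got)
		return
	}
	fmt.Println("gauge matches:", got)
}
```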
