From b440529711139ceee598b7bb7a38276a8b7ee24f Mon Sep 17 00:00:00 2001 From: "gustavo.paiva" Date: Thu, 27 Oct 2022 15:15:23 -0300 Subject: [PATCH] Instrument `batch` processor with Otel Go --- ...strument-batch-processor-with-otel-go.yaml | 16 + processor/batchprocessor/batch_processor.go | 48 +-- .../batchprocessor/batch_processor_test.go | 402 ++++++++---------- processor/batchprocessor/factory.go | 7 +- processor/batchprocessor/metrics.go | 176 +++++++- processor/batchprocessor/metrics_test.go | 218 ++++++++++ service/telemetry.go | 12 +- 7 files changed, 630 insertions(+), 249 deletions(-) create mode 100755 .chloggen/instrument-batch-processor-with-otel-go.yaml diff --git a/.chloggen/instrument-batch-processor-with-otel-go.yaml b/.chloggen/instrument-batch-processor-with-otel-go.yaml new file mode 100755 index 00000000000..c2cc2b2733e --- /dev/null +++ b/.chloggen/instrument-batch-processor-with-otel-go.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: batchprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "instrument the `batch` processor with the OpenTelemetry Go SDK" + +# One or more tracking issues or pull requests related to the change +issues: [6423] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index a7443a95df6..e083ef7ea5f 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -16,17 +16,17 @@ package batchprocessor // import "go.opentelemetry.io/collector/processor/batchp import ( "context" + "fmt" "runtime" "sync" "time" - "go.opencensus.io/stats" - "go.opencensus.io/tag" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -54,7 +54,7 @@ type batchProcessor struct { shutdownC chan struct{} goroutines sync.WaitGroup - telemetryLevel configtelemetry.Level + telemetry *batchProcessorTelemetry } type batch interface { @@ -72,15 +72,16 @@ var _ consumer.Traces = (*batchProcessor)(nil) var _ consumer.Metrics = (*batchProcessor)(nil) var _ consumer.Logs = (*batchProcessor)(nil) -func newBatchProcessor(set component.ProcessorCreateSettings, cfg *Config, batch batch, telemetryLevel configtelemetry.Level) (*batchProcessor, error) { - exportCtx, err := tag.New(context.Background(), tag.Insert(processorTagKey, cfg.ID().String())) +func newBatchProcessor(set component.ProcessorCreateSettings, cfg *Config, batch batch, telemetryLevel configtelemetry.Level, registry *featuregate.Registry) (*batchProcessor, error) { + bpt, err := newBatchProcessorTelemetry(set.MeterProvider, cfg, telemetryLevel, registry) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create batch processor telemetry: %w", err) } + return &batchProcessor{ - logger: set.Logger, - exportCtx: exportCtx, -
telemetryLevel: telemetryLevel, + logger: set.Logger, + exportCtx: bpt.exportCtx, + telemetry: bpt, sendBatchSize: int(cfg.SendBatchSize), sendBatchMaxSize: int(cfg.SendBatchMaxSize), @@ -130,7 +131,7 @@ func (bp *batchProcessor) startProcessingCycle() { if bp.batch.itemCount() > 0 { // TODO: Set a timeout on sendTraces or // make it cancellable using the context that Shutdown gets as a parameter - bp.sendItems(statTimeoutTriggerSend) + bp.sendItems(triggerTimeout) } return case item := <-bp.newItem: @@ -140,7 +141,7 @@ func (bp *batchProcessor) startProcessingCycle() { bp.processItem(item) case <-bp.timer.C: if bp.batch.itemCount() > 0 { - bp.sendItems(statTimeoutTriggerSend) + bp.sendItems(triggerTimeout) } bp.resetTimer() } @@ -152,7 +153,7 @@ func (bp *batchProcessor) processItem(item interface{}) { sent := false for bp.batch.itemCount() >= bp.sendBatchSize { sent = true - bp.sendItems(statBatchSizeTriggerSend) + bp.sendItems(triggerBatchSize) } if sent { @@ -171,17 +172,12 @@ func (bp *batchProcessor) resetTimer() { bp.timer.Reset(bp.timeout) } -func (bp *batchProcessor) sendItems(triggerMeasure *stats.Int64Measure) { - detailed := bp.telemetryLevel == configtelemetry.LevelDetailed - sent, bytes, err := bp.batch.export(bp.exportCtx, bp.sendBatchMaxSize, detailed) +func (bp *batchProcessor) sendItems(trigger trigger) { + sent, bytes, err := bp.batch.export(bp.exportCtx, bp.sendBatchMaxSize, bp.telemetry.detailed) if err != nil { bp.logger.Warn("Sender failed", zap.Error(err)) } else { - // Add that it came form the trace pipeline? - stats.Record(bp.exportCtx, triggerMeasure.M(1), statBatchSendSize.M(int64(sent))) - if detailed { - stats.Record(bp.exportCtx, statBatchSendSizeBytes.M(int64(bytes))) - } + bp.telemetry.record(trigger, int64(sent), int64(bytes)) } } @@ -205,18 +201,18 @@ func (bp *batchProcessor) ConsumeLogs(_ context.Context, ld plog.Logs) error { } // newBatchTracesProcessor creates a new batch processor that batches traces by size or with timeout -func newBatchTracesProcessor(set component.ProcessorCreateSettings, next consumer.Traces, cfg *Config, telemetryLevel configtelemetry.Level) (*batchProcessor, error) { - return newBatchProcessor(set, cfg, newBatchTraces(next), telemetryLevel) +func newBatchTracesProcessor(set component.ProcessorCreateSettings, next consumer.Traces, cfg *Config, telemetryLevel configtelemetry.Level, registry *featuregate.Registry) (*batchProcessor, error) { + return newBatchProcessor(set, cfg, newBatchTraces(next), telemetryLevel, registry) } // newBatchMetricsProcessor creates a new batch processor that batches metrics by size or with timeout -func newBatchMetricsProcessor(set component.ProcessorCreateSettings, next consumer.Metrics, cfg *Config, telemetryLevel configtelemetry.Level) (*batchProcessor, error) { - return newBatchProcessor(set, cfg, newBatchMetrics(next), telemetryLevel) +func newBatchMetricsProcessor(set component.ProcessorCreateSettings, next consumer.Metrics, cfg *Config, telemetryLevel configtelemetry.Level, registry *featuregate.Registry) (*batchProcessor, error) { + return newBatchProcessor(set, cfg, newBatchMetrics(next), telemetryLevel, registry) } // newBatchLogsProcessor creates a new batch processor that batches logs by size or with timeout -func newBatchLogsProcessor(set component.ProcessorCreateSettings, next consumer.Logs, cfg *Config, telemetryLevel configtelemetry.Level) (*batchProcessor, error) { - return newBatchProcessor(set, cfg, newBatchLogs(next), telemetryLevel) +func newBatchLogsProcessor(set 
component.ProcessorCreateSettings, next consumer.Logs, cfg *Config, telemetryLevel configtelemetry.Level, registry *featuregate.Registry) (*batchProcessor, error) { + return newBatchProcessor(set, cfg, newBatchLogs(next), telemetryLevel, registry) } type batchTraces struct { diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index ee406106777..4bd2e49f082 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -24,13 +24,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opencensus.io/stats/view" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -42,7 +42,7 @@ func TestBatchProcessorSpansDelivered(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 creationSet := componenttest.NewNopProcessorCreateSettings() - batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, configtelemetry.LevelDetailed) + batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, configtelemetry.LevelDetailed, featuregate.GetRegistry()) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -84,7 +84,7 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { cfg.SendBatchSize = 128 cfg.SendBatchMaxSize = 130 creationSet := componenttest.NewNopProcessorCreateSettings() - batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, configtelemetry.LevelBasic) + batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, configtelemetry.LevelBasic, featuregate.GetRegistry()) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -123,114 +123,100 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { func TestBatchProcessorSentBySize(t *testing.T) { sizer := &ptrace.ProtoMarshaler{} - views := MetricViews() - require.NoError(t, view.Register(views...)) - defer view.Unregister(views...) 
- - sink := new(consumertest.TracesSink) - cfg := createDefaultConfig().(*Config) - sendBatchSize := 20 - cfg.SendBatchSize = uint32(sendBatchSize) - cfg.Timeout = 500 * time.Millisecond - creationSet := componenttest.NewNopProcessorCreateSettings() - batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, configtelemetry.LevelDetailed) - require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + telemetryTest(t, func(t *testing.T, tel testTelemetry, registry *featuregate.Registry) { + sink := new(consumertest.TracesSink) + cfg := createDefaultConfig().(*Config) + sendBatchSize := 20 + cfg.SendBatchSize = uint32(sendBatchSize) + cfg.Timeout = 500 * time.Millisecond + creationSet := tel.NewProcessorCreateSettings() + batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, configtelemetry.LevelDetailed, registry) + require.NoError(t, err) + require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + + requestCount := 100 + spansPerRequest := 5 + + start := time.Now() + sizeSum := 0 + for requestNum := 0; requestNum < requestCount; requestNum++ { + td := testdata.GenerateTraces(spansPerRequest) + sizeSum += sizer.TracesSize(td) + assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + } - requestCount := 100 - spansPerRequest := 5 + require.NoError(t, batcher.Shutdown(context.Background())) - start := time.Now() - sizeSum := 0 - for requestNum := 0; requestNum < requestCount; requestNum++ { - td := testdata.GenerateTraces(spansPerRequest) - sizeSum += sizer.TracesSize(td) - assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) - } + elapsed := time.Since(start) + require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds()) - require.NoError(t, batcher.Shutdown(context.Background())) + expectedBatchesNum := requestCount * spansPerRequest / sendBatchSize + expectedBatchingFactor := sendBatchSize / spansPerRequest - elapsed := time.Since(start) - require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds()) - - expectedBatchesNum := requestCount * spansPerRequest / sendBatchSize - expectedBatchingFactor := sendBatchSize / spansPerRequest - - require.Equal(t, requestCount*spansPerRequest, sink.SpanCount()) - receivedTraces := sink.AllTraces() - require.EqualValues(t, expectedBatchesNum, len(receivedTraces)) - for _, td := range receivedTraces { - rss := td.ResourceSpans() - require.Equal(t, expectedBatchingFactor, rss.Len()) - for i := 0; i < expectedBatchingFactor; i++ { - require.Equal(t, spansPerRequest, rss.At(i).ScopeSpans().At(0).Spans().Len()) + require.Equal(t, requestCount*spansPerRequest, sink.SpanCount()) + receivedTraces := sink.AllTraces() + require.EqualValues(t, expectedBatchesNum, len(receivedTraces)) + for _, td := range receivedTraces { + rss := td.ResourceSpans() + require.Equal(t, expectedBatchingFactor, rss.Len()) + for i := 0; i < expectedBatchingFactor; i++ { + require.Equal(t, spansPerRequest, rss.At(i).ScopeSpans().At(0).Spans().Len()) + } } - } - viewData, err := view.RetrieveData("processor/batch/" + statBatchSendSize.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData := viewData[0].Data.(*view.DistributionData) - assert.Equal(t, int64(expectedBatchesNum), distData.Count) - assert.Equal(t, sink.SpanCount(), int(distData.Sum())) - assert.Equal(t, sendBatchSize, int(distData.Min)) - assert.Equal(t, sendBatchSize, int(distData.Max)) - - viewData, err = view.RetrieveData("processor/batch/" + 
statBatchSendSizeBytes.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData = viewData[0].Data.(*view.DistributionData) - assert.Equal(t, int64(expectedBatchesNum), distData.Count) - assert.Equal(t, sizeSum, int(distData.Sum())) + tel.assertMetrics(t, expectedMetrics{ + sendCount: float64(expectedBatchesNum), + sendSizeSum: float64(sink.SpanCount()), + sendSizeBytesSum: float64(sizeSum), + sizeTrigger: float64(expectedBatchesNum), + }) + }) + } func TestBatchProcessorSentBySize_withMaxSize(t *testing.T) { - views := MetricViews() - require.NoError(t, view.Register(views...)) - defer view.Unregister(views...) - - sink := new(consumertest.TracesSink) - cfg := createDefaultConfig().(*Config) - sendBatchSize := 20 - sendBatchMaxSize := 37 - cfg.SendBatchSize = uint32(sendBatchSize) - cfg.SendBatchMaxSize = uint32(sendBatchMaxSize) - cfg.Timeout = 500 * time.Millisecond - creationSet := componenttest.NewNopProcessorCreateSettings() - batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, configtelemetry.LevelDetailed) - require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) - - requestCount := 1 - spansPerRequest := 500 - totalSpans := requestCount * spansPerRequest + telemetryTest(t, func(t *testing.T, tel testTelemetry, registry *featuregate.Registry) { + sink := new(consumertest.TracesSink) + cfg := createDefaultConfig().(*Config) + sendBatchSize := 20 + sendBatchMaxSize := 37 + cfg.SendBatchSize = uint32(sendBatchSize) + cfg.SendBatchMaxSize = uint32(sendBatchMaxSize) + cfg.Timeout = 500 * time.Millisecond + creationSet := tel.NewProcessorCreateSettings() + batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, configtelemetry.LevelDetailed, registry) + require.NoError(t, err) + require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + + requestCount := 1 + spansPerRequest := 500 + totalSpans := requestCount * spansPerRequest + + start := time.Now() + for requestNum := 0; requestNum < requestCount; requestNum++ { + td := testdata.GenerateTraces(spansPerRequest) + assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + } - start := time.Now() - for requestNum := 0; requestNum < requestCount; requestNum++ { - td := testdata.GenerateTraces(spansPerRequest) - assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) - } + require.NoError(t, batcher.Shutdown(context.Background())) - require.NoError(t, batcher.Shutdown(context.Background())) + elapsed := time.Since(start) + require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds()) - elapsed := time.Since(start) - require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds()) + // The max batch size is not a divisor of the total number of spans + expectedBatchesNum := int(math.Ceil(float64(totalSpans) / float64(sendBatchMaxSize))) - // The max batch size is not a divisor of the total number of spans - expectedBatchesNum := int(math.Ceil(float64(totalSpans) / float64(sendBatchMaxSize))) + require.Equal(t, totalSpans, sink.SpanCount()) + receivedTraces := sink.AllTraces() + require.EqualValues(t, expectedBatchesNum, len(receivedTraces)) - require.Equal(t, totalSpans, sink.SpanCount()) - receivedTraces := sink.AllTraces() - require.EqualValues(t, expectedBatchesNum, len(receivedTraces)) - - viewData, err := view.RetrieveData("processor/batch/" + statBatchSendSize.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData := 
viewData[0].Data.(*view.DistributionData) - assert.Equal(t, int64(expectedBatchesNum), distData.Count) - assert.Equal(t, sink.SpanCount(), int(distData.Sum())) - assert.Equal(t, totalSpans%sendBatchMaxSize, int(distData.Min)) - assert.Equal(t, sendBatchMaxSize, int(distData.Max)) + tel.assertMetrics(t, expectedMetrics{ + sendCount: float64(expectedBatchesNum), + sendSizeSum: float64(sink.SpanCount()), + sizeTrigger: math.Floor(float64(totalSpans) / float64(sendBatchMaxSize)), + timeoutTrigger: 1, + }) + }) } func TestBatchProcessorSentByTimeout(t *testing.T) { @@ -245,7 +231,7 @@ func TestBatchProcessorSentByTimeout(t *testing.T) { spansPerRequest := 10 start := time.Now() - batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, configtelemetry.LevelDetailed) + batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, configtelemetry.LevelDetailed, featuregate.GetRegistry()) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -292,7 +278,7 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { sink := new(consumertest.TracesSink) creationSet := componenttest.NewNopProcessorCreateSettings() - batcher, err := newBatchTracesProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed) + batcher, err := newBatchTracesProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed, featuregate.GetRegistry()) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -323,7 +309,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { sink := new(consumertest.MetricsSink) creationSet := componenttest.NewNopProcessorCreateSettings() - batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed) + batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed, featuregate.GetRegistry()) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -360,69 +346,59 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { func TestBatchMetricProcessor_BatchSize(t *testing.T) { sizer := &pmetric.ProtoMarshaler{} - views := MetricViews() - require.NoError(t, view.Register(views...)) - defer view.Unregister(views...) - - // Instantiate the batch processor with low config values to test data - // gets sent through the processor. - cfg := Config{ - ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), - Timeout: 100 * time.Millisecond, - SendBatchSize: 50, - } - - requestCount := 100 - metricsPerRequest := 5 - dataPointsPerMetric := 2 // Since the int counter uses two datapoints. - dataPointsPerRequest := metricsPerRequest * dataPointsPerMetric - sink := new(consumertest.MetricsSink) - - creationSet := componenttest.NewNopProcessorCreateSettings() - batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed) - require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + telemetryTest(t, func(t *testing.T, tel testTelemetry, registry *featuregate.Registry) { + + // Instantiate the batch processor with low config values to test data + // gets sent through the processor. 
+ cfg := Config{ + ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), + Timeout: 100 * time.Millisecond, + SendBatchSize: 50, + } - start := time.Now() - size := 0 - for requestNum := 0; requestNum < requestCount; requestNum++ { - md := testdata.GenerateMetrics(metricsPerRequest) - size += sizer.MetricsSize(md) - assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) - } - require.NoError(t, batcher.Shutdown(context.Background())) + requestCount := 100 + metricsPerRequest := 5 + dataPointsPerMetric := 2 // Since the int counter uses two datapoints. + dataPointsPerRequest := metricsPerRequest * dataPointsPerMetric + sink := new(consumertest.MetricsSink) + + creationSet := tel.NewProcessorCreateSettings() + batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed, registry) + require.NoError(t, err) + require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + + start := time.Now() + size := 0 + for requestNum := 0; requestNum < requestCount; requestNum++ { + md := testdata.GenerateMetrics(metricsPerRequest) + size += sizer.MetricsSize(md) + assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) + } + require.NoError(t, batcher.Shutdown(context.Background())) - elapsed := time.Since(start) - require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds()) + elapsed := time.Since(start) + require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds()) - expectedBatchesNum := requestCount * dataPointsPerRequest / int(cfg.SendBatchSize) - expectedBatchingFactor := int(cfg.SendBatchSize) / dataPointsPerRequest + expectedBatchesNum := requestCount * dataPointsPerRequest / int(cfg.SendBatchSize) + expectedBatchingFactor := int(cfg.SendBatchSize) / dataPointsPerRequest - require.Equal(t, requestCount*2*metricsPerRequest, sink.DataPointCount()) - receivedMds := sink.AllMetrics() - require.Equal(t, expectedBatchesNum, len(receivedMds)) - for _, md := range receivedMds { - require.Equal(t, expectedBatchingFactor, md.ResourceMetrics().Len()) - for i := 0; i < expectedBatchingFactor; i++ { - require.Equal(t, metricsPerRequest, md.ResourceMetrics().At(i).ScopeMetrics().At(0).Metrics().Len()) + require.Equal(t, requestCount*2*metricsPerRequest, sink.DataPointCount()) + receivedMds := sink.AllMetrics() + require.Equal(t, expectedBatchesNum, len(receivedMds)) + for _, md := range receivedMds { + require.Equal(t, expectedBatchingFactor, md.ResourceMetrics().Len()) + for i := 0; i < expectedBatchingFactor; i++ { + require.Equal(t, metricsPerRequest, md.ResourceMetrics().At(i).ScopeMetrics().At(0).Metrics().Len()) + } } - } - viewData, err := view.RetrieveData("processor/batch/" + statBatchSendSize.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData := viewData[0].Data.(*view.DistributionData) - assert.Equal(t, int64(expectedBatchesNum), distData.Count) - assert.Equal(t, sink.DataPointCount(), int(distData.Sum())) - assert.Equal(t, cfg.SendBatchSize, uint32(distData.Min)) - assert.Equal(t, cfg.SendBatchSize, uint32(distData.Max)) - - viewData, err = view.RetrieveData("processor/batch/" + statBatchSendSizeBytes.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData = viewData[0].Data.(*view.DistributionData) - assert.Equal(t, int64(expectedBatchesNum), distData.Count) - assert.Equal(t, size, int(distData.Sum())) + tel.assertMetrics(t, expectedMetrics{ + sendCount: float64(expectedBatchesNum), + sendSizeSum: 
float64(sink.DataPointCount()), + sendSizeBytesSum: float64(size), + sizeTrigger: 20, + }) + }) } func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) { @@ -455,7 +431,7 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { sink := new(consumertest.MetricsSink) creationSet := componenttest.NewNopProcessorCreateSettings() - batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed) + batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed, featuregate.GetRegistry()) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -504,7 +480,7 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) { sink := new(consumertest.MetricsSink) creationSet := componenttest.NewNopProcessorCreateSettings() - batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed) + batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed, featuregate.GetRegistry()) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -590,7 +566,7 @@ func BenchmarkBatchMetricProcessor(b *testing.B) { creationSet := componenttest.NewNopProcessorCreateSettings() metricsPerRequest := 1000 - batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed) + batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed, featuregate.GetRegistry()) require.NoError(b, err) require.NoError(b, batcher.Start(ctx, componenttest.NewNopHost())) @@ -641,7 +617,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { sink := new(consumertest.LogsSink) creationSet := componenttest.NewNopProcessorCreateSettings() - batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed) + batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed, featuregate.GetRegistry()) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -678,67 +654,57 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { func TestBatchLogProcessor_BatchSize(t *testing.T) { sizer := &plog.ProtoMarshaler{} - views := MetricViews() - require.NoError(t, view.Register(views...)) - defer view.Unregister(views...) - - // Instantiate the batch processor with low config values to test data - // gets sent through the processor. - cfg := Config{ - ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), - Timeout: 100 * time.Millisecond, - SendBatchSize: 50, - } - - requestCount := 100 - logsPerRequest := 5 - sink := new(consumertest.LogsSink) - - creationSet := componenttest.NewNopProcessorCreateSettings() - batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed) - require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + telemetryTest(t, func(t *testing.T, tel testTelemetry, registry *featuregate.Registry) { + + // Instantiate the batch processor with low config values to test data + // gets sent through the processor. 
+ cfg := Config{ + ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), + Timeout: 100 * time.Millisecond, + SendBatchSize: 50, + } - start := time.Now() - size := 0 - for requestNum := 0; requestNum < requestCount; requestNum++ { - ld := testdata.GenerateLogs(logsPerRequest) - size += sizer.LogsSize(ld) - assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) - } - require.NoError(t, batcher.Shutdown(context.Background())) + requestCount := 100 + logsPerRequest := 5 + sink := new(consumertest.LogsSink) + + creationSet := tel.NewProcessorCreateSettings() + batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed, registry) + require.NoError(t, err) + require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + + start := time.Now() + size := 0 + for requestNum := 0; requestNum < requestCount; requestNum++ { + ld := testdata.GenerateLogs(logsPerRequest) + size += sizer.LogsSize(ld) + assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) + } + require.NoError(t, batcher.Shutdown(context.Background())) - elapsed := time.Since(start) - require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds()) + elapsed := time.Since(start) + require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds()) - expectedBatchesNum := requestCount * logsPerRequest / int(cfg.SendBatchSize) - expectedBatchingFactor := int(cfg.SendBatchSize) / logsPerRequest + expectedBatchesNum := requestCount * logsPerRequest / int(cfg.SendBatchSize) + expectedBatchingFactor := int(cfg.SendBatchSize) / logsPerRequest - require.Equal(t, requestCount*logsPerRequest, sink.LogRecordCount()) - receivedMds := sink.AllLogs() - require.Equal(t, expectedBatchesNum, len(receivedMds)) - for _, ld := range receivedMds { - require.Equal(t, expectedBatchingFactor, ld.ResourceLogs().Len()) - for i := 0; i < expectedBatchingFactor; i++ { - require.Equal(t, logsPerRequest, ld.ResourceLogs().At(i).ScopeLogs().At(0).LogRecords().Len()) + require.Equal(t, requestCount*logsPerRequest, sink.LogRecordCount()) + receivedMds := sink.AllLogs() + require.Equal(t, expectedBatchesNum, len(receivedMds)) + for _, ld := range receivedMds { + require.Equal(t, expectedBatchingFactor, ld.ResourceLogs().Len()) + for i := 0; i < expectedBatchingFactor; i++ { + require.Equal(t, logsPerRequest, ld.ResourceLogs().At(i).ScopeLogs().At(0).LogRecords().Len()) + } } - } - viewData, err := view.RetrieveData("processor/batch/" + statBatchSendSize.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData := viewData[0].Data.(*view.DistributionData) - assert.Equal(t, int64(expectedBatchesNum), distData.Count) - assert.Equal(t, sink.LogRecordCount(), int(distData.Sum())) - assert.Equal(t, cfg.SendBatchSize, uint32(distData.Min)) - assert.Equal(t, cfg.SendBatchSize, uint32(distData.Max)) - - viewData, err = view.RetrieveData("processor/batch/" + statBatchSendSizeBytes.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData = viewData[0].Data.(*view.DistributionData) - assert.Equal(t, int64(expectedBatchesNum), distData.Count) - assert.Equal(t, size, int(distData.Sum())) + tel.assertMetrics(t, expectedMetrics{ + sendCount: float64(expectedBatchesNum), + sendSizeSum: float64(sink.LogRecordCount()), + sendSizeBytesSum: float64(size), + sizeTrigger: float64(expectedBatchesNum), + }) + }) } func TestBatchLogsProcessor_Timeout(t *testing.T) { @@ -752,7 +718,7 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) 
{ sink := new(consumertest.LogsSink) creationSet := componenttest.NewNopProcessorCreateSettings() - batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed) + batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed, featuregate.GetRegistry()) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -801,7 +767,7 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) { sink := new(consumertest.LogsSink) creationSet := componenttest.NewNopProcessorCreateSettings() - batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed) + batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed, featuregate.GetRegistry()) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) diff --git a/processor/batchprocessor/factory.go b/processor/batchprocessor/factory.go index 8316b928ecc..0c8ce55dbb5 100644 --- a/processor/batchprocessor/factory.go +++ b/processor/batchprocessor/factory.go @@ -21,6 +21,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/featuregate" ) const ( @@ -56,7 +57,7 @@ func createTracesProcessor( nextConsumer consumer.Traces, ) (component.TracesProcessor, error) { level := set.MetricsLevel - return newBatchTracesProcessor(set, nextConsumer, cfg.(*Config), level) + return newBatchTracesProcessor(set, nextConsumer, cfg.(*Config), level, featuregate.GetRegistry()) } func createMetricsProcessor( @@ -66,7 +67,7 @@ func createMetricsProcessor( nextConsumer consumer.Metrics, ) (component.MetricsProcessor, error) { level := set.MetricsLevel - return newBatchMetricsProcessor(set, nextConsumer, cfg.(*Config), level) + return newBatchMetricsProcessor(set, nextConsumer, cfg.(*Config), level, featuregate.GetRegistry()) } func createLogsProcessor( @@ -76,5 +77,5 @@ func createLogsProcessor( nextConsumer consumer.Logs, ) (component.LogsProcessor, error) { level := set.MetricsLevel - return newBatchLogsProcessor(set, nextConsumer, cfg.(*Config), level) + return newBatchLogsProcessor(set, nextConsumer, cfg.(*Config), level, featuregate.GetRegistry()) } diff --git a/processor/batchprocessor/metrics.go b/processor/batchprocessor/metrics.go index a2a5a109c73..1cc6ded5197 100644 --- a/processor/batchprocessor/metrics.go +++ b/processor/batchprocessor/metrics.go @@ -15,14 +15,30 @@ package batchprocessor // import "go.opentelemetry.io/collector/processor/batchprocessor" import ( + "context" + "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.opentelemetry.io/otel/metric/unit" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + otelview "go.opentelemetry.io/otel/sdk/metric/view" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/featuregate" + "go.opentelemetry.io/collector/internal/obsreportconfig" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/obsreport" ) +const ( + scopeName = "go.opentelemetry.io/collector/processor/batchprocessor" +) + var ( processorTagKey = tag.MustNewKey(obsmetrics.ProcessorKey) statBatchSizeTriggerSend = 
stats.Int64("batch_size_trigger_send", "Number of times the batch was sent due to a size trigger", stats.UnitDimensionless) @@ -31,6 +47,13 @@ var ( statBatchSendSizeBytes = stats.Int64("batch_send_size_bytes", "Number of bytes in batch that was sent", stats.UnitBytes) ) +type trigger int + +const ( + triggerTimeout trigger = iota + triggerBatchSize +) + // MetricViews returns the metrics views related to batching func MetricViews() []*view.View { processorTagKeys := []tag.Key{processorTagKey} @@ -65,7 +88,7 @@ func MetricViews() []*view.View { Description: statBatchSendSizeBytes.Description(), TagKeys: processorTagKeys, Aggregation: view.Distribution(10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, - 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_00, 900_000, + 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000), } @@ -76,3 +99,154 @@ func MetricViews() []*view.View { distributionBatchSendSizeBytesView, } } + +func OtelMetricsViews() ([]otelview.View, error) { + var views []otelview.View + var err error + + v, err := otelview.New( + otelview.MatchInstrumentName(obsreport.BuildProcessorCustomMetricName(typeStr, "batch_send_size")), + otelview.WithSetAggregation(aggregation.ExplicitBucketHistogram{ + Boundaries: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, + }), + ) + if err != nil { + return nil, err + } + views = append(views, v) + + v, err = otelview.New( + otelview.MatchInstrumentName(obsreport.BuildProcessorCustomMetricName(typeStr, "batch_send_size_bytes")), + otelview.WithSetAggregation(aggregation.ExplicitBucketHistogram{ + Boundaries: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, + 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, + 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000}, + }), + ) + if err != nil { + return nil, err + } + views = append(views, v) + + return views, nil +} + +type batchProcessorTelemetry struct { + level configtelemetry.Level + detailed bool + useOtel bool + + exportCtx context.Context + + processorAttr []attribute.KeyValue + batchSizeTriggerSend syncint64.Counter + timeoutTriggerSend syncint64.Counter + batchSendSize syncint64.Histogram + batchSendSizeBytes syncint64.Histogram +} + +func newBatchProcessorTelemetry(mp metric.MeterProvider, cfg *Config, level configtelemetry.Level, registry *featuregate.Registry) (*batchProcessorTelemetry, error) { + exportCtx, err := tag.New(context.Background(), tag.Insert(processorTagKey, cfg.ID().String())) + if err != nil { + return nil, err + } + + bpt := &batchProcessorTelemetry{ + useOtel: registry.IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID), + processorAttr: []attribute.KeyValue{attribute.String(obsmetrics.ProcessorKey, cfg.ID().String())}, + exportCtx: exportCtx, + level: level, + detailed: level == configtelemetry.LevelDetailed, + } + + err = bpt.createOtelMetrics(mp) + if err != nil { + return nil, err + } + + return bpt, nil +} + +func (bpt *batchProcessorTelemetry) createOtelMetrics(mp metric.MeterProvider) error { + if !bpt.useOtel { + return nil + } + + var err error + meter := mp.Meter(scopeName) + + bpt.batchSizeTriggerSend, err = 
meter.SyncInt64().Counter( + obsreport.BuildProcessorCustomMetricName(typeStr, "batch_size_trigger_send"), + instrument.WithDescription("Number of times the batch was sent due to a size trigger"), + instrument.WithUnit(unit.Dimensionless), + ) + if err != nil { + return err + } + + bpt.timeoutTriggerSend, err = meter.SyncInt64().Counter( + obsreport.BuildProcessorCustomMetricName(typeStr, "timeout_trigger_send"), + instrument.WithDescription("Number of times the batch was sent due to a timeout trigger"), + instrument.WithUnit(unit.Dimensionless), + ) + if err != nil { + return err + } + + bpt.batchSendSize, err = meter.SyncInt64().Histogram( + obsreport.BuildProcessorCustomMetricName(typeStr, "batch_send_size"), + instrument.WithDescription("Number of units in the batch"), + instrument.WithUnit(unit.Dimensionless), + ) + if err != nil { + return err + } + + bpt.batchSendSizeBytes, err = meter.SyncInt64().Histogram( + obsreport.BuildProcessorCustomMetricName(typeStr, "batch_send_size_bytes"), + instrument.WithDescription("Number of bytes in batch that was sent"), + instrument.WithUnit(unit.Bytes), + ) + if err != nil { + return err + } + + return nil +} + +func (bpt *batchProcessorTelemetry) record(trigger trigger, sent, bytes int64) { + if bpt.useOtel { + bpt.recordWithOtel(trigger, sent, bytes) + } else { + bpt.recordWithOC(trigger, sent, bytes) + } +} + +func (bpt *batchProcessorTelemetry) recordWithOC(trigger trigger, sent, bytes int64) { + var triggerMeasure *stats.Int64Measure + switch trigger { + case triggerBatchSize: + triggerMeasure = statBatchSizeTriggerSend + case triggerTimeout: + triggerMeasure = statTimeoutTriggerSend + } + + stats.Record(bpt.exportCtx, triggerMeasure.M(1), statBatchSendSize.M(sent)) + if bpt.detailed { + stats.Record(bpt.exportCtx, statBatchSendSizeBytes.M(bytes)) + } +} + +func (bpt *batchProcessorTelemetry) recordWithOtel(trigger trigger, sent int64, bytes int64) { + switch trigger { + case triggerBatchSize: + bpt.batchSizeTriggerSend.Add(bpt.exportCtx, 1, bpt.processorAttr...) + case triggerTimeout: + bpt.timeoutTriggerSend.Add(bpt.exportCtx, 1, bpt.processorAttr...) + } + + bpt.batchSendSize.Record(bpt.exportCtx, sent, bpt.processorAttr...) + if bpt.detailed { + bpt.batchSendSizeBytes.Record(bpt.exportCtx, bytes, bpt.processorAttr...) 
+ } +} diff --git a/processor/batchprocessor/metrics_test.go b/processor/batchprocessor/metrics_test.go index 167186e37b1..8517fd68eb0 100644 --- a/processor/batchprocessor/metrics_test.go +++ b/processor/batchprocessor/metrics_test.go @@ -15,9 +15,29 @@ package batchprocessor import ( + "context" + "fmt" + "math" + "net/http" + "net/http/httptest" "testing" + ocprom "contrib.go.opencensus.io/exporter/prometheus" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opencensus.io/stats/view" + otelprom "go.opentelemetry.io/otel/exporters/prometheus" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/featuregate" + "go.opentelemetry.io/collector/internal/obsreportconfig" ) func TestBatchProcessorMetrics(t *testing.T) { @@ -32,3 +52,201 @@ func TestBatchProcessorMetrics(t *testing.T) { assert.Equal(t, "processor/batch/"+viewName, views[i].Name) } } + +type testTelemetry struct { + promHandler http.Handler + useOtel bool + meterProvider *sdkmetric.MeterProvider +} + +type expectedMetrics struct { + // processor_batch_batch_send_size_count + // processor_batch_batch_send_size_bytes_count + sendCount float64 + // processor_batch_batch_send_size_sum + sendSizeSum float64 + // processor_batch_batch_send_size_bytes_sum + sendSizeBytesSum float64 + + // processor_batch_batch_size_trigger_send + sizeTrigger float64 + + // processor_batch_timeout_trigger_send + timeoutTrigger float64 +} + +func telemetryTest(t *testing.T, testFunc func(t *testing.T, tel testTelemetry, registry *featuregate.Registry)) { + t.Run("WithOC", func(t *testing.T) { + testFunc(t, setupTelemetry(t, false), featuregate.NewRegistry()) + }) + + t.Run("WithOTel", func(t *testing.T) { + registry := featuregate.NewRegistry() + obsreportconfig.RegisterInternalMetricFeatureGate(registry) + require.NoError(t, registry.Apply(map[string]bool{obsreportconfig.UseOtelForInternalMetricsfeatureGateID: true})) + + testFunc(t, setupTelemetry(t, true), registry) + }) +} + +func setupTelemetry(t *testing.T, useOtel bool) testTelemetry { + views := MetricViews() + require.NoError(t, view.Register(views...)) + t.Cleanup(func() { view.Unregister(views...)
}) + + telemetry := testTelemetry{ + useOtel: useOtel, + } + + if useOtel { + otelViews, err := OtelMetricsViews() + require.NoError(t, err) + + promReg := prometheus.NewRegistry() + exporter, err := otelprom.New(otelprom.WithRegisterer(promReg), otelprom.WithoutUnits()) + require.NoError(t, err) + + telemetry.meterProvider = sdkmetric.NewMeterProvider( + sdkmetric.WithResource(resource.Empty()), + sdkmetric.WithReader(exporter, otelViews...), + ) + + telemetry.promHandler = promhttp.HandlerFor(promReg, promhttp.HandlerOpts{}) + + t.Cleanup(func() { assert.NoError(t, telemetry.meterProvider.Shutdown(context.Background())) }) + } else { + promReg := prometheus.NewRegistry() + + ocExporter, err := ocprom.NewExporter(ocprom.Options{Registry: promReg}) + require.NoError(t, err) + + telemetry.promHandler = ocExporter + + view.RegisterExporter(ocExporter) + t.Cleanup(func() { view.UnregisterExporter(ocExporter) }) + } + + return telemetry +} + +func (tt *testTelemetry) NewProcessorCreateSettings() component.ProcessorCreateSettings { + settings := componenttest.NewNopProcessorCreateSettings() + settings.MeterProvider = tt.meterProvider + + return settings +} + +func (tt *testTelemetry) assertMetrics(t *testing.T, expected expectedMetrics) { + for _, v := range MetricViews() { + // Forces a flush for the opencensus view data. + _, _ = view.RetrieveData(v.Name) + } + + req, err := http.NewRequest("GET", "/metrics", nil) + require.NoError(t, err) + + rr := httptest.NewRecorder() + tt.promHandler.ServeHTTP(rr, req) + + var parser expfmt.TextParser + metrics, err := parser.TextToMetricFamilies(rr.Body) + require.NoError(t, err) + + if expected.sendSizeBytesSum > 0 { + name := "processor_batch_batch_send_size_bytes" + metric := tt.getMetric(t, name, io_prometheus_client.MetricType_HISTOGRAM, metrics) + + assertFloat(t, expected.sendSizeBytesSum, metric.GetHistogram().GetSampleSum(), name) + assertFloat(t, expected.sendCount, float64(metric.GetHistogram().GetSampleCount()), name) + + tt.assertBoundaries(t, + []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, + 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, + 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000, math.Inf(1)}, + metric.GetHistogram(), + name, + ) + } + + if expected.sendSizeSum > 0 { + name := "processor_batch_batch_send_size" + metric := tt.getMetric(t, name, io_prometheus_client.MetricType_HISTOGRAM, metrics) + + assertFloat(t, expected.sendSizeSum, metric.GetHistogram().GetSampleSum(), name) + assertFloat(t, expected.sendCount, float64(metric.GetHistogram().GetSampleCount()), name) + + tt.assertBoundaries(t, + []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000, math.Inf(1)}, + metric.GetHistogram(), + name, + ) + } + + if expected.sizeTrigger > 0 { + name := "processor_batch_batch_size_trigger_send" + metric := tt.getMetric(t, name, io_prometheus_client.MetricType_COUNTER, metrics) + + assertFloat(t, expected.sizeTrigger, metric.GetCounter().GetValue(), name) + } + + if expected.timeoutTrigger > 0 { + name := "processor_batch_timeout_trigger_send" + metric := tt.getMetric(t, name, io_prometheus_client.MetricType_COUNTER, metrics) + + assertFloat(t, expected.timeoutTrigger, metric.GetCounter().GetValue(), name) + } +} + +func (tt *testTelemetry) assertBoundaries(t *testing.T, expected []float64, histogram 
*io_prometheus_client.Histogram, metric string) { + var got []float64 + for _, bucket := range histogram.GetBucket() { + got = append(got, bucket.GetUpperBound()) + } + + if len(expected) != len(got) { + assert.Failf(t, "different boundaries size", "metric '%s' expected boundaries '%x' but got '%x'", metric, expected, got) + return + } + + for i := range expected { + if math.Abs(expected[i]-got[i]) > 0.00001 { + assert.Failf(t, "unexpected boundary", "boundary for metric '%s' did not match, expected '%f' got '%f'", metric, expected[i], got[i]) + } + } + +} + +func (tt *testTelemetry) getMetric(t *testing.T, name string, mtype io_prometheus_client.MetricType, got map[string]*io_prometheus_client.MetricFamily) *io_prometheus_client.Metric { + if tt.useOtel && mtype == io_prometheus_client.MetricType_COUNTER { + // OTel Go suffixes counters with `_total` + name += "_total" + } + + metricFamily, ok := got[name] + require.True(t, ok, "expected metric '%s' not found", name) + require.Equal(t, mtype, metricFamily.GetType()) + + metric, err := getSingleMetric(metricFamily) + require.NoError(t, err) + + return metric +} + +func getSingleMetric(metric *io_prometheus_client.MetricFamily) (*io_prometheus_client.Metric, error) { + if l := len(metric.Metric); l != 1 { + return nil, fmt.Errorf("expected metric '%s' with one set of attributes, but found %d", metric.GetName(), l) + } + first := metric.Metric[0] + + if len(first.Label) != 1 || "processor" != first.Label[0].GetName() || "batch" != first.Label[0].GetValue() { + return nil, fmt.Errorf("expected metric '%s' with a single `processor=batch` attribute but got '%s'", metric.GetName(), first.GetLabel()) + } + + return first, nil +} + +func assertFloat(t *testing.T, expected, got float64, metric string) { + if math.Abs(expected-got) > 0.00001 { + assert.Failf(t, "unexpected metric value", "value for metric '%s' did not match, expected '%f' got '%f'", metric, expected, got) + } +} diff --git a/service/telemetry.go b/service/telemetry.go index 91ce9643638..7c93d97916b 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -23,6 +23,8 @@ import ( "sync" "unicode" + otelview "go.opentelemetry.io/otel/sdk/metric/view" + ocprom "contrib.go.opencensus.io/exporter/prometheus" "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" @@ -229,6 +231,14 @@ func (tel *telemetryInitializer) initOpenTelemetry(attrs map[string]string, prom resAttrs = append(resAttrs, attribute.String(k, v)) } + var views []otelview.View + + batchViews, err := batchprocessor.OtelMetricsViews() + if err != nil { + return fmt.Errorf("error creating otel metrics views for batch processor: %w", err) + } + views = append(views, batchViews...) + res, err := resource.New(context.Background(), resource.WithAttributes(resAttrs...)) if err != nil { return fmt.Errorf("error creating otel resources: %w", err) @@ -241,7 +251,7 @@ func (tel *telemetryInitializer) initOpenTelemetry(attrs map[string]string, prom } tel.mp = sdkmetric.NewMeterProvider( sdkmetric.WithResource(res), - sdkmetric.WithReader(exporter), + sdkmetric.WithReader(exporter, views...), ) return nil
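
Reviewer note, not part of the patch: the sketch below is a minimal, self-contained illustration of the OTel Go metric pattern that the new createOtelMetrics code relies on (Prometheus reader feeding a MeterProvider, a named Meter creating sync instruments, recordings carrying the processor attribute). It assumes the same otel-go metric API versions this patch imports; names such as examplePromRegistry are illustrative only, and the instrument names here mirror but do not replace the ones the patch registers.

package main

import (
	"context"
	"log"

	"github.com/prometheus/client_golang/prometheus"
	"go.opentelemetry.io/otel/attribute"
	otelprom "go.opentelemetry.io/otel/exporters/prometheus"
	"go.opentelemetry.io/otel/metric/instrument"
	"go.opentelemetry.io/otel/metric/unit"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	ctx := context.Background()

	// Illustrative registry; the collector wires in its own registerer.
	examplePromRegistry := prometheus.NewRegistry()
	exporter, err := otelprom.New(otelprom.WithRegisterer(examplePromRegistry))
	if err != nil {
		log.Fatal(err)
	}

	// The exporter acts as the reader for the SDK meter provider.
	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(exporter))
	defer func() { _ = mp.Shutdown(ctx) }()

	meter := mp.Meter("go.opentelemetry.io/collector/processor/batchprocessor")

	// Same instrument kinds the patch creates: a counter for the trigger and a
	// histogram for the batch size.
	sizeTrigger, err := meter.SyncInt64().Counter(
		"processor/batch/batch_size_trigger_send",
		instrument.WithDescription("Number of times the batch was sent due to a size trigger"),
		instrument.WithUnit(unit.Dimensionless),
	)
	if err != nil {
		log.Fatal(err)
	}
	sendSize, err := meter.SyncInt64().Histogram(
		"processor/batch/batch_send_size",
		instrument.WithDescription("Number of units in the batch"),
		instrument.WithUnit(unit.Dimensionless),
	)
	if err != nil {
		log.Fatal(err)
	}

	// Every recording carries the processor identity, like processorAttr in the patch.
	attrs := []attribute.KeyValue{attribute.String("processor", "batch")}
	sizeTrigger.Add(ctx, 1, attrs...)
	sendSize.Record(ctx, 20, attrs...)
}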