diff --git a/.chloggen/11637-exporter-queue-batcher.yaml b/.chloggen/11637-exporter-queue-batcher.yaml new file mode 100644 index 00000000000..b543696457a --- /dev/null +++ b/.chloggen/11637-exporter-queue-batcher.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterqueue + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Change exporter queue batching to use a pulling model. + +# One or more tracking issues or pull requests related to the change +issues: [8122, 10368] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + If both queuing and batching is enabled for exporter, we now use a pulling model instead of a + pushing model. num_consumer in queue configuration is now used to specify the maximum number of + concurrent workers that are sending out the request. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index 98b096ed9b7..229c997ebd1 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -27,7 +27,7 @@ import ( var usePullingBasedExporterQueueBatcher = featuregate.GlobalRegistry().MustRegister( "telemetry.UsePullingBasedExporterQueueBatcher", - featuregate.StageBeta, + featuregate.StageAlpha, featuregate.WithRegisterFromVersion("v0.114.0"), featuregate.WithRegisterDescription("if set to true, turns on the pulling-based exporter queue bathcer"), ) @@ -102,13 +102,14 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe ExporterSettings: be.Set, }, be.queueCfg) - be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep) + be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg) for _, op := range options { err = multierr.Append(err, op(be)) } } - if be.BatcherCfg.Enabled { + if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled || + usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled { bs := NewBatchSender(be.BatcherCfg, be.Set) be.BatchSender = bs } diff --git a/exporter/exporterhelper/internal/batch_sender_test.go b/exporter/exporterhelper/internal/batch_sender_test.go index 61edb5cf9fd..020b2a8d169 100644 --- a/exporter/exporterhelper/internal/batch_sender_test.go +++ b/exporter/exporterhelper/internal/batch_sender_test.go @@ -158,7 +158,6 @@ func TestBatchSender_BatchExportError(t *testing.T) { assert.Eventually(t, func() bool { return sink.requestsCount.Load() == tt.expectedRequests && sink.itemsCount.Load() == tt.expectedItems && - be.BatchSender.(*BatchSender).activeRequests.Load() == 0 && be.QueueSender.(*QueueSender).queue.Size() == 0 }, 100*time.Millisecond, 10*time.Millisecond) }) diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index 9b53a340146..d4ff714afc4 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/internal" "go.opentelemetry.io/collector/exporter/internal/queue" @@ -71,14 +72,20 @@ type QueueSender struct { queue exporterqueue.Queue[internal.Request] numConsumers int traceAttribute attribute.KeyValue + batcher queue.Batcher consumers *queue.Consumers[internal.Request] obsrep *ObsReport exporterID component.ID } -func NewQueueSender(q exporterqueue.Queue[internal.Request], set exporter.Settings, numConsumers int, - exportFailureMessage string, obsrep *ObsReport) *QueueSender { +func NewQueueSender( + q exporterqueue.Queue[internal.Request], + set exporter.Settings, + numConsumers int, + exportFailureMessage string, + obsrep *ObsReport, + batcherCfg exporterbatcher.Config) *QueueSender { qs := &QueueSender{ queue: q, numConsumers: numConsumers, @@ -86,15 +93,28 @@ func NewQueueSender(q exporterqueue.Queue[internal.Request], set exporter.Settin obsrep: obsrep, exporterID: set.ID, } - consumeFunc := func(ctx context.Context, req internal.Request) error { - err := qs.NextSender.Send(ctx, req) - if err != nil { - set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage, - zap.Error(err), zap.Int("dropped_items", req.ItemsCount())) + + if usePullingBasedExporterQueueBatcher.IsEnabled() { + exportFunc := func(ctx context.Context, req internal.Request) error { + err := qs.NextSender.Send(ctx, req) + if err != nil { + set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage, + zap.Error(err), zap.Int("dropped_items", req.ItemsCount())) + } + return err } - return err + qs.batcher, _ = queue.NewBatcher(batcherCfg, q, exportFunc, numConsumers) + } else { + consumeFunc := func(ctx context.Context, req internal.Request) error { + err := qs.NextSender.Send(ctx, req) + if err != nil { + set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage, + zap.Error(err), zap.Int("dropped_items", req.ItemsCount())) + } + return err + } + qs.consumers = queue.NewQueueConsumers[internal.Request](q, numConsumers, consumeFunc) } - qs.consumers = queue.NewQueueConsumers[internal.Request](q, numConsumers, consumeFunc) return qs } @@ -103,8 +123,15 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error { if err := qs.queue.Start(ctx, host); err != nil { return err } - if err := qs.consumers.Start(ctx, host); err != nil { - return err + + if usePullingBasedExporterQueueBatcher.IsEnabled() { + if err := qs.batcher.Start(ctx, host); err != nil { + return err + } + } else { + if err := qs.consumers.Start(ctx, host); err != nil { + return err + } } dataTypeAttr := attribute.String(DataTypeKey, qs.obsrep.Signal.String()) @@ -123,6 +150,9 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error { if err := qs.queue.Shutdown(ctx); err != nil { return err } + if usePullingBasedExporterQueueBatcher.IsEnabled() { + return qs.batcher.Shutdown(ctx) + } return qs.consumers.Shutdown(ctx) } diff --git a/exporter/exporterhelper/internal/queue_sender_test.go b/exporter/exporterhelper/internal/queue_sender_test.go index 44735eaebfa..41dbbdbc38d 100644 --- a/exporter/exporterhelper/internal/queue_sender_test.go +++ b/exporter/exporterhelper/internal/queue_sender_test.go @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/exporter/internal" @@ -540,7 +541,7 @@ func TestQueueSenderNoStartShutdown(t *testing.T) { ExporterCreateSettings: exportertest.NewNopSettings(), }) require.NoError(t, err) - qs := NewQueueSender(queue, set, 1, "", obsrep) + qs := NewQueueSender(queue, set, 1, "", obsrep, exporterbatcher.NewDefaultConfig()) assert.NoError(t, qs.Shutdown(context.Background())) }) } diff --git a/exporter/internal/queue/default_batcher.go b/exporter/internal/queue/default_batcher.go index a815a83bba9..2884159341b 100644 --- a/exporter/internal/queue/default_batcher.go +++ b/exporter/internal/queue/default_batcher.go @@ -132,6 +132,11 @@ func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() { // Start starts the goroutine that reads from the queue and flushes asynchronously. func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error { + // maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics. + if qb.maxWorkers == -1 { + return nil + } + qb.startWorkerPool() qb.shutdownCh = make(chan bool, 1) diff --git a/exporter/internal/queue/disabled_batcher.go b/exporter/internal/queue/disabled_batcher.go index 97a3fd32510..db08c546b4a 100644 --- a/exporter/internal/queue/disabled_batcher.go +++ b/exporter/internal/queue/disabled_batcher.go @@ -17,6 +17,11 @@ type DisabledBatcher struct { // Start starts the goroutine that reads from the queue and flushes asynchronously. func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error { + // maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics. + if qb.maxWorkers == -1 { + return nil + } + qb.startWorkerPool() // This goroutine reads and then flushes.