diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go index 2f728b07366..303683fdf27 100644 --- a/exporter/internal/queue/batcher.go +++ b/exporter/internal/queue/batcher.go @@ -33,12 +33,23 @@ type BaseBatcher struct { } func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) (Batcher, error) { - if batchCfg.Enabled { + if !batchCfg.Enabled { + return &DisabledBatcher{ + BaseBatcher{ + batchCfg: batchCfg, + queue: queue, + maxWorkers: maxWorkers, + stopWG: sync.WaitGroup{}, + }, + }, nil + } + + if batchCfg.MaxSizeConfig.MaxSizeItems != 0 { return nil, errors.ErrUnsupported } - return &DisabledBatcher{ - BaseBatcher{ + return &DefaultBatcher{ + BaseBatcher: BaseBatcher{ batchCfg: batchCfg, queue: queue, maxWorkers: maxWorkers, diff --git a/exporter/internal/queue/default_batcher.go b/exporter/internal/queue/default_batcher.go new file mode 100644 index 00000000000..092cf6a2bb3 --- /dev/null +++ b/exporter/internal/queue/default_batcher.go @@ -0,0 +1,122 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" + +import ( + "context" + "math" + "sync" + "time" + + "go.opentelemetry.io/collector/component" +) + +// DefaultBatcher continuously reads from the queue and flushes asynchronously if size limit is met or on timeout. +type DefaultBatcher struct { + BaseBatcher + currentBatchMu sync.Mutex + currentBatch *batch + timer *time.Timer + shutdownCh chan bool +} + +func (qb *DefaultBatcher) resetTimer() { + if qb.batchCfg.FlushTimeout != 0 { + qb.timer.Reset(qb.batchCfg.FlushTimeout) + } +} + +// startReadingFlushingGoroutine starts a goroutine that reads and then flushes. +func (qb *DefaultBatcher) startReadingFlushingGoroutine() { + + qb.stopWG.Add(1) + go func() { + defer qb.stopWG.Done() + for { + // Read() blocks until the queue is non-empty or until the queue is stopped. + idx, ctx, req, ok := qb.queue.Read(context.Background()) + if !ok { + qb.shutdownCh <- true + return + } + + qb.currentBatchMu.Lock() + if qb.currentBatch == nil || qb.currentBatch.req == nil { + qb.resetTimer() + qb.currentBatch = &batch{ + req: req, + ctx: ctx, + idxList: []uint64{idx}} + } else { + mergedReq, mergeErr := qb.currentBatch.req.Merge(qb.currentBatch.ctx, req) + if mergeErr != nil { + qb.queue.OnProcessingFinished(idx, mergeErr) + qb.currentBatchMu.Unlock() + continue + } + qb.currentBatch = &batch{ + req: mergedReq, + ctx: qb.currentBatch.ctx, + idxList: append(qb.currentBatch.idxList, idx)} + } + + if qb.currentBatch.req.ItemsCount() > qb.batchCfg.MinSizeItems { + batchToFlush := *qb.currentBatch + qb.currentBatch = nil + qb.currentBatchMu.Unlock() + + // flushAsync() blocks until successfully started a goroutine for flushing. + qb.flushAsync(batchToFlush) + qb.resetTimer() + } else { + qb.currentBatchMu.Unlock() + } + } + }() +} + +// startTimeBasedFlushingGoroutine starts a goroutine that flushes on timeout. +func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() { + qb.stopWG.Add(1) + go func() { + defer qb.stopWG.Done() + for { + select { + case <-qb.shutdownCh: + return + case <-qb.timer.C: + qb.currentBatchMu.Lock() + if qb.currentBatch == nil || qb.currentBatch.req == nil { + qb.currentBatchMu.Unlock() + continue + } + batchToFlush := *qb.currentBatch + qb.currentBatch = nil + qb.currentBatchMu.Unlock() + + // flushAsync() blocks until successfully started a goroutine for flushing. + qb.flushAsync(batchToFlush) + qb.resetTimer() + } + } + }() +} + +// Start starts the goroutine that reads from the queue and flushes asynchronously. +func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error { + qb.startWorkerPool() + qb.shutdownCh = make(chan bool, 1) + + if qb.batchCfg.FlushTimeout == 0 { + qb.timer = time.NewTimer(math.MaxInt) + qb.timer.Stop() + } else { + qb.timer = time.NewTimer(qb.batchCfg.FlushTimeout) + } + + qb.startReadingFlushingGoroutine() + qb.startTimeBasedFlushingGoroutine() + + return nil +} diff --git a/exporter/internal/queue/default_batcher_test.go b/exporter/internal/queue/default_batcher_test.go new file mode 100644 index 00000000000..068d0dec081 --- /dev/null +++ b/exporter/internal/queue/default_batcher_test.go @@ -0,0 +1,217 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/internal" +) + +func TestDefaultBatcher_MinThresholdZero_TimeoutDisabled(t *testing.T) { + tests := []struct { + name string + maxWorkers int + }{ + { + name: "infinate_workers", + maxWorkers: 0, + }, + { + name: "one_worker", + maxWorkers: 1, + }, + { + name: "three_workers", + maxWorkers: 3, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := exporterbatcher.NewDefaultConfig() + cfg.Enabled = true + cfg.FlushTimeout = 0 + cfg.MinSizeConfig = exporterbatcher.MinSizeConfig{ + MinSizeItems: 0, + } + + q := NewBoundedMemoryQueue[internal.Request]( + MemoryQueueSettings[internal.Request]{ + Sizer: &RequestSizer[internal.Request]{}, + Capacity: 10, + }) + + ba, err := NewBatcher(cfg, q, tt.maxWorkers) + require.NoError(t, err) + + require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, q.Shutdown(context.Background())) + require.NoError(t, ba.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 35, sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 2, sink: sink})) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 75 + }, 30*time.Millisecond, 10*time.Millisecond) + }) + } +} + +func TestDefaultBatcher_TimeoutDisabled(t *testing.T) { + tests := []struct { + name string + maxWorkers int + }{ + { + name: "infinate_workers", + maxWorkers: 0, + }, + { + name: "one_worker", + maxWorkers: 1, + }, + { + name: "three_workers", + maxWorkers: 3, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := exporterbatcher.NewDefaultConfig() + cfg.Enabled = true + cfg.FlushTimeout = 0 + cfg.MinSizeConfig = exporterbatcher.MinSizeConfig{ + MinSizeItems: 10, + } + + q := NewBoundedMemoryQueue[internal.Request]( + MemoryQueueSettings[internal.Request]{ + Sizer: &RequestSizer[internal.Request]{}, + Capacity: 10, + }) + + ba, err := NewBatcher(cfg, q, tt.maxWorkers) + require.NoError(t, err) + + require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, q.Shutdown(context.Background())) + require.NoError(t, ba.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + // These two requests will be dropped because of export error. + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink})) + + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 7, sink: sink})) + + // This request will be dropped because of merge error + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, mergeErr: errors.New("transient error"), sink: sink})) + + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 35, sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 2, sink: sink})) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 55 + }, 30*time.Millisecond, 10*time.Millisecond) + }) + } +} + +func TestDefaultBatcher_WithTimeout(t *testing.T) { + tests := []struct { + name string + maxWorkers int + }{ + { + name: "infinate_workers", + maxWorkers: 0, + }, + { + name: "one_worker", + maxWorkers: 1, + }, + { + name: "three_workers", + maxWorkers: 3, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := exporterbatcher.NewDefaultConfig() + cfg.Enabled = true + cfg.FlushTimeout = 50 * time.Millisecond + cfg.MinSizeConfig = exporterbatcher.MinSizeConfig{ + MinSizeItems: 100, + } + + q := NewBoundedMemoryQueue[internal.Request]( + MemoryQueueSettings[internal.Request]{ + Sizer: &RequestSizer[internal.Request]{}, + Capacity: 10, + }) + + ba, err := NewBatcher(cfg, q, tt.maxWorkers) + require.NoError(t, err) + + require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, q.Shutdown(context.Background())) + require.NoError(t, ba.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink})) + + // This request will be dropped because of merge error + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, mergeErr: errors.New("transient error"), sink: sink})) + + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 35, sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 2, sink: sink})) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 75 + }, 100*time.Millisecond, 10*time.Millisecond) + }) + } +} + +func TestDisabledBatcher_SplitNotImplemented(t *testing.T) { + cfg := exporterbatcher.NewDefaultConfig() + cfg.Enabled = true + maxWorkers := 0 + cfg.MaxSizeConfig.MaxSizeItems = 1 + + q := NewBoundedMemoryQueue[internal.Request]( + MemoryQueueSettings[internal.Request]{ + Sizer: &RequestSizer[internal.Request]{}, + Capacity: 10, + }) + + _, err := NewBatcher(cfg, q, maxWorkers) + require.Error(t, err) +} diff --git a/exporter/internal/queue/disabled_batcher_test.go b/exporter/internal/queue/disabled_batcher_test.go index c9c69a61f05..036fa961720 100644 --- a/exporter/internal/queue/disabled_batcher_test.go +++ b/exporter/internal/queue/disabled_batcher_test.go @@ -70,18 +70,3 @@ func TestDisabledBatcher_Basic(t *testing.T) { }) } } - -func TestDisabledBatcher_BatchingNotImplemented(t *testing.T) { - cfg := exporterbatcher.NewDefaultConfig() - cfg.Enabled = true - maxWorkers := 0 - - q := NewBoundedMemoryQueue[internal.Request]( - MemoryQueueSettings[internal.Request]{ - Sizer: &RequestSizer[internal.Request]{}, - Capacity: 10, - }) - - _, err := NewBatcher(cfg, q, maxWorkers) - require.Error(t, err) -} diff --git a/exporter/internal/queue/fake_request.go b/exporter/internal/queue/fake_request.go index a0983db6d35..6354063af52 100644 --- a/exporter/internal/queue/fake_request.go +++ b/exporter/internal/queue/fake_request.go @@ -28,8 +28,10 @@ func newFakeRequestSink() *fakeRequestSink { type fakeRequest struct { items int exportErr error - delay time.Duration - sink *fakeRequestSink + + mergeErr error + delay time.Duration + sink *fakeRequestSink } func (r *fakeRequest) Export(ctx context.Context) error { @@ -53,8 +55,17 @@ func (r *fakeRequest) ItemsCount() int { } func (r *fakeRequest) Merge(_ context.Context, - _ internal.Request) (internal.Request, error) { - return nil, errors.New("not implemented") + r2 internal.Request) (internal.Request, error) { + fr2 := r2.(*fakeRequest) + if fr2.mergeErr != nil { + return nil, fr2.mergeErr + } + return &fakeRequest{ + items: r.items + fr2.items, + sink: r.sink, + exportErr: fr2.exportErr, + delay: r.delay + fr2.delay, + }, nil } func (r *fakeRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig,