diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go index dce35d7b16a..08d51aa83d9 100644 --- a/exporter/internal/queue/batcher.go +++ b/exporter/internal/queue/batcher.go @@ -94,9 +94,3 @@ func (qb *BaseBatcher) flushAsync(batchToFlush batch) { qb.workerPool <- true }() } - -// Shutdown ensures that queue and all Batcher are stopped. -func (qb *BaseBatcher) Shutdown(_ context.Context) error { - qb.stopWG.Wait() - return nil -} diff --git a/exporter/internal/queue/default_batcher.go b/exporter/internal/queue/default_batcher.go index 744b067bc2c..a815a83bba9 100644 --- a/exporter/internal/queue/default_batcher.go +++ b/exporter/internal/queue/default_batcher.go @@ -124,18 +124,7 @@ func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() { case <-qb.shutdownCh: return case <-qb.timer.C: - qb.currentBatchMu.Lock() - if qb.currentBatch == nil || qb.currentBatch.req == nil { - qb.currentBatchMu.Unlock() - continue - } - batchToFlush := *qb.currentBatch - qb.currentBatch = nil - qb.currentBatchMu.Unlock() - - // flushAsync() blocks until successfully started a goroutine for flushing. - qb.flushAsync(batchToFlush) - qb.resetTimer() + qb.flushCurrentBatchIfNecessary() } } }() @@ -155,6 +144,28 @@ func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error { qb.startReadingFlushingGoroutine() qb.startTimeBasedFlushingGoroutine() + return nil +} + +// flushCurrentBatchIfNecessary sends out the current request batch if it is not nil +func (qb *DefaultBatcher) flushCurrentBatchIfNecessary() { + qb.currentBatchMu.Lock() + if qb.currentBatch == nil || qb.currentBatch.req == nil { + qb.currentBatchMu.Unlock() + return + } + batchToFlush := *qb.currentBatch + qb.currentBatch = nil + qb.currentBatchMu.Unlock() + + // flushAsync() blocks until successfully started a goroutine for flushing. + qb.flushAsync(batchToFlush) + qb.resetTimer() +} +// Shutdown ensures that queue and all Batcher are stopped. +func (qb *DefaultBatcher) Shutdown(_ context.Context) error { + qb.flushCurrentBatchIfNecessary() + qb.stopWG.Wait() return nil } diff --git a/exporter/internal/queue/default_batcher_test.go b/exporter/internal/queue/default_batcher_test.go index 3909ae02070..fd415d5fec9 100644 --- a/exporter/internal/queue/default_batcher_test.go +++ b/exporter/internal/queue/default_batcher_test.go @@ -277,3 +277,40 @@ func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) { }) } } + +func TestDefaultBatcher_Shutdown(t *testing.T) { + batchCfg := exporterbatcher.NewDefaultConfig() + batchCfg.MinSizeItems = 10 + batchCfg.FlushTimeout = 100 * time.Second + + q := NewBoundedMemoryQueue[internal.Request]( + MemoryQueueSettings[internal.Request]{ + Sizer: &RequestSizer[internal.Request]{}, + Capacity: 10, + }) + + ba, err := NewBatcher(batchCfg, q, + func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, + 2) + require.NoError(t, err) + + require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) + + sink := newFakeRequestSink() + + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 1, sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 2, sink: sink})) + + // Give the batcher some time to read from queue + time.Sleep(100 * time.Millisecond) + + assert.Equal(t, int64(0), sink.requestsCount.Load()) + assert.Equal(t, int64(0), sink.itemsCount.Load()) + + require.NoError(t, q.Shutdown(context.Background())) + require.NoError(t, ba.Shutdown(context.Background())) + + assert.Equal(t, int64(1), sink.requestsCount.Load()) + assert.Equal(t, int64(3), sink.itemsCount.Load()) +} diff --git a/exporter/internal/queue/disabled_batcher.go b/exporter/internal/queue/disabled_batcher.go index 6eb1df1dace..97a3fd32510 100644 --- a/exporter/internal/queue/disabled_batcher.go +++ b/exporter/internal/queue/disabled_batcher.go @@ -38,3 +38,9 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error { }() return nil } + +// Shutdown ensures that queue and all Batcher are stopped. +func (qb *DisabledBatcher) Shutdown(_ context.Context) error { + qb.stopWG.Wait() + return nil +}