Skip to content

Commit

Permalink
Flip on queue batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Nov 11, 2024
1 parent 1703ce6 commit 2900101
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 144 deletions.
4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,13 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
ExporterSettings: be.Set,
},
be.queueCfg)
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep)
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg)
for _, op := range options {
err = multierr.Append(err, op(be))
}
}

if be.BatcherCfg.Enabled {
if be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
bs := NewBatchSender(be.BatcherCfg, be.Set)
be.BatchSender = bs
}
Expand Down
100 changes: 0 additions & 100 deletions exporter/exporterhelper/internal/batch_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ func TestBatchSender_BatchExportError(t *testing.T) {
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == tt.expectedRequests &&
sink.itemsCount.Load() == tt.expectedItems &&
be.BatchSender.(*BatchSender).activeRequests.Load() == 0 &&
be.QueueSender.(*QueueSender).queue.Size() == 0
}, 100*time.Millisecond, 10*time.Millisecond)
})
Expand Down Expand Up @@ -272,105 +271,6 @@ func TestBatchSender_PostShutdown(t *testing.T) {
assert.Equal(t, int64(8), sink.itemsCount.Load())
}

func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10810")
}
tests := []struct {
name string
batcherCfg exporterbatcher.Config
expectedRequests int64
expectedItems int64
}{
{
name: "merge_only",
batcherCfg: func() exporterbatcher.Config {
cfg := exporterbatcher.NewDefaultConfig()
cfg.FlushTimeout = 20 * time.Millisecond
return cfg
}(),
expectedRequests: 6,
expectedItems: 51,
},
{
name: "merge_without_split_triggered",
batcherCfg: func() exporterbatcher.Config {
cfg := exporterbatcher.NewDefaultConfig()
cfg.FlushTimeout = 20 * time.Millisecond
cfg.MaxSizeItems = 200
return cfg
}(),
expectedRequests: 6,
expectedItems: 51,
},
{
name: "merge_with_split_triggered",
batcherCfg: func() exporterbatcher.Config {
cfg := exporterbatcher.NewDefaultConfig()
cfg.FlushTimeout = 50 * time.Millisecond
cfg.MaxSizeItems = 10
return cfg
}(),
expectedRequests: 8,
expectedItems: 51,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
qCfg := exporterqueue.NewDefaultConfig()
qCfg.NumConsumers = 2
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(tt.batcherCfg),
WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]()))
require.NotNil(t, be)
require.NoError(t, err)
assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})

sink := newFakeRequestSink()
// the 1st and 2nd request should be flushed in the same batched request by max concurrency limit.
assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink}))
assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink}))

assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 4
}, 100*time.Millisecond, 10*time.Millisecond)

// the 3rd request should be flushed by itself due to flush interval
require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink}))
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 6
}, 100*time.Millisecond, 10*time.Millisecond)

// the 4th and 5th request should be flushed in the same batched request by max concurrency limit.
assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink}))
assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink}))
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 10
}, 100*time.Millisecond, 10*time.Millisecond)

// do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling.
assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 5, sink: sink}))
assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 6, sink: sink}))
if tt.batcherCfg.MaxSizeItems == 10 {
// in case of MaxSizeItems=10, wait for the leftover request to send
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 21
}, 50*time.Millisecond, 10*time.Millisecond)
}

assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink}))
assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 6, sink: sink}))
assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 20, sink: sink}))
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == tt.expectedRequests && sink.itemsCount.Load() == tt.expectedItems
}, 100*time.Millisecond, 10*time.Millisecond)
})
}
}

func TestBatchSender_BatchBlocking(t *testing.T) {
bCfg := exporterbatcher.NewDefaultConfig()
bCfg.MinSizeItems = 3
Expand Down
21 changes: 14 additions & 7 deletions exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal"
"go.opentelemetry.io/collector/exporter/internal/queue"
Expand Down Expand Up @@ -71,30 +72,36 @@ type QueueSender struct {
queue exporterqueue.Queue[internal.Request]
numConsumers int
traceAttribute attribute.KeyValue
consumers *queue.Consumers[internal.Request]
batcher queue.Batcher

obsrep *ObsReport
exporterID component.ID
}

func NewQueueSender(q exporterqueue.Queue[internal.Request], set exporter.Settings, numConsumers int,
exportFailureMessage string, obsrep *ObsReport) *QueueSender {
func NewQueueSender(
q exporterqueue.Queue[internal.Request],
set exporter.Settings,
numConsumers int,
exportFailureMessage string,
obsrep *ObsReport,
batcherCfg exporterbatcher.Config) *QueueSender {
qs := &QueueSender{
queue: q,
numConsumers: numConsumers,
traceAttribute: attribute.String(ExporterKey, set.ID.String()),
obsrep: obsrep,
exporterID: set.ID,
}
consumeFunc := func(ctx context.Context, req internal.Request) error {

exportFunc := func(ctx context.Context, req internal.Request) error {
err := qs.NextSender.Send(ctx, req)
if err != nil {
set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
}
return err
}
qs.consumers = queue.NewQueueConsumers[internal.Request](q, numConsumers, consumeFunc)
qs.batcher, _ = queue.NewBatcher(batcherCfg, q, exportFunc, numConsumers)
return qs
}

Expand All @@ -103,7 +110,7 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error {
if err := qs.queue.Start(ctx, host); err != nil {
return err
}
if err := qs.consumers.Start(ctx, host); err != nil {
if err := qs.batcher.Start(ctx, host); err != nil {
return err
}

Expand All @@ -123,7 +130,7 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error {
if err := qs.queue.Shutdown(ctx); err != nil {
return err
}
return qs.consumers.Shutdown(ctx)
return qs.batcher.Shutdown(ctx)
}

// send implements the requestSender interface. It puts the request in the queue.
Expand Down
5 changes: 3 additions & 2 deletions exporter/exporterhelper/internal/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal"
Expand Down Expand Up @@ -212,7 +213,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
require.NoError(t, err)

qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 0 // to make every request go straight to the queue
qCfg.NumConsumers = -1 // to make QueueMetricsReportedvery request go straight to the queue
rCfg := configretry.NewDefaultBackOffConfig()
set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
be, err := NewBaseExporter(set, dataType, newObservabilityConsumerSender,
Expand Down Expand Up @@ -437,6 +438,6 @@ func TestQueueSenderNoStartShutdown(t *testing.T) {
ExporterCreateSettings: exportertest.NewNopSettings(),
})
require.NoError(t, err)
qs := NewQueueSender(queue, set, 1, "", obsrep)
qs := NewQueueSender(queue, set, 1, "", obsrep, exporterbatcher.NewDefaultConfig())
assert.NoError(t, qs.Shutdown(context.Background()))
}
9 changes: 2 additions & 7 deletions exporter/internal/queue/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ type BaseBatcher struct {
stopWG sync.WaitGroup
}

func NewBatcher(batchCfg exporterbatcher.Config,
func NewBatcher(
batchCfg exporterbatcher.Config,
queue Queue[internal.Request],
exportFunc func(ctx context.Context, req internal.Request) error,
maxWorkers int) (Batcher, error) {
Expand Down Expand Up @@ -94,9 +95,3 @@ func (qb *BaseBatcher) flushAsync(batchToFlush batch) {
qb.workerPool <- true
}()
}

// Shutdown ensures that queue and all Batcher are stopped.
func (qb *BaseBatcher) Shutdown(_ context.Context) error {
qb.stopWG.Wait()
return nil
}
40 changes: 28 additions & 12 deletions exporter/internal/queue/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,25 +124,19 @@ func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() {
case <-qb.shutdownCh:
return
case <-qb.timer.C:
qb.currentBatchMu.Lock()
if qb.currentBatch == nil || qb.currentBatch.req == nil {
qb.currentBatchMu.Unlock()
continue
}
batchToFlush := *qb.currentBatch
qb.currentBatch = nil
qb.currentBatchMu.Unlock()

// flushAsync() blocks until successfully started a goroutine for flushing.
qb.flushAsync(batchToFlush)
qb.resetTimer()
qb.flushCurrentBatchIfNecessary()
}
}
}()
}

// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error {
// maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics.
if qb.maxWorkers == -1 {
return nil
}

qb.startWorkerPool()
qb.shutdownCh = make(chan bool, 1)

Expand All @@ -155,6 +149,28 @@ func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error {

qb.startReadingFlushingGoroutine()
qb.startTimeBasedFlushingGoroutine()
return nil
}

// flushCurrentBatchIfNecessary sends out the current request batch if it is not nil
func (qb *DefaultBatcher) flushCurrentBatchIfNecessary() {
qb.currentBatchMu.Lock()
if qb.currentBatch == nil || qb.currentBatch.req == nil {
qb.currentBatchMu.Unlock()
return
}
batchToFlush := *qb.currentBatch
qb.currentBatch = nil
qb.currentBatchMu.Unlock()

// flushAsync() blocks until successfully started a goroutine for flushing.
qb.flushAsync(batchToFlush)
qb.resetTimer()
}

// Shutdown ensures that queue and all Batcher are stopped.
func (qb *DefaultBatcher) Shutdown(_ context.Context) error {
qb.flushCurrentBatchIfNecessary()
qb.stopWG.Wait()
return nil
}
24 changes: 12 additions & 12 deletions exporter/internal/queue/default_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ func TestDefaultBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) {
Capacity: 10,
})

ba, err := NewBatcher(cfg, q,
func(ctx context.Context, req internal.Request) error { return req.Export(ctx) },
tt.maxWorkers)
ba, err := NewBatcher(cfg, q, func(ctx context.Context, req internal.Request) error {
return req.Export(ctx)
}, tt.maxWorkers)
require.NoError(t, err)

require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -110,9 +110,9 @@ func TestDefaultBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
Capacity: 10,
})

ba, err := NewBatcher(cfg, q,
func(ctx context.Context, req internal.Request) error { return req.Export(ctx) },
tt.maxWorkers)
ba, err := NewBatcher(cfg, q, func(ctx context.Context, req internal.Request) error {
return req.Export(ctx)
}, tt.maxWorkers)
require.NoError(t, err)

require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -176,9 +176,9 @@ func TestDefaultBatcher_NoSplit_WithTimeout(t *testing.T) {
Capacity: 10,
})

ba, err := NewBatcher(cfg, q,
func(ctx context.Context, req internal.Request) error { return req.Export(ctx) },
tt.maxWorkers)
ba, err := NewBatcher(cfg, q, func(ctx context.Context, req internal.Request) error {
return req.Export(ctx)
}, tt.maxWorkers)
require.NoError(t, err)

require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -242,9 +242,9 @@ func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) {
Capacity: 10,
})

ba, err := NewBatcher(cfg, q,
func(ctx context.Context, req internal.Request) error { return req.Export(ctx) },
tt.maxWorkers)
ba, err := NewBatcher(cfg, q, func(ctx context.Context, req internal.Request) error {
return req.Export(ctx)
}, tt.maxWorkers)
require.NoError(t, err)

require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
Expand Down
10 changes: 10 additions & 0 deletions exporter/internal/queue/disabled_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ type DisabledBatcher struct {

// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
// maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics.
if qb.maxWorkers == -1 {
return nil
}

qb.startWorkerPool()

// This goroutine reads and then flushes.
Expand All @@ -38,3 +43,8 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
}()
return nil
}

func (qb *DisabledBatcher) Shutdown(_ context.Context) error {
qb.stopWG.Wait()
return nil
}
5 changes: 3 additions & 2 deletions exporter/internal/queue/disabled_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ func TestDisabledBatcher_Basic(t *testing.T) {
})

ba, err := NewBatcher(cfg, q,
func(ctx context.Context, req internal.Request) error { return req.Export(ctx) },
tt.maxWorkers)
func(ctx context.Context, req internal.Request) error {
return req.Export(ctx)
}, tt.maxWorkers)
require.NoError(t, err)

require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
Expand Down

0 comments on commit 2900101

Please sign in to comment.