forked from open-telemetry/opentelemetry-collector
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[exporterqueue] Default Batcher that reads from the queue, batches and exports (open-telemetry#11546)
#### Description This PR follows open-telemetry#11540 and implements support for item-count-based batching in the queue batcher. Limitation: this PR supports merging requests but not splitting them. In other words, it supports specifying a minimum request size but not a maximum request size. Design doc: https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing #### Link to tracking issue open-telemetry#8122, open-telemetry#10368
- Loading branch information
1 parent
37a214b
commit be60e2e
Showing
5 changed files
with
368 additions
and
22 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" | ||
|
||
import ( | ||
"context" | ||
"math" | ||
"sync" | ||
"time" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
) | ||
|
||
// DefaultBatcher continuously reads from the queue and flushes asynchronously if size limit is met or on timeout.
type DefaultBatcher struct {
	BaseBatcher
	// currentBatchMu guards currentBatch; it is shared between the reading
	// goroutine and the time-based flushing goroutine.
	currentBatchMu sync.Mutex
	// currentBatch accumulates merged requests until the size threshold or
	// the flush timeout is reached; nil when nothing is pending.
	currentBatch *batch
	// timer drives time-based flushing; kept stopped when FlushTimeout is 0.
	timer *time.Timer
	// shutdownCh signals the time-based flushing goroutine to exit after the
	// queue has been stopped.
	shutdownCh chan bool
}
|
||
func (qb *DefaultBatcher) resetTimer() { | ||
if qb.batchCfg.FlushTimeout != 0 { | ||
qb.timer.Reset(qb.batchCfg.FlushTimeout) | ||
} | ||
} | ||
|
||
// startReadingFlushingGoroutine starts a goroutine that reads and then flushes. | ||
func (qb *DefaultBatcher) startReadingFlushingGoroutine() { | ||
|
||
qb.stopWG.Add(1) | ||
go func() { | ||
defer qb.stopWG.Done() | ||
for { | ||
// Read() blocks until the queue is non-empty or until the queue is stopped. | ||
idx, ctx, req, ok := qb.queue.Read(context.Background()) | ||
if !ok { | ||
qb.shutdownCh <- true | ||
return | ||
} | ||
|
||
qb.currentBatchMu.Lock() | ||
if qb.currentBatch == nil || qb.currentBatch.req == nil { | ||
qb.resetTimer() | ||
qb.currentBatch = &batch{ | ||
req: req, | ||
ctx: ctx, | ||
idxList: []uint64{idx}} | ||
} else { | ||
mergedReq, mergeErr := qb.currentBatch.req.Merge(qb.currentBatch.ctx, req) | ||
if mergeErr != nil { | ||
qb.queue.OnProcessingFinished(idx, mergeErr) | ||
qb.currentBatchMu.Unlock() | ||
continue | ||
} | ||
qb.currentBatch = &batch{ | ||
req: mergedReq, | ||
ctx: qb.currentBatch.ctx, | ||
idxList: append(qb.currentBatch.idxList, idx)} | ||
} | ||
|
||
if qb.currentBatch.req.ItemsCount() > qb.batchCfg.MinSizeItems { | ||
batchToFlush := *qb.currentBatch | ||
qb.currentBatch = nil | ||
qb.currentBatchMu.Unlock() | ||
|
||
// flushAsync() blocks until successfully started a goroutine for flushing. | ||
qb.flushAsync(batchToFlush) | ||
qb.resetTimer() | ||
} else { | ||
qb.currentBatchMu.Unlock() | ||
} | ||
} | ||
}() | ||
} | ||
|
||
// startTimeBasedFlushingGoroutine starts a goroutine that flushes on timeout. | ||
func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() { | ||
qb.stopWG.Add(1) | ||
go func() { | ||
defer qb.stopWG.Done() | ||
for { | ||
select { | ||
case <-qb.shutdownCh: | ||
return | ||
case <-qb.timer.C: | ||
qb.currentBatchMu.Lock() | ||
if qb.currentBatch == nil || qb.currentBatch.req == nil { | ||
qb.currentBatchMu.Unlock() | ||
continue | ||
} | ||
batchToFlush := *qb.currentBatch | ||
qb.currentBatch = nil | ||
qb.currentBatchMu.Unlock() | ||
|
||
// flushAsync() blocks until successfully started a goroutine for flushing. | ||
qb.flushAsync(batchToFlush) | ||
qb.resetTimer() | ||
} | ||
} | ||
}() | ||
} | ||
|
||
// Start starts the goroutine that reads from the queue and flushes asynchronously. | ||
func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error { | ||
qb.startWorkerPool() | ||
qb.shutdownCh = make(chan bool, 1) | ||
|
||
if qb.batchCfg.FlushTimeout == 0 { | ||
qb.timer = time.NewTimer(math.MaxInt) | ||
qb.timer.Stop() | ||
} else { | ||
qb.timer = time.NewTimer(qb.batchCfg.FlushTimeout) | ||
} | ||
|
||
qb.startReadingFlushingGoroutine() | ||
qb.startTimeBasedFlushingGoroutine() | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,217 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package queue | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"go.opentelemetry.io/collector/component/componenttest" | ||
"go.opentelemetry.io/collector/exporter/exporterbatcher" | ||
"go.opentelemetry.io/collector/exporter/internal" | ||
) | ||
|
||
func TestDefaultBatcher_MinThresholdZero_TimeoutDisabled(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
maxWorkers int | ||
}{ | ||
{ | ||
name: "infinate_workers", | ||
maxWorkers: 0, | ||
}, | ||
{ | ||
name: "one_worker", | ||
maxWorkers: 1, | ||
}, | ||
{ | ||
name: "three_workers", | ||
maxWorkers: 3, | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
cfg := exporterbatcher.NewDefaultConfig() | ||
cfg.Enabled = true | ||
cfg.FlushTimeout = 0 | ||
cfg.MinSizeConfig = exporterbatcher.MinSizeConfig{ | ||
MinSizeItems: 0, | ||
} | ||
|
||
q := NewBoundedMemoryQueue[internal.Request]( | ||
MemoryQueueSettings[internal.Request]{ | ||
Sizer: &RequestSizer[internal.Request]{}, | ||
Capacity: 10, | ||
}) | ||
|
||
ba, err := NewBatcher(cfg, q, tt.maxWorkers) | ||
require.NoError(t, err) | ||
|
||
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) | ||
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) | ||
t.Cleanup(func() { | ||
require.NoError(t, q.Shutdown(context.Background())) | ||
require.NoError(t, ba.Shutdown(context.Background())) | ||
}) | ||
|
||
sink := newFakeRequestSink() | ||
|
||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink})) | ||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink})) | ||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink})) | ||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink})) | ||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 35, sink: sink})) | ||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 2, sink: sink})) | ||
assert.Eventually(t, func() bool { | ||
return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 75 | ||
}, 30*time.Millisecond, 10*time.Millisecond) | ||
}) | ||
} | ||
} | ||
|
||
func TestDefaultBatcher_TimeoutDisabled(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
maxWorkers int | ||
}{ | ||
{ | ||
name: "infinate_workers", | ||
maxWorkers: 0, | ||
}, | ||
{ | ||
name: "one_worker", | ||
maxWorkers: 1, | ||
}, | ||
{ | ||
name: "three_workers", | ||
maxWorkers: 3, | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
cfg := exporterbatcher.NewDefaultConfig() | ||
cfg.Enabled = true | ||
cfg.FlushTimeout = 0 | ||
cfg.MinSizeConfig = exporterbatcher.MinSizeConfig{ | ||
MinSizeItems: 10, | ||
} | ||
|
||
q := NewBoundedMemoryQueue[internal.Request]( | ||
MemoryQueueSettings[internal.Request]{ | ||
Sizer: &RequestSizer[internal.Request]{}, | ||
Capacity: 10, | ||
}) | ||
|
||
ba, err := NewBatcher(cfg, q, tt.maxWorkers) | ||
require.NoError(t, err) | ||
|
||
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) | ||
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) | ||
t.Cleanup(func() { | ||
require.NoError(t, q.Shutdown(context.Background())) | ||
require.NoError(t, ba.Shutdown(context.Background())) | ||
}) | ||
|
||
sink := newFakeRequestSink() | ||
|
||
// These two requests will be dropped because of export error. | ||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink})) | ||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink})) | ||
|
||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 7, sink: sink})) | ||
|
||
// This request will be dropped because of merge error | ||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, mergeErr: errors.New("transient error"), sink: sink})) | ||
|
||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink})) | ||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 35, sink: sink})) | ||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 2, sink: sink})) | ||
assert.Eventually(t, func() bool { | ||
return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 55 | ||
}, 30*time.Millisecond, 10*time.Millisecond) | ||
}) | ||
} | ||
} | ||
|
||
func TestDefaultBatcher_WithTimeout(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
maxWorkers int | ||
}{ | ||
{ | ||
name: "infinate_workers", | ||
maxWorkers: 0, | ||
}, | ||
{ | ||
name: "one_worker", | ||
maxWorkers: 1, | ||
}, | ||
{ | ||
name: "three_workers", | ||
maxWorkers: 3, | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
cfg := exporterbatcher.NewDefaultConfig() | ||
cfg.Enabled = true | ||
cfg.FlushTimeout = 50 * time.Millisecond | ||
cfg.MinSizeConfig = exporterbatcher.MinSizeConfig{ | ||
MinSizeItems: 100, | ||
} | ||
|
||
q := NewBoundedMemoryQueue[internal.Request]( | ||
MemoryQueueSettings[internal.Request]{ | ||
Sizer: &RequestSizer[internal.Request]{}, | ||
Capacity: 10, | ||
}) | ||
|
||
ba, err := NewBatcher(cfg, q, tt.maxWorkers) | ||
require.NoError(t, err) | ||
|
||
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) | ||
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) | ||
t.Cleanup(func() { | ||
require.NoError(t, q.Shutdown(context.Background())) | ||
require.NoError(t, ba.Shutdown(context.Background())) | ||
}) | ||
|
||
sink := newFakeRequestSink() | ||
|
||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink})) | ||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink})) | ||
|
||
// This request will be dropped because of merge error | ||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, mergeErr: errors.New("transient error"), sink: sink})) | ||
|
||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink})) | ||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 35, sink: sink})) | ||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 2, sink: sink})) | ||
assert.Eventually(t, func() bool { | ||
return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 75 | ||
}, 100*time.Millisecond, 10*time.Millisecond) | ||
}) | ||
} | ||
} | ||
|
||
func TestDisabledBatcher_SplitNotImplemented(t *testing.T) { | ||
cfg := exporterbatcher.NewDefaultConfig() | ||
cfg.Enabled = true | ||
maxWorkers := 0 | ||
cfg.MaxSizeConfig.MaxSizeItems = 1 | ||
|
||
q := NewBoundedMemoryQueue[internal.Request]( | ||
MemoryQueueSettings[internal.Request]{ | ||
Sizer: &RequestSizer[internal.Request]{}, | ||
Capacity: 10, | ||
}) | ||
|
||
_, err := NewBatcher(cfg, q, maxWorkers) | ||
require.Error(t, err) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.