[chore] [exporterqueue] Add splitting to the experimental pull-based batcher #11580

Merged 2 commits on Nov 6, 2024
5 changes: 0 additions & 5 deletions exporter/internal/queue/batcher.go
@@ -5,7 +5,6 @@ package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
 
 import (
 	"context"
-	"errors"
 	"sync"
 
 	"go.opentelemetry.io/collector/component"
@@ -44,10 +43,6 @@ func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request],
 		}, nil
 	}
 
-	if batchCfg.MaxSizeConfig.MaxSizeItems != 0 {
-		return nil, errors.ErrUnsupported
-	}
-
 	return &DefaultBatcher{
 		BaseBatcher: BaseBatcher{
 			batchCfg: batchCfg,
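With the errors.ErrUnsupported guard removed, NewBatcher now accepts a config whose MaxSizeItems is non-zero. A minimal sketch of such a call, using only identifiers that appear elsewhere in this diff (q here stands in for any Queue[internal.Request], such as the bounded memory queue used by the tests; the worker count is arbitrary):

	cfg := exporterbatcher.NewDefaultConfig()
	cfg.Enabled = true
	cfg.MinSizeConfig = exporterbatcher.MinSizeConfig{MinSizeItems: 100}
	cfg.MaxSizeConfig = exporterbatcher.MaxSizeConfig{MaxSizeItems: 200} // previously caused NewBatcher to fail

	ba, err := NewBatcher(cfg, q, 1) // now returns a DefaultBatcher with splitting enabled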
87 changes: 63 additions & 24 deletions exporter/internal/queue/default_batcher.go
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/exporter/internal"
 )
 
 // DefaultBatcher continuously reads from the queue and flushes asynchronously if size limit is met or on timeout.
@@ -41,35 +42,73 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
 			}
 
 			qb.currentBatchMu.Lock()
-			if qb.currentBatch == nil || qb.currentBatch.req == nil {
-				qb.resetTimer()
-				qb.currentBatch = &batch{
-					req:     req,
-					ctx:     ctx,
-					idxList: []uint64{idx}}
-			} else {
-				mergedReq, mergeErr := qb.currentBatch.req.Merge(qb.currentBatch.ctx, req)
-				if mergeErr != nil {
-					qb.queue.OnProcessingFinished(idx, mergeErr)
+
+			if qb.batchCfg.MaxSizeItems > 0 {
+				var reqList []internal.Request
+				var mergeSplitErr error
+				if qb.currentBatch == nil || qb.currentBatch.req == nil {
+					qb.resetTimer()
+					reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, nil)
+				} else {
+					reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, req)
+				}
+
+				if mergeSplitErr != nil || reqList == nil {
+					qb.queue.OnProcessingFinished(idx, mergeSplitErr)
 					qb.currentBatchMu.Unlock()
 					continue
 				}
-				qb.currentBatch = &batch{
-					req:     mergedReq,
-					ctx:     qb.currentBatch.ctx,
-					idxList: append(qb.currentBatch.idxList, idx)}
-			}
 
-			if qb.currentBatch.req.ItemsCount() > qb.batchCfg.MinSizeItems {
-				batchToFlush := *qb.currentBatch
-				qb.currentBatch = nil
-				qb.currentBatchMu.Unlock()
-
-				// flushAsync() blocks until successfully started a goroutine for flushing.
-				qb.flushAsync(batchToFlush)
-				qb.resetTimer()
+				// If there was a split, we flush everything immediately.
+				if reqList[0].ItemsCount() >= qb.batchCfg.MinSizeItems || len(reqList) > 1 {
+					qb.currentBatch = nil
+					qb.currentBatchMu.Unlock()
+					for i := 0; i < len(reqList); i++ {
+						qb.flushAsync(batch{
+							req:     reqList[i],
+							ctx:     ctx,
+							idxList: []uint64{idx}})
+						// TODO: handle partial failure
+					}
+					qb.resetTimer()
+				} else {
+					qb.currentBatch = &batch{
+						req:     reqList[0],
+						ctx:     ctx,
+						idxList: []uint64{idx}}
+					qb.currentBatchMu.Unlock()
+				}
 			} else {
-				qb.currentBatchMu.Unlock()
+				if qb.currentBatch == nil || qb.currentBatch.req == nil {
+					qb.resetTimer()
+					qb.currentBatch = &batch{
+						req:     req,
+						ctx:     ctx,
+						idxList: []uint64{idx}}
+				} else {
+					mergedReq, mergeErr := qb.currentBatch.req.Merge(qb.currentBatch.ctx, req)
+					if mergeErr != nil {
+						qb.queue.OnProcessingFinished(idx, mergeErr)
+						qb.currentBatchMu.Unlock()
+						continue
+					}
+					qb.currentBatch = &batch{
+						req:     mergedReq,
+						ctx:     qb.currentBatch.ctx,
+						idxList: append(qb.currentBatch.idxList, idx)}
+				}
+
+				if qb.currentBatch.req.ItemsCount() >= qb.batchCfg.MinSizeItems {
+					batchToFlush := *qb.currentBatch
+					qb.currentBatch = nil
+					qb.currentBatchMu.Unlock()
+
+					// flushAsync() blocks until successfully started a goroutine for flushing.
+					qb.flushAsync(batchToFlush)
+					qb.resetTimer()
+				} else {
+					qb.currentBatchMu.Unlock()
+				}
 			}
 		}
 	}()
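In short: when MaxSizeItems > 0, every request read from the queue now goes through MergeSplit, which merges the pending batch with the incoming request (or partitions the incoming request alone when no batch is pending) and re-slices the result so that no single request exceeds MaxSizeItems. If the first resulting request reaches MinSizeItems, or more than one request came back (meaning a split occurred), everything is flushed immediately; otherwise the lone result becomes the new pending batch. As a worked example with MinSizeItems = MaxSizeItems = 100: a 75-item pending batch merged with a 30-item request yields requests of 100 and 5 items, and both are flushed at once.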
84 changes: 69 additions & 15 deletions exporter/internal/queue/default_batcher_test.go
@@ -17,7 +17,7 @@ import (
 	"go.opentelemetry.io/collector/exporter/internal"
 )
 
-func TestDefaultBatcher_MinThresholdZero_TimeoutDisabled(t *testing.T) {
+func TestDefaultBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) {
 	tests := []struct {
 		name       string
 		maxWorkers int
@@ -75,7 +75,7 @@ func TestDefaultBatcher_MinThresholdZero_TimeoutDisabled(t *testing.T) {
 	}
 }
 
-func TestDefaultBatcher_TimeoutDisabled(t *testing.T) {
+func TestDefaultBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
 	tests := []struct {
 		name       string
 		maxWorkers int
@@ -139,7 +139,7 @@ func TestDefaultBatcher_TimeoutDisabled(t *testing.T) {
 	}
 }
 
-func TestDefaultBatcher_WithTimeout(t *testing.T) {
+func TestDefaultBatcher_NoSplit_WithTimeout(t *testing.T) {
 	tests := []struct {
 		name       string
 		maxWorkers int
@@ -200,18 +200,72 @@ func TestDefaultBatcher_WithTimeout(t *testing.T) {
 	}
 }
 
-func TestDisabledBatcher_SplitNotImplemented(t *testing.T) {
-	cfg := exporterbatcher.NewDefaultConfig()
-	cfg.Enabled = true
-	maxWorkers := 0
-	cfg.MaxSizeConfig.MaxSizeItems = 1
+func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) {
+	tests := []struct {
+		name       string
+		maxWorkers int
+	}{
+		{
+			name:       "infinite_workers",
+			maxWorkers: 0,
+		},
+		{
+			name:       "one_worker",
+			maxWorkers: 1,
+		},
+		{
+			name:       "three_workers",
+			maxWorkers: 3,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			cfg := exporterbatcher.NewDefaultConfig()
+			cfg.Enabled = true
+			cfg.FlushTimeout = 0
+			cfg.MinSizeConfig = exporterbatcher.MinSizeConfig{
+				MinSizeItems: 100,
+			}
+			cfg.MaxSizeConfig = exporterbatcher.MaxSizeConfig{
+				MaxSizeItems: 100,
+			}
 
-	q := NewBoundedMemoryQueue[internal.Request](
-		MemoryQueueSettings[internal.Request]{
-			Sizer:    &RequestSizer[internal.Request]{},
-			Capacity: 10,
-		})
+			q := NewBoundedMemoryQueue[internal.Request](
+				MemoryQueueSettings[internal.Request]{
+					Sizer:    &RequestSizer[internal.Request]{},
+					Capacity: 10,
+				})
 
-	_, err := NewBatcher(cfg, q, maxWorkers)
-	require.Error(t, err)
+			ba, err := NewBatcher(cfg, q, tt.maxWorkers)
+			require.NoError(t, err)
+
+			require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
+			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
+			t.Cleanup(func() {
+				require.NoError(t, q.Shutdown(context.Background()))
+				require.NoError(t, ba.Shutdown(context.Background()))
+			})
+
+			sink := newFakeRequestSink()
+
+			require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink}))
+			require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink}))
+
+			// This request will be dropped because of merge error
+			require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, mergeErr: errors.New("transient error"), sink: sink}))
+
+			require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink}))
+			require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 35, sink: sink}))
+			require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 2, sink: sink}))
+			require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 30, sink: sink}))
+			assert.Eventually(t, func() bool {
+				return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 105
+			}, 100*time.Millisecond, 10*time.Millisecond)
+
+			require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 900, sink: sink}))
+			assert.Eventually(t, func() bool {
+				return sink.requestsCount.Load() == 11 && sink.itemsCount.Load() == 1005
+			}, 100*time.Millisecond, 10*time.Millisecond)
+		})
+	}
 }
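Tracing the assertions above: the surviving offers accumulate 8 + 17 + 13 + 35 + 2 = 75 items in the pending batch (the mergeErr request is dropped), and the 30-item offer raises it to 105, which MergeSplit partitions into 100 + 5; since a split forces an immediate flush, the sink sees 2 requests and 105 items. The 900-item offer then arrives with no pending batch and is split into nine 100-item requests, all flushed, bringing the totals to 11 requests and 1005 items.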
51 changes: 47 additions & 4 deletions exporter/internal/queue/fake_request.go
@@ -5,7 +5,6 @@
 
 import (
 	"context"
-	"errors"
 	"sync/atomic"
 	"time"
 
@@ -68,7 +67,51 @@
 	}, nil
 }
 
-func (r *fakeRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig,
-	_ internal.Request) ([]internal.Request, error) {
-	return nil, errors.New("not implemented")
+func (r *fakeRequest) MergeSplit(ctx context.Context, cfg exporterbatcher.MaxSizeConfig,
+	r2 internal.Request) ([]internal.Request, error) {
+	if r.mergeErr != nil {
+		return nil, r.mergeErr
+	}
+
+	maxItems := cfg.MaxSizeItems
+	if maxItems == 0 {
+		r, err := r.Merge(ctx, r2)
+		return []internal.Request{r}, err
+	}
+
+	var fr2 *fakeRequest
+	if r2 == nil {
+		fr2 = &fakeRequest{sink: r.sink, exportErr: r.exportErr, delay: r.delay}
+	} else {
+		if r2.(*fakeRequest).mergeErr != nil {
+			return nil, r2.(*fakeRequest).mergeErr
+		}
+		fr2 = r2.(*fakeRequest)
+		fr2 = &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay}
+	}
+	var res []internal.Request
+	r = &fakeRequest{items: r.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay}
+	if fr2.items <= maxItems-r.items {
+		r.items += fr2.items
+		if fr2.exportErr != nil {
+			r.exportErr = fr2.exportErr
+		}
+		return []internal.Request{r}, nil
+	}
+	// if split is needed, we don't propagate exportErr from fr2 to fr1 to test more cases
+	fr2.items -= maxItems - r.items
+	r.items = maxItems
+	res = append(res, r)
+
+	// split fr2 to maxItems
+	for {
+		if fr2.items <= maxItems {
+			res = append(res, &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay})
+			break
+		}
+		res = append(res, &fakeRequest{items: maxItems, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay})
+		fr2.items -= maxItems
+	}
+
+	return res, nil
 }
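To make the splitting loop above concrete, here is a hand trace with hypothetical values (r holding 80 items, r2 holding 50, MaxSizeItems set to 100):

	// r.items == 80, fr2.items == 50, maxItems == 100
	// 50 > maxItems - r.items (20), so a split is needed:
	//   fr2.items -= 20 -> 30; r.items = 100; res = [r (100 items)]
	//   loop: 30 <= 100, so append &fakeRequest{items: 30} and break
	// MergeSplit returns two requests of 100 and 30 items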