forked from open-telemetry/opentelemetry-collector
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[exporterqueue] Bare minimum frame of queue batcher + unit test. (ope…
…n-telemetry#11532) #### Description This PR is a bare minimum implementation of a component called queue batcher. On completion, this component will replace `consumers` in `queue_sender`, and thus moving queue-batch from a pulling model instead of pushing model. Limitations of the current code * This implements only the case where batching is disabled, which means no merge of splitting of requests + no timeout flushing. * This implementation does not enforce an upper bound on concurrency All these code paths are marked as panic currently, and they will be replaced with actual implementation in coming PRs. This PR is split from open-telemetry#11507 for easier review. Design doc: https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing #### Link to tracking issue open-telemetry#8122 open-telemetry#10368
- Loading branch information
1 parent
7b0a4a4
commit d012680
Showing
4 changed files
with
264 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"sync" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/exporter/exporterbatcher" | ||
"go.opentelemetry.io/collector/exporter/internal" | ||
) | ||
|
||
type batch struct { | ||
ctx context.Context | ||
req internal.Request | ||
idxList []uint64 | ||
} | ||
|
||
// Batcher is in charge of reading items from the queue and send them out asynchronously. | ||
type Batcher interface { | ||
component.Component | ||
} | ||
|
||
type BaseBatcher struct { | ||
batchCfg exporterbatcher.Config | ||
queue Queue[internal.Request] | ||
maxWorkers int | ||
stopWG sync.WaitGroup | ||
} | ||
|
||
func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) (Batcher, error) { | ||
if maxWorkers != 0 { | ||
return nil, errors.ErrUnsupported | ||
} | ||
|
||
if batchCfg.Enabled { | ||
return nil, errors.ErrUnsupported | ||
} | ||
|
||
return &DisabledBatcher{ | ||
BaseBatcher{ | ||
batchCfg: batchCfg, | ||
queue: queue, | ||
maxWorkers: maxWorkers, | ||
stopWG: sync.WaitGroup{}, | ||
}, | ||
}, nil | ||
} | ||
|
||
// flush exports the incoming batch synchronously. | ||
func (qb *BaseBatcher) flush(batchToFlush batch) { | ||
err := batchToFlush.req.Export(batchToFlush.ctx) | ||
for _, idx := range batchToFlush.idxList { | ||
qb.queue.OnProcessingFinished(idx, err) | ||
} | ||
} | ||
|
||
// flushAsync starts a goroutine that calls flushIfNecessary. It blocks until a worker is available. | ||
func (qb *BaseBatcher) flushAsync(batchToFlush batch) { | ||
qb.stopWG.Add(1) | ||
go func() { | ||
defer qb.stopWG.Done() | ||
qb.flush(batchToFlush) | ||
}() | ||
} | ||
|
||
// Shutdown ensures that queue and all Batcher are stopped. | ||
func (qb *BaseBatcher) Shutdown(_ context.Context) error { | ||
qb.stopWG.Wait() | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" | ||
|
||
import ( | ||
"context" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
) | ||
|
||
// DisabledBatcher is a special-case of Batcher that has no size limit for sending. Any items read from the queue will | ||
// be sent out (asynchronously) immediately regardless of the size. | ||
type DisabledBatcher struct { | ||
BaseBatcher | ||
} | ||
|
||
// Start starts the goroutine that reads from the queue and flushes asynchronously. | ||
func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error { | ||
// This goroutine reads and then flushes. | ||
// 1. Reading from the queue is blocked until the queue is non-empty or until the queue is stopped. | ||
// 2. flushAsync() blocks until there are idle workers in the worker pool. | ||
qb.stopWG.Add(1) | ||
go func() { | ||
defer qb.stopWG.Done() | ||
for { | ||
idx, _, req, ok := qb.queue.Read(context.Background()) | ||
if !ok { | ||
return | ||
} | ||
qb.flushAsync(batch{ | ||
req: req, | ||
ctx: context.Background(), | ||
idxList: []uint64{idx}}) | ||
} | ||
}() | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package queue | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"go.opentelemetry.io/collector/component/componenttest" | ||
"go.opentelemetry.io/collector/exporter/exporterbatcher" | ||
"go.opentelemetry.io/collector/exporter/internal" | ||
) | ||
|
||
func TestDisabledBatcher_InfiniteWorkerPool(t *testing.T) { | ||
cfg := exporterbatcher.NewDefaultConfig() | ||
cfg.Enabled = false | ||
|
||
q := NewBoundedMemoryQueue[internal.Request]( | ||
MemoryQueueSettings[internal.Request]{ | ||
Sizer: &RequestSizer[internal.Request]{}, | ||
Capacity: 10, | ||
}) | ||
|
||
maxWorkers := 0 | ||
ba, err := NewBatcher(cfg, q, maxWorkers) | ||
require.NoError(t, err) | ||
|
||
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) | ||
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) | ||
t.Cleanup(func() { | ||
require.NoError(t, q.Shutdown(context.Background())) | ||
require.NoError(t, ba.Shutdown(context.Background())) | ||
}) | ||
|
||
sink := newFakeRequestSink() | ||
|
||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink})) | ||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink})) | ||
assert.Eventually(t, func() bool { | ||
return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 | ||
}, 30*time.Millisecond, 10*time.Millisecond) | ||
|
||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink})) | ||
assert.Eventually(t, func() bool { | ||
return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 25 | ||
}, 30*time.Millisecond, 10*time.Millisecond) | ||
|
||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink})) | ||
|
||
assert.Eventually(t, func() bool { | ||
return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 38 | ||
}, 30*time.Millisecond, 10*time.Millisecond) | ||
} | ||
|
||
func TestDisabledBatcher_LimitedWorkerNotImplemented(t *testing.T) { | ||
cfg := exporterbatcher.NewDefaultConfig() | ||
cfg.Enabled = false | ||
maxWorkers := 1 | ||
|
||
q := NewBoundedMemoryQueue[internal.Request]( | ||
MemoryQueueSettings[internal.Request]{ | ||
Sizer: &RequestSizer[internal.Request]{}, | ||
Capacity: 10, | ||
}) | ||
|
||
_, err := NewBatcher(cfg, q, maxWorkers) | ||
require.Error(t, err) | ||
} | ||
|
||
func TestDisabledBatcher_BatchingNotImplemented(t *testing.T) { | ||
cfg := exporterbatcher.NewDefaultConfig() | ||
cfg.Enabled = true | ||
maxWorkers := 0 | ||
|
||
q := NewBoundedMemoryQueue[internal.Request]( | ||
MemoryQueueSettings[internal.Request]{ | ||
Sizer: &RequestSizer[internal.Request]{}, | ||
Capacity: 10, | ||
}) | ||
|
||
_, err := NewBatcher(cfg, q, maxWorkers) | ||
require.Error(t, err) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"sync/atomic" | ||
"time" | ||
|
||
"go.opentelemetry.io/collector/exporter/exporterbatcher" | ||
"go.opentelemetry.io/collector/exporter/internal" | ||
) | ||
|
||
type fakeRequestSink struct { | ||
requestsCount *atomic.Int64 | ||
itemsCount *atomic.Int64 | ||
} | ||
|
||
func newFakeRequestSink() *fakeRequestSink { | ||
return &fakeRequestSink{ | ||
requestsCount: new(atomic.Int64), | ||
itemsCount: new(atomic.Int64), | ||
} | ||
} | ||
|
||
type fakeRequest struct { | ||
items int | ||
exportErr error | ||
delay time.Duration | ||
sink *fakeRequestSink | ||
} | ||
|
||
func (r *fakeRequest) Export(ctx context.Context) error { | ||
select { | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
case <-time.After(r.delay): | ||
} | ||
if r.exportErr != nil { | ||
return r.exportErr | ||
} | ||
if r.sink != nil { | ||
r.sink.requestsCount.Add(1) | ||
r.sink.itemsCount.Add(int64(r.items)) | ||
} | ||
return nil | ||
} | ||
|
||
func (r *fakeRequest) ItemsCount() int { | ||
return r.items | ||
} | ||
|
||
func (r *fakeRequest) Merge(_ context.Context, | ||
_ internal.Request) (internal.Request, error) { | ||
return nil, errors.New("not implemented") | ||
} | ||
|
||
func (r *fakeRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, | ||
_ internal.Request) ([]internal.Request, error) { | ||
return nil, errors.New("not implemented") | ||
} |