Skip to content

Commit

Permalink
Flip on queue batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Nov 22, 2024
1 parent f74890a commit 415dcb8
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 16 deletions.
28 changes: 28 additions & 0 deletions .chloggen/11637-exporter-queue-batcher.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterqueue

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Change exporter queue batching to use a pulling model.

# One or more tracking issues or pull requests related to the change
issues: [8122, 10368]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
If both queuing and batching is enabled for exporter, we now use a pulling model instead of a
pushing model. num_consumer in queue configuration is now used to specify the maximum number of
concurrent workers that are sending out the request.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
7 changes: 4 additions & 3 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

var usePullingBasedExporterQueueBatcher = featuregate.GlobalRegistry().MustRegister(
"telemetry.UsePullingBasedExporterQueueBatcher",
featuregate.StageBeta,
featuregate.StageAlpha,
featuregate.WithRegisterFromVersion("v0.114.0"),
featuregate.WithRegisterDescription("if set to true, turns on the pulling-based exporter queue bathcer"),
)
Expand Down Expand Up @@ -102,13 +102,14 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
ExporterSettings: be.Set,
},
be.queueCfg)
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep)
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg)
for _, op := range options {
err = multierr.Append(err, op(be))
}
}

if be.BatcherCfg.Enabled {
if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled ||
usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
bs := NewBatchSender(be.BatcherCfg, be.Set)
be.BatchSender = bs
}
Expand Down
1 change: 0 additions & 1 deletion exporter/exporterhelper/internal/batch_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ func TestBatchSender_BatchExportError(t *testing.T) {
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == tt.expectedRequests &&
sink.itemsCount.Load() == tt.expectedItems &&
be.BatchSender.(*BatchSender).activeRequests.Load() == 0 &&
be.QueueSender.(*QueueSender).queue.Size() == 0
}, 100*time.Millisecond, 10*time.Millisecond)
})
Expand Down
52 changes: 41 additions & 11 deletions exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal"
"go.opentelemetry.io/collector/exporter/internal/queue"
Expand Down Expand Up @@ -71,30 +72,49 @@ type QueueSender struct {
queue exporterqueue.Queue[internal.Request]
numConsumers int
traceAttribute attribute.KeyValue
batcher queue.Batcher
consumers *queue.Consumers[internal.Request]

obsrep *ObsReport
exporterID component.ID
}

func NewQueueSender(q exporterqueue.Queue[internal.Request], set exporter.Settings, numConsumers int,
exportFailureMessage string, obsrep *ObsReport) *QueueSender {
func NewQueueSender(
q exporterqueue.Queue[internal.Request],
set exporter.Settings,
numConsumers int,
exportFailureMessage string,
obsrep *ObsReport,
batcherCfg exporterbatcher.Config) *QueueSender {
qs := &QueueSender{
queue: q,
numConsumers: numConsumers,
traceAttribute: attribute.String(ExporterKey, set.ID.String()),
obsrep: obsrep,
exporterID: set.ID,
}
consumeFunc := func(ctx context.Context, req internal.Request) error {
err := qs.NextSender.Send(ctx, req)
if err != nil {
set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))

if usePullingBasedExporterQueueBatcher.IsEnabled() {
exportFunc := func(ctx context.Context, req internal.Request) error {
err := qs.NextSender.Send(ctx, req)
if err != nil {
set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
}
return err
}
return err
qs.batcher, _ = queue.NewBatcher(batcherCfg, q, exportFunc, numConsumers)
} else {
consumeFunc := func(ctx context.Context, req internal.Request) error {
err := qs.NextSender.Send(ctx, req)
if err != nil {
set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
}
return err
}
qs.consumers = queue.NewQueueConsumers[internal.Request](q, numConsumers, consumeFunc)
}
qs.consumers = queue.NewQueueConsumers[internal.Request](q, numConsumers, consumeFunc)
return qs
}

Expand All @@ -103,8 +123,15 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error {
if err := qs.queue.Start(ctx, host); err != nil {
return err
}
if err := qs.consumers.Start(ctx, host); err != nil {
return err

if usePullingBasedExporterQueueBatcher.IsEnabled() {
if err := qs.batcher.Start(ctx, host); err != nil {
return err
}

Check warning on line 130 in exporter/exporterhelper/internal/queue_sender.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue_sender.go#L129-L130

Added lines #L129 - L130 were not covered by tests
} else {
if err := qs.consumers.Start(ctx, host); err != nil {
return err
}

Check warning on line 134 in exporter/exporterhelper/internal/queue_sender.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue_sender.go#L133-L134

Added lines #L133 - L134 were not covered by tests
}

dataTypeAttr := attribute.String(DataTypeKey, qs.obsrep.Signal.String())
Expand All @@ -123,6 +150,9 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error {
if err := qs.queue.Shutdown(ctx); err != nil {
return err
}
if usePullingBasedExporterQueueBatcher.IsEnabled() {
return qs.batcher.Shutdown(ctx)
}
return qs.consumers.Shutdown(ctx)
}

Expand Down
3 changes: 2 additions & 1 deletion exporter/exporterhelper/internal/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal"
Expand Down Expand Up @@ -540,7 +541,7 @@ func TestQueueSenderNoStartShutdown(t *testing.T) {
ExporterCreateSettings: exportertest.NewNopSettings(),
})
require.NoError(t, err)
qs := NewQueueSender(queue, set, 1, "", obsrep)
qs := NewQueueSender(queue, set, 1, "", obsrep, exporterbatcher.NewDefaultConfig())
assert.NoError(t, qs.Shutdown(context.Background()))
})
}
Expand Down
5 changes: 5 additions & 0 deletions exporter/internal/queue/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() {

// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error {
// maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics.
if qb.maxWorkers == -1 {
return nil
}

Check warning on line 138 in exporter/internal/queue/default_batcher.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/default_batcher.go#L137-L138

Added lines #L137 - L138 were not covered by tests

qb.startWorkerPool()
qb.shutdownCh = make(chan bool, 1)

Expand Down
5 changes: 5 additions & 0 deletions exporter/internal/queue/disabled_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ type DisabledBatcher struct {

// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
// maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics.
if qb.maxWorkers == -1 {
return nil
}

qb.startWorkerPool()

// This goroutine reads and then flushes.
Expand Down

0 comments on commit 415dcb8

Please sign in to comment.