Avoid nil queue in queue_sender, consolidate implementation
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
bogdandrutu committed Nov 13, 2023
1 parent 23500dd commit 05d783c
Showing 6 changed files with 58 additions and 54 deletions.
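
What the change does, in short: when retry_on_failure or sending_queue is disabled, WithRetry and WithQueue now install an errorLoggingRequestSender instead of letting retrySender and queueSender carry their own "disabled"/nil-queue branches. The sketch below illustrates that pattern only; it is not the collector code. Request, requestSender, failingSender and countRequest are simplified stand-ins for the real internal types, and the standard library log package replaces zap so the example is self-contained and runnable.

package main

import (
	"errors"
	"fmt"
	"log"
)

// Request is a simplified stand-in for internal.Request.
type Request interface {
	Count() int
}

// requestSender mirrors the sender-chain interface used by exporterhelper.
type requestSender interface {
	send(req Request) error
}

// errorLoggingRequestSender forwards the request and, on failure, logs a
// feature-specific message together with the number of dropped items.
type errorLoggingRequestSender struct {
	next    requestSender
	logger  *log.Logger
	message string
}

func (l *errorLoggingRequestSender) send(req Request) error {
	err := l.next.send(req)
	if err != nil {
		l.logger.Printf("%s dropped_items=%d err=%v", l.message, req.Count(), err)
	}
	return err
}

// failingSender simulates a downstream sender that always fails.
type failingSender struct{}

func (failingSender) send(Request) error { return errors.New("downstream unavailable") }

// countRequest is a minimal Request carrying only an item count.
type countRequest int

func (c countRequest) Count() int { return int(c) }

func main() {
	// With sending_queue disabled, the chain drops straight to the
	// error-logging sender instead of a queueSender holding a nil queue.
	sender := &errorLoggingRequestSender{
		next:    failingSender{},
		logger:  log.Default(),
		message: "Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.",
	}
	fmt.Println(sender.send(countRequest(2)))
}

In the real chain the next sender is the rest of the exporter pipeline rather than a hard-coded failure; the point is only that the "feature disabled" logging now lives in a single sender type shared by both options.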
31 changes: 19 additions & 12 deletions exporter/exporterhelper/common.go
@@ -40,13 +40,14 @@ func (b *baseRequestSender) setNextSender(nextSender requestSender) {

type errorLoggingRequestSender struct {
baseRequestSender
logger *zap.Logger
logger *zap.Logger
message string
}

func (l *errorLoggingRequestSender) send(req internal.Request) error {
err := l.baseRequestSender.send(req)
if err != nil {
l.logger.Error("Exporting failed", zap.Error(err))
l.logger.Error(l.message, zap.Int("dropped_items", req.Count()), zap.Error(err))
}
return err
}
@@ -106,9 +107,16 @@ func WithTimeout(timeoutSettings TimeoutSettings) Option {

// WithRetry overrides the default RetrySettings for an exporter.
// The default RetrySettings is to disable retries.
func WithRetry(retrySettings RetrySettings) Option {
func WithRetry(config RetrySettings) Option {
return func(o *baseExporter) {
o.retrySender = newRetrySender(o.set.ID, retrySettings, o.set.Logger, o.onTemporaryFailure)
if !config.Enabled {
o.retrySender = &errorLoggingRequestSender{
logger: o.set.Logger,
message: "Exporting failed. Try enabling retry_on_failure config option to retry on retryable errors",
}
return
}
o.retrySender = newRetrySender(config, o.set, o.onTemporaryFailure)
}
}

@@ -120,15 +128,14 @@ func WithQueue(config QueueSettings) Option {
if o.requestExporter {
panic("queueing is not available for the new request exporters yet")
}
var queue internal.Queue
if config.Enabled {
if config.StorageID == nil {
queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
} else {
queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler, o.set)
if !config.Enabled {
o.queueSender = &errorLoggingRequestSender{
logger: o.set.Logger,
message: "Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.",
}
return
}
qs := newQueueSender(o.set, o.signal, queue)
qs := newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
o.queueSender = qs
o.setOnTemporaryFailure(qs.onTemporaryFailure)
}
@@ -187,7 +194,7 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req

queueSender: &baseRequestSender{},
obsrepSender: osf(obsReport),
retrySender: &errorLoggingRequestSender{logger: set.Logger},
retrySender: &baseRequestSender{},
timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutSettings()},

set: set,
4 changes: 3 additions & 1 deletion exporter/exporterhelper/common_test.go
@@ -86,7 +86,9 @@ func TestBaseExporterLogging(t *testing.T) {
set := exportertest.NewNopCreateSettings()
logger, observed := observer.New(zap.DebugLevel)
set.Logger = zap.New(logger)
bs, err := newBaseExporter(set, "", true, nil, nil, newNoopObsrepSender)
rCfg := NewDefaultRetrySettings()
rCfg.Enabled = false
bs, err := newBaseExporter(set, "", true, nil, nil, newNoopObsrepSender, WithRetry(rCfg))
require.Nil(t, err)
require.True(t, bs.requestExporter)
sendErr := bs.send(newErrorRequest(context.Background()))
31 changes: 10 additions & 21 deletions exporter/exporterhelper/queue_sender.go
@@ -86,7 +86,14 @@ type queueSender struct {
metricSize otelmetric.Int64ObservableGauge
}

func newQueueSender(set exporter.CreateSettings, signal component.DataType, queue internal.Queue) *queueSender {
func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal component.DataType,
marshaler internal.RequestMarshaler, unmarshaler internal.RequestUnmarshaler) *queueSender {
var queue internal.Queue
if config.StorageID == nil {
queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
} else {
queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, marshaler, unmarshaler, set)
}
return &queueSender{
fullName: set.ID.String(),
signal: signal,
@@ -95,12 +102,12 @@ func newQueueSender(set exporter.CreateSettings, signal component.DataType, queu
logger: set.TelemetrySettings.Logger,
meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
// TODO: this can be further exposed as a config param rather than relying on a type of queue
requeuingEnabled: queue != nil && queue.IsPersistent(),
requeuingEnabled: queue.IsPersistent(),
}
}

func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req internal.Request, err error) error {
if !qs.requeuingEnabled || qs.queue == nil {
if !qs.requeuingEnabled {
logger.Error(
"Exporting failed. No more retries left. Dropping data.",
zap.Error(err),
@@ -126,10 +133,6 @@ func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req internal.Reque

// Start is invoked during service startup.
func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
if qs.queue == nil {
return nil
}

err := qs.queue.Start(ctx, host, internal.QueueSettings{
DataType: qs.signal,
Callback: func(item internal.Request) {
@@ -196,9 +199,6 @@ func (qs *queueSender) recordWithOC() error {

// Shutdown is invoked during service shutdown.
func (qs *queueSender) Shutdown(ctx context.Context) error {
if qs.queue == nil {
return nil
}
// Cleanup queue metrics reporting
_ = globalInstruments.queueSize.UpsertEntry(func() int64 {
return int64(0)
@@ -211,17 +211,6 @@ func (qs *queueSender) Shutdown(ctx context.Context) error {

// send implements the requestSender interface
func (qs *queueSender) send(req internal.Request) error {
if qs.queue == nil {
err := qs.nextSender.send(req)
if err != nil {
qs.logger.Error(
"Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.",
zap.Int("dropped_items", req.Count()),
)
}
return err
}

// Prevent cancellation and deadline to propagate to the context stored in the queue.
// The grpc/http based receivers will cancel the request context after this function returns.
req.SetContext(noCancellationContext{Context: req.Context()})
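
The queue construction that WithQueue previously performed now happens inside newQueueSender, so the queue can no longer be nil and the nil checks in Start, Shutdown, send and onTemporaryFailure could be dropped. Below is a minimal sketch of just that selection logic under simplified assumptions: storageID, boundedMemoryQueue and persistentQueue are hypothetical stand-ins for component.ID, internal.NewBoundedMemoryQueue and internal.NewPersistentQueue, and the real constructors also take a marshaler, an unmarshaler and the exporter settings.

package main

import "fmt"

// storageID stands in for *component.ID in the real configuration.
type storageID string

// queue is a simplified placeholder for internal.Queue.
type queue interface{ IsPersistent() bool }

type boundedMemoryQueue struct{ capacity, consumers int }

func (boundedMemoryQueue) IsPersistent() bool { return false }

type persistentQueue struct {
	capacity, consumers int
	storage             storageID
}

func (persistentQueue) IsPersistent() bool { return true }

// newQueue mirrors the choice newQueueSender now makes internally: an
// in-memory queue by default, or a persistent queue when a storage ID is set.
func newQueue(capacity, consumers int, storage *storageID) queue {
	if storage == nil {
		return boundedMemoryQueue{capacity: capacity, consumers: consumers}
	}
	return persistentQueue{capacity: capacity, consumers: consumers, storage: *storage}
}

func main() {
	q := newQueue(1000, 10, nil)
	// Requeuing after a temporary failure is only enabled for persistent queues.
	fmt.Println("requeuing enabled:", q.IsPersistent())

	id := storageID("file_storage")
	q = newQueue(1000, 10, &id)
	fmt.Println("requeuing enabled:", q.IsPersistent())
}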
3 changes: 1 addition & 2 deletions exporter/exporterhelper/queue_sender_test.go
@@ -283,13 +283,12 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
qs := NewDefaultQueueSettings()
qs.Enabled = false
be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithQueue(qs))
require.Nil(t, be.queueSender.(*queueSender).queue)
require.IsType(t, &errorLoggingRequestSender{}, be.queueSender)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
ocs := be.obsrepSender.(*observabilityConsumerSender)
mockR := newMockRequest(context.Background(), 2, errors.New("some error"))
ocs.run(func() {
// This is asynchronous so it should just enqueue, no errors expected.
require.Error(t, be.send(mockR))
})
ocs.awaitAsyncProcessing()
23 changes: 6 additions & 17 deletions exporter/exporterhelper/retry_sender.go
@@ -14,8 +14,8 @@ import (
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)
@@ -85,17 +85,17 @@ type retrySender struct {
onTemporaryFailure onRequestHandlingFinishedFunc
}

func newRetrySender(id component.ID, rCfg RetrySettings, logger *zap.Logger, onTemporaryFailure onRequestHandlingFinishedFunc) *retrySender {
func newRetrySender(config RetrySettings, set exporter.CreateSettings, onTemporaryFailure onRequestHandlingFinishedFunc) *retrySender {
if onTemporaryFailure == nil {
onTemporaryFailure = func(logger *zap.Logger, req internal.Request, err error) error {
return err
}
}
return &retrySender{
traceAttribute: attribute.String(obsmetrics.ExporterKey, id.String()),
cfg: rCfg,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
cfg: config,
stopCh: make(chan struct{}),
logger: logger,
logger: set.Logger,
onTemporaryFailure: onTemporaryFailure,
}
}
@@ -107,17 +107,6 @@ func (rs *retrySender) Shutdown(context.Context) error {

// send implements the requestSender interface
func (rs *retrySender) send(req internal.Request) error {
if !rs.cfg.Enabled {
err := rs.nextSender.send(req)
if err != nil {
rs.logger.Error(
"Exporting failed. Try enabling retry_on_failure config option to retry on retryable errors",
zap.Error(err),
)
}
return err
}

// Do not use NewExponentialBackOff since it calls Reset and the code here must
// call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
expBackoff := backoff.ExponentialBackOff{
@@ -186,7 +175,7 @@ func (rs *retrySender) send(req internal.Request) error {
// back-off, but get interrupted when shutting down or request is cancelled or timed out.
select {
case <-req.Context().Done():
return fmt.Errorf("Request is cancelled or timed out %w", err)
return fmt.Errorf("request is cancelled or timed out %w", err)

Codecov / codecov/patch warning: added line exporter/exporterhelper/retry_sender.go#L178 was not covered by tests.
case <-rs.stopCh:
return rs.onTemporaryFailure(rs.logger, req, fmt.Errorf("interrupted due to shutdown %w", err))
case <-time.After(backoffDelay):
20 changes: 19 additions & 1 deletion exporter/exporterhelper/retry_sender_test.go
@@ -227,7 +227,25 @@ func TestQueueRetryWithNoQueue(t *testing.T) {
ocs := be.obsrepSender.(*observabilityConsumerSender)
mockR := newMockRequest(context.Background(), 2, errors.New("some error"))
ocs.run(func() {
// This is asynchronous so it should just enqueue, no errors expected.
require.Error(t, be.send(mockR))
})
ocs.awaitAsyncProcessing()
mockR.checkNumRequests(t, 1)
ocs.checkSendItemsCount(t, 0)
ocs.checkDroppedItemsCount(t, 2)
require.NoError(t, be.Shutdown(context.Background()))
}

func TestQueueRetryWithDisabledRetires(t *testing.T) {
rCfg := NewDefaultRetrySettings()
rCfg.Enabled = false
be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg))
require.IsType(t, &errorLoggingRequestSender{}, be.retrySender)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
ocs := be.obsrepSender.(*observabilityConsumerSender)
mockR := newMockRequest(context.Background(), 2, errors.New("some error"))
ocs.run(func() {
require.Error(t, be.send(mockR))
})
ocs.awaitAsyncProcessing()
