diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go
index 9736ec59c2b..c4afc64155d 100644
--- a/exporter/exporterhelper/common.go
+++ b/exporter/exporterhelper/common.go
@@ -40,13 +40,14 @@ func (b *baseRequestSender) setNextSender(nextSender requestSender) {
 
 type errorLoggingRequestSender struct {
 	baseRequestSender
-	logger *zap.Logger
+	logger  *zap.Logger
+	message string
 }
 
 func (l *errorLoggingRequestSender) send(req internal.Request) error {
 	err := l.baseRequestSender.send(req)
 	if err != nil {
-		l.logger.Error("Exporting failed", zap.Error(err))
+		l.logger.Error(l.message, zap.Int("dropped_items", req.Count()), zap.Error(err))
 	}
 	return err
 }
@@ -106,9 +107,16 @@ func WithTimeout(timeoutSettings TimeoutSettings) Option {
 
 // WithRetry overrides the default RetrySettings for an exporter.
 // The default RetrySettings is to disable retries.
-func WithRetry(retrySettings RetrySettings) Option {
+func WithRetry(config RetrySettings) Option {
 	return func(o *baseExporter) {
-		o.retrySender = newRetrySender(o.set.ID, retrySettings, o.set.Logger, o.onTemporaryFailure)
+		if !config.Enabled {
+			o.retrySender = &errorLoggingRequestSender{
+				logger:  o.set.Logger,
+				message: "Exporting failed. Try enabling retry_on_failure config option to retry on retryable errors",
+			}
+			return
+		}
+		o.retrySender = newRetrySender(config, o.set, o.onTemporaryFailure)
 	}
 }
 
@@ -120,15 +128,14 @@ func WithQueue(config QueueSettings) Option {
 		if o.requestExporter {
 			panic("queueing is not available for the new request exporters yet")
 		}
-		var queue internal.Queue
-		if config.Enabled {
-			if config.StorageID == nil {
-				queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
-			} else {
-				queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler, o.set)
+		if !config.Enabled {
+			o.queueSender = &errorLoggingRequestSender{
+				logger:  o.set.Logger,
+				message: "Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.",
 			}
+			return
 		}
-		qs := newQueueSender(o.set, o.signal, queue)
+		qs := newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
 		o.queueSender = qs
 		o.setOnTemporaryFailure(qs.onTemporaryFailure)
 	}
@@ -187,7 +194,7 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
 
 		queueSender:   &baseRequestSender{},
 		obsrepSender:  osf(obsReport),
-		retrySender:   &errorLoggingRequestSender{logger: set.Logger},
+		retrySender:   &baseRequestSender{},
 		timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutSettings()},
 
 		set:    set,
diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go
index 10166c32974..095497532a0 100644
--- a/exporter/exporterhelper/common_test.go
+++ b/exporter/exporterhelper/common_test.go
@@ -86,7 +86,9 @@ func TestBaseExporterLogging(t *testing.T) {
 	set := exportertest.NewNopCreateSettings()
 	logger, observed := observer.New(zap.DebugLevel)
 	set.Logger = zap.New(logger)
-	bs, err := newBaseExporter(set, "", true, nil, nil, newNoopObsrepSender)
+	rCfg := NewDefaultRetrySettings()
+	rCfg.Enabled = false
+	bs, err := newBaseExporter(set, "", true, nil, nil, newNoopObsrepSender, WithRetry(rCfg))
 	require.Nil(t, err)
 	require.True(t, bs.requestExporter)
 	sendErr := bs.send(newErrorRequest(context.Background()))
diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go
index 13ff4aff09e..4c153b4fb17 100644
--- a/exporter/exporterhelper/queue_sender.go
+++ b/exporter/exporterhelper/queue_sender.go
@@ -86,7 +86,14 @@ type queueSender struct {
 	metricSize     otelmetric.Int64ObservableGauge
 }
 
-func newQueueSender(set exporter.CreateSettings, signal component.DataType, queue internal.Queue) *queueSender {
+func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal component.DataType,
+	marshaler internal.RequestMarshaler, unmarshaler internal.RequestUnmarshaler) *queueSender {
+	var queue internal.Queue
+	if config.StorageID == nil {
+		queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
+	} else {
+		queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, marshaler, unmarshaler, set)
+	}
 	return &queueSender{
 		fullName: set.ID.String(),
 		signal:   signal,
@@ -95,12 +102,12 @@ func newQueueSender(set exporter.CreateSettings, signal component.DataType, queu
 		logger:   set.TelemetrySettings.Logger,
 		meter:    set.TelemetrySettings.MeterProvider.Meter(scopeName),
 		// TODO: this can be further exposed as a config param rather than relying on a type of queue
-		requeuingEnabled: queue != nil && queue.IsPersistent(),
+		requeuingEnabled: queue.IsPersistent(),
 	}
 }
 
 func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req internal.Request, err error) error {
-	if !qs.requeuingEnabled || qs.queue == nil {
+	if !qs.requeuingEnabled {
 		logger.Error(
 			"Exporting failed. No more retries left. Dropping data.",
 			zap.Error(err),
@@ -126,10 +133,6 @@ func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req internal.Reque
 
 // Start is invoked during service startup.
 func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
-	if qs.queue == nil {
-		return nil
-	}
-
 	err := qs.queue.Start(ctx, host, internal.QueueSettings{
 		DataType: qs.signal,
 		Callback: func(item internal.Request) {
@@ -196,9 +199,6 @@ func (qs *queueSender) recordWithOC() error {
 
 // Shutdown is invoked during service shutdown.
 func (qs *queueSender) Shutdown(ctx context.Context) error {
-	if qs.queue == nil {
-		return nil
-	}
 	// Cleanup queue metrics reporting
 	_ = globalInstruments.queueSize.UpsertEntry(func() int64 {
 		return int64(0)
@@ -211,17 +211,6 @@ func (qs *queueSender) Shutdown(ctx context.Context) error {
 
 // send implements the requestSender interface
 func (qs *queueSender) send(req internal.Request) error {
-	if qs.queue == nil {
-		err := qs.nextSender.send(req)
-		if err != nil {
-			qs.logger.Error(
-				"Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.",
-				zap.Int("dropped_items", req.Count()),
-			)
-		}
-		return err
-	}
-
 	// Prevent cancellation and deadline to propagate to the context stored in the queue.
 	// The grpc/http based receivers will cancel the request context after this function returns.
 	req.SetContext(noCancellationContext{Context: req.Context()})
diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go
index c0d46db2dfc..9b3ce452c5e 100644
--- a/exporter/exporterhelper/queue_sender_test.go
+++ b/exporter/exporterhelper/queue_sender_test.go
@@ -283,13 +283,12 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
 	qs := NewDefaultQueueSettings()
 	qs.Enabled = false
 	be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithQueue(qs))
-	require.Nil(t, be.queueSender.(*queueSender).queue)
+	require.IsType(t, &errorLoggingRequestSender{}, be.queueSender)
 	require.NoError(t, err)
 	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
 	ocs := be.obsrepSender.(*observabilityConsumerSender)
 	mockR := newMockRequest(context.Background(), 2, errors.New("some error"))
 	ocs.run(func() {
-		// This is asynchronous so it should just enqueue, no errors expected.
 		require.Error(t, be.send(mockR))
 	})
 	ocs.awaitAsyncProcessing()
diff --git a/exporter/exporterhelper/retry_sender.go b/exporter/exporterhelper/retry_sender.go
index 72c95f94db2..43babe5e2f5 100644
--- a/exporter/exporterhelper/retry_sender.go
+++ b/exporter/exporterhelper/retry_sender.go
@@ -14,8 +14,8 @@ import (
 	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
 
-	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/consumer/consumererror"
+	"go.opentelemetry.io/collector/exporter"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
 	"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
 )
@@ -85,17 +85,17 @@ type retrySender struct {
 	onTemporaryFailure onRequestHandlingFinishedFunc
 }
 
-func newRetrySender(id component.ID, rCfg RetrySettings, logger *zap.Logger, onTemporaryFailure onRequestHandlingFinishedFunc) *retrySender {
+func newRetrySender(config RetrySettings, set exporter.CreateSettings, onTemporaryFailure onRequestHandlingFinishedFunc) *retrySender {
 	if onTemporaryFailure == nil {
 		onTemporaryFailure = func(logger *zap.Logger, req internal.Request, err error) error {
 			return err
 		}
 	}
 	return &retrySender{
-		traceAttribute:     attribute.String(obsmetrics.ExporterKey, id.String()),
-		cfg:                rCfg,
+		traceAttribute:     attribute.String(obsmetrics.ExporterKey, set.ID.String()),
+		cfg:                config,
 		stopCh:             make(chan struct{}),
-		logger:             logger,
+		logger:             set.Logger,
 		onTemporaryFailure: onTemporaryFailure,
 	}
 }
@@ -107,17 +107,6 @@ func (rs *retrySender) Shutdown(context.Context) error {
 
 // send implements the requestSender interface
 func (rs *retrySender) send(req internal.Request) error {
-	if !rs.cfg.Enabled {
-		err := rs.nextSender.send(req)
-		if err != nil {
-			rs.logger.Error(
-				"Exporting failed. Try enabling retry_on_failure config option to retry on retryable errors",
-				zap.Error(err),
-			)
-		}
-		return err
-	}
-
 	// Do not use NewExponentialBackOff since it calls Reset and the code here must
 	// call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
 	expBackoff := backoff.ExponentialBackOff{
@@ -186,7 +175,7 @@ func (rs *retrySender) send(req internal.Request) error {
 		// back-off, but get interrupted when shutting down or request is cancelled or timed out.
 		select {
 		case <-req.Context().Done():
-			return fmt.Errorf("Request is cancelled or timed out %w", err)
+			return fmt.Errorf("request is cancelled or timed out %w", err)
 		case <-rs.stopCh:
 			return rs.onTemporaryFailure(rs.logger, req, fmt.Errorf("interrupted due to shutdown %w", err))
 		case <-time.After(backoffDelay):
diff --git a/exporter/exporterhelper/retry_sender_test.go b/exporter/exporterhelper/retry_sender_test.go
index 72a21bc1311..4bc2ca63dd7 100644
--- a/exporter/exporterhelper/retry_sender_test.go
+++ b/exporter/exporterhelper/retry_sender_test.go
@@ -227,7 +227,25 @@ func TestQueueRetryWithNoQueue(t *testing.T) {
 	ocs := be.obsrepSender.(*observabilityConsumerSender)
 	mockR := newMockRequest(context.Background(), 2, errors.New("some error"))
 	ocs.run(func() {
-		// This is asynchronous so it should just enqueue, no errors expected.
+		require.Error(t, be.send(mockR))
+	})
+	ocs.awaitAsyncProcessing()
+	mockR.checkNumRequests(t, 1)
+	ocs.checkSendItemsCount(t, 0)
+	ocs.checkDroppedItemsCount(t, 2)
+	require.NoError(t, be.Shutdown(context.Background()))
+}
+
+func TestQueueRetryWithDisabledRetires(t *testing.T) {
+	rCfg := NewDefaultRetrySettings()
+	rCfg.Enabled = false
+	be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg))
+	require.IsType(t, &errorLoggingRequestSender{}, be.retrySender)
+	require.NoError(t, err)
+	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+	ocs := be.obsrepSender.(*observabilityConsumerSender)
+	mockR := newMockRequest(context.Background(), 2, errors.New("some error"))
+	ocs.run(func() {
 		require.Error(t, be.send(mockR))
 	})
 	ocs.awaitAsyncProcessing()
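Illustrative usage (not part of the patch): a minimal sketch, assuming the public exporterhelper constructors, of an exporter that opts out of both retries and queueing after this change. The package name, function name, and no-op push callback below are hypothetical; the point is that WithRetry/WithQueue with Enabled=false now install errorLoggingRequestSender, so a failed export is logged with a dropped_items count and the error is returned to the caller rather than retried or enqueued.

```go
// Sketch only: examplelogs, newLogsExporter, and the no-op push func are hypothetical.
package examplelogs

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
	"go.opentelemetry.io/collector/pdata/plog"
)

// newLogsExporter builds an exporter with retries and queueing disabled.
// With this change, both disabled options wire in errorLoggingRequestSender,
// so export failures are logged (with dropped_items) instead of retried or enqueued.
func newLogsExporter(ctx context.Context, set exporter.CreateSettings, cfg component.Config) (exporter.Logs, error) {
	rCfg := exporterhelper.NewDefaultRetrySettings()
	rCfg.Enabled = false
	qCfg := exporterhelper.NewDefaultQueueSettings()
	qCfg.Enabled = false
	return exporterhelper.NewLogsExporter(ctx, set, cfg,
		func(context.Context, plog.Logs) error { return nil }, // no-op push, for illustration only
		exporterhelper.WithRetry(rCfg),
		exporterhelper.WithQueue(qCfg),
	)
}
```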