Propagate shutdown error from retry sender to fix data loss problem in persistent storage (open-telemetry#6815)

Fix a data loss bug occurring in the persistent queue when the OpenTelemetry Collector restarts.
When the Collector restarts, the retry_sender marks the in-flight request as finished, which causes persistent storage to remove it from currentlyDispatchedItems and results in data loss.

This change propagates an "interrupted due to shutdown" error when shutdown is called. The error reaches the consumer callback function, and each consumer can decide how to act on it. The persistent storage is updated to check for this error and ensure such requests are not marked as finished.
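
To make the mechanism concrete, here is a minimal, self-contained sketch of the pattern described above; it is not the collector's actual code. The sender surfaces a sentinel shutdown error instead of swallowing it, and the consumer callback only acknowledges an item (i.e. lets persistent storage drop it from its in-flight set) when that sentinel is absent. All names (errShutdownInterrupted, send, consume) are illustrative assumptions.

// Sketch only: illustrates the shutdown-error propagation idea, not the
// collector's real API.
package main

import (
	"errors"
	"fmt"
)

var errShutdownInterrupted = errors.New("interrupted due to shutdown")

// send stands in for the retry sender: during shutdown it surfaces the
// sentinel error instead of reporting the item as handled.
func send(item string, shuttingDown bool) error {
	if shuttingDown {
		return fmt.Errorf("%w: %s", errShutdownInterrupted, item)
	}
	return nil // delivered
}

// consume stands in for the queue consumer callback: it acknowledges only
// when the item was not interrupted by shutdown, so a restart can re-dispatch it.
func consume(item string, shuttingDown bool) {
	if err := send(item, shuttingDown); errors.Is(err, errShutdownInterrupted) {
		fmt.Printf("keep %q in the in-flight set for redelivery: %v\n", item, err)
		return
	}
	fmt.Printf("acknowledge %q (remove from currentlyDispatchedItems)\n", item)
}

func main() {
	consume("span-batch-1", false) // normal path: acknowledged
	consume("span-batch-2", true)  // shutdown path: kept for redelivery
}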
splunkericl authored Jan 17, 2023
1 parent f3f9020 commit edc08ce
Showing 3 changed files with 80 additions and 1 deletion.
16 changes: 16 additions & 0 deletions .chloggen/persistent_queue_dataloss_shutdown.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otlpexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix a data loss bug in persistent storage when the collector shuts down or restarts

# One or more tracking issues or pull requests related to the change
issues: [6771]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
2 changes: 1 addition & 1 deletion exporter/exporterhelper/queued_retry.go
@@ -435,7 +435,7 @@ func (rs *retrySender) send(req internal.Request) error {
 		case <-req.Context().Done():
 			return fmt.Errorf("Request is cancelled or timed out %w", err)
 		case <-rs.stopCh:
-			return fmt.Errorf("interrupted due to shutdown %w", err)
+			return rs.onTemporaryFailure(rs.logger, req, fmt.Errorf("interrupted due to shutdown %w", err))
 		case <-time.After(backoffDelay):
 		}
 	}
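
The important part of this one-line change is that, on shutdown, the sender now routes the request through the temporary-failure path (which can requeue it when requeuing is enabled) while still returning the error to the caller. A rough, self-contained sketch of that shape, using hypothetical stand-in types rather than the collector's internal Request and queue:

// Sketch only: mirrors the shape of the temporary-failure hook under the
// assumption that it requeues and still returns the error.
package main

import (
	"errors"
	"fmt"
)

// request and queue are stand-ins for the collector's internal types.
type request struct{ name string }

type queue struct{ items []request }

// produce appends the request and reports whether it was accepted.
func (q *queue) produce(r request) bool {
	q.items = append(q.items, r)
	return true
}

// onTemporaryFailure puts the request back on the queue when requeuing is
// enabled; either way it returns the error so the caller still observes the failure.
func onTemporaryFailure(q *queue, requeuingEnabled bool, r request, err error) error {
	if requeuingEnabled && q.produce(r) {
		fmt.Printf("requeued %q after failure: %v\n", r.name, err)
	} else {
		fmt.Printf("dropping %q: %v\n", r.name, err)
	}
	return err
}

func main() {
	q := &queue{}
	err := onTemporaryFailure(q, true, request{name: "batch-7"}, errors.New("interrupted due to shutdown"))
	fmt.Println("propagated error:", err, "| queued items:", len(q.items))
}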
63 changes: 63 additions & 0 deletions exporter/exporterhelper/queued_retry_test.go
@@ -582,6 +582,51 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
	require.Error(t, be.Start(context.Background(), host), "could not get storage client")
}

func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
	produceCounter := atomic.NewUint32(0)

	qCfg := NewDefaultQueueSettings()
	qCfg.NumConsumers = 1
	rCfg := NewDefaultRetrySettings()
	rCfg.InitialInterval = time.Millisecond
	rCfg.MaxElapsedTime = 0 // retry infinitely so shutdown can be triggered

	req := newMockRequest(context.Background(), 3, errors.New("some error"))

	be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
	require.NoError(t, err)

	require.NoError(t, be.Start(context.Background(), &mockHost{}))

	// wrap the original queue so we can count operations
	be.qrSender.queue = &producerConsumerQueueWithCounter{
		ProducerConsumerQueue: be.qrSender.queue,
		produceCounter:        produceCounter,
	}
	be.qrSender.requeuingEnabled = true

	// replace nextSender inside retrySender to always return an error so it doesn't exit the send loop
	castedSender, ok := be.qrSender.consumerSender.(*retrySender)
	require.True(t, ok, "consumerSender should be a retrySender type")
	castedSender.nextSender = &errorRequestSender{
		errToReturn: errors.New("some error"),
	}

	// Invoke queuedRetrySender so the producer will put the item for the consumer to poll
	require.NoError(t, be.sender.send(req))

	// first wait for the item to be produced to the queue initially
	assert.Eventually(t, func() bool {
		return produceCounter.Load() == uint32(1)
	}, time.Second, 1*time.Millisecond)

	// shut down and ensure the item is produced to the queue again
	require.NoError(t, be.Shutdown(context.Background()))
	assert.Eventually(t, func() bool {
		return produceCounter.Load() == uint32(2)
	}, time.Second, 1*time.Millisecond)
}

type mockErrorRequest struct {
	baseRequest
}
@@ -776,3 +821,21 @@ func (mse *mockStorageExtension) GetClient(_ context.Context, _ component.Kind,
	}
	return storage.NewNopClient(), nil
}

type producerConsumerQueueWithCounter struct {
	internal.ProducerConsumerQueue
	produceCounter *atomic.Uint32
}

func (pcq *producerConsumerQueueWithCounter) Produce(item internal.Request) bool {
	pcq.produceCounter.Add(1)
	return pcq.ProducerConsumerQueue.Produce(item)
}

type errorRequestSender struct {
	errToReturn error
}

func (rs *errorRequestSender) send(_ internal.Request) error {
	return rs.errToReturn
}
