Skip to content

Commit

Permalink
[chore] Fix TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued (#…
Browse files Browse the repository at this point in the history
…8986)

Fix flaky TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued by
moving it to persistent queue.
- It makes the test easy to validate given that the size of the
persistent queue is always available even if it's closed.
- It brings behavior closer to the name of the test
- It removes the flakiness associated with data race specific to
re-enqueuing to the bounded memory queue by shutdown which should be
resolved separately once the re-enqueue option is available for the
memory queue

Fixes
#8124
  • Loading branch information
dmitryax authored Nov 23, 2023
1 parent 7e3e725 commit 3b56bd0
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 26 deletions.
28 changes: 14 additions & 14 deletions exporter/exporterhelper/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package exporterhelper
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -365,38 +364,39 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
}

func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
produceCounter := &atomic.Uint32{}

qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 1
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence to ensure data is re-queued on shutdown

rCfg := NewDefaultRetrySettings()
rCfg.InitialInterval = time.Millisecond
rCfg.MaxElapsedTime = 0 // retry infinitely so shutdown can be triggered

be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg))
mockReq := newErrorRequest()
be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(mockReq),
newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)

// wraps original queue so we can count operations
be.queueSender.(*queueSender).queue = &producerConsumerQueueWithCounter{
Queue: be.queueSender.(*queueSender).queue,
produceCounter: produceCounter,
var extensions = map[component.ID]component.Component{
storageID: internal.NewMockStorageExtension(nil),
}
be.queueSender.(*queueSender).requeuingEnabled = true
host := &mockHost{ext: extensions}

require.NoError(t, be.Start(context.Background(), &mockHost{}))
require.NoError(t, be.Start(context.Background(), host))

// Invoke queuedRetrySender so the producer will put the item for consumer to poll
require.NoError(t, be.send(context.Background(), newErrorRequest()))
require.NoError(t, be.send(context.Background(), mockReq))

// first wait for the item to be produced to the queue initially
// first wait for the item to be consumed from the queue
assert.Eventually(t, func() bool {
return produceCounter.Load() == uint32(1)
return be.queueSender.(*queueSender).queue.Size() == 0
}, time.Second, 1*time.Millisecond)

// shuts down and ensure the item is produced in the queue again
require.NoError(t, be.Shutdown(context.Background()))
assert.Eventually(t, func() bool {
return produceCounter.Load() == uint32(2)
return be.queueSender.(*queueSender).queue.Size() == 1
}, time.Second, 1*time.Millisecond)
}

Expand Down
13 changes: 1 addition & 12 deletions exporter/exporterhelper/retry_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/internal/testdata"
)

func mockRequestUnmarshaler(mr *mockRequest) RequestUnmarshaler {
func mockRequestUnmarshaler(mr Request) RequestUnmarshaler {
return func(bytes []byte) (Request, error) {
return mr, nil
}
Expand Down Expand Up @@ -405,13 +404,3 @@ func tagsMatchLabelKeys(tags []tag.Tag, keys []metricdata.LabelKey, labels []met
}
return true
}

type producerConsumerQueueWithCounter struct {
internal.Queue[Request]
produceCounter *atomic.Uint32
}

func (pcq *producerConsumerQueueWithCounter) Offer(ctx context.Context, item Request) error {
pcq.produceCounter.Add(1)
return pcq.Queue.Offer(ctx, item)
}

0 comments on commit 3b56bd0

Please sign in to comment.