Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] rework memorylimiter test to avoid flaky tests #9733

Merged
merged 1 commit into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 22 additions & 17 deletions processor/memorylimiterprocessor/internal/mock_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
"sync/atomic"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
)

type MockExporter struct {
destAvailable int64
acceptedLogCount int64
deliveredLogCount int64
Logs []plog.Logs
destAvailable atomic.Bool
acceptedLogCount atomic.Int64
deliveredLogCount atomic.Int64
Logs consumertest.LogsSink
}

var _ consumer.Logs = (*MockExporter)(nil)
Expand All @@ -24,44 +25,48 @@ func (e *MockExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{}
}

func (e *MockExporter) ConsumeLogs(_ context.Context, ld plog.Logs) error {
atomic.AddInt64(&e.acceptedLogCount, int64(ld.LogRecordCount()))
func (e *MockExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
e.acceptedLogCount.Add(int64(ld.LogRecordCount()))

if atomic.LoadInt64(&e.destAvailable) == 1 {
if e.destAvailable.Load() {
// Destination is available, immediately deliver.
atomic.AddInt64(&e.deliveredLogCount, int64(ld.LogRecordCount()))
e.deliveredLogCount.Add(int64(ld.LogRecordCount()))
} else {
// Destination is not available. Queue the logs in the exporter.
e.Logs = append(e.Logs, ld)
return e.Logs.ConsumeLogs(ctx, ld)
}
return nil
}

func (e *MockExporter) SetDestAvailable(available bool) {
if available {
// Pretend we delivered all queued accepted logs.
atomic.AddInt64(&e.deliveredLogCount, atomic.LoadInt64(&e.acceptedLogCount))
e.deliveredLogCount.Add(int64(e.Logs.LogRecordCount()))

// Get rid of the delivered logs so that memory can be collected.
e.Logs = nil
e.Logs.Reset()

// Now mark destination available so that subsequent ConsumeLogs
// don't queue the logs anymore.
atomic.StoreInt64(&e.destAvailable, 1)

e.destAvailable.Store(true)
} else {
atomic.StoreInt64(&e.destAvailable, 0)
e.destAvailable.Store(false)
}
}

func (e *MockExporter) AcceptedLogCount() int {
return int(atomic.LoadInt64(&e.acceptedLogCount))
return int(e.acceptedLogCount.Load())
}

func (e *MockExporter) DeliveredLogCount() int {
return int(atomic.LoadInt64(&e.deliveredLogCount))
return int(e.deliveredLogCount.Load())
}

func NewMockExporter() *MockExporter {
return &MockExporter{}
return &MockExporter{
destAvailable: atomic.Bool{},
acceptedLogCount: atomic.Int64{},
deliveredLogCount: atomic.Int64{},
Logs: consumertest.LogsSink{},
}
}
6 changes: 4 additions & 2 deletions processor/memorylimiterprocessor/memorylimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ func TestNoDataLoss(t *testing.T) {

// And eventually the exporter must confirm that it delivered exact number of produced logs.
require.Eventually(t, func() bool {
return receiver.ProduceCount == exporter.DeliveredLogCount()
}, 5*time.Second, 1*time.Millisecond)
d := exporter.DeliveredLogCount()
t.Logf("received: %d, expected: %d\n", d, receiver.ProduceCount)
return receiver.ProduceCount == d
}, 5*time.Second, 100*time.Millisecond)

// Double check that the number of logs accepted by exporter matches the number of produced by receiver.
assert.Equal(t, receiver.ProduceCount, exporter.AcceptedLogCount())
Expand Down
Loading