Skip to content

Commit

Permalink
work on TestNoDataLoss
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Feb 15, 2024
1 parent 5fa8d69 commit f5cb3e3
Showing 1 changed file with 22 additions and 17 deletions.
39 changes: 22 additions & 17 deletions processor/memorylimiterprocessor/internal/mock_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
"sync/atomic"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
)

type MockExporter struct {
destAvailable int64
acceptedLogCount int64
deliveredLogCount int64
Logs []plog.Logs
destAvailable atomic.Bool
acceptedLogCount atomic.Int64
deliveredLogCount atomic.Int64
Logs consumertest.LogsSink
}

var _ consumer.Logs = (*MockExporter)(nil)
Expand All @@ -24,44 +25,48 @@ func (e *MockExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{}
}

func (e *MockExporter) ConsumeLogs(_ context.Context, ld plog.Logs) error {
atomic.AddInt64(&e.acceptedLogCount, int64(ld.LogRecordCount()))
func (e *MockExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
e.acceptedLogCount.Add(int64(ld.LogRecordCount()))

if atomic.LoadInt64(&e.destAvailable) == 1 {
if e.destAvailable.Load() {
// Destination is available, immediately deliver.
atomic.AddInt64(&e.deliveredLogCount, int64(ld.LogRecordCount()))
e.deliveredLogCount.Add(int64(ld.LogRecordCount()))
} else {
// Destination is not available. Queue the logs in the exporter.
e.Logs = append(e.Logs, ld)
return e.Logs.ConsumeLogs(ctx, ld)
}
return nil
}

func (e *MockExporter) SetDestAvailable(available bool) {
if available {
// Pretend we delivered all queued accepted logs.
atomic.AddInt64(&e.deliveredLogCount, atomic.LoadInt64(&e.acceptedLogCount))
e.deliveredLogCount.Add(int64(e.Logs.LogRecordCount()))

// Get rid of the delivered logs so that memory can be collected.
e.Logs = nil
e.Logs.Reset()

// Now mark destination available so that subsequent ConsumeLogs
// don't queue the logs anymore.
atomic.StoreInt64(&e.destAvailable, 1)

e.destAvailable.Store(true)
} else {
atomic.StoreInt64(&e.destAvailable, 0)
e.destAvailable.Store(false)
}
}

func (e *MockExporter) AcceptedLogCount() int {
return int(atomic.LoadInt64(&e.acceptedLogCount))
return int(e.acceptedLogCount.Load())
}

func (e *MockExporter) DeliveredLogCount() int {
return int(atomic.LoadInt64(&e.deliveredLogCount))
return int(e.deliveredLogCount.Load())
}

func NewMockExporter() *MockExporter {
return &MockExporter{}
return &MockExporter{
destAvailable: atomic.Bool{},
acceptedLogCount: atomic.Int64{},
deliveredLogCount: atomic.Int64{},
Logs: consumertest.LogsSink{},
}
}

0 comments on commit f5cb3e3

Please sign in to comment.