diff --git a/exporter/internal/queue/fake_request.go b/exporter/internal/queue/fake_request_test.go
similarity index 100%
rename from exporter/internal/queue/fake_request.go
rename to exporter/internal/queue/fake_request_test.go
diff --git a/exporter/internal/queue/mock_storage.go b/exporter/internal/queue/mock_storage.go
index 26e2ae994e0..88d7c4438e0 100644
--- a/exporter/internal/queue/mock_storage.go
+++ b/exporter/internal/queue/mock_storage.go
@@ -6,10 +6,8 @@ package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
 import (
 	"context"
 	"errors"
-	"fmt"
 	"sync"
 	"sync/atomic"
-	"syscall"
 	"time"
 
 	"go.opentelemetry.io/collector/component"
@@ -97,161 +95,3 @@ func (m *mockStorageClient) Batch(_ context.Context, ops ...storage.Operation) e
 func (m *mockStorageClient) isClosed() bool {
 	return m.closed.Load()
 }
-
-func newFakeBoundedStorageClient(maxSizeInBytes int) *fakeBoundedStorageClient {
-	return &fakeBoundedStorageClient{
-		st:             map[string][]byte{},
-		MaxSizeInBytes: maxSizeInBytes,
-	}
-}
-
-// this storage client mimics the behavior of actual storage engines with limited storage space available
-// in general, real storage engines often have a per-write-transaction storage overhead, needing to keep
-// both the old and the new value stored until the transaction is committed
-// this is useful for testing the persistent queue queue behavior with a full disk
-type fakeBoundedStorageClient struct {
-	MaxSizeInBytes int
-	st             map[string][]byte
-	sizeInBytes    int
-	mux            sync.Mutex
-}
-
-func (m *fakeBoundedStorageClient) Get(ctx context.Context, key string) ([]byte, error) {
-	op := storage.GetOperation(key)
-	if err := m.Batch(ctx, op); err != nil {
-		return nil, err
-	}
-
-	return op.Value, nil
-}
-
-func (m *fakeBoundedStorageClient) Set(ctx context.Context, key string, value []byte) error {
-	return m.Batch(ctx, storage.SetOperation(key, value))
-}
-
-func (m *fakeBoundedStorageClient) Delete(ctx context.Context, key string) error {
-	return m.Batch(ctx, storage.DeleteOperation(key))
-}
-
-func (m *fakeBoundedStorageClient) Close(context.Context) error {
-	return nil
-}
-
-func (m *fakeBoundedStorageClient) Batch(_ context.Context, ops ...storage.Operation) error {
-	m.mux.Lock()
-	defer m.mux.Unlock()
-
-	totalAdded, totalRemoved := m.getTotalSizeChange(ops)
-
-	// the assumption here is that the new data needs to coexist with the old data on disk
-	// for the transaction to succeed
-	// this seems to be true for the file storage extension at least
-	if m.sizeInBytes+totalAdded > m.MaxSizeInBytes {
-		return fmt.Errorf("insufficient space available: %w", syscall.ENOSPC)
-	}
-
-	for _, op := range ops {
-		switch op.Type {
-		case storage.Get:
-			op.Value = m.st[op.Key]
-		case storage.Set:
-			m.st[op.Key] = op.Value
-		case storage.Delete:
-			delete(m.st, op.Key)
-		default:
-			return errors.New("wrong operation type")
-		}
-	}
-
-	m.sizeInBytes += totalAdded - totalRemoved
-
-	return nil
-}
-
-func (m *fakeBoundedStorageClient) SetMaxSizeInBytes(newMaxSize int) {
-	m.mux.Lock()
-	defer m.mux.Unlock()
-	m.MaxSizeInBytes = newMaxSize
-}
-
-func (m *fakeBoundedStorageClient) GetSizeInBytes() int {
-	m.mux.Lock()
-	defer m.mux.Unlock()
-	return m.sizeInBytes
-}
-
-func (m *fakeBoundedStorageClient) getTotalSizeChange(ops []storage.Operation) (totalAdded int, totalRemoved int) {
-	totalAdded, totalRemoved = 0, 0
-	for _, op := range ops {
-		switch op.Type {
-		case storage.Set:
-			if oldValue, ok := m.st[op.Key]; ok {
-				totalRemoved += len(oldValue)
-			} else {
-				totalAdded += len(op.Key)
-			}
-			totalAdded += len(op.Value)
-		case storage.Delete:
-			if value, ok := m.st[op.Key]; ok {
-				totalRemoved += len(op.Key)
-				totalRemoved += len(value)
-			}
-		default:
-		}
-	}
-	return totalAdded, totalRemoved
-}
-
-func newFakeStorageClientWithErrors(errors []error) *fakeStorageClientWithErrors {
-	return &fakeStorageClientWithErrors{
-		errors: errors,
-	}
-}
-
-// this storage client just returns errors from a list in order
-// used for testing error handling
-type fakeStorageClientWithErrors struct {
-	errors         []error
-	nextErrorIndex int
-	mux            sync.Mutex
-}
-
-func (m *fakeStorageClientWithErrors) Get(ctx context.Context, key string) ([]byte, error) {
-	op := storage.GetOperation(key)
-	err := m.Batch(ctx, op)
-	if err != nil {
-		return nil, err
-	}
-
-	return op.Value, nil
-}
-
-func (m *fakeStorageClientWithErrors) Set(ctx context.Context, key string, value []byte) error {
-	return m.Batch(ctx, storage.SetOperation(key, value))
-}
-
-func (m *fakeStorageClientWithErrors) Delete(ctx context.Context, key string) error {
-	return m.Batch(ctx, storage.DeleteOperation(key))
-}
-
-func (m *fakeStorageClientWithErrors) Close(context.Context) error {
-	return nil
-}
-
-func (m *fakeStorageClientWithErrors) Batch(context.Context, ...storage.Operation) error {
-	m.mux.Lock()
-	defer m.mux.Unlock()
-
-	if m.nextErrorIndex >= len(m.errors) {
-		return nil
-	}
-
-	m.nextErrorIndex++
-	return m.errors[m.nextErrorIndex-1]
-}
-
-func (m *fakeStorageClientWithErrors) Reset() {
-	m.mux.Lock()
-	defer m.mux.Unlock()
-	m.nextErrorIndex = 0
-}
diff --git a/exporter/internal/queue/persistent_queue_test.go b/exporter/internal/queue/persistent_queue_test.go
index c6b19e0f2ad..fbabd0b83a4 100644
--- a/exporter/internal/queue/persistent_queue_test.go
+++ b/exporter/internal/queue/persistent_queue_test.go
@@ -66,6 +66,164 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
 	return nh.ext
 }
 
+func newFakeBoundedStorageClient(maxSizeInBytes int) *fakeBoundedStorageClient {
+	return &fakeBoundedStorageClient{
+		st:             map[string][]byte{},
+		MaxSizeInBytes: maxSizeInBytes,
+	}
+}
+
+// this storage client mimics the behavior of actual storage engines with limited storage space available
+// in general, real storage engines often have a per-write-transaction storage overhead, needing to keep
+// both the old and the new value stored until the transaction is committed
+// this is useful for testing the persistent queue behavior with a full disk
+type fakeBoundedStorageClient struct {
+	MaxSizeInBytes int
+	st             map[string][]byte
+	sizeInBytes    int
+	mux            sync.Mutex
+}
+
+func (m *fakeBoundedStorageClient) Get(ctx context.Context, key string) ([]byte, error) {
+	op := storage.GetOperation(key)
+	if err := m.Batch(ctx, op); err != nil {
+		return nil, err
+	}
+
+	return op.Value, nil
+}
+
+func (m *fakeBoundedStorageClient) Set(ctx context.Context, key string, value []byte) error {
+	return m.Batch(ctx, storage.SetOperation(key, value))
+}
+
+func (m *fakeBoundedStorageClient) Delete(ctx context.Context, key string) error {
+	return m.Batch(ctx, storage.DeleteOperation(key))
+}
+
+func (m *fakeBoundedStorageClient) Close(context.Context) error {
+	return nil
+}
+
+func (m *fakeBoundedStorageClient) Batch(_ context.Context, ops ...storage.Operation) error {
+	m.mux.Lock()
+	defer m.mux.Unlock()
+
+	totalAdded, totalRemoved := m.getTotalSizeChange(ops)
+
+	// the assumption here is that the new data needs to coexist with the old data on disk
+	// for the transaction to succeed
+	// this seems to be true for the file storage extension at least
+	if m.sizeInBytes+totalAdded > m.MaxSizeInBytes {
+		return fmt.Errorf("insufficient space available: %w", syscall.ENOSPC)
+	}
+
+	for _, op := range ops {
+		switch op.Type {
+		case storage.Get:
+			op.Value = m.st[op.Key]
+		case storage.Set:
+			m.st[op.Key] = op.Value
+		case storage.Delete:
+			delete(m.st, op.Key)
+		default:
+			return errors.New("wrong operation type")
+		}
+	}
+
+	m.sizeInBytes += totalAdded - totalRemoved
+
+	return nil
+}
+
+func (m *fakeBoundedStorageClient) SetMaxSizeInBytes(newMaxSize int) {
+	m.mux.Lock()
+	defer m.mux.Unlock()
+	m.MaxSizeInBytes = newMaxSize
+}
+
+func (m *fakeBoundedStorageClient) GetSizeInBytes() int {
+	m.mux.Lock()
+	defer m.mux.Unlock()
+	return m.sizeInBytes
+}
+
+func (m *fakeBoundedStorageClient) getTotalSizeChange(ops []storage.Operation) (totalAdded int, totalRemoved int) {
+	totalAdded, totalRemoved = 0, 0
+	for _, op := range ops {
+		switch op.Type {
+		case storage.Set:
+			if oldValue, ok := m.st[op.Key]; ok {
+				totalRemoved += len(oldValue)
+			} else {
+				totalAdded += len(op.Key)
+			}
+			totalAdded += len(op.Value)
+		case storage.Delete:
+			if value, ok := m.st[op.Key]; ok {
+				totalRemoved += len(op.Key)
+				totalRemoved += len(value)
+			}
+		default:
+		}
+	}
+	return totalAdded, totalRemoved
+}
+
+func newFakeStorageClientWithErrors(errors []error) *fakeStorageClientWithErrors {
+	return &fakeStorageClientWithErrors{
+		errors: errors,
+	}
+}
+
+// this storage client just returns errors from a list in order
+// used for testing error handling
+type fakeStorageClientWithErrors struct {
+	errors         []error
+	nextErrorIndex int
+	mux            sync.Mutex
+}
+
+func (m *fakeStorageClientWithErrors) Get(ctx context.Context, key string) ([]byte, error) {
+	op := storage.GetOperation(key)
+	err := m.Batch(ctx, op)
+	if err != nil {
+		return nil, err
+	}
+
+	return op.Value, nil
+}
+
+func (m *fakeStorageClientWithErrors) Set(ctx context.Context, key string, value []byte) error {
+	return m.Batch(ctx, storage.SetOperation(key, value))
+}
+
+func (m *fakeStorageClientWithErrors) Delete(ctx context.Context, key string) error {
+	return m.Batch(ctx, storage.DeleteOperation(key))
+}
+
+func (m *fakeStorageClientWithErrors) Close(context.Context) error {
+	return nil
+}
+
+func (m *fakeStorageClientWithErrors) Batch(context.Context, ...storage.Operation) error {
+	m.mux.Lock()
+	defer m.mux.Unlock()
+
+	if m.nextErrorIndex >= len(m.errors) {
+		return nil
+	}
+
+	m.nextErrorIndex++
+	return m.errors[m.nextErrorIndex-1]
+}
+
+func (m *fakeStorageClientWithErrors) Reset() {
+	m.mux.Lock()
+	defer m.mux.Unlock()
+	m.nextErrorIndex = 0
+}
+
 // createAndStartTestPersistentQueue creates and starts a fake queue with the given capacity and number of consumers.
 func createAndStartTestPersistentQueue(t *testing.T, sizer Sizer[tracesRequest], capacity int64, numConsumers int, consumeFunc func(_ context.Context, item tracesRequest) error) Queue[tracesRequest] {
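The bounded client moved above charges every write transaction for the old and the new value at once, so an overwrite can run out of space even when the final size would fit under the cap. Below is a minimal sketch of that behavior, assuming the same queue test package and testify's require helpers; TestFakeBoundedStorageOverhead is a hypothetical test name, not part of this diff:

package queue

import (
	"context"
	"syscall"
	"testing"

	"github.com/stretchr/testify/require"
)

// TestFakeBoundedStorageOverhead is a hypothetical test (not in the diff)
// illustrating the per-transaction overhead model of fakeBoundedStorageClient.
func TestFakeBoundedStorageOverhead(t *testing.T) {
	ctx := context.Background()
	client := newFakeBoundedStorageClient(12) // 12 bytes of simulated disk

	// First write stores len("k") + len(value) = 1 + 8 = 9 bytes.
	require.NoError(t, client.Set(ctx, "k", make([]byte, 8)))
	require.Equal(t, 9, client.GetSizeInBytes())

	// An overwrite needs the old and new value to coexist: 9 existing bytes
	// plus 8 incoming exceed the 12-byte cap, so the write fails with
	// ENOSPC even though the final size would again be 9 bytes.
	require.ErrorIs(t, client.Set(ctx, "k", make([]byte, 8)), syscall.ENOSPC)

	// Raising the cap makes the same transaction fit.
	client.SetMaxSizeInBytes(17)
	require.NoError(t, client.Set(ctx, "k", make([]byte, 8)))
}

This is also what lets the persistent queue tests simulate a disk that fills up and later recovers: SetMaxSizeInBytes stands in for space being freed.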
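fakeStorageClientWithErrors, by contrast, ignores the operations entirely and plays back its scripted errors one Batch-backed call at a time, succeeding once the script is exhausted; Reset rewinds it for reuse. A sketch under the same assumptions; TestFakeStorageErrorScript and the use of testify's assert.AnError are illustrative choices, not part of this diff:

package queue

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestFakeStorageErrorScript is a hypothetical test (not in the diff)
// showing how fakeStorageClientWithErrors sequences its scripted errors.
func TestFakeStorageErrorScript(t *testing.T) {
	ctx := context.Background()
	client := newFakeStorageClientWithErrors([]error{nil, assert.AnError})

	// The first call consumes the scripted nil; the second returns the
	// scripted error, regardless of which storage method is invoked.
	require.NoError(t, client.Set(ctx, "k", []byte("v")))
	require.ErrorIs(t, client.Delete(ctx, "k"), assert.AnError)

	// The script is exhausted, so subsequent calls succeed.
	require.NoError(t, client.Set(ctx, "k", []byte("v")))

	// Reset rewinds to the start of the script.
	client.Reset()
	require.NoError(t, client.Set(ctx, "k", []byte("v")))
}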