From c44d32d65a0842a9e3552b7b91a45cc1fc67e8de Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Mon, 12 Jun 2023 00:06:10 -0700 Subject: [PATCH 1/3] [exporterhelper] New exporter helper with custom requests Introduce a new exporter helper that operates over client provided requests instead of pdata. It opens a door for moving batching to the exporter where batches will be built from clients data format, instead of pdata. The batches can be properly sized by custom request size which can be different from OTLP. The same custom request sizing will be applied to the sending queue. It will also improve performance of the sending queue retries for non-OTLP exporters, they don't need to translate pdata on every retry. This is an experimental API, once stabilized it's intended to replace the existing helpers. --- .chloggen/exporter-helper-v2.yaml | 12 ++ exporter/exporterhelper/common.go | 26 ++- exporter/exporterhelper/common_test.go | 16 +- exporter/exporterhelper/constants.go | 8 + .../internal/persistent_queue.go | 14 +- .../internal/persistent_queue_test.go | 10 +- .../internal/persistent_storage.go | 14 +- .../internal/persistent_storage_batch.go | 6 +- .../internal/persistent_storage_test.go | 15 +- exporter/exporterhelper/internal/request.go | 10 +- exporter/exporterhelper/logs.go | 80 +++++++- exporter/exporterhelper/logs_test.go | 169 ++++++++++++++++- exporter/exporterhelper/metrics.go | 80 +++++++- exporter/exporterhelper/metrics_test.go | 177 +++++++++++++++++- exporter/exporterhelper/queued_retry.go | 77 ++++---- exporter/exporterhelper/queued_retry_test.go | 53 ++++-- exporter/exporterhelper/request.go | 39 ++++ exporter/exporterhelper/request_test.go | 44 +++++ exporter/exporterhelper/traces.go | 80 +++++++- exporter/exporterhelper/traces_test.go | 167 ++++++++++++++++- 20 files changed, 985 insertions(+), 112 deletions(-) create mode 100644 .chloggen/exporter-helper-v2.yaml create mode 100644 exporter/exporterhelper/request.go create mode 100644 exporter/exporterhelper/request_test.go diff --git a/.chloggen/exporter-helper-v2.yaml b/.chloggen/exporter-helper-v2.yaml new file mode 100644 index 00000000000..600a71f1b0c --- /dev/null +++ b/.chloggen/exporter-helper-v2.yaml @@ -0,0 +1,12 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporter/exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Introduce a new exporter helper that operates over client-provided requests instead of pdata + +# One or more tracking issues or pull requests related to the change +issues: [7874] + diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 08b3e015e41..be5a08014a5 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -56,14 +56,25 @@ func (req *baseRequest) OnProcessingFinished() { } } +type queueSettings struct { + config QueueSettings + marshaler internal.RequestMarshaler + unmarshaler internal.RequestUnmarshaler +} + +func (qs *queueSettings) persistenceEnabled() bool { + return qs.config.StorageID != nil && qs.marshaler != nil && qs.unmarshaler != nil +} + // baseSettings represents all the options that users can configure. type baseSettings struct { component.StartFunc component.ShutdownFunc consumerOptions []consumer.Option TimeoutSettings - QueueSettings + queueSettings RetrySettings + RequestSender } // fromOptions returns the internal options starting from the default and applying all configured options. @@ -72,7 +83,9 @@ func fromOptions(options ...Option) *baseSettings { opts := &baseSettings{ TimeoutSettings: NewDefaultTimeoutSettings(), // TODO: Enable queuing by default (call DefaultQueueSettings) - QueueSettings: QueueSettings{Enabled: false}, + queueSettings: queueSettings{ + config: QueueSettings{Enabled: false}, + }, // TODO: Enable retry by default (call DefaultRetrySettings) RetrySettings: RetrySettings{Enabled: false}, } @@ -121,9 +134,10 @@ func WithRetry(retrySettings RetrySettings) Option { // WithQueue overrides the default QueueSettings for an exporter. // The default QueueSettings is to disable queueing. -func WithQueue(queueSettings QueueSettings) Option { +// Permanent queue is not yet available for exporter helpers V2, they ignore StorageID field. +func WithQueue(config QueueSettings) Option { return func(o *baseSettings) { - o.QueueSettings = queueSettings + o.queueSettings.config = config } } @@ -145,7 +159,7 @@ type baseExporter struct { qrSender *queuedRetrySender } -func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType, reqUnmarshaler internal.RequestUnmarshaler) (*baseExporter, error) { +func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType) (*baseExporter, error) { be := &baseExporter{} var err error @@ -154,7 +168,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo return nil, err } - be.qrSender = newQueuedRetrySender(set.ID, signal, bs.QueueSettings, bs.RetrySettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger) + be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger) be.sender = be.qrSender be.StartFunc = func(ctx context.Context, host component.Host) error { // First start the wrapped exporter. diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index 8f5a5376c39..47447ac8a56 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -14,11 +14,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/pdata/ptrace" ) var ( @@ -35,7 +32,7 @@ var ( ) func TestBaseExporter(t *testing.T) { - be, err := newBaseExporter(defaultSettings, fromOptions(), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) @@ -50,7 +47,6 @@ func TestBaseExporterWithOptions(t *testing.T) { WithShutdown(func(ctx context.Context) error { return want }), WithTimeout(NewDefaultTimeoutSettings())), "", - nopRequestUnmarshaler(), ) require.NoError(t, err) require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost())) @@ -65,13 +61,3 @@ func checkStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) { require.Equal(t, codes.Unset, sd.Status().Code, "SpanData %v", sd) } } - -func nopTracePusher() consumer.ConsumeTracesFunc { - return func(ctx context.Context, ld ptrace.Traces) error { - return nil - } -} - -func nopRequestUnmarshaler() internal.RequestUnmarshaler { - return newTraceRequestUnmarshalerFunc(nopTracePusher()) -} diff --git a/exporter/exporterhelper/constants.go b/exporter/exporterhelper/constants.go index bdcbf1a4fd6..a449a309fbc 100644 --- a/exporter/exporterhelper/constants.go +++ b/exporter/exporterhelper/constants.go @@ -18,4 +18,12 @@ var ( errNilPushMetricsData = errors.New("nil PushMetrics") // errNilPushLogsData is returned when a nil PushLogs is given. errNilPushLogsData = errors.New("nil PushLogs") + // errNilTracesConverter is returned when a nil TracesConverter is given. + errNilTracesConverter = errors.New("nil TracesConverter") + // errNilMetricsConverter is returned when a nil MetricsConverter is given. + errNilMetricsConverter = errors.New("nil MetricsConverter") + // errNilLogsConverter is returned when a nil LogsConverter is given. + errNilLogsConverter = errors.New("nil LogsConverter") + // errNilRequestSender is returned when a nil RequestSender is given. + errNilRequestSender = errors.New("nil RequestSender") ) diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index a7956d6f569..63a617daebd 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -35,11 +35,21 @@ func buildPersistentStorageName(name string, signal component.DataType) string { return fmt.Sprintf("%s-%s", name, signal) } +type PersistentQueueSettings struct { + Name string + Signal component.DataType + Capacity uint64 + Logger *zap.Logger + Client storage.Client + Unmarshaler RequestUnmarshaler + Marshaler RequestMarshaler +} + // NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage -func NewPersistentQueue(ctx context.Context, name string, signal component.DataType, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue { +func NewPersistentQueue(ctx context.Context, params PersistentQueueSettings) ProducerConsumerQueue { return &persistentQueue{ stopChan: make(chan struct{}), - storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(name, signal), uint64(capacity), logger, client, unmarshaler), + storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(params.Name, params.Signal), params), } } diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go index 13505440580..8b1ffb7674e 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/persistent_queue_test.go @@ -28,7 +28,15 @@ func createTestQueue(extension storage.Extension, capacity int) *persistentQueue panic(err) } - wq := NewPersistentQueue(context.Background(), "foo", component.DataTypeTraces, capacity, logger, client, newFakeTracesRequestUnmarshalerFunc()) + wq := NewPersistentQueue(context.Background(), PersistentQueueSettings{ + Name: "foo", + Signal: component.DataTypeTraces, + Capacity: uint64(capacity), + Logger: logger, + Client: client, + Unmarshaler: newFakeTracesRequestUnmarshalerFunc(), + Marshaler: newFakeTracesRequestMarshalerFunc(), + }) return wq.(*persistentQueue) } diff --git a/exporter/exporterhelper/internal/persistent_storage.go b/exporter/exporterhelper/internal/persistent_storage.go index 1d79d63a124..cbbcf2e03c5 100644 --- a/exporter/exporterhelper/internal/persistent_storage.go +++ b/exporter/exporterhelper/internal/persistent_storage.go @@ -43,6 +43,7 @@ type persistentContiguousStorage struct { queueName string client storage.Client unmarshaler RequestUnmarshaler + marshaler RequestMarshaler putChan chan struct{} stopChan chan struct{} @@ -80,14 +81,15 @@ var ( // newPersistentContiguousStorage creates a new file-storage extension backed queue; // queueName parameter must be a unique value that identifies the queue. -func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage { +func newPersistentContiguousStorage(ctx context.Context, queueName string, set PersistentQueueSettings) *persistentContiguousStorage { pcs := &persistentContiguousStorage{ - logger: logger, - client: client, + logger: set.Logger, + client: set.Client, queueName: queueName, - unmarshaler: unmarshaler, - capacity: capacity, - putChan: make(chan struct{}, capacity), + unmarshaler: set.Unmarshaler, + marshaler: set.Marshaler, + capacity: set.Capacity, + putChan: make(chan struct{}, set.Capacity), reqChan: make(chan Request), stopChan: make(chan struct{}), itemsCount: &atomic.Uint64{}, diff --git a/exporter/exporterhelper/internal/persistent_storage_batch.go b/exporter/exporterhelper/internal/persistent_storage_batch.go index 85c99cf51e9..a80ba93c5c3 100644 --- a/exporter/exporterhelper/internal/persistent_storage_batch.go +++ b/exporter/exporterhelper/internal/persistent_storage_batch.go @@ -137,7 +137,7 @@ func (bof *batchStruct) getItemIndexArrayResult(key string) ([]itemIndex, error) // setRequest adds Set operation over a given request to the batch func (bof *batchStruct) setRequest(key string, value Request) *batchStruct { - return bof.set(key, value, requestToBytes) + return bof.set(key, value, bof.requestToBytes) } // setItemIndex adds Set operation over a given itemIndex to the batch @@ -206,8 +206,8 @@ func bytesToItemIndexArray(b []byte) (any, error) { return val, err } -func requestToBytes(req any) ([]byte, error) { - return req.(Request).Marshal() +func (bof *batchStruct) requestToBytes(req any) ([]byte, error) { + return bof.pcs.marshaler(req.(Request)) } func (bof *batchStruct) bytesToRequest(b []byte) (any, error) { diff --git a/exporter/exporterhelper/internal/persistent_storage_test.go b/exporter/exporterhelper/internal/persistent_storage_test.go index b27836fd2b6..5d33fc11064 100644 --- a/exporter/exporterhelper/internal/persistent_storage_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_test.go @@ -36,7 +36,13 @@ func createTestClient(extension storage.Extension) storage.Client { } func createTestPersistentStorageWithLoggingAndCapacity(client storage.Client, logger *zap.Logger, capacity uint64) *persistentContiguousStorage { - return newPersistentContiguousStorage(context.Background(), "foo", capacity, logger, client, newFakeTracesRequestUnmarshalerFunc()) + return newPersistentContiguousStorage(context.Background(), "foo", PersistentQueueSettings{ + Capacity: capacity, + Logger: logger, + Client: client, + Unmarshaler: newFakeTracesRequestUnmarshalerFunc(), + Marshaler: newFakeTracesRequestMarshalerFunc(), + }) } func createTestPersistentStorage(client storage.Client) *persistentContiguousStorage { @@ -82,6 +88,13 @@ func newFakeTracesRequestUnmarshalerFunc() RequestUnmarshaler { } } +func newFakeTracesRequestMarshalerFunc() RequestMarshaler { + return func(req Request) ([]byte, error) { + marshaler := ptrace.ProtoMarshaler{} + return marshaler.MarshalTraces(req.(*fakeTracesRequest).td) + } +} + func TestPersistentStorage_CorruptedData(t *testing.T) { path := t.TempDir() diff --git a/exporter/exporterhelper/internal/request.go b/exporter/exporterhelper/internal/request.go index 390b35e94bd..3120d7db175 100644 --- a/exporter/exporterhelper/internal/request.go +++ b/exporter/exporterhelper/internal/request.go @@ -19,11 +19,8 @@ type Request interface { // Otherwise, it should return the original Request. OnError(error) Request - // Count returns the count of spans/metric points or log records. - Count() int - - // Marshal serializes the current request into a byte stream - Marshal() ([]byte, error) + // ItemsCount returns the number of basic items in the request (spans, date points or log records for OTLP) + ItemsCount() int // OnProcessingFinished calls the optional callback function to handle cleanup after all processing is finished OnProcessingFinished() @@ -34,3 +31,6 @@ type Request interface { // RequestUnmarshaler defines a function which takes a byte slice and unmarshals it into a relevant request type RequestUnmarshaler func([]byte) (Request, error) + +// RequestMarshaler defines a function which takes a request and marshals it into a byte slice +type RequestMarshaler func(Request) ([]byte, error) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index b53bfc8addb..293037b4ccd 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -7,6 +7,8 @@ import ( "context" "errors" + "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -42,6 +44,10 @@ func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) internal.Req } } +func logsRequestMarshaler(req internal.Request) ([]byte, error) { + return logsMarshaler.MarshalLogs(req.(*logsRequest).ld) +} + func (req *logsRequest) OnError(err error) internal.Request { var logError consumererror.Logs if errors.As(err, &logError) { @@ -58,7 +64,7 @@ func (req *logsRequest) Marshal() ([]byte, error) { return logsMarshaler.MarshalLogs(req.ld) } -func (req *logsRequest) Count() int { +func (req *logsRequest) ItemsCount() int { return req.ld.LogRecordCount() } @@ -88,7 +94,9 @@ func NewLogsExporter( } bs := fromOptions(options...) - be, err := newBaseExporter(set, bs, component.DataTypeLogs, newLogsRequestUnmarshalerFunc(pusher)) + bs.marshaler = logsRequestMarshaler + bs.unmarshaler = newLogsRequestUnmarshalerFunc(pusher) + be, err := newBaseExporter(set, bs, component.DataTypeLogs) if err != nil { return nil, err } @@ -103,7 +111,7 @@ func NewLogsExporter( req := newLogsRequest(ctx, ld, pusher) serr := be.sender.send(req) if errors.Is(serr, errSendingQueueIsFull) { - be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.Count())) + be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.ItemsCount())) } return serr }, bs.consumerOptions...) @@ -114,6 +122,70 @@ func NewLogsExporter( }, err } +type LogsConverter interface { + // RequestFromLogs converts plog.Logs data into a request. + RequestFromLogs(context.Context, plog.Logs) (Request, error) +} + +// NewLogsExporterV2 creates new logs exporter based on custom LogsConverter and RequestSender. +func NewLogsExporterV2( + _ context.Context, + set exporter.CreateSettings, + converter LogsConverter, + sender RequestSender, + options ...Option, +) (exporter.Logs, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilLogsConverter + } + + if sender == nil { + return nil, errNilRequestSender + } + + bs := fromOptions(options...) + bs.RequestSender = sender + + be, err := newBaseExporter(set, bs, component.DataTypeLogs) + if err != nil { + return nil, err + } + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &logsExporterWithObservability{ + obsrep: be.obsrep, + nextSender: nextSender, + } + }) + + lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { + req, cErr := converter.RequestFromLogs(ctx, ld) + if cErr != nil { + set.Logger.Error("Failed to convert logs. Dropping data.", + zap.Int("dropped_log_records", ld.LogRecordCount()), + zap.Error(err)) + return consumererror.NewPermanent(cErr) + } + sErr := be.sender.send(&request{ + baseRequest: baseRequest{ctx: ctx}, + Request: req, + sender: sender, + }) + if errors.Is(sErr, errSendingQueueIsFull) { + be.obsrep.recordLogsEnqueueFailure(ctx, int64(req.ItemsCount())) + } + return sErr + }, bs.consumerOptions...) + + return &logsExporter{ + baseExporter: be, + Logs: lc, + }, err +} + type logsExporterWithObservability struct { obsrep *obsExporter nextSender requestSender @@ -122,6 +194,6 @@ type logsExporterWithObservability struct { func (lewo *logsExporterWithObservability) send(req internal.Request) error { req.SetContext(lewo.obsrep.StartLogsOp(req.Context())) err := lewo.nextSender.send(req) - lewo.obsrep.EndLogsOp(req.Context(), req.Count(), err) + lewo.obsrep.EndLogsOp(req.Context(), req.ItemsCount(), err) return err } diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 3aa50b26e02..dc4808f7ab1 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -33,6 +33,7 @@ const ( var ( fakeLogsExporterName = component.NewIDWithName("fake_logs_exporter", "with_name") + fakeLogsExporterV2Name = component.NewIDWithName("fake_logs_exporter_v2", "with_name") fakeLogsExporterConfig = struct{}{} ) @@ -59,12 +60,30 @@ func TestLogsExporter_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } +func TestLogsExporterV2_NilLogger(t *testing.T) { + le, err := NewLogsExporterV2(context.Background(), exporter.CreateSettings{}, &fakeRequestConverter{}, newFakeRequestSender(nil)) + require.Nil(t, le) + require.Equal(t, errNilLogger, err) +} + func TestLogsExporter_NilPushLogsData(t *testing.T) { le, err := NewLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeLogsExporterConfig, nil) require.Nil(t, le) require.Equal(t, errNilPushLogsData, err) } +func TestLogsExporterV2_NilLogsConverter(t *testing.T) { + le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), nil, newFakeRequestSender(nil)) + require.Nil(t, le) + require.Equal(t, errNilLogsConverter, err) +} + +func TestLogsExporterV2_NilRequestSender(t *testing.T) { + le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, nil) + require.Nil(t, le) + require.Equal(t, errNilRequestSender, err) +} + func TestLogsExporter_Default(t *testing.T) { ld := plog.NewLogs() le, err := NewLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeLogsExporterConfig, newPushLogsData(nil)) @@ -77,6 +96,18 @@ func TestLogsExporter_Default(t *testing.T) { assert.NoError(t, le.Shutdown(context.Background())) } +func TestLogsExporterV2_Default(t *testing.T) { + ld := plog.NewLogs() + le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil)) + assert.NotNil(t, le) + assert.NoError(t, err) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, le.Capabilities()) + assert.NoError(t, le.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, le.ConsumeLogs(context.Background(), ld)) + assert.NoError(t, le.Shutdown(context.Background())) +} + func TestLogsExporter_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} le, err := NewLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeLogsExporterConfig, newPushLogsData(nil), WithCapabilities(capabilities)) @@ -86,6 +117,15 @@ func TestLogsExporter_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, le.Capabilities()) } +func TestLogsExporterV2_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithCapabilities(capabilities)) + require.NoError(t, err) + require.NotNil(t, le) + + assert.Equal(t, capabilities, le.Capabilities()) +} + func TestLogsExporter_Default_ReturnError(t *testing.T) { ld := plog.NewLogs() want := errors.New("my_error") @@ -95,7 +135,27 @@ func TestLogsExporter_Default_ReturnError(t *testing.T) { require.Equal(t, want, le.ConsumeLogs(context.Background(), ld)) } -func TestLogsExporter_WithRecordLogs(t *testing.T) { +func TestLogsExporterV2_Default_ConverterError(t *testing.T) { + ld := plog.NewLogs() + want := errors.New("convert_error") + le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{logsError: want}, newFakeRequestSender(nil)) + require.NoError(t, err) + require.NotNil(t, le) + require.Equal(t, consumererror.NewPermanent(want), le.ConsumeLogs(context.Background(), ld)) +} + +func TestLogsExporterV2_Default_RequestSenderError(t *testing.T) { + ld := plog.NewLogs() + want := errors.New("send_error") + le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{}, newFakeRequestSender(want)) + require.NoError(t, err) + require.NotNil(t, le) + require.Equal(t, want, le.ConsumeLogs(context.Background(), ld)) +} + +func TestLogsExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) @@ -107,7 +167,19 @@ func TestLogsExporter_WithRecordLogs(t *testing.T) { checkRecordedMetricsForLogsExporter(t, tt, le, nil) } -func TestLogsExporter_WithRecordLogs_ReturnError(t *testing.T) { +func TestLogsExporterV2_WithRecordMetrics(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterV2Name) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + le, err := NewLogsExporterV2(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil)) + require.NoError(t, err) + require.NotNil(t, le) + + checkRecordedMetricsForLogsExporter(t, tt, le, nil) +} + +func TestLogsExporter_WithRecordMetrics_ReturnError(t *testing.T) { want := errors.New("my_error") tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName) require.NoError(t, err) @@ -120,6 +192,20 @@ func TestLogsExporter_WithRecordLogs_ReturnError(t *testing.T) { checkRecordedMetricsForLogsExporter(t, tt, le, want) } +func TestLogsExporterV2_WithRecordMetrics_ReturnError(t *testing.T) { + want := errors.New("convert_error") + tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterV2Name) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + le, err := NewLogsExporterV2(context.Background(), tt.ToExporterCreateSettings(), + &fakeRequestConverter{}, newFakeRequestSender(want)) + require.Nil(t, err) + require.NotNil(t, le) + + checkRecordedMetricsForLogsExporter(t, tt, le, want) +} + func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName) require.NoError(t, err) @@ -145,6 +231,32 @@ func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { checkExporterEnqueueFailedLogsStats(t, globalInstruments, fakeLogsExporterName, int64(15)) } +func TestLogsExporterV2_WithRecordEnqueueFailedMetrics(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterV2Name) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + rCfg := NewDefaultRetrySettings() + qCfg := NewDefaultQueueSettings() + qCfg.NumConsumers = 1 + qCfg.QueueSize = 2 + wantErr := errors.New("some-error") + te, err := NewLogsExporterV2(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, + newFakeRequestSender(wantErr), WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + require.NotNil(t, te) + + md := testdata.GenerateLogs(3) + const numBatches = 7 + for i := 0; i < numBatches; i++ { + // errors are checked in the checkExporterEnqueueFailedLogsStats function below. + _ = te.ConsumeLogs(context.Background(), md) + } + + // 2 batched must be in queue, and 5 batches (15 log records) rejected due to queue overflow + checkExporterEnqueueFailedLogsStats(t, globalInstruments, fakeLogsExporterV2Name, int64(15)) +} + func TestLogsExporter_WithSpan(t *testing.T) { set := exportertest.NewNopCreateSettings() sr := new(tracetest.SpanRecorder) @@ -158,6 +270,19 @@ func TestLogsExporter_WithSpan(t *testing.T) { checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, nil, 1) } +func TestLogsExporterV2_WithSpan(t *testing.T) { + set := exportertest.NewNopCreateSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) + + le, err := NewLogsExporterV2(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(nil)) + require.Nil(t, err) + require.NotNil(t, le) + checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, nil, 1) +} + func TestLogsExporter_WithSpan_ReturnError(t *testing.T) { set := exportertest.NewNopCreateSettings() sr := new(tracetest.SpanRecorder) @@ -172,6 +297,20 @@ func TestLogsExporter_WithSpan_ReturnError(t *testing.T) { checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, want, 1) } +func TestLogsExporterV2_WithSpan_ReturnError(t *testing.T) { + set := exportertest.NewNopCreateSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) + + want := errors.New("my_error") + le, err := NewLogsExporterV2(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(want)) + require.Nil(t, err) + require.NotNil(t, le) + checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, want, 1) +} + func TestLogsExporter_WithShutdown(t *testing.T) { shutdownCalled := false shutdown := func(context.Context) error { shutdownCalled = true; return nil } @@ -184,6 +323,18 @@ func TestLogsExporter_WithShutdown(t *testing.T) { assert.True(t, shutdownCalled) } +func TestLogsExporterV2_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdown)) + assert.NotNil(t, le) + assert.NoError(t, err) + + assert.Nil(t, le.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + func TestLogsExporter_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } @@ -195,6 +346,17 @@ func TestLogsExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, le.Shutdown(context.Background()), want) } +func TestLogsExporterV2_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdownErr)) + assert.NotNil(t, le) + assert.NoError(t, err) + + assert.Equal(t, le.Shutdown(context.Background()), want) +} + func newPushLogsData(retError error) consumer.ConsumeLogsFunc { return func(ctx context.Context, td plog.Logs) error { return retError @@ -225,7 +387,8 @@ func generateLogsTraffic(t *testing.T, tracer trace.Tracer, le exporter.Logs, nu } } -func checkWrapSpanForLogsExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, le exporter.Logs, wantError error, numLogRecords int64) { +func checkWrapSpanForLogsExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, le exporter.Logs, + wantError error, numLogRecords int64) { // nolint: unparam const numRequests = 5 generateLogsTraffic(t, tracer, le, numRequests, wantError) diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 1639a45fcab..bb9bc33cee4 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -7,6 +7,8 @@ import ( "context" "errors" + "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -42,6 +44,10 @@ func newMetricsRequestUnmarshalerFunc(pusher consumer.ConsumeMetricsFunc) intern } } +func metricsRequestMarshaler(req internal.Request) ([]byte, error) { + return metricsMarshaler.MarshalMetrics(req.(*metricsRequest).md) +} + func (req *metricsRequest) OnError(err error) internal.Request { var metricsError consumererror.Metrics if errors.As(err, &metricsError) { @@ -59,7 +65,7 @@ func (req *metricsRequest) Marshal() ([]byte, error) { return metricsMarshaler.MarshalMetrics(req.md) } -func (req *metricsRequest) Count() int { +func (req *metricsRequest) ItemsCount() int { return req.md.DataPointCount() } @@ -89,7 +95,9 @@ func NewMetricsExporter( } bs := fromOptions(options...) - be, err := newBaseExporter(set, bs, component.DataTypeMetrics, newMetricsRequestUnmarshalerFunc(pusher)) + bs.marshaler = metricsRequestMarshaler + bs.unmarshaler = newMetricsRequestUnmarshalerFunc(pusher) + be, err := newBaseExporter(set, bs, component.DataTypeMetrics) if err != nil { return nil, err } @@ -104,7 +112,7 @@ func NewMetricsExporter( req := newMetricsRequest(ctx, md, pusher) serr := be.sender.send(req) if errors.Is(serr, errSendingQueueIsFull) { - be.obsrep.recordMetricsEnqueueFailure(req.Context(), int64(req.Count())) + be.obsrep.recordMetricsEnqueueFailure(req.Context(), int64(req.ItemsCount())) } return serr }, bs.consumerOptions...) @@ -115,6 +123,70 @@ func NewMetricsExporter( }, err } +type MetricsConverter interface { + // RequestFromMetrics converts pdata.Metrics into a request. + RequestFromMetrics(context.Context, pmetric.Metrics) (Request, error) +} + +// NewMetricsExporterV2 creates a new metrics exporter based on a custom TracesConverter and RequestSender. +func NewMetricsExporterV2( + _ context.Context, + set exporter.CreateSettings, + converter MetricsConverter, + sender RequestSender, + options ...Option, +) (exporter.Metrics, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilMetricsConverter + } + + if sender == nil { + return nil, errNilRequestSender + } + + bs := fromOptions(options...) + bs.RequestSender = sender + + be, err := newBaseExporter(set, bs, component.DataTypeMetrics) + if err != nil { + return nil, err + } + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &metricsSenderWithObservability{ + obsrep: be.obsrep, + nextSender: nextSender, + } + }) + + mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { + req, cErr := converter.RequestFromMetrics(ctx, md) + if cErr != nil { + set.Logger.Error("Failed to convert metrics. Dropping data.", + zap.Int("dropped_data_points", md.DataPointCount()), + zap.Error(err)) + return consumererror.NewPermanent(cErr) + } + sErr := be.sender.send(&request{ + Request: req, + baseRequest: baseRequest{ctx: ctx}, + sender: sender, + }) + if errors.Is(sErr, errSendingQueueIsFull) { + be.obsrep.recordMetricsEnqueueFailure(ctx, int64(req.ItemsCount())) + } + return sErr + }, bs.consumerOptions...) + + return &metricsExporter{ + baseExporter: be, + Metrics: mc, + }, err +} + type metricsSenderWithObservability struct { obsrep *obsExporter nextSender requestSender @@ -123,6 +195,6 @@ type metricsSenderWithObservability struct { func (mewo *metricsSenderWithObservability) send(req internal.Request) error { req.SetContext(mewo.obsrep.StartMetricsOp(req.Context())) err := mewo.nextSender.send(req) - mewo.obsrep.EndMetricsOp(req.Context(), req.Count(), err) + mewo.obsrep.EndMetricsOp(req.Context(), req.ItemsCount(), err) return err } diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 5ebc2d9ec65..21e3d290d55 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -33,6 +33,7 @@ const ( var ( fakeMetricsExporterName = component.NewIDWithName("fake_metrics_exporter", "with_name") + fakeMetricsExporterV2Name = component.NewIDWithName("fake_metrics_exporter_v2", "with_name") fakeMetricsExporterConfig = struct{}{} ) @@ -47,7 +48,7 @@ func TestMetricsRequest(t *testing.T) { ) } -func TestMetricsExporter_InvalidName(t *testing.T) { +func TestMetricsExporter_NilConfig(t *testing.T) { me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), nil, newPushMetricsData(nil)) require.Nil(t, me) require.Equal(t, errNilConfig, err) @@ -59,12 +60,33 @@ func TestMetricsExporter_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } +func TestMetricsExporterV2_NilLogger(t *testing.T) { + me, err := NewMetricsExporterV2(context.Background(), exporter.CreateSettings{}, fakeRequestConverter{}, + newFakeRequestSender(nil)) + require.Nil(t, me) + require.Equal(t, errNilLogger, err) +} + func TestMetricsExporter_NilPushMetricsData(t *testing.T) { me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeMetricsExporterConfig, nil) require.Nil(t, me) require.Equal(t, errNilPushMetricsData, err) } +func TestMetricsExporterV2_NilMetricsConverter(t *testing.T) { + me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), nil, + newFakeRequestSender(nil)) + require.Nil(t, me) + require.Equal(t, errNilMetricsConverter, err) +} + +func TestMetricsExporterV2_NilRequestSender(t *testing.T) { + me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + nil) + require.Nil(t, me) + require.Equal(t, errNilRequestSender, err) +} + func TestMetricsExporter_Default(t *testing.T) { md := pmetric.NewMetrics() me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeMetricsExporterConfig, newPushMetricsData(nil)) @@ -77,6 +99,19 @@ func TestMetricsExporter_Default(t *testing.T) { assert.NoError(t, me.Shutdown(context.Background())) } +func TestMetricsExporterV2_Default(t *testing.T) { + md := pmetric.NewMetrics() + me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + newFakeRequestSender(nil)) + assert.NoError(t, err) + assert.NotNil(t, me) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, me.Capabilities()) + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.ConsumeMetrics(context.Background(), md)) + assert.NoError(t, me.Shutdown(context.Background())) +} + func TestMetricsExporter_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeMetricsExporterConfig, newPushMetricsData(nil), WithCapabilities(capabilities)) @@ -86,6 +121,16 @@ func TestMetricsExporter_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, me.Capabilities()) } +func TestMetricsExporterV2_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + newFakeRequestSender(nil), WithCapabilities(capabilities)) + assert.NoError(t, err) + assert.NotNil(t, me) + + assert.Equal(t, capabilities, me.Capabilities()) +} + func TestMetricsExporter_Default_ReturnError(t *testing.T) { md := pmetric.NewMetrics() want := errors.New("my_error") @@ -95,6 +140,26 @@ func TestMetricsExporter_Default_ReturnError(t *testing.T) { require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) } +func TestMetricsExporterV2_Default_ConverterError(t *testing.T) { + md := pmetric.NewMetrics() + want := errors.New("convert_error") + me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), + fakeRequestConverter{metricsError: want}, newFakeRequestSender(nil)) + require.NoError(t, err) + require.NotNil(t, me) + require.Equal(t, consumererror.NewPermanent(want), me.ConsumeMetrics(context.Background(), md)) +} + +func TestMetricsExporterV2_Default_RequestSenderError(t *testing.T) { + md := pmetric.NewMetrics() + want := errors.New("send_error") + me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), + fakeRequestConverter{}, newFakeRequestSender(want)) + require.NoError(t, err) + require.NotNil(t, me) + require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) +} + func TestMetricsExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName) require.NoError(t, err) @@ -107,6 +172,19 @@ func TestMetricsExporter_WithRecordMetrics(t *testing.T) { checkRecordedMetricsForMetricsExporter(t, tt, me, nil) } +func TestMetricsExporterV2_WithRecordMetrics(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterV2Name) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + me, err := NewMetricsExporterV2(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, + newFakeRequestSender(nil)) + require.NoError(t, err) + require.NotNil(t, me) + + checkRecordedMetricsForMetricsExporter(t, tt, me, nil) +} + func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) { want := errors.New("my_error") tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName) @@ -120,6 +198,20 @@ func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) { checkRecordedMetricsForMetricsExporter(t, tt, me, want) } +func TestMetricsExporterV2_WithRecordMetrics_ReturnError(t *testing.T) { + want := errors.New("my_error") + tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterV2Name) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + me, err := NewMetricsExporterV2(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, + newFakeRequestSender(want)) + require.NoError(t, err) + require.NotNil(t, me) + + checkRecordedMetricsForMetricsExporter(t, tt, me, want) +} + func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName) require.NoError(t, err) @@ -145,6 +237,32 @@ func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { checkExporterEnqueueFailedMetricsStats(t, globalInstruments, fakeMetricsExporterName, int64(10)) } +func TestMetricsExporterV2_WithRecordEnqueueFailedMetrics(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterV2Name) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + rCfg := NewDefaultRetrySettings() + qCfg := NewDefaultQueueSettings() + qCfg.NumConsumers = 1 + qCfg.QueueSize = 2 + wantErr := errors.New("some-error") + te, err := NewMetricsExporterV2(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, + newFakeRequestSender(wantErr), WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + require.NotNil(t, te) + + md := testdata.GenerateMetrics(1) + const numBatches = 7 + for i := 0; i < numBatches; i++ { + // errors are checked in the checkExporterEnqueueFailedMetricsStats function below. + _ = te.ConsumeMetrics(context.Background(), md) + } + + // 2 batched must be in queue, and 10 metric points rejected due to queue overflow + checkExporterEnqueueFailedMetricsStats(t, globalInstruments, fakeMetricsExporterV2Name, int64(10)) +} + func TestMetricsExporter_WithSpan(t *testing.T) { set := exportertest.NewNopCreateSettings() sr := new(tracetest.SpanRecorder) @@ -158,6 +276,19 @@ func TestMetricsExporter_WithSpan(t *testing.T) { checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, nil, 2) } +func TestMetricsExporterV2_WithSpan(t *testing.T) { + set := exportertest.NewNopCreateSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) + + me, err := NewMetricsExporterV2(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(nil)) + require.NoError(t, err) + require.NotNil(t, me) + checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, nil, 2) +} + func TestMetricsExporter_WithSpan_ReturnError(t *testing.T) { set := exportertest.NewNopCreateSettings() sr := new(tracetest.SpanRecorder) @@ -172,6 +303,20 @@ func TestMetricsExporter_WithSpan_ReturnError(t *testing.T) { checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, want, 2) } +func TestMetricsExporterV2_WithSpan_ReturnError(t *testing.T) { + set := exportertest.NewNopCreateSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) + + want := errors.New("my_error") + me, err := NewMetricsExporterV2(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(want)) + require.NoError(t, err) + require.NotNil(t, me) + checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, want, 2) +} + func TestMetricsExporter_WithShutdown(t *testing.T) { shutdownCalled := false shutdown := func(context.Context) error { shutdownCalled = true; return nil } @@ -185,6 +330,20 @@ func TestMetricsExporter_WithShutdown(t *testing.T) { assert.True(t, shutdownCalled) } +func TestMetricsExporterV2_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdown)) + assert.NotNil(t, me) + assert.NoError(t, err) + + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } @@ -197,6 +356,19 @@ func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, want, me.Shutdown(context.Background())) } +func TestMetricsExporterV2_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdownErr)) + assert.NotNil(t, me) + assert.NoError(t, err) + + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, want, me.Shutdown(context.Background())) +} + func newPushMetricsData(retError error) consumer.ConsumeMetricsFunc { return func(ctx context.Context, td pmetric.Metrics) error { return retError @@ -228,7 +400,8 @@ func generateMetricsTraffic(t *testing.T, tracer trace.Tracer, me exporter.Metri } } -func checkWrapSpanForMetricsExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, me exporter.Metrics, wantError error, numMetricPoints int64) { +func checkWrapSpanForMetricsExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, + me exporter.Metrics, wantError error, numMetricPoints int64) { // nolint: unparam const numRequests = 5 generateMetricsTraffic(t, tracer, me, numRequests, wantError) diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index 2cf5f627d46..e21d7a580a2 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -70,33 +70,32 @@ func (qCfg *QueueSettings) Validate() error { } type queuedRetrySender struct { - fullName string - id component.ID - signal component.DataType - cfg QueueSettings - consumerSender requestSender - queue internal.ProducerConsumerQueue - retryStopCh chan struct{} - traceAttribute attribute.KeyValue - logger *zap.Logger - requeuingEnabled bool - requestUnmarshaler internal.RequestUnmarshaler + fullName string + id component.ID + signal component.DataType + queueSettings queueSettings + consumerSender requestSender + queue internal.ProducerConsumerQueue + retryStopCh chan struct{} + traceAttribute attribute.KeyValue + logger *zap.Logger + requeuingEnabled bool } -func newQueuedRetrySender(id component.ID, signal component.DataType, qCfg QueueSettings, rCfg RetrySettings, reqUnmarshaler internal.RequestUnmarshaler, nextSender requestSender, logger *zap.Logger) *queuedRetrySender { +func newQueuedRetrySender(id component.ID, signal component.DataType, qs queueSettings, rCfg RetrySettings, + nextSender requestSender, logger *zap.Logger) *queuedRetrySender { retryStopCh := make(chan struct{}) sampledLogger := createSampledLogger(logger) traceAttr := attribute.String(obsmetrics.ExporterKey, id.String()) qrs := &queuedRetrySender{ - fullName: id.String(), - id: id, - signal: signal, - cfg: qCfg, - retryStopCh: retryStopCh, - traceAttribute: traceAttr, - logger: sampledLogger, - requestUnmarshaler: reqUnmarshaler, + fullName: id.String(), + id: id, + signal: signal, + queueSettings: qs, + retryStopCh: retryStopCh, + traceAttribute: traceAttr, + logger: sampledLogger, } qrs.consumerSender = &retrySender{ @@ -109,8 +108,8 @@ func newQueuedRetrySender(id component.ID, signal component.DataType, qCfg Queue onTemporaryFailure: qrs.onTemporaryFailure, } - if qCfg.StorageID == nil { - qrs.queue = internal.NewBoundedMemoryQueue(qrs.cfg.QueueSize) + if !qs.persistenceEnabled() { + qrs.queue = internal.NewBoundedMemoryQueue(qs.config.QueueSize) } // The Persistent Queue is initialized separately as it needs extra information about the component @@ -143,16 +142,24 @@ func toStorageClient(ctx context.Context, storageID component.ID, host component // initializePersistentQueue uses extra information for initialization available from component.Host func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error { - if qrs.cfg.StorageID == nil { + if !qrs.queueSettings.persistenceEnabled() { return nil } - storageClient, err := toStorageClient(ctx, *qrs.cfg.StorageID, host, qrs.id, qrs.signal) + storageClient, err := toStorageClient(ctx, *qrs.queueSettings.config.StorageID, host, qrs.id, qrs.signal) if err != nil { return err } - qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, storageClient, qrs.requestUnmarshaler) + qrs.queue = internal.NewPersistentQueue(ctx, internal.PersistentQueueSettings{ + Name: qrs.fullName, + Signal: qrs.signal, + Capacity: uint64(qrs.queueSettings.config.QueueSize), + Logger: qrs.logger, + Client: storageClient, + Marshaler: qrs.queueSettings.marshaler, + Unmarshaler: qrs.queueSettings.unmarshaler, + }) // TODO: this can be further exposed as a config param rather than relying on a type of queue qrs.requeuingEnabled = true @@ -165,7 +172,7 @@ func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req interna logger.Error( "Exporting failed. No more retries left. Dropping data.", zap.Error(err), - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) return err } @@ -179,7 +186,7 @@ func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req interna logger.Error( "Exporting failed. Queue did not accept requeuing request. Dropping data.", zap.Error(err), - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) } return err @@ -191,13 +198,13 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er return err } - qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item internal.Request) { + qrs.queue.StartConsumers(qrs.queueSettings.config.NumConsumers, func(item internal.Request) { _ = qrs.consumerSender.send(item) item.OnProcessingFinished() }) // Start reporting queue length metric - if qrs.cfg.Enabled { + if qrs.queueSettings.config.Enabled { err := globalInstruments.queueSize.UpsertEntry(func() int64 { return int64(qrs.queue.Size()) }, metricdata.NewLabelValue(qrs.fullName)) @@ -205,7 +212,7 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er return fmt.Errorf("failed to create retry queue size metric: %w", err) } err = globalInstruments.queueCapacity.UpsertEntry(func() int64 { - return int64(qrs.cfg.QueueSize) + return int64(qrs.queueSettings.config.QueueSize) }, metricdata.NewLabelValue(qrs.fullName)) if err != nil { return fmt.Errorf("failed to create retry queue capacity metric: %w", err) @@ -218,7 +225,7 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er // shutdown is invoked during service shutdown. func (qrs *queuedRetrySender) shutdown() { // Cleanup queue metrics reporting - if qrs.cfg.Enabled { + if qrs.queueSettings.config.Enabled { _ = globalInstruments.queueSize.UpsertEntry(func() int64 { return int64(0) }, metricdata.NewLabelValue(qrs.fullName)) @@ -287,12 +294,12 @@ func createSampledLogger(logger *zap.Logger) *zap.Logger { // send implements the requestSender interface func (qrs *queuedRetrySender) send(req internal.Request) error { - if !qrs.cfg.Enabled { + if !qrs.queueSettings.config.Enabled { err := qrs.consumerSender.send(req) if err != nil { qrs.logger.Error( "Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.", - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) } return err @@ -306,7 +313,7 @@ func (qrs *queuedRetrySender) send(req internal.Request) error { if !qrs.queue.Produce(req) { qrs.logger.Error( "Dropping data because sending_queue is full. Try increasing queue_size.", - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qrs.traceAttribute)) return errSendingQueueIsFull @@ -391,7 +398,7 @@ func (rs *retrySender) send(req internal.Request) error { rs.logger.Error( "Exporting failed. The error is not retryable. Dropping data.", zap.Error(err), - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) return err } diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index d3e8f4b8c64..97c4253a5a6 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -36,11 +36,18 @@ func mockRequestUnmarshaler(mr *mockRequest) internal.RequestUnmarshaler { } } +func mockRequestMarshaler(_ internal.Request) ([]byte, error) { + return nil, nil +} + func TestQueuedRetry_DropOnPermanentError(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() mockR := newMockRequest(context.Background(), 2, consumererror.NewPermanent(errors.New("bad data"))) - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", mockRequestUnmarshaler(mockR)) + bs := fromOptions(WithRetry(rCfg), WithQueue(qCfg)) + bs.marshaler = mockRequestMarshaler + bs.unmarshaler = mockRequestUnmarshaler(mockR) + be, err := newBaseExporter(defaultSettings, bs, "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -64,7 +71,10 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() rCfg.Enabled = false - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + bs := fromOptions(WithRetry(rCfg), WithQueue(qCfg)) + bs.marshaler = mockRequestMarshaler + bs.unmarshaler = mockRequestUnmarshaler(newMockRequest(context.Background(), 2, errors.New("transient error"))) + be, err := newBaseExporter(defaultSettings, bs, "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -90,7 +100,7 @@ func TestQueuedRetry_OnError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -117,7 +127,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -151,7 +161,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -181,7 +191,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = time.Millisecond rCfg.MaxElapsedTime = 100 * time.Millisecond - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -228,7 +238,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 10 * time.Millisecond - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -261,7 +271,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { qCfg.QueueSize = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -288,7 +298,7 @@ func TestQueuedRetry_DropOnFull(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -309,7 +319,7 @@ func TestQueuedRetryHappyPath(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -344,7 +354,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 0 // to make every request go straight to the queue rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -478,7 +488,7 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -510,7 +520,7 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) be.qrSender.requeuingEnabled = true require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -535,7 +545,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -559,7 +569,10 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + bs := fromOptions(WithRetry(rCfg), WithQueue(qCfg)) + bs.marshaler = mockRequestMarshaler + bs.unmarshaler = mockRequestUnmarshaler(&mockRequest{}) + be, err := newBaseExporter(set, bs, "") require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -583,7 +596,7 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { req := newMockRequest(context.Background(), 3, errors.New("some error")) - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), &mockHost{})) @@ -633,7 +646,7 @@ func (mer *mockErrorRequest) Marshal() ([]byte, error) { return nil, nil } -func (mer *mockErrorRequest) Count() int { +func (mer *mockErrorRequest) ItemsCount() int { return 7 } @@ -684,7 +697,7 @@ func (m *mockRequest) checkNumRequests(t *testing.T, want int) { }, time.Second, 1*time.Millisecond) } -func (m *mockRequest) Count() int { +func (m *mockRequest) ItemsCount() int { return m.cnt } @@ -716,9 +729,9 @@ func newObservabilityConsumerSender(nextSender requestSender) *observabilityCons func (ocs *observabilityConsumerSender) send(req internal.Request) error { err := ocs.nextSender.send(req) if err != nil { - ocs.droppedItemsCount.Add(int64(req.Count())) + ocs.droppedItemsCount.Add(int64(req.ItemsCount())) } else { - ocs.sentItemsCount.Add(int64(req.Count())) + ocs.sentItemsCount.Add(int64(req.ItemsCount())) } ocs.waitGroup.Done() return err diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go new file mode 100644 index 00000000000..187d58f559b --- /dev/null +++ b/exporter/exporterhelper/request.go @@ -0,0 +1,39 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "context" + + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +) + +// Request represents a single request that can be sent to the endpoint. +type Request interface { + // ItemsCount returns the count basic item in the request, the smallest peaces of data that can be sent to the endpoint. + // For example, for OTLP exporter, this value represents the number of spans, metric data points or log records. + ItemsCount() int +} + +// RequestSender is a helper function that sends a request. +type RequestSender func(ctx context.Context, req Request) error + +type request struct { + Request + baseRequest + sender RequestSender +} + +var _ internal.Request = (*request)(nil) + +func (req *request) Export(ctx context.Context) error { + return req.sender(ctx, req.Request) +} + +func (req *request) OnError(_ error) internal.Request { + // Potentially we could introduce a new RequestError type that would represent partially succeeded request. + // In that case we should consider returning them back to the pipeline converted back to pdata in case if + // sending queue is disabled. We leave it as a future improvement if decided that it's needed. + return req +} diff --git a/exporter/exporterhelper/request_test.go b/exporter/exporterhelper/request_test.go new file mode 100644 index 00000000000..5963f7e0a6c --- /dev/null +++ b/exporter/exporterhelper/request_test.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "context" + + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +type fakeRequest struct { + items int +} + +func (r fakeRequest) ItemsCount() int { + return r.items +} + +type fakeRequestConverter struct { + metricsError error + tracesError error + logsError error +} + +func (c fakeRequestConverter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (Request, error) { + return fakeRequest{items: md.DataPointCount()}, c.metricsError +} + +func (c fakeRequestConverter) RequestFromTraces(_ context.Context, td ptrace.Traces) (Request, error) { + return fakeRequest{items: td.SpanCount()}, c.tracesError +} + +func (c fakeRequestConverter) RequestFromLogs(_ context.Context, ld plog.Logs) (Request, error) { + return fakeRequest{items: ld.LogRecordCount()}, c.logsError +} + +func newFakeRequestSender(err error) RequestSender { + return func(_ context.Context, _ Request) error { + return err + } +} diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 978ece2201c..02559202132 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -7,6 +7,8 @@ import ( "context" "errors" + "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -42,6 +44,10 @@ func newTraceRequestUnmarshalerFunc(pusher consumer.ConsumeTracesFunc) internal. } } +func tracesRequestMarshaler(req internal.Request) ([]byte, error) { + return tracesMarshaler.MarshalTraces(req.(*tracesRequest).td) +} + // Marshal provides serialization capabilities required by persistent queue func (req *tracesRequest) Marshal() ([]byte, error) { return tracesMarshaler.MarshalTraces(req.td) @@ -59,7 +65,7 @@ func (req *tracesRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.td) } -func (req *tracesRequest) Count() int { +func (req *tracesRequest) ItemsCount() int { return req.td.SpanCount() } @@ -89,7 +95,9 @@ func NewTracesExporter( } bs := fromOptions(options...) - be, err := newBaseExporter(set, bs, component.DataTypeTraces, newTraceRequestUnmarshalerFunc(pusher)) + bs.marshaler = tracesRequestMarshaler + bs.unmarshaler = newTraceRequestUnmarshalerFunc(pusher) + be, err := newBaseExporter(set, bs, component.DataTypeTraces) if err != nil { return nil, err } @@ -104,7 +112,7 @@ func NewTracesExporter( req := newTracesRequest(ctx, td, pusher) serr := be.sender.send(req) if errors.Is(serr, errSendingQueueIsFull) { - be.obsrep.recordTracesEnqueueFailure(req.Context(), int64(req.Count())) + be.obsrep.recordTracesEnqueueFailure(req.Context(), int64(req.ItemsCount())) } return serr }, bs.consumerOptions...) @@ -115,6 +123,70 @@ func NewTracesExporter( }, err } +type TracesConverter interface { + // RequestFromTraces converts ptrace.Traces into a Request. + RequestFromTraces(context.Context, ptrace.Traces) (Request, error) +} + +// NewTracesExporterV2 creates a new traces exporter based on a custom TracesConverter and RequestSender. +func NewTracesExporterV2( + _ context.Context, + set exporter.CreateSettings, + converter TracesConverter, + sender RequestSender, + options ...Option, +) (exporter.Traces, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilTracesConverter + } + + if sender == nil { + return nil, errNilRequestSender + } + + bs := fromOptions(options...) + bs.RequestSender = sender + + be, err := newBaseExporter(set, bs, component.DataTypeTraces) + if err != nil { + return nil, err + } + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &tracesExporterWithObservability{ + obsrep: be.obsrep, + nextSender: nextSender, + } + }) + + tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { + req, cErr := converter.RequestFromTraces(ctx, td) + if cErr != nil { + set.Logger.Error("Failed to convert traces. Dropping data.", + zap.Int("dropped_spans", td.SpanCount()), + zap.Error(err)) + return consumererror.NewPermanent(cErr) + } + sErr := be.sender.send(&request{ + baseRequest: baseRequest{ctx: ctx}, + Request: req, + sender: sender, + }) + if errors.Is(sErr, errSendingQueueIsFull) { + be.obsrep.recordTracesEnqueueFailure(ctx, int64(req.ItemsCount())) + } + return sErr + }, bs.consumerOptions...) + + return &traceExporter{ + baseExporter: be, + Traces: tc, + }, err +} + type tracesExporterWithObservability struct { obsrep *obsExporter nextSender requestSender @@ -124,6 +196,6 @@ func (tewo *tracesExporterWithObservability) send(req internal.Request) error { req.SetContext(tewo.obsrep.StartTracesOp(req.Context())) // Forward the data to the next consumer (this pusher is the next). err := tewo.nextSender.send(req) - tewo.obsrep.EndTracesOp(req.Context(), req.Count(), err) + tewo.obsrep.EndTracesOp(req.Context(), req.ItemsCount(), err) return err } diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 3b850ce6be2..347063f83d9 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -33,6 +33,7 @@ const ( var ( fakeTracesExporterName = component.NewIDWithName("fake_traces_exporter", "with_name") + fakeTracesExporterV2Name = component.NewIDWithName("fake_traces_exporter_v2", "with_name") fakeTracesExporterConfig = struct{}{} ) @@ -55,12 +56,31 @@ func TestTracesExporter_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } +func TestTracesExporterV2_NilLogger(t *testing.T) { + te, err := NewTracesExporterV2(context.Background(), exporter.CreateSettings{}, &fakeRequestConverter{}, + newFakeRequestSender(nil)) + require.Nil(t, te) + require.Equal(t, errNilLogger, err) +} + func TestTracesExporter_NilPushTraceData(t *testing.T) { te, err := NewTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeTracesExporterConfig, nil) require.Nil(t, te) require.Equal(t, errNilPushTraceData, err) } +func TestTracesExporterV2_NilTracesConverter(t *testing.T) { + te, err := NewTracesExporterV2(context.Background(), exportertest.NewNopCreateSettings(), nil, newFakeRequestSender(nil)) + require.Nil(t, te) + require.Equal(t, errNilTracesConverter, err) +} + +func TestTracesExporterV2_NilRequestSender(t *testing.T) { + te, err := NewTracesExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, nil) + require.Nil(t, te) + require.Equal(t, errNilRequestSender, err) +} + func TestTracesExporter_Default(t *testing.T) { td := ptrace.NewTraces() te, err := NewTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeTracesExporterConfig, newTraceDataPusher(nil)) @@ -73,6 +93,18 @@ func TestTracesExporter_Default(t *testing.T) { assert.NoError(t, te.Shutdown(context.Background())) } +func TestTracesExporterV2_Default(t *testing.T) { + td := ptrace.NewTraces() + te, err := NewTracesExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil)) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, te.Capabilities()) + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, te.ConsumeTraces(context.Background(), td)) + assert.NoError(t, te.Shutdown(context.Background())) +} + func TestTracesExporter_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} te, err := NewTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeTracesExporterConfig, newTraceDataPusher(nil), WithCapabilities(capabilities)) @@ -82,6 +114,15 @@ func TestTracesExporter_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, te.Capabilities()) } +func TestTracesExporterV2_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + te, err := NewTracesExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithCapabilities(capabilities)) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.Equal(t, capabilities, te.Capabilities()) +} + func TestTracesExporter_Default_ReturnError(t *testing.T) { td := ptrace.NewTraces() want := errors.New("my_error") @@ -93,6 +134,25 @@ func TestTracesExporter_Default_ReturnError(t *testing.T) { require.Equal(t, want, err) } +func TestTracesExporterV2_Default_ConverterError(t *testing.T) { + td := ptrace.NewTraces() + want := errors.New("convert_error") + te, err := NewTracesExporterV2(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{tracesError: want}, newFakeRequestSender(nil)) + require.NoError(t, err) + require.NotNil(t, te) + require.Equal(t, consumererror.NewPermanent(want), te.ConsumeTraces(context.Background(), td)) +} + +func TestTracesExporterV2_Default_RequestSenderError(t *testing.T) { + td := ptrace.NewTraces() + want := errors.New("send_error") + te, err := NewTracesExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(want)) + require.NoError(t, err) + require.NotNil(t, te) + require.Equal(t, want, te.ConsumeTraces(context.Background(), td)) +} + func TestTracesExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName) require.NoError(t, err) @@ -105,6 +165,18 @@ func TestTracesExporter_WithRecordMetrics(t *testing.T) { checkRecordedMetricsForTracesExporter(t, tt, te, nil) } +func TestTracesExporterV2_WithRecordMetrics(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterV2Name) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + te, err := NewTracesExporterV2(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil)) + require.NoError(t, err) + require.NotNil(t, te) + + checkRecordedMetricsForTracesExporter(t, tt, te, nil) +} + func TestTracesExporter_WithRecordMetrics_ReturnError(t *testing.T) { want := errors.New("my_error") tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName) @@ -118,6 +190,19 @@ func TestTracesExporter_WithRecordMetrics_ReturnError(t *testing.T) { checkRecordedMetricsForTracesExporter(t, tt, te, want) } +func TestTracesExporterV2_WithRecordMetrics_ReturnError(t *testing.T) { + want := errors.New("my_error") + tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterV2Name) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + te, err := NewTracesExporterV2(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(want)) + require.NoError(t, err) + require.NotNil(t, te) + + checkRecordedMetricsForTracesExporter(t, tt, te, want) +} + func TestTracesExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName) require.NoError(t, err) @@ -143,6 +228,31 @@ func TestTracesExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { checkExporterEnqueueFailedTracesStats(t, globalInstruments, fakeTracesExporterName, int64(10)) } +func TestTracesExporterV2_WithRecordEnqueueFailedMetrics(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterV2Name) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + rCfg := NewDefaultRetrySettings() + qCfg := NewDefaultQueueSettings() + qCfg.NumConsumers = 1 + qCfg.QueueSize = 2 + wantErr := errors.New("some-error") + te, err := NewTracesExporterV2(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(wantErr), WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + require.NotNil(t, te) + + td := testdata.GenerateTraces(2) + const numBatches = 7 + for i := 0; i < numBatches; i++ { + // errors are checked in the checkExporterEnqueueFailedTracesStats function below. + _ = te.ConsumeTraces(context.Background(), td) + } + + // 2 batched must be in queue, and 5 batches (10 spans) rejected due to queue overflow + checkExporterEnqueueFailedTracesStats(t, globalInstruments, fakeTracesExporterV2Name, int64(10)) +} + func TestTracesExporter_WithSpan(t *testing.T) { set := exportertest.NewNopCreateSettings() sr := new(tracetest.SpanRecorder) @@ -157,6 +267,20 @@ func TestTracesExporter_WithSpan(t *testing.T) { checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, nil, 1) } +func TestTracesExporterV2_WithSpan(t *testing.T) { + set := exportertest.NewNopCreateSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) + + te, err := NewTracesExporterV2(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(nil)) + require.NoError(t, err) + require.NotNil(t, te) + + checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, nil, 1) +} + func TestTracesExporter_WithSpan_ReturnError(t *testing.T) { set := exportertest.NewNopCreateSettings() sr := new(tracetest.SpanRecorder) @@ -172,6 +296,21 @@ func TestTracesExporter_WithSpan_ReturnError(t *testing.T) { checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, want, 1) } +func TestTracesExporterV2_WithSpan_ReturnError(t *testing.T) { + set := exportertest.NewNopCreateSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) + + want := errors.New("my_error") + te, err := NewTracesExporterV2(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(want)) + require.NoError(t, err) + require.NotNil(t, te) + + checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, want, 1) +} + func TestTracesExporter_WithShutdown(t *testing.T) { shutdownCalled := false shutdown := func(context.Context) error { shutdownCalled = true; return nil } @@ -185,6 +324,19 @@ func TestTracesExporter_WithShutdown(t *testing.T) { assert.True(t, shutdownCalled) } +func TestTracesExporterV2_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + te, err := NewTracesExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdown)) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, te.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + func TestTracesExporter_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } @@ -197,6 +349,18 @@ func TestTracesExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, te.Shutdown(context.Background()), want) } +func TestTracesExporterV2_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + te, err := NewTracesExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdownErr)) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, te.Shutdown(context.Background()), want) +} + func newTraceDataPusher(retError error) consumer.ConsumeTracesFunc { return func(ctx context.Context, td ptrace.Traces) error { return retError @@ -228,7 +392,8 @@ func generateTraceTraffic(t *testing.T, tracer trace.Tracer, te exporter.Traces, } } -func checkWrapSpanForTracesExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, te exporter.Traces, wantError error, numSpans int64) { +func checkWrapSpanForTracesExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, + te exporter.Traces, wantError error, numSpans int64) { // nolint: unparam const numRequests = 5 generateTraceTraffic(t, tracer, te, numRequests, wantError) From 2ff5402cd00d06a9b6607c098c5f39eaca7b83da Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Sat, 29 Jul 2023 15:32:55 -0700 Subject: [PATCH 2/3] address review comments --- exporter/exporterhelper/common.go | 17 ++++--- exporter/exporterhelper/common_test.go | 9 +++- exporter/exporterhelper/logs.go | 8 +-- exporter/exporterhelper/logs_test.go | 52 +++++--------------- exporter/exporterhelper/metrics.go | 8 +-- exporter/exporterhelper/metrics_test.go | 52 +++++--------------- exporter/exporterhelper/queued_retry_test.go | 40 +++++++++------ exporter/exporterhelper/request.go | 2 +- exporter/exporterhelper/traces.go | 8 +-- exporter/exporterhelper/traces_test.go | 51 +++++-------------- 10 files changed, 94 insertions(+), 153 deletions(-) diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index be5a08014a5..769b28b7f19 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -75,12 +75,14 @@ type baseSettings struct { queueSettings RetrySettings RequestSender + requestExporter bool } -// fromOptions returns the internal options starting from the default and applying all configured options. -func fromOptions(options ...Option) *baseSettings { - // Start from the default options: - opts := &baseSettings{ +// fromOptions returns the baseSettings starting from the default and applying all configured options. +// requestExporter indicates whether the base settings are for a new request exporter or not. +func newBaseSettings(requestExporter bool, options ...Option) *baseSettings { + bs := &baseSettings{ + requestExporter: requestExporter, TimeoutSettings: NewDefaultTimeoutSettings(), // TODO: Enable queuing by default (call DefaultQueueSettings) queueSettings: queueSettings{ @@ -91,10 +93,10 @@ func fromOptions(options ...Option) *baseSettings { } for _, op := range options { - op(opts) + op(bs) } - return opts + return bs } // Option apply changes to baseSettings. @@ -137,6 +139,9 @@ func WithRetry(retrySettings RetrySettings) Option { // Permanent queue is not yet available for exporter helpers V2, they ignore StorageID field. func WithQueue(config QueueSettings) Option { return func(o *baseSettings) { + if o.requestExporter { + panic("queueing is not supported for request exporters yet") + } o.queueSettings.config = config } } diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index 47447ac8a56..9c5bcbe2513 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -32,7 +32,11 @@ var ( ) func TestBaseExporter(t *testing.T) { - be, err := newBaseExporter(defaultSettings, fromOptions(), "") + be, err := newBaseExporter(defaultSettings, newBaseSettings(false), "") + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, be.Shutdown(context.Background())) + be, err = newBaseExporter(defaultSettings, newBaseSettings(true), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) @@ -42,7 +46,8 @@ func TestBaseExporterWithOptions(t *testing.T) { want := errors.New("my error") be, err := newBaseExporter( defaultSettings, - fromOptions( + newBaseSettings( + false, WithStart(func(ctx context.Context, host component.Host) error { return want }), WithShutdown(func(ctx context.Context) error { return want }), WithTimeout(NewDefaultTimeoutSettings())), diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 293037b4ccd..62a5971cd79 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -93,7 +93,7 @@ func NewLogsExporter( return nil, errNilPushLogsData } - bs := fromOptions(options...) + bs := newBaseSettings(false, options...) bs.marshaler = logsRequestMarshaler bs.unmarshaler = newLogsRequestUnmarshalerFunc(pusher) be, err := newBaseExporter(set, bs, component.DataTypeLogs) @@ -127,8 +127,8 @@ type LogsConverter interface { RequestFromLogs(context.Context, plog.Logs) (Request, error) } -// NewLogsExporterV2 creates new logs exporter based on custom LogsConverter and RequestSender. -func NewLogsExporterV2( +// NewLogsRequestExporter creates new logs exporter based on custom LogsConverter and RequestSender. +func NewLogsRequestExporter( _ context.Context, set exporter.CreateSettings, converter LogsConverter, @@ -147,7 +147,7 @@ func NewLogsExporterV2( return nil, errNilRequestSender } - bs := fromOptions(options...) + bs := newBaseSettings(true, options...) bs.RequestSender = sender be, err := newBaseExporter(set, bs, component.DataTypeLogs) diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index dc4808f7ab1..2977ceeccaf 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -61,7 +61,7 @@ func TestLogsExporter_NilLogger(t *testing.T) { } func TestLogsExporterV2_NilLogger(t *testing.T) { - le, err := NewLogsExporterV2(context.Background(), exporter.CreateSettings{}, &fakeRequestConverter{}, newFakeRequestSender(nil)) + le, err := NewLogsRequestExporter(context.Background(), exporter.CreateSettings{}, &fakeRequestConverter{}, newFakeRequestSender(nil)) require.Nil(t, le) require.Equal(t, errNilLogger, err) } @@ -73,13 +73,13 @@ func TestLogsExporter_NilPushLogsData(t *testing.T) { } func TestLogsExporterV2_NilLogsConverter(t *testing.T) { - le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), nil, newFakeRequestSender(nil)) + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), nil, newFakeRequestSender(nil)) require.Nil(t, le) require.Equal(t, errNilLogsConverter, err) } func TestLogsExporterV2_NilRequestSender(t *testing.T) { - le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, nil) + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, nil) require.Nil(t, le) require.Equal(t, errNilRequestSender, err) } @@ -98,7 +98,7 @@ func TestLogsExporter_Default(t *testing.T) { func TestLogsExporterV2_Default(t *testing.T) { ld := plog.NewLogs() - le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil)) + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil)) assert.NotNil(t, le) assert.NoError(t, err) @@ -119,7 +119,7 @@ func TestLogsExporter_WithCapabilities(t *testing.T) { func TestLogsExporterV2_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} - le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithCapabilities(capabilities)) + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithCapabilities(capabilities)) require.NoError(t, err) require.NotNil(t, le) @@ -138,7 +138,7 @@ func TestLogsExporter_Default_ReturnError(t *testing.T) { func TestLogsExporterV2_Default_ConverterError(t *testing.T) { ld := plog.NewLogs() want := errors.New("convert_error") - le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{logsError: want}, newFakeRequestSender(nil)) require.NoError(t, err) require.NotNil(t, le) @@ -148,7 +148,7 @@ func TestLogsExporterV2_Default_ConverterError(t *testing.T) { func TestLogsExporterV2_Default_RequestSenderError(t *testing.T) { ld := plog.NewLogs() want := errors.New("send_error") - le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(want)) require.NoError(t, err) require.NotNil(t, le) @@ -172,7 +172,7 @@ func TestLogsExporterV2_WithRecordMetrics(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - le, err := NewLogsExporterV2(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil)) + le, err := NewLogsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil)) require.NoError(t, err) require.NotNil(t, le) @@ -198,7 +198,7 @@ func TestLogsExporterV2_WithRecordMetrics_ReturnError(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - le, err := NewLogsExporterV2(context.Background(), tt.ToExporterCreateSettings(), + le, err := NewLogsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(want)) require.Nil(t, err) require.NotNil(t, le) @@ -231,32 +231,6 @@ func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { checkExporterEnqueueFailedLogsStats(t, globalInstruments, fakeLogsExporterName, int64(15)) } -func TestLogsExporterV2_WithRecordEnqueueFailedMetrics(t *testing.T) { - tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterV2Name) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - rCfg := NewDefaultRetrySettings() - qCfg := NewDefaultQueueSettings() - qCfg.NumConsumers = 1 - qCfg.QueueSize = 2 - wantErr := errors.New("some-error") - te, err := NewLogsExporterV2(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, - newFakeRequestSender(wantErr), WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - require.NotNil(t, te) - - md := testdata.GenerateLogs(3) - const numBatches = 7 - for i := 0; i < numBatches; i++ { - // errors are checked in the checkExporterEnqueueFailedLogsStats function below. - _ = te.ConsumeLogs(context.Background(), md) - } - - // 2 batched must be in queue, and 5 batches (15 log records) rejected due to queue overflow - checkExporterEnqueueFailedLogsStats(t, globalInstruments, fakeLogsExporterV2Name, int64(15)) -} - func TestLogsExporter_WithSpan(t *testing.T) { set := exportertest.NewNopCreateSettings() sr := new(tracetest.SpanRecorder) @@ -277,7 +251,7 @@ func TestLogsExporterV2_WithSpan(t *testing.T) { otel.SetTracerProvider(set.TracerProvider) defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) - le, err := NewLogsExporterV2(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(nil)) + le, err := NewLogsRequestExporter(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(nil)) require.Nil(t, err) require.NotNil(t, le) checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, nil, 1) @@ -305,7 +279,7 @@ func TestLogsExporterV2_WithSpan_ReturnError(t *testing.T) { defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) want := errors.New("my_error") - le, err := NewLogsExporterV2(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(want)) + le, err := NewLogsRequestExporter(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(want)) require.Nil(t, err) require.NotNil(t, le) checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, want, 1) @@ -327,7 +301,7 @@ func TestLogsExporterV2_WithShutdown(t *testing.T) { shutdownCalled := false shutdown := func(context.Context) error { shutdownCalled = true; return nil } - le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdown)) + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdown)) assert.NotNil(t, le) assert.NoError(t, err) @@ -350,7 +324,7 @@ func TestLogsExporterV2_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } - le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdownErr)) + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdownErr)) assert.NotNil(t, le) assert.NoError(t, err) diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index bb9bc33cee4..5a8d655de9c 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -94,7 +94,7 @@ func NewMetricsExporter( return nil, errNilPushMetricsData } - bs := fromOptions(options...) + bs := newBaseSettings(false, options...) bs.marshaler = metricsRequestMarshaler bs.unmarshaler = newMetricsRequestUnmarshalerFunc(pusher) be, err := newBaseExporter(set, bs, component.DataTypeMetrics) @@ -128,8 +128,8 @@ type MetricsConverter interface { RequestFromMetrics(context.Context, pmetric.Metrics) (Request, error) } -// NewMetricsExporterV2 creates a new metrics exporter based on a custom TracesConverter and RequestSender. -func NewMetricsExporterV2( +// NewMetricsRequestExporter creates a new metrics exporter based on a custom TracesConverter and RequestSender. +func NewMetricsRequestExporter( _ context.Context, set exporter.CreateSettings, converter MetricsConverter, @@ -148,7 +148,7 @@ func NewMetricsExporterV2( return nil, errNilRequestSender } - bs := fromOptions(options...) + bs := newBaseSettings(true, options...) bs.RequestSender = sender be, err := newBaseExporter(set, bs, component.DataTypeMetrics) diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 21e3d290d55..25383ad8a2f 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -61,7 +61,7 @@ func TestMetricsExporter_NilLogger(t *testing.T) { } func TestMetricsExporterV2_NilLogger(t *testing.T) { - me, err := NewMetricsExporterV2(context.Background(), exporter.CreateSettings{}, fakeRequestConverter{}, + me, err := NewMetricsRequestExporter(context.Background(), exporter.CreateSettings{}, fakeRequestConverter{}, newFakeRequestSender(nil)) require.Nil(t, me) require.Equal(t, errNilLogger, err) @@ -74,14 +74,14 @@ func TestMetricsExporter_NilPushMetricsData(t *testing.T) { } func TestMetricsExporterV2_NilMetricsConverter(t *testing.T) { - me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), nil, + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), nil, newFakeRequestSender(nil)) require.Nil(t, me) require.Equal(t, errNilMetricsConverter, err) } func TestMetricsExporterV2_NilRequestSender(t *testing.T) { - me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, nil) require.Nil(t, me) require.Equal(t, errNilRequestSender, err) @@ -101,7 +101,7 @@ func TestMetricsExporter_Default(t *testing.T) { func TestMetricsExporterV2_Default(t *testing.T) { md := pmetric.NewMetrics() - me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, newFakeRequestSender(nil)) assert.NoError(t, err) assert.NotNil(t, me) @@ -123,7 +123,7 @@ func TestMetricsExporter_WithCapabilities(t *testing.T) { func TestMetricsExporterV2_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} - me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, newFakeRequestSender(nil), WithCapabilities(capabilities)) assert.NoError(t, err) assert.NotNil(t, me) @@ -143,7 +143,7 @@ func TestMetricsExporter_Default_ReturnError(t *testing.T) { func TestMetricsExporterV2_Default_ConverterError(t *testing.T) { md := pmetric.NewMetrics() want := errors.New("convert_error") - me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{metricsError: want}, newFakeRequestSender(nil)) require.NoError(t, err) require.NotNil(t, me) @@ -153,7 +153,7 @@ func TestMetricsExporterV2_Default_ConverterError(t *testing.T) { func TestMetricsExporterV2_Default_RequestSenderError(t *testing.T) { md := pmetric.NewMetrics() want := errors.New("send_error") - me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, newFakeRequestSender(want)) require.NoError(t, err) require.NotNil(t, me) @@ -177,7 +177,7 @@ func TestMetricsExporterV2_WithRecordMetrics(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - me, err := NewMetricsExporterV2(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, + me, err := NewMetricsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil)) require.NoError(t, err) require.NotNil(t, me) @@ -204,7 +204,7 @@ func TestMetricsExporterV2_WithRecordMetrics_ReturnError(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - me, err := NewMetricsExporterV2(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, + me, err := NewMetricsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(want)) require.NoError(t, err) require.NotNil(t, me) @@ -237,32 +237,6 @@ func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { checkExporterEnqueueFailedMetricsStats(t, globalInstruments, fakeMetricsExporterName, int64(10)) } -func TestMetricsExporterV2_WithRecordEnqueueFailedMetrics(t *testing.T) { - tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterV2Name) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - rCfg := NewDefaultRetrySettings() - qCfg := NewDefaultQueueSettings() - qCfg.NumConsumers = 1 - qCfg.QueueSize = 2 - wantErr := errors.New("some-error") - te, err := NewMetricsExporterV2(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, - newFakeRequestSender(wantErr), WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - require.NotNil(t, te) - - md := testdata.GenerateMetrics(1) - const numBatches = 7 - for i := 0; i < numBatches; i++ { - // errors are checked in the checkExporterEnqueueFailedMetricsStats function below. - _ = te.ConsumeMetrics(context.Background(), md) - } - - // 2 batched must be in queue, and 10 metric points rejected due to queue overflow - checkExporterEnqueueFailedMetricsStats(t, globalInstruments, fakeMetricsExporterV2Name, int64(10)) -} - func TestMetricsExporter_WithSpan(t *testing.T) { set := exportertest.NewNopCreateSettings() sr := new(tracetest.SpanRecorder) @@ -283,7 +257,7 @@ func TestMetricsExporterV2_WithSpan(t *testing.T) { otel.SetTracerProvider(set.TracerProvider) defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) - me, err := NewMetricsExporterV2(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(nil)) + me, err := NewMetricsRequestExporter(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(nil)) require.NoError(t, err) require.NotNil(t, me) checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, nil, 2) @@ -311,7 +285,7 @@ func TestMetricsExporterV2_WithSpan_ReturnError(t *testing.T) { defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) want := errors.New("my_error") - me, err := NewMetricsExporterV2(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(want)) + me, err := NewMetricsRequestExporter(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(want)) require.NoError(t, err) require.NotNil(t, me) checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, want, 2) @@ -334,7 +308,7 @@ func TestMetricsExporterV2_WithShutdown(t *testing.T) { shutdownCalled := false shutdown := func(context.Context) error { shutdownCalled = true; return nil } - me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdown)) assert.NotNil(t, me) assert.NoError(t, err) @@ -360,7 +334,7 @@ func TestMetricsExporterV2_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } - me, err := NewMetricsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdownErr)) assert.NotNil(t, me) assert.NoError(t, err) diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index 97c4253a5a6..3d440f3f955 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -44,7 +44,7 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() mockR := newMockRequest(context.Background(), 2, consumererror.NewPermanent(errors.New("bad data"))) - bs := fromOptions(WithRetry(rCfg), WithQueue(qCfg)) + bs := newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)) bs.marshaler = mockRequestMarshaler bs.unmarshaler = mockRequestUnmarshaler(mockR) be, err := newBaseExporter(defaultSettings, bs, "") @@ -71,7 +71,7 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() rCfg.Enabled = false - bs := fromOptions(WithRetry(rCfg), WithQueue(qCfg)) + bs := newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)) bs.marshaler = mockRequestMarshaler bs.unmarshaler = mockRequestUnmarshaler(newMockRequest(context.Background(), 2, errors.New("transient error"))) be, err := newBaseExporter(defaultSettings, bs, "") @@ -100,7 +100,7 @@ func TestQueuedRetry_OnError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -127,7 +127,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -161,7 +161,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -191,7 +191,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = time.Millisecond rCfg.MaxElapsedTime = 100 * time.Millisecond - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -238,7 +238,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 10 * time.Millisecond - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -271,7 +271,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { qCfg.QueueSize = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -298,7 +298,7 @@ func TestQueuedRetry_DropOnFull(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -319,7 +319,7 @@ func TestQueuedRetryHappyPath(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(set, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -354,7 +354,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 0 // to make every request go straight to the queue rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -488,7 +488,7 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -520,7 +520,7 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) be.qrSender.requeuingEnabled = true require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -545,7 +545,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(set, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -569,7 +569,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - bs := fromOptions(WithRetry(rCfg), WithQueue(qCfg)) + bs := newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)) bs.marshaler = mockRequestMarshaler bs.unmarshaler = mockRequestUnmarshaler(&mockRequest{}) be, err := newBaseExporter(set, bs, "") @@ -596,7 +596,7 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { req := newMockRequest(context.Background(), 3, errors.New("some error")) - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), &mockHost{})) @@ -630,6 +630,14 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { }, time.Second, 1*time.Millisecond) } +func TestQueueRetryOptionsWithRequestExporter(t *testing.T) { + bs := newBaseSettings(true, WithRetry(NewDefaultRetrySettings())) + assert.True(t, bs.requestExporter) + assert.Panics(t, func() { + _ = newBaseSettings(true, WithRetry(NewDefaultRetrySettings()), WithQueue(NewDefaultQueueSettings())) + }) +} + type mockErrorRequest struct { baseRequest } diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go index 187d58f559b..d18f0f9b275 100644 --- a/exporter/exporterhelper/request.go +++ b/exporter/exporterhelper/request.go @@ -11,7 +11,7 @@ import ( // Request represents a single request that can be sent to the endpoint. type Request interface { - // ItemsCount returns the count basic item in the request, the smallest peaces of data that can be sent to the endpoint. + // ItemsCount returns the count basic item in the request, the smallest pieces of data that can be sent to the endpoint. // For example, for OTLP exporter, this value represents the number of spans, metric data points or log records. ItemsCount() int } diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 02559202132..524c5e4b928 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -94,7 +94,7 @@ func NewTracesExporter( return nil, errNilPushTraceData } - bs := fromOptions(options...) + bs := newBaseSettings(false, options...) bs.marshaler = tracesRequestMarshaler bs.unmarshaler = newTraceRequestUnmarshalerFunc(pusher) be, err := newBaseExporter(set, bs, component.DataTypeTraces) @@ -128,8 +128,8 @@ type TracesConverter interface { RequestFromTraces(context.Context, ptrace.Traces) (Request, error) } -// NewTracesExporterV2 creates a new traces exporter based on a custom TracesConverter and RequestSender. -func NewTracesExporterV2( +// NewTracesRequestExporter creates a new traces exporter based on a custom TracesConverter and RequestSender. +func NewTracesRequestExporter( _ context.Context, set exporter.CreateSettings, converter TracesConverter, @@ -148,7 +148,7 @@ func NewTracesExporterV2( return nil, errNilRequestSender } - bs := fromOptions(options...) + bs := newBaseSettings(true, options...) bs.RequestSender = sender be, err := newBaseExporter(set, bs, component.DataTypeTraces) diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 347063f83d9..3736fac2f29 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -57,7 +57,7 @@ func TestTracesExporter_NilLogger(t *testing.T) { } func TestTracesExporterV2_NilLogger(t *testing.T) { - te, err := NewTracesExporterV2(context.Background(), exporter.CreateSettings{}, &fakeRequestConverter{}, + te, err := NewTracesRequestExporter(context.Background(), exporter.CreateSettings{}, &fakeRequestConverter{}, newFakeRequestSender(nil)) require.Nil(t, te) require.Equal(t, errNilLogger, err) @@ -70,13 +70,13 @@ func TestTracesExporter_NilPushTraceData(t *testing.T) { } func TestTracesExporterV2_NilTracesConverter(t *testing.T) { - te, err := NewTracesExporterV2(context.Background(), exportertest.NewNopCreateSettings(), nil, newFakeRequestSender(nil)) + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), nil, newFakeRequestSender(nil)) require.Nil(t, te) require.Equal(t, errNilTracesConverter, err) } func TestTracesExporterV2_NilRequestSender(t *testing.T) { - te, err := NewTracesExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, nil) + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, nil) require.Nil(t, te) require.Equal(t, errNilRequestSender, err) } @@ -95,7 +95,7 @@ func TestTracesExporter_Default(t *testing.T) { func TestTracesExporterV2_Default(t *testing.T) { td := ptrace.NewTraces() - te, err := NewTracesExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil)) + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil)) assert.NotNil(t, te) assert.NoError(t, err) @@ -116,7 +116,7 @@ func TestTracesExporter_WithCapabilities(t *testing.T) { func TestTracesExporterV2_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} - te, err := NewTracesExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithCapabilities(capabilities)) + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithCapabilities(capabilities)) assert.NotNil(t, te) assert.NoError(t, err) @@ -137,7 +137,7 @@ func TestTracesExporter_Default_ReturnError(t *testing.T) { func TestTracesExporterV2_Default_ConverterError(t *testing.T) { td := ptrace.NewTraces() want := errors.New("convert_error") - te, err := NewTracesExporterV2(context.Background(), exportertest.NewNopCreateSettings(), + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{tracesError: want}, newFakeRequestSender(nil)) require.NoError(t, err) require.NotNil(t, te) @@ -147,7 +147,7 @@ func TestTracesExporterV2_Default_ConverterError(t *testing.T) { func TestTracesExporterV2_Default_RequestSenderError(t *testing.T) { td := ptrace.NewTraces() want := errors.New("send_error") - te, err := NewTracesExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(want)) + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(want)) require.NoError(t, err) require.NotNil(t, te) require.Equal(t, want, te.ConsumeTraces(context.Background(), td)) @@ -170,7 +170,7 @@ func TestTracesExporterV2_WithRecordMetrics(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - te, err := NewTracesExporterV2(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil)) + te, err := NewTracesRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil)) require.NoError(t, err) require.NotNil(t, te) @@ -196,7 +196,7 @@ func TestTracesExporterV2_WithRecordMetrics_ReturnError(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - te, err := NewTracesExporterV2(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(want)) + te, err := NewTracesRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(want)) require.NoError(t, err) require.NotNil(t, te) @@ -228,31 +228,6 @@ func TestTracesExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { checkExporterEnqueueFailedTracesStats(t, globalInstruments, fakeTracesExporterName, int64(10)) } -func TestTracesExporterV2_WithRecordEnqueueFailedMetrics(t *testing.T) { - tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterV2Name) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - rCfg := NewDefaultRetrySettings() - qCfg := NewDefaultQueueSettings() - qCfg.NumConsumers = 1 - qCfg.QueueSize = 2 - wantErr := errors.New("some-error") - te, err := NewTracesExporterV2(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(wantErr), WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - require.NotNil(t, te) - - td := testdata.GenerateTraces(2) - const numBatches = 7 - for i := 0; i < numBatches; i++ { - // errors are checked in the checkExporterEnqueueFailedTracesStats function below. - _ = te.ConsumeTraces(context.Background(), td) - } - - // 2 batched must be in queue, and 5 batches (10 spans) rejected due to queue overflow - checkExporterEnqueueFailedTracesStats(t, globalInstruments, fakeTracesExporterV2Name, int64(10)) -} - func TestTracesExporter_WithSpan(t *testing.T) { set := exportertest.NewNopCreateSettings() sr := new(tracetest.SpanRecorder) @@ -274,7 +249,7 @@ func TestTracesExporterV2_WithSpan(t *testing.T) { otel.SetTracerProvider(set.TracerProvider) defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) - te, err := NewTracesExporterV2(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(nil)) + te, err := NewTracesRequestExporter(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(nil)) require.NoError(t, err) require.NotNil(t, te) @@ -304,7 +279,7 @@ func TestTracesExporterV2_WithSpan_ReturnError(t *testing.T) { defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) want := errors.New("my_error") - te, err := NewTracesExporterV2(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(want)) + te, err := NewTracesRequestExporter(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(want)) require.NoError(t, err) require.NotNil(t, te) @@ -328,7 +303,7 @@ func TestTracesExporterV2_WithShutdown(t *testing.T) { shutdownCalled := false shutdown := func(context.Context) error { shutdownCalled = true; return nil } - te, err := NewTracesExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdown)) + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdown)) assert.NotNil(t, te) assert.NoError(t, err) @@ -353,7 +328,7 @@ func TestTracesExporterV2_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } - te, err := NewTracesExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdownErr)) + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdownErr)) assert.NotNil(t, te) assert.NoError(t, err) From 9b9cc80d9cd11eed2dd9479f0af3a180444907e0 Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Sat, 29 Jul 2023 15:54:05 -0700 Subject: [PATCH 3/3] remove redundant methods --- exporter/exporterhelper/logs.go | 4 ---- exporter/exporterhelper/metrics.go | 5 ----- exporter/exporterhelper/traces.go | 5 ----- 3 files changed, 14 deletions(-) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 62a5971cd79..383083b2131 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -60,10 +60,6 @@ func (req *logsRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.ld) } -func (req *logsRequest) Marshal() ([]byte, error) { - return logsMarshaler.MarshalLogs(req.ld) -} - func (req *logsRequest) ItemsCount() int { return req.ld.LogRecordCount() } diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 5a8d655de9c..47190799195 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -60,11 +60,6 @@ func (req *metricsRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.md) } -// Marshal provides serialization capabilities required by persistent queue -func (req *metricsRequest) Marshal() ([]byte, error) { - return metricsMarshaler.MarshalMetrics(req.md) -} - func (req *metricsRequest) ItemsCount() int { return req.md.DataPointCount() } diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 524c5e4b928..3c6dd112ecf 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -48,11 +48,6 @@ func tracesRequestMarshaler(req internal.Request) ([]byte, error) { return tracesMarshaler.MarshalTraces(req.(*tracesRequest).td) } -// Marshal provides serialization capabilities required by persistent queue -func (req *tracesRequest) Marshal() ([]byte, error) { - return tracesMarshaler.MarshalTraces(req.td) -} - func (req *tracesRequest) OnError(err error) internal.Request { var traceError consumererror.Traces if errors.As(err, &traceError) {