diff --git a/.chloggen/exporter-helper-v2.yaml b/.chloggen/exporter-helper-v2.yaml new file mode 100644 index 00000000000..600a71f1b0c --- /dev/null +++ b/.chloggen/exporter-helper-v2.yaml @@ -0,0 +1,12 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporter/exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Introduce a new exporter helper that operates over client-provided requests instead of pdata + +# One or more tracking issues or pull requests related to the change +issues: [7874] + diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 08b3e015e41..769b28b7f19 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -56,32 +56,47 @@ func (req *baseRequest) OnProcessingFinished() { } } +type queueSettings struct { + config QueueSettings + marshaler internal.RequestMarshaler + unmarshaler internal.RequestUnmarshaler +} + +func (qs *queueSettings) persistenceEnabled() bool { + return qs.config.StorageID != nil && qs.marshaler != nil && qs.unmarshaler != nil +} + // baseSettings represents all the options that users can configure. type baseSettings struct { component.StartFunc component.ShutdownFunc consumerOptions []consumer.Option TimeoutSettings - QueueSettings + queueSettings RetrySettings + RequestSender + requestExporter bool } -// fromOptions returns the internal options starting from the default and applying all configured options. -func fromOptions(options ...Option) *baseSettings { - // Start from the default options: - opts := &baseSettings{ +// fromOptions returns the baseSettings starting from the default and applying all configured options. +// requestExporter indicates whether the base settings are for a new request exporter or not. +func newBaseSettings(requestExporter bool, options ...Option) *baseSettings { + bs := &baseSettings{ + requestExporter: requestExporter, TimeoutSettings: NewDefaultTimeoutSettings(), // TODO: Enable queuing by default (call DefaultQueueSettings) - QueueSettings: QueueSettings{Enabled: false}, + queueSettings: queueSettings{ + config: QueueSettings{Enabled: false}, + }, // TODO: Enable retry by default (call DefaultRetrySettings) RetrySettings: RetrySettings{Enabled: false}, } for _, op := range options { - op(opts) + op(bs) } - return opts + return bs } // Option apply changes to baseSettings. @@ -121,9 +136,13 @@ func WithRetry(retrySettings RetrySettings) Option { // WithQueue overrides the default QueueSettings for an exporter. // The default QueueSettings is to disable queueing. -func WithQueue(queueSettings QueueSettings) Option { +// Permanent queue is not yet available for exporter helpers V2, they ignore StorageID field. +func WithQueue(config QueueSettings) Option { return func(o *baseSettings) { - o.QueueSettings = queueSettings + if o.requestExporter { + panic("queueing is not supported for request exporters yet") + } + o.queueSettings.config = config } } @@ -145,7 +164,7 @@ type baseExporter struct { qrSender *queuedRetrySender } -func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType, reqUnmarshaler internal.RequestUnmarshaler) (*baseExporter, error) { +func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType) (*baseExporter, error) { be := &baseExporter{} var err error @@ -154,7 +173,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo return nil, err } - be.qrSender = newQueuedRetrySender(set.ID, signal, bs.QueueSettings, bs.RetrySettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger) + be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger) be.sender = be.qrSender be.StartFunc = func(ctx context.Context, host component.Host) error { // First start the wrapped exporter. diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index 8f5a5376c39..9c5bcbe2513 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -14,11 +14,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/pdata/ptrace" ) var ( @@ -35,7 +32,11 @@ var ( ) func TestBaseExporter(t *testing.T) { - be, err := newBaseExporter(defaultSettings, fromOptions(), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false), "") + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, be.Shutdown(context.Background())) + be, err = newBaseExporter(defaultSettings, newBaseSettings(true), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) @@ -45,12 +46,12 @@ func TestBaseExporterWithOptions(t *testing.T) { want := errors.New("my error") be, err := newBaseExporter( defaultSettings, - fromOptions( + newBaseSettings( + false, WithStart(func(ctx context.Context, host component.Host) error { return want }), WithShutdown(func(ctx context.Context) error { return want }), WithTimeout(NewDefaultTimeoutSettings())), "", - nopRequestUnmarshaler(), ) require.NoError(t, err) require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost())) @@ -65,13 +66,3 @@ func checkStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) { require.Equal(t, codes.Unset, sd.Status().Code, "SpanData %v", sd) } } - -func nopTracePusher() consumer.ConsumeTracesFunc { - return func(ctx context.Context, ld ptrace.Traces) error { - return nil - } -} - -func nopRequestUnmarshaler() internal.RequestUnmarshaler { - return newTraceRequestUnmarshalerFunc(nopTracePusher()) -} diff --git a/exporter/exporterhelper/constants.go b/exporter/exporterhelper/constants.go index bdcbf1a4fd6..a449a309fbc 100644 --- a/exporter/exporterhelper/constants.go +++ b/exporter/exporterhelper/constants.go @@ -18,4 +18,12 @@ var ( errNilPushMetricsData = errors.New("nil PushMetrics") // errNilPushLogsData is returned when a nil PushLogs is given. errNilPushLogsData = errors.New("nil PushLogs") + // errNilTracesConverter is returned when a nil TracesConverter is given. + errNilTracesConverter = errors.New("nil TracesConverter") + // errNilMetricsConverter is returned when a nil MetricsConverter is given. + errNilMetricsConverter = errors.New("nil MetricsConverter") + // errNilLogsConverter is returned when a nil LogsConverter is given. + errNilLogsConverter = errors.New("nil LogsConverter") + // errNilRequestSender is returned when a nil RequestSender is given. + errNilRequestSender = errors.New("nil RequestSender") ) diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index a7956d6f569..63a617daebd 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -35,11 +35,21 @@ func buildPersistentStorageName(name string, signal component.DataType) string { return fmt.Sprintf("%s-%s", name, signal) } +type PersistentQueueSettings struct { + Name string + Signal component.DataType + Capacity uint64 + Logger *zap.Logger + Client storage.Client + Unmarshaler RequestUnmarshaler + Marshaler RequestMarshaler +} + // NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage -func NewPersistentQueue(ctx context.Context, name string, signal component.DataType, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue { +func NewPersistentQueue(ctx context.Context, params PersistentQueueSettings) ProducerConsumerQueue { return &persistentQueue{ stopChan: make(chan struct{}), - storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(name, signal), uint64(capacity), logger, client, unmarshaler), + storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(params.Name, params.Signal), params), } } diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go index 13505440580..8b1ffb7674e 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/persistent_queue_test.go @@ -28,7 +28,15 @@ func createTestQueue(extension storage.Extension, capacity int) *persistentQueue panic(err) } - wq := NewPersistentQueue(context.Background(), "foo", component.DataTypeTraces, capacity, logger, client, newFakeTracesRequestUnmarshalerFunc()) + wq := NewPersistentQueue(context.Background(), PersistentQueueSettings{ + Name: "foo", + Signal: component.DataTypeTraces, + Capacity: uint64(capacity), + Logger: logger, + Client: client, + Unmarshaler: newFakeTracesRequestUnmarshalerFunc(), + Marshaler: newFakeTracesRequestMarshalerFunc(), + }) return wq.(*persistentQueue) } diff --git a/exporter/exporterhelper/internal/persistent_storage.go b/exporter/exporterhelper/internal/persistent_storage.go index 1d79d63a124..cbbcf2e03c5 100644 --- a/exporter/exporterhelper/internal/persistent_storage.go +++ b/exporter/exporterhelper/internal/persistent_storage.go @@ -43,6 +43,7 @@ type persistentContiguousStorage struct { queueName string client storage.Client unmarshaler RequestUnmarshaler + marshaler RequestMarshaler putChan chan struct{} stopChan chan struct{} @@ -80,14 +81,15 @@ var ( // newPersistentContiguousStorage creates a new file-storage extension backed queue; // queueName parameter must be a unique value that identifies the queue. -func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage { +func newPersistentContiguousStorage(ctx context.Context, queueName string, set PersistentQueueSettings) *persistentContiguousStorage { pcs := &persistentContiguousStorage{ - logger: logger, - client: client, + logger: set.Logger, + client: set.Client, queueName: queueName, - unmarshaler: unmarshaler, - capacity: capacity, - putChan: make(chan struct{}, capacity), + unmarshaler: set.Unmarshaler, + marshaler: set.Marshaler, + capacity: set.Capacity, + putChan: make(chan struct{}, set.Capacity), reqChan: make(chan Request), stopChan: make(chan struct{}), itemsCount: &atomic.Uint64{}, diff --git a/exporter/exporterhelper/internal/persistent_storage_batch.go b/exporter/exporterhelper/internal/persistent_storage_batch.go index 85c99cf51e9..a80ba93c5c3 100644 --- a/exporter/exporterhelper/internal/persistent_storage_batch.go +++ b/exporter/exporterhelper/internal/persistent_storage_batch.go @@ -137,7 +137,7 @@ func (bof *batchStruct) getItemIndexArrayResult(key string) ([]itemIndex, error) // setRequest adds Set operation over a given request to the batch func (bof *batchStruct) setRequest(key string, value Request) *batchStruct { - return bof.set(key, value, requestToBytes) + return bof.set(key, value, bof.requestToBytes) } // setItemIndex adds Set operation over a given itemIndex to the batch @@ -206,8 +206,8 @@ func bytesToItemIndexArray(b []byte) (any, error) { return val, err } -func requestToBytes(req any) ([]byte, error) { - return req.(Request).Marshal() +func (bof *batchStruct) requestToBytes(req any) ([]byte, error) { + return bof.pcs.marshaler(req.(Request)) } func (bof *batchStruct) bytesToRequest(b []byte) (any, error) { diff --git a/exporter/exporterhelper/internal/persistent_storage_test.go b/exporter/exporterhelper/internal/persistent_storage_test.go index b27836fd2b6..5d33fc11064 100644 --- a/exporter/exporterhelper/internal/persistent_storage_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_test.go @@ -36,7 +36,13 @@ func createTestClient(extension storage.Extension) storage.Client { } func createTestPersistentStorageWithLoggingAndCapacity(client storage.Client, logger *zap.Logger, capacity uint64) *persistentContiguousStorage { - return newPersistentContiguousStorage(context.Background(), "foo", capacity, logger, client, newFakeTracesRequestUnmarshalerFunc()) + return newPersistentContiguousStorage(context.Background(), "foo", PersistentQueueSettings{ + Capacity: capacity, + Logger: logger, + Client: client, + Unmarshaler: newFakeTracesRequestUnmarshalerFunc(), + Marshaler: newFakeTracesRequestMarshalerFunc(), + }) } func createTestPersistentStorage(client storage.Client) *persistentContiguousStorage { @@ -82,6 +88,13 @@ func newFakeTracesRequestUnmarshalerFunc() RequestUnmarshaler { } } +func newFakeTracesRequestMarshalerFunc() RequestMarshaler { + return func(req Request) ([]byte, error) { + marshaler := ptrace.ProtoMarshaler{} + return marshaler.MarshalTraces(req.(*fakeTracesRequest).td) + } +} + func TestPersistentStorage_CorruptedData(t *testing.T) { path := t.TempDir() diff --git a/exporter/exporterhelper/internal/request.go b/exporter/exporterhelper/internal/request.go index 390b35e94bd..3120d7db175 100644 --- a/exporter/exporterhelper/internal/request.go +++ b/exporter/exporterhelper/internal/request.go @@ -19,11 +19,8 @@ type Request interface { // Otherwise, it should return the original Request. OnError(error) Request - // Count returns the count of spans/metric points or log records. - Count() int - - // Marshal serializes the current request into a byte stream - Marshal() ([]byte, error) + // ItemsCount returns the number of basic items in the request (spans, date points or log records for OTLP) + ItemsCount() int // OnProcessingFinished calls the optional callback function to handle cleanup after all processing is finished OnProcessingFinished() @@ -34,3 +31,6 @@ type Request interface { // RequestUnmarshaler defines a function which takes a byte slice and unmarshals it into a relevant request type RequestUnmarshaler func([]byte) (Request, error) + +// RequestMarshaler defines a function which takes a request and marshals it into a byte slice +type RequestMarshaler func(Request) ([]byte, error) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index b53bfc8addb..383083b2131 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -7,6 +7,8 @@ import ( "context" "errors" + "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -42,6 +44,10 @@ func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) internal.Req } } +func logsRequestMarshaler(req internal.Request) ([]byte, error) { + return logsMarshaler.MarshalLogs(req.(*logsRequest).ld) +} + func (req *logsRequest) OnError(err error) internal.Request { var logError consumererror.Logs if errors.As(err, &logError) { @@ -54,11 +60,7 @@ func (req *logsRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.ld) } -func (req *logsRequest) Marshal() ([]byte, error) { - return logsMarshaler.MarshalLogs(req.ld) -} - -func (req *logsRequest) Count() int { +func (req *logsRequest) ItemsCount() int { return req.ld.LogRecordCount() } @@ -87,8 +89,10 @@ func NewLogsExporter( return nil, errNilPushLogsData } - bs := fromOptions(options...) - be, err := newBaseExporter(set, bs, component.DataTypeLogs, newLogsRequestUnmarshalerFunc(pusher)) + bs := newBaseSettings(false, options...) + bs.marshaler = logsRequestMarshaler + bs.unmarshaler = newLogsRequestUnmarshalerFunc(pusher) + be, err := newBaseExporter(set, bs, component.DataTypeLogs) if err != nil { return nil, err } @@ -103,7 +107,7 @@ func NewLogsExporter( req := newLogsRequest(ctx, ld, pusher) serr := be.sender.send(req) if errors.Is(serr, errSendingQueueIsFull) { - be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.Count())) + be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.ItemsCount())) } return serr }, bs.consumerOptions...) @@ -114,6 +118,70 @@ func NewLogsExporter( }, err } +type LogsConverter interface { + // RequestFromLogs converts plog.Logs data into a request. + RequestFromLogs(context.Context, plog.Logs) (Request, error) +} + +// NewLogsRequestExporter creates new logs exporter based on custom LogsConverter and RequestSender. +func NewLogsRequestExporter( + _ context.Context, + set exporter.CreateSettings, + converter LogsConverter, + sender RequestSender, + options ...Option, +) (exporter.Logs, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilLogsConverter + } + + if sender == nil { + return nil, errNilRequestSender + } + + bs := newBaseSettings(true, options...) + bs.RequestSender = sender + + be, err := newBaseExporter(set, bs, component.DataTypeLogs) + if err != nil { + return nil, err + } + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &logsExporterWithObservability{ + obsrep: be.obsrep, + nextSender: nextSender, + } + }) + + lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { + req, cErr := converter.RequestFromLogs(ctx, ld) + if cErr != nil { + set.Logger.Error("Failed to convert logs. Dropping data.", + zap.Int("dropped_log_records", ld.LogRecordCount()), + zap.Error(err)) + return consumererror.NewPermanent(cErr) + } + sErr := be.sender.send(&request{ + baseRequest: baseRequest{ctx: ctx}, + Request: req, + sender: sender, + }) + if errors.Is(sErr, errSendingQueueIsFull) { + be.obsrep.recordLogsEnqueueFailure(ctx, int64(req.ItemsCount())) + } + return sErr + }, bs.consumerOptions...) + + return &logsExporter{ + baseExporter: be, + Logs: lc, + }, err +} + type logsExporterWithObservability struct { obsrep *obsExporter nextSender requestSender @@ -122,6 +190,6 @@ type logsExporterWithObservability struct { func (lewo *logsExporterWithObservability) send(req internal.Request) error { req.SetContext(lewo.obsrep.StartLogsOp(req.Context())) err := lewo.nextSender.send(req) - lewo.obsrep.EndLogsOp(req.Context(), req.Count(), err) + lewo.obsrep.EndLogsOp(req.Context(), req.ItemsCount(), err) return err } diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 3aa50b26e02..2977ceeccaf 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -33,6 +33,7 @@ const ( var ( fakeLogsExporterName = component.NewIDWithName("fake_logs_exporter", "with_name") + fakeLogsExporterV2Name = component.NewIDWithName("fake_logs_exporter_v2", "with_name") fakeLogsExporterConfig = struct{}{} ) @@ -59,12 +60,30 @@ func TestLogsExporter_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } +func TestLogsExporterV2_NilLogger(t *testing.T) { + le, err := NewLogsRequestExporter(context.Background(), exporter.CreateSettings{}, &fakeRequestConverter{}, newFakeRequestSender(nil)) + require.Nil(t, le) + require.Equal(t, errNilLogger, err) +} + func TestLogsExporter_NilPushLogsData(t *testing.T) { le, err := NewLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeLogsExporterConfig, nil) require.Nil(t, le) require.Equal(t, errNilPushLogsData, err) } +func TestLogsExporterV2_NilLogsConverter(t *testing.T) { + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), nil, newFakeRequestSender(nil)) + require.Nil(t, le) + require.Equal(t, errNilLogsConverter, err) +} + +func TestLogsExporterV2_NilRequestSender(t *testing.T) { + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, nil) + require.Nil(t, le) + require.Equal(t, errNilRequestSender, err) +} + func TestLogsExporter_Default(t *testing.T) { ld := plog.NewLogs() le, err := NewLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeLogsExporterConfig, newPushLogsData(nil)) @@ -77,6 +96,18 @@ func TestLogsExporter_Default(t *testing.T) { assert.NoError(t, le.Shutdown(context.Background())) } +func TestLogsExporterV2_Default(t *testing.T) { + ld := plog.NewLogs() + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil)) + assert.NotNil(t, le) + assert.NoError(t, err) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, le.Capabilities()) + assert.NoError(t, le.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, le.ConsumeLogs(context.Background(), ld)) + assert.NoError(t, le.Shutdown(context.Background())) +} + func TestLogsExporter_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} le, err := NewLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeLogsExporterConfig, newPushLogsData(nil), WithCapabilities(capabilities)) @@ -86,6 +117,15 @@ func TestLogsExporter_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, le.Capabilities()) } +func TestLogsExporterV2_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithCapabilities(capabilities)) + require.NoError(t, err) + require.NotNil(t, le) + + assert.Equal(t, capabilities, le.Capabilities()) +} + func TestLogsExporter_Default_ReturnError(t *testing.T) { ld := plog.NewLogs() want := errors.New("my_error") @@ -95,7 +135,27 @@ func TestLogsExporter_Default_ReturnError(t *testing.T) { require.Equal(t, want, le.ConsumeLogs(context.Background(), ld)) } -func TestLogsExporter_WithRecordLogs(t *testing.T) { +func TestLogsExporterV2_Default_ConverterError(t *testing.T) { + ld := plog.NewLogs() + want := errors.New("convert_error") + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{logsError: want}, newFakeRequestSender(nil)) + require.NoError(t, err) + require.NotNil(t, le) + require.Equal(t, consumererror.NewPermanent(want), le.ConsumeLogs(context.Background(), ld)) +} + +func TestLogsExporterV2_Default_RequestSenderError(t *testing.T) { + ld := plog.NewLogs() + want := errors.New("send_error") + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{}, newFakeRequestSender(want)) + require.NoError(t, err) + require.NotNil(t, le) + require.Equal(t, want, le.ConsumeLogs(context.Background(), ld)) +} + +func TestLogsExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) @@ -107,7 +167,19 @@ func TestLogsExporter_WithRecordLogs(t *testing.T) { checkRecordedMetricsForLogsExporter(t, tt, le, nil) } -func TestLogsExporter_WithRecordLogs_ReturnError(t *testing.T) { +func TestLogsExporterV2_WithRecordMetrics(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterV2Name) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + le, err := NewLogsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil)) + require.NoError(t, err) + require.NotNil(t, le) + + checkRecordedMetricsForLogsExporter(t, tt, le, nil) +} + +func TestLogsExporter_WithRecordMetrics_ReturnError(t *testing.T) { want := errors.New("my_error") tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName) require.NoError(t, err) @@ -120,6 +192,20 @@ func TestLogsExporter_WithRecordLogs_ReturnError(t *testing.T) { checkRecordedMetricsForLogsExporter(t, tt, le, want) } +func TestLogsExporterV2_WithRecordMetrics_ReturnError(t *testing.T) { + want := errors.New("convert_error") + tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterV2Name) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + le, err := NewLogsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), + &fakeRequestConverter{}, newFakeRequestSender(want)) + require.Nil(t, err) + require.NotNil(t, le) + + checkRecordedMetricsForLogsExporter(t, tt, le, want) +} + func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName) require.NoError(t, err) @@ -158,6 +244,19 @@ func TestLogsExporter_WithSpan(t *testing.T) { checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, nil, 1) } +func TestLogsExporterV2_WithSpan(t *testing.T) { + set := exportertest.NewNopCreateSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) + + le, err := NewLogsRequestExporter(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(nil)) + require.Nil(t, err) + require.NotNil(t, le) + checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, nil, 1) +} + func TestLogsExporter_WithSpan_ReturnError(t *testing.T) { set := exportertest.NewNopCreateSettings() sr := new(tracetest.SpanRecorder) @@ -172,6 +271,20 @@ func TestLogsExporter_WithSpan_ReturnError(t *testing.T) { checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, want, 1) } +func TestLogsExporterV2_WithSpan_ReturnError(t *testing.T) { + set := exportertest.NewNopCreateSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) + + want := errors.New("my_error") + le, err := NewLogsRequestExporter(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(want)) + require.Nil(t, err) + require.NotNil(t, le) + checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, want, 1) +} + func TestLogsExporter_WithShutdown(t *testing.T) { shutdownCalled := false shutdown := func(context.Context) error { shutdownCalled = true; return nil } @@ -184,6 +297,18 @@ func TestLogsExporter_WithShutdown(t *testing.T) { assert.True(t, shutdownCalled) } +func TestLogsExporterV2_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdown)) + assert.NotNil(t, le) + assert.NoError(t, err) + + assert.Nil(t, le.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + func TestLogsExporter_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } @@ -195,6 +320,17 @@ func TestLogsExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, le.Shutdown(context.Background()), want) } +func TestLogsExporterV2_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdownErr)) + assert.NotNil(t, le) + assert.NoError(t, err) + + assert.Equal(t, le.Shutdown(context.Background()), want) +} + func newPushLogsData(retError error) consumer.ConsumeLogsFunc { return func(ctx context.Context, td plog.Logs) error { return retError @@ -225,7 +361,8 @@ func generateLogsTraffic(t *testing.T, tracer trace.Tracer, le exporter.Logs, nu } } -func checkWrapSpanForLogsExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, le exporter.Logs, wantError error, numLogRecords int64) { +func checkWrapSpanForLogsExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, le exporter.Logs, + wantError error, numLogRecords int64) { // nolint: unparam const numRequests = 5 generateLogsTraffic(t, tracer, le, numRequests, wantError) diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 1639a45fcab..47190799195 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -7,6 +7,8 @@ import ( "context" "errors" + "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -42,6 +44,10 @@ func newMetricsRequestUnmarshalerFunc(pusher consumer.ConsumeMetricsFunc) intern } } +func metricsRequestMarshaler(req internal.Request) ([]byte, error) { + return metricsMarshaler.MarshalMetrics(req.(*metricsRequest).md) +} + func (req *metricsRequest) OnError(err error) internal.Request { var metricsError consumererror.Metrics if errors.As(err, &metricsError) { @@ -54,12 +60,7 @@ func (req *metricsRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.md) } -// Marshal provides serialization capabilities required by persistent queue -func (req *metricsRequest) Marshal() ([]byte, error) { - return metricsMarshaler.MarshalMetrics(req.md) -} - -func (req *metricsRequest) Count() int { +func (req *metricsRequest) ItemsCount() int { return req.md.DataPointCount() } @@ -88,8 +89,10 @@ func NewMetricsExporter( return nil, errNilPushMetricsData } - bs := fromOptions(options...) - be, err := newBaseExporter(set, bs, component.DataTypeMetrics, newMetricsRequestUnmarshalerFunc(pusher)) + bs := newBaseSettings(false, options...) + bs.marshaler = metricsRequestMarshaler + bs.unmarshaler = newMetricsRequestUnmarshalerFunc(pusher) + be, err := newBaseExporter(set, bs, component.DataTypeMetrics) if err != nil { return nil, err } @@ -104,7 +107,7 @@ func NewMetricsExporter( req := newMetricsRequest(ctx, md, pusher) serr := be.sender.send(req) if errors.Is(serr, errSendingQueueIsFull) { - be.obsrep.recordMetricsEnqueueFailure(req.Context(), int64(req.Count())) + be.obsrep.recordMetricsEnqueueFailure(req.Context(), int64(req.ItemsCount())) } return serr }, bs.consumerOptions...) @@ -115,6 +118,70 @@ func NewMetricsExporter( }, err } +type MetricsConverter interface { + // RequestFromMetrics converts pdata.Metrics into a request. + RequestFromMetrics(context.Context, pmetric.Metrics) (Request, error) +} + +// NewMetricsRequestExporter creates a new metrics exporter based on a custom TracesConverter and RequestSender. +func NewMetricsRequestExporter( + _ context.Context, + set exporter.CreateSettings, + converter MetricsConverter, + sender RequestSender, + options ...Option, +) (exporter.Metrics, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilMetricsConverter + } + + if sender == nil { + return nil, errNilRequestSender + } + + bs := newBaseSettings(true, options...) + bs.RequestSender = sender + + be, err := newBaseExporter(set, bs, component.DataTypeMetrics) + if err != nil { + return nil, err + } + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &metricsSenderWithObservability{ + obsrep: be.obsrep, + nextSender: nextSender, + } + }) + + mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { + req, cErr := converter.RequestFromMetrics(ctx, md) + if cErr != nil { + set.Logger.Error("Failed to convert metrics. Dropping data.", + zap.Int("dropped_data_points", md.DataPointCount()), + zap.Error(err)) + return consumererror.NewPermanent(cErr) + } + sErr := be.sender.send(&request{ + Request: req, + baseRequest: baseRequest{ctx: ctx}, + sender: sender, + }) + if errors.Is(sErr, errSendingQueueIsFull) { + be.obsrep.recordMetricsEnqueueFailure(ctx, int64(req.ItemsCount())) + } + return sErr + }, bs.consumerOptions...) + + return &metricsExporter{ + baseExporter: be, + Metrics: mc, + }, err +} + type metricsSenderWithObservability struct { obsrep *obsExporter nextSender requestSender @@ -123,6 +190,6 @@ type metricsSenderWithObservability struct { func (mewo *metricsSenderWithObservability) send(req internal.Request) error { req.SetContext(mewo.obsrep.StartMetricsOp(req.Context())) err := mewo.nextSender.send(req) - mewo.obsrep.EndMetricsOp(req.Context(), req.Count(), err) + mewo.obsrep.EndMetricsOp(req.Context(), req.ItemsCount(), err) return err } diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 5ebc2d9ec65..25383ad8a2f 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -33,6 +33,7 @@ const ( var ( fakeMetricsExporterName = component.NewIDWithName("fake_metrics_exporter", "with_name") + fakeMetricsExporterV2Name = component.NewIDWithName("fake_metrics_exporter_v2", "with_name") fakeMetricsExporterConfig = struct{}{} ) @@ -47,7 +48,7 @@ func TestMetricsRequest(t *testing.T) { ) } -func TestMetricsExporter_InvalidName(t *testing.T) { +func TestMetricsExporter_NilConfig(t *testing.T) { me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), nil, newPushMetricsData(nil)) require.Nil(t, me) require.Equal(t, errNilConfig, err) @@ -59,12 +60,33 @@ func TestMetricsExporter_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } +func TestMetricsExporterV2_NilLogger(t *testing.T) { + me, err := NewMetricsRequestExporter(context.Background(), exporter.CreateSettings{}, fakeRequestConverter{}, + newFakeRequestSender(nil)) + require.Nil(t, me) + require.Equal(t, errNilLogger, err) +} + func TestMetricsExporter_NilPushMetricsData(t *testing.T) { me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeMetricsExporterConfig, nil) require.Nil(t, me) require.Equal(t, errNilPushMetricsData, err) } +func TestMetricsExporterV2_NilMetricsConverter(t *testing.T) { + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), nil, + newFakeRequestSender(nil)) + require.Nil(t, me) + require.Equal(t, errNilMetricsConverter, err) +} + +func TestMetricsExporterV2_NilRequestSender(t *testing.T) { + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + nil) + require.Nil(t, me) + require.Equal(t, errNilRequestSender, err) +} + func TestMetricsExporter_Default(t *testing.T) { md := pmetric.NewMetrics() me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeMetricsExporterConfig, newPushMetricsData(nil)) @@ -77,6 +99,19 @@ func TestMetricsExporter_Default(t *testing.T) { assert.NoError(t, me.Shutdown(context.Background())) } +func TestMetricsExporterV2_Default(t *testing.T) { + md := pmetric.NewMetrics() + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + newFakeRequestSender(nil)) + assert.NoError(t, err) + assert.NotNil(t, me) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, me.Capabilities()) + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.ConsumeMetrics(context.Background(), md)) + assert.NoError(t, me.Shutdown(context.Background())) +} + func TestMetricsExporter_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeMetricsExporterConfig, newPushMetricsData(nil), WithCapabilities(capabilities)) @@ -86,6 +121,16 @@ func TestMetricsExporter_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, me.Capabilities()) } +func TestMetricsExporterV2_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + newFakeRequestSender(nil), WithCapabilities(capabilities)) + assert.NoError(t, err) + assert.NotNil(t, me) + + assert.Equal(t, capabilities, me.Capabilities()) +} + func TestMetricsExporter_Default_ReturnError(t *testing.T) { md := pmetric.NewMetrics() want := errors.New("my_error") @@ -95,6 +140,26 @@ func TestMetricsExporter_Default_ReturnError(t *testing.T) { require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) } +func TestMetricsExporterV2_Default_ConverterError(t *testing.T) { + md := pmetric.NewMetrics() + want := errors.New("convert_error") + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + fakeRequestConverter{metricsError: want}, newFakeRequestSender(nil)) + require.NoError(t, err) + require.NotNil(t, me) + require.Equal(t, consumererror.NewPermanent(want), me.ConsumeMetrics(context.Background(), md)) +} + +func TestMetricsExporterV2_Default_RequestSenderError(t *testing.T) { + md := pmetric.NewMetrics() + want := errors.New("send_error") + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + fakeRequestConverter{}, newFakeRequestSender(want)) + require.NoError(t, err) + require.NotNil(t, me) + require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) +} + func TestMetricsExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName) require.NoError(t, err) @@ -107,6 +172,19 @@ func TestMetricsExporter_WithRecordMetrics(t *testing.T) { checkRecordedMetricsForMetricsExporter(t, tt, me, nil) } +func TestMetricsExporterV2_WithRecordMetrics(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterV2Name) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + me, err := NewMetricsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, + newFakeRequestSender(nil)) + require.NoError(t, err) + require.NotNil(t, me) + + checkRecordedMetricsForMetricsExporter(t, tt, me, nil) +} + func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) { want := errors.New("my_error") tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName) @@ -120,6 +198,20 @@ func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) { checkRecordedMetricsForMetricsExporter(t, tt, me, want) } +func TestMetricsExporterV2_WithRecordMetrics_ReturnError(t *testing.T) { + want := errors.New("my_error") + tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterV2Name) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + me, err := NewMetricsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, + newFakeRequestSender(want)) + require.NoError(t, err) + require.NotNil(t, me) + + checkRecordedMetricsForMetricsExporter(t, tt, me, want) +} + func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName) require.NoError(t, err) @@ -158,6 +250,19 @@ func TestMetricsExporter_WithSpan(t *testing.T) { checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, nil, 2) } +func TestMetricsExporterV2_WithSpan(t *testing.T) { + set := exportertest.NewNopCreateSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) + + me, err := NewMetricsRequestExporter(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(nil)) + require.NoError(t, err) + require.NotNil(t, me) + checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, nil, 2) +} + func TestMetricsExporter_WithSpan_ReturnError(t *testing.T) { set := exportertest.NewNopCreateSettings() sr := new(tracetest.SpanRecorder) @@ -172,6 +277,20 @@ func TestMetricsExporter_WithSpan_ReturnError(t *testing.T) { checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, want, 2) } +func TestMetricsExporterV2_WithSpan_ReturnError(t *testing.T) { + set := exportertest.NewNopCreateSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) + + want := errors.New("my_error") + me, err := NewMetricsRequestExporter(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(want)) + require.NoError(t, err) + require.NotNil(t, me) + checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, want, 2) +} + func TestMetricsExporter_WithShutdown(t *testing.T) { shutdownCalled := false shutdown := func(context.Context) error { shutdownCalled = true; return nil } @@ -185,6 +304,20 @@ func TestMetricsExporter_WithShutdown(t *testing.T) { assert.True(t, shutdownCalled) } +func TestMetricsExporterV2_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdown)) + assert.NotNil(t, me) + assert.NoError(t, err) + + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } @@ -197,6 +330,19 @@ func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, want, me.Shutdown(context.Background())) } +func TestMetricsExporterV2_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdownErr)) + assert.NotNil(t, me) + assert.NoError(t, err) + + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, want, me.Shutdown(context.Background())) +} + func newPushMetricsData(retError error) consumer.ConsumeMetricsFunc { return func(ctx context.Context, td pmetric.Metrics) error { return retError @@ -228,7 +374,8 @@ func generateMetricsTraffic(t *testing.T, tracer trace.Tracer, me exporter.Metri } } -func checkWrapSpanForMetricsExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, me exporter.Metrics, wantError error, numMetricPoints int64) { +func checkWrapSpanForMetricsExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, + me exporter.Metrics, wantError error, numMetricPoints int64) { // nolint: unparam const numRequests = 5 generateMetricsTraffic(t, tracer, me, numRequests, wantError) diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index 2cf5f627d46..e21d7a580a2 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -70,33 +70,32 @@ func (qCfg *QueueSettings) Validate() error { } type queuedRetrySender struct { - fullName string - id component.ID - signal component.DataType - cfg QueueSettings - consumerSender requestSender - queue internal.ProducerConsumerQueue - retryStopCh chan struct{} - traceAttribute attribute.KeyValue - logger *zap.Logger - requeuingEnabled bool - requestUnmarshaler internal.RequestUnmarshaler + fullName string + id component.ID + signal component.DataType + queueSettings queueSettings + consumerSender requestSender + queue internal.ProducerConsumerQueue + retryStopCh chan struct{} + traceAttribute attribute.KeyValue + logger *zap.Logger + requeuingEnabled bool } -func newQueuedRetrySender(id component.ID, signal component.DataType, qCfg QueueSettings, rCfg RetrySettings, reqUnmarshaler internal.RequestUnmarshaler, nextSender requestSender, logger *zap.Logger) *queuedRetrySender { +func newQueuedRetrySender(id component.ID, signal component.DataType, qs queueSettings, rCfg RetrySettings, + nextSender requestSender, logger *zap.Logger) *queuedRetrySender { retryStopCh := make(chan struct{}) sampledLogger := createSampledLogger(logger) traceAttr := attribute.String(obsmetrics.ExporterKey, id.String()) qrs := &queuedRetrySender{ - fullName: id.String(), - id: id, - signal: signal, - cfg: qCfg, - retryStopCh: retryStopCh, - traceAttribute: traceAttr, - logger: sampledLogger, - requestUnmarshaler: reqUnmarshaler, + fullName: id.String(), + id: id, + signal: signal, + queueSettings: qs, + retryStopCh: retryStopCh, + traceAttribute: traceAttr, + logger: sampledLogger, } qrs.consumerSender = &retrySender{ @@ -109,8 +108,8 @@ func newQueuedRetrySender(id component.ID, signal component.DataType, qCfg Queue onTemporaryFailure: qrs.onTemporaryFailure, } - if qCfg.StorageID == nil { - qrs.queue = internal.NewBoundedMemoryQueue(qrs.cfg.QueueSize) + if !qs.persistenceEnabled() { + qrs.queue = internal.NewBoundedMemoryQueue(qs.config.QueueSize) } // The Persistent Queue is initialized separately as it needs extra information about the component @@ -143,16 +142,24 @@ func toStorageClient(ctx context.Context, storageID component.ID, host component // initializePersistentQueue uses extra information for initialization available from component.Host func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error { - if qrs.cfg.StorageID == nil { + if !qrs.queueSettings.persistenceEnabled() { return nil } - storageClient, err := toStorageClient(ctx, *qrs.cfg.StorageID, host, qrs.id, qrs.signal) + storageClient, err := toStorageClient(ctx, *qrs.queueSettings.config.StorageID, host, qrs.id, qrs.signal) if err != nil { return err } - qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, storageClient, qrs.requestUnmarshaler) + qrs.queue = internal.NewPersistentQueue(ctx, internal.PersistentQueueSettings{ + Name: qrs.fullName, + Signal: qrs.signal, + Capacity: uint64(qrs.queueSettings.config.QueueSize), + Logger: qrs.logger, + Client: storageClient, + Marshaler: qrs.queueSettings.marshaler, + Unmarshaler: qrs.queueSettings.unmarshaler, + }) // TODO: this can be further exposed as a config param rather than relying on a type of queue qrs.requeuingEnabled = true @@ -165,7 +172,7 @@ func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req interna logger.Error( "Exporting failed. No more retries left. Dropping data.", zap.Error(err), - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) return err } @@ -179,7 +186,7 @@ func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req interna logger.Error( "Exporting failed. Queue did not accept requeuing request. Dropping data.", zap.Error(err), - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) } return err @@ -191,13 +198,13 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er return err } - qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item internal.Request) { + qrs.queue.StartConsumers(qrs.queueSettings.config.NumConsumers, func(item internal.Request) { _ = qrs.consumerSender.send(item) item.OnProcessingFinished() }) // Start reporting queue length metric - if qrs.cfg.Enabled { + if qrs.queueSettings.config.Enabled { err := globalInstruments.queueSize.UpsertEntry(func() int64 { return int64(qrs.queue.Size()) }, metricdata.NewLabelValue(qrs.fullName)) @@ -205,7 +212,7 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er return fmt.Errorf("failed to create retry queue size metric: %w", err) } err = globalInstruments.queueCapacity.UpsertEntry(func() int64 { - return int64(qrs.cfg.QueueSize) + return int64(qrs.queueSettings.config.QueueSize) }, metricdata.NewLabelValue(qrs.fullName)) if err != nil { return fmt.Errorf("failed to create retry queue capacity metric: %w", err) @@ -218,7 +225,7 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er // shutdown is invoked during service shutdown. func (qrs *queuedRetrySender) shutdown() { // Cleanup queue metrics reporting - if qrs.cfg.Enabled { + if qrs.queueSettings.config.Enabled { _ = globalInstruments.queueSize.UpsertEntry(func() int64 { return int64(0) }, metricdata.NewLabelValue(qrs.fullName)) @@ -287,12 +294,12 @@ func createSampledLogger(logger *zap.Logger) *zap.Logger { // send implements the requestSender interface func (qrs *queuedRetrySender) send(req internal.Request) error { - if !qrs.cfg.Enabled { + if !qrs.queueSettings.config.Enabled { err := qrs.consumerSender.send(req) if err != nil { qrs.logger.Error( "Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.", - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) } return err @@ -306,7 +313,7 @@ func (qrs *queuedRetrySender) send(req internal.Request) error { if !qrs.queue.Produce(req) { qrs.logger.Error( "Dropping data because sending_queue is full. Try increasing queue_size.", - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qrs.traceAttribute)) return errSendingQueueIsFull @@ -391,7 +398,7 @@ func (rs *retrySender) send(req internal.Request) error { rs.logger.Error( "Exporting failed. The error is not retryable. Dropping data.", zap.Error(err), - zap.Int("dropped_items", req.Count()), + zap.Int("dropped_items", req.ItemsCount()), ) return err } diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index d3e8f4b8c64..3d440f3f955 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -36,11 +36,18 @@ func mockRequestUnmarshaler(mr *mockRequest) internal.RequestUnmarshaler { } } +func mockRequestMarshaler(_ internal.Request) ([]byte, error) { + return nil, nil +} + func TestQueuedRetry_DropOnPermanentError(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() mockR := newMockRequest(context.Background(), 2, consumererror.NewPermanent(errors.New("bad data"))) - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", mockRequestUnmarshaler(mockR)) + bs := newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)) + bs.marshaler = mockRequestMarshaler + bs.unmarshaler = mockRequestUnmarshaler(mockR) + be, err := newBaseExporter(defaultSettings, bs, "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -64,7 +71,10 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() rCfg.Enabled = false - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + bs := newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)) + bs.marshaler = mockRequestMarshaler + bs.unmarshaler = mockRequestUnmarshaler(newMockRequest(context.Background(), 2, errors.New("transient error"))) + be, err := newBaseExporter(defaultSettings, bs, "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -90,7 +100,7 @@ func TestQueuedRetry_OnError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -117,7 +127,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -151,7 +161,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -181,7 +191,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = time.Millisecond rCfg.MaxElapsedTime = 100 * time.Millisecond - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -228,7 +238,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 10 * time.Millisecond - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -261,7 +271,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { qCfg.QueueSize = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -288,7 +298,7 @@ func TestQueuedRetry_DropOnFull(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -309,7 +319,7 @@ func TestQueuedRetryHappyPath(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(set, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -344,7 +354,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 0 // to make every request go straight to the queue rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -478,7 +488,7 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -510,7 +520,7 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) be.qrSender.requeuingEnabled = true require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -535,7 +545,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(set, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -559,7 +569,10 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + bs := newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)) + bs.marshaler = mockRequestMarshaler + bs.unmarshaler = mockRequestUnmarshaler(&mockRequest{}) + be, err := newBaseExporter(set, bs, "") require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -583,7 +596,7 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { req := newMockRequest(context.Background(), 3, errors.New("some error")) - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), &mockHost{})) @@ -617,6 +630,14 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { }, time.Second, 1*time.Millisecond) } +func TestQueueRetryOptionsWithRequestExporter(t *testing.T) { + bs := newBaseSettings(true, WithRetry(NewDefaultRetrySettings())) + assert.True(t, bs.requestExporter) + assert.Panics(t, func() { + _ = newBaseSettings(true, WithRetry(NewDefaultRetrySettings()), WithQueue(NewDefaultQueueSettings())) + }) +} + type mockErrorRequest struct { baseRequest } @@ -633,7 +654,7 @@ func (mer *mockErrorRequest) Marshal() ([]byte, error) { return nil, nil } -func (mer *mockErrorRequest) Count() int { +func (mer *mockErrorRequest) ItemsCount() int { return 7 } @@ -684,7 +705,7 @@ func (m *mockRequest) checkNumRequests(t *testing.T, want int) { }, time.Second, 1*time.Millisecond) } -func (m *mockRequest) Count() int { +func (m *mockRequest) ItemsCount() int { return m.cnt } @@ -716,9 +737,9 @@ func newObservabilityConsumerSender(nextSender requestSender) *observabilityCons func (ocs *observabilityConsumerSender) send(req internal.Request) error { err := ocs.nextSender.send(req) if err != nil { - ocs.droppedItemsCount.Add(int64(req.Count())) + ocs.droppedItemsCount.Add(int64(req.ItemsCount())) } else { - ocs.sentItemsCount.Add(int64(req.Count())) + ocs.sentItemsCount.Add(int64(req.ItemsCount())) } ocs.waitGroup.Done() return err diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go new file mode 100644 index 00000000000..d18f0f9b275 --- /dev/null +++ b/exporter/exporterhelper/request.go @@ -0,0 +1,39 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "context" + + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +) + +// Request represents a single request that can be sent to the endpoint. +type Request interface { + // ItemsCount returns the count basic item in the request, the smallest pieces of data that can be sent to the endpoint. + // For example, for OTLP exporter, this value represents the number of spans, metric data points or log records. + ItemsCount() int +} + +// RequestSender is a helper function that sends a request. +type RequestSender func(ctx context.Context, req Request) error + +type request struct { + Request + baseRequest + sender RequestSender +} + +var _ internal.Request = (*request)(nil) + +func (req *request) Export(ctx context.Context) error { + return req.sender(ctx, req.Request) +} + +func (req *request) OnError(_ error) internal.Request { + // Potentially we could introduce a new RequestError type that would represent partially succeeded request. + // In that case we should consider returning them back to the pipeline converted back to pdata in case if + // sending queue is disabled. We leave it as a future improvement if decided that it's needed. + return req +} diff --git a/exporter/exporterhelper/request_test.go b/exporter/exporterhelper/request_test.go new file mode 100644 index 00000000000..5963f7e0a6c --- /dev/null +++ b/exporter/exporterhelper/request_test.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "context" + + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +type fakeRequest struct { + items int +} + +func (r fakeRequest) ItemsCount() int { + return r.items +} + +type fakeRequestConverter struct { + metricsError error + tracesError error + logsError error +} + +func (c fakeRequestConverter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (Request, error) { + return fakeRequest{items: md.DataPointCount()}, c.metricsError +} + +func (c fakeRequestConverter) RequestFromTraces(_ context.Context, td ptrace.Traces) (Request, error) { + return fakeRequest{items: td.SpanCount()}, c.tracesError +} + +func (c fakeRequestConverter) RequestFromLogs(_ context.Context, ld plog.Logs) (Request, error) { + return fakeRequest{items: ld.LogRecordCount()}, c.logsError +} + +func newFakeRequestSender(err error) RequestSender { + return func(_ context.Context, _ Request) error { + return err + } +} diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 978ece2201c..3c6dd112ecf 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -7,6 +7,8 @@ import ( "context" "errors" + "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -42,9 +44,8 @@ func newTraceRequestUnmarshalerFunc(pusher consumer.ConsumeTracesFunc) internal. } } -// Marshal provides serialization capabilities required by persistent queue -func (req *tracesRequest) Marshal() ([]byte, error) { - return tracesMarshaler.MarshalTraces(req.td) +func tracesRequestMarshaler(req internal.Request) ([]byte, error) { + return tracesMarshaler.MarshalTraces(req.(*tracesRequest).td) } func (req *tracesRequest) OnError(err error) internal.Request { @@ -59,7 +60,7 @@ func (req *tracesRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.td) } -func (req *tracesRequest) Count() int { +func (req *tracesRequest) ItemsCount() int { return req.td.SpanCount() } @@ -88,8 +89,10 @@ func NewTracesExporter( return nil, errNilPushTraceData } - bs := fromOptions(options...) - be, err := newBaseExporter(set, bs, component.DataTypeTraces, newTraceRequestUnmarshalerFunc(pusher)) + bs := newBaseSettings(false, options...) + bs.marshaler = tracesRequestMarshaler + bs.unmarshaler = newTraceRequestUnmarshalerFunc(pusher) + be, err := newBaseExporter(set, bs, component.DataTypeTraces) if err != nil { return nil, err } @@ -104,7 +107,7 @@ func NewTracesExporter( req := newTracesRequest(ctx, td, pusher) serr := be.sender.send(req) if errors.Is(serr, errSendingQueueIsFull) { - be.obsrep.recordTracesEnqueueFailure(req.Context(), int64(req.Count())) + be.obsrep.recordTracesEnqueueFailure(req.Context(), int64(req.ItemsCount())) } return serr }, bs.consumerOptions...) @@ -115,6 +118,70 @@ func NewTracesExporter( }, err } +type TracesConverter interface { + // RequestFromTraces converts ptrace.Traces into a Request. + RequestFromTraces(context.Context, ptrace.Traces) (Request, error) +} + +// NewTracesRequestExporter creates a new traces exporter based on a custom TracesConverter and RequestSender. +func NewTracesRequestExporter( + _ context.Context, + set exporter.CreateSettings, + converter TracesConverter, + sender RequestSender, + options ...Option, +) (exporter.Traces, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilTracesConverter + } + + if sender == nil { + return nil, errNilRequestSender + } + + bs := newBaseSettings(true, options...) + bs.RequestSender = sender + + be, err := newBaseExporter(set, bs, component.DataTypeTraces) + if err != nil { + return nil, err + } + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &tracesExporterWithObservability{ + obsrep: be.obsrep, + nextSender: nextSender, + } + }) + + tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { + req, cErr := converter.RequestFromTraces(ctx, td) + if cErr != nil { + set.Logger.Error("Failed to convert traces. Dropping data.", + zap.Int("dropped_spans", td.SpanCount()), + zap.Error(err)) + return consumererror.NewPermanent(cErr) + } + sErr := be.sender.send(&request{ + baseRequest: baseRequest{ctx: ctx}, + Request: req, + sender: sender, + }) + if errors.Is(sErr, errSendingQueueIsFull) { + be.obsrep.recordTracesEnqueueFailure(ctx, int64(req.ItemsCount())) + } + return sErr + }, bs.consumerOptions...) + + return &traceExporter{ + baseExporter: be, + Traces: tc, + }, err +} + type tracesExporterWithObservability struct { obsrep *obsExporter nextSender requestSender @@ -124,6 +191,6 @@ func (tewo *tracesExporterWithObservability) send(req internal.Request) error { req.SetContext(tewo.obsrep.StartTracesOp(req.Context())) // Forward the data to the next consumer (this pusher is the next). err := tewo.nextSender.send(req) - tewo.obsrep.EndTracesOp(req.Context(), req.Count(), err) + tewo.obsrep.EndTracesOp(req.Context(), req.ItemsCount(), err) return err } diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 3b850ce6be2..3736fac2f29 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -33,6 +33,7 @@ const ( var ( fakeTracesExporterName = component.NewIDWithName("fake_traces_exporter", "with_name") + fakeTracesExporterV2Name = component.NewIDWithName("fake_traces_exporter_v2", "with_name") fakeTracesExporterConfig = struct{}{} ) @@ -55,12 +56,31 @@ func TestTracesExporter_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } +func TestTracesExporterV2_NilLogger(t *testing.T) { + te, err := NewTracesRequestExporter(context.Background(), exporter.CreateSettings{}, &fakeRequestConverter{}, + newFakeRequestSender(nil)) + require.Nil(t, te) + require.Equal(t, errNilLogger, err) +} + func TestTracesExporter_NilPushTraceData(t *testing.T) { te, err := NewTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeTracesExporterConfig, nil) require.Nil(t, te) require.Equal(t, errNilPushTraceData, err) } +func TestTracesExporterV2_NilTracesConverter(t *testing.T) { + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), nil, newFakeRequestSender(nil)) + require.Nil(t, te) + require.Equal(t, errNilTracesConverter, err) +} + +func TestTracesExporterV2_NilRequestSender(t *testing.T) { + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, nil) + require.Nil(t, te) + require.Equal(t, errNilRequestSender, err) +} + func TestTracesExporter_Default(t *testing.T) { td := ptrace.NewTraces() te, err := NewTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeTracesExporterConfig, newTraceDataPusher(nil)) @@ -73,6 +93,18 @@ func TestTracesExporter_Default(t *testing.T) { assert.NoError(t, te.Shutdown(context.Background())) } +func TestTracesExporterV2_Default(t *testing.T) { + td := ptrace.NewTraces() + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil)) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, te.Capabilities()) + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, te.ConsumeTraces(context.Background(), td)) + assert.NoError(t, te.Shutdown(context.Background())) +} + func TestTracesExporter_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} te, err := NewTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeTracesExporterConfig, newTraceDataPusher(nil), WithCapabilities(capabilities)) @@ -82,6 +114,15 @@ func TestTracesExporter_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, te.Capabilities()) } +func TestTracesExporterV2_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithCapabilities(capabilities)) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.Equal(t, capabilities, te.Capabilities()) +} + func TestTracesExporter_Default_ReturnError(t *testing.T) { td := ptrace.NewTraces() want := errors.New("my_error") @@ -93,6 +134,25 @@ func TestTracesExporter_Default_ReturnError(t *testing.T) { require.Equal(t, want, err) } +func TestTracesExporterV2_Default_ConverterError(t *testing.T) { + td := ptrace.NewTraces() + want := errors.New("convert_error") + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{tracesError: want}, newFakeRequestSender(nil)) + require.NoError(t, err) + require.NotNil(t, te) + require.Equal(t, consumererror.NewPermanent(want), te.ConsumeTraces(context.Background(), td)) +} + +func TestTracesExporterV2_Default_RequestSenderError(t *testing.T) { + td := ptrace.NewTraces() + want := errors.New("send_error") + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(want)) + require.NoError(t, err) + require.NotNil(t, te) + require.Equal(t, want, te.ConsumeTraces(context.Background(), td)) +} + func TestTracesExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName) require.NoError(t, err) @@ -105,6 +165,18 @@ func TestTracesExporter_WithRecordMetrics(t *testing.T) { checkRecordedMetricsForTracesExporter(t, tt, te, nil) } +func TestTracesExporterV2_WithRecordMetrics(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterV2Name) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + te, err := NewTracesRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil)) + require.NoError(t, err) + require.NotNil(t, te) + + checkRecordedMetricsForTracesExporter(t, tt, te, nil) +} + func TestTracesExporter_WithRecordMetrics_ReturnError(t *testing.T) { want := errors.New("my_error") tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName) @@ -118,6 +190,19 @@ func TestTracesExporter_WithRecordMetrics_ReturnError(t *testing.T) { checkRecordedMetricsForTracesExporter(t, tt, te, want) } +func TestTracesExporterV2_WithRecordMetrics_ReturnError(t *testing.T) { + want := errors.New("my_error") + tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterV2Name) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + te, err := NewTracesRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(want)) + require.NoError(t, err) + require.NotNil(t, te) + + checkRecordedMetricsForTracesExporter(t, tt, te, want) +} + func TestTracesExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName) require.NoError(t, err) @@ -157,6 +242,20 @@ func TestTracesExporter_WithSpan(t *testing.T) { checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, nil, 1) } +func TestTracesExporterV2_WithSpan(t *testing.T) { + set := exportertest.NewNopCreateSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) + + te, err := NewTracesRequestExporter(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(nil)) + require.NoError(t, err) + require.NotNil(t, te) + + checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, nil, 1) +} + func TestTracesExporter_WithSpan_ReturnError(t *testing.T) { set := exportertest.NewNopCreateSettings() sr := new(tracetest.SpanRecorder) @@ -172,6 +271,21 @@ func TestTracesExporter_WithSpan_ReturnError(t *testing.T) { checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, want, 1) } +func TestTracesExporterV2_WithSpan_ReturnError(t *testing.T) { + set := exportertest.NewNopCreateSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) + + want := errors.New("my_error") + te, err := NewTracesRequestExporter(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(want)) + require.NoError(t, err) + require.NotNil(t, te) + + checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, want, 1) +} + func TestTracesExporter_WithShutdown(t *testing.T) { shutdownCalled := false shutdown := func(context.Context) error { shutdownCalled = true; return nil } @@ -185,6 +299,19 @@ func TestTracesExporter_WithShutdown(t *testing.T) { assert.True(t, shutdownCalled) } +func TestTracesExporterV2_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdown)) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, te.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + func TestTracesExporter_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } @@ -197,6 +324,18 @@ func TestTracesExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, te.Shutdown(context.Background()), want) } +func TestTracesExporterV2_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdownErr)) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, te.Shutdown(context.Background()), want) +} + func newTraceDataPusher(retError error) consumer.ConsumeTracesFunc { return func(ctx context.Context, td ptrace.Traces) error { return retError @@ -228,7 +367,8 @@ func generateTraceTraffic(t *testing.T, tracer trace.Tracer, te exporter.Traces, } } -func checkWrapSpanForTracesExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, te exporter.Traces, wantError error, numSpans int64) { +func checkWrapSpanForTracesExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, + te exporter.Traces, wantError error, numSpans int64) { // nolint: unparam const numRequests = 5 generateTraceTraffic(t, tracer, te, numRequests, wantError)