diff --git a/.chloggen/exporter-helper-v2.yaml b/.chloggen/exporter-helper-v2.yaml new file mode 100644 index 00000000000..600a71f1b0c --- /dev/null +++ b/.chloggen/exporter-helper-v2.yaml @@ -0,0 +1,12 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporter/exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Introduce a new exporter helper that operates over client-provided requests instead of pdata + +# One or more tracking issues or pull requests related to the change +issues: [7874] + diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 08b3e015e41..8077d465741 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -56,32 +56,46 @@ func (req *baseRequest) OnProcessingFinished() { } } +type queueSettings struct { + config QueueSettings + marshaler internal.RequestMarshaler + unmarshaler internal.RequestUnmarshaler +} + +func (qs *queueSettings) persistenceEnabled() bool { + return qs.config.StorageID != nil && qs.marshaler != nil && qs.unmarshaler != nil +} + // baseSettings represents all the options that users can configure. type baseSettings struct { component.StartFunc component.ShutdownFunc consumerOptions []consumer.Option TimeoutSettings - QueueSettings + queueSettings RetrySettings + requestExporter bool } -// fromOptions returns the internal options starting from the default and applying all configured options. -func fromOptions(options ...Option) *baseSettings { - // Start from the default options: - opts := &baseSettings{ +// newBaseSettings returns the baseSettings starting from the default and applying all configured options. +// requestExporter indicates whether the base settings are for a new request exporter or not. +func newBaseSettings(requestExporter bool, options ...Option) *baseSettings { + bs := &baseSettings{ + requestExporter: requestExporter, TimeoutSettings: NewDefaultTimeoutSettings(), // TODO: Enable queuing by default (call DefaultQueueSettings) - QueueSettings: QueueSettings{Enabled: false}, + queueSettings: queueSettings{ + config: QueueSettings{Enabled: false}, + }, // TODO: Enable retry by default (call DefaultRetrySettings) RetrySettings: RetrySettings{Enabled: false}, } for _, op := range options { - op(opts) + op(bs) } - return opts + return bs } // Option apply changes to baseSettings. @@ -121,9 +135,13 @@ func WithRetry(retrySettings RetrySettings) Option { // WithQueue overrides the default QueueSettings for an exporter. // The default QueueSettings is to disable queueing. -func WithQueue(queueSettings QueueSettings) Option { +// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +func WithQueue(config QueueSettings) Option { return func(o *baseSettings) { - o.QueueSettings = queueSettings + if o.requestExporter { + panic("queueing is not available for the new request exporters yet") + } + o.queueSettings.config = config } } @@ -145,7 +163,7 @@ type baseExporter struct { qrSender *queuedRetrySender } -func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType, reqUnmarshaler internal.RequestUnmarshaler) (*baseExporter, error) { +func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType) (*baseExporter, error) { be := &baseExporter{} var err error @@ -154,7 +172,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo return nil, err } - be.qrSender = newQueuedRetrySender(set.ID, signal, bs.QueueSettings, bs.RetrySettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger) + be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger) be.sender = be.qrSender be.StartFunc = func(ctx context.Context, host component.Host) error { // First start the wrapped exporter. diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index 8f5a5376c39..9c5bcbe2513 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -14,11 +14,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/pdata/ptrace" ) var ( @@ -35,7 +32,11 @@ var ( ) func TestBaseExporter(t *testing.T) { - be, err := newBaseExporter(defaultSettings, fromOptions(), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false), "") + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, be.Shutdown(context.Background())) + be, err = newBaseExporter(defaultSettings, newBaseSettings(true), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) @@ -45,12 +46,12 @@ func TestBaseExporterWithOptions(t *testing.T) { want := errors.New("my error") be, err := newBaseExporter( defaultSettings, - fromOptions( + newBaseSettings( + false, WithStart(func(ctx context.Context, host component.Host) error { return want }), WithShutdown(func(ctx context.Context) error { return want }), WithTimeout(NewDefaultTimeoutSettings())), "", - nopRequestUnmarshaler(), ) require.NoError(t, err) require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost())) @@ -65,13 +66,3 @@ func checkStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) { require.Equal(t, codes.Unset, sd.Status().Code, "SpanData %v", sd) } } - -func nopTracePusher() consumer.ConsumeTracesFunc { - return func(ctx context.Context, ld ptrace.Traces) error { - return nil - } -} - -func nopRequestUnmarshaler() internal.RequestUnmarshaler { - return newTraceRequestUnmarshalerFunc(nopTracePusher()) -} diff --git a/exporter/exporterhelper/constants.go b/exporter/exporterhelper/constants.go index bdcbf1a4fd6..a7cfca32aca 100644 --- a/exporter/exporterhelper/constants.go +++ b/exporter/exporterhelper/constants.go @@ -18,4 +18,10 @@ var ( errNilPushMetricsData = errors.New("nil PushMetrics") // errNilPushLogsData is returned when a nil PushLogs is given. errNilPushLogsData = errors.New("nil PushLogs") + // errNilTracesConverter is returned when a nil TracesConverter is given. + errNilTracesConverter = errors.New("nil TracesConverter") + // errNilMetricsConverter is returned when a nil MetricsConverter is given. + errNilMetricsConverter = errors.New("nil MetricsConverter") + // errNilLogsConverter is returned when a nil LogsConverter is given. + errNilLogsConverter = errors.New("nil LogsConverter") ) diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index a7956d6f569..63a617daebd 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -35,11 +35,21 @@ func buildPersistentStorageName(name string, signal component.DataType) string { return fmt.Sprintf("%s-%s", name, signal) } +type PersistentQueueSettings struct { + Name string + Signal component.DataType + Capacity uint64 + Logger *zap.Logger + Client storage.Client + Unmarshaler RequestUnmarshaler + Marshaler RequestMarshaler +} + // NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage -func NewPersistentQueue(ctx context.Context, name string, signal component.DataType, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue { +func NewPersistentQueue(ctx context.Context, params PersistentQueueSettings) ProducerConsumerQueue { return &persistentQueue{ stopChan: make(chan struct{}), - storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(name, signal), uint64(capacity), logger, client, unmarshaler), + storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(params.Name, params.Signal), params), } } diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go index 13505440580..8b1ffb7674e 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/persistent_queue_test.go @@ -28,7 +28,15 @@ func createTestQueue(extension storage.Extension, capacity int) *persistentQueue panic(err) } - wq := NewPersistentQueue(context.Background(), "foo", component.DataTypeTraces, capacity, logger, client, newFakeTracesRequestUnmarshalerFunc()) + wq := NewPersistentQueue(context.Background(), PersistentQueueSettings{ + Name: "foo", + Signal: component.DataTypeTraces, + Capacity: uint64(capacity), + Logger: logger, + Client: client, + Unmarshaler: newFakeTracesRequestUnmarshalerFunc(), + Marshaler: newFakeTracesRequestMarshalerFunc(), + }) return wq.(*persistentQueue) } diff --git a/exporter/exporterhelper/internal/persistent_storage.go b/exporter/exporterhelper/internal/persistent_storage.go index 1d79d63a124..cbbcf2e03c5 100644 --- a/exporter/exporterhelper/internal/persistent_storage.go +++ b/exporter/exporterhelper/internal/persistent_storage.go @@ -43,6 +43,7 @@ type persistentContiguousStorage struct { queueName string client storage.Client unmarshaler RequestUnmarshaler + marshaler RequestMarshaler putChan chan struct{} stopChan chan struct{} @@ -80,14 +81,15 @@ var ( // newPersistentContiguousStorage creates a new file-storage extension backed queue; // queueName parameter must be a unique value that identifies the queue. -func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage { +func newPersistentContiguousStorage(ctx context.Context, queueName string, set PersistentQueueSettings) *persistentContiguousStorage { pcs := &persistentContiguousStorage{ - logger: logger, - client: client, + logger: set.Logger, + client: set.Client, queueName: queueName, - unmarshaler: unmarshaler, - capacity: capacity, - putChan: make(chan struct{}, capacity), + unmarshaler: set.Unmarshaler, + marshaler: set.Marshaler, + capacity: set.Capacity, + putChan: make(chan struct{}, set.Capacity), reqChan: make(chan Request), stopChan: make(chan struct{}), itemsCount: &atomic.Uint64{}, diff --git a/exporter/exporterhelper/internal/persistent_storage_batch.go b/exporter/exporterhelper/internal/persistent_storage_batch.go index 85c99cf51e9..a80ba93c5c3 100644 --- a/exporter/exporterhelper/internal/persistent_storage_batch.go +++ b/exporter/exporterhelper/internal/persistent_storage_batch.go @@ -137,7 +137,7 @@ func (bof *batchStruct) getItemIndexArrayResult(key string) ([]itemIndex, error) // setRequest adds Set operation over a given request to the batch func (bof *batchStruct) setRequest(key string, value Request) *batchStruct { - return bof.set(key, value, requestToBytes) + return bof.set(key, value, bof.requestToBytes) } // setItemIndex adds Set operation over a given itemIndex to the batch @@ -206,8 +206,8 @@ func bytesToItemIndexArray(b []byte) (any, error) { return val, err } -func requestToBytes(req any) ([]byte, error) { - return req.(Request).Marshal() +func (bof *batchStruct) requestToBytes(req any) ([]byte, error) { + return bof.pcs.marshaler(req.(Request)) } func (bof *batchStruct) bytesToRequest(b []byte) (any, error) { diff --git a/exporter/exporterhelper/internal/persistent_storage_test.go b/exporter/exporterhelper/internal/persistent_storage_test.go index b27836fd2b6..5d33fc11064 100644 --- a/exporter/exporterhelper/internal/persistent_storage_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_test.go @@ -36,7 +36,13 @@ func createTestClient(extension storage.Extension) storage.Client { } func createTestPersistentStorageWithLoggingAndCapacity(client storage.Client, logger *zap.Logger, capacity uint64) *persistentContiguousStorage { - return newPersistentContiguousStorage(context.Background(), "foo", capacity, logger, client, newFakeTracesRequestUnmarshalerFunc()) + return newPersistentContiguousStorage(context.Background(), "foo", PersistentQueueSettings{ + Capacity: capacity, + Logger: logger, + Client: client, + Unmarshaler: newFakeTracesRequestUnmarshalerFunc(), + Marshaler: newFakeTracesRequestMarshalerFunc(), + }) } func createTestPersistentStorage(client storage.Client) *persistentContiguousStorage { @@ -82,6 +88,13 @@ func newFakeTracesRequestUnmarshalerFunc() RequestUnmarshaler { } } +func newFakeTracesRequestMarshalerFunc() RequestMarshaler { + return func(req Request) ([]byte, error) { + marshaler := ptrace.ProtoMarshaler{} + return marshaler.MarshalTraces(req.(*fakeTracesRequest).td) + } +} + func TestPersistentStorage_CorruptedData(t *testing.T) { path := t.TempDir() diff --git a/exporter/exporterhelper/internal/request.go b/exporter/exporterhelper/internal/request.go index 390b35e94bd..454a42782ce 100644 --- a/exporter/exporterhelper/internal/request.go +++ b/exporter/exporterhelper/internal/request.go @@ -22,9 +22,6 @@ type Request interface { // Count returns the count of spans/metric points or log records. Count() int - // Marshal serializes the current request into a byte stream - Marshal() ([]byte, error) - // OnProcessingFinished calls the optional callback function to handle cleanup after all processing is finished OnProcessingFinished() @@ -34,3 +31,6 @@ type Request interface { // RequestUnmarshaler defines a function which takes a byte slice and unmarshals it into a relevant request type RequestUnmarshaler func([]byte) (Request, error) + +// RequestMarshaler defines a function which takes a request and marshals it into a byte slice +type RequestMarshaler func(Request) ([]byte, error) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index b53bfc8addb..0fdf1fc3858 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -7,6 +7,8 @@ import ( "context" "errors" + "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -42,6 +44,10 @@ func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) internal.Req } } +func logsRequestMarshaler(req internal.Request) ([]byte, error) { + return logsMarshaler.MarshalLogs(req.(*logsRequest).ld) +} + func (req *logsRequest) OnError(err error) internal.Request { var logError consumererror.Logs if errors.As(err, &logError) { @@ -54,10 +60,6 @@ func (req *logsRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.ld) } -func (req *logsRequest) Marshal() ([]byte, error) { - return logsMarshaler.MarshalLogs(req.ld) -} - func (req *logsRequest) Count() int { return req.ld.LogRecordCount() } @@ -87,8 +89,10 @@ func NewLogsExporter( return nil, errNilPushLogsData } - bs := fromOptions(options...) - be, err := newBaseExporter(set, bs, component.DataTypeLogs, newLogsRequestUnmarshalerFunc(pusher)) + bs := newBaseSettings(false, options...) + bs.marshaler = logsRequestMarshaler + bs.unmarshaler = newLogsRequestUnmarshalerFunc(pusher) + be, err := newBaseExporter(set, bs, component.DataTypeLogs) if err != nil { return nil, err } @@ -114,6 +118,55 @@ func NewLogsExporter( }, err } +type LogsConverter interface { + // RequestFromLogs converts plog.Logs data into a request. + RequestFromLogs(context.Context, plog.Logs) (Request, error) +} + +// NewLogsRequestExporter creates new logs exporter based on custom LogsConverter and RequestSender. +func NewLogsRequestExporter( + _ context.Context, + set exporter.CreateSettings, + converter LogsConverter, + options ...Option, +) (exporter.Logs, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilLogsConverter + } + + bs := newBaseSettings(true, options...) + + be, err := newBaseExporter(set, bs, component.DataTypeLogs) + if err != nil { + return nil, err + } + + // TODO: Add new observability tracing/metrics to the new exporterhelper. + + lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { + req, cErr := converter.RequestFromLogs(ctx, ld) + if cErr != nil { + set.Logger.Error("Failed to convert logs. Dropping data.", + zap.Int("dropped_log_records", ld.LogRecordCount()), + zap.Error(err)) + return consumererror.NewPermanent(cErr) + } + return be.sender.send(&request{ + baseRequest: baseRequest{ctx: ctx}, + Request: req, + }) + }, bs.consumerOptions...) + + return &logsExporter{ + baseExporter: be, + Logs: lc, + }, err +} + type logsExporterWithObservability struct { obsrep *obsExporter nextSender requestSender diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 3aa50b26e02..14e35679ff4 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -1,5 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 + package exporterhelper import ( @@ -59,12 +60,24 @@ func TestLogsExporter_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } +func TestLogsRequestExporter_NilLogger(t *testing.T) { + le, err := NewLogsRequestExporter(context.Background(), exporter.CreateSettings{}, &fakeRequestConverter{}) + require.Nil(t, le) + require.Equal(t, errNilLogger, err) +} + func TestLogsExporter_NilPushLogsData(t *testing.T) { le, err := NewLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeLogsExporterConfig, nil) require.Nil(t, le) require.Equal(t, errNilPushLogsData, err) } +func TestLogsRequestExporter_NilLogsConverter(t *testing.T) { + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), nil) + require.Nil(t, le) + require.Equal(t, errNilLogsConverter, err) +} + func TestLogsExporter_Default(t *testing.T) { ld := plog.NewLogs() le, err := NewLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeLogsExporterConfig, newPushLogsData(nil)) @@ -77,6 +90,18 @@ func TestLogsExporter_Default(t *testing.T) { assert.NoError(t, le.Shutdown(context.Background())) } +func TestLogsRequestExporter_Default(t *testing.T) { + ld := plog.NewLogs() + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}) + assert.NotNil(t, le) + assert.NoError(t, err) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, le.Capabilities()) + assert.NoError(t, le.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, le.ConsumeLogs(context.Background(), ld)) + assert.NoError(t, le.Shutdown(context.Background())) +} + func TestLogsExporter_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} le, err := NewLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeLogsExporterConfig, newPushLogsData(nil), WithCapabilities(capabilities)) @@ -86,6 +111,15 @@ func TestLogsExporter_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, le.Capabilities()) } +func TestLogsRequestExporter_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, WithCapabilities(capabilities)) + require.NoError(t, err) + require.NotNil(t, le) + + assert.Equal(t, capabilities, le.Capabilities()) +} + func TestLogsExporter_Default_ReturnError(t *testing.T) { ld := plog.NewLogs() want := errors.New("my_error") @@ -95,7 +129,26 @@ func TestLogsExporter_Default_ReturnError(t *testing.T) { require.Equal(t, want, le.ConsumeLogs(context.Background(), ld)) } -func TestLogsExporter_WithRecordLogs(t *testing.T) { +func TestLogsRequestExporter_Default_ConvertError(t *testing.T) { + ld := plog.NewLogs() + want := errors.New("convert_error") + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{logsError: want}) + require.NoError(t, err) + require.NotNil(t, le) + require.Equal(t, consumererror.NewPermanent(want), le.ConsumeLogs(context.Background(), ld)) +} + +func TestLogsRequestExporter_Default_ExportError(t *testing.T) { + ld := plog.NewLogs() + want := errors.New("export_error") + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{requestError: want}) + require.NoError(t, err) + require.NotNil(t, le) + require.Equal(t, want, le.ConsumeLogs(context.Background(), ld)) +} + +func TestLogsExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) @@ -107,7 +160,7 @@ func TestLogsExporter_WithRecordLogs(t *testing.T) { checkRecordedMetricsForLogsExporter(t, tt, le, nil) } -func TestLogsExporter_WithRecordLogs_ReturnError(t *testing.T) { +func TestLogsExporter_WithRecordMetrics_ReturnError(t *testing.T) { want := errors.New("my_error") tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName) require.NoError(t, err) @@ -184,6 +237,18 @@ func TestLogsExporter_WithShutdown(t *testing.T) { assert.True(t, shutdownCalled) } +func TestLogsRequestExporter_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, WithShutdown(shutdown)) + assert.NotNil(t, le) + assert.NoError(t, err) + + assert.Nil(t, le.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + func TestLogsExporter_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } @@ -195,6 +260,17 @@ func TestLogsExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, le.Shutdown(context.Background()), want) } +func TestLogsRequestExporter_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, WithShutdown(shutdownErr)) + assert.NotNil(t, le) + assert.NoError(t, err) + + assert.Equal(t, le.Shutdown(context.Background()), want) +} + func newPushLogsData(retError error) consumer.ConsumeLogsFunc { return func(ctx context.Context, td plog.Logs) error { return retError @@ -225,7 +301,8 @@ func generateLogsTraffic(t *testing.T, tracer trace.Tracer, le exporter.Logs, nu } } -func checkWrapSpanForLogsExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, le exporter.Logs, wantError error, numLogRecords int64) { +func checkWrapSpanForLogsExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, le exporter.Logs, + wantError error, numLogRecords int64) { // nolint: unparam const numRequests = 5 generateLogsTraffic(t, tracer, le, numRequests, wantError) diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 1639a45fcab..906d0d6f855 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -7,6 +7,8 @@ import ( "context" "errors" + "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -42,6 +44,10 @@ func newMetricsRequestUnmarshalerFunc(pusher consumer.ConsumeMetricsFunc) intern } } +func metricsRequestMarshaler(req internal.Request) ([]byte, error) { + return metricsMarshaler.MarshalMetrics(req.(*metricsRequest).md) +} + func (req *metricsRequest) OnError(err error) internal.Request { var metricsError consumererror.Metrics if errors.As(err, &metricsError) { @@ -54,11 +60,6 @@ func (req *metricsRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.md) } -// Marshal provides serialization capabilities required by persistent queue -func (req *metricsRequest) Marshal() ([]byte, error) { - return metricsMarshaler.MarshalMetrics(req.md) -} - func (req *metricsRequest) Count() int { return req.md.DataPointCount() } @@ -88,8 +89,10 @@ func NewMetricsExporter( return nil, errNilPushMetricsData } - bs := fromOptions(options...) - be, err := newBaseExporter(set, bs, component.DataTypeMetrics, newMetricsRequestUnmarshalerFunc(pusher)) + bs := newBaseSettings(false, options...) + bs.marshaler = metricsRequestMarshaler + bs.unmarshaler = newMetricsRequestUnmarshalerFunc(pusher) + be, err := newBaseExporter(set, bs, component.DataTypeMetrics) if err != nil { return nil, err } @@ -115,6 +118,55 @@ func NewMetricsExporter( }, err } +type MetricsConverter interface { + // RequestFromMetrics converts pdata.Metrics into a request. + RequestFromMetrics(context.Context, pmetric.Metrics) (Request, error) +} + +// NewMetricsRequestExporter creates a new metrics exporter based on a custom MetricsConverter and RequestSender. +func NewMetricsRequestExporter( + _ context.Context, + set exporter.CreateSettings, + converter MetricsConverter, + options ...Option, +) (exporter.Metrics, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilMetricsConverter + } + + bs := newBaseSettings(true, options...) + + be, err := newBaseExporter(set, bs, component.DataTypeMetrics) + if err != nil { + return nil, err + } + + // TODO: Add new observability tracing/metrics to the new exporterhelper. + + mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { + req, cErr := converter.RequestFromMetrics(ctx, md) + if cErr != nil { + set.Logger.Error("Failed to convert metrics. Dropping data.", + zap.Int("dropped_data_points", md.DataPointCount()), + zap.Error(err)) + return consumererror.NewPermanent(cErr) + } + return be.sender.send(&request{ + Request: req, + baseRequest: baseRequest{ctx: ctx}, + }) + }, bs.consumerOptions...) + + return &metricsExporter{ + baseExporter: be, + Metrics: mc, + }, err +} + type metricsSenderWithObservability struct { obsrep *obsExporter nextSender requestSender diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 5ebc2d9ec65..c8ea7587219 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -1,5 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 + package exporterhelper import ( @@ -47,7 +48,7 @@ func TestMetricsRequest(t *testing.T) { ) } -func TestMetricsExporter_InvalidName(t *testing.T) { +func TestMetricsExporter_NilConfig(t *testing.T) { me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), nil, newPushMetricsData(nil)) require.Nil(t, me) require.Equal(t, errNilConfig, err) @@ -59,12 +60,24 @@ func TestMetricsExporter_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } +func TestMetricsRequestExporter_NilLogger(t *testing.T) { + me, err := NewMetricsRequestExporter(context.Background(), exporter.CreateSettings{}, fakeRequestConverter{}) + require.Nil(t, me) + require.Equal(t, errNilLogger, err) +} + func TestMetricsExporter_NilPushMetricsData(t *testing.T) { me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeMetricsExporterConfig, nil) require.Nil(t, me) require.Equal(t, errNilPushMetricsData, err) } +func TestMetricsRequestExporter_NilMetricsConverter(t *testing.T) { + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), nil) + require.Nil(t, me) + require.Equal(t, errNilMetricsConverter, err) +} + func TestMetricsExporter_Default(t *testing.T) { md := pmetric.NewMetrics() me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeMetricsExporterConfig, newPushMetricsData(nil)) @@ -77,6 +90,18 @@ func TestMetricsExporter_Default(t *testing.T) { assert.NoError(t, me.Shutdown(context.Background())) } +func TestMetricsRequestExporter_Default(t *testing.T) { + md := pmetric.NewMetrics() + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}) + assert.NoError(t, err) + assert.NotNil(t, me) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, me.Capabilities()) + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.ConsumeMetrics(context.Background(), md)) + assert.NoError(t, me.Shutdown(context.Background())) +} + func TestMetricsExporter_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeMetricsExporterConfig, newPushMetricsData(nil), WithCapabilities(capabilities)) @@ -86,6 +111,16 @@ func TestMetricsExporter_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, me.Capabilities()) } +func TestMetricsRequestExporter_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + WithCapabilities(capabilities)) + assert.NoError(t, err) + assert.NotNil(t, me) + + assert.Equal(t, capabilities, me.Capabilities()) +} + func TestMetricsExporter_Default_ReturnError(t *testing.T) { md := pmetric.NewMetrics() want := errors.New("my_error") @@ -95,6 +130,25 @@ func TestMetricsExporter_Default_ReturnError(t *testing.T) { require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) } +func TestMetricsRequestExporter_Default_ConvertError(t *testing.T) { + md := pmetric.NewMetrics() + want := errors.New("convert_error") + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{metricsError: want}) + require.NoError(t, err) + require.NotNil(t, me) + require.Equal(t, consumererror.NewPermanent(want), me.ConsumeMetrics(context.Background(), md)) +} + +func TestMetricsRequestExporter_Default_ExportError(t *testing.T) { + md := pmetric.NewMetrics() + want := errors.New("export_error") + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + fakeRequestConverter{requestError: want}) + require.NoError(t, err) + require.NotNil(t, me) + require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) +} + func TestMetricsExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName) require.NoError(t, err) @@ -185,6 +239,20 @@ func TestMetricsExporter_WithShutdown(t *testing.T) { assert.True(t, shutdownCalled) } +func TestMetricsRequestExporter_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{}, WithShutdown(shutdown)) + assert.NotNil(t, me) + assert.NoError(t, err) + + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } @@ -197,6 +265,19 @@ func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, want, me.Shutdown(context.Background())) } +func TestMetricsRequestExporter_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{}, WithShutdown(shutdownErr)) + assert.NotNil(t, me) + assert.NoError(t, err) + + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, want, me.Shutdown(context.Background())) +} + func newPushMetricsData(retError error) consumer.ConsumeMetricsFunc { return func(ctx context.Context, td pmetric.Metrics) error { return retError @@ -228,7 +309,8 @@ func generateMetricsTraffic(t *testing.T, tracer trace.Tracer, me exporter.Metri } } -func checkWrapSpanForMetricsExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, me exporter.Metrics, wantError error, numMetricPoints int64) { +func checkWrapSpanForMetricsExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, + me exporter.Metrics, wantError error, numMetricPoints int64) { // nolint: unparam const numRequests = 5 generateMetricsTraffic(t, tracer, me, numRequests, wantError) diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index 2cf5f627d46..79505445b47 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -70,33 +70,32 @@ func (qCfg *QueueSettings) Validate() error { } type queuedRetrySender struct { - fullName string - id component.ID - signal component.DataType - cfg QueueSettings - consumerSender requestSender - queue internal.ProducerConsumerQueue - retryStopCh chan struct{} - traceAttribute attribute.KeyValue - logger *zap.Logger - requeuingEnabled bool - requestUnmarshaler internal.RequestUnmarshaler + fullName string + id component.ID + signal component.DataType + queueSettings queueSettings + consumerSender requestSender + queue internal.ProducerConsumerQueue + retryStopCh chan struct{} + traceAttribute attribute.KeyValue + logger *zap.Logger + requeuingEnabled bool } -func newQueuedRetrySender(id component.ID, signal component.DataType, qCfg QueueSettings, rCfg RetrySettings, reqUnmarshaler internal.RequestUnmarshaler, nextSender requestSender, logger *zap.Logger) *queuedRetrySender { +func newQueuedRetrySender(id component.ID, signal component.DataType, qs queueSettings, rCfg RetrySettings, + nextSender requestSender, logger *zap.Logger) *queuedRetrySender { retryStopCh := make(chan struct{}) sampledLogger := createSampledLogger(logger) traceAttr := attribute.String(obsmetrics.ExporterKey, id.String()) qrs := &queuedRetrySender{ - fullName: id.String(), - id: id, - signal: signal, - cfg: qCfg, - retryStopCh: retryStopCh, - traceAttribute: traceAttr, - logger: sampledLogger, - requestUnmarshaler: reqUnmarshaler, + fullName: id.String(), + id: id, + signal: signal, + queueSettings: qs, + retryStopCh: retryStopCh, + traceAttribute: traceAttr, + logger: sampledLogger, } qrs.consumerSender = &retrySender{ @@ -109,8 +108,8 @@ func newQueuedRetrySender(id component.ID, signal component.DataType, qCfg Queue onTemporaryFailure: qrs.onTemporaryFailure, } - if qCfg.StorageID == nil { - qrs.queue = internal.NewBoundedMemoryQueue(qrs.cfg.QueueSize) + if !qs.persistenceEnabled() { + qrs.queue = internal.NewBoundedMemoryQueue(qs.config.QueueSize) } // The Persistent Queue is initialized separately as it needs extra information about the component @@ -143,16 +142,24 @@ func toStorageClient(ctx context.Context, storageID component.ID, host component // initializePersistentQueue uses extra information for initialization available from component.Host func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error { - if qrs.cfg.StorageID == nil { + if !qrs.queueSettings.persistenceEnabled() { return nil } - storageClient, err := toStorageClient(ctx, *qrs.cfg.StorageID, host, qrs.id, qrs.signal) + storageClient, err := toStorageClient(ctx, *qrs.queueSettings.config.StorageID, host, qrs.id, qrs.signal) if err != nil { return err } - qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, storageClient, qrs.requestUnmarshaler) + qrs.queue = internal.NewPersistentQueue(ctx, internal.PersistentQueueSettings{ + Name: qrs.fullName, + Signal: qrs.signal, + Capacity: uint64(qrs.queueSettings.config.QueueSize), + Logger: qrs.logger, + Client: storageClient, + Marshaler: qrs.queueSettings.marshaler, + Unmarshaler: qrs.queueSettings.unmarshaler, + }) // TODO: this can be further exposed as a config param rather than relying on a type of queue qrs.requeuingEnabled = true @@ -191,13 +198,13 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er return err } - qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item internal.Request) { + qrs.queue.StartConsumers(qrs.queueSettings.config.NumConsumers, func(item internal.Request) { _ = qrs.consumerSender.send(item) item.OnProcessingFinished() }) // Start reporting queue length metric - if qrs.cfg.Enabled { + if qrs.queueSettings.config.Enabled { err := globalInstruments.queueSize.UpsertEntry(func() int64 { return int64(qrs.queue.Size()) }, metricdata.NewLabelValue(qrs.fullName)) @@ -205,7 +212,7 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er return fmt.Errorf("failed to create retry queue size metric: %w", err) } err = globalInstruments.queueCapacity.UpsertEntry(func() int64 { - return int64(qrs.cfg.QueueSize) + return int64(qrs.queueSettings.config.QueueSize) }, metricdata.NewLabelValue(qrs.fullName)) if err != nil { return fmt.Errorf("failed to create retry queue capacity metric: %w", err) @@ -218,7 +225,7 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er // shutdown is invoked during service shutdown. func (qrs *queuedRetrySender) shutdown() { // Cleanup queue metrics reporting - if qrs.cfg.Enabled { + if qrs.queueSettings.config.Enabled { _ = globalInstruments.queueSize.UpsertEntry(func() int64 { return int64(0) }, metricdata.NewLabelValue(qrs.fullName)) @@ -287,7 +294,7 @@ func createSampledLogger(logger *zap.Logger) *zap.Logger { // send implements the requestSender interface func (qrs *queuedRetrySender) send(req internal.Request) error { - if !qrs.cfg.Enabled { + if !qrs.queueSettings.config.Enabled { err := qrs.consumerSender.send(req) if err != nil { qrs.logger.Error( diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index d3e8f4b8c64..a55eade8680 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -36,11 +36,18 @@ func mockRequestUnmarshaler(mr *mockRequest) internal.RequestUnmarshaler { } } +func mockRequestMarshaler(_ internal.Request) ([]byte, error) { + return nil, nil +} + func TestQueuedRetry_DropOnPermanentError(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() mockR := newMockRequest(context.Background(), 2, consumererror.NewPermanent(errors.New("bad data"))) - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", mockRequestUnmarshaler(mockR)) + bs := newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)) + bs.marshaler = mockRequestMarshaler + bs.unmarshaler = mockRequestUnmarshaler(mockR) + be, err := newBaseExporter(defaultSettings, bs, "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -64,7 +71,10 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() rCfg.Enabled = false - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + bs := newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)) + bs.marshaler = mockRequestMarshaler + bs.unmarshaler = mockRequestUnmarshaler(newMockRequest(context.Background(), 2, errors.New("transient error"))) + be, err := newBaseExporter(defaultSettings, bs, "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -90,7 +100,7 @@ func TestQueuedRetry_OnError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -117,7 +127,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -151,7 +161,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -181,7 +191,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = time.Millisecond rCfg.MaxElapsedTime = 100 * time.Millisecond - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -228,7 +238,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 10 * time.Millisecond - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -261,7 +271,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { qCfg.QueueSize = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -288,7 +298,7 @@ func TestQueuedRetry_DropOnFull(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -309,7 +319,7 @@ func TestQueuedRetryHappyPath(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(set, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -344,7 +354,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 0 // to make every request go straight to the queue rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -478,7 +488,7 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -510,7 +520,7 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) be.qrSender.requeuingEnabled = true require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -535,7 +545,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(set, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -559,7 +569,10 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + bs := newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)) + bs.marshaler = mockRequestMarshaler + bs.unmarshaler = mockRequestUnmarshaler(&mockRequest{}) + be, err := newBaseExporter(set, bs, "") require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -583,7 +596,7 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { req := newMockRequest(context.Background(), 3, errors.New("some error")) - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), &mockHost{})) @@ -617,6 +630,14 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { }, time.Second, 1*time.Millisecond) } +func TestQueueRetryOptionsWithRequestExporter(t *testing.T) { + bs := newBaseSettings(true, WithRetry(NewDefaultRetrySettings())) + assert.True(t, bs.requestExporter) + assert.Panics(t, func() { + _ = newBaseSettings(true, WithRetry(NewDefaultRetrySettings()), WithQueue(NewDefaultQueueSettings())) + }) +} + type mockErrorRequest struct { baseRequest } diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go new file mode 100644 index 00000000000..30f250c8ba0 --- /dev/null +++ b/exporter/exporterhelper/request.go @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "context" + + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +) + +// Request represents a single request that can be sent to an external endpoint. +type Request interface { + // Export exports the request to an external endpoint. + Export(ctx context.Context) error +} + +// RequestItemsCounter is an optional interface that can be implemented by Request to provide a number of items +// in the request. This is a recommended interface to implement for exporters. It is required for batching and queueing +// based on number of items. Also, it's used for reporting number of items in collector's logs, metrics and traces. +// If not implemented, collector's logs, metrics and traces will report 0 items. +type RequestItemsCounter interface { + // ItemsCount returns a number of basic items in the request where item is the smallest piece of data that can be + // sent. For example, for OTLP exporter, this value represents the number of spans, + // metric data points or log records. + ItemsCount() int +} + +type request struct { + Request + baseRequest +} + +var _ internal.Request = (*request)(nil) + +func (req *request) OnError(_ error) internal.Request { + // Potentially we could introduce a new RequestError type that would represent partially succeeded request. + // In that case we should consider returning them back to the pipeline converted back to pdata in case if + // sending queue is disabled. We leave it as a future improvement if decided that it's needed. + return req +} + +// Count returns a number of items in the request. If the request does not implement RequestItemsCounter +// then 0 is returned. +func (req *request) Count() int { + if counter, ok := req.Request.(RequestItemsCounter); ok { + return counter.ItemsCount() + } + return 0 +} diff --git a/exporter/exporterhelper/request_test.go b/exporter/exporterhelper/request_test.go new file mode 100644 index 00000000000..6dd3f67800a --- /dev/null +++ b/exporter/exporterhelper/request_test.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper + +import ( + "context" + + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +type fakeRequest struct { + items int + err error +} + +func (r fakeRequest) Export(_ context.Context) error { + return r.err +} + +func (r fakeRequest) ItemsCount() int { + return r.items +} + +type fakeRequestConverter struct { + metricsError error + tracesError error + logsError error + requestError error +} + +func (c fakeRequestConverter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (Request, error) { + return fakeRequest{items: md.DataPointCount(), err: c.requestError}, c.metricsError +} + +func (c fakeRequestConverter) RequestFromTraces(_ context.Context, td ptrace.Traces) (Request, error) { + return fakeRequest{items: td.SpanCount(), err: c.requestError}, c.tracesError +} + +func (c fakeRequestConverter) RequestFromLogs(_ context.Context, ld plog.Logs) (Request, error) { + return fakeRequest{items: ld.LogRecordCount(), err: c.requestError}, c.logsError +} diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 978ece2201c..886c0a2f197 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -7,6 +7,8 @@ import ( "context" "errors" + "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -42,9 +44,8 @@ func newTraceRequestUnmarshalerFunc(pusher consumer.ConsumeTracesFunc) internal. } } -// Marshal provides serialization capabilities required by persistent queue -func (req *tracesRequest) Marshal() ([]byte, error) { - return tracesMarshaler.MarshalTraces(req.td) +func tracesRequestMarshaler(req internal.Request) ([]byte, error) { + return tracesMarshaler.MarshalTraces(req.(*tracesRequest).td) } func (req *tracesRequest) OnError(err error) internal.Request { @@ -88,8 +89,10 @@ func NewTracesExporter( return nil, errNilPushTraceData } - bs := fromOptions(options...) - be, err := newBaseExporter(set, bs, component.DataTypeTraces, newTraceRequestUnmarshalerFunc(pusher)) + bs := newBaseSettings(false, options...) + bs.marshaler = tracesRequestMarshaler + bs.unmarshaler = newTraceRequestUnmarshalerFunc(pusher) + be, err := newBaseExporter(set, bs, component.DataTypeTraces) if err != nil { return nil, err } @@ -115,6 +118,55 @@ func NewTracesExporter( }, err } +type TracesConverter interface { + // RequestFromTraces converts ptrace.Traces into a Request. + RequestFromTraces(context.Context, ptrace.Traces) (Request, error) +} + +// NewTracesRequestExporter creates a new traces exporter based on a custom TracesConverter and RequestSender. +func NewTracesRequestExporter( + _ context.Context, + set exporter.CreateSettings, + converter TracesConverter, + options ...Option, +) (exporter.Traces, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilTracesConverter + } + + bs := newBaseSettings(true, options...) + + be, err := newBaseExporter(set, bs, component.DataTypeTraces) + if err != nil { + return nil, err + } + + // TODO: Add new observability tracing/metrics to the new exporterhelper. + + tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { + req, cErr := converter.RequestFromTraces(ctx, td) + if cErr != nil { + set.Logger.Error("Failed to convert traces. Dropping data.", + zap.Int("dropped_spans", td.SpanCount()), + zap.Error(err)) + return consumererror.NewPermanent(cErr) + } + return be.sender.send(&request{ + baseRequest: baseRequest{ctx: ctx}, + Request: req, + }) + }, bs.consumerOptions...) + + return &traceExporter{ + baseExporter: be, + Traces: tc, + }, err +} + type tracesExporterWithObservability struct { obsrep *obsExporter nextSender requestSender diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 3b850ce6be2..0b314426f10 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -1,5 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 + package exporterhelper import ( @@ -55,12 +56,24 @@ func TestTracesExporter_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } +func TestTracesRequestExporter_NilLogger(t *testing.T) { + te, err := NewTracesRequestExporter(context.Background(), exporter.CreateSettings{}, &fakeRequestConverter{}) + require.Nil(t, te) + require.Equal(t, errNilLogger, err) +} + func TestTracesExporter_NilPushTraceData(t *testing.T) { te, err := NewTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeTracesExporterConfig, nil) require.Nil(t, te) require.Equal(t, errNilPushTraceData, err) } +func TestTracesRequestExporter_NilTracesConverter(t *testing.T) { + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), nil) + require.Nil(t, te) + require.Equal(t, errNilTracesConverter, err) +} + func TestTracesExporter_Default(t *testing.T) { td := ptrace.NewTraces() te, err := NewTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeTracesExporterConfig, newTraceDataPusher(nil)) @@ -73,6 +86,18 @@ func TestTracesExporter_Default(t *testing.T) { assert.NoError(t, te.Shutdown(context.Background())) } +func TestTracesRequestExporter_Default(t *testing.T) { + td := ptrace.NewTraces() + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, te.Capabilities()) + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, te.ConsumeTraces(context.Background(), td)) + assert.NoError(t, te.Shutdown(context.Background())) +} + func TestTracesExporter_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} te, err := NewTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeTracesExporterConfig, newTraceDataPusher(nil), WithCapabilities(capabilities)) @@ -82,6 +107,15 @@ func TestTracesExporter_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, te.Capabilities()) } +func TestTracesRequestExporter_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, WithCapabilities(capabilities)) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.Equal(t, capabilities, te.Capabilities()) +} + func TestTracesExporter_Default_ReturnError(t *testing.T) { td := ptrace.NewTraces() want := errors.New("my_error") @@ -93,6 +127,25 @@ func TestTracesExporter_Default_ReturnError(t *testing.T) { require.Equal(t, want, err) } +func TestTracesRequestExporter_Default_ConvertError(t *testing.T) { + td := ptrace.NewTraces() + want := errors.New("convert_error") + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{tracesError: want}) + require.NoError(t, err) + require.NotNil(t, te) + require.Equal(t, consumererror.NewPermanent(want), te.ConsumeTraces(context.Background(), td)) +} + +func TestTracesRequestExporter_Default_ExportError(t *testing.T) { + td := ptrace.NewTraces() + want := errors.New("export_error") + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{requestError: want}) + require.NoError(t, err) + require.NotNil(t, te) + require.Equal(t, want, te.ConsumeTraces(context.Background(), td)) +} + func TestTracesExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName) require.NoError(t, err) @@ -185,6 +238,19 @@ func TestTracesExporter_WithShutdown(t *testing.T) { assert.True(t, shutdownCalled) } +func TestTracesRequestExporter_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, WithShutdown(shutdown)) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, te.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + func TestTracesExporter_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } @@ -197,6 +263,18 @@ func TestTracesExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, te.Shutdown(context.Background()), want) } +func TestTracesRequestExporter_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, WithShutdown(shutdownErr)) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, te.Shutdown(context.Background()), want) +} + func newTraceDataPusher(retError error) consumer.ConsumeTracesFunc { return func(ctx context.Context, td ptrace.Traces) error { return retError @@ -228,7 +306,8 @@ func generateTraceTraffic(t *testing.T, tracer trace.Tracer, te exporter.Traces, } } -func checkWrapSpanForTracesExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, te exporter.Traces, wantError error, numSpans int64) { +func checkWrapSpanForTracesExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, + te exporter.Traces, wantError error, numSpans int64) { // nolint: unparam const numRequests = 5 generateTraceTraffic(t, tracer, te, numRequests, wantError)