From c8deaa1e07b130ef773c4268c6ddcfffc2016145 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 19 Nov 2020 16:12:12 -0800 Subject: [PATCH] Reduce duplicate code in components helper Add componenthelper package to help building components like Processors, Exporters. Signed-off-by: Bogdan Drutu --- component/componenthelper/component.go | 64 ++++++++++++ component/componenthelper/component_test.go | 69 +++++++++++++ exporter/exporterhelper/common.go | 104 ++++++++------------ exporter/exporterhelper/common_test.go | 9 +- exporter/exporterhelper/logshelper.go | 2 +- exporter/exporterhelper/metricshelper.go | 2 +- exporter/exporterhelper/tracehelper.go | 2 +- processor/processorhelper/processor.go | 70 +++++++------ processor/processorhelper/processor_test.go | 42 ++------ 9 files changed, 224 insertions(+), 140 deletions(-) create mode 100644 component/componenthelper/component.go create mode 100644 component/componenthelper/component_test.go diff --git a/component/componenthelper/component.go b/component/componenthelper/component.go new file mode 100644 index 00000000000..0ecc6e5a610 --- /dev/null +++ b/component/componenthelper/component.go @@ -0,0 +1,64 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package componenthelper + +import ( + "context" + + "go.opentelemetry.io/collector/component" +) + +// Start specifies the function invoked when the exporter is being started. +type Start func(context.Context, component.Host) error + +// Shutdown specifies the function invoked when the exporter is being shutdown. +type Shutdown func(context.Context) error + +// ComponentSettings represents a settings struct to create components. +type ComponentSettings struct { + Start + Shutdown +} + +// DefaultComponentSettings returns the default settings for a component. The Start and Shutdown are no-op. +func DefaultComponentSettings() *ComponentSettings { + return &ComponentSettings{ + Start: func(ctx context.Context, host component.Host) error { return nil }, + Shutdown: func(ctx context.Context) error { return nil }, + } +} + +type baseComponent struct { + start Start + shutdown Shutdown +} + +// Start all senders and exporter and is invoked during service start. +func (be *baseComponent) Start(ctx context.Context, host component.Host) error { + return be.start(ctx, host) +} + +// Shutdown all senders and exporter and is invoked during service shutdown. +func (be *baseComponent) Shutdown(ctx context.Context) error { + return be.shutdown(ctx) +} + +// NewComponent returns a component.Component that calls the given Start and Shutdown. +func NewComponent(s *ComponentSettings) component.Component { + return &baseComponent{ + start: s.Start, + shutdown: s.Shutdown, + } +} diff --git a/component/componenthelper/component_test.go b/component/componenthelper/component_test.go new file mode 100644 index 00000000000..3d4ee6cfb49 --- /dev/null +++ b/component/componenthelper/component_test.go @@ -0,0 +1,69 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package componenthelper + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" +) + +func TestDefaultSettings(t *testing.T) { + st := DefaultComponentSettings() + require.NotNil(t, st) + cp := NewComponent(st) + require.NoError(t, cp.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, cp.Shutdown(context.Background())) +} + +func TestWithStart(t *testing.T) { + startCalled := false + st := DefaultComponentSettings() + st.Start = func(context.Context, component.Host) error { startCalled = true; return nil } + cp := NewComponent(st) + assert.NoError(t, cp.Start(context.Background(), componenttest.NewNopHost())) + assert.True(t, startCalled) +} + +func TestWithStart_ReturnError(t *testing.T) { + want := errors.New("my_error") + st := DefaultComponentSettings() + st.Start = func(context.Context, component.Host) error { return want } + cp := NewComponent(st) + assert.Equal(t, want, cp.Start(context.Background(), componenttest.NewNopHost())) +} + +func TestWithShutdown(t *testing.T) { + shutdownCalled := false + st := DefaultComponentSettings() + st.Shutdown = func(context.Context) error { shutdownCalled = true; return nil } + cp := NewComponent(st) + assert.NoError(t, cp.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + +func TestWithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + st := DefaultComponentSettings() + st.Shutdown = func(context.Context) error { return want } + cp := NewComponent(st) + assert.Equal(t, want, cp.Shutdown(context.Background())) +} diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index af0ce73b1c4..92c6036b629 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -16,14 +16,13 @@ package exporterhelper import ( "context" - "sync" "time" "go.opencensus.io/trace" "go.uber.org/zap" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/component/componenthelper" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/consumererror" ) @@ -32,7 +31,7 @@ var ( okStatus = trace.Status{Code: trace.StatusCodeOK} ) -// Settings for timeout. The timeout applies to individual attempts to send data to the backend. +// ComponentSettings for timeout. The timeout applies to individual attempts to send data to the backend. type TimeoutSettings struct { // Timeout is the timeout for every attempt to send data to the backend. Timeout time.Duration `mapstructure:"timeout"` @@ -76,34 +75,26 @@ func (req *baseRequest) setContext(ctx context.Context) { req.ctx = ctx } -// Start specifies the function invoked when the exporter is being started. -type Start func(context.Context, component.Host) error - -// Shutdown specifies the function invoked when the exporter is being shutdown. -type Shutdown func(context.Context) error - -// internalOptions represents all the options that users can configure. -type internalOptions struct { +// baseSettings represents all the options that users can configure. +type baseSettings struct { + *componenthelper.ComponentSettings TimeoutSettings QueueSettings RetrySettings ResourceToTelemetrySettings - Start - Shutdown } -// fromConfiguredOptions returns the internal options starting from the default and applying all configured options. -func fromConfiguredOptions(options ...ExporterOption) *internalOptions { +// fromOptions returns the internal options starting from the default and applying all configured options. +func fromOptions(options []Option) *baseSettings { // Start from the default options: - opts := &internalOptions{ - TimeoutSettings: CreateDefaultTimeoutSettings(), + opts := &baseSettings{ + ComponentSettings: componenthelper.DefaultComponentSettings(), + TimeoutSettings: CreateDefaultTimeoutSettings(), // TODO: Enable queuing by default (call CreateDefaultQueueSettings) QueueSettings: QueueSettings{Enabled: false}, // TODO: Enable retry by default (call CreateDefaultRetrySettings) RetrySettings: RetrySettings{Enabled: false}, ResourceToTelemetrySettings: createDefaultResourceToTelemetrySettings(), - Start: func(ctx context.Context, host component.Host) error { return nil }, - Shutdown: func(ctx context.Context) error { return nil }, } for _, op := range options { @@ -113,79 +104,75 @@ func fromConfiguredOptions(options ...ExporterOption) *internalOptions { return opts } -// ExporterOption apply changes to internalOptions. -type ExporterOption func(*internalOptions) +// Option apply changes to baseSettings. +type Option func(*baseSettings) // WithShutdown overrides the default Shutdown function for an exporter. // The default shutdown function does nothing and always returns nil. -func WithShutdown(shutdown Shutdown) ExporterOption { - return func(o *internalOptions) { +func WithShutdown(shutdown componenthelper.Shutdown) Option { + return func(o *baseSettings) { o.Shutdown = shutdown } } // WithStart overrides the default Start function for an exporter. // The default shutdown function does nothing and always returns nil. -func WithStart(start Start) ExporterOption { - return func(o *internalOptions) { +func WithStart(start componenthelper.Start) Option { + return func(o *baseSettings) { o.Start = start } } // WithTimeout overrides the default TimeoutSettings for an exporter. // The default TimeoutSettings is 5 seconds. -func WithTimeout(timeoutSettings TimeoutSettings) ExporterOption { - return func(o *internalOptions) { +func WithTimeout(timeoutSettings TimeoutSettings) Option { + return func(o *baseSettings) { o.TimeoutSettings = timeoutSettings } } // WithRetry overrides the default RetrySettings for an exporter. // The default RetrySettings is to disable retries. -func WithRetry(retrySettings RetrySettings) ExporterOption { - return func(o *internalOptions) { +func WithRetry(retrySettings RetrySettings) Option { + return func(o *baseSettings) { o.RetrySettings = retrySettings } } // WithQueue overrides the default QueueSettings for an exporter. // The default QueueSettings is to disable queueing. -func WithQueue(queueSettings QueueSettings) ExporterOption { - return func(o *internalOptions) { +func WithQueue(queueSettings QueueSettings) Option { + return func(o *baseSettings) { o.QueueSettings = queueSettings } } // WithResourceToTelemetryConversion overrides the default ResourceToTelemetrySettings for an exporter. // The default ResourceToTelemetrySettings is to disable resource attributes to metric labels conversion. -func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTelemetrySettings) ExporterOption { - return func(o *internalOptions) { +func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTelemetrySettings) Option { + return func(o *baseSettings) { o.ResourceToTelemetrySettings = resourceToTelemetrySettings } } // baseExporter contains common fields between different exporter types. type baseExporter struct { + component.Component cfg configmodels.Exporter sender requestSender qrSender *queuedRetrySender - start Start - shutdown Shutdown - startOnce sync.Once - shutdownOnce sync.Once convertResourceToTelemetry bool } -func newBaseExporter(cfg configmodels.Exporter, logger *zap.Logger, options ...ExporterOption) *baseExporter { - opts := fromConfiguredOptions(options...) +func newBaseExporter(cfg configmodels.Exporter, logger *zap.Logger, options ...Option) *baseExporter { + bs := fromOptions(options) be := &baseExporter{ + Component: componenthelper.NewComponent(bs.ComponentSettings), cfg: cfg, - start: opts.Start, - shutdown: opts.Shutdown, - convertResourceToTelemetry: opts.ResourceToTelemetrySettings.Enabled, + convertResourceToTelemetry: bs.ResourceToTelemetrySettings.Enabled, } - be.qrSender = newQueuedRetrySender(opts.QueueSettings, opts.RetrySettings, &timeoutSender{cfg: opts.TimeoutSettings}, logger) + be.qrSender = newQueuedRetrySender(bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, logger) be.sender = be.qrSender return be @@ -199,31 +186,22 @@ func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) reques // Start all senders and exporter and is invoked during service start. func (be *baseExporter) Start(ctx context.Context, host component.Host) error { - err := componenterror.ErrAlreadyStarted - be.startOnce.Do(func() { - // First start the wrapped exporter. - err = be.start(ctx, host) - if err != nil { - // TODO: Log errors, or check if it is recorded by the caller. - return - } + // First start the wrapped exporter. + if err := be.Component.Start(ctx, host); err != nil { + return err + } - // If no error then start the queuedRetrySender. - be.qrSender.start() - }) - return err + // If no error then start the queuedRetrySender. + be.qrSender.start() + return nil } // Shutdown all senders and exporter and is invoked during service shutdown. func (be *baseExporter) Shutdown(ctx context.Context) error { - err := componenterror.ErrAlreadyStopped - be.shutdownOnce.Do(func() { - // First shutdown the queued retry sender - be.qrSender.shutdown() - // Last shutdown the wrapped exporter itself. - err = be.shutdown(ctx) - }) - return err + // First shutdown the queued retry sender + be.qrSender.shutdown() + // Last shutdown the wrapped exporter itself. + return be.Component.Shutdown(ctx) } // timeoutSender is a request sender that adds a `timeout` to every request that passes this sender. diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index bbef2862370..fb14c712f0c 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -44,16 +44,17 @@ func TestBaseExporter(t *testing.T) { } func TestBaseExporterWithOptions(t *testing.T) { + want := errors.New("my error") be := newBaseExporter( defaultExporterCfg, zap.NewNop(), - WithStart(func(ctx context.Context, host component.Host) error { return errors.New("my error") }), - WithShutdown(func(ctx context.Context) error { return errors.New("my error") }), + WithStart(func(ctx context.Context, host component.Host) error { return want }), + WithShutdown(func(ctx context.Context) error { return want }), WithResourceToTelemetryConversion(createDefaultResourceToTelemetrySettings()), WithTimeout(CreateDefaultTimeoutSettings()), ) - require.Error(t, be.Start(context.Background(), componenttest.NewNopHost())) - require.Error(t, be.Shutdown(context.Background())) + require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost())) + require.Equal(t, want, be.Shutdown(context.Background())) } func errToStatus(err error) trace.Status { diff --git a/exporter/exporterhelper/logshelper.go b/exporter/exporterhelper/logshelper.go index 26df9fe96f1..e552399218a 100644 --- a/exporter/exporterhelper/logshelper.go +++ b/exporter/exporterhelper/logshelper.go @@ -73,7 +73,7 @@ func NewLogsExporter( cfg configmodels.Exporter, logger *zap.Logger, pushLogsData PushLogsData, - options ...ExporterOption, + options ...Option, ) (component.LogsExporter, error) { if cfg == nil { return nil, errNilConfig diff --git a/exporter/exporterhelper/metricshelper.go b/exporter/exporterhelper/metricshelper.go index 55bfe167e28..ac1f88e4c2f 100644 --- a/exporter/exporterhelper/metricshelper.go +++ b/exporter/exporterhelper/metricshelper.go @@ -78,7 +78,7 @@ func NewMetricsExporter( cfg configmodels.Exporter, logger *zap.Logger, pushMetricsData PushMetricsData, - options ...ExporterOption, + options ...Option, ) (component.MetricsExporter, error) { if cfg == nil { return nil, errNilConfig diff --git a/exporter/exporterhelper/tracehelper.go b/exporter/exporterhelper/tracehelper.go index 5238bf34b7b..a94ca63b534 100644 --- a/exporter/exporterhelper/tracehelper.go +++ b/exporter/exporterhelper/tracehelper.go @@ -74,7 +74,7 @@ func NewTraceExporter( cfg configmodels.Exporter, logger *zap.Logger, dataPusher traceDataPusher, - options ...ExporterOption, + options ...Option, ) (component.TracesExporter, error) { if cfg == nil { diff --git a/processor/processorhelper/processor.go b/processor/processorhelper/processor.go index 85c1a40c637..286fd25ba4c 100644 --- a/processor/processorhelper/processor.go +++ b/processor/processorhelper/processor.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/component/componenthelper" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" @@ -30,12 +31,6 @@ import ( // to stop further processing without propagating an error back up the pipeline to logs. var ErrSkipProcessingData = errors.New("sentinel error to skip processing data from the remainder of the pipeline") -// Start specifies the function invoked when the processor is being started. -type Start func(context.Context, component.Host) error - -// Shutdown specifies the function invoked when the processor is being shutdown. -type Shutdown func(context.Context) error - // TProcessor is a helper interface that allows avoiding implementing all functions in TracesProcessor by using NewTraceProcessor. type TProcessor interface { // ProcessTraces is a helper function that processes the incoming data and returns the data to be sent to the next component. @@ -58,74 +53,75 @@ type LProcessor interface { } // Option apply changes to internalOptions. -type Option func(*baseProcessor) +type Option func(*baseSettings) // WithStart overrides the default Start function for an processor. // The default shutdown function does nothing and always returns nil. -func WithStart(start Start) Option { - return func(o *baseProcessor) { - o.start = start +func WithStart(start componenthelper.Start) Option { + return func(o *baseSettings) { + o.Start = start } } // WithShutdown overrides the default Shutdown function for an processor. // The default shutdown function does nothing and always returns nil. -func WithShutdown(shutdown Shutdown) Option { - return func(o *baseProcessor) { - o.shutdown = shutdown +func WithShutdown(shutdown componenthelper.Shutdown) Option { + return func(o *baseSettings) { + o.Shutdown = shutdown } } // WithShutdown overrides the default GetCapabilities function for an processor. // The default GetCapabilities function returns mutable capabilities. func WithCapabilities(capabilities component.ProcessorCapabilities) Option { - return func(o *baseProcessor) { + return func(o *baseSettings) { o.capabilities = capabilities } } +type baseSettings struct { + *componenthelper.ComponentSettings + capabilities component.ProcessorCapabilities +} + +// fromOptions returns the internal settings starting from the default and applying all options. +func fromOptions(options []Option) *baseSettings { + // Start from the default options: + opts := &baseSettings{ + ComponentSettings: componenthelper.DefaultComponentSettings(), + capabilities: component.ProcessorCapabilities{MutatesConsumedData: true}, + } + + for _, op := range options { + op(opts) + } + + return opts +} + // internalOptions contains internalOptions concerning how an Processor is configured. type baseProcessor struct { + component.Component fullName string - start Start - shutdown Shutdown capabilities component.ProcessorCapabilities } // Construct the internalOptions from multiple Option. func newBaseProcessor(fullName string, options ...Option) baseProcessor { + bs := fromOptions(options) be := baseProcessor{ + Component: componenthelper.NewComponent(bs.ComponentSettings), fullName: fullName, - capabilities: component.ProcessorCapabilities{MutatesConsumedData: true}, - } - - for _, op := range options { - op(&be) + capabilities: bs.capabilities, } return be } -// Start the processor, invoked during service start. -func (bp *baseProcessor) Start(ctx context.Context, host component.Host) error { - if bp.start != nil { - return bp.start(ctx, host) - } - return nil -} - func (bp *baseProcessor) GetCapabilities() component.ProcessorCapabilities { return bp.capabilities } -// Shutdown the processor, invoked during service shutdown. -func (bp *baseProcessor) Shutdown(ctx context.Context) error { - if bp.shutdown != nil { - return bp.shutdown(ctx) - } - return nil -} - type tracesProcessor struct { baseProcessor processor TProcessor diff --git a/processor/processorhelper/processor_test.go b/processor/processorhelper/processor_test.go index 4bf5e438c82..345843186a5 100644 --- a/processor/processorhelper/processor_test.go +++ b/processor/processorhelper/processor_test.go @@ -38,45 +38,21 @@ var testCfg = &configmodels.ProcessorSettings{ NameVal: testFullName, } -func TestWithStart(t *testing.T) { - startCalled := false - start := func(context.Context, component.Host) error { startCalled = true; return nil } - - bp := newBaseProcessor(testFullName, WithStart(start)) +func TestDefaultOptions(t *testing.T) { + bp := newBaseProcessor(testFullName) + assert.True(t, bp.GetCapabilities().MutatesConsumedData) assert.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) - assert.True(t, startCalled) -} - -func TestWithStart_ReturnError(t *testing.T) { - want := errors.New("my_error") - start := func(context.Context, component.Host) error { return want } - - bp := newBaseProcessor(testFullName, WithStart(start)) - assert.Equal(t, want, bp.Start(context.Background(), componenttest.NewNopHost())) -} - -func TestWithShutdown(t *testing.T) { - shutdownCalled := false - shutdown := func(context.Context) error { shutdownCalled = true; return nil } - - bp := newBaseProcessor(testFullName, WithShutdown(shutdown)) assert.NoError(t, bp.Shutdown(context.Background())) - assert.True(t, shutdownCalled) } -func TestWithShutdown_ReturnError(t *testing.T) { +func TestWithOptions(t *testing.T) { want := errors.New("my_error") - shutdownErr := func(context.Context) error { return want } - - bp := newBaseProcessor(testFullName, WithShutdown(shutdownErr)) + bp := newBaseProcessor(testFullName, + WithStart(func(context.Context, component.Host) error { return want }), + WithShutdown(func(context.Context) error { return want }), + WithCapabilities(component.ProcessorCapabilities{MutatesConsumedData: false})) + assert.Equal(t, want, bp.Start(context.Background(), componenttest.NewNopHost())) assert.Equal(t, want, bp.Shutdown(context.Background())) -} - -func TestWithCapabilities(t *testing.T) { - bp := newBaseProcessor(testFullName) - assert.True(t, bp.GetCapabilities().MutatesConsumedData) - - bp = newBaseProcessor(testFullName, WithCapabilities(component.ProcessorCapabilities{MutatesConsumedData: false})) assert.False(t, bp.GetCapabilities().MutatesConsumedData) }