Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce duplicate code in components helper #2186

Merged
merged 1 commit into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions component/componenthelper/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package componenthelper

import (
"context"

"go.opentelemetry.io/collector/component"
)

// Start specifies the function invoked when the exporter is being started.
type Start func(context.Context, component.Host) error

// Shutdown specifies the function invoked when the exporter is being shutdown.
type Shutdown func(context.Context) error

// ComponentSettings represents a settings struct to create components.
type ComponentSettings struct {
Start
Shutdown
}

// DefaultComponentSettings returns the default settings for a component. The Start and Shutdown are no-op.
func DefaultComponentSettings() *ComponentSettings {
return &ComponentSettings{
Start: func(ctx context.Context, host component.Host) error { return nil },
Shutdown: func(ctx context.Context) error { return nil },
}
}

type baseComponent struct {
start Start
shutdown Shutdown
}

// Start all senders and exporter and is invoked during service start.
func (be *baseComponent) Start(ctx context.Context, host component.Host) error {
return be.start(ctx, host)
}

// Shutdown all senders and exporter and is invoked during service shutdown.
func (be *baseComponent) Shutdown(ctx context.Context) error {
return be.shutdown(ctx)
}

// NewComponent returns a component.Component that calls the given Start and Shutdown.
func NewComponent(s *ComponentSettings) component.Component {
return &baseComponent{
start: s.Start,
shutdown: s.Shutdown,
}
}
69 changes: 69 additions & 0 deletions component/componenthelper/component_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package componenthelper

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
)

func TestDefaultSettings(t *testing.T) {
st := DefaultComponentSettings()
require.NotNil(t, st)
cp := NewComponent(st)
require.NoError(t, cp.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, cp.Shutdown(context.Background()))
}

func TestWithStart(t *testing.T) {
startCalled := false
st := DefaultComponentSettings()
st.Start = func(context.Context, component.Host) error { startCalled = true; return nil }
cp := NewComponent(st)
assert.NoError(t, cp.Start(context.Background(), componenttest.NewNopHost()))
assert.True(t, startCalled)
}

func TestWithStart_ReturnError(t *testing.T) {
want := errors.New("my_error")
st := DefaultComponentSettings()
st.Start = func(context.Context, component.Host) error { return want }
cp := NewComponent(st)
assert.Equal(t, want, cp.Start(context.Background(), componenttest.NewNopHost()))
}

func TestWithShutdown(t *testing.T) {
shutdownCalled := false
st := DefaultComponentSettings()
st.Shutdown = func(context.Context) error { shutdownCalled = true; return nil }
cp := NewComponent(st)
assert.NoError(t, cp.Shutdown(context.Background()))
assert.True(t, shutdownCalled)
}

func TestWithShutdown_ReturnError(t *testing.T) {
want := errors.New("my_error")
st := DefaultComponentSettings()
st.Shutdown = func(context.Context) error { return want }
cp := NewComponent(st)
assert.Equal(t, want, cp.Shutdown(context.Background()))
}
104 changes: 41 additions & 63 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ package exporterhelper

import (
"context"
"sync"
"time"

"go.opencensus.io/trace"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/consumererror"
)
Expand All @@ -32,7 +31,7 @@ var (
okStatus = trace.Status{Code: trace.StatusCodeOK}
)

// Settings for timeout. The timeout applies to individual attempts to send data to the backend.
// ComponentSettings for timeout. The timeout applies to individual attempts to send data to the backend.
type TimeoutSettings struct {
// Timeout is the timeout for every attempt to send data to the backend.
Timeout time.Duration `mapstructure:"timeout"`
Expand Down Expand Up @@ -76,34 +75,26 @@ func (req *baseRequest) setContext(ctx context.Context) {
req.ctx = ctx
}

// Start specifies the function invoked when the exporter is being started.
type Start func(context.Context, component.Host) error

// Shutdown specifies the function invoked when the exporter is being shutdown.
type Shutdown func(context.Context) error

// internalOptions represents all the options that users can configure.
type internalOptions struct {
// baseSettings represents all the options that users can configure.
type baseSettings struct {
*componenthelper.ComponentSettings
TimeoutSettings
QueueSettings
RetrySettings
ResourceToTelemetrySettings
Start
Shutdown
}

// fromConfiguredOptions returns the internal options starting from the default and applying all configured options.
func fromConfiguredOptions(options ...ExporterOption) *internalOptions {
// fromOptions returns the internal options starting from the default and applying all configured options.
func fromOptions(options []Option) *baseSettings {
// Start from the default options:
opts := &internalOptions{
TimeoutSettings: CreateDefaultTimeoutSettings(),
opts := &baseSettings{
ComponentSettings: componenthelper.DefaultComponentSettings(),
TimeoutSettings: CreateDefaultTimeoutSettings(),
// TODO: Enable queuing by default (call CreateDefaultQueueSettings)
QueueSettings: QueueSettings{Enabled: false},
// TODO: Enable retry by default (call CreateDefaultRetrySettings)
RetrySettings: RetrySettings{Enabled: false},
ResourceToTelemetrySettings: createDefaultResourceToTelemetrySettings(),
Start: func(ctx context.Context, host component.Host) error { return nil },
Shutdown: func(ctx context.Context) error { return nil },
}

for _, op := range options {
Expand All @@ -113,79 +104,75 @@ func fromConfiguredOptions(options ...ExporterOption) *internalOptions {
return opts
}

// ExporterOption apply changes to internalOptions.
type ExporterOption func(*internalOptions)
// Option apply changes to baseSettings.
type Option func(*baseSettings)

// WithShutdown overrides the default Shutdown function for an exporter.
// The default shutdown function does nothing and always returns nil.
func WithShutdown(shutdown Shutdown) ExporterOption {
return func(o *internalOptions) {
func WithShutdown(shutdown componenthelper.Shutdown) Option {
return func(o *baseSettings) {
o.Shutdown = shutdown
}
}

// WithStart overrides the default Start function for an exporter.
// The default shutdown function does nothing and always returns nil.
func WithStart(start Start) ExporterOption {
return func(o *internalOptions) {
func WithStart(start componenthelper.Start) Option {
return func(o *baseSettings) {
o.Start = start
}
}

// WithTimeout overrides the default TimeoutSettings for an exporter.
// The default TimeoutSettings is 5 seconds.
func WithTimeout(timeoutSettings TimeoutSettings) ExporterOption {
return func(o *internalOptions) {
func WithTimeout(timeoutSettings TimeoutSettings) Option {
return func(o *baseSettings) {
o.TimeoutSettings = timeoutSettings
}
}

// WithRetry overrides the default RetrySettings for an exporter.
// The default RetrySettings is to disable retries.
func WithRetry(retrySettings RetrySettings) ExporterOption {
return func(o *internalOptions) {
func WithRetry(retrySettings RetrySettings) Option {
return func(o *baseSettings) {
o.RetrySettings = retrySettings
}
}

// WithQueue overrides the default QueueSettings for an exporter.
// The default QueueSettings is to disable queueing.
func WithQueue(queueSettings QueueSettings) ExporterOption {
return func(o *internalOptions) {
func WithQueue(queueSettings QueueSettings) Option {
return func(o *baseSettings) {
o.QueueSettings = queueSettings
}
}

// WithResourceToTelemetryConversion overrides the default ResourceToTelemetrySettings for an exporter.
// The default ResourceToTelemetrySettings is to disable resource attributes to metric labels conversion.
func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTelemetrySettings) ExporterOption {
return func(o *internalOptions) {
func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTelemetrySettings) Option {
return func(o *baseSettings) {
o.ResourceToTelemetrySettings = resourceToTelemetrySettings
}
}

// baseExporter contains common fields between different exporter types.
type baseExporter struct {
component.Component
cfg configmodels.Exporter
sender requestSender
qrSender *queuedRetrySender
start Start
shutdown Shutdown
startOnce sync.Once
shutdownOnce sync.Once
convertResourceToTelemetry bool
}

func newBaseExporter(cfg configmodels.Exporter, logger *zap.Logger, options ...ExporterOption) *baseExporter {
opts := fromConfiguredOptions(options...)
func newBaseExporter(cfg configmodels.Exporter, logger *zap.Logger, options ...Option) *baseExporter {
bs := fromOptions(options)
be := &baseExporter{
Component: componenthelper.NewComponent(bs.ComponentSettings),
cfg: cfg,
start: opts.Start,
shutdown: opts.Shutdown,
convertResourceToTelemetry: opts.ResourceToTelemetrySettings.Enabled,
convertResourceToTelemetry: bs.ResourceToTelemetrySettings.Enabled,
}

be.qrSender = newQueuedRetrySender(opts.QueueSettings, opts.RetrySettings, &timeoutSender{cfg: opts.TimeoutSettings}, logger)
be.qrSender = newQueuedRetrySender(bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, logger)
be.sender = be.qrSender

return be
Expand All @@ -199,31 +186,22 @@ func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) reques

// Start all senders and exporter and is invoked during service start.
func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
err := componenterror.ErrAlreadyStarted
be.startOnce.Do(func() {
// First start the wrapped exporter.
err = be.start(ctx, host)
if err != nil {
// TODO: Log errors, or check if it is recorded by the caller.
return
}
// First start the wrapped exporter.
if err := be.Component.Start(ctx, host); err != nil {
return err
}

// If no error then start the queuedRetrySender.
be.qrSender.start()
})
return err
// If no error then start the queuedRetrySender.
be.qrSender.start()
return nil
}

// Shutdown all senders and exporter and is invoked during service shutdown.
func (be *baseExporter) Shutdown(ctx context.Context) error {
err := componenterror.ErrAlreadyStopped
be.shutdownOnce.Do(func() {
// First shutdown the queued retry sender
be.qrSender.shutdown()
// Last shutdown the wrapped exporter itself.
err = be.shutdown(ctx)
})
return err
// First shutdown the queued retry sender
be.qrSender.shutdown()
// Last shutdown the wrapped exporter itself.
return be.Component.Shutdown(ctx)
}

// timeoutSender is a request sender that adds a `timeout` to every request that passes this sender.
Expand Down
9 changes: 5 additions & 4 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,17 @@ func TestBaseExporter(t *testing.T) {
}

func TestBaseExporterWithOptions(t *testing.T) {
want := errors.New("my error")
be := newBaseExporter(
defaultExporterCfg,
zap.NewNop(),
WithStart(func(ctx context.Context, host component.Host) error { return errors.New("my error") }),
WithShutdown(func(ctx context.Context) error { return errors.New("my error") }),
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithResourceToTelemetryConversion(createDefaultResourceToTelemetrySettings()),
WithTimeout(CreateDefaultTimeoutSettings()),
)
require.Error(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.Error(t, be.Shutdown(context.Background()))
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
require.Equal(t, want, be.Shutdown(context.Background()))
}

func errToStatus(err error) trace.Status {
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/logshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewLogsExporter(
cfg configmodels.Exporter,
logger *zap.Logger,
pushLogsData PushLogsData,
options ...ExporterOption,
options ...Option,
) (component.LogsExporter, error) {
if cfg == nil {
return nil, errNilConfig
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/metricshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func NewMetricsExporter(
cfg configmodels.Exporter,
logger *zap.Logger,
pushMetricsData PushMetricsData,
options ...ExporterOption,
options ...Option,
) (component.MetricsExporter, error) {
if cfg == nil {
return nil, errNilConfig
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/tracehelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func NewTraceExporter(
cfg configmodels.Exporter,
logger *zap.Logger,
dataPusher traceDataPusher,
options ...ExporterOption,
options ...Option,
) (component.TracesExporter, error) {

if cfg == nil {
Expand Down
Loading