From e63aed8fefe5e02249894960f050c5de4baa0350 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 15 Dec 2022 12:43:00 -0800 Subject: [PATCH] Improve service Config types name (#6787) (#6803) Signed-off-by: Bogdan Drutu Signed-off-by: Bogdan Drutu Signed-off-by: Bogdan Drutu --- .chloggen/draftbuilder.yaml | 11 + exporter/exporter.go | 76 +++ exporter/exportertest/nop_exporter.go | 10 + exporter/exportertest/nop_exporter_test.go | 28 + extension/extension.go | 36 ++ extension/extensiontest/nop_extension.go | 10 + extension/extensiontest/nop_extension_test.go | 16 + otelcol/collector.go | 22 +- processor/processor.go | 80 +++ processor/processortest/nop_processor.go | 12 + processor/processortest/nop_processor_test.go | 28 + receiver/receiver.go | 76 +++ receiver/receivertest/nop_receiver.go | 10 + receiver/receivertest/nop_receiver_test.go | 28 + service/extensions/extensions.go | 22 +- service/extensions/extensions_test.go | 4 +- service/host.go | 24 +- service/pipelines.go | 182 ++---- service/pipelines_test.go | 523 +++++++++--------- service/service.go | 72 +-- service/service_test.go | 36 +- service/zpages.go | 2 +- 22 files changed, 791 insertions(+), 517 deletions(-) create mode 100755 .chloggen/draftbuilder.yaml diff --git a/.chloggen/draftbuilder.yaml b/.chloggen/draftbuilder.yaml new file mode 100755 index 00000000000..954bdd051de --- /dev/null +++ b/.chloggen/draftbuilder.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: components + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add [receiver|processor|exporter|extension].Builder to help with creating components form a set of configs and factories + +# One or more tracking issues or pull requests related to the change +issues: [6803] diff --git a/exporter/exporter.go b/exporter/exporter.go index 22f817f3902..e049e150a19 100644 --- a/exporter/exporter.go +++ b/exporter/exporter.go @@ -18,6 +18,8 @@ import ( "context" "fmt" + "go.uber.org/zap" + "go.opentelemetry.io/collector/component" ) @@ -170,3 +172,77 @@ func MakeFactoryMap(factories ...Factory) (map[component.Type]Factory, error) { } return fMap, nil } + +// Builder exporter is a helper struct that given a set of Configs and Factories helps with creating exporters. +type Builder struct { + cfgs map[component.ID]component.Config + factories map[component.Type]Factory +} + +// NewBuilder creates a new exporter.Builder to help with creating components form a set of configs and factories. +func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.Type]Factory) *Builder { + return &Builder{cfgs: cfgs, factories: factories} +} + +// CreateTraces creates a Traces exporter based on the settings and config. +func (b *Builder) CreateTraces(ctx context.Context, set CreateSettings) (Traces, error) { + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("exporter %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("exporter factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.TracesExporterStability()) + return f.CreateTracesExporter(ctx, set, cfg) +} + +// CreateMetrics creates a Metrics exporter based on the settings and config. +func (b *Builder) CreateMetrics(ctx context.Context, set CreateSettings) (Metrics, error) { + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("exporter %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("exporter factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.MetricsExporterStability()) + return f.CreateMetricsExporter(ctx, set, cfg) +} + +// CreateLogs creates a Logs exporter based on the settings and config. +func (b *Builder) CreateLogs(ctx context.Context, set CreateSettings) (Logs, error) { + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("exporter %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("exporter factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.LogsExporterStability()) + return f.CreateLogsExporter(ctx, set, cfg) +} + +func (b *Builder) Factory(componentType component.Type) component.Factory { + return b.factories[componentType] +} + +// logStabilityLevel logs the stability level of a component. The log level is set to info for +// undefined, unmaintained, deprecated and development. The log level is set to debug +// for alpha, beta and stable. +func logStabilityLevel(logger *zap.Logger, sl component.StabilityLevel) { + if sl >= component.StabilityLevelAlpha { + logger.Debug(sl.LogMessage()) + } else { + logger.Info(sl.LogMessage()) + } +} diff --git a/exporter/exportertest/nop_exporter.go b/exporter/exportertest/nop_exporter.go index c1c9fc1e3a1..f504a547b5d 100644 --- a/exporter/exportertest/nop_exporter.go +++ b/exporter/exportertest/nop_exporter.go @@ -20,6 +20,8 @@ import ( "go.opentelemetry.io/collector/exporter" ) +const typeStr = "nop" + // NewNopCreateSettings returns a new nop settings for Create*Exporter functions. func NewNopCreateSettings() exporter.CreateSettings { return exporter.CreateSettings{ @@ -30,3 +32,11 @@ func NewNopCreateSettings() exporter.CreateSettings { // NewNopFactory returns an exporter.Factory that constructs nop exporters. var NewNopFactory = componenttest.NewNopExporterFactory //nolint:staticcheck + +// NewNopBuilder returns a exporter.Builder that constructs nop receivers. +func NewNopBuilder() *exporter.Builder { + nopFactory := NewNopFactory() + return exporter.NewBuilder( + map[component.ID]component.Config{component.NewID(typeStr): nopFactory.CreateDefaultConfig()}, + map[component.Type]exporter.Factory{typeStr: nopFactory}) +} diff --git a/exporter/exportertest/nop_exporter_test.go b/exporter/exportertest/nop_exporter_test.go index cd8b48cb0a1..531a5f2fcb1 100644 --- a/exporter/exportertest/nop_exporter_test.go +++ b/exporter/exportertest/nop_exporter_test.go @@ -53,3 +53,31 @@ func TestNewNopFactory(t *testing.T) { assert.NoError(t, logs.ConsumeLogs(context.Background(), plog.NewLogs())) assert.NoError(t, logs.Shutdown(context.Background())) } + +func TestNewNopBuilder(t *testing.T) { + builder := NewNopBuilder() + require.NotNil(t, builder) + + factory := NewNopFactory() + cfg := factory.CreateDefaultConfig() + set := NewNopCreateSettings() + set.ID = component.NewID(typeStr) + + traces, err := factory.CreateTracesExporter(context.Background(), set, cfg) + require.NoError(t, err) + bTraces, err := builder.CreateTraces(context.Background(), set) + require.NoError(t, err) + assert.IsType(t, traces, bTraces) + + metrics, err := factory.CreateMetricsExporter(context.Background(), set, cfg) + require.NoError(t, err) + bMetrics, err := builder.CreateMetrics(context.Background(), set) + require.NoError(t, err) + assert.IsType(t, metrics, bMetrics) + + logs, err := factory.CreateLogsExporter(context.Background(), set, cfg) + require.NoError(t, err) + bLogs, err := builder.CreateLogs(context.Background(), set) + require.NoError(t, err) + assert.IsType(t, logs, bLogs) +} diff --git a/extension/extension.go b/extension/extension.go index 7993cb420cc..391f23a6674 100644 --- a/extension/extension.go +++ b/extension/extension.go @@ -106,3 +106,39 @@ func MakeFactoryMap(factories ...Factory) (map[component.Type]Factory, error) { } return fMap, nil } + +// Builder extension is a helper struct that given a set of Configs and Factories helps with creating extensions. +type Builder struct { + cfgs map[component.ID]component.Config + factories map[component.Type]Factory +} + +// NewBuilder creates a new extension.Builder to help with creating components form a set of configs and factories. +func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.Type]Factory) *Builder { + return &Builder{cfgs: cfgs, factories: factories} +} + +// Create creates an extension based on the settings and configs available. +func (b *Builder) Create(ctx context.Context, set CreateSettings) (Extension, error) { + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("extension %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("extension factory not available for: %q", set.ID) + } + + sl := f.ExtensionStability() + if sl >= component.StabilityLevelAlpha { + set.Logger.Debug(sl.LogMessage()) + } else { + set.Logger.Info(sl.LogMessage()) + } + return f.CreateExtension(ctx, set, cfg) +} + +func (b *Builder) Factory(componentType component.Type) component.Factory { + return b.factories[componentType] +} diff --git a/extension/extensiontest/nop_extension.go b/extension/extensiontest/nop_extension.go index 5275ae532e4..03b4f520485 100644 --- a/extension/extensiontest/nop_extension.go +++ b/extension/extensiontest/nop_extension.go @@ -20,6 +20,8 @@ import ( "go.opentelemetry.io/collector/extension" ) +const typeStr = "nop" + // NewNopCreateSettings returns a new nop settings for extension.Factory Create* functions. func NewNopCreateSettings() extension.CreateSettings { return extension.CreateSettings{ @@ -30,3 +32,11 @@ func NewNopCreateSettings() extension.CreateSettings { // NewNopFactory returns an extension.Factory that constructs nop extensions. var NewNopFactory = componenttest.NewNopExtensionFactory //nolint:staticcheck + +// NewNopBuilder returns a extension.Builder that constructs nop receivers. +func NewNopBuilder() *extension.Builder { + nopFactory := NewNopFactory() + return extension.NewBuilder( + map[component.ID]component.Config{component.NewID(typeStr): nopFactory.CreateDefaultConfig()}, + map[component.Type]extension.Factory{typeStr: nopFactory}) +} diff --git a/extension/extensiontest/nop_extension_test.go b/extension/extensiontest/nop_extension_test.go index ad5e39ef674..16ec0910397 100644 --- a/extension/extensiontest/nop_extension_test.go +++ b/extension/extensiontest/nop_extension_test.go @@ -37,3 +37,19 @@ func TestNewNopFactory(t *testing.T) { assert.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, traces.Shutdown(context.Background())) } + +func TestNewNopBuilder(t *testing.T) { + builder := NewNopBuilder() + require.NotNil(t, builder) + + factory := NewNopFactory() + cfg := factory.CreateDefaultConfig() + set := NewNopCreateSettings() + set.ID = component.NewID(typeStr) + + ext, err := factory.CreateExtension(context.Background(), set, cfg) + require.NoError(t, err) + bExt, err := builder.Create(context.Background(), set) + require.NoError(t, err) + assert.IsType(t, ext, bExt) +} diff --git a/otelcol/collector.go b/otelcol/collector.go index 00282d81bd8..f695637da13 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -29,7 +29,11 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/otelcol/internal/grpclog" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/service" ) @@ -157,17 +161,13 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { } col.service, err = service.New(ctx, service.Settings{ - BuildInfo: col.set.BuildInfo, - ReceiverFactories: col.set.Factories.Receivers, - ReceiverConfigs: cfg.Receivers, - ProcessorFactories: col.set.Factories.Processors, - ProcessorConfigs: cfg.Processors, - ExporterFactories: col.set.Factories.Exporters, - ExporterConfigs: cfg.Exporters, - ExtensionFactories: col.set.Factories.Extensions, - ExtensionConfigs: cfg.Extensions, - AsyncErrorChannel: col.asyncErrorChannel, - LoggingOptions: col.set.LoggingOptions, + BuildInfo: col.set.BuildInfo, + Receivers: receiver.NewBuilder(cfg.Receivers, col.set.Factories.Receivers), + Processors: processor.NewBuilder(cfg.Processors, col.set.Factories.Processors), + Exporters: exporter.NewBuilder(cfg.Exporters, col.set.Factories.Exporters), + Extensions: extension.NewBuilder(cfg.Extensions, col.set.Factories.Extensions), + AsyncErrorChannel: col.asyncErrorChannel, + LoggingOptions: col.set.LoggingOptions, }, cfg.Service) if err != nil { return err diff --git a/processor/processor.go b/processor/processor.go index eb35a08f36e..9b78c1b766f 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -15,7 +15,13 @@ package processor // import "go.opentelemetry.io/collector/processor" import ( + "context" + "fmt" + + "go.uber.org/zap" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" ) // Traces is a processor that can consume traces. @@ -63,3 +69,77 @@ var NewFactory = component.NewProcessorFactory //nolint:staticcheck // MakeFactoryMap takes a list of factories and returns a map with Factory type as keys. // It returns a non-nil error when there are factories with duplicate type. var MakeFactoryMap = component.MakeProcessorFactoryMap //nolint:staticcheck + +// Builder processor is a helper struct that given a set of Configs and Factories helps with creating processors. +type Builder struct { + cfgs map[component.ID]component.Config + factories map[component.Type]Factory +} + +// NewBuilder creates a new processor.Builder to help with creating components form a set of configs and factories. +func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.Type]Factory) *Builder { + return &Builder{cfgs: cfgs, factories: factories} +} + +// CreateTraces creates a Traces processor based on the settings and config. +func (b *Builder) CreateTraces(ctx context.Context, set CreateSettings, next consumer.Traces) (Traces, error) { + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("processor %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("processor factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.TracesProcessorStability()) + return f.CreateTracesProcessor(ctx, set, cfg, next) +} + +// CreateMetrics creates a Metrics processor based on the settings and config. +func (b *Builder) CreateMetrics(ctx context.Context, set CreateSettings, next consumer.Metrics) (Metrics, error) { + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("processor %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("processor factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.MetricsProcessorStability()) + return f.CreateMetricsProcessor(ctx, set, cfg, next) +} + +// CreateLogs creates a Logs processor based on the settings and config. +func (b *Builder) CreateLogs(ctx context.Context, set CreateSettings, next consumer.Logs) (Logs, error) { + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("processor %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("processor factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.LogsProcessorStability()) + return f.CreateLogsProcessor(ctx, set, cfg, next) +} + +func (b *Builder) Factory(componentType component.Type) component.Factory { + return b.factories[componentType] +} + +// logStabilityLevel logs the stability level of a component. The log level is set to info for +// undefined, unmaintained, deprecated and development. The log level is set to debug +// for alpha, beta and stable. +func logStabilityLevel(logger *zap.Logger, sl component.StabilityLevel) { + if sl >= component.StabilityLevelAlpha { + logger.Debug(sl.LogMessage()) + } else { + logger.Info(sl.LogMessage()) + } +} diff --git a/processor/processortest/nop_processor.go b/processor/processortest/nop_processor.go index a804d29ba00..1b3a23a74b1 100644 --- a/processor/processortest/nop_processor.go +++ b/processor/processortest/nop_processor.go @@ -15,11 +15,23 @@ package processortest // import "go.opentelemetry.io/collector/processor/processortest" import ( + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/processor" ) +const typeStr = "nop" + // NewNopCreateSettings returns a new nop settings for Create* functions. var NewNopCreateSettings = componenttest.NewNopProcessorCreateSettings //nolint:staticcheck // NewNopFactory returns a component.ProcessorFactory that constructs nop processors. var NewNopFactory = componenttest.NewNopProcessorFactory //nolint:staticcheck + +// NewNopBuilder returns a processor.Builder that constructs nop receivers. +func NewNopBuilder() *processor.Builder { + nopFactory := NewNopFactory() + return processor.NewBuilder( + map[component.ID]component.Config{component.NewID(typeStr): nopFactory.CreateDefaultConfig()}, + map[component.Type]processor.Factory{typeStr: nopFactory}) +} diff --git a/processor/processortest/nop_processor_test.go b/processor/processortest/nop_processor_test.go index aada07465d9..9a9515ecd9a 100644 --- a/processor/processortest/nop_processor_test.go +++ b/processor/processortest/nop_processor_test.go @@ -58,3 +58,31 @@ func TestNewNopFactory(t *testing.T) { assert.NoError(t, logs.ConsumeLogs(context.Background(), plog.NewLogs())) assert.NoError(t, logs.Shutdown(context.Background())) } + +func TestNewNopBuilder(t *testing.T) { + builder := NewNopBuilder() + require.NotNil(t, builder) + + factory := NewNopFactory() + cfg := factory.CreateDefaultConfig() + set := NewNopCreateSettings() + set.ID = component.NewID(typeStr) + + traces, err := factory.CreateTracesProcessor(context.Background(), set, cfg, consumertest.NewNop()) + require.NoError(t, err) + bTraces, err := builder.CreateTraces(context.Background(), set, consumertest.NewNop()) + require.NoError(t, err) + assert.IsType(t, traces, bTraces) + + metrics, err := factory.CreateMetricsProcessor(context.Background(), set, cfg, consumertest.NewNop()) + require.NoError(t, err) + bMetrics, err := builder.CreateMetrics(context.Background(), set, consumertest.NewNop()) + require.NoError(t, err) + assert.IsType(t, metrics, bMetrics) + + logs, err := factory.CreateLogsProcessor(context.Background(), set, cfg, consumertest.NewNop()) + require.NoError(t, err) + bLogs, err := builder.CreateLogs(context.Background(), set, consumertest.NewNop()) + require.NoError(t, err) + assert.IsType(t, logs, bLogs) +} diff --git a/receiver/receiver.go b/receiver/receiver.go index 3de2673fc45..98b65761e16 100644 --- a/receiver/receiver.go +++ b/receiver/receiver.go @@ -18,6 +18,8 @@ import ( "context" "fmt" + "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" ) @@ -195,3 +197,77 @@ func MakeFactoryMap(factories ...Factory) (map[component.Type]Factory, error) { } return fMap, nil } + +// Builder receiver is a helper struct that given a set of Configs and Factories helps with creating receivers. +type Builder struct { + cfgs map[component.ID]component.Config + factories map[component.Type]Factory +} + +// NewBuilder creates a new receiver.Builder to help with creating components form a set of configs and factories. +func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.Type]Factory) *Builder { + return &Builder{cfgs: cfgs, factories: factories} +} + +// CreateTraces creates a Traces receiver based on the settings and config. +func (b *Builder) CreateTraces(ctx context.Context, set CreateSettings, next consumer.Traces) (Traces, error) { + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("receiver %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("receiver factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.TracesReceiverStability()) + return f.CreateTracesReceiver(ctx, set, cfg, next) +} + +// CreateMetrics creates a Metrics receiver based on the settings and config. +func (b *Builder) CreateMetrics(ctx context.Context, set CreateSettings, next consumer.Metrics) (Metrics, error) { + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("receiver %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("receiver factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.MetricsReceiverStability()) + return f.CreateMetricsReceiver(ctx, set, cfg, next) +} + +// CreateLogs creates a Logs receiver based on the settings and config. +func (b *Builder) CreateLogs(ctx context.Context, set CreateSettings, next consumer.Logs) (Logs, error) { + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("receiver %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("receiver factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.LogsReceiverStability()) + return f.CreateLogsReceiver(ctx, set, cfg, next) +} + +func (b *Builder) Factory(componentType component.Type) component.Factory { + return b.factories[componentType] +} + +// logStabilityLevel logs the stability level of a component. The log level is set to info for +// undefined, unmaintained, deprecated and development. The log level is set to debug +// for alpha, beta and stable. +func logStabilityLevel(logger *zap.Logger, sl component.StabilityLevel) { + if sl >= component.StabilityLevelAlpha { + logger.Debug(sl.LogMessage()) + } else { + logger.Info(sl.LogMessage()) + } +} diff --git a/receiver/receivertest/nop_receiver.go b/receiver/receivertest/nop_receiver.go index f1f275843b2..753b6f25c06 100644 --- a/receiver/receivertest/nop_receiver.go +++ b/receiver/receivertest/nop_receiver.go @@ -20,6 +20,8 @@ import ( "go.opentelemetry.io/collector/receiver" ) +const typeStr = "nop" + // NewNopCreateSettings returns a new nop settings for Create* functions. func NewNopCreateSettings() receiver.CreateSettings { return receiver.CreateSettings{ @@ -30,3 +32,11 @@ func NewNopCreateSettings() receiver.CreateSettings { // NewNopFactory returns a receiver.Factory that constructs nop receivers. var NewNopFactory = componenttest.NewNopReceiverFactory //nolint:staticcheck + +// NewNopBuilder returns a receiver.Builder that constructs nop receivers. +func NewNopBuilder() *receiver.Builder { + nopFactory := NewNopFactory() + return receiver.NewBuilder( + map[component.ID]component.Config{component.NewID(typeStr): nopFactory.CreateDefaultConfig()}, + map[component.Type]receiver.Factory{typeStr: nopFactory}) +} diff --git a/receiver/receivertest/nop_receiver_test.go b/receiver/receivertest/nop_receiver_test.go index 68882a11259..8eb7be75c89 100644 --- a/receiver/receivertest/nop_receiver_test.go +++ b/receiver/receivertest/nop_receiver_test.go @@ -48,3 +48,31 @@ func TestNewNopFactory(t *testing.T) { assert.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, logs.Shutdown(context.Background())) } + +func TestNewNopBuilder(t *testing.T) { + builder := NewNopBuilder() + require.NotNil(t, builder) + + factory := NewNopFactory() + cfg := factory.CreateDefaultConfig() + set := NewNopCreateSettings() + set.ID = component.NewID(typeStr) + + traces, err := factory.CreateTracesReceiver(context.Background(), set, cfg, consumertest.NewNop()) + require.NoError(t, err) + bTraces, err := builder.CreateTraces(context.Background(), set, consumertest.NewNop()) + require.NoError(t, err) + assert.IsType(t, traces, bTraces) + + metrics, err := factory.CreateMetricsReceiver(context.Background(), set, cfg, consumertest.NewNop()) + require.NoError(t, err) + bMetrics, err := builder.CreateMetrics(context.Background(), set, consumertest.NewNop()) + require.NoError(t, err) + assert.IsType(t, metrics, bMetrics) + + logs, err := factory.CreateLogsReceiver(context.Background(), set, cfg, consumertest.NewNop()) + require.NoError(t, err) + bLogs, err := builder.CreateLogs(context.Background(), set, consumertest.NewNop()) + require.NoError(t, err) + assert.IsType(t, logs, bLogs) +} diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index 6082c8df34c..5ddc87ce239 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -123,30 +123,26 @@ type Settings struct { Telemetry component.TelemetrySettings BuildInfo component.BuildInfo - // Configs is a map of component.ID to component.Config. + // Drepecated: [v0.68.0] use Extensions. Configs map[component.ID]component.Config - // Factories maps extension type names in the config to the respective extension.Factory. + // Drepecated: [v0.68.0] use Extensions. Factories map[component.Type]extension.Factory + + // Extensions builder for extensions. + Extensions *extension.Builder } // New creates a new Extensions from Config. func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) { + if set.Extensions == nil { + set.Extensions = extension.NewBuilder(set.Configs, set.Factories) + } exts := &Extensions{ telemetry: set.Telemetry, extMap: make(map[component.ID]extension.Extension), } for _, extID := range cfg { - extCfg, existsCfg := set.Configs[extID] - if !existsCfg { - return nil, fmt.Errorf("extension %q is not configured", extID) - } - - factory, existsFactory := set.Factories[extID.Type()] - if !existsFactory { - return nil, fmt.Errorf("extension factory for type %q is not configured", extID.Type()) - } - extSet := extension.CreateSettings{ ID: extID, TelemetrySettings: set.Telemetry, @@ -154,7 +150,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) { } extSet.TelemetrySettings.Logger = extensionLogger(set.Telemetry.Logger, extID) - ext, err := factory.CreateExtension(ctx, extSet, extCfg) + ext, err := set.Extensions.Create(ctx, extSet) if err != nil { return nil, fmt.Errorf("failed to create extension %q: %w", extID, err) } diff --git a/service/extensions/extensions_test.go b/service/extensions/extensions_test.go index 6149127f075..b3090247c26 100644 --- a/service/extensions/extensions_test.go +++ b/service/extensions/extensions_test.go @@ -48,7 +48,7 @@ func TestBuildExtensions(t *testing.T) { serviceExtensions: []component.ID{ component.NewID("myextension"), }, - wantErrMsg: "extension \"myextension\" is not configured", + wantErrMsg: "failed to create extension \"myextension\": extension \"myextension\" is not configured", }, { name: "missing_extension_factory", @@ -58,7 +58,7 @@ func TestBuildExtensions(t *testing.T) { serviceExtensions: []component.ID{ component.NewID("unknown"), }, - wantErrMsg: "extension factory for type \"unknown\" is not configured", + wantErrMsg: "failed to create extension \"unknown\": extension factory not available for: \"unknown\"", }, { name: "error_on_create_extension", diff --git a/service/host.go b/service/host.go index 1a722b07d32..506ce0cd88b 100644 --- a/service/host.go +++ b/service/host.go @@ -26,16 +26,16 @@ import ( var _ component.Host = (*serviceHost)(nil) type serviceHost struct { - asyncErrorChannel chan error - receiverFactories map[component.Type]receiver.Factory - processorFactories map[component.Type]processor.Factory - exporterFactories map[component.Type]exporter.Factory - extensionFactories map[component.Type]extension.Factory + asyncErrorChannel chan error + receivers *receiver.Builder + processors *processor.Builder + exporters *exporter.Builder + extensions *extension.Builder buildInfo component.BuildInfo - pipelines *builtPipelines - extensions *extensions.Extensions + pipelines *builtPipelines + serviceExtensions *extensions.Extensions } // ReportFatalError is used to report to the host that the receiver encountered @@ -48,19 +48,19 @@ func (host *serviceHost) ReportFatalError(err error) { func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { switch kind { case component.KindReceiver: - return host.receiverFactories[componentType] + return host.receivers.Factory(componentType) case component.KindProcessor: - return host.processorFactories[componentType] + return host.processors.Factory(componentType) case component.KindExporter: - return host.exporterFactories[componentType] + return host.exporters.Factory(componentType) case component.KindExtension: - return host.extensionFactories[componentType] + return host.extensions.Factory(componentType) } return nil } func (host *serviceHost) GetExtensions() map[component.ID]component.Component { - return host.extensions.GetExtensions() + return host.serviceExtensions.GetExtensions() } func (host *serviceHost) GetExporters() map[component.DataType]map[component.ID]component.Component { diff --git a/service/pipelines.go b/service/pipelines.go index 7c711b351dd..a13cb52d71d 100644 --- a/service/pipelines.go +++ b/service/pipelines.go @@ -185,23 +185,9 @@ type pipelinesSettings struct { Telemetry component.TelemetrySettings BuildInfo component.BuildInfo - // ReceiverFactories maps receiver type names in the config to the respective receiver.Factory. - ReceiverFactories map[component.Type]receiver.Factory - - // ReceiverConfigs is a map of component.ID to component.Config. - ReceiverConfigs map[component.ID]component.Config - - // ProcessorFactories maps processor type names in the config to the respective component.ProcessorFactory. - ProcessorFactories map[component.Type]processor.Factory - - // ProcessorConfigs is a map of component.ID to component.Config. - ProcessorConfigs map[component.ID]component.Config - - // ExporterFactories maps exporter type names in the config to the respective exporter.Factory. - ExporterFactories map[component.Type]exporter.Factory - - // ExporterConfigs is a map of component.ID to component.Config. - ExporterConfigs map[component.ID]component.Config + Receivers *receiver.Builder + Processors *processor.Builder + Exporters *exporter.Builder // PipelineConfigs is a map of component.ID to PipelineConfig. PipelineConfigs map[component.ID]*PipelineConfig @@ -249,7 +235,7 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (*builtPipelines BuildInfo: set.BuildInfo, } cSet.TelemetrySettings.Logger = exporterLogger(set.Telemetry.Logger, expID, pipelineID.Type()) - exp, err := buildExporter(ctx, cSet, set.ExporterConfigs, set.ExporterFactories, pipelineID) + exp, err := buildExporter(ctx, cSet, set.Exporters, pipelineID) if err != nil { return nil, err } @@ -283,7 +269,7 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (*builtPipelines BuildInfo: set.BuildInfo, } cSet.TelemetrySettings.Logger = processorLogger(set.Telemetry.Logger, procID, pipelineID) - proc, err := buildProcessor(ctx, cSet, set.ProcessorConfigs, set.ProcessorFactories, pipelineID, bp.lastConsumer) + proc, err := buildProcessor(ctx, cSet, set.Processors, pipelineID, bp.lastConsumer) if err != nil { return nil, err } @@ -340,7 +326,7 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (*builtPipelines BuildInfo: set.BuildInfo, } cSet.TelemetrySettings.Logger = receiverLogger(set.Telemetry.Logger, recvID, pipelineID.Type()) - recv, err := buildReceiver(ctx, cSet, set.ReceiverConfigs, set.ReceiverFactories, pipelineID, receiversConsumers[pipelineID.Type()][recvID]) + recv, err := buildReceiver(ctx, cSet, set.Receivers, pipelineID, receiversConsumers[pipelineID.Type()][recvID]) if err != nil { return nil, err } @@ -355,42 +341,23 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (*builtPipelines func buildExporter( ctx context.Context, set exporter.CreateSettings, - cfgs map[component.ID]component.Config, - factories map[component.Type]exporter.Factory, + builder *exporter.Builder, pipelineID component.ID, -) (component.Component, error) { - cfg, existsCfg := cfgs[set.ID] - if !existsCfg { - return nil, fmt.Errorf("exporter %q is not configured", set.ID) - } - - factory, existsFactory := factories[set.ID.Type()] - if !existsFactory { - return nil, fmt.Errorf("exporter factory not available for: %q", set.ID) - } - - components.LogStabilityLevel(set.TelemetrySettings.Logger, getExporterStabilityLevel(factory, pipelineID.Type())) - - exp, err := createExporter(ctx, set, cfg, pipelineID, factory) - if err != nil { - return nil, fmt.Errorf("failed to create %q exporter, in pipeline %q: %w", set.ID, pipelineID, err) - } - - return exp, nil -} - -func createExporter(ctx context.Context, set exporter.CreateSettings, cfg component.Config, pipelineID component.ID, factory exporter.Factory) (component.Component, error) { +) (exp component.Component, err error) { switch pipelineID.Type() { case component.DataTypeTraces: - return factory.CreateTracesExporter(ctx, set, cfg) - + exp, err = builder.CreateTraces(ctx, set) case component.DataTypeMetrics: - return factory.CreateMetricsExporter(ctx, set, cfg) - + exp, err = builder.CreateMetrics(ctx, set) case component.DataTypeLogs: - return factory.CreateLogsExporter(ctx, set, cfg) + exp, err = builder.CreateLogs(ctx, set) + default: + return nil, fmt.Errorf("error creating exporter %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type()) } - return nil, fmt.Errorf("error creating exporter %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type()) + if err != nil { + return nil, fmt.Errorf("failed to create %q exporter, in pipeline %q: %w", set.ID, pipelineID, err) + } + return exp, nil } func buildFanOutExportersTracesConsumer(exporters []builtComponent) consumer.Traces { @@ -427,56 +394,26 @@ func exporterLogger(logger *zap.Logger, id component.ID, dt component.DataType) zap.String(components.ZapNameKey, id.String())) } -func getExporterStabilityLevel(factory exporter.Factory, dt component.DataType) component.StabilityLevel { - switch dt { - case component.DataTypeTraces: - return factory.TracesExporterStability() - case component.DataTypeMetrics: - return factory.MetricsExporterStability() - case component.DataTypeLogs: - return factory.LogsExporterStability() - } - return component.StabilityLevelUndefined -} - func buildProcessor(ctx context.Context, set processor.CreateSettings, - cfgs map[component.ID]component.Config, - factories map[component.Type]processor.Factory, + builder *processor.Builder, pipelineID component.ID, next baseConsumer, -) (component.Component, error) { - procCfg, existsCfg := cfgs[set.ID] - if !existsCfg { - return nil, fmt.Errorf("processor %q is not configured", set.ID) - } - - factory, existsFactory := factories[set.ID.Type()] - if !existsFactory { - return nil, fmt.Errorf("processor factory not available for: %q", set.ID) - } - - components.LogStabilityLevel(set.TelemetrySettings.Logger, getProcessorStabilityLevel(factory, pipelineID.Type())) - - proc, err := createProcessor(ctx, set, procCfg, pipelineID, next, factory) - if err != nil { - return nil, fmt.Errorf("failed to create %q processor, in pipeline %q: %w", set.ID, pipelineID, err) - } - return proc, nil -} - -func createProcessor(ctx context.Context, set processor.CreateSettings, cfg component.Config, pipelineID component.ID, next baseConsumer, factory processor.Factory) (component.Component, error) { +) (proc component.Component, err error) { switch pipelineID.Type() { case component.DataTypeTraces: - return factory.CreateTracesProcessor(ctx, set, cfg, next.(consumer.Traces)) - + proc, err = builder.CreateTraces(ctx, set, next.(consumer.Traces)) case component.DataTypeMetrics: - return factory.CreateMetricsProcessor(ctx, set, cfg, next.(consumer.Metrics)) - + proc, err = builder.CreateMetrics(ctx, set, next.(consumer.Metrics)) case component.DataTypeLogs: - return factory.CreateLogsProcessor(ctx, set, cfg, next.(consumer.Logs)) + proc, err = builder.CreateLogs(ctx, set, next.(consumer.Logs)) + default: + return nil, fmt.Errorf("error creating processor %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type()) } - return nil, fmt.Errorf("error creating processor %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type()) + if err != nil { + return nil, fmt.Errorf("failed to create %q processor, in pipeline %q: %w", set.ID, pipelineID, err) + } + return proc, nil } func processorLogger(logger *zap.Logger, procID component.ID, pipelineID component.ID) *zap.Logger { @@ -486,67 +423,38 @@ func processorLogger(logger *zap.Logger, procID component.ID, pipelineID compone zap.String(components.ZapKindPipeline, pipelineID.String())) } -func getProcessorStabilityLevel(factory processor.Factory, dt component.DataType) component.StabilityLevel { - switch dt { - case component.DataTypeTraces: - return factory.TracesProcessorStability() - case component.DataTypeMetrics: - return factory.MetricsProcessorStability() - case component.DataTypeLogs: - return factory.LogsProcessorStability() - } - return component.StabilityLevelUndefined -} - func buildReceiver(ctx context.Context, set receiver.CreateSettings, - cfgs map[component.ID]component.Config, - factories map[component.Type]receiver.Factory, + builder *receiver.Builder, pipelineID component.ID, nexts []baseConsumer, -) (component.Component, error) { - cfg, existsCfg := cfgs[set.ID] - if !existsCfg { - return nil, fmt.Errorf("receiver %q is not configured", set.ID) - } - - factory, existsFactory := factories[set.ID.Type()] - if !existsFactory { - return nil, fmt.Errorf("receiver factory not available for: %q", set.ID) - } - - components.LogStabilityLevel(set.TelemetrySettings.Logger, getReceiverStabilityLevel(factory, pipelineID.Type())) - - recv, err := createReceiver(ctx, set, cfg, pipelineID, nexts, factory) - if err != nil { - return nil, fmt.Errorf("failed to create %q receiver, in pipeline %q: %w", set.ID, pipelineID, err) - } - - return recv, nil -} - -func createReceiver(ctx context.Context, set receiver.CreateSettings, cfg component.Config, pipelineID component.ID, nexts []baseConsumer, factory receiver.Factory) (component.Component, error) { +) (recv component.Component, err error) { switch pipelineID.Type() { case component.DataTypeTraces: var consumers []consumer.Traces for _, next := range nexts { consumers = append(consumers, next.(consumer.Traces)) } - return factory.CreateTracesReceiver(ctx, set, cfg, fanoutconsumer.NewTraces(consumers)) + recv, err = builder.CreateTraces(ctx, set, fanoutconsumer.NewTraces(consumers)) case component.DataTypeMetrics: var consumers []consumer.Metrics for _, next := range nexts { consumers = append(consumers, next.(consumer.Metrics)) } - return factory.CreateMetricsReceiver(ctx, set, cfg, fanoutconsumer.NewMetrics(consumers)) + recv, err = builder.CreateMetrics(ctx, set, fanoutconsumer.NewMetrics(consumers)) case component.DataTypeLogs: var consumers []consumer.Logs for _, next := range nexts { consumers = append(consumers, next.(consumer.Logs)) } - return factory.CreateLogsReceiver(ctx, set, cfg, fanoutconsumer.NewLogs(consumers)) + recv, err = builder.CreateLogs(ctx, set, fanoutconsumer.NewLogs(consumers)) + default: + return nil, fmt.Errorf("error creating receiver %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type()) + } + if err != nil { + return nil, fmt.Errorf("failed to create %q receiver, in pipeline %q: %w", set.ID, pipelineID, err) } - return nil, fmt.Errorf("error creating receiver %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type()) + return recv, nil } func receiverLogger(logger *zap.Logger, id component.ID, dt component.DataType) *zap.Logger { @@ -556,18 +464,6 @@ func receiverLogger(logger *zap.Logger, id component.ID, dt component.DataType) zap.String(components.ZapKindPipeline, string(dt))) } -func getReceiverStabilityLevel(factory receiver.Factory, dt component.DataType) component.StabilityLevel { - switch dt { - case component.DataTypeTraces: - return factory.TracesReceiverStability() - case component.DataTypeMetrics: - return factory.MetricsReceiverStability() - case component.DataTypeLogs: - return factory.LogsReceiverStability() - } - return component.StabilityLevelUndefined -} - func (bps *builtPipelines) getPipelinesSummaryTableData() zpages.SummaryPipelinesTableData { sumData := zpages.SummaryPipelinesTableData{} sumData.Rows = make([]zpages.SummaryPipelinesTableRowData, 0, len(bps.pipelines)) diff --git a/service/pipelines_test.go b/service/pipelines_test.go index 6972fbd7aa0..4346c50bbe4 100644 --- a/service/pipelines_test.go +++ b/service/pipelines_test.go @@ -184,27 +184,30 @@ func TestBuildPipelines(t *testing.T) { pipelines, err := buildPipelines(context.Background(), pipelinesSettings{ Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), - ReceiverFactories: map[component.Type]receiver.Factory{ - testcomponents.ExampleReceiverFactory.Type(): testcomponents.ExampleReceiverFactory, - }, - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("examplereceiver"): testcomponents.ExampleReceiverFactory.CreateDefaultConfig(), - component.NewIDWithName("examplereceiver", "1"): testcomponents.ExampleReceiverFactory.CreateDefaultConfig(), - }, - ProcessorFactories: map[component.Type]processor.Factory{ - testcomponents.ExampleProcessorFactory.Type(): testcomponents.ExampleProcessorFactory, - }, - ProcessorConfigs: map[component.ID]component.Config{ - component.NewID("exampleprocessor"): testcomponents.ExampleProcessorFactory.CreateDefaultConfig(), - component.NewIDWithName("exampleprocessor", "1"): testcomponents.ExampleProcessorFactory.CreateDefaultConfig(), - }, - ExporterFactories: map[component.Type]exporter.Factory{ - testcomponents.ExampleExporterFactory.Type(): testcomponents.ExampleExporterFactory, - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("exampleexporter"): testcomponents.ExampleExporterFactory.CreateDefaultConfig(), - component.NewIDWithName("exampleexporter", "1"): testcomponents.ExampleExporterFactory.CreateDefaultConfig(), - }, + Receivers: receiver.NewBuilder( + map[component.ID]component.Config{ + component.NewID("examplereceiver"): testcomponents.ExampleReceiverFactory.CreateDefaultConfig(), + component.NewIDWithName("examplereceiver", "1"): testcomponents.ExampleReceiverFactory.CreateDefaultConfig(), + }, + map[component.Type]receiver.Factory{ + testcomponents.ExampleReceiverFactory.Type(): testcomponents.ExampleReceiverFactory, + }), + Processors: processor.NewBuilder( + map[component.ID]component.Config{ + component.NewID("exampleprocessor"): testcomponents.ExampleProcessorFactory.CreateDefaultConfig(), + component.NewIDWithName("exampleprocessor", "1"): testcomponents.ExampleProcessorFactory.CreateDefaultConfig(), + }, + map[component.Type]processor.Factory{ + testcomponents.ExampleProcessorFactory.Type(): testcomponents.ExampleProcessorFactory, + }), + Exporters: exporter.NewBuilder( + map[component.ID]component.Config{ + component.NewID("exampleexporter"): testcomponents.ExampleExporterFactory.CreateDefaultConfig(), + component.NewIDWithName("exampleexporter", "1"): testcomponents.ExampleExporterFactory.CreateDefaultConfig(), + }, + map[component.Type]exporter.Factory{ + testcomponents.ExampleExporterFactory.Type(): testcomponents.ExampleExporterFactory, + }), PipelineConfigs: test.pipelineConfigs, }) assert.NoError(t, err) @@ -300,318 +303,299 @@ func TestBuildErrors(t *testing.T) { badExporterFactory := newBadExporterFactory() tests := []struct { - name string - settings pipelinesSettings - expected string + name string + ReceiverConfigs map[component.ID]component.Config + ProcessorConfigs map[component.ID]component.Config + ExporterConfigs map[component.ID]component.Config + PipelineConfigs map[component.ID]*PipelineConfig + expected string }{ { name: "not_supported_exporter_logs", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("bf"): badExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*PipelineConfig{ - component.NewID("logs"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("bf")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("bf"): badExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*PipelineConfig{ + component.NewID("logs"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("bf")}, }, }, expected: "failed to create \"bf\" exporter, in pipeline \"logs\": telemetry type is not supported", }, { name: "not_supported_exporter_metrics", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("bf"): badExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*PipelineConfig{ - component.NewID("metrics"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("bf")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("bf"): badExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*PipelineConfig{ + component.NewID("metrics"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("bf")}, }, }, expected: "failed to create \"bf\" exporter, in pipeline \"metrics\": telemetry type is not supported", }, { name: "not_supported_exporter_traces", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("bf"): badExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*PipelineConfig{ - component.NewID("traces"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("bf")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("bf"): badExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*PipelineConfig{ + component.NewID("traces"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("bf")}, }, }, expected: "failed to create \"bf\" exporter, in pipeline \"traces\": telemetry type is not supported", }, { name: "not_supported_processor_logs", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ProcessorConfigs: map[component.ID]component.Config{ - component.NewID("bf"): badProcessorFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*PipelineConfig{ - component.NewID("logs"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ProcessorConfigs: map[component.ID]component.Config{ + component.NewID("bf"): badProcessorFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*PipelineConfig{ + component.NewID("logs"): { + Receivers: []component.ID{component.NewID("nop")}, + Processors: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, expected: "failed to create \"bf\" processor, in pipeline \"logs\": telemetry type is not supported", }, { name: "not_supported_processor_metrics", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ProcessorConfigs: map[component.ID]component.Config{ - component.NewID("bf"): badProcessorFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*PipelineConfig{ - component.NewID("metrics"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ProcessorConfigs: map[component.ID]component.Config{ + component.NewID("bf"): badProcessorFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*PipelineConfig{ + component.NewID("metrics"): { + Receivers: []component.ID{component.NewID("nop")}, + Processors: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, expected: "failed to create \"bf\" processor, in pipeline \"metrics\": telemetry type is not supported", }, { name: "not_supported_processor_traces", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ProcessorConfigs: map[component.ID]component.Config{ - component.NewID("bf"): badProcessorFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*PipelineConfig{ - component.NewID("traces"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ProcessorConfigs: map[component.ID]component.Config{ + component.NewID("bf"): badProcessorFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*PipelineConfig{ + component.NewID("traces"): { + Receivers: []component.ID{component.NewID("nop")}, + Processors: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, expected: "failed to create \"bf\" processor, in pipeline \"traces\": telemetry type is not supported", }, { name: "not_supported_receiver_logs", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("bf"): badReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*PipelineConfig{ - component.NewID("logs"): { - Receivers: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("bf"): badReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*PipelineConfig{ + component.NewID("logs"): { + Receivers: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, expected: "failed to create \"bf\" receiver, in pipeline \"logs\": telemetry type is not supported", }, { name: "not_supported_receiver_metrics", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("bf"): badReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*PipelineConfig{ - component.NewID("metrics"): { - Receivers: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("bf"): badReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*PipelineConfig{ + component.NewID("metrics"): { + Receivers: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, expected: "failed to create \"bf\" receiver, in pipeline \"metrics\": telemetry type is not supported", }, { name: "not_supported_receiver_traces", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("bf"): badReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*PipelineConfig{ - component.NewID("traces"): { - Receivers: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("bf"): badReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*PipelineConfig{ + component.NewID("traces"): { + Receivers: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, expected: "failed to create \"bf\" receiver, in pipeline \"traces\": telemetry type is not supported", }, { name: "unknown_exporter_config", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*PipelineConfig{ - component.NewID("traces"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "1")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*PipelineConfig{ + component.NewID("traces"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "1")}, }, }, - expected: "exporter \"nop/1\" is not configured", + expected: "failed to create \"nop/1\" exporter, in pipeline \"traces\": exporter \"nop/1\" is not configured", }, { name: "unknown_exporter_factory", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("unknown"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*PipelineConfig{ - component.NewID("traces"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("unknown")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("unknown"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*PipelineConfig{ + component.NewID("traces"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("unknown")}, }, }, - expected: "exporter factory not available for: \"unknown\"", + expected: "failed to create \"unknown\" exporter, in pipeline \"traces\": exporter factory not available for: \"unknown\"", }, { name: "unknown_processor_config", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ProcessorConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopProcessorFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*PipelineConfig{ - component.NewID("metrics"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "1")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ProcessorConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopProcessorFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*PipelineConfig{ + component.NewID("metrics"): { + Receivers: []component.ID{component.NewID("nop")}, + Processors: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "1")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, - expected: "processor \"nop/1\" is not configured", + expected: "failed to create \"nop/1\" processor, in pipeline \"metrics\": processor \"nop/1\" is not configured", }, { name: "unknown_processor_factory", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ProcessorConfigs: map[component.ID]component.Config{ - component.NewID("unknown"): nopProcessorFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*PipelineConfig{ - component.NewID("metrics"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("unknown")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ProcessorConfigs: map[component.ID]component.Config{ + component.NewID("unknown"): nopProcessorFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*PipelineConfig{ + component.NewID("metrics"): { + Receivers: []component.ID{component.NewID("nop")}, + Processors: []component.ID{component.NewID("unknown")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, - expected: "processor factory not available for: \"unknown\"", + expected: "failed to create \"unknown\" processor, in pipeline \"metrics\": processor factory not available for: \"unknown\"", }, { name: "unknown_receiver_config", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*PipelineConfig{ - component.NewID("logs"): { - Receivers: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "1")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*PipelineConfig{ + component.NewID("logs"): { + Receivers: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "1")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, - expected: "receiver \"nop/1\" is not configured", + expected: "failed to create \"nop/1\" receiver, in pipeline \"logs\": receiver \"nop/1\" is not configured", }, { name: "unknown_receiver_factory", - settings: pipelinesSettings{ - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID("unknown"): nopReceiverFactory.CreateDefaultConfig(), - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - PipelineConfigs: map[component.ID]*PipelineConfig{ - component.NewID("logs"): { - Receivers: []component.ID{component.NewID("unknown")}, - Exporters: []component.ID{component.NewID("nop")}, - }, + ReceiverConfigs: map[component.ID]component.Config{ + component.NewID("unknown"): nopReceiverFactory.CreateDefaultConfig(), + }, + ExporterConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + PipelineConfigs: map[component.ID]*PipelineConfig{ + component.NewID("logs"): { + Receivers: []component.ID{component.NewID("unknown")}, + Exporters: []component.ID{component.NewID("nop")}, }, }, - expected: "receiver factory not available for: \"unknown\"", + expected: "failed to create \"unknown\" receiver, in pipeline \"logs\": receiver factory not available for: \"unknown\"", }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - set := test.settings - set.BuildInfo = component.NewDefaultBuildInfo() - set.Telemetry = componenttest.NewNopTelemetrySettings() - set.ReceiverFactories = map[component.Type]receiver.Factory{ - nopReceiverFactory.Type(): nopReceiverFactory, - badReceiverFactory.Type(): badReceiverFactory, - } - set.ProcessorFactories = map[component.Type]processor.Factory{ - nopProcessorFactory.Type(): nopProcessorFactory, - badProcessorFactory.Type(): badProcessorFactory, - } - set.ExporterFactories = map[component.Type]exporter.Factory{ - nopExporterFactory.Type(): nopExporterFactory, - badExporterFactory.Type(): badExporterFactory, + set := pipelinesSettings{ + Telemetry: componenttest.NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + Receivers: receiver.NewBuilder( + test.ReceiverConfigs, + map[component.Type]receiver.Factory{ + nopReceiverFactory.Type(): nopReceiverFactory, + badReceiverFactory.Type(): badReceiverFactory, + }), + Processors: processor.NewBuilder( + test.ProcessorConfigs, + map[component.Type]processor.Factory{ + nopProcessorFactory.Type(): nopProcessorFactory, + badProcessorFactory.Type(): badProcessorFactory, + }), + Exporters: exporter.NewBuilder( + test.ExporterConfigs, + map[component.Type]exporter.Factory{ + nopExporterFactory.Type(): nopExporterFactory, + badExporterFactory.Type(): badExporterFactory, + }), + PipelineConfigs: test.PipelineConfigs, } _, err := buildPipelines(context.Background(), set) @@ -631,30 +615,33 @@ func TestFailToStartAndShutdown(t *testing.T) { set := pipelinesSettings{ Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), - ReceiverFactories: map[component.Type]receiver.Factory{ - nopReceiverFactory.Type(): nopReceiverFactory, - errReceiverFactory.Type(): errReceiverFactory, - }, - ReceiverConfigs: map[component.ID]component.Config{ - component.NewID(nopReceiverFactory.Type()): nopReceiverFactory.CreateDefaultConfig(), - component.NewID(errReceiverFactory.Type()): errReceiverFactory.CreateDefaultConfig(), - }, - ProcessorFactories: map[component.Type]processor.Factory{ - nopProcessorFactory.Type(): nopProcessorFactory, - errProcessorFactory.Type(): errProcessorFactory, - }, - ProcessorConfigs: map[component.ID]component.Config{ - component.NewID(nopProcessorFactory.Type()): nopProcessorFactory.CreateDefaultConfig(), - component.NewID(errProcessorFactory.Type()): errProcessorFactory.CreateDefaultConfig(), - }, - ExporterFactories: map[component.Type]exporter.Factory{ - nopExporterFactory.Type(): nopExporterFactory, - errExporterFactory.Type(): errExporterFactory, - }, - ExporterConfigs: map[component.ID]component.Config{ - component.NewID(nopExporterFactory.Type()): nopExporterFactory.CreateDefaultConfig(), - component.NewID(errExporterFactory.Type()): errExporterFactory.CreateDefaultConfig(), - }, + Receivers: receiver.NewBuilder( + map[component.ID]component.Config{ + component.NewID(nopReceiverFactory.Type()): nopReceiverFactory.CreateDefaultConfig(), + component.NewID(errReceiverFactory.Type()): errReceiverFactory.CreateDefaultConfig(), + }, + map[component.Type]receiver.Factory{ + nopReceiverFactory.Type(): nopReceiverFactory, + errReceiverFactory.Type(): errReceiverFactory, + }), + Processors: processor.NewBuilder( + map[component.ID]component.Config{ + component.NewID(nopProcessorFactory.Type()): nopProcessorFactory.CreateDefaultConfig(), + component.NewID(errProcessorFactory.Type()): errProcessorFactory.CreateDefaultConfig(), + }, + map[component.Type]processor.Factory{ + nopProcessorFactory.Type(): nopProcessorFactory, + errProcessorFactory.Type(): errProcessorFactory, + }), + Exporters: exporter.NewBuilder( + map[component.ID]component.Config{ + component.NewID(nopExporterFactory.Type()): nopExporterFactory.CreateDefaultConfig(), + component.NewID(errExporterFactory.Type()): errExporterFactory.CreateDefaultConfig(), + }, + map[component.Type]exporter.Factory{ + nopExporterFactory.Type(): nopExporterFactory, + errExporterFactory.Type(): errExporterFactory, + }), } for _, dt := range []component.DataType{component.DataTypeTraces, component.DataTypeMetrics, component.DataTypeLogs} { diff --git a/service/service.go b/service/service.go index e45a8a728a3..1848c306856 100644 --- a/service/service.go +++ b/service/service.go @@ -40,29 +40,17 @@ type Settings struct { // BuildInfo provides collector start information. BuildInfo component.BuildInfo - // ReceiverFactories maps receiver type names in the config to the respective receiver.Factory. - ReceiverFactories map[component.Type]receiver.Factory + // Receivers builder for receivers. + Receivers *receiver.Builder - // ReceiverConfigs is a map of component.ID to receivers component.Config. - ReceiverConfigs map[component.ID]component.Config + // Processors builder for processors. + Processors *processor.Builder - // ProcessorFactories maps processor type names in the config to the respective component.ProcessorFactory. - ProcessorFactories map[component.Type]processor.Factory + // Exporters builder for exporters. + Exporters *exporter.Builder - // ProcessorConfigs is a map of component.ID to processors component.Config. - ProcessorConfigs map[component.ID]component.Config - - // ExporterFactories maps exporter type names in the config to the respective exporter.Factory. - ExporterFactories map[component.Type]exporter.Factory - - // ExporterConfigs is a map of component.ID to exporters component.Config. - ExporterConfigs map[component.ID]component.Config - - // ExtensionConfigs is a map of component.ID to extensions component.Config. - ExtensionConfigs map[component.ID]component.Config - - // ExtensionFactories maps extension type names in the config to the respective extension.Factory. - ExtensionFactories map[component.Type]extension.Factory + // Extensions builder for extensions. + Extensions *extension.Builder // AsyncErrorChannel is the channel that is used to report fatal errors. AsyncErrorChannel chan error @@ -91,12 +79,12 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { srv := &Service{ buildInfo: set.BuildInfo, host: &serviceHost{ - receiverFactories: set.ReceiverFactories, - processorFactories: set.ProcessorFactories, - exporterFactories: set.ExporterFactories, - extensionFactories: set.ExtensionFactories, - buildInfo: set.BuildInfo, - asyncErrorChannel: set.AsyncErrorChannel, + receivers: set.Receivers, + processors: set.Processors, + exporters: set.Exporters, + extensions: set.Extensions, + buildInfo: set.BuildInfo, + asyncErrorChannel: set.AsyncErrorChannel, }, telemetryInitializer: newColTelemetry(reg), } @@ -137,7 +125,7 @@ func (srv *Service) Start(ctx context.Context) error { zap.Int("NumCPU", runtime.NumCPU()), ) - if err := srv.host.extensions.Start(ctx, srv.host); err != nil { + if err := srv.host.serviceExtensions.Start(ctx, srv.host); err != nil { return fmt.Errorf("failed to start extensions: %w", err) } @@ -145,7 +133,7 @@ func (srv *Service) Start(ctx context.Context) error { return fmt.Errorf("cannot start pipelines: %w", err) } - if err := srv.host.extensions.NotifyPipelineReady(); err != nil { + if err := srv.host.serviceExtensions.NotifyPipelineReady(); err != nil { return err } @@ -160,7 +148,7 @@ func (srv *Service) Shutdown(ctx context.Context) error { // Begin shutdown sequence. srv.telemetrySettings.Logger.Info("Starting shutdown...") - if err := srv.host.extensions.NotifyPipelineNotReady(); err != nil { + if err := srv.host.serviceExtensions.NotifyPipelineNotReady(); err != nil { errs = multierr.Append(errs, fmt.Errorf("failed to notify that pipeline is not ready: %w", err)) } @@ -168,7 +156,7 @@ func (srv *Service) Shutdown(ctx context.Context) error { errs = multierr.Append(errs, fmt.Errorf("failed to shutdown pipelines: %w", err)) } - if err := srv.host.extensions.Shutdown(ctx); err != nil { + if err := srv.host.serviceExtensions.Shutdown(ctx); err != nil { errs = multierr.Append(errs, fmt.Errorf("failed to shutdown extensions: %w", err)) } @@ -187,25 +175,21 @@ func (srv *Service) Shutdown(ctx context.Context) error { func (srv *Service) initExtensionsAndPipeline(ctx context.Context, set Settings, cfg Config) error { var err error extensionsSettings := extensions.Settings{ - Telemetry: srv.telemetrySettings, - BuildInfo: srv.buildInfo, - Configs: set.ExtensionConfigs, - Factories: srv.host.extensionFactories, + Telemetry: srv.telemetrySettings, + BuildInfo: srv.buildInfo, + Extensions: srv.host.extensions, } - if srv.host.extensions, err = extensions.New(ctx, extensionsSettings, cfg.Extensions); err != nil { + if srv.host.serviceExtensions, err = extensions.New(ctx, extensionsSettings, cfg.Extensions); err != nil { return fmt.Errorf("failed build extensions: %w", err) } pSet := pipelinesSettings{ - Telemetry: srv.telemetrySettings, - BuildInfo: srv.buildInfo, - ReceiverFactories: srv.host.receiverFactories, - ReceiverConfigs: set.ReceiverConfigs, - ProcessorFactories: srv.host.processorFactories, - ProcessorConfigs: set.ProcessorConfigs, - ExporterFactories: srv.host.exporterFactories, - ExporterConfigs: set.ExporterConfigs, - PipelineConfigs: cfg.Pipelines, + Telemetry: srv.telemetrySettings, + BuildInfo: srv.buildInfo, + Receivers: set.Receivers, + Processors: set.Processors, + Exporters: set.Exporters, + PipelineConfigs: cfg.Pipelines, } if srv.host.pipelines, err = buildPipelines(ctx, pSet); err != nil { return fmt.Errorf("cannot build pipelines: %w", err) diff --git a/service/service_test.go b/service/service_test.go index b24fffebde3..a8d76d10e3f 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -32,7 +32,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/extension/extensiontest" @@ -40,9 +39,7 @@ import ( "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/internal/obsreportconfig" "go.opentelemetry.io/collector/internal/testutil" - "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processortest" - "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" "go.opentelemetry.io/collector/service/telemetry" ) @@ -151,16 +148,16 @@ func TestServiceGetFactory(t *testing.T) { }) assert.Nil(t, srv.host.GetFactory(component.KindReceiver, "wrongtype")) - assert.Equal(t, set.ReceiverFactories["nop"], srv.host.GetFactory(component.KindReceiver, "nop")) + assert.Equal(t, set.Receivers.Factory("nop"), srv.host.GetFactory(component.KindReceiver, "nop")) assert.Nil(t, srv.host.GetFactory(component.KindProcessor, "wrongtype")) - assert.Equal(t, set.ProcessorFactories["nop"], srv.host.GetFactory(component.KindProcessor, "nop")) + assert.Equal(t, set.Processors.Factory("nop"), srv.host.GetFactory(component.KindProcessor, "nop")) assert.Nil(t, srv.host.GetFactory(component.KindExporter, "wrongtype")) - assert.Equal(t, set.ExporterFactories["nop"], srv.host.GetFactory(component.KindExporter, "nop")) + assert.Equal(t, set.Exporters.Factory("nop"), srv.host.GetFactory(component.KindExporter, "nop")) assert.Nil(t, srv.host.GetFactory(component.KindExtension, "wrongtype")) - assert.Equal(t, set.ExtensionFactories["nop"], srv.host.GetFactory(component.KindExtension, "nop")) + assert.Equal(t, set.Extensions.Factory("nop"), srv.host.GetFactory(component.KindExtension, "nop")) // Try retrieve non existing component.Kind. assert.Nil(t, srv.host.GetFactory(42, "nop")) @@ -250,17 +247,14 @@ func testCollectorStartHelper(t *testing.T, reg *featuregate.Registry, tc ownMet set := newNopSettings() set.BuildInfo = component.BuildInfo{Version: "test version"} - set.ExtensionConfigs[component.NewID("zpages")] = &zpagesextension.Config{ - TCPAddr: confignet.TCPAddr{ - Endpoint: zpagesAddr, - }, - } - set.ExtensionFactories["zpages"] = zpagesextension.NewFactory() + set.Extensions = extension.NewBuilder( + map[component.ID]component.Config{component.NewID("zpages"): &zpagesextension.Config{TCPAddr: confignet.TCPAddr{Endpoint: zpagesAddr}}}, + map[component.Type]extension.Factory{"zpages": zpagesextension.NewFactory()}) set.LoggingOptions = []zap.Option{zap.Hooks(hook)} set.registry = reg cfg := newNopConfig() - cfg.Extensions = []component.ID{component.NewID("nop"), component.NewID("zpages")} + cfg.Extensions = []component.ID{component.NewID("zpages")} cfg.Telemetry.Metrics.Address = metricsAddr cfg.Telemetry.Resource = make(map[string]*string) // Include resource attributes under the service::telemetry::resource key. @@ -395,15 +389,11 @@ func assertZPages(t *testing.T, zpagesAddr string) { func newNopSettings() Settings { return Settings{ - BuildInfo: component.NewDefaultBuildInfo(), - ReceiverFactories: map[component.Type]receiver.Factory{"nop": receivertest.NewNopFactory()}, - ReceiverConfigs: map[component.ID]component.Config{component.NewID("nop"): receivertest.NewNopFactory().CreateDefaultConfig()}, - ProcessorFactories: map[component.Type]processor.Factory{"nop": processortest.NewNopFactory()}, - ProcessorConfigs: map[component.ID]component.Config{component.NewID("nop"): processortest.NewNopFactory().CreateDefaultConfig()}, - ExporterFactories: map[component.Type]exporter.Factory{"nop": exportertest.NewNopFactory()}, - ExporterConfigs: map[component.ID]component.Config{component.NewID("nop"): exportertest.NewNopFactory().CreateDefaultConfig()}, - ExtensionFactories: map[component.Type]extension.Factory{"nop": extensiontest.NewNopFactory()}, - ExtensionConfigs: map[component.ID]component.Config{component.NewID("nop"): extensiontest.NewNopFactory().CreateDefaultConfig()}, + BuildInfo: component.NewDefaultBuildInfo(), + Receivers: receivertest.NewNopBuilder(), + Processors: processortest.NewNopBuilder(), + Exporters: exportertest.NewNopBuilder(), + Extensions: extensiontest.NewNopBuilder(), } } diff --git a/service/zpages.go b/service/zpages.go index 654811dc592..32256a49376 100644 --- a/service/zpages.go +++ b/service/zpages.go @@ -34,7 +34,7 @@ const ( func (host *serviceHost) RegisterZPages(mux *http.ServeMux, pathPrefix string) { mux.HandleFunc(path.Join(pathPrefix, servicezPath), host.zPagesRequest) mux.HandleFunc(path.Join(pathPrefix, pipelinezPath), host.pipelines.HandleZPages) - mux.HandleFunc(path.Join(pathPrefix, extensionzPath), host.extensions.HandleZPages) + mux.HandleFunc(path.Join(pathPrefix, extensionzPath), host.serviceExtensions.HandleZPages) mux.HandleFunc(path.Join(pathPrefix, featurezPath), handleFeaturezRequest) }