Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify builders for components, remove unnecessary builders #2624

Merged
merged 1 commit into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions service/builder/exporters_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,32 +140,28 @@ type dataTypeRequirements map[configmodels.DataType]dataTypeRequirement
// Data type requirements for all exporters.
type exportersRequiredDataTypes map[configmodels.Exporter]dataTypeRequirements

// ExportersBuilder builds exporters from config.
type ExportersBuilder struct {
// exportersBuilder builds exporters from config.
type exportersBuilder struct {
logger *zap.Logger
appInfo component.ApplicationStartInfo
config *configmodels.Config
factories map[configmodels.Type]component.ExporterFactory
}

// NewExportersBuilder creates a new ExportersBuilder. Call BuildExporters() on the returned value.
func NewExportersBuilder(
// BuildExporters builds Exporters from config.
func BuildExporters(
logger *zap.Logger,
appInfo component.ApplicationStartInfo,
config *configmodels.Config,
factories map[configmodels.Type]component.ExporterFactory,
) *ExportersBuilder {
return &ExportersBuilder{logger.With(zap.String(kindLogKey, kindLogsExporter)), appInfo, config, factories}
}

// BuildExporters exporters from config.
func (eb *ExportersBuilder) Build() (Exporters, error) {
exporters := make(Exporters)
) (Exporters, error) {
eb := &exportersBuilder{logger.With(zap.String(kindLogKey, kindLogsExporter)), appInfo, config, factories}

// We need to calculate required input data types for each exporter so that we know
// which data type must be started for each exporter.
exporterInputDataTypes := eb.calcExportersRequiredDataTypes()

exporters := make(Exporters)
// BuildExporters exporters based on configuration and required input data types.
for _, cfg := range eb.config.Exporters {
componentLogger := eb.logger.With(zap.String(typeLogKey, string(cfg.Type())), zap.String(nameLogKey, cfg.Name()))
Expand All @@ -180,7 +176,7 @@ func (eb *ExportersBuilder) Build() (Exporters, error) {
return exporters, nil
}

func (eb *ExportersBuilder) calcExportersRequiredDataTypes() exportersRequiredDataTypes {
func (eb *exportersBuilder) calcExportersRequiredDataTypes() exportersRequiredDataTypes {

// Go over all pipelines. The data type of the pipeline defines what data type
// each exporter is expected to receive. Collect all required types for each
Expand Down Expand Up @@ -213,7 +209,7 @@ func (eb *ExportersBuilder) calcExportersRequiredDataTypes() exportersRequiredDa
return result
}

func (eb *ExportersBuilder) buildExporter(
func (eb *exportersBuilder) buildExporter(
ctx context.Context,
logger *zap.Logger,
appInfo component.ApplicationStartInfo,
Expand Down
10 changes: 5 additions & 5 deletions service/builder/exporters_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestExportersBuilder_Build(t *testing.T) {
},
}

exporters, err := NewExportersBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters).Build()
exporters, err := BuildExporters(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters)

assert.NoError(t, err)
require.NotNil(t, exporters)
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestExportersBuilder_Build(t *testing.T) {
// This should result in creating an exporter that has none of consumption
// functions set.
delete(cfg.Service.Pipelines, "trace")
exporters, err = NewExportersBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters).Build()
exporters, err = BuildExporters(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters)
assert.NotNil(t, exporters)
assert.NoError(t, err)

Expand Down Expand Up @@ -131,7 +131,7 @@ func TestExportersBuilder_BuildLogs(t *testing.T) {
},
}

exporters, err := NewExportersBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters).Build()
exporters, err := BuildExporters(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters)

assert.NoError(t, err)
require.NotNil(t, exporters)
Expand All @@ -156,7 +156,7 @@ func TestExportersBuilder_BuildLogs(t *testing.T) {
// This should result in creating an exporter that has none of consumption
// functions set.
delete(cfg.Service.Pipelines, "logs")
exporters, err = NewExportersBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters).Build()
exporters, err = BuildExporters(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters)
assert.NotNil(t, exporters)
assert.Nil(t, err)

Expand Down Expand Up @@ -260,7 +260,7 @@ func TestExportersBuilder_ErrorOnNilExporter(t *testing.T) {
},
}

exporters, err := NewExportersBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, fm).Build()
exporters, err := BuildExporters(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, fm)
assert.Error(t, err)
assert.Zero(t, len(exporters))
})
Expand Down
18 changes: 7 additions & 11 deletions service/builder/extensions_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,28 +110,24 @@ func (exts Extensions) ToMap() map[configmodels.NamedEntity]component.Extension
return result
}

// ExportersBuilder builds exporters from config.
type ExtensionsBuilder struct {
// exportersBuilder builds exporters from config.
type extensionsBuilder struct {
logger *zap.Logger
appInfo component.ApplicationStartInfo
config *configmodels.Config
factories map[configmodels.Type]component.ExtensionFactory
}

// NewExportersBuilder creates a new ExportersBuilder. Call BuildExporters() on the returned value.
func NewExtensionsBuilder(
// BuildExtensions builds Extensions from config.
func BuildExtensions(
logger *zap.Logger,
appInfo component.ApplicationStartInfo,
config *configmodels.Config,
factories map[configmodels.Type]component.ExtensionFactory,
) *ExtensionsBuilder {
return &ExtensionsBuilder{logger.With(zap.String(kindLogKey, kindLogExtension)), appInfo, config, factories}
}
) (Extensions, error) {
eb := &extensionsBuilder{logger.With(zap.String(kindLogKey, kindLogExtension)), appInfo, config, factories}

// Build extensions from config.
func (eb *ExtensionsBuilder) Build() (Extensions, error) {
extensions := make(Extensions)

for _, extName := range eb.config.Service.Extensions {
extCfg, exists := eb.config.Extensions[extName]
if !exists {
Expand All @@ -150,7 +146,7 @@ func (eb *ExtensionsBuilder) Build() (Extensions, error) {
return extensions, nil
}

func (eb *ExtensionsBuilder) buildExtension(logger *zap.Logger, appInfo component.ApplicationStartInfo, cfg configmodels.Extension) (*builtExtension, error) {
func (eb *extensionsBuilder) buildExtension(logger *zap.Logger, appInfo component.ApplicationStartInfo, cfg configmodels.Extension) (*builtExtension, error) {
factory := eb.factories[cfg.Type()]
if factory == nil {
return nil, fmt.Errorf("extension factory for type %q is not configured", cfg.Type())
Expand Down
28 changes: 12 additions & 16 deletions service/builder/pipelines_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,31 +78,27 @@ func (bps BuiltPipelines) ShutdownProcessors(ctx context.Context) error {
return consumererror.CombineErrors(errs)
}

// PipelinesBuilder builds pipelines from config.
type PipelinesBuilder struct {
// pipelinesBuilder builds Pipelines from config.
type pipelinesBuilder struct {
logger *zap.Logger
appInfo component.ApplicationStartInfo
config *configmodels.Config
exporters Exporters
factories map[configmodels.Type]component.ProcessorFactory
}

// NewPipelinesBuilder creates a new PipelinesBuilder. Requires exporters to be already
// built via ExportersBuilder. Call BuildProcessors() on the returned value.
func NewPipelinesBuilder(
// BuildPipelines builds pipeline processors from config. Requires exporters to be already
// built via BuildExporters.
func BuildPipelines(
logger *zap.Logger,
appInfo component.ApplicationStartInfo,
config *configmodels.Config,
exporters Exporters,
factories map[configmodels.Type]component.ProcessorFactory,
) *PipelinesBuilder {
return &PipelinesBuilder{logger, appInfo, config, exporters, factories}
}
) (BuiltPipelines, error) {
pb := &pipelinesBuilder{logger, appInfo, config, exporters, factories}

// BuildProcessors pipeline processors from config.
func (pb *PipelinesBuilder) Build() (BuiltPipelines, error) {
pipelineProcessors := make(BuiltPipelines)

for _, pipeline := range pb.config.Service.Pipelines {
firstProcessor, err := pb.buildPipeline(context.Background(), pipeline)
if err != nil {
Expand All @@ -117,7 +113,7 @@ func (pb *PipelinesBuilder) Build() (BuiltPipelines, error) {
// Builds a pipeline of processors. Returns the first processor in the pipeline.
// The last processor in the pipeline will be plugged to fan out the data into exporters
// that are configured for this pipeline.
func (pb *PipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *configmodels.Pipeline) (*builtPipeline, error) {
func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *configmodels.Pipeline) (*builtPipeline, error) {

// BuildProcessors the pipeline backwards.

Expand Down Expand Up @@ -219,7 +215,7 @@ func (pb *PipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf
}

// Converts the list of exporter names to a list of corresponding builtExporters.
func (pb *PipelinesBuilder) getBuiltExportersByNames(exporterNames []string) []*builtExporter {
func (pb *pipelinesBuilder) getBuiltExportersByNames(exporterNames []string) []*builtExporter {
var result []*builtExporter
for _, name := range exporterNames {
exporter := pb.exporters[pb.config.Exporters[name]]
Expand All @@ -229,7 +225,7 @@ func (pb *PipelinesBuilder) getBuiltExportersByNames(exporterNames []string) []*
return result
}

func (pb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []string) consumer.TracesConsumer {
func (pb *pipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []string) consumer.TracesConsumer {
builtExporters := pb.getBuiltExportersByNames(exporterNames)

var exporters []consumer.TracesConsumer
Expand All @@ -241,7 +237,7 @@ func (pb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []st
return processor.NewTracesFanOutConnector(exporters)
}

func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.MetricsConsumer {
func (pb *pipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.MetricsConsumer {
builtExporters := pb.getBuiltExportersByNames(exporterNames)

var exporters []consumer.MetricsConsumer
Expand All @@ -253,7 +249,7 @@ func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []
return processor.NewMetricsFanOutConnector(exporters)
}

func (pb *PipelinesBuilder) buildFanoutExportersLogConsumer(exporterNames []string) consumer.LogsConsumer {
func (pb *pipelinesBuilder) buildFanoutExportersLogConsumer(exporterNames []string) consumer.LogsConsumer {
builtExporters := pb.getBuiltExportersByNames(exporterNames)

exporters := make([]consumer.LogsConsumer, len(builtExporters))
Expand Down
16 changes: 8 additions & 8 deletions service/builder/pipelines_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,15 @@ func TestPipelinesBuilder_BuildVarious(t *testing.T) {
cfg := createExampleConfig(dataType)

// BuildProcessors the pipeline
allExporters, err := NewExportersBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters).Build()
allExporters, err := BuildExporters(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters)
if test.shouldFail {
assert.Error(t, err)
return
}

require.NoError(t, err)
require.EqualValues(t, 1, len(allExporters))
pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors).Build()
pipelineProcessors, err := BuildPipelines(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors)

assert.NoError(t, err)
require.NotNil(t, pipelineProcessors)
Expand Down Expand Up @@ -205,9 +205,9 @@ func testPipeline(t *testing.T, pipelineName string, exporterNames []string) {
require.Nil(t, err)

// BuildProcessors the pipeline
allExporters, err := NewExportersBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters).Build()
allExporters, err := BuildExporters(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters)
assert.NoError(t, err)
pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors).Build()
pipelineProcessors, err := BuildPipelines(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors)

assert.NoError(t, err)
require.NotNil(t, pipelineProcessors)
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestProcessorsBuilder_ErrorOnUnsupportedProcessor(t *testing.T) {
cfg, err := configtest.LoadConfigFile(t, "testdata/bad_processor_factory.yaml", factories)
require.Nil(t, err)

allExporters, err := NewExportersBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters).Build()
allExporters, err := BuildExporters(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters)
assert.NoError(t, err)

// First test only trace receivers by removing the metrics pipeline.
Expand All @@ -276,7 +276,7 @@ func TestProcessorsBuilder_ErrorOnUnsupportedProcessor(t *testing.T) {
delete(cfg.Service.Pipelines, "logs")
require.Equal(t, 1, len(cfg.Service.Pipelines))

pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors).Build()
pipelineProcessors, err := BuildPipelines(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors)
assert.Error(t, err)
assert.Zero(t, len(pipelineProcessors))

Expand All @@ -285,7 +285,7 @@ func TestProcessorsBuilder_ErrorOnUnsupportedProcessor(t *testing.T) {
cfg.Service.Pipelines["metrics"] = metricsPipeline
require.Equal(t, 1, len(cfg.Service.Pipelines))

pipelineProcessors, err = NewPipelinesBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors).Build()
pipelineProcessors, err = BuildPipelines(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors)
assert.Error(t, err)
assert.Zero(t, len(pipelineProcessors))

Expand All @@ -294,7 +294,7 @@ func TestProcessorsBuilder_ErrorOnUnsupportedProcessor(t *testing.T) {
cfg.Service.Pipelines["logs"] = logsPipeline
require.Equal(t, 1, len(cfg.Service.Pipelines))

pipelineProcessors, err = NewPipelinesBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors).Build()
pipelineProcessors, err = BuildPipelines(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors)
assert.Error(t, err)
assert.Zero(t, len(pipelineProcessors))
}
Expand Down
23 changes: 9 additions & 14 deletions service/builder/receivers_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,31 +77,26 @@ func (rcvs Receivers) StartAll(ctx context.Context, host component.Host) error {
return nil
}

// ReceiversBuilder builds receivers from config.
type ReceiversBuilder struct {
// receiversBuilder builds receivers from config.
type receiversBuilder struct {
logger *zap.Logger
appInfo component.ApplicationStartInfo
config *configmodels.Config
builtPipelines BuiltPipelines
factories map[configmodels.Type]component.ReceiverFactory
}

// NewReceiversBuilder creates a new ReceiversBuilder. Call BuildProcessors() on the returned value.
func NewReceiversBuilder(
// BuildReceivers builds Receivers from config.
func BuildReceivers(
logger *zap.Logger,
appInfo component.ApplicationStartInfo,
config *configmodels.Config,
builtPipelines BuiltPipelines,
factories map[configmodels.Type]component.ReceiverFactory,
) *ReceiversBuilder {
return &ReceiversBuilder{logger.With(zap.String(kindLogKey, kindLogsReceiver)), appInfo, config, builtPipelines, factories}
}
) (Receivers, error) {
rb := &receiversBuilder{logger.With(zap.String(kindLogKey, kindLogsReceiver)), appInfo, config, builtPipelines, factories}

// BuildProcessors receivers from config.
func (rb *ReceiversBuilder) Build() (Receivers, error) {
receivers := make(Receivers)

// BuildProcessors receivers based on configuration.
for _, cfg := range rb.config.Receivers {
logger := rb.logger.With(zap.String(typeLogKey, string(cfg.Type())), zap.String(nameLogKey, cfg.Name()))
rcv, err := rb.buildReceiver(context.Background(), logger, rb.appInfo, cfg)
Expand Down Expand Up @@ -130,7 +125,7 @@ func hasReceiver(pipeline *configmodels.Pipeline, receiverName string) bool {

type attachedPipelines map[configmodels.DataType][]*builtPipeline

func (rb *ReceiversBuilder) findPipelinesToAttach(config configmodels.Receiver) (attachedPipelines, error) {
func (rb *receiversBuilder) findPipelinesToAttach(config configmodels.Receiver) (attachedPipelines, error) {
// A receiver may be attached to multiple pipelines. Pipelines may consume different
// data types. We need to compile the list of pipelines of each type that must be
// attached to this receiver according to configuration.
Expand Down Expand Up @@ -163,7 +158,7 @@ func (rb *ReceiversBuilder) findPipelinesToAttach(config configmodels.Receiver)
return pipelinesToAttach, nil
}

func (rb *ReceiversBuilder) attachReceiverToPipelines(
func (rb *receiversBuilder) attachReceiverToPipelines(
ctx context.Context,
logger *zap.Logger,
appInfo component.ApplicationStartInfo,
Expand Down Expand Up @@ -237,7 +232,7 @@ func (rb *ReceiversBuilder) attachReceiverToPipelines(
return nil
}

func (rb *ReceiversBuilder) buildReceiver(ctx context.Context, logger *zap.Logger, appInfo component.ApplicationStartInfo, config configmodels.Receiver) (*builtReceiver, error) {
func (rb *receiversBuilder) buildReceiver(ctx context.Context, logger *zap.Logger, appInfo component.ApplicationStartInfo, config configmodels.Receiver) (*builtReceiver, error) {

// First find pipelines that must be attached to this receiver.
pipelinesToAttach, err := rb.findPipelinesToAttach(config)
Expand Down
Loading