Skip to content

Commit

Permalink
Simplify builders for components, remove unnecessary builders (open-t…
Browse files Browse the repository at this point in the history
…elemetry#2624)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Mar 8, 2021
1 parent 98a0672 commit d9099c5
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 88 deletions.
22 changes: 9 additions & 13 deletions service/builder/exporters_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,32 +140,28 @@ type dataTypeRequirements map[configmodels.DataType]dataTypeRequirement
// Data type requirements for all exporters.
type exportersRequiredDataTypes map[configmodels.Exporter]dataTypeRequirements

// ExportersBuilder builds exporters from config.
type ExportersBuilder struct {
// exportersBuilder builds exporters from config.
type exportersBuilder struct {
logger *zap.Logger
appInfo component.ApplicationStartInfo
config *configmodels.Config
factories map[configmodels.Type]component.ExporterFactory
}

// NewExportersBuilder creates a new ExportersBuilder. Call BuildExporters() on the returned value.
func NewExportersBuilder(
// BuildExporters builds Exporters from config.
func BuildExporters(
logger *zap.Logger,
appInfo component.ApplicationStartInfo,
config *configmodels.Config,
factories map[configmodels.Type]component.ExporterFactory,
) *ExportersBuilder {
return &ExportersBuilder{logger.With(zap.String(kindLogKey, kindLogsExporter)), appInfo, config, factories}
}

// BuildExporters exporters from config.
func (eb *ExportersBuilder) Build() (Exporters, error) {
exporters := make(Exporters)
) (Exporters, error) {
eb := &exportersBuilder{logger.With(zap.String(kindLogKey, kindLogsExporter)), appInfo, config, factories}

// We need to calculate required input data types for each exporter so that we know
// which data type must be started for each exporter.
exporterInputDataTypes := eb.calcExportersRequiredDataTypes()

exporters := make(Exporters)
// BuildExporters exporters based on configuration and required input data types.
for _, cfg := range eb.config.Exporters {
componentLogger := eb.logger.With(zap.String(typeLogKey, string(cfg.Type())), zap.String(nameLogKey, cfg.Name()))
Expand All @@ -180,7 +176,7 @@ func (eb *ExportersBuilder) Build() (Exporters, error) {
return exporters, nil
}

func (eb *ExportersBuilder) calcExportersRequiredDataTypes() exportersRequiredDataTypes {
func (eb *exportersBuilder) calcExportersRequiredDataTypes() exportersRequiredDataTypes {

// Go over all pipelines. The data type of the pipeline defines what data type
// each exporter is expected to receive. Collect all required types for each
Expand Down Expand Up @@ -213,7 +209,7 @@ func (eb *ExportersBuilder) calcExportersRequiredDataTypes() exportersRequiredDa
return result
}

func (eb *ExportersBuilder) buildExporter(
func (eb *exportersBuilder) buildExporter(
ctx context.Context,
logger *zap.Logger,
appInfo component.ApplicationStartInfo,
Expand Down
10 changes: 5 additions & 5 deletions service/builder/exporters_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestExportersBuilder_Build(t *testing.T) {
},
}

exporters, err := NewExportersBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters).Build()
exporters, err := BuildExporters(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters)

assert.NoError(t, err)
require.NotNil(t, exporters)
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestExportersBuilder_Build(t *testing.T) {
// This should result in creating an exporter that has none of consumption
// functions set.
delete(cfg.Service.Pipelines, "trace")
exporters, err = NewExportersBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters).Build()
exporters, err = BuildExporters(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters)
assert.NotNil(t, exporters)
assert.NoError(t, err)

Expand Down Expand Up @@ -131,7 +131,7 @@ func TestExportersBuilder_BuildLogs(t *testing.T) {
},
}

exporters, err := NewExportersBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters).Build()
exporters, err := BuildExporters(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters)

assert.NoError(t, err)
require.NotNil(t, exporters)
Expand All @@ -156,7 +156,7 @@ func TestExportersBuilder_BuildLogs(t *testing.T) {
// This should result in creating an exporter that has none of consumption
// functions set.
delete(cfg.Service.Pipelines, "logs")
exporters, err = NewExportersBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters).Build()
exporters, err = BuildExporters(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters)
assert.NotNil(t, exporters)
assert.Nil(t, err)

Expand Down Expand Up @@ -260,7 +260,7 @@ func TestExportersBuilder_ErrorOnNilExporter(t *testing.T) {
},
}

exporters, err := NewExportersBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, fm).Build()
exporters, err := BuildExporters(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, fm)
assert.Error(t, err)
assert.Zero(t, len(exporters))
})
Expand Down
18 changes: 7 additions & 11 deletions service/builder/extensions_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,28 +110,24 @@ func (exts Extensions) ToMap() map[configmodels.NamedEntity]component.Extension
return result
}

// ExportersBuilder builds exporters from config.
type ExtensionsBuilder struct {
// exportersBuilder builds exporters from config.
type extensionsBuilder struct {
logger *zap.Logger
appInfo component.ApplicationStartInfo
config *configmodels.Config
factories map[configmodels.Type]component.ExtensionFactory
}

// NewExportersBuilder creates a new ExportersBuilder. Call BuildExporters() on the returned value.
func NewExtensionsBuilder(
// BuildExtensions builds Extensions from config.
func BuildExtensions(
logger *zap.Logger,
appInfo component.ApplicationStartInfo,
config *configmodels.Config,
factories map[configmodels.Type]component.ExtensionFactory,
) *ExtensionsBuilder {
return &ExtensionsBuilder{logger.With(zap.String(kindLogKey, kindLogExtension)), appInfo, config, factories}
}
) (Extensions, error) {
eb := &extensionsBuilder{logger.With(zap.String(kindLogKey, kindLogExtension)), appInfo, config, factories}

// Build extensions from config.
func (eb *ExtensionsBuilder) Build() (Extensions, error) {
extensions := make(Extensions)

for _, extName := range eb.config.Service.Extensions {
extCfg, exists := eb.config.Extensions[extName]
if !exists {
Expand All @@ -150,7 +146,7 @@ func (eb *ExtensionsBuilder) Build() (Extensions, error) {
return extensions, nil
}

func (eb *ExtensionsBuilder) buildExtension(logger *zap.Logger, appInfo component.ApplicationStartInfo, cfg configmodels.Extension) (*builtExtension, error) {
func (eb *extensionsBuilder) buildExtension(logger *zap.Logger, appInfo component.ApplicationStartInfo, cfg configmodels.Extension) (*builtExtension, error) {
factory := eb.factories[cfg.Type()]
if factory == nil {
return nil, fmt.Errorf("extension factory for type %q is not configured", cfg.Type())
Expand Down
28 changes: 12 additions & 16 deletions service/builder/pipelines_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,31 +78,27 @@ func (bps BuiltPipelines) ShutdownProcessors(ctx context.Context) error {
return consumererror.CombineErrors(errs)
}

// PipelinesBuilder builds pipelines from config.
type PipelinesBuilder struct {
// pipelinesBuilder builds Pipelines from config.
type pipelinesBuilder struct {
logger *zap.Logger
appInfo component.ApplicationStartInfo
config *configmodels.Config
exporters Exporters
factories map[configmodels.Type]component.ProcessorFactory
}

// NewPipelinesBuilder creates a new PipelinesBuilder. Requires exporters to be already
// built via ExportersBuilder. Call BuildProcessors() on the returned value.
func NewPipelinesBuilder(
// BuildPipelines builds pipeline processors from config. Requires exporters to be already
// built via BuildExporters.
func BuildPipelines(
logger *zap.Logger,
appInfo component.ApplicationStartInfo,
config *configmodels.Config,
exporters Exporters,
factories map[configmodels.Type]component.ProcessorFactory,
) *PipelinesBuilder {
return &PipelinesBuilder{logger, appInfo, config, exporters, factories}
}
) (BuiltPipelines, error) {
pb := &pipelinesBuilder{logger, appInfo, config, exporters, factories}

// BuildProcessors pipeline processors from config.
func (pb *PipelinesBuilder) Build() (BuiltPipelines, error) {
pipelineProcessors := make(BuiltPipelines)

for _, pipeline := range pb.config.Service.Pipelines {
firstProcessor, err := pb.buildPipeline(context.Background(), pipeline)
if err != nil {
Expand All @@ -117,7 +113,7 @@ func (pb *PipelinesBuilder) Build() (BuiltPipelines, error) {
// Builds a pipeline of processors. Returns the first processor in the pipeline.
// The last processor in the pipeline will be plugged to fan out the data into exporters
// that are configured for this pipeline.
func (pb *PipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *configmodels.Pipeline) (*builtPipeline, error) {
func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *configmodels.Pipeline) (*builtPipeline, error) {

// BuildProcessors the pipeline backwards.

Expand Down Expand Up @@ -219,7 +215,7 @@ func (pb *PipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf
}

// Converts the list of exporter names to a list of corresponding builtExporters.
func (pb *PipelinesBuilder) getBuiltExportersByNames(exporterNames []string) []*builtExporter {
func (pb *pipelinesBuilder) getBuiltExportersByNames(exporterNames []string) []*builtExporter {
var result []*builtExporter
for _, name := range exporterNames {
exporter := pb.exporters[pb.config.Exporters[name]]
Expand All @@ -229,7 +225,7 @@ func (pb *PipelinesBuilder) getBuiltExportersByNames(exporterNames []string) []*
return result
}

func (pb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []string) consumer.TracesConsumer {
func (pb *pipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []string) consumer.TracesConsumer {
builtExporters := pb.getBuiltExportersByNames(exporterNames)

var exporters []consumer.TracesConsumer
Expand All @@ -241,7 +237,7 @@ func (pb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []st
return processor.NewTracesFanOutConnector(exporters)
}

func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.MetricsConsumer {
func (pb *pipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.MetricsConsumer {
builtExporters := pb.getBuiltExportersByNames(exporterNames)

var exporters []consumer.MetricsConsumer
Expand All @@ -253,7 +249,7 @@ func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []
return processor.NewMetricsFanOutConnector(exporters)
}

func (pb *PipelinesBuilder) buildFanoutExportersLogConsumer(exporterNames []string) consumer.LogsConsumer {
func (pb *pipelinesBuilder) buildFanoutExportersLogConsumer(exporterNames []string) consumer.LogsConsumer {
builtExporters := pb.getBuiltExportersByNames(exporterNames)

exporters := make([]consumer.LogsConsumer, len(builtExporters))
Expand Down
16 changes: 8 additions & 8 deletions service/builder/pipelines_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,15 @@ func TestPipelinesBuilder_BuildVarious(t *testing.T) {
cfg := createExampleConfig(dataType)

// BuildProcessors the pipeline
allExporters, err := NewExportersBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters).Build()
allExporters, err := BuildExporters(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters)
if test.shouldFail {
assert.Error(t, err)
return
}

require.NoError(t, err)
require.EqualValues(t, 1, len(allExporters))
pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors).Build()
pipelineProcessors, err := BuildPipelines(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors)

assert.NoError(t, err)
require.NotNil(t, pipelineProcessors)
Expand Down Expand Up @@ -205,9 +205,9 @@ func testPipeline(t *testing.T, pipelineName string, exporterNames []string) {
require.Nil(t, err)

// BuildProcessors the pipeline
allExporters, err := NewExportersBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters).Build()
allExporters, err := BuildExporters(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters)
assert.NoError(t, err)
pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors).Build()
pipelineProcessors, err := BuildPipelines(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors)

assert.NoError(t, err)
require.NotNil(t, pipelineProcessors)
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestProcessorsBuilder_ErrorOnUnsupportedProcessor(t *testing.T) {
cfg, err := configtest.LoadConfigFile(t, "testdata/bad_processor_factory.yaml", factories)
require.Nil(t, err)

allExporters, err := NewExportersBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters).Build()
allExporters, err := BuildExporters(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, factories.Exporters)
assert.NoError(t, err)

// First test only trace receivers by removing the metrics pipeline.
Expand All @@ -276,7 +276,7 @@ func TestProcessorsBuilder_ErrorOnUnsupportedProcessor(t *testing.T) {
delete(cfg.Service.Pipelines, "logs")
require.Equal(t, 1, len(cfg.Service.Pipelines))

pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors).Build()
pipelineProcessors, err := BuildPipelines(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors)
assert.Error(t, err)
assert.Zero(t, len(pipelineProcessors))

Expand All @@ -285,7 +285,7 @@ func TestProcessorsBuilder_ErrorOnUnsupportedProcessor(t *testing.T) {
cfg.Service.Pipelines["metrics"] = metricsPipeline
require.Equal(t, 1, len(cfg.Service.Pipelines))

pipelineProcessors, err = NewPipelinesBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors).Build()
pipelineProcessors, err = BuildPipelines(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors)
assert.Error(t, err)
assert.Zero(t, len(pipelineProcessors))

Expand All @@ -294,7 +294,7 @@ func TestProcessorsBuilder_ErrorOnUnsupportedProcessor(t *testing.T) {
cfg.Service.Pipelines["logs"] = logsPipeline
require.Equal(t, 1, len(cfg.Service.Pipelines))

pipelineProcessors, err = NewPipelinesBuilder(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors).Build()
pipelineProcessors, err = BuildPipelines(zap.NewNop(), component.DefaultApplicationStartInfo(), cfg, allExporters, factories.Processors)
assert.Error(t, err)
assert.Zero(t, len(pipelineProcessors))
}
Expand Down
23 changes: 9 additions & 14 deletions service/builder/receivers_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,31 +77,26 @@ func (rcvs Receivers) StartAll(ctx context.Context, host component.Host) error {
return nil
}

// ReceiversBuilder builds receivers from config.
type ReceiversBuilder struct {
// receiversBuilder builds receivers from config.
type receiversBuilder struct {
logger *zap.Logger
appInfo component.ApplicationStartInfo
config *configmodels.Config
builtPipelines BuiltPipelines
factories map[configmodels.Type]component.ReceiverFactory
}

// NewReceiversBuilder creates a new ReceiversBuilder. Call BuildProcessors() on the returned value.
func NewReceiversBuilder(
// BuildReceivers builds Receivers from config.
func BuildReceivers(
logger *zap.Logger,
appInfo component.ApplicationStartInfo,
config *configmodels.Config,
builtPipelines BuiltPipelines,
factories map[configmodels.Type]component.ReceiverFactory,
) *ReceiversBuilder {
return &ReceiversBuilder{logger.With(zap.String(kindLogKey, kindLogsReceiver)), appInfo, config, builtPipelines, factories}
}
) (Receivers, error) {
rb := &receiversBuilder{logger.With(zap.String(kindLogKey, kindLogsReceiver)), appInfo, config, builtPipelines, factories}

// BuildProcessors receivers from config.
func (rb *ReceiversBuilder) Build() (Receivers, error) {
receivers := make(Receivers)

// BuildProcessors receivers based on configuration.
for _, cfg := range rb.config.Receivers {
logger := rb.logger.With(zap.String(typeLogKey, string(cfg.Type())), zap.String(nameLogKey, cfg.Name()))
rcv, err := rb.buildReceiver(context.Background(), logger, rb.appInfo, cfg)
Expand Down Expand Up @@ -130,7 +125,7 @@ func hasReceiver(pipeline *configmodels.Pipeline, receiverName string) bool {

type attachedPipelines map[configmodels.DataType][]*builtPipeline

func (rb *ReceiversBuilder) findPipelinesToAttach(config configmodels.Receiver) (attachedPipelines, error) {
func (rb *receiversBuilder) findPipelinesToAttach(config configmodels.Receiver) (attachedPipelines, error) {
// A receiver may be attached to multiple pipelines. Pipelines may consume different
// data types. We need to compile the list of pipelines of each type that must be
// attached to this receiver according to configuration.
Expand Down Expand Up @@ -163,7 +158,7 @@ func (rb *ReceiversBuilder) findPipelinesToAttach(config configmodels.Receiver)
return pipelinesToAttach, nil
}

func (rb *ReceiversBuilder) attachReceiverToPipelines(
func (rb *receiversBuilder) attachReceiverToPipelines(
ctx context.Context,
logger *zap.Logger,
appInfo component.ApplicationStartInfo,
Expand Down Expand Up @@ -237,7 +232,7 @@ func (rb *ReceiversBuilder) attachReceiverToPipelines(
return nil
}

func (rb *ReceiversBuilder) buildReceiver(ctx context.Context, logger *zap.Logger, appInfo component.ApplicationStartInfo, config configmodels.Receiver) (*builtReceiver, error) {
func (rb *receiversBuilder) buildReceiver(ctx context.Context, logger *zap.Logger, appInfo component.ApplicationStartInfo, config configmodels.Receiver) (*builtReceiver, error) {

// First find pipelines that must be attached to this receiver.
pipelinesToAttach, err := rb.findPipelinesToAttach(config)
Expand Down
Loading

0 comments on commit d9099c5

Please sign in to comment.