From fce05a8fc53b86ed8c98566f776c766b8d5c0feb Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Thu, 2 Feb 2023 13:09:22 -0500 Subject: [PATCH] Remove node build methods. Pull CreateSettings into buildComponent functions. --- service/graph.go | 49 ++++++++++++++---- service/graph_test.go | 24 ++++----- service/nodes.go | 116 ++++++++---------------------------------- service/pipelines.go | 85 ++++++++++++++----------------- 4 files changed, 112 insertions(+), 162 deletions(-) diff --git a/service/graph.go b/service/graph.go index 3c5167d3767..b779257fd2b 100644 --- a/service/graph.go +++ b/service/graph.go @@ -24,6 +24,8 @@ import ( "gonum.org/v1/gonum/graph/topo" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/service/internal/fanoutconsumer" ) var _ pipelines = (*pipelinesGraph)(nil) @@ -61,7 +63,7 @@ func (g *pipelinesGraph) createNodes(set pipelinesSettings) { for pipelineID, pipelineCfg := range set.PipelineConfigs { pipe := g.pipelines[pipelineID] for _, recvID := range pipelineCfg.Receivers { - if set.Connectors.IsConfigured(recvID) { + if set.ConnectorBuilder.IsConfigured(recvID) { connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID) continue } @@ -78,7 +80,7 @@ func (g *pipelinesGraph) createNodes(set pipelinesSettings) { pipe.fanOutNode = newFanOutNode(pipelineID) for _, exprID := range pipelineCfg.Exporters { - if set.Connectors.IsConfigured(exprID) { + if set.ConnectorBuilder.IsConfigured(exprID) { connectorsAsExporter[exprID] = append(connectorsAsExporter[exprID], pipelineID) continue } @@ -163,17 +165,46 @@ func (g *pipelinesGraph) buildComponents(ctx context.Context, set pipelinesSetti node := nodes[i] switch n := node.(type) { case *receiverNode: - err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Receivers, g.nextConsumers(n.ID())) + n.Component, err = buildReceiver(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ReceiverBuilder, + component.NewIDWithName(n.pipelineType, "*"), g.nextConsumers(n.ID())) case *processorNode: - err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Processors, g.nextConsumers(n.ID())[0]) - case *connectorNode: - err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Connectors, g.nextConsumers(n.ID())) + n.Component, err = buildProcessor(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ProcessorBuilder, + n.pipelineID, g.nextConsumers(n.ID())[0]) case *exporterNode: - err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Exporters) + n.Component, err = buildExporter(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ExporterBuilder, + component.NewIDWithName(n.pipelineType, "*")) + case *connectorNode: + n.Component, err = buildConnector(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ConnectorBuilder, + n.exprPipelineType, n.rcvrPipelineType, g.nextConsumers(n.ID())) case *capabilitiesNode: - n.build(g.nextConsumers(n.ID())[0], g.pipelines[n.pipelineID].processors) + n.baseConsumer = g.nextConsumers(n.ID())[0] + for _, proc := range g.pipelines[n.pipelineID].processors { + n.Capabilities.MutatesData = n.Capabilities.MutatesData || + proc.Component.(baseConsumer).Capabilities().MutatesData + } case *fanOutNode: - n.build(g.nextConsumers(n.ID())) + nexts := g.nextConsumers(n.ID()) + switch n.pipelineID.Type() { + case component.DataTypeTraces: + consumers := make([]consumer.Traces, 0, len(nexts)) + for _, next := range nexts { + consumers = append(consumers, next.(consumer.Traces)) + } + n.baseConsumer = fanoutconsumer.NewTraces(consumers) + case component.DataTypeMetrics: + consumers := make([]consumer.Metrics, 0, len(nexts)) + for _, next := range nexts { + + consumers = append(consumers, next.(consumer.Metrics)) + } + n.baseConsumer = fanoutconsumer.NewMetrics(consumers) + case component.DataTypeLogs: + consumers := make([]consumer.Logs, 0, len(nexts)) + for _, next := range nexts { + consumers = append(consumers, next.(consumer.Logs)) + } + n.baseConsumer = fanoutconsumer.NewLogs(consumers) + } } if err != nil { return err diff --git a/service/graph_test.go b/service/graph_test.go index 52423213d10..5dc2ef4008a 100644 --- a/service/graph_test.go +++ b/service/graph_test.go @@ -577,7 +577,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { set := pipelinesSettings{ Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), - Receivers: receiver.NewBuilder( + ReceiverBuilder: receiver.NewBuilder( map[component.ID]component.Config{ component.NewID("examplereceiver"): testcomponents.ExampleReceiverFactory.CreateDefaultConfig(), component.NewIDWithName("examplereceiver", "1"): testcomponents.ExampleReceiverFactory.CreateDefaultConfig(), @@ -586,7 +586,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { testcomponents.ExampleReceiverFactory.Type(): testcomponents.ExampleReceiverFactory, }, ), - Processors: processor.NewBuilder( + ProcessorBuilder: processor.NewBuilder( map[component.ID]component.Config{ component.NewID("exampleprocessor"): testcomponents.ExampleProcessorFactory.CreateDefaultConfig(), component.NewIDWithName("exampleprocessor", "1"): testcomponents.ExampleProcessorFactory.CreateDefaultConfig(), @@ -595,7 +595,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { testcomponents.ExampleProcessorFactory.Type(): testcomponents.ExampleProcessorFactory, }, ), - Exporters: exporter.NewBuilder( + ExporterBuilder: exporter.NewBuilder( map[component.ID]component.Config{ component.NewID("exampleexporter"): testcomponents.ExampleExporterFactory.CreateDefaultConfig(), component.NewIDWithName("exampleexporter", "1"): testcomponents.ExampleExporterFactory.CreateDefaultConfig(), @@ -604,7 +604,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { testcomponents.ExampleExporterFactory.Type(): testcomponents.ExampleExporterFactory, }, ), - Connectors: connector.NewBuilder( + ConnectorBuilder: connector.NewBuilder( map[component.ID]component.Config{ component.NewID("exampleconnector"): testcomponents.ExampleConnectorFactory.CreateDefaultConfig(), component.NewIDWithName("exampleconnector", "fork"): testcomponents.ExampleConnectorFactory.CreateDefaultConfig(), @@ -1548,25 +1548,25 @@ func TestGraphBuildErrors(t *testing.T) { set := pipelinesSettings{ BuildInfo: component.NewDefaultBuildInfo(), Telemetry: componenttest.NewNopTelemetrySettings(), - Receivers: receiver.NewBuilder( + ReceiverBuilder: receiver.NewBuilder( test.receiverCfgs, map[component.Type]receiver.Factory{ nopReceiverFactory.Type(): nopReceiverFactory, badReceiverFactory.Type(): badReceiverFactory, }), - Processors: processor.NewBuilder( + ProcessorBuilder: processor.NewBuilder( test.processorCfgs, map[component.Type]processor.Factory{ nopProcessorFactory.Type(): nopProcessorFactory, badProcessorFactory.Type(): badProcessorFactory, }), - Exporters: exporter.NewBuilder( + ExporterBuilder: exporter.NewBuilder( test.exporterCfgs, map[component.Type]exporter.Factory{ nopExporterFactory.Type(): nopExporterFactory, badExporterFactory.Type(): badExporterFactory, }), - Connectors: connector.NewBuilder( + ConnectorBuilder: connector.NewBuilder( test.connectorCfgs, map[component.Type]connector.Factory{ nopConnectorFactory.Type(): nopConnectorFactory, @@ -1600,7 +1600,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) { set := pipelinesSettings{ Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), - Receivers: receiver.NewBuilder( + ReceiverBuilder: receiver.NewBuilder( map[component.ID]component.Config{ component.NewID(nopReceiverFactory.Type()): nopReceiverFactory.CreateDefaultConfig(), component.NewID(errReceiverFactory.Type()): errReceiverFactory.CreateDefaultConfig(), @@ -1609,7 +1609,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) { nopReceiverFactory.Type(): nopReceiverFactory, errReceiverFactory.Type(): errReceiverFactory, }), - Processors: processor.NewBuilder( + ProcessorBuilder: processor.NewBuilder( map[component.ID]component.Config{ component.NewID(nopProcessorFactory.Type()): nopProcessorFactory.CreateDefaultConfig(), component.NewID(errProcessorFactory.Type()): errProcessorFactory.CreateDefaultConfig(), @@ -1618,7 +1618,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) { nopProcessorFactory.Type(): nopProcessorFactory, errProcessorFactory.Type(): errProcessorFactory, }), - Exporters: exporter.NewBuilder( + ExporterBuilder: exporter.NewBuilder( map[component.ID]component.Config{ component.NewID(nopExporterFactory.Type()): nopExporterFactory.CreateDefaultConfig(), component.NewID(errExporterFactory.Type()): errExporterFactory.CreateDefaultConfig(), @@ -1627,7 +1627,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) { nopExporterFactory.Type(): nopExporterFactory, errExporterFactory.Type(): errExporterFactory, }), - Connectors: connector.NewBuilder( + ConnectorBuilder: connector.NewBuilder( map[component.ID]component.Config{ component.NewIDWithName(nopConnectorFactory.Type(), "conn"): nopConnectorFactory.CreateDefaultConfig(), component.NewIDWithName(errConnectorFactory.Type(), "conn"): errConnectorFactory.CreateDefaultConfig(), diff --git a/service/nodes.go b/service/nodes.go index 3d2463db778..29fde3fea8d 100644 --- a/service/nodes.go +++ b/service/nodes.go @@ -22,9 +22,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/processor" - "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/service/internal/components" "go.opentelemetry.io/collector/service/internal/fanoutconsumer" ) @@ -71,20 +68,6 @@ func newReceiverNode(pipelineID component.ID, recvID component.ID) *receiverNode } } -func (n *receiverNode) build( - ctx context.Context, - tel component.TelemetrySettings, - info component.BuildInfo, - builder *receiver.Builder, - nexts []baseConsumer, -) error { - set := receiver.CreateSettings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} - set.TelemetrySettings.Logger = components.ReceiverLogger(set.TelemetrySettings.Logger, n.componentID, n.pipelineType) - r, err := buildReceiver(ctx, set, builder, component.NewIDWithName(n.pipelineType, "*"), nexts) - n.Component = r - return err -} - var _ consumerNode = &processorNode{} // Every processor instance is unique to one pipeline. @@ -108,20 +91,6 @@ func (n *processorNode) getConsumer() baseConsumer { return n.Component.(baseConsumer) } -func (n *processorNode) build( - ctx context.Context, - tel component.TelemetrySettings, - info component.BuildInfo, - builder *processor.Builder, - next baseConsumer, -) error { - set := processor.CreateSettings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} - set.TelemetrySettings.Logger = components.ProcessorLogger(set.TelemetrySettings.Logger, n.componentID, n.pipelineID) - p, err := buildProcessor(ctx, set, builder, n.pipelineID, next) - n.Component = p - return err -} - var _ consumerNode = &exporterNode{} // An exporter instance can be shared by multiple pipelines of the same type. @@ -145,19 +114,6 @@ func (n *exporterNode) getConsumer() baseConsumer { return n.Component.(baseConsumer) } -func (n *exporterNode) build( - ctx context.Context, - tel component.TelemetrySettings, - info component.BuildInfo, - builder *exporter.Builder, -) error { - set := exporter.CreateSettings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} - set.TelemetrySettings.Logger = components.ExporterLogger(set.TelemetrySettings.Logger, n.componentID, n.pipelineType) - e, err := buildExporter(ctx, set, builder, component.NewIDWithName(n.pipelineType, "*")) - n.Component = e - return err -} - var _ consumerNode = &connectorNode{} // A connector instance connects one pipeline type to one other pipeline type. @@ -183,31 +139,33 @@ func (n *connectorNode) getConsumer() baseConsumer { return n.Component.(baseConsumer) } -func (n *connectorNode) build( +func buildConnector( ctx context.Context, + componentID component.ID, tel component.TelemetrySettings, info component.BuildInfo, builder *connector.Builder, + exprPipelineType component.Type, + rcvrPipelineType component.Type, nexts []baseConsumer, -) error { - set := connector.CreateSettings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} - set.TelemetrySettings.Logger = components.ConnectorLogger(set.TelemetrySettings.Logger, n.componentID, n.exprPipelineType, n.rcvrPipelineType) +) (conn component.Component, err error) { + set := connector.CreateSettings{ID: componentID, TelemetrySettings: tel, BuildInfo: info} + set.TelemetrySettings.Logger = components.ConnectorLogger(set.TelemetrySettings.Logger, componentID, exprPipelineType, rcvrPipelineType) - var err error - switch n.rcvrPipelineType { + switch rcvrPipelineType { case component.DataTypeTraces: var consumers []consumer.Traces for _, next := range nexts { consumers = append(consumers, next.(consumer.Traces)) } fanoutConsumer := fanoutconsumer.NewTraces(consumers) - switch n.exprPipelineType { + switch exprPipelineType { case component.DataTypeTraces: - n.Component, err = builder.CreateTracesToTraces(ctx, set, fanoutConsumer) + conn, err = builder.CreateTracesToTraces(ctx, set, fanoutConsumer) case component.DataTypeMetrics: - n.Component, err = builder.CreateMetricsToTraces(ctx, set, fanoutConsumer) + conn, err = builder.CreateMetricsToTraces(ctx, set, fanoutConsumer) case component.DataTypeLogs: - n.Component, err = builder.CreateLogsToTraces(ctx, set, fanoutConsumer) + conn, err = builder.CreateLogsToTraces(ctx, set, fanoutConsumer) } case component.DataTypeMetrics: var consumers []consumer.Metrics @@ -215,13 +173,13 @@ func (n *connectorNode) build( consumers = append(consumers, next.(consumer.Metrics)) } fanoutConsumer := fanoutconsumer.NewMetrics(consumers) - switch n.exprPipelineType { + switch exprPipelineType { case component.DataTypeTraces: - n.Component, err = builder.CreateTracesToMetrics(ctx, set, fanoutConsumer) + conn, err = builder.CreateTracesToMetrics(ctx, set, fanoutConsumer) case component.DataTypeMetrics: - n.Component, err = builder.CreateMetricsToMetrics(ctx, set, fanoutConsumer) + conn, err = builder.CreateMetricsToMetrics(ctx, set, fanoutConsumer) case component.DataTypeLogs: - n.Component, err = builder.CreateLogsToMetrics(ctx, set, fanoutConsumer) + conn, err = builder.CreateLogsToMetrics(ctx, set, fanoutConsumer) } case component.DataTypeLogs: var consumers []consumer.Logs @@ -229,16 +187,16 @@ func (n *connectorNode) build( consumers = append(consumers, next.(consumer.Logs)) } fanoutConsumer := fanoutconsumer.NewLogs(consumers) - switch n.exprPipelineType { + switch exprPipelineType { case component.DataTypeTraces: - n.Component, err = builder.CreateTracesToLogs(ctx, set, fanoutConsumer) + conn, err = builder.CreateTracesToLogs(ctx, set, fanoutConsumer) case component.DataTypeMetrics: - n.Component, err = builder.CreateMetricsToLogs(ctx, set, fanoutConsumer) + conn, err = builder.CreateMetricsToLogs(ctx, set, fanoutConsumer) case component.DataTypeLogs: - n.Component, err = builder.CreateLogsToLogs(ctx, set, fanoutConsumer) + conn, err = builder.CreateLogsToLogs(ctx, set, fanoutConsumer) } } - return err + return } var _ consumerNode = &capabilitiesNode{} @@ -267,14 +225,6 @@ func (n *capabilitiesNode) getConsumer() baseConsumer { return n.baseConsumer } -func (n *capabilitiesNode) build(next baseConsumer, processors []*processorNode) { - n.baseConsumer = next - for _, proc := range processors { - n.Capabilities.MutatesData = n.Capabilities.MutatesData || - proc.Component.(baseConsumer).Capabilities().MutatesData - } -} - var _ consumerNode = &fanOutNode{} // Each pipeline has one fan-out node before exporters. @@ -295,27 +245,3 @@ func newFanOutNode(pipelineID component.ID) *fanOutNode { func (n *fanOutNode) getConsumer() baseConsumer { return n.baseConsumer } - -func (n *fanOutNode) build(nexts []baseConsumer) { - switch n.pipelineID.Type() { - case component.DataTypeTraces: - consumers := make([]consumer.Traces, 0, len(nexts)) - for _, next := range nexts { - consumers = append(consumers, next.(consumer.Traces)) - } - n.baseConsumer = fanoutconsumer.NewTraces(consumers) - case component.DataTypeMetrics: - consumers := make([]consumer.Metrics, 0, len(nexts)) - for _, next := range nexts { - - consumers = append(consumers, next.(consumer.Metrics)) - } - n.baseConsumer = fanoutconsumer.NewMetrics(consumers) - case component.DataTypeLogs: - consumers := make([]consumer.Logs, 0, len(nexts)) - for _, next := range nexts { - consumers = append(consumers, next.(consumer.Logs)) - } - n.baseConsumer = fanoutconsumer.NewLogs(consumers) - } -} diff --git a/service/pipelines.go b/service/pipelines.go index edfcbf2d583..018faf6bd07 100644 --- a/service/pipelines.go +++ b/service/pipelines.go @@ -239,13 +239,7 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (pipelines, erro continue } - cSet := exporter.CreateSettings{ - ID: expID, - TelemetrySettings: set.Telemetry, - BuildInfo: set.BuildInfo, - } - cSet.TelemetrySettings.Logger = components.ExporterLogger(set.Telemetry.Logger, expID, pipelineID.Type()) - exp, err := buildExporter(ctx, cSet, set.ExporterBuilder, pipelineID) + exp, err := buildExporter(ctx, expID, set.Telemetry, set.BuildInfo, set.ExporterBuilder, pipelineID) if err != nil { return nil, err } @@ -272,14 +266,7 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (pipelines, erro // consumer for the one that precedes it in the pipeline and so on. for i := len(pipeline.Processors) - 1; i >= 0; i-- { procID := pipeline.Processors[i] - - cSet := processor.CreateSettings{ - ID: procID, - TelemetrySettings: set.Telemetry, - BuildInfo: set.BuildInfo, - } - cSet.TelemetrySettings.Logger = components.ProcessorLogger(set.Telemetry.Logger, procID, pipelineID) - proc, err := buildProcessor(ctx, cSet, set.ProcessorBuilder, pipelineID, bp.lastConsumer) + proc, err := buildProcessor(ctx, procID, set.Telemetry, set.BuildInfo, set.ProcessorBuilder, pipelineID, bp.lastConsumer) if err != nil { return nil, err } @@ -330,13 +317,7 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (pipelines, erro continue } - cSet := receiver.CreateSettings{ - ID: recvID, - TelemetrySettings: set.Telemetry, - BuildInfo: set.BuildInfo, - } - cSet.TelemetrySettings.Logger = components.ReceiverLogger(set.Telemetry.Logger, recvID, pipelineID.Type()) - recv, err := buildReceiver(ctx, cSet, set.ReceiverBuilder, pipelineID, receiversConsumers[pipelineID.Type()][recvID]) + recv, err := buildReceiver(ctx, recvID, set.Telemetry, set.BuildInfo, set.ReceiverBuilder, pipelineID, receiversConsumers[pipelineID.Type()][recvID]) if err != nil { return nil, err } @@ -348,28 +329,6 @@ func buildPipelines(ctx context.Context, set pipelinesSettings) (pipelines, erro return exps, nil } -func buildExporter( - ctx context.Context, - set exporter.CreateSettings, - builder *exporter.Builder, - pipelineID component.ID, -) (exp component.Component, err error) { - switch pipelineID.Type() { - case component.DataTypeTraces: - exp, err = builder.CreateTraces(ctx, set) - case component.DataTypeMetrics: - exp, err = builder.CreateMetrics(ctx, set) - case component.DataTypeLogs: - exp, err = builder.CreateLogs(ctx, set) - default: - return nil, fmt.Errorf("error creating exporter %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type()) - } - if err != nil { - return nil, fmt.Errorf("failed to create %q exporter, in pipeline %q: %w", set.ID, pipelineID, err) - } - return exp, nil -} - func buildFanOutExportersTracesConsumer(exporters []builtComponent) consumer.Traces { consumers := make([]consumer.Traces, 0, len(exporters)) for _, exp := range exporters { @@ -397,12 +356,42 @@ func buildFanOutExportersLogsConsumer(exporters []builtComponent) consumer.Logs return fanoutconsumer.NewLogs(consumers) } +func buildExporter( + ctx context.Context, + componentID component.ID, + tel component.TelemetrySettings, + info component.BuildInfo, + builder *exporter.Builder, + pipelineID component.ID, +) (exp component.Component, err error) { + set := exporter.CreateSettings{ID: componentID, TelemetrySettings: tel, BuildInfo: info} + set.TelemetrySettings.Logger = components.ExporterLogger(set.TelemetrySettings.Logger, componentID, pipelineID.Type()) + switch pipelineID.Type() { + case component.DataTypeTraces: + exp, err = builder.CreateTraces(ctx, set) + case component.DataTypeMetrics: + exp, err = builder.CreateMetrics(ctx, set) + case component.DataTypeLogs: + exp, err = builder.CreateLogs(ctx, set) + default: + return nil, fmt.Errorf("error creating exporter %q in pipeline %q, data type %q is not supported", set.ID, pipelineID, pipelineID.Type()) + } + if err != nil { + return nil, fmt.Errorf("failed to create %q exporter, in pipeline %q: %w", set.ID, pipelineID, err) + } + return exp, nil +} + func buildProcessor(ctx context.Context, - set processor.CreateSettings, + componentID component.ID, + tel component.TelemetrySettings, + info component.BuildInfo, builder *processor.Builder, pipelineID component.ID, next baseConsumer, ) (proc component.Component, err error) { + set := processor.CreateSettings{ID: componentID, TelemetrySettings: tel, BuildInfo: info} + set.TelemetrySettings.Logger = components.ProcessorLogger(set.TelemetrySettings.Logger, componentID, pipelineID) switch pipelineID.Type() { case component.DataTypeTraces: proc, err = builder.CreateTraces(ctx, set, next.(consumer.Traces)) @@ -420,11 +409,15 @@ func buildProcessor(ctx context.Context, } func buildReceiver(ctx context.Context, - set receiver.CreateSettings, + componentID component.ID, + tel component.TelemetrySettings, + info component.BuildInfo, builder *receiver.Builder, pipelineID component.ID, nexts []baseConsumer, ) (recv component.Component, err error) { + set := receiver.CreateSettings{ID: componentID, TelemetrySettings: tel, BuildInfo: info} + set.TelemetrySettings.Logger = components.ReceiverLogger(tel.Logger, componentID, pipelineID.Type()) switch pipelineID.Type() { case component.DataTypeTraces: var consumers []consumer.Traces