Skip to content

Commit

Permalink
Remove node build methods. Pull CreateSettings into buildComponent fu…
Browse files Browse the repository at this point in the history
…nctions.
  • Loading branch information
djaglowski committed Feb 2, 2023
1 parent b16ba00 commit fce05a8
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 162 deletions.
49 changes: 40 additions & 9 deletions service/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"gonum.org/v1/gonum/graph/topo"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/service/internal/fanoutconsumer"
)

var _ pipelines = (*pipelinesGraph)(nil)
Expand Down Expand Up @@ -61,7 +63,7 @@ func (g *pipelinesGraph) createNodes(set pipelinesSettings) {
for pipelineID, pipelineCfg := range set.PipelineConfigs {
pipe := g.pipelines[pipelineID]
for _, recvID := range pipelineCfg.Receivers {
if set.Connectors.IsConfigured(recvID) {
if set.ConnectorBuilder.IsConfigured(recvID) {
connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID)
continue
}
Expand All @@ -78,7 +80,7 @@ func (g *pipelinesGraph) createNodes(set pipelinesSettings) {
pipe.fanOutNode = newFanOutNode(pipelineID)

for _, exprID := range pipelineCfg.Exporters {
if set.Connectors.IsConfigured(exprID) {
if set.ConnectorBuilder.IsConfigured(exprID) {
connectorsAsExporter[exprID] = append(connectorsAsExporter[exprID], pipelineID)
continue
}
Expand Down Expand Up @@ -163,17 +165,46 @@ func (g *pipelinesGraph) buildComponents(ctx context.Context, set pipelinesSetti
node := nodes[i]
switch n := node.(type) {
case *receiverNode:
err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Receivers, g.nextConsumers(n.ID()))
n.Component, err = buildReceiver(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ReceiverBuilder,
component.NewIDWithName(n.pipelineType, "*"), g.nextConsumers(n.ID()))
case *processorNode:
err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Processors, g.nextConsumers(n.ID())[0])
case *connectorNode:
err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Connectors, g.nextConsumers(n.ID()))
n.Component, err = buildProcessor(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ProcessorBuilder,
n.pipelineID, g.nextConsumers(n.ID())[0])
case *exporterNode:
err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Exporters)
n.Component, err = buildExporter(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ExporterBuilder,
component.NewIDWithName(n.pipelineType, "*"))
case *connectorNode:
n.Component, err = buildConnector(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ConnectorBuilder,
n.exprPipelineType, n.rcvrPipelineType, g.nextConsumers(n.ID()))
case *capabilitiesNode:
n.build(g.nextConsumers(n.ID())[0], g.pipelines[n.pipelineID].processors)
n.baseConsumer = g.nextConsumers(n.ID())[0]
for _, proc := range g.pipelines[n.pipelineID].processors {
n.Capabilities.MutatesData = n.Capabilities.MutatesData ||
proc.Component.(baseConsumer).Capabilities().MutatesData
}
case *fanOutNode:
n.build(g.nextConsumers(n.ID()))
nexts := g.nextConsumers(n.ID())
switch n.pipelineID.Type() {
case component.DataTypeTraces:
consumers := make([]consumer.Traces, 0, len(nexts))
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Traces))
}
n.baseConsumer = fanoutconsumer.NewTraces(consumers)
case component.DataTypeMetrics:
consumers := make([]consumer.Metrics, 0, len(nexts))
for _, next := range nexts {

consumers = append(consumers, next.(consumer.Metrics))
}
n.baseConsumer = fanoutconsumer.NewMetrics(consumers)
case component.DataTypeLogs:
consumers := make([]consumer.Logs, 0, len(nexts))
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Logs))
}
n.baseConsumer = fanoutconsumer.NewLogs(consumers)
}
}
if err != nil {
return err
Expand Down
24 changes: 12 additions & 12 deletions service/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func TestConnectorPipelinesGraph(t *testing.T) {
set := pipelinesSettings{
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
Receivers: receiver.NewBuilder(
ReceiverBuilder: receiver.NewBuilder(
map[component.ID]component.Config{
component.NewID("examplereceiver"): testcomponents.ExampleReceiverFactory.CreateDefaultConfig(),
component.NewIDWithName("examplereceiver", "1"): testcomponents.ExampleReceiverFactory.CreateDefaultConfig(),
Expand All @@ -586,7 +586,7 @@ func TestConnectorPipelinesGraph(t *testing.T) {
testcomponents.ExampleReceiverFactory.Type(): testcomponents.ExampleReceiverFactory,
},
),
Processors: processor.NewBuilder(
ProcessorBuilder: processor.NewBuilder(
map[component.ID]component.Config{
component.NewID("exampleprocessor"): testcomponents.ExampleProcessorFactory.CreateDefaultConfig(),
component.NewIDWithName("exampleprocessor", "1"): testcomponents.ExampleProcessorFactory.CreateDefaultConfig(),
Expand All @@ -595,7 +595,7 @@ func TestConnectorPipelinesGraph(t *testing.T) {
testcomponents.ExampleProcessorFactory.Type(): testcomponents.ExampleProcessorFactory,
},
),
Exporters: exporter.NewBuilder(
ExporterBuilder: exporter.NewBuilder(
map[component.ID]component.Config{
component.NewID("exampleexporter"): testcomponents.ExampleExporterFactory.CreateDefaultConfig(),
component.NewIDWithName("exampleexporter", "1"): testcomponents.ExampleExporterFactory.CreateDefaultConfig(),
Expand All @@ -604,7 +604,7 @@ func TestConnectorPipelinesGraph(t *testing.T) {
testcomponents.ExampleExporterFactory.Type(): testcomponents.ExampleExporterFactory,
},
),
Connectors: connector.NewBuilder(
ConnectorBuilder: connector.NewBuilder(
map[component.ID]component.Config{
component.NewID("exampleconnector"): testcomponents.ExampleConnectorFactory.CreateDefaultConfig(),
component.NewIDWithName("exampleconnector", "fork"): testcomponents.ExampleConnectorFactory.CreateDefaultConfig(),
Expand Down Expand Up @@ -1548,25 +1548,25 @@ func TestGraphBuildErrors(t *testing.T) {
set := pipelinesSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Telemetry: componenttest.NewNopTelemetrySettings(),
Receivers: receiver.NewBuilder(
ReceiverBuilder: receiver.NewBuilder(
test.receiverCfgs,
map[component.Type]receiver.Factory{
nopReceiverFactory.Type(): nopReceiverFactory,
badReceiverFactory.Type(): badReceiverFactory,
}),
Processors: processor.NewBuilder(
ProcessorBuilder: processor.NewBuilder(
test.processorCfgs,
map[component.Type]processor.Factory{
nopProcessorFactory.Type(): nopProcessorFactory,
badProcessorFactory.Type(): badProcessorFactory,
}),
Exporters: exporter.NewBuilder(
ExporterBuilder: exporter.NewBuilder(
test.exporterCfgs,
map[component.Type]exporter.Factory{
nopExporterFactory.Type(): nopExporterFactory,
badExporterFactory.Type(): badExporterFactory,
}),
Connectors: connector.NewBuilder(
ConnectorBuilder: connector.NewBuilder(
test.connectorCfgs,
map[component.Type]connector.Factory{
nopConnectorFactory.Type(): nopConnectorFactory,
Expand Down Expand Up @@ -1600,7 +1600,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) {
set := pipelinesSettings{
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
Receivers: receiver.NewBuilder(
ReceiverBuilder: receiver.NewBuilder(
map[component.ID]component.Config{
component.NewID(nopReceiverFactory.Type()): nopReceiverFactory.CreateDefaultConfig(),
component.NewID(errReceiverFactory.Type()): errReceiverFactory.CreateDefaultConfig(),
Expand All @@ -1609,7 +1609,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) {
nopReceiverFactory.Type(): nopReceiverFactory,
errReceiverFactory.Type(): errReceiverFactory,
}),
Processors: processor.NewBuilder(
ProcessorBuilder: processor.NewBuilder(
map[component.ID]component.Config{
component.NewID(nopProcessorFactory.Type()): nopProcessorFactory.CreateDefaultConfig(),
component.NewID(errProcessorFactory.Type()): errProcessorFactory.CreateDefaultConfig(),
Expand All @@ -1618,7 +1618,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) {
nopProcessorFactory.Type(): nopProcessorFactory,
errProcessorFactory.Type(): errProcessorFactory,
}),
Exporters: exporter.NewBuilder(
ExporterBuilder: exporter.NewBuilder(
map[component.ID]component.Config{
component.NewID(nopExporterFactory.Type()): nopExporterFactory.CreateDefaultConfig(),
component.NewID(errExporterFactory.Type()): errExporterFactory.CreateDefaultConfig(),
Expand All @@ -1627,7 +1627,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) {
nopExporterFactory.Type(): nopExporterFactory,
errExporterFactory.Type(): errExporterFactory,
}),
Connectors: connector.NewBuilder(
ConnectorBuilder: connector.NewBuilder(
map[component.ID]component.Config{
component.NewIDWithName(nopConnectorFactory.Type(), "conn"): nopConnectorFactory.CreateDefaultConfig(),
component.NewIDWithName(errConnectorFactory.Type(), "conn"): errConnectorFactory.CreateDefaultConfig(),
Expand Down
116 changes: 21 additions & 95 deletions service/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service/internal/components"
"go.opentelemetry.io/collector/service/internal/fanoutconsumer"
)
Expand Down Expand Up @@ -71,20 +68,6 @@ func newReceiverNode(pipelineID component.ID, recvID component.ID) *receiverNode
}
}

func (n *receiverNode) build(
ctx context.Context,
tel component.TelemetrySettings,
info component.BuildInfo,
builder *receiver.Builder,
nexts []baseConsumer,
) error {
set := receiver.CreateSettings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
set.TelemetrySettings.Logger = components.ReceiverLogger(set.TelemetrySettings.Logger, n.componentID, n.pipelineType)
r, err := buildReceiver(ctx, set, builder, component.NewIDWithName(n.pipelineType, "*"), nexts)
n.Component = r
return err
}

var _ consumerNode = &processorNode{}

// Every processor instance is unique to one pipeline.
Expand All @@ -108,20 +91,6 @@ func (n *processorNode) getConsumer() baseConsumer {
return n.Component.(baseConsumer)
}

func (n *processorNode) build(
ctx context.Context,
tel component.TelemetrySettings,
info component.BuildInfo,
builder *processor.Builder,
next baseConsumer,
) error {
set := processor.CreateSettings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
set.TelemetrySettings.Logger = components.ProcessorLogger(set.TelemetrySettings.Logger, n.componentID, n.pipelineID)
p, err := buildProcessor(ctx, set, builder, n.pipelineID, next)
n.Component = p
return err
}

var _ consumerNode = &exporterNode{}

// An exporter instance can be shared by multiple pipelines of the same type.
Expand All @@ -145,19 +114,6 @@ func (n *exporterNode) getConsumer() baseConsumer {
return n.Component.(baseConsumer)
}

func (n *exporterNode) build(
ctx context.Context,
tel component.TelemetrySettings,
info component.BuildInfo,
builder *exporter.Builder,
) error {
set := exporter.CreateSettings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
set.TelemetrySettings.Logger = components.ExporterLogger(set.TelemetrySettings.Logger, n.componentID, n.pipelineType)
e, err := buildExporter(ctx, set, builder, component.NewIDWithName(n.pipelineType, "*"))
n.Component = e
return err
}

var _ consumerNode = &connectorNode{}

// A connector instance connects one pipeline type to one other pipeline type.
Expand All @@ -183,62 +139,64 @@ func (n *connectorNode) getConsumer() baseConsumer {
return n.Component.(baseConsumer)
}

func (n *connectorNode) build(
func buildConnector(
ctx context.Context,
componentID component.ID,
tel component.TelemetrySettings,
info component.BuildInfo,
builder *connector.Builder,
exprPipelineType component.Type,
rcvrPipelineType component.Type,
nexts []baseConsumer,
) error {
set := connector.CreateSettings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
set.TelemetrySettings.Logger = components.ConnectorLogger(set.TelemetrySettings.Logger, n.componentID, n.exprPipelineType, n.rcvrPipelineType)
) (conn component.Component, err error) {
set := connector.CreateSettings{ID: componentID, TelemetrySettings: tel, BuildInfo: info}
set.TelemetrySettings.Logger = components.ConnectorLogger(set.TelemetrySettings.Logger, componentID, exprPipelineType, rcvrPipelineType)

var err error
switch n.rcvrPipelineType {
switch rcvrPipelineType {
case component.DataTypeTraces:
var consumers []consumer.Traces
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Traces))
}
fanoutConsumer := fanoutconsumer.NewTraces(consumers)
switch n.exprPipelineType {
switch exprPipelineType {
case component.DataTypeTraces:
n.Component, err = builder.CreateTracesToTraces(ctx, set, fanoutConsumer)
conn, err = builder.CreateTracesToTraces(ctx, set, fanoutConsumer)
case component.DataTypeMetrics:
n.Component, err = builder.CreateMetricsToTraces(ctx, set, fanoutConsumer)
conn, err = builder.CreateMetricsToTraces(ctx, set, fanoutConsumer)
case component.DataTypeLogs:
n.Component, err = builder.CreateLogsToTraces(ctx, set, fanoutConsumer)
conn, err = builder.CreateLogsToTraces(ctx, set, fanoutConsumer)
}
case component.DataTypeMetrics:
var consumers []consumer.Metrics
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Metrics))
}
fanoutConsumer := fanoutconsumer.NewMetrics(consumers)
switch n.exprPipelineType {
switch exprPipelineType {
case component.DataTypeTraces:
n.Component, err = builder.CreateTracesToMetrics(ctx, set, fanoutConsumer)
conn, err = builder.CreateTracesToMetrics(ctx, set, fanoutConsumer)
case component.DataTypeMetrics:
n.Component, err = builder.CreateMetricsToMetrics(ctx, set, fanoutConsumer)
conn, err = builder.CreateMetricsToMetrics(ctx, set, fanoutConsumer)
case component.DataTypeLogs:
n.Component, err = builder.CreateLogsToMetrics(ctx, set, fanoutConsumer)
conn, err = builder.CreateLogsToMetrics(ctx, set, fanoutConsumer)
}
case component.DataTypeLogs:
var consumers []consumer.Logs
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Logs))
}
fanoutConsumer := fanoutconsumer.NewLogs(consumers)
switch n.exprPipelineType {
switch exprPipelineType {
case component.DataTypeTraces:
n.Component, err = builder.CreateTracesToLogs(ctx, set, fanoutConsumer)
conn, err = builder.CreateTracesToLogs(ctx, set, fanoutConsumer)
case component.DataTypeMetrics:
n.Component, err = builder.CreateMetricsToLogs(ctx, set, fanoutConsumer)
conn, err = builder.CreateMetricsToLogs(ctx, set, fanoutConsumer)
case component.DataTypeLogs:
n.Component, err = builder.CreateLogsToLogs(ctx, set, fanoutConsumer)
conn, err = builder.CreateLogsToLogs(ctx, set, fanoutConsumer)
}
}
return err
return
}

var _ consumerNode = &capabilitiesNode{}
Expand Down Expand Up @@ -267,14 +225,6 @@ func (n *capabilitiesNode) getConsumer() baseConsumer {
return n.baseConsumer
}

func (n *capabilitiesNode) build(next baseConsumer, processors []*processorNode) {
n.baseConsumer = next
for _, proc := range processors {
n.Capabilities.MutatesData = n.Capabilities.MutatesData ||
proc.Component.(baseConsumer).Capabilities().MutatesData
}
}

var _ consumerNode = &fanOutNode{}

// Each pipeline has one fan-out node before exporters.
Expand All @@ -295,27 +245,3 @@ func newFanOutNode(pipelineID component.ID) *fanOutNode {
func (n *fanOutNode) getConsumer() baseConsumer {
return n.baseConsumer
}

func (n *fanOutNode) build(nexts []baseConsumer) {
switch n.pipelineID.Type() {
case component.DataTypeTraces:
consumers := make([]consumer.Traces, 0, len(nexts))
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Traces))
}
n.baseConsumer = fanoutconsumer.NewTraces(consumers)
case component.DataTypeMetrics:
consumers := make([]consumer.Metrics, 0, len(nexts))
for _, next := range nexts {

consumers = append(consumers, next.(consumer.Metrics))
}
n.baseConsumer = fanoutconsumer.NewMetrics(consumers)
case component.DataTypeLogs:
consumers := make([]consumer.Logs, 0, len(nexts))
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Logs))
}
n.baseConsumer = fanoutconsumer.NewLogs(consumers)
}
}
Loading

0 comments on commit fce05a8

Please sign in to comment.