Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Jan 27, 2023
1 parent 0b8d98d commit 26d1a07
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 159 deletions.
14 changes: 3 additions & 11 deletions service/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package service // import "go.opentelemetry.io/collector/service"

import (
"context"
"fmt"
"net/http"

"go.uber.org/multierr"
Expand Down Expand Up @@ -87,16 +86,9 @@ func (g *pipelinesGraph) createNodes(set pipelinesSettings) error {
}
}

if len(connectorsAsExporter) != len(connectorsAsReceiver) {
return fmt.Errorf("each connector must be used as both receiver and exporter")
}
for connID, exprPipelineIDs := range connectorsAsExporter {
rcvrPipelineIDs, ok := connectorsAsReceiver[connID]
if !ok {
return fmt.Errorf("connector %q must be used as receiver, only found as exporter", connID)
}
for _, eID := range exprPipelineIDs {
for _, rID := range rcvrPipelineIDs {
for _, rID := range connectorsAsReceiver[connID] {
g.addConnector(eID, rID, connID)
}
}
Expand Down Expand Up @@ -181,13 +173,13 @@ func (g *pipelinesGraph) buildNodes(ctx context.Context, set pipelinesSettings)
case *receiverNode:
err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Receivers, g.nextConsumers(n.ID()))
case *processorNode:
err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Processors, g.nextConsumers(n.ID()))
err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Processors, g.nextConsumers(n.ID())[0])
case *connectorNode:
err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Connectors, g.nextConsumers(n.ID()))
case *exporterNode:
err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Exporters)
case *fanInNode:
n.build(g.nextConsumers(n.ID()), g.nextProcessors(n.ID()))
n.build(g.nextConsumers(n.ID())[0], g.nextProcessors(n.ID()))
case *fanOutNode:
n.build(g.nextConsumers(n.ID()))
}
Expand Down
138 changes: 0 additions & 138 deletions service/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1166,144 +1166,6 @@ func TestGraphBuildErrors(t *testing.T) {
},
expected: "connector \"bf\" cannot connect from logs to logs: telemetry type is not supported",
},
{
name: "not_allowed_conn_omit_recv_traces.yaml",
receiverCfgs: map[component.ID]component.Config{
component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(),
},
exporterCfgs: map[component.ID]component.Config{
component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(),
},
connectorCfgs: map[component.ID]component.Config{
component.NewIDWithName("nop", "conn"): nopConnectorFactory.CreateDefaultConfig(),
},
pipelineCfgs: map[component.ID]*PipelineConfig{
component.NewIDWithName("traces", "in"): {
Receivers: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "conn")},
},
component.NewIDWithName("traces", "out"): {
Receivers: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
},
expected: "each connector must be used as both receiver and exporter",
},
{
name: "not_allowed_conn_omit_recv_metrics.yaml",
receiverCfgs: map[component.ID]component.Config{
component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(),
},
exporterCfgs: map[component.ID]component.Config{
component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(),
},
connectorCfgs: map[component.ID]component.Config{
component.NewIDWithName("nop", "conn"): nopConnectorFactory.CreateDefaultConfig(),
},
pipelineCfgs: map[component.ID]*PipelineConfig{
component.NewIDWithName("metrics", "in"): {
Receivers: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "conn")},
},
component.NewIDWithName("metrics", "out"): {
Receivers: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
},
expected: "each connector must be used as both receiver and exporter",
},
{
name: "not_allowed_conn_omit_recv_logs.yaml",
receiverCfgs: map[component.ID]component.Config{
component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(),
},
exporterCfgs: map[component.ID]component.Config{
component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(),
},
connectorCfgs: map[component.ID]component.Config{
component.NewIDWithName("nop", "conn"): nopConnectorFactory.CreateDefaultConfig(),
},
pipelineCfgs: map[component.ID]*PipelineConfig{
component.NewIDWithName("logs", "in"): {
Receivers: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "conn")},
},
component.NewIDWithName("logs", "out"): {
Receivers: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
},
expected: "each connector must be used as both receiver and exporter",
},
{
name: "not_allowed_conn_omit_exp_traces.yaml",
receiverCfgs: map[component.ID]component.Config{
component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(),
},
exporterCfgs: map[component.ID]component.Config{
component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(),
},
connectorCfgs: map[component.ID]component.Config{
component.NewIDWithName("nop", "conn"): nopConnectorFactory.CreateDefaultConfig(),
},
pipelineCfgs: map[component.ID]*PipelineConfig{
component.NewIDWithName("traces", "in"): {
Receivers: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
component.NewIDWithName("traces", "out"): {
Receivers: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "conn")},
Exporters: []component.ID{component.NewID("nop")},
},
},
expected: "each connector must be used as both receiver and exporter",
},
{
name: "not_allowed_conn_omit_exp_metrics.yaml",
receiverCfgs: map[component.ID]component.Config{
component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(),
},
exporterCfgs: map[component.ID]component.Config{
component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(),
},
connectorCfgs: map[component.ID]component.Config{
component.NewIDWithName("nop", "conn"): nopConnectorFactory.CreateDefaultConfig(),
},
pipelineCfgs: map[component.ID]*PipelineConfig{
component.NewIDWithName("metrics", "in"): {
Receivers: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
component.NewIDWithName("metrics", "out"): {
Receivers: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "conn")},
Exporters: []component.ID{component.NewID("nop")},
},
},
expected: "each connector must be used as both receiver and exporter",
},
{
name: "not_allowed_conn_omit_exp_logs.yaml",
receiverCfgs: map[component.ID]component.Config{
component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(),
},
exporterCfgs: map[component.ID]component.Config{
component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(),
},
connectorCfgs: map[component.ID]component.Config{
component.NewIDWithName("nop", "conn"): nopConnectorFactory.CreateDefaultConfig(),
},
pipelineCfgs: map[component.ID]*PipelineConfig{
component.NewIDWithName("logs", "in"): {
Receivers: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
component.NewIDWithName("logs", "out"): {
Receivers: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "conn")},
Exporters: []component.ID{component.NewID("nop")},
},
},
expected: "each connector must be used as both receiver and exporter",
},
{
name: "not_allowed_simple_cycle_traces.yaml",
receiverCfgs: map[component.ID]component.Config{
Expand Down
29 changes: 19 additions & 10 deletions service/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ import (
"go.opentelemetry.io/collector/service/internal/fanoutconsumer"
)

const (
receiverSeed = "receiver"
processorSeed = "processor"
exporterSeed = "exporter"
connectorSeed = "connector"
fanInToProcessors = "fanin_to_processors"
fanOutToExporters = "fanout_to_exporters"
)

type nodeID int64

func (n nodeID) ID() int64 {
Expand All @@ -52,7 +61,7 @@ type receiverNode struct {

func newReceiverNode(pipelineID component.ID, recvID component.ID) *receiverNode {
return &receiverNode{
nodeID: newNodeID("receiver", string(pipelineID.Type()), recvID.String()),
nodeID: newNodeID(receiverSeed, string(pipelineID.Type()), recvID.String()),
componentID: recvID,
pipelineType: pipelineID.Type(),
}
Expand Down Expand Up @@ -83,7 +92,7 @@ type processorNode struct {

func newProcessorNode(pipelineID, procID component.ID) *processorNode {
return &processorNode{
nodeID: newNodeID("processor", pipelineID.String(), procID.String()),
nodeID: newNodeID(processorSeed, pipelineID.String(), procID.String()),
componentID: procID,
pipelineID: pipelineID,
}
Expand All @@ -94,11 +103,11 @@ func (n *processorNode) build(
tel component.TelemetrySettings,
info component.BuildInfo,
builder *processor.Builder,
nexts []baseConsumer,
next baseConsumer,
) error {
set := processor.CreateSettings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
set.TelemetrySettings.Logger = components.ProcessorLogger(set.TelemetrySettings.Logger, n.componentID, n.pipelineID)
p, err := buildProcessor(ctx, set, builder, n.pipelineID, nexts[0])
p, err := buildProcessor(ctx, set, builder, n.pipelineID, next)
n.Component = p
return err
}
Expand All @@ -114,7 +123,7 @@ type exporterNode struct {

func newExporterNode(pipelineID component.ID, exprID component.ID) *exporterNode {
return &exporterNode{
nodeID: newNodeID("exporter", string(pipelineID.Type()), exprID.String()),
nodeID: newNodeID(exporterSeed, string(pipelineID.Type()), exprID.String()),
componentID: exprID,
pipelineType: pipelineID.Type(),
}
Expand Down Expand Up @@ -145,7 +154,7 @@ type connectorNode struct {

func newConnectorNode(exprPipelineType, rcvrPipelineType component.DataType, connID component.ID) *connectorNode {
return &connectorNode{
nodeID: newNodeID("connector", connID.String(), string(exprPipelineType), string(rcvrPipelineType)),
nodeID: newNodeID(connectorSeed, connID.String(), string(exprPipelineType), string(rcvrPipelineType)),
componentID: connID,
exprPipelineType: exprPipelineType,
rcvrPipelineType: rcvrPipelineType,
Expand Down Expand Up @@ -223,14 +232,14 @@ type fanInNode struct {

func newFanInNode(pipelineID component.ID) *fanInNode {
return &fanInNode{
nodeID: newNodeID("fanin_to_processors", pipelineID.String()),
nodeID: newNodeID(fanInToProcessors, pipelineID.String()),
pipelineID: pipelineID,
Capabilities: consumer.Capabilities{},
}
}

func (n *fanInNode) build(nexts []baseConsumer, processors []*processorNode) {
n.baseConsumer = nexts[0]
func (n *fanInNode) build(next baseConsumer, processors []*processorNode) {
n.baseConsumer = next
for _, proc := range processors {
n.Capabilities.MutatesData = n.Capabilities.MutatesData ||
proc.Component.(baseConsumer).Capabilities().MutatesData
Expand All @@ -247,7 +256,7 @@ type fanOutNode struct {

func newFanOutNode(pipelineID component.ID) *fanOutNode {
return &fanOutNode{
nodeID: newNodeID("fanout_to_exporters", pipelineID.String()),
nodeID: newNodeID(fanOutToExporters, pipelineID.String()),
pipelineID: pipelineID,
}
}
Expand Down

0 comments on commit 26d1a07

Please sign in to comment.