Skip to content

Commit

Permalink
Simplify nextProcessors
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Feb 1, 2023
1 parent 262e379 commit 3578c48
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions service/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (g *pipelinesGraph) buildNodes(ctx context.Context, set pipelinesSettings)
case *exporterNode:
err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Exporters)
case *capabilitiesNode:
n.build(g.nextConsumers(n.ID())[0], g.nextProcessors(n.ID()))
n.build(g.nextConsumers(n.ID())[0], g.nextProcessors(n))
case *fanOutNode:
n.build(g.nextConsumers(n.ID()))
}
Expand All @@ -190,12 +190,12 @@ func (g *pipelinesGraph) nextConsumers(nodeID int64) []baseConsumer {
}

// Get all processors in this pipeline
func (g *pipelinesGraph) nextProcessors(nodeID int64) []*processorNode {
nextNodes := g.componentGraph.From(nodeID)
if procNode, ok := nextNodes.Node().(*processorNode); ok {
return append([]*processorNode{procNode}, g.nextProcessors(procNode.ID())...)
func (g *pipelinesGraph) nextProcessors(node *capabilitiesNode) []*processorNode {
nexts := []*processorNode{}
for _, n := range g.pipelines[node.pipelineID].processors {
nexts = append(nexts, n.(*processorNode))
}
return []*processorNode{}
return nexts
}

// A node-based representation of a pipeline configuration.
Expand Down

0 comments on commit 3578c48

Please sign in to comment.