Skip to content

Commit

Permalink
Move virtual node creation to createNodes, simplify createEdges
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Feb 1, 2023
1 parent 929ba19 commit 6ab3baf
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 52 deletions.
93 changes: 43 additions & 50 deletions service/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func buildPipelinesGraph(ctx context.Context, set pipelinesSettings) (pipelines,
}
pipelines.createNodes(set)
pipelines.createEdges()
return pipelines, pipelines.buildNodes(ctx, set)
return pipelines, pipelines.buildComponents(ctx, set)
}

// Creates a node for each instance of a component and adds it to the graph
Expand All @@ -65,26 +65,34 @@ func (g *pipelinesGraph) createNodes(set pipelinesSettings) {
connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID)
continue
}
pipe.addReceiver(g.createReceiver(pipelineID, recvID))
rcvrNode := g.createReceiver(pipelineID, recvID)
pipe.receivers[rcvrNode.ID()] = rcvrNode
}

pipe.capabilitiesNode = newCapabilitiesNode(pipelineID)

for _, procID := range pipelineCfg.Processors {
pipe.addProcessor(g.createProcessor(pipelineID, procID))
pipe.processors = append(pipe.processors, g.createProcessor(pipelineID, procID))
}

pipe.fanOutNode = newFanOutNode(pipelineID)

for _, exprID := range pipelineCfg.Exporters {
if set.Connectors.IsConfigured(exprID) {
connectorsAsExporter[exprID] = append(connectorsAsExporter[exprID], pipelineID)
continue
}
pipe.addExporter(g.createExporter(pipelineID, exprID))
expNode := g.createExporter(pipelineID, exprID)
pipe.exporters[expNode.ID()] = expNode
}
}

for connID, exprPipelineIDs := range connectorsAsExporter {
for _, eID := range exprPipelineIDs {
for _, rID := range connectorsAsReceiver[connID] {
connNode := g.createConnector(eID, rID, connID)
g.pipelines[eID].addExporter(connNode)
g.pipelines[rID].addReceiver(connNode)
g.pipelines[eID].exporters[connNode.ID()] = connNode
g.pipelines[rID].receivers[connNode.ID()] = connNode
}
}
}
Expand Down Expand Up @@ -124,33 +132,28 @@ func (g *pipelinesGraph) createConnector(exprPipelineID, rcvrPipelineID, connID
}

func (g *pipelinesGraph) createEdges() {
for pipelineID, pg := range g.pipelines {
fanOutToExporters := newFanOutNode(pipelineID)
for _, exporter := range pg.exporters {
g.componentGraph.SetEdge(g.componentGraph.NewEdge(fanOutToExporters, exporter))
}

if len(pg.processors) == 0 {
for _, receiver := range pg.receivers {
g.componentGraph.SetEdge(g.componentGraph.NewEdge(receiver, fanOutToExporters))
}
continue
for _, pg := range g.pipelines {
for _, receiver := range pg.receivers {
g.componentGraph.SetEdge(g.componentGraph.NewEdge(receiver, pg.capabilitiesNode))
}

fanInToProcessors := newCapabilitiesNode(pipelineID)
for _, receiver := range pg.receivers {
g.componentGraph.SetEdge(g.componentGraph.NewEdge(receiver, fanInToProcessors))
var from, to graph.Node
from = pg.capabilitiesNode
for _, processor := range pg.processors {
to = processor
g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to))
from = processor
}
to = pg.fanOutNode
g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to))

g.componentGraph.SetEdge(g.componentGraph.NewEdge(fanInToProcessors, pg.processors[0]))
for i := 0; i+1 < len(pg.processors); i++ {
g.componentGraph.SetEdge(g.componentGraph.NewEdge(pg.processors[i], pg.processors[i+1]))
for _, exporter := range pg.exporters {
g.componentGraph.SetEdge(g.componentGraph.NewEdge(pg.fanOutNode, exporter))
}
g.componentGraph.SetEdge(g.componentGraph.NewEdge(pg.processors[len(pg.processors)-1], fanOutToExporters))
}
}

func (g *pipelinesGraph) buildNodes(ctx context.Context, set pipelinesSettings) error {
func (g *pipelinesGraph) buildComponents(ctx context.Context, set pipelinesSettings) error {
nodes, err := topo.Sort(g.componentGraph)
if err != nil {
return err // TODO clean up error message
Expand All @@ -168,7 +171,7 @@ func (g *pipelinesGraph) buildNodes(ctx context.Context, set pipelinesSettings)
case *exporterNode:
err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Exporters)
case *capabilitiesNode:
n.build(g.nextConsumers(n.ID())[0], g.nextProcessors(n))
n.build(g.nextConsumers(n.ID())[0], g.pipelines[n.pipelineID].processors)
case *fanOutNode:
n.build(g.nextConsumers(n.ID()))
}
Expand All @@ -189,33 +192,23 @@ func (g *pipelinesGraph) nextConsumers(nodeID int64) []baseConsumer {
return nexts
}

// Get all processors in this pipeline
func (g *pipelinesGraph) nextProcessors(node *capabilitiesNode) []*processorNode {
nexts := []*processorNode{}
for _, n := range g.pipelines[node.pipelineID].processors {
nexts = append(nexts, n.(*processorNode))
}
return nexts
}

// A node-based representation of a pipeline configuration.
type pipelineNodes struct {
// Use maps for receivers and exporters to assist with deduplication of connector instances.
// Use map to assist with deduplication of connector instances.
receivers map[int64]graph.Node
exporters map[int64]graph.Node

// The node to which receivers emit. Passes through to processors.
// Easily accessible as the first node in a pipeline.
*capabilitiesNode

// The order of processors is very important. Therefore use a slice for processors.
processors []graph.Node
}
processors []*processorNode

func (p *pipelineNodes) addReceiver(node graph.Node) {
p.receivers[node.ID()] = node
}
func (p *pipelineNodes) addProcessor(node graph.Node) {
p.processors = append(p.processors, node)
}
func (p *pipelineNodes) addExporter(node graph.Node) {
p.exporters[node.ID()] = node
// Emits to exporters.
*fanOutNode

// Use map to assist with deduplication of connector instances.
exporters map[int64]graph.Node
}

func (g *pipelinesGraph) StartAll(ctx context.Context, host component.Host) error {
Expand All @@ -230,7 +223,7 @@ func (g *pipelinesGraph) StartAll(ctx context.Context, host component.Host) erro
for i := len(nodes) - 1; i >= 0; i-- {
comp, ok := nodes[i].(component.Component)
if !ok {
// Skip fanin/out nodes
// Skip capabilities/fanout nodes
continue
}
if compErr := comp.Start(ctx, host); compErr != nil {
Expand All @@ -254,7 +247,7 @@ func (g *pipelinesGraph) ShutdownAll(ctx context.Context) error {
for i := 0; i < len(nodes); i++ {
comp, ok := nodes[i].(component.Component)
if !ok {
// Skip fanin/out nodes
// Skip capabilities/fanout nodes
continue
}
errs = multierr.Append(errs, comp.Shutdown(ctx))
Expand All @@ -264,7 +257,7 @@ func (g *pipelinesGraph) ShutdownAll(ctx context.Context) error {

func (g *pipelinesGraph) GetExporters() map[component.DataType]map[component.ID]component.Component {
// TODO actual implementation
return nil
return make(map[component.DataType]map[component.ID]component.Component)
}

func (g *pipelinesGraph) HandleZPages(w http.ResponseWriter, r *http.Request) {
Expand Down
4 changes: 2 additions & 2 deletions service/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ func TestConnectorPipelinesGraph(t *testing.T) {
}

for _, n := range pipeline.processors {
require.True(t, n.(*processorNode).Component.(*testcomponents.ExampleProcessor).Started())
require.True(t, n.Component.(*testcomponents.ExampleProcessor).Started())
}

for _, n := range pipeline.receivers {
Expand Down Expand Up @@ -731,7 +731,7 @@ func TestConnectorPipelinesGraph(t *testing.T) {
}

for _, n := range pipeline.processors {
require.True(t, n.(*processorNode).Component.(*testcomponents.ExampleProcessor).Stopped())
require.True(t, n.Component.(*testcomponents.ExampleProcessor).Stopped())
}

for _, n := range pipeline.exporters {
Expand Down

0 comments on commit 6ab3baf

Please sign in to comment.