Skip to content

Commit

Permalink
Add full component graph build (#7045)
Browse files Browse the repository at this point in the history
* Add full component graph build

* PR feedback

* Split adding nodes to graph from adding to pipelines

* Rename fanInNode to capabilitiesNode

* Add consumerNode interface

* Clarify comments about start/stop order of graph

* Simplify nextProcessors

* Move virtual node creation to createNodes, simplify createEdges

* Remove node build methods. Pull CreateSettings into buildComponent functions.

* Fix capabilities node MutatesData
  • Loading branch information
djaglowski authored Feb 3, 2023
1 parent 3c791d8 commit 490fd6a
Show file tree
Hide file tree
Showing 8 changed files with 2,064 additions and 105 deletions.
5 changes: 5 additions & 0 deletions connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,11 @@ func (b *Builder) CreateLogsToLogs(ctx context.Context, set CreateSettings, next
return f.CreateLogsToLogs(ctx, set, cfg, next)
}

func (b *Builder) IsConfigured(componentID component.ID) bool {
_, ok := b.cfgs[componentID]
return ok
}

func (b *Builder) Factory(componentType component.Type) component.Factory {
return b.factories[componentType]
}
Expand Down
5 changes: 4 additions & 1 deletion connector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,13 +433,16 @@ func TestBuilderMissingConfig(t *testing.T) {
assert.Nil(t, l2l)
}

func TestBuilderFactory(t *testing.T) {
func TestBuilderGetters(t *testing.T) {
factories, err := MakeFactoryMap([]Factory{NewFactory("foo", nil)}...)
require.NoError(t, err)

cfgs := map[component.ID]component.Config{component.NewID("foo"): struct{}{}}
b := NewBuilder(cfgs, factories)

assert.True(t, b.IsConfigured(component.NewID("foo")))
assert.False(t, b.IsConfigured(component.NewID("bar")))

assert.NotNil(t, b.Factory(component.NewID("foo").Type()))
assert.Nil(t, b.Factory(component.NewID("bar").Type()))
}
Expand Down
249 changes: 241 additions & 8 deletions service/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,260 @@ package service // import "go.opentelemetry.io/collector/service"

import (
"context"
"errors"
"net/http"

"go.uber.org/multierr"
"gonum.org/v1/gonum/graph"
"gonum.org/v1/gonum/graph/simple"
"gonum.org/v1/gonum/graph/topo"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
"go.opentelemetry.io/collector/service/internal/fanoutconsumer"
)

var _ pipelines = (*pipelinesGraph)(nil)

type pipelinesGraph struct {
// All component instances represented as nodes, with directed edges indicating data flow.
componentGraph *simple.DirectedGraph

// Keep track of how nodes relate to pipelines, so we can declare edges in the graph.
pipelines map[component.ID]*pipelineNodes
}

func buildPipelinesGraph(ctx context.Context, set pipelinesSettings) (pipelines, error) {
pipelines := &pipelinesGraph{
componentGraph: simple.NewDirectedGraph(),
pipelines: make(map[component.ID]*pipelineNodes, len(set.PipelineConfigs)),
}
for pipelineID := range set.PipelineConfigs {
pipelines.pipelines[pipelineID] = &pipelineNodes{
receivers: make(map[int64]graph.Node),
exporters: make(map[int64]graph.Node),
}
}
pipelines.createNodes(set)
pipelines.createEdges()
return pipelines, pipelines.buildComponents(ctx, set)
}

// Creates a node for each instance of a component and adds it to the graph
func (g *pipelinesGraph) createNodes(set pipelinesSettings) {
// Keep track of connectors and where they are used. (map[connectorID][]pipelineID)
connectorsAsExporter := make(map[component.ID][]component.ID)
connectorsAsReceiver := make(map[component.ID][]component.ID)

for pipelineID, pipelineCfg := range set.PipelineConfigs {
pipe := g.pipelines[pipelineID]
for _, recvID := range pipelineCfg.Receivers {
if set.ConnectorBuilder.IsConfigured(recvID) {
connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID)
continue
}
rcvrNode := g.createReceiver(pipelineID, recvID)
pipe.receivers[rcvrNode.ID()] = rcvrNode
}

pipe.capabilitiesNode = newCapabilitiesNode(pipelineID)

for _, procID := range pipelineCfg.Processors {
pipe.processors = append(pipe.processors, g.createProcessor(pipelineID, procID))
}

pipe.fanOutNode = newFanOutNode(pipelineID)

for _, exprID := range pipelineCfg.Exporters {
if set.ConnectorBuilder.IsConfigured(exprID) {
connectorsAsExporter[exprID] = append(connectorsAsExporter[exprID], pipelineID)
continue
}
expNode := g.createExporter(pipelineID, exprID)
pipe.exporters[expNode.ID()] = expNode
}
}

for connID, exprPipelineIDs := range connectorsAsExporter {
for _, eID := range exprPipelineIDs {
for _, rID := range connectorsAsReceiver[connID] {
connNode := g.createConnector(eID, rID, connID)
g.pipelines[eID].exporters[connNode.ID()] = connNode
g.pipelines[rID].receivers[connNode.ID()] = connNode
}
}
}
}

func (g *pipelinesGraph) createReceiver(pipelineID, recvID component.ID) *receiverNode {
rcvrNode := newReceiverNode(pipelineID, recvID)
if node := g.componentGraph.Node(rcvrNode.ID()); node != nil {
return node.(*receiverNode)
}
g.componentGraph.AddNode(rcvrNode)
return rcvrNode
}

func (g *pipelinesGraph) createProcessor(pipelineID, procID component.ID) *processorNode {
procNode := newProcessorNode(pipelineID, procID)
g.componentGraph.AddNode(procNode)
return procNode
}

func (g *pipelinesGraph) createExporter(pipelineID, exprID component.ID) *exporterNode {
expNode := newExporterNode(pipelineID, exprID)
if node := g.componentGraph.Node(expNode.ID()); node != nil {
return node.(*exporterNode)
}
g.componentGraph.AddNode(expNode)
return expNode
}

func buildPipelinesGraph(_ context.Context, _ pipelinesSettings) (pipelines, error) {
err := errors.New("not yet implemented")
return &pipelinesGraph{componentGraph: simple.NewDirectedGraph()}, err
func (g *pipelinesGraph) createConnector(exprPipelineID, rcvrPipelineID, connID component.ID) *connectorNode {
connNode := newConnectorNode(exprPipelineID.Type(), rcvrPipelineID.Type(), connID)
if node := g.componentGraph.Node(connNode.ID()); node != nil {
return node.(*connectorNode)
}
g.componentGraph.AddNode(connNode)
return connNode
}

func (g *pipelinesGraph) createEdges() {
for _, pg := range g.pipelines {
for _, receiver := range pg.receivers {
g.componentGraph.SetEdge(g.componentGraph.NewEdge(receiver, pg.capabilitiesNode))
}

var from, to graph.Node
from = pg.capabilitiesNode
for _, processor := range pg.processors {
to = processor
g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to))
from = processor
}
to = pg.fanOutNode
g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to))

for _, exporter := range pg.exporters {
g.componentGraph.SetEdge(g.componentGraph.NewEdge(pg.fanOutNode, exporter))
}
}
}

func (g *pipelinesGraph) buildComponents(ctx context.Context, set pipelinesSettings) error {
nodes, err := topo.Sort(g.componentGraph)
if err != nil {
// TODO When there is a cycle in the graph, there is enough information
// within the error to construct a better error message that indicates
// exactly the components that are in a cycle.
return err
}

for i := len(nodes) - 1; i >= 0; i-- {
node := nodes[i]
switch n := node.(type) {
case *receiverNode:
n.Component, err = buildReceiver(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ReceiverBuilder,
component.NewIDWithName(n.pipelineType, "*"), g.nextConsumers(n.ID()))
case *processorNode:
n.Component, err = buildProcessor(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ProcessorBuilder,
n.pipelineID, g.nextConsumers(n.ID())[0])
case *exporterNode:
n.Component, err = buildExporter(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ExporterBuilder,
component.NewIDWithName(n.pipelineType, "*"))
case *connectorNode:
n.Component, err = buildConnector(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ConnectorBuilder,
n.exprPipelineType, n.rcvrPipelineType, g.nextConsumers(n.ID()))
case *capabilitiesNode:
cap := consumer.Capabilities{}
for _, proc := range g.pipelines[n.pipelineID].processors {
cap.MutatesData = cap.MutatesData || proc.getConsumer().Capabilities().MutatesData
}
next := g.nextConsumers(n.ID())[0]
switch n.pipelineID.Type() {
case component.DataTypeTraces:
n.baseConsumer = capabilityconsumer.NewTraces(next.(consumer.Traces), cap)
case component.DataTypeMetrics:
n.baseConsumer = capabilityconsumer.NewMetrics(next.(consumer.Metrics), cap)
case component.DataTypeLogs:
n.baseConsumer = capabilityconsumer.NewLogs(next.(consumer.Logs), cap)
}
case *fanOutNode:
nexts := g.nextConsumers(n.ID())
switch n.pipelineID.Type() {
case component.DataTypeTraces:
consumers := make([]consumer.Traces, 0, len(nexts))
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Traces))
}
n.baseConsumer = fanoutconsumer.NewTraces(consumers)
case component.DataTypeMetrics:
consumers := make([]consumer.Metrics, 0, len(nexts))
for _, next := range nexts {

consumers = append(consumers, next.(consumer.Metrics))
}
n.baseConsumer = fanoutconsumer.NewMetrics(consumers)
case component.DataTypeLogs:
consumers := make([]consumer.Logs, 0, len(nexts))
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Logs))
}
n.baseConsumer = fanoutconsumer.NewLogs(consumers)
}
}
if err != nil {
return err
}
}
return nil
}

// Find all nodes
func (g *pipelinesGraph) nextConsumers(nodeID int64) []baseConsumer {
nextNodes := g.componentGraph.From(nodeID)
nexts := make([]baseConsumer, 0, nextNodes.Len())
for nextNodes.Next() {
nexts = append(nexts, nextNodes.Node().(consumerNode).getConsumer())
}
return nexts
}

// A node-based representation of a pipeline configuration.
type pipelineNodes struct {
// Use map to assist with deduplication of connector instances.
receivers map[int64]graph.Node

// The node to which receivers emit. Passes through to processors.
// Easily accessible as the first node in a pipeline.
*capabilitiesNode

// The order of processors is very important. Therefore use a slice for processors.
processors []*processorNode

// Emits to exporters.
*fanOutNode

// Use map to assist with deduplication of connector instances.
exporters map[int64]graph.Node
}

func (g *pipelinesGraph) StartAll(ctx context.Context, host component.Host) error {
nodes, err := topo.Sort(g.componentGraph)
if err != nil {
return err
}
// Start exporters first, and work towards receivers

// Start in reverse topological order so that downstream components
// are started before upstream components. This ensures that each
// component's consumer is ready to consume.
for i := len(nodes) - 1; i >= 0; i-- {
if compErr := nodes[i].(component.Component).Start(ctx, host); compErr != nil {
comp, ok := nodes[i].(component.Component)
if !ok {
// Skip capabilities/fanout nodes
continue
}
if compErr := comp.Start(ctx, host); compErr != nil {
return compErr
}
}
Expand All @@ -57,10 +281,19 @@ func (g *pipelinesGraph) ShutdownAll(ctx context.Context) error {
if err != nil {
return err
}
// Stop receivers first, and work towards exporters

// Stop in topological order so that upstream components
// are stopped before downstream components. This ensures
// that each component has a chance to drain to it's consumer
// before the consumer is stopped.
var errs error
for i := 0; i < len(nodes); i++ {
errs = multierr.Append(errs, nodes[i].(component.Component).Shutdown(ctx))
comp, ok := nodes[i].(component.Component)
if !ok {
// Skip capabilities/fanout nodes
continue
}
errs = multierr.Append(errs, comp.Shutdown(ctx))
}
return errs
}
Expand Down
Loading

0 comments on commit 490fd6a

Please sign in to comment.