-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathgraph.go
202 lines (173 loc) · 5.07 KB
/
graph.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package eventlogger
import (
"context"
"fmt"
"sync"
"github.com/hashicorp/go-multierror"
)
// graph
type graph struct {
// roots maps PipelineIDs to pipelineRegistrations.
// A registeredPipeline includes the root Node for a pipeline.
roots graphMap
// successThreshold specifies how many pipelines must successfully process
// an event for Process to not return an error. This means that a filter
// could of course filter an event before it reaches the pipeline's sink,
// but it would still count as success when it comes to meeting this threshold
successThreshold int
// successThresholdSinks specifies how many sinks must successfully process
// an event for Process to not return an error.
successThresholdSinks int
}
// Process the Event by routing it through all of the graph's nodes,
// starting with the root node.
func (g *graph) process(ctx context.Context, e *Event) (Status, error) {
statusChan := make(chan Status)
var wg sync.WaitGroup
go func() {
g.roots.Range(func(_ PipelineID, pipeline *registeredPipeline) bool {
select {
// Don't continue to start root nodes if our context is already done.
// We would just process the node and then drop the status, and no
// other linked nodes would be processed.
case <-ctx.Done():
return false
default:
}
wg.Add(1)
g.doProcess(ctx, pipeline.rootNode, e, statusChan, &wg)
return true
})
wg.Wait()
close(statusChan)
}()
var status Status
var done bool
for !done {
select {
case <-ctx.Done():
done = true
case s, ok := <-statusChan:
if ok {
status.Warnings = append(status.Warnings, s.Warnings...)
status.complete = append(status.complete, s.complete...)
status.completeSinks = append(status.completeSinks, s.completeSinks...)
} else {
done = true
}
}
}
return status, status.getError(ctx.Err(), g.successThreshold, g.successThresholdSinks)
}
// Recursively process every node in the graph.
//
// # No Status is sent when a request is cancelled by the context
//
// Status will be sent when we stop processing nodes, which can happen if:
// - a node.Process(...) returns an error, and Status.complete is empty
// - a node.Process(...) filters an event, and Status.complete contains the
// filter node's ID
// - the final node in a pipeline (a sink) finishes, and Status.complete contains
// the sink node's ID
func (g *graph) doProcess(ctx context.Context, node *linkedNode, e *Event, statusChan chan Status, wg *sync.WaitGroup) {
defer wg.Done()
// Process the current Node
e, err := node.node.Process(ctx, e)
if err != nil {
select {
case <-ctx.Done():
case statusChan <- Status{Warnings: []error{err}}:
}
return
}
completeStatus := Status{complete: []NodeID{node.nodeID}}
if node.node.Type() == NodeTypeSink {
completeStatus.completeSinks = []NodeID{node.nodeID}
}
// If the Event is nil, it has been filtered out and we are done.
if e == nil {
select {
case <-ctx.Done():
case statusChan <- completeStatus:
}
return
}
// Process any child nodes. This is depth-first.
if len(node.next) != 0 {
// If the new Event is nil, it has been filtered out and we are done.
if e == nil {
statusChan <- Status{}
return
}
for _, child := range node.next {
wg.Add(1)
go g.doProcess(ctx, child, e, statusChan, wg)
}
} else {
select {
case <-ctx.Done():
case statusChan <- completeStatus:
}
}
}
func (g *graph) reopen(ctx context.Context) error {
var errors *multierror.Error
g.roots.Range(func(_ PipelineID, pipeline *registeredPipeline) bool {
err := g.doReopen(ctx, pipeline.rootNode)
if err != nil {
errors = multierror.Append(errors, err)
}
return true
})
return errors.ErrorOrNil()
}
// Recursively reopen every node in the graph.
func (g *graph) doReopen(ctx context.Context, node *linkedNode) error {
// Process the current Node
err := node.node.Reopen()
if err != nil {
return err
}
// Process any child nodes. This is depth-first.
for _, child := range node.next {
err = g.doReopen(ctx, child)
if err != nil {
return err
}
}
return nil
}
func (g *graph) validate() error {
var errors *multierror.Error
g.roots.Range(func(_ PipelineID, pipeline *registeredPipeline) bool {
err := g.doValidate(nil, pipeline.rootNode)
if err != nil {
errors = multierror.Append(errors, err)
}
return true
})
return errors.ErrorOrNil()
}
func (g *graph) doValidate(parent, node *linkedNode) error {
isInner := len(node.next) > 0
switch {
case len(node.next) == 0 && node.node.Type() != NodeTypeSink:
return fmt.Errorf("non-sink node has no children")
case !isInner && parent == nil:
return fmt.Errorf("sink node at root")
case !isInner && (parent.node.Type() != NodeTypeFormatter && parent.node.Type() != NodeTypeFormatterFilter):
return fmt.Errorf("sink node without preceding formatter or formatter filter")
case !isInner:
return nil
}
// Process any child nodes. This is depth-first.
for _, child := range node.next {
err := g.doValidate(node, child)
if err != nil {
return err
}
}
return nil
}