forked from stripe-archive/falconer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.go
211 lines (184 loc) · 5.17 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
package falconer
import (
"sync"
"sync/atomic"
"time"
"github.com/sirupsen/logrus"
"github.com/stripe/veneur/ssf"
"github.com/stripe/veneur/trace"
"github.com/stripe/veneur/trace/metrics"
)
// Item pairs a stored span with the moment after which Sweep may discard it.
type Item struct {
	// span is the stored span.
	span *ssf.SSFSpan
	// expiration is a Unix time in seconds. AddSpan sets it to "now plus the
	// worker's expiration duration"; Sweep removes the item once the sweep
	// time is greater than or equal to it.
	expiration int64
}
// Watch is a standing (streaming) query evaluated against spans as they
// arrive. A span is delivered on FoundChan when any one of the key/value
// pairs in Tags is present on it.
type Watch struct {
	FoundChan chan *ssf.SSFSpan // destination for matching spans
	Tags      map[string]string // tag key -> required value; a single match suffices
}
// Worker is a repository for spans: it stores incoming spans in memory,
// evaluates them against registered watches and periodically expires them.
type Worker struct {
	// The 64-bit counters below are modified with sync/atomic, so they are
	// deliberately the first fields. On 386, ARM and 32-bit MIPS only the
	// first word of an allocated struct is guaranteed to be 64-bit aligned,
	// and misaligned 64-bit atomic operations panic on those platforms.

	// Count of items in the sync.Map. Kept separately because sync.Map doesn't
	// have a method for computing this efficiently.
	itemCount uint64
	// Number of items expired since last flush.
	// NOTE(review): not read or written anywhere in this file — confirm it is
	// used elsewhere before relying on it.
	expiredCount uint64

	SpanChan  chan *ssf.SSFSpan
	WatchChan chan *ssf.SSFSpan
	QuitChan  chan struct{}
	// Using sync.Map because it matches our workload - write-once, read-many.
	Items   *sync.Map
	Watches map[string]*Watch
	// protects only Watches
	watchMutex         sync.Mutex
	expirationDuration time.Duration
	log                *logrus.Logger
	traceClient        *trace.Client
}
// NewWorker builds a Worker that stores spans, answers queries and expires
// old spans.
//
// spanDepth and watchDepth set the buffer sizes of SpanChan and WatchChan.
// expirationDuration is both how long a stored span lives and the period of
// the background sweep. Two goroutines are started before returning: one
// calls Sweep on every tick, the other runs WatchWork.
//
// NOTE(review): neither the ticker nor these goroutines is ever stopped, so
// they outlive the Worker.
func NewWorker(log *logrus.Logger, trace *trace.Client, spanDepth int, watchDepth int, expirationDuration time.Duration) *Worker {
	w := new(Worker)
	// The incoming span channel is buffered so producers can use
	// non-blocking writes: noticeably faster (>= 50%), at the cost of
	// dropping spans whenever the buffer is full.
	w.SpanChan = make(chan *ssf.SSFSpan, spanDepth)
	w.WatchChan = make(chan *ssf.SSFSpan, watchDepth)
	w.QuitChan = make(chan struct{})
	w.Items = new(sync.Map)
	w.Watches = map[string]*Watch{}
	w.expirationDuration = expirationDuration
	w.log = log
	w.traceClient = trace

	// Periodic expiry, driven by the tick time so Sweep stays testable.
	sweepTicker := time.NewTicker(expirationDuration)
	go func() {
		for tick := range sweepTicker.C {
			w.Sweep(tick.Unix())
		}
	}()

	// Watch matching runs apart from the write loop so that watches do not
	// drag down write throughput.
	go w.WatchWork()

	return w
}
// Work is the write loop: it hands every span received on SpanChan to
// AddSpan, and returns as soon as QuitChan yields a value or is closed.
func (w *Worker) Work() {
	for {
		select {
		case <-w.QuitChan:
			return
		case incoming := <-w.SpanChan:
			w.AddSpan(incoming)
		}
	}
}
// WatchWork consumes WatchChan, comparing each incoming span against this
// worker's watches. A span is sent on a watch's FoundChan when any one of the
// watch's tag key/value pairs is present on the span, and it is sent at most
// once per watch no matter how many of that watch's tags match. The loop ends
// if WatchChan is closed.
//
// watchMutex is held while sending, so once RemoveWatch returns no further
// sends can reach the removed watch's FoundChan. The trade-off is that a slow
// FoundChan consumer blocks this loop and every other holder of watchMutex.
func (w *Worker) WatchWork() {
	for span := range w.WatchChan {
		w.watchMutex.Lock()
		for _, watch := range w.Watches {
			for tagKey, tagValue := range watch.Tags {
				if val, ok := span.Tags[tagKey]; ok && val == tagValue {
					watch.FoundChan <- span
					// Found a hit: stop examining this watch's tags so the
					// span is not delivered once per matching tag. (A plain
					// `continue` here would only advance the tag loop.)
					break
				}
			}
		}
		w.watchMutex.Unlock()
	}
}
// AddSpan stores span in Items under span.Id, expiring at now plus the
// worker's expiration duration. If a span with the same Id is already stored,
// the new span is discarded and an error is logged; otherwise itemCount is
// incremented. When at least one watch is registered, the span is also offered
// to WatchChan without blocking, and a warning is logged if the buffer is full.
//
// NOTE(review): len(w.Watches) is read without watchMutex, which is a data
// race with AddWatch/RemoveWatch. Simply locking here would let a slow watch
// consumer (WatchWork holds the lock while sending) stall ingestion; an
// atomic watch counter maintained by AddWatch/RemoveWatch is likely the
// better fix.
func (w *Worker) AddSpan(span *ssf.SSFSpan) {
	entry := Item{
		span:       span,
		expiration: time.Now().Add(w.expirationDuration).Unix(),
	}
	if _, alreadyStored := w.Items.LoadOrStore(span.Id, entry); alreadyStored {
		w.log.WithField("span-id", span.Id).Error("Collision on span, discarding new span")
	} else {
		atomic.AddUint64(&w.itemCount, 1)
	}

	if len(w.Watches) == 0 {
		return
	}
	select {
	case w.WatchChan <- span:
	default:
		w.log.Warn("Failed to write watched span")
	}
}
// AddWatch registers the watch called name, replacing any existing watch of
// the same name. Spans matching any of tags are delivered to foundChan by
// WatchWork.
func (w *Worker) AddWatch(name string, tags map[string]string, foundChan chan *ssf.SSFSpan) {
	entry := &Watch{Tags: tags, FoundChan: foundChan}

	w.watchMutex.Lock()
	defer w.watchMutex.Unlock()
	w.Watches[name] = entry
}
// RemoveWatch unregisters the watch called name; removing an unknown name is
// a no-op. It waits on watchMutex, which WatchWork also holds while
// delivering spans to watches.
func (w *Worker) RemoveWatch(name string) {
	mu := &w.watchMutex
	mu.Lock()
	defer mu.Unlock()

	delete(w.Watches, name)
}
// GetTrace collects every stored span whose TraceId equals id. It returns nil
// when nothing matches; ordering follows sync.Map iteration and is therefore
// unspecified.
func (w *Worker) GetTrace(id int64) []*ssf.SSFSpan {
	var matches []*ssf.SSFSpan
	collect := func(_, value interface{}) bool {
		if it := value.(Item); it.span.TraceId == id {
			matches = append(matches, it.span)
		}
		return true // keep scanning the whole map
	}
	w.Items.Range(collect)
	return matches
}
// FindSpans scans every stored span and sends, on resultChan, those carrying
// at least one of the given tag key/value pairs. Each matching span appears
// exactly once in the result, however many of the tags it matches. An empty
// tags map matches nothing, and the slice sent is nil when there are no
// matches. The final send is unguarded, so it blocks until resultChan can
// accept the value.
func (w *Worker) FindSpans(tags map[string]string, resultChan chan []*ssf.SSFSpan) {
	var foundSpans []*ssf.SSFSpan
	w.Items.Range(func(_, v interface{}) bool {
		span := v.(Item).span
		for tagKey, want := range tags {
			if got, ok := span.Tags[tagKey]; ok && got == want {
				foundSpans = append(foundSpans, span)
				// One hit is enough; without this break a span matching
				// several tags would be appended several times.
				break
			}
		}
		return true
	})
	resultChan <- foundSpans
}
// Sweep deletes every stored item whose expiration is at or before expireTime
// (a Unix time in seconds). It decrements itemCount by the number removed and
// reports that number via metrics.ReportOne as "ssfspans.expired". The time is
// passed in rather than read from the clock to facilitate testing.
func (w *Worker) Sweep(expireTime int64) {
	var expired uint64
	w.Items.Range(func(k, v interface{}) bool {
		if v.(Item).expiration <= expireTime {
			// sync.Map is safe for concurrent use and watchMutex guards only
			// Watches, so no extra locking is needed. Taking watchMutex here
			// would also make sweeping wait behind WatchWork, which holds it
			// while delivering spans to watch consumers.
			w.Items.Delete(k)
			expired++
		}
		return true
	})
	// Adding ^(x-1) subtracts x from an unsigned counter (see sync/atomic
	// AddUint64); with expired == 0 this adds 0.
	atomic.AddUint64(&w.itemCount, ^(expired - 1))
	metrics.ReportOne(w.traceClient, ssf.Count("ssfspans.expired", float32(expired), nil))
}