-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocessor.go
164 lines (142 loc) · 3.31 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package spanner
import (
"context"
"fmt"
"sync"
"time"
"github.com/zalgonoise/attr"
"github.com/zalgonoise/logx"
)
// Batch sizing.
const (
	// maxBatchSize is the capacity pre-allocated for a processor's batch of
	// SpanData.
	maxBatchSize = 2048
	// maxExportSize is the batch length at which an export is triggered
	// immediately instead of waiting for the timer.
	maxExportSize = 1024
)

// Timing.
const (
	// defaultDelay is the interval the export timer is armed (and re-armed) with.
	defaultDelay = 5 * time.Second
	// maxTimeout bounds the context handed to the Exporter on each export.
	maxTimeout = 30 * time.Second
)
// zeroTime holds the zero value of time.Time.
var zeroTime time.Time
// SpanProcessor receives ended Spans and forwards their data to an Exporter.
type SpanProcessor interface {
	// Handle hands the Span `span` over to the SpanProcessor so that it can be
	// delivered to its Exporter.
	Handle(span Span)

	// Shutdown stops the SpanProcessor gracefully, reporting any error
	// encountered while doing so.
	Shutdown(ctx context.Context) error

	// Flush pushes the SpanData currently held in the SpanProcessor's batch to
	// the Exporter right away, without waiting for the next scheduled export.
	Flush(ctx context.Context) error
}
// processor is the SpanProcessor implementation built by NewProcessor. It
// collects SpanData in a batch and hands that batch to an Exporter.
type processor struct {
	// Embedded mutex; taken by Shutdown when clearing rec and by runtime when
	// appending to batch.
	sync.Mutex
	// stopOnce makes the teardown sequence in Shutdown run a single time.
	stopOnce sync.Once

	// rec reports whether Handle should still enqueue incoming spans.
	rec bool
	// exporter is the destination for batched SpanData.
	exporter Exporter

	// stopCh carries the stop signal from Shutdown to the runtime goroutine.
	stopCh chan struct{}
	// queue carries spans from Handle to the runtime goroutine.
	queue chan Span

	// timer schedules the periodic export.
	timer *time.Timer
	// batch holds the SpanData waiting to be exported.
	batch []SpanData
}
// NewProcessor builds a SpanProcessor that exports through the Exporter `e`.
//
// The returned processor already has its background goroutine running and is
// accepting spans.
func NewProcessor(e Exporter) SpanProcessor {
	proc := new(processor)
	proc.exporter = e
	proc.stopCh = make(chan struct{})
	proc.queue = make(chan Span)
	proc.timer = time.NewTimer(defaultDelay)
	proc.batch = make([]SpanData, 0, maxBatchSize)

	// start consuming the queue, then open the gate for Handle
	go proc.runtime()
	proc.rec = true

	return proc
}
// Handle routes the input Span `span` to the SpanProcessor's Exporter.
//
// The span is only enqueued while the processor is recording; once Shutdown has
// cleared the recording flag the span is silently dropped. The send on the
// queue is unbuffered, so Handle blocks until the processing goroutine picks
// the span up.
func (p *processor) Handle(span Span) {
	// Shutdown writes rec while holding the mutex, so it must be read under the
	// same mutex to avoid a data race. The lock is released before the send so
	// that a blocked send never holds it.
	p.Lock()
	recording := p.rec
	p.Unlock()

	if !recording {
		return
	}

	// NOTE(review): if Shutdown stops the processing goroutine between the check
	// above and this send, nothing is left to receive from the queue.
	p.queue <- span
}
// Shutdown gracefully stops the SpanProcessor, returning an error.
//
// It first stops Handle from accepting new spans, then (once per processor)
// signals the processing goroutine to stop, exports whatever is left in the
// batch and shuts the Exporter down. The returned error is:
//   - the export and/or Exporter shutdown error, if the teardown finished
//     before `ctx` was done (both are combined when both fail);
//   - `ctx.Err()` if `ctx` was done first; the teardown then keeps running in
//     the background and its outcome is discarded;
//   - nil on success, and on every call after the first.
func (p *processor) Shutdown(ctx context.Context) error {
	p.Lock()
	p.rec = false
	p.Unlock()

	var err error
	p.stopOnce.Do(func() {
		// Buffered so the teardown goroutine can always deliver its result and
		// exit, even when the caller has already given up on ctx.
		done := make(chan error, 1)

		go func() {
			p.stopCh <- struct{}{}

			// The result is handed back over the channel rather than written to
			// the enclosing err, so it is neither shadowed nor raced on.
			teardownErr := p.export(ctx)
			if exporterErr := p.exporter.Shutdown(ctx); exporterErr != nil {
				if teardownErr != nil {
					teardownErr = fmt.Errorf("%w -- %v", exporterErr, teardownErr)
				} else {
					teardownErr = exporterErr
				}
			}
			done <- teardownErr
		}()

		select {
		case err = <-done:
		case <-ctx.Done():
			err = ctx.Err()
		}
	})

	return err
}
// Flush immediately exports the SpanData currently sitting in the
// SpanProcessor's batch, regardless of when the next export was scheduled.
// It returns the error reported by the export, if any.
func (p *processor) Flush(ctx context.Context) error {
	if err := p.export(ctx); err != nil {
		return err
	}

	return nil
}
// runtime is the processor's background loop, started by NewProcessor. On each
// iteration it reacts to one of three events:
//   - a stop signal on stopCh, which ends the loop;
//   - the timer firing, which exports the current batch;
//   - a span arriving on queue, which is extracted and appended to the batch;
//     when the batch reaches maxExportSize it is exported right away.
//
// Export failures are logged and do not stop the loop. The timer is stopped
// when the loop exits.
func (p *processor) runtime() {
	defer p.timer.Stop()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// exportBatch pushes the current batch and logs, rather than propagates, a
	// failure; there is no caller to return it to.
	exportBatch := func() {
		if err := p.export(ctx); err != nil {
			logx.Error("[processor] spanner export failed", attr.String("error", err.Error()))
		}
	}

	for {
		select {
		// shutdown signal
		case <-p.stopCh:
			return

		// export triggered
		case <-p.timer.C:
			exportBatch()

		// span enqueued
		case span := <-p.queue:
			sd := span.Extract()

			p.Lock()
			p.batch = append(p.batch, sd)
			full := len(p.batch) >= maxExportSize
			p.Unlock()

			if !full {
				continue
			}

			// Stop the pending tick before the early export. The drain is
			// non-blocking: if Stop reports false but nothing is buffered on the
			// channel, a plain receive would hang this goroutine for good.
			if !p.timer.Stop() {
				select {
				case <-p.timer.C:
				default:
				}
			}
			exportBatch()
		}
	}
}
// export re-arms the timer with defaultDelay and, when the batch is not empty,
// sends its contents to the Exporter using a context derived from `ctx` that is
// limited to maxTimeout.
//
// The batch is emptied whether or not the Exporter succeeds, so the SpanData of
// a failed export is not retried. It returns the Exporter's error, or nil when
// there was nothing to export.
//
// export acquires the processor's mutex; callers must not hold it.
func (p *processor) export(ctx context.Context) error {
	p.timer.Reset(defaultDelay)

	ctx, cancel := context.WithTimeout(ctx, maxTimeout)
	defer cancel()

	// The batch is appended to under the mutex by the processing goroutine, and
	// export is also reachable from Flush and Shutdown, so the batch is
	// snapshotted and reset under the same mutex.
	p.Lock()
	if len(p.batch) == 0 {
		p.Unlock()
		return nil
	}
	// Copy instead of aliasing: the backing array of p.batch is reused for the
	// next spans, which must not overwrite what the Exporter is still reading.
	pending := make([]SpanData, len(p.batch))
	copy(pending, p.batch)
	p.batch = p.batch[:0]
	p.Unlock()

	// The lock is released before calling out so a slow Exporter does not block
	// spans from being batched.
	return p.exporter.Export(ctx, pending)
}