forked from stripe/veneur
-
Notifications
You must be signed in to change notification settings - Fork 0
/
flusher_test.go
65 lines (55 loc) · 1.58 KB
/
flusher_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package veneur
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stripe/veneur/internal/forwardtest"
"github.com/stripe/veneur/samplers/metricpb"
)
// TestServerFlushGRPC processes a set of test metrics on a local server that
// is configured to forward over gRPC to a test server, flushes the local
// server, and asserts that the test server's handler received exactly the
// expected metric names (in any order).
func TestServerFlushGRPC(t *testing.T) {
	// Buffered with one slot so the handler's send never has to wait for the
	// test to reach the receive below. With an unbuffered channel, a handler
	// call that arrives after the test has already timed out would block on
	// the send forever and leak its goroutine.
	done := make(chan []string, 1)
	testServer := forwardtest.NewServer(func(ms []*metricpb.Metric) {
		// Only the metric names are compared, so collect just those.
		names := make([]string, 0, len(ms))
		for _, m := range ms {
			names = append(names, m.Name)
		}
		done <- names
	})
	testServer.Start(t)
	defer testServer.Stop()

	// Point the local server's forwarding at the test server and enable gRPC.
	localCfg := localConfig()
	localCfg.ForwardAddress = testServer.Addr().String()
	localCfg.ForwardUseGrpc = true
	local := setupVeneurServer(t, localCfg, nil, nil, nil)
	defer local.Shutdown()

	inputs := forwardGRPCTestMetrics()
	for _, input := range inputs {
		local.Workers[0].ProcessMetric(input)
	}

	local.Flush(context.Background())

	expected := []string{
		testGRPCMetric("histogram"),
		testGRPCMetric("timer"),
		testGRPCMetric("counter"),
		testGRPCMetric("gauge"),
		testGRPCMetric("set"),
	}

	// Wait for the handler to report what it received, but never longer than
	// one second so a broken forward path fails the test instead of hanging.
	select {
	case v := <-done:
		assert.ElementsMatch(t, expected, v,
			"Flush didn't output the right metrics")
	case <-time.After(time.Second):
		t.Fatal("Timed out waiting for the gRPC server to receive the flush")
	}
}
// TestServerFlushGRPCBadAddress is a smoke test: it configures gRPC forwarding
// to the address "bad-address:123", processes one metric, and flushes. It makes
// no assertions; the test passes as long as the flush does not panic.
func TestServerFlushGRPCBadAddress(t *testing.T) {
	cfg := localConfig()
	cfg.ForwardUseGrpc = true
	cfg.ForwardAddress = "bad-address:123"

	srv := setupVeneurServer(t, cfg, nil, nil, nil)
	defer srv.Shutdown()

	// A single metric is enough to give the flush something to forward.
	metrics := forwardGRPCTestMetrics()
	srv.Workers[0].ProcessMetric(metrics[0])

	srv.Flush(context.Background())
}