forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tail_test.go
145 lines (122 loc) · 3.43 KB
/
tail_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package tail
import (
"io/ioutil"
"os"
"runtime"
"testing"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestTailFromBeginning(t *testing.T) {
if os.Getenv("CIRCLE_PROJECT_REPONAME") != "" {
t.Skip("Skipping CI testing due to race conditions")
}
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
_, err = tmpfile.WriteString("cpu,mytag=foo usage_idle=100\n")
require.NoError(t, err)
tt := NewTail()
tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()}
p, _ := parsers.NewInfluxParser()
tt.SetParser(p)
defer tt.Stop()
defer tmpfile.Close()
acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc))
require.NoError(t, acc.GatherError(tt.Gather))
acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu",
map[string]interface{}{
"usage_idle": float64(100),
},
map[string]string{
"mytag": "foo",
"path": tmpfile.Name(),
})
}
func TestTailFromEnd(t *testing.T) {
if os.Getenv("CIRCLE_PROJECT_REPONAME") != "" {
t.Skip("Skipping CI testing due to race conditions")
}
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
_, err = tmpfile.WriteString("cpu,mytag=foo usage_idle=100\n")
require.NoError(t, err)
tt := NewTail()
tt.Files = []string{tmpfile.Name()}
p, _ := parsers.NewInfluxParser()
tt.SetParser(p)
defer tt.Stop()
defer tmpfile.Close()
acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc))
for _, tailer := range tt.tailers {
for n, err := tailer.Tell(); err == nil && n == 0; n, err = tailer.Tell() {
// wait for tailer to jump to end
runtime.Gosched()
}
}
_, err = tmpfile.WriteString("cpu,othertag=foo usage_idle=100\n")
require.NoError(t, err)
require.NoError(t, acc.GatherError(tt.Gather))
acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu",
map[string]interface{}{
"usage_idle": float64(100),
},
map[string]string{
"othertag": "foo",
"path": tmpfile.Name(),
})
assert.Len(t, acc.Metrics, 1)
}
func TestTailBadLine(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
tt := NewTail()
tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()}
p, _ := parsers.NewInfluxParser()
tt.SetParser(p)
defer tt.Stop()
defer tmpfile.Close()
acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc))
require.NoError(t, acc.GatherError(tt.Gather))
_, err = tmpfile.WriteString("cpu mytag= foo usage_idle= 100\n")
require.NoError(t, err)
acc.WaitError(1)
assert.Contains(t, acc.Errors[0].Error(), "E! Malformed log line")
}
func TestTailDosLineendings(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
_, err = tmpfile.WriteString("cpu usage_idle=100\r\ncpu2 usage_idle=200\r\n")
require.NoError(t, err)
tt := NewTail()
tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()}
p, _ := parsers.NewInfluxParser()
tt.SetParser(p)
defer tt.Stop()
defer tmpfile.Close()
acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc))
require.NoError(t, acc.GatherError(tt.Gather))
acc.Wait(2)
acc.AssertContainsFields(t, "cpu",
map[string]interface{}{
"usage_idle": float64(100),
})
acc.AssertContainsFields(t, "cpu2",
map[string]interface{}{
"usage_idle": float64(200),
})
}