forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconfig.go
682 lines (596 loc) · 18.3 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
package telegraf
import (
"errors"
"fmt"
"io/ioutil"
"path/filepath"
"reflect"
"sort"
"strings"
"time"
"github.com/influxdb/telegraf/outputs"
"github.com/influxdb/telegraf/plugins"
"github.com/naoina/toml"
"github.com/naoina/toml/ast"
)
// Config holds everything parsed from a telegraf configuration file: the
// global tags, the agent settings, the declared plugins and outputs and, for
// each of those, the names of the fields that were explicitly set in the file
// so that only those fields override defaults when the config is applied.
type Config struct {
// This lives outside the agent because mergeStruct doesn't need to handle maps normally.
// We just copy the elements manually in ApplyAgent.
Tags map[string]string
// agent is the Agent parsed from the [agent] table by parseAgent.
agent *Agent
// plugins maps a plugin name to the plugin instance filled in by parsePlugin.
plugins map[string]plugins.Plugin
// pluginConfigurations maps a plugin name to its filter/interval settings.
pluginConfigurations map[string]*ConfiguredPlugin
// outputs maps an output name to the output instance filled in by parseOutput.
outputs map[string]outputs.Output
// The *FieldsSet members record which keys were present in the config file.
// They are passed to mergeStruct as the list of fields to override.
agentFieldsSet []string
pluginFieldsSet map[string][]string
pluginConfigurationFieldsSet map[string][]string
outputFieldsSet map[string][]string
}
// Plugins returns the configured plugins as a map of name -> plugins.Plugin.
// The returned map is the Config's internal map, not a copy.
func (c *Config) Plugins() map[string]plugins.Plugin {
return c.plugins
}
// Outputs returns the configured outputs as a map of name -> outputs.Output.
// The returned map is the Config's internal map, not a copy.
func (c *Config) Outputs() map[string]outputs.Output {
return c.outputs
}
// TagFilter pairs a tag name with the tag values that should be matched
// for that tag.
type TagFilter struct {
	Name   string
	Filter []string
}

// ConfiguredPlugin carries the per-plugin settings that telegraf itself
// interprets: the plugin name, measurement-prefix drop/pass lists, tag based
// drop/pass filters, and a collection interval.
type ConfiguredPlugin struct {
	Name string

	Drop []string
	Pass []string

	TagDrop []TagFilter
	TagPass []TagFilter

	Interval time.Duration
}

// ShouldPass reports whether a metric with the given measurement name and
// tags should be kept (true) or dropped (false).
//
// Only the first non-nil filter list is consulted, in this order:
// Pass, Drop, TagPass, TagDrop. Pass/Drop compare by measurement prefix;
// TagPass/TagDrop compare tag values for equality. With no filters set,
// every metric passes.
func (cp *ConfiguredPlugin) ShouldPass(measurement string, tags map[string]string) bool {
	switch {
	case cp.Pass != nil:
		return anyPrefixOf(measurement, cp.Pass)
	case cp.Drop != nil:
		return !anyPrefixOf(measurement, cp.Drop)
	case cp.TagPass != nil:
		return anyTagFilterMatches(tags, cp.TagPass)
	case cp.TagDrop != nil:
		return !anyTagFilterMatches(tags, cp.TagDrop)
	}
	return true
}

// anyPrefixOf reports whether measurement starts with at least one of prefixes.
func anyPrefixOf(measurement string, prefixes []string) bool {
	for i := range prefixes {
		if strings.HasPrefix(measurement, prefixes[i]) {
			return true
		}
	}
	return false
}

// anyTagFilterMatches reports whether any filter names a tag that is present
// in tags with a value listed in that filter.
func anyTagFilterMatches(tags map[string]string, filters []TagFilter) bool {
	for _, f := range filters {
		actual, present := tags[f.Name]
		if !present {
			continue
		}
		for _, want := range f.Filter {
			if want == actual {
				return true
			}
		}
	}
	return false
}
// ApplyOutput copies the settings parsed for the output called name onto v.
// Only the fields that were explicitly set in the config are overridden.
// If the config declares no such output, v is left untouched and nil is
// returned.
func (c *Config) ApplyOutput(name string, v interface{}) error {
	configured := c.outputs[name]
	if configured == nil {
		return nil
	}
	return mergeStruct(v, configured, c.outputFieldsSet[name])
}
// ApplyAgent copies the agent settings parsed from the config onto a.
// Only the fields that were explicitly set in the config are overridden, and
// every entry of c.Tags is copied into a.Tags. If the config has no agent
// section, a is left untouched and nil is returned.
func (c *Config) ApplyAgent(a *Agent) error {
	if c.agent == nil {
		return nil
	}
	// NOTE(review): assumes a.Tags was initialised by the caller; writing to
	// a nil map would panic. Confirm against the Agent constructor.
	for tagName, tagValue := range c.Tags {
		a.Tags[tagName] = tagValue
	}
	return mergeStruct(a, c.agent, c.agentFieldsSet)
}
// ApplyPlugin copies the settings parsed for the plugin called name onto v.
// Only the fields that were explicitly set in the config are overridden.
// On success it also returns the ConfiguredPlugin built from the config for
// that plugin. If the config declares no such plugin, it returns (nil, nil)
// and leaves v untouched.
func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, error) {
	parsed := c.plugins[name]
	if parsed == nil {
		return nil, nil
	}
	if err := mergeStruct(v, parsed, c.pluginFieldsSet[name]); err != nil {
		return nil, err
	}
	return c.pluginConfigurations[name], nil
}
// PluginsDeclared returns the names of all plugins declared in the config,
// sorted in ascending order.
//
// NOTE(review): the original author left the remark "Couldn't figure out how
// to get this to work with the declared function." here; what it refers to
// is not evident from this file.
func (c *Config) PluginsDeclared() []string {
	var declared []string
	for pluginName := range c.plugins {
		declared = append(declared, pluginName)
	}
	sort.Sort(sort.StringSlice(declared))
	return declared
}
// OutputsDeclared returns the names of all outputs declared in the config,
// sorted in ascending order.
func (c *Config) OutputsDeclared() []string {
	var declared []string
	for outputName := range c.outputs {
		declared = append(declared, outputName)
	}
	sort.Sort(sort.StringSlice(declared))
	return declared
}
// ListTags renders the tags specified in the config as space-separated
// key=value pairs (line-protocol style), sorted so the result is stable.
func (c *Config) ListTags() string {
	var pairs []string
	for key, value := range c.Tags {
		pairs = append(pairs, key+"="+value)
	}
	sort.Strings(pairs)
	joined := strings.Join(pairs, " ")
	return joined
}
// hasConfig is satisfied by any type that exposes a BasicConfig method
// returning a string.
type hasConfig interface {
BasicConfig() string
}
// hasDescr is satisfied by any type that exposes a Description method
// returning a string.
type hasDescr interface {
Description() string
}
// header is the first part of the sample configuration emitted by
// PrintSampleConfig: general notes, the [tags] and [agent] sections, and the
// opening of the [outputs] section.
var header = `# Telegraf configuration
# Telegraf is entirely plugin driven. All metrics are gathered from the
# declared plugins.
# Even if a plugin has no configuration, it must be declared in here
# to be active. Declaring a plugin means just specifying the name
# as a section with no variables. To deactivate a plugin, comment
# out the name and any variables.
# Use 'telegraf -config telegraf.toml -test' to see what metrics a config
# file would generate.
# One rule that plugins conform to is wherever a connection string
# can be passed, the values '' and 'localhost' are treated specially.
# They indicate to the plugin to use their own builtin configuration to
# connect to the local system.
# NOTE: The configuration has a few required parameters. They are marked
# with 'required'. Be sure to edit those to make this configuration work.
# Tags can also be specified via a normal map, but only one form at a time:
[tags]
# dc = "us-east-1"
# Configuration for telegraf agent
[agent]
# Default data collection interval for all plugins
interval = "10s"
# Rounds collection interval to 'interval'
# ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
# Default data flushing interval for all outputs
flush_interval = "10s"
# Jitter the flush interval by a random range
# ie, a jitter of 5s and interval 10s means flush will happen every 10-15s
flush_jitter = "5s"
# Number of times to retry each data flush
flush_retries = 2
# Run telegraf in debug mode
debug = false
# Override default hostname, if empty use os.Hostname()
hostname = ""
###############################################################################
# OUTPUTS #
###############################################################################
[outputs]
`
// pluginHeader is the banner PrintSampleConfig prints before the sample
// configuration of the regular (non-service) plugins.
var pluginHeader = `
###############################################################################
# PLUGINS #
###############################################################################
`
// servicePluginHeader is the banner PrintSampleConfig prints before the
// sample configuration of the service plugins.
var servicePluginHeader = `
###############################################################################
# SERVICE PLUGINS #
###############################################################################
`
// PrintSampleConfig writes a complete sample configuration to standard
// output: the static header, every registered output, every regular plugin
// and finally every service plugin, each group in alphabetical order.
//
// pluginFilters and outputFilters restrict what is printed to the named
// plugins/outputs; an empty filter list means "print all of them".
func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
	// fmt.Print rather than fmt.Printf: none of these strings are format
	// strings, and a literal '%' in them must be printed verbatim.
	fmt.Print(header)

	// Filter outputs
	var onames []string
	for oname := range outputs.Outputs {
		if len(outputFilters) == 0 || sliceContains(oname, outputFilters) {
			onames = append(onames, oname)
		}
	}
	sort.Strings(onames)

	// Print Outputs
	for _, oname := range onames {
		creator := outputs.Outputs[oname]
		output := creator()

		fmt.Printf("\n# %s\n[outputs.%s]", output.Description(), oname)

		config := output.SampleConfig()
		if config == "" {
			fmt.Print("\n # no configuration\n")
		} else {
			fmt.Print(config)
		}
	}

	// Filter plugins
	var pnames []string
	for pname := range plugins.Plugins {
		if len(pluginFilters) == 0 || sliceContains(pname, pluginFilters) {
			pnames = append(pnames, pname)
		}
	}
	sort.Strings(pnames)

	// Print Plugins, setting service plugins aside for their own section.
	fmt.Print(pluginHeader)
	servPlugins := make(map[string]plugins.ServicePlugin)
	// servNames remembers the (already sorted) order in which service plugins
	// were seen, because ranging over the map would print them in random order.
	var servNames []string
	for _, pname := range pnames {
		creator := plugins.Plugins[pname]
		plugin := creator()

		if sp, ok := plugin.(plugins.ServicePlugin); ok {
			servPlugins[pname] = sp
			servNames = append(servNames, pname)
			continue
		}

		printConfig(pname, plugin)
	}

	// Print Service Plugins
	fmt.Print(servicePluginHeader)
	for _, name := range servNames {
		printConfig(name, servPlugins[name])
	}
}
// printer is the interface printConfig needs from a plugin or output: a
// description and a sample configuration snippet.
type printer interface {
Description() string
SampleConfig() string
}
// printConfig writes the sample configuration section for one plugin to
// standard output: a comment line with p's description, a "[name]" table
// header, and then p's sample config (or a placeholder comment when the
// sample config is empty).
func printConfig(name string, p printer) {
	fmt.Printf("\n# %s\n[%s]", p.Description(), name)

	config := p.SampleConfig()
	if config == "" {
		fmt.Print("\n # no configuration\n")
		return
	}
	// The sample config is data, not a format string: print it verbatim so
	// that any '%' it contains is not interpreted as a formatting verb.
	fmt.Print(config)
}
// sliceContains reports whether name is an element of list. The comparison
// is exact (case-sensitive); a nil or empty list contains nothing.
func sliceContains(name string, list []string) bool {
	for i := 0; i < len(list); i++ {
		if list[i] == name {
			return true
		}
	}
	return false
}
// PrintPluginConfig prints the config usage of a single plugin to standard
// output. It returns an error if no plugin is registered under name.
func PrintPluginConfig(name string) error {
	creator, ok := plugins.Plugins[name]
	if !ok {
		return fmt.Errorf("Plugin %s not found", name)
	}
	printConfig(name, creator())
	return nil
}
// PrintOutputConfig prints the config usage of a single output to standard
// output. It returns an error if no output is registered under name.
func PrintOutputConfig(name string) error {
	creator, ok := outputs.Outputs[name]
	if !ok {
		return fmt.Errorf("Output %s not found", name)
	}
	// Outputs live under the [outputs] table (see PrintSampleConfig and
	// LoadConfig). A bare "[name]" header would be parsed as a plugin if
	// pasted into a config file, so print the qualified section name.
	printConfig("outputs."+name, creator())
	return nil
}
// fieldMatch returns a predicate, suitable for reflect.Value.FieldByNameFunc,
// that matches struct field names against a config key while ignoring case
// and any underscores in the key (so "flush_interval" matches
// "FlushInterval").
func fieldMatch(field string) func(string) bool {
	// Normalise the key once instead of on every candidate field name.
	target := strings.ToLower(strings.Replace(field, "_", "", -1))
	return func(name string) bool {
		return strings.ToLower(name) == target
	}
}

// mergeStruct performs a very limited merge of overlay into base.
//
// base and overlay must be non-nil pointers to structs of the same type.
// For every key in fields, the struct field it matches (see fieldMatch) is
// copied from overlay to base; slice fields are appended to base's existing
// slice rather than replacing it. Fields not named in fields are untouched.
//
// An error is returned, and base may be partially updated, when an argument
// is not a non-nil struct pointer, when the two types differ, when a key
// matches no field, or when the matched field cannot be set (for example
// because it is unexported).
func mergeStruct(base, overlay interface{}, fields []string) error {
	baseValue, err := derefForMerge(base)
	if err != nil {
		return err
	}
	overlayValue, err := derefForMerge(overlay)
	if err != nil {
		return err
	}
	if baseValue.Kind() != reflect.Struct {
		return fmt.Errorf("Tried to merge something that wasn't a struct: type %v was %v", baseValue.Type(), baseValue.Kind())
	}
	if baseValue.Type() != overlayValue.Type() {
		return fmt.Errorf("Tried to merge two different types: %v and %v", baseValue.Type(), overlayValue.Type())
	}
	for _, field := range fields {
		match := fieldMatch(field)
		overlayFieldValue := overlayValue.FieldByNameFunc(match)
		if !overlayFieldValue.IsValid() {
			return fmt.Errorf("could not find field in %v matching %v", overlayValue.Type(), field)
		}
		baseFieldValue := baseValue.FieldByNameFunc(match)
		// Set panics on unexported fields; report that as an error instead.
		if !baseFieldValue.CanSet() {
			return fmt.Errorf("cannot set field in %v matching %v", baseValue.Type(), field)
		}
		if overlayFieldValue.Kind() == reflect.Slice {
			baseFieldValue.Set(reflect.AppendSlice(baseFieldValue, overlayFieldValue))
		} else {
			baseFieldValue.Set(overlayFieldValue)
		}
	}
	return nil
}

// derefForMerge returns the value v points to, or an error when v is not a
// non-nil pointer (calling Elem or Type on such values would panic).
func derefForMerge(v interface{}) (reflect.Value, error) {
	ptr := reflect.ValueOf(v)
	if ptr.Kind() != reflect.Ptr || ptr.IsNil() {
		return reflect.Value{}, fmt.Errorf("Tried to merge something that wasn't a non-nil pointer: %T", v)
	}
	return ptr.Elem(), nil
}
// LoadDirectory loads every regular file whose name ends in ".conf" directly
// inside path (sub-directories are skipped) and merges it into c.
//
// Agent settings, plugins and outputs that c does not have yet are adopted
// as-is. For ones c already has, only the fields explicitly set in the
// sub-config are merged via mergeStruct, and the recorded sets of field names
// are extended accordingly.
//
// It returns the first error encountered while reading the directory,
// loading a file, or merging; c may have been partially updated by then.
//
// NOTE(review): the Tags of the sub-configs are not merged into c.Tags;
// confirm whether that is intentional.
func (c *Config) LoadDirectory(path string) error {
	directoryEntries, err := ioutil.ReadDir(path)
	if err != nil {
		return err
	}
	for _, entry := range directoryEntries {
		if entry.IsDir() {
			continue
		}
		name := entry.Name()
		// HasSuffix is safe for names shorter than the suffix, unlike
		// slicing name[len(name)-5:], which panics on e.g. "a".
		if !strings.HasSuffix(name, ".conf") {
			continue
		}
		subConfig, err := LoadConfig(filepath.Join(path, name))
		if err != nil {
			return err
		}

		if subConfig.agent != nil {
			if c.agent == nil {
				// Nothing to merge into yet: adopt the sub-config's agent.
				// mergeStruct cannot handle a nil base.
				c.agent = subConfig.agent
				c.agentFieldsSet = subConfig.agentFieldsSet
			} else {
				err = mergeStruct(c.agent, subConfig.agent, subConfig.agentFieldsSet)
				if err != nil {
					return err
				}
				c.agentFieldsSet = appendMissingFields(c.agentFieldsSet, subConfig.agentFieldsSet)
			}
		}

		for pluginName, plugin := range subConfig.plugins {
			if _, ok := c.plugins[pluginName]; !ok {
				c.plugins[pluginName] = plugin
				c.pluginFieldsSet[pluginName] = subConfig.pluginFieldsSet[pluginName]
				c.pluginConfigurations[pluginName] = subConfig.pluginConfigurations[pluginName]
				c.pluginConfigurationFieldsSet[pluginName] = subConfig.pluginConfigurationFieldsSet[pluginName]
				continue
			}

			err = mergeStruct(c.plugins[pluginName], plugin, subConfig.pluginFieldsSet[pluginName])
			if err != nil {
				return err
			}
			c.pluginFieldsSet[pluginName] = appendMissingFields(
				c.pluginFieldsSet[pluginName], subConfig.pluginFieldsSet[pluginName])

			err = mergeStruct(c.pluginConfigurations[pluginName], subConfig.pluginConfigurations[pluginName], subConfig.pluginConfigurationFieldsSet[pluginName])
			if err != nil {
				return err
			}
			c.pluginConfigurationFieldsSet[pluginName] = appendMissingFields(
				c.pluginConfigurationFieldsSet[pluginName], subConfig.pluginConfigurationFieldsSet[pluginName])
		}

		for outputName, output := range subConfig.outputs {
			if _, ok := c.outputs[outputName]; !ok {
				c.outputs[outputName] = output
				c.outputFieldsSet[outputName] = subConfig.outputFieldsSet[outputName]
				continue
			}

			err = mergeStruct(c.outputs[outputName], output, subConfig.outputFieldsSet[outputName])
			if err != nil {
				return err
			}
			c.outputFieldsSet[outputName] = appendMissingFields(
				c.outputFieldsSet[outputName], subConfig.outputFieldsSet[outputName])
		}
	}
	return nil
}

// appendMissingFields appends to dst every element of src that dst does not
// already contain and returns the extended slice.
func appendMissingFields(dst, src []string) []string {
	for _, field := range src {
		if !sliceContains(field, dst) {
			dst = append(dst, field)
		}
	}
	return dst
}
// hazmat area. Keeping the ast parsing here.

// LoadConfig reads and parses the TOML config file at path and returns the
// resulting *Config.
//
// Every top-level entry must be a table: "agent" and "tags" are handled
// specially, each sub-table of "outputs" is parsed as an output, and any
// other table is parsed as a plugin of that name. An error is returned if the
// file cannot be read or parsed, if an entry has the wrong shape, or if any
// section fails to parse; in that case the returned *Config is nil.
func LoadConfig(path string) (*Config, error) {
	data, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, err
	}

	tbl, err := toml.Parse(data)
	if err != nil {
		return nil, err
	}

	c := &Config{
		Tags:                         make(map[string]string),
		plugins:                      make(map[string]plugins.Plugin),
		pluginConfigurations:         make(map[string]*ConfiguredPlugin),
		outputs:                      make(map[string]outputs.Output),
		pluginFieldsSet:              make(map[string][]string),
		pluginConfigurationFieldsSet: make(map[string][]string),
		outputFieldsSet:              make(map[string][]string),
	}

	for name, val := range tbl.Fields {
		subtbl, ok := val.(*ast.Table)
		if !ok {
			return nil, errors.New("invalid configuration")
		}

		switch name {
		case "agent":
			if err := c.parseAgent(subtbl); err != nil {
				return nil, err
			}
		case "tags":
			if err := toml.UnmarshalTable(subtbl, c.Tags); err != nil {
				return nil, err
			}
		case "outputs":
			for outputName, outputVal := range subtbl.Fields {
				outputSubtbl, ok := outputVal.(*ast.Table)
				if !ok {
					// There is no underlying error at this point, so build
					// one. Returning the (nil) err variable here would hand
					// the caller a nil *Config together with a nil error.
					return nil, fmt.Errorf("invalid configuration: outputs.%s must be a table", outputName)
				}
				if err := c.parseOutput(outputName, outputSubtbl); err != nil {
					return nil, err
				}
			}
		default:
			if err := c.parsePlugin(name, subtbl); err != nil {
				return nil, err
			}
		}
	}

	return c, nil
}
// extractFieldNames returns the keys present in the given table, sorted so
// the result is deterministic. The names are recorded so that a later
// mergeStruct call overrides only the fields actually set in the config.
func extractFieldNames(table *ast.Table) []string {
	// The parameter is deliberately not called "ast": that would shadow the
	// imported package inside the function body.
	names := make([]string, 0, len(table.Fields))
	for name := range table.Fields {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}
// parseAgent decodes the [agent] table into a fresh Agent and stores it on c.
// The names of the fields present in the table are recorded first, so they
// are set even when decoding subsequently fails; c.agent is only assigned on
// success.
func (c *Config) parseAgent(agentAst *ast.Table) error {
	c.agentFieldsSet = extractFieldNames(agentAst)

	parsed := new(Agent)
	if err := toml.UnmarshalTable(agentAst, parsed); err != nil {
		return err
	}

	c.agent = parsed
	return nil
}
// parseOutput decodes the table for the output called name into a new
// instance obtained from the outputs registry and stores it on c.
// The names of the fields present in the table are recorded before the
// registry lookup. An error is returned when no output is registered under
// name or when decoding fails.
func (c *Config) parseOutput(name string, outputAst *ast.Table) error {
	c.outputFieldsSet[name] = extractFieldNames(outputAst)

	newOutput, registered := outputs.Outputs[name]
	if !registered {
		return fmt.Errorf("Undefined but requested output: %s", name)
	}

	instance := newOutput()
	if err := toml.UnmarshalTable(outputAst, instance); err != nil {
		return err
	}

	c.outputs[name] = instance
	return nil
}
// parsePlugin decodes the table for the plugin called name.
//
// The meta-settings telegraf interprets itself ("pass", "drop", "interval",
// "tagpass", "tagdrop") are pulled out into a ConfiguredPlugin and removed
// from the table. The remaining keys are decoded into a new plugin instance
// obtained from the plugins registry. Both results, plus the names of the
// keys that were set, are stored on c.
//
// An error is returned when no plugin is registered under name, when
// "interval" is not a valid duration, or when decoding the plugin fails.
// Meta-settings with an unexpected shape are silently ignored.
func (c *Config) parsePlugin(name string, pluginAst *ast.Table) error {
	creator, ok := plugins.Plugins[name]
	if !ok {
		return fmt.Errorf("Undefined but requested plugin: %s", name)
	}
	plugin := creator()

	cp := &ConfiguredPlugin{Name: name}
	cpFields := make([]string, 0, 5)

	if prefixes, ok := stringArrayField(pluginAst.Fields["pass"]); ok {
		cp.Pass = prefixes
		cpFields = append(cpFields, "pass")
	}

	if prefixes, ok := stringArrayField(pluginAst.Fields["drop"]); ok {
		cp.Drop = prefixes
		cpFields = append(cpFields, "drop")
	}

	if node, ok := pluginAst.Fields["interval"]; ok {
		if kv, ok := node.(*ast.KeyValue); ok {
			if str, ok := kv.Value.(*ast.String); ok {
				dur, err := time.ParseDuration(str.Value)
				if err != nil {
					return err
				}

				cp.Interval = dur
				cpFields = append(cpFields, "interval")
			}
		}
	}

	// Record "tagpass"/"tagdrop" once, not once per tag key. A duplicated
	// field name makes mergeStruct append the same filters repeatedly when
	// configs are merged.
	if filters := tagFilterTable(pluginAst.Fields["tagpass"]); len(filters) > 0 {
		cp.TagPass = filters
		cpFields = append(cpFields, "tagpass")
	}

	if filters := tagFilterTable(pluginAst.Fields["tagdrop"]); len(filters) > 0 {
		cp.TagDrop = filters
		cpFields = append(cpFields, "tagdrop")
	}

	// The meta-settings are not plugin fields; remove them before the table
	// is decoded into the plugin and before its field names are recorded.
	delete(pluginAst.Fields, "drop")
	delete(pluginAst.Fields, "pass")
	delete(pluginAst.Fields, "interval")
	delete(pluginAst.Fields, "tagdrop")
	delete(pluginAst.Fields, "tagpass")

	c.pluginFieldsSet[name] = extractFieldNames(pluginAst)
	c.pluginConfigurationFieldsSet[name] = cpFields

	err := toml.UnmarshalTable(pluginAst, plugin)
	if err != nil {
		return err
	}
	c.plugins[name] = plugin
	c.pluginConfigurations[name] = cp

	return nil
}

// stringArrayField extracts the string elements of a `key = [...]` node.
// ok is false when node is not a key/value pair holding an array; non-string
// array elements are skipped.
func stringArrayField(node interface{}) (values []string, ok bool) {
	kv, isKV := node.(*ast.KeyValue)
	if !isKV {
		return nil, false
	}
	ary, isArray := kv.Value.(*ast.Array)
	if !isArray {
		return nil, false
	}
	for _, elem := range ary.Value {
		if str, isString := elem.(*ast.String); isString {
			values = append(values, str.Value)
		}
	}
	return values, true
}

// tagFilterTable converts a sub-table of `tag = ["value", ...]` entries into
// one TagFilter per key/value entry. It returns nil when node is not a table.
// An entry whose value is not an array yields a TagFilter with no values.
func tagFilterTable(node interface{}) []TagFilter {
	subtbl, isTable := node.(*ast.Table)
	if !isTable {
		return nil
	}
	var filters []TagFilter
	for tagName, val := range subtbl.Fields {
		if _, isKV := val.(*ast.KeyValue); !isKV {
			continue
		}
		filter := TagFilter{Name: tagName}
		filter.Filter, _ = stringArrayField(val)
		filters = append(filters, filter)
	}
	return filters
}