From 829b0d5279a555bc90e8b13d1bed21d8ec4ef768 Mon Sep 17 00:00:00 2001 From: Ali Alrahahleh Date: Sat, 11 Jun 2016 00:23:41 -0700 Subject: [PATCH] Add Histogram support aggregation --- Godeps | 1 + agent/agent.go | 21 ++- cmd/telegraf/telegraf.go | 25 +++- filter.go | 15 +++ internal/config/config.go | 104 +++++++++++++- plugins/filters/all/all.go | 5 + plugins/filters/histogram/aggregate.go | 65 +++++++++ plugins/filters/histogram/histogram.go | 180 +++++++++++++++++++++++++ plugins/filters/registry.go | 11 ++ 9 files changed, 415 insertions(+), 12 deletions(-) create mode 100644 filter.go create mode 100644 plugins/filters/all/all.go create mode 100644 plugins/filters/histogram/aggregate.go create mode 100644 plugins/filters/histogram/histogram.go create mode 100644 plugins/filters/registry.go diff --git a/Godeps b/Godeps index 2ac95a90455eb..0550ddcc85cdd 100644 --- a/Godeps +++ b/Godeps @@ -1,5 +1,6 @@ github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9 github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc +github.com/VividCortex/gohistogram da38b6e56f2f7dc1999a037141441e50d6213f5d github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687 github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857 github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4 diff --git a/agent/agent.go b/agent/agent.go index 1423ef773eeee..a213bb5ec723c 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -242,28 +242,43 @@ func (a *Agent) flush() { } }(o) } - wg.Wait() } // flusher monitors the metrics input channel and flushes on the minimum interval func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error { + var metricStream chan telegraf.Metric + stopFilter := make(chan struct{}, len(a.Config.Filters)) // Inelegant, but this sleep is to allow the Gather threads to run, so that // the flusher will flush after metrics are collected. time.Sleep(time.Millisecond * 200) - ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) + metricStream = metricC + if len(a.Config.Filters) != 0 { + for _, name := range a.Config.FiltersOrder { + filter := a.Config.Filters[name] + log.Printf("Filter %s is enabled", name) + metricStream = filter.Pipe(metricStream) + go func(f telegraf.Filter) { + f.Start(stopFilter) + }(filter) + } + } for { select { case <-shutdown: + //sending shutdown signal for all filters + for range a.Config.Filters { + stopFilter <- struct{}{} + } log.Println("Hang on, flushing any cached metrics before shutdown") a.flush() return nil case <-ticker.C: internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) a.flush() - case m := <-metricC: + case m := <-metricStream: for _, o := range a.Config.Outputs { o.AddMetric(m) } diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 6681ad0739ec4..dde8389288b16 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -11,6 +11,8 @@ import ( "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal/config" + "github.com/influxdata/telegraf/plugins/filters" + _ "github.com/influxdata/telegraf/plugins/filters/all" "github.com/influxdata/telegraf/plugins/inputs" _ "github.com/influxdata/telegraf/plugins/inputs/all" "github.com/influxdata/telegraf/plugins/outputs" @@ -31,12 +33,16 @@ var fSampleConfig = flag.Bool("sample-config", false, var fPidfile = flag.String("pidfile", "", "file to write our pid to") var fInputFilters = flag.String("input-filter", "", "filter the inputs to enable, separator is :") +var fFilterFilters = flag.String("filter-filter", "", + "filter to be enabled, separator is :") var fInputList = flag.Bool("input-list", false, "print available input plugins.") var fOutputFilters = flag.String("output-filter", "", "filter the outputs to enable, separator is :") var fOutputList = flag.Bool("output-list", false, "print available output plugins.") +var fFilterList = flag.Bool("filter-list", false, + "print available filter plugins.") var fUsage = flag.String("usage", "", "print usage for a plugin, ie, 'telegraf -usage mysql'") var fInputFiltersLegacy = flag.String("filter", "", @@ -70,6 +76,8 @@ The flags are: -input-list print all the plugins inputs -output-filter filter the output plugins to enable, separator is : -output-list print all the available outputs + -filter-filter filter the filter plugins to enable, separator is : + -filter-list print all the available filters -usage print usage for a plugin, ie, 'telegraf -usage mysql' -debug print metrics as they're generated to stdout -quiet run in quiet mode @@ -121,6 +129,11 @@ func main() { inputFilters = strings.Split(":"+inputFilter+":", ":") } + var filterFilters []string + if *fFilterFilters != "" { + filterFilters = strings.Split(":" + strings.TrimSpace(*fFilterFilters) + ":", ":") + } + var outputFilters []string if *fOutputFiltersLegacy != "" { fmt.Printf("WARNING '--outputfilter' flag is deprecated, please use" + @@ -140,7 +153,7 @@ func main() { fmt.Println(v) return case "config": - config.PrintSampleConfig(inputFilters, outputFilters) + config.PrintSampleConfig(inputFilters, filterFilters, outputFilters) return } } @@ -153,6 +166,14 @@ func main() { return } + if *fFilterList { + fmt.Println("Available Filter Plugins:") + for k, _ := range filters.Filters { + fmt.Printf(" %s\n", k) + } + return + } + if *fInputList { fmt.Println("Available Input Plugins:") for k, _ := range inputs.Inputs { @@ -168,7 +189,7 @@ func main() { } if *fSampleConfig { - config.PrintSampleConfig(inputFilters, outputFilters) + config.PrintSampleConfig(inputFilters, filterFilters, outputFilters) return } diff --git a/filter.go b/filter.go new file mode 100644 index 0000000000000..410114d6846ef --- /dev/null +++ b/filter.go @@ -0,0 +1,15 @@ +package telegraf + +type Filter interface { + // SampleConfig returns the default configuration of the Input + SampleConfig() string + + // Description returns a one-sentence description on the Input + Description() string + + //create pipe for filter + Pipe(in chan Metric) chan Metric + + // start the filter + Start(shutdown chan struct{}) +} diff --git a/internal/config/config.go b/internal/config/config.go index fdc9a8753e283..cf78be7e80433 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -16,6 +16,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/plugins/filters" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/parsers" @@ -46,9 +47,11 @@ type Config struct { InputFilters []string OutputFilters []string - Agent *AgentConfig - Inputs []*internal_models.RunningInput - Outputs []*internal_models.RunningOutput + Agent *AgentConfig + Inputs []*internal_models.RunningInput + Outputs []*internal_models.RunningOutput + Filters map[string]telegraf.Filter + FiltersOrder []string } func NewConfig() *Config { @@ -63,6 +66,8 @@ func NewConfig() *Config { Tags: make(map[string]string), Inputs: make([]*internal_models.RunningInput, 0), Outputs: make([]*internal_models.RunningOutput, 0), + Filters: make(map[string]telegraf.Filter), + FiltersOrder: make([]string, 0), InputFilters: make([]string, 0), OutputFilters: make([]string, 0), } @@ -77,6 +82,14 @@ type AgentConfig struct { // ie, if Interval=10s then always collect on :00, :10, :20, etc. RoundInterval bool + // By default, precision will be set to the same timestamp order as the + // collection interval, with the maximum being 1s. + // ie, when interval = "10s", precision will be "1s" + // when interval = "250ms", precision will be "1ms" + // Precision will NOT be used for service inputs. It is up to each individual + // service input to set the timestamp at the appropriate precision. + Precision internal.Duration + // CollectionJitter is used to jitter the collection by a random amount. // Each plugin will sleep for a random time within jitter before collecting. // This can be used to avoid many plugins querying things like sysfs at the @@ -108,11 +121,10 @@ type AgentConfig struct { // does _not_ deactivate FlushInterval. FlushBufferWhenFull bool - // TODO(cam): Remove UTC and Precision parameters, they are no longer + // TODO(cam): Remove UTC and parameter, they are no longer // valid for the agent config. Leaving them here for now for backwards- // compatability - UTC bool `toml:"utc"` - Precision string + UTC bool `toml:"utc"` // Debug is the option for running in debug mode Debug bool @@ -209,6 +221,11 @@ var header = `# Telegraf Configuration ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s flush_jitter = "0s" + ## By default, precision will be set to the same timestamp order as the + ## collection interval, with the maximum being 1s. + ## Precision will NOT be used for service inputs, such as logparser and statsd. + ## Valid values are "Nns", "Nus" (or "Nµs"), "Nms", "Ns". + precision = "" ## Run telegraf in debug mode debug = false ## Run telegraf in quiet mode @@ -239,7 +256,7 @@ var serviceInputHeader = ` ` // PrintSampleConfig prints the sample config -func PrintSampleConfig(inputFilters []string, outputFilters []string) { +func PrintSampleConfig(inputFilters []string, filterFilters []string, outputFilters []string) { fmt.Printf(header) if len(outputFilters) != 0 { @@ -257,6 +274,18 @@ func PrintSampleConfig(inputFilters []string, outputFilters []string) { printFilteredOutputs(pnames, true) } + if len(filterFilters) != 0 { + printFilteredFilters(filterFilters, false) + } else { + var pnames []string + for pname := range filters.Filters { + pnames = append(pnames, pname) + } + sort.Strings(pnames) + printFilteredFilters(pnames, true) + } + + fmt.Printf(inputHeader) if len(inputFilters) != 0 { printFilteredInputs(inputFilters, false) @@ -315,6 +344,20 @@ func printFilteredInputs(inputFilters []string, commented bool) { } } +func printFilteredFilters(printOnly []string, commented bool) { + var names []string + for item := range filters.Filters { + if sliceContains(item, printOnly) { + names = append(names, item) + } + } + sort.Strings(names) + for _, name := range names { + creator := filters.Filters[name] + filter := creator() + printConfig(name, filter, "filters", commented) + } +} func printFilteredOutputs(outputFilters []string, commented bool) { // Filter outputs var onames []string @@ -516,6 +559,25 @@ func (c *Config) LoadConfig(path string) error { pluginName, path) } } + case "filter": + for pluginName, pluginVal := range subTable.Fields { + switch pluginSubTable := pluginVal.(type) { + case *ast.Table: + if err = c.addFilter(pluginName, pluginSubTable); err != nil { + return fmt.Errorf("Error parsing %s, %s", path, err) + } + case []*ast.Table: + for _, t := range pluginSubTable { + if err = c.addFilter(pluginName, t); err != nil { + return fmt.Errorf("Error parsing %s, %s", path, err) + } + } + default: + return fmt.Errorf("Unsupported config format: %s, file %s", + pluginName, path) + } + } + // Assume it's an input input for legacy config file support if no other // identifiers are present default: @@ -527,6 +589,13 @@ func (c *Config) LoadConfig(path string) error { return nil } +// trimBOM trims the Byte-Order-Marks from the beginning of the file. +// this is for Windows compatability only. +// see https://github.com/influxdata/telegraf/issues/1378 +func trimBOM(f []byte) []byte { + return bytes.TrimPrefix(f, []byte("\xef\xbb\xbf")) +} + // parseFile loads a TOML configuration from a provided path and // returns the AST produced from the TOML parser. When loading the file, it // will find environment variables and replace them. @@ -535,6 +604,8 @@ func parseFile(fpath string) (*ast.Table, error) { if err != nil { return nil, err } + // ugh windows why + contents = trimBOM(contents) env_vars := envVarRe.FindAll(contents, -1) for _, env_var := range env_vars { @@ -583,6 +654,25 @@ func (c *Config) addOutput(name string, table *ast.Table) error { return nil } +func (c *Config) addFilter(name string, table *ast.Table) error { + creator, ok := filters.Filters[name] + if !ok { + return fmt.Errorf("Undefined but requested filter: %s", name) + } + filter := creator() + + if err := config.UnmarshalTable(table, filter); err != nil { + return err + } + + if _, ok = c.Filters[name]; ok { + return fmt.Errorf("Filter already defined %s", name) + } + c.Filters[name] = filter + c.FiltersOrder = append(c.FiltersOrder, name) + return nil +} + func (c *Config) addInput(name string, table *ast.Table) error { if len(c.InputFilters) > 0 && !sliceContains(name, c.InputFilters) { return nil diff --git a/plugins/filters/all/all.go b/plugins/filters/all/all.go new file mode 100644 index 0000000000000..7d6a9242b5b0b --- /dev/null +++ b/plugins/filters/all/all.go @@ -0,0 +1,5 @@ +package all + +import ( + _ "github.com/influxdata/telegraf/plugins/filters/histogram" +) diff --git a/plugins/filters/histogram/aggregate.go b/plugins/filters/histogram/aggregate.go new file mode 100644 index 0000000000000..2b233a72b28c0 --- /dev/null +++ b/plugins/filters/histogram/aggregate.go @@ -0,0 +1,65 @@ +package histogram + +import ( + "github.com/VividCortex/gohistogram" + "math" +) + +type Aggregate struct { + hist *gohistogram.NumericHistogram + sum float64 + max float64 + min float64 +} + +func (a *Aggregate) Add(n float64) { + a.sum += n + if a.max < n { + a.max = n + } + if a.min > n { + a.min = n + } + a.hist.Add(n) +} + +func (a *Aggregate) Quantile(q float64) float64 { + return a.hist.Quantile(q) +} + +func (a *Aggregate) Sum() float64 { + return a.sum +} + +func (a *Aggregate) CDF(x float64) float64 { + return a.hist.CDF(x) +} + +func (a *Aggregate) Mean() float64 { + return a.hist.Mean() +} + +func (a *Aggregate) Variance() float64 { + return a.hist.Variance() +} + +func (a *Aggregate) Count() float64 { + return a.hist.Count() +} + +func (a *Aggregate) Max() float64 { + return a.max +} + +func (a *Aggregate) Min() float64 { + return a.min +} + +func NewAggregate(n int) *Aggregate { + return &Aggregate{ + hist: gohistogram.NewHistogram(n), + max: math.SmallestNonzeroFloat64, + min: math.MaxFloat64, + sum: 0, + } +} diff --git a/plugins/filters/histogram/histogram.go b/plugins/filters/histogram/histogram.go new file mode 100644 index 0000000000000..a491597c4bc7a --- /dev/null +++ b/plugins/filters/histogram/histogram.go @@ -0,0 +1,180 @@ +package histogram + +import ( + "crypto/sha1" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/filters" + "io" + "log" + "sort" + "strconv" + "time" +) + +const field_sep = "." + +type metricID struct { + Name string + TagHash [sha1.Size]byte +} +type Histogram struct { + inch chan telegraf.Metric + outch chan telegraf.Metric + FlushInterval string + interval time.Duration + Bucketsize int + Metrics map[string][]float64 + fieldMap map[metricID]map[string]*Aggregate + metricTags map[metricID]map[string]string +} + +func (h *Histogram) Description() string { + return "Histogram: read metrics from inputs and create histogram for output" +} + +func (h *Histogram) SampleConfig() string { + return ` + ## Histogram Filter + ## This filter can be used to generate + ## (mean varince percentile count) + ## values generated are approxmation please refer to + ## Ben-Haim & Yom-Tov's A Streaming Parallel Decision Tree Algorithm + ## http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf + [[filter.histogram]] + ## bucket size if increase it will increase accuracy + ## but it will increase memory usage + bucketsize = 20 + [filter.histogram.metrics] + ## if array is empty only count mean + ## and variance will be cacluated. _ALL_METRIC special constanct + ## can be used instead of metric name this will aggregate all the + ## the merrtics for this filer + ## metric name = [percentiles] + tail = [0.90] +` +} + +func (h *Histogram) Pipe(in chan telegraf.Metric) chan telegraf.Metric { + h.inch = in + h.outch = make(chan telegraf.Metric, 10000) + return h.outch +} + +func (h *Histogram) Start(shutdown chan struct{}) { + interval, _ := time.ParseDuration(h.FlushInterval) + ticker := time.NewTicker(interval) + for { + select { + case m := <-h.inch: + if h.IsEnabled(m.Name()) { + h.AddMetric(m) + } else { + h.outch <- m + } + case <-shutdown: + log.Printf("Shuting down filters, All metric in the queue will be lost.") + return + case <-ticker.C: + h.OutputMetric() + } + } +} + +func (h *Histogram) hashTags(m map[string]string) (result [sha1.Size]byte) { + hash := sha1.New() + keys := []string{} + for key := range m { + keys = append(keys, key) + } + sort.Strings(keys) + for _, item := range keys { + io.WriteString(hash, item+m[item]) + } + copy(result[:], hash.Sum(nil)) + return result +} + +func (h *Histogram) AddMetric(metric telegraf.Metric) { + mID := metricID{ + Name: metric.Name(), + TagHash: h.hashTags(metric.Tags()), + } + if h.fieldMap[mID] == nil { + h.fieldMap[mID] = make(map[string]*Aggregate) + } + if h.metricTags[mID] == nil { + h.metricTags[mID] = make(map[string]string) + } + h.metricTags[mID] = metric.Tags() + for key, val := range metric.Fields() { + switch v := val.(type) { + case float64: + if h.fieldMap[mID][key] == nil { + h.fieldMap[mID][key] = NewAggregate(h.Bucketsize) + } + hist := h.fieldMap[mID][key] + hist.Add(v) + case int: + if h.fieldMap[mID][key] == nil { + h.fieldMap[mID][key] = NewAggregate(h.Bucketsize) + } + hist := h.fieldMap[mID][key] + hist.Add(float64(v)) + case int64: + if h.fieldMap[mID][key] == nil { + h.fieldMap[mID][key] = NewAggregate(h.Bucketsize) + } + hist := h.fieldMap[mID][key] + hist.Add(float64(v)) + default: + log.Printf("When Histogram enabled all the fields should be of type float64 [field name %s]", key) + } + } +} + +func (h *Histogram) IsEnabled(name string) bool { + _, isAllEnabled := h.Metrics["_ALL_METRIC"] + _, ok := h.Metrics[name] + return ok || isAllEnabled +} + +func (h *Histogram) OutputMetric() { + all_percentile := h.Metrics["_ALL_METRIC"] + for mID, fields := range h.fieldMap { + mFields := make(map[string]interface{}) + for key, val := range fields { + percentile, ok := h.Metrics[mID.Name] + if !ok { + percentile = all_percentile + } + for _, perc := range percentile { + p := strconv.FormatFloat(perc*100, 'f', 0, 64) + mFields[key+field_sep+"p"+p] = val.Quantile(perc) + } + mFields[key+field_sep+"variance"] = val.Variance() + mFields[key+field_sep+"mean"] = val.Mean() + mFields[key+field_sep+"count"] = val.Count() + mFields[key+field_sep+"sum"] = val.Sum() + mFields[key+field_sep+"max"] = val.Max() + mFields[key+field_sep+"min"] = val.Min() + } + metric, _ := telegraf.NewMetric(mID.Name, h.metricTags[mID], mFields, time.Now().UTC()) + h.outch <- metric + delete(h.fieldMap, mID) + delete(h.metricTags, mID) + } +} + +func (h *Histogram) Reset() { + h.fieldMap = make(map[metricID]map[string]*Aggregate) + h.metricTags = make(map[metricID]map[string]string) +} + +func init() { + filters.Add("histogram", func() telegraf.Filter { + return &Histogram{ + fieldMap: make(map[metricID]map[string]*Aggregate), + metricTags: make(map[metricID]map[string]string), + } + }) +} diff --git a/plugins/filters/registry.go b/plugins/filters/registry.go new file mode 100644 index 0000000000000..e391762849899 --- /dev/null +++ b/plugins/filters/registry.go @@ -0,0 +1,11 @@ +package filters + +import "github.com/influxdata/telegraf" + +type Creator func() telegraf.Filter + +var Filters = map[string]Creator{} + +func Add(name string, creator Creator) { + Filters[name] = creator +}