From b0c2322ea1c06c9f263778644d8081e65b4c7b14 Mon Sep 17 00:00:00 2001 From: Ali Alrahahleh Date: Tue, 14 Jun 2016 01:56:30 -0700 Subject: [PATCH] Add Histogram support aggregation --- agent/agent.go | 65 +++------------ cmd/telegraf/telegraf.go | 1 + filter.go | 19 +++++ internal/config/config.go | 67 ++++++++------- plugins/filters/all/all.go | 5 ++ plugins/filters/histogram/histogram.go | 110 +++++++++++++++++++++++++ plugins/filters/registry.go | 11 +++ 7 files changed, 192 insertions(+), 86 deletions(-) create mode 100644 filter.go create mode 100644 plugins/filters/all/all.go create mode 100644 plugins/filters/histogram/histogram.go create mode 100644 plugins/filters/registry.go diff --git a/agent/agent.go b/agent/agent.go index a9e8bc4be2464..a8343c67a58f1 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -2,11 +2,9 @@ package agent import ( "fmt" - "github.com/VividCortex/gohistogram" "log" "os" "runtime" - "strconv" "sync" "time" @@ -18,17 +16,13 @@ import ( // Agent runs telegraf and collects data based on the given config type Agent struct { - Config *config.Config - fieldMap map[string]map[string]*gohistogram.NumericHistogram - metricTags map[string]map[string]string + Config *config.Config } // NewAgent returns an Agent struct based off the given Config func NewAgent(config *config.Config) (*Agent, error) { a := &Agent{ - Config: config, - fieldMap: make(map[string]map[string]*gohistogram.NumericHistogram), - metricTags: make(map[string]map[string]string), + Config: config, } if !a.Config.Agent.OmitHostname { @@ -241,7 +235,9 @@ func (a *Agent) flush() { for _, o := range a.Config.Outputs { go func(output *internal_models.RunningOutput) { defer wg.Done() - a.AddHistMetricToOutput(output) + if a.Config.Filter != nil { + a.Config.Filter.OutputMetric(output) + } err := output.Write() if err != nil { log.Printf("Error writing to output [%s]: %s\n", @@ -249,8 +245,10 @@ func (a *Agent) flush() { } }(o) } - wg.Wait() + if a.Config.Filter != nil { + a.Config.Filter.Reset() + } } // flusher monitors the metrics input channel and flushes on the minimum interval @@ -271,8 +269,8 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) a.flush() case m := <-metricC: - if _, ok := a.Config.Agent.Histogram[m.Name()]; ok { - a.AddMetric(m) + if a.Config.Filter != nil && a.Config.Filter.IsEnabled(m.Name()) { + a.Config.Filter.AddMetric(m) } else { for _, o := range a.Config.Outputs { o.AddMetric(m) @@ -282,49 +280,6 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er } } -func (a *Agent) AddMetric(metric telegraf.Metric) { - name := metric.Name() - if a.fieldMap[name] == nil { - a.fieldMap[name] = make(map[string]*gohistogram.NumericHistogram) - } - if a.metricTags[name] == nil { - a.metricTags[name] = make(map[string]string) - } - a.metricTags[name] = metric.Tags() - for key, val := range metric.Fields() { - switch v := val.(type) { - case float64: - if a.fieldMap[name][key] == nil { - a.fieldMap[name][key] = gohistogram.NewHistogram(a.Config.Agent.HistogramBuckSize) - } - hist := a.fieldMap[name][key] - hist.Add(v) - default: - log.Printf("When stats enabled all the fields should be of type float64 [field name %s]", key) - } - } -} - -func (a *Agent) AddHistMetricToOutput(output *internal_models.RunningOutput) { - for name, fields := range a.fieldMap { - mFields := make(map[string]interface{}) - for key, val := range fields { - for _, perc := range a.Config.Agent.Histogram[name] { - p := strconv.FormatFloat(perc*100, 'f', 0, 64) - mFields[key+"_p"+p] = val.Quantile(perc) - } - mFields[key+"_variance"] = val.Variance() - mFields[key+"_mean"] = val.Mean() - mFields[key+"_count"] = val.Count() - } - m, _ := telegraf.NewMetric(name, a.metricTags[name], mFields, time.Now().UTC()) - output.AddMetric(m) - - delete(a.fieldMap, name) - delete(a.metricTags, name) - } -} - // Run runs the agent daemon, gathering every Interval func (a *Agent) Run(shutdown chan struct{}) error { var wg sync.WaitGroup diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 6681ad0739ec4..42a7c287b1430 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal/config" + _ "github.com/influxdata/telegraf/plugins/filters/all" "github.com/influxdata/telegraf/plugins/inputs" _ "github.com/influxdata/telegraf/plugins/inputs/all" "github.com/influxdata/telegraf/plugins/outputs" diff --git a/filter.go b/filter.go new file mode 100644 index 0000000000000..2df62a79e1c7d --- /dev/null +++ b/filter.go @@ -0,0 +1,19 @@ +package telegraf + +type Filter interface { + // SampleConfig returns the default configuration of the Input + SampleConfig() string + + // Description returns a one-sentence description on the Input + Description() string + + //Output metric to outputs list + OutputMetric(output interface{}) + //Add metric to the middleware + AddMetric(metric Metric) + //Called on each metric to check if this middle ware enabled + //or not for that metric + IsEnabled(name string) bool + //clear metrics to output + Reset() +} diff --git a/internal/config/config.go b/internal/config/config.go index aec1970f726d2..fa53146cda47d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -16,6 +16,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/plugins/filters" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/parsers" @@ -49,16 +50,16 @@ type Config struct { Agent *AgentConfig Inputs []*internal_models.RunningInput Outputs []*internal_models.RunningOutput + Filter telegraf.Filter } func NewConfig() *Config { c := &Config{ // Agent defaults: Agent: &AgentConfig{ - Interval: internal.Duration{Duration: 10 * time.Second}, - RoundInterval: true, - FlushInterval: internal.Duration{Duration: 10 * time.Second}, - HistogramBuckSize: 20, + Interval: internal.Duration{Duration: 10 * time.Second}, + RoundInterval: true, + FlushInterval: internal.Duration{Duration: 10 * time.Second}, }, Tags: make(map[string]string), @@ -122,15 +123,6 @@ type AgentConfig struct { Quiet bool Hostname string OmitHostname bool - - // Supported Histogram method - // (metric_name) = list of percintil (0.95, 0.39) - // Note if Histogram is enabled for a metric All field will be - // Sampled - Histogram map[string][]float64 - - // Histogram bucketsize - HistogramBuckSize int } // Inputs returns a list of strings of the configured inputs. @@ -227,24 +219,6 @@ var header = `# Telegraf Configuration hostname = "" ## If set to true, do no set the "host" tag in the telegraf agent. omit_hostname = false - ## Supported Histogram method - ## (metric_name) = list of percintile (0.95, 0.39) - ## Note if Histogram is enabled for a metric. All field will - ## be Sampled - ## value generated are approxmation please refer to - ## Ben-Haim & Yom-Tov's A Streaming Parallel Decision Tree Algorithm - ## http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf - ##[agent.histogram] - ## (metric_name) = (percintile list ex [0.95, 0.50]) - ## Empty array will generate count median and variance - ## (metric_name) = [] - - - ##Histogram bucket size - ##A larger bin size yields more accurate approximations at the - ##cost of increased memory utilization and performance - ##histogram_buck_size = 20 - ############################################################################### @@ -544,6 +518,21 @@ func (c *Config) LoadConfig(path string) error { pluginName, path) } } + case "filter": + for pluginName, pluginVal := range subTable.Fields { + switch pluginSubTable := pluginVal.(type) { + case *ast.Table: + if err = c.addFilter(pluginName, pluginSubTable); err != nil { + return fmt.Errorf("Error parsing %s, %s", path, err) + } + case []*ast.Table: + return fmt.Errorf("You can only specify one filter in plugin name %s, file %s.", pluginName, path) + default: + return fmt.Errorf("Unsupported config format: %s, file %s", + pluginName, path) + } + } + // Assume it's an input input for legacy config file support if no other // identifiers are present default: @@ -611,6 +600,22 @@ func (c *Config) addOutput(name string, table *ast.Table) error { return nil } +func (c *Config) addFilter(name string, table *ast.Table) error { + fmt.Printf("%v", filters.Filters) + creator, ok := filters.Filters[name] + if !ok { + return fmt.Errorf("Undefined but requested filter: %s", name) + } + filter := creator() + + if err := config.UnmarshalTable(table, filter); err != nil { + return err + } + + c.Filter = filter + return nil +} + func (c *Config) addInput(name string, table *ast.Table) error { if len(c.InputFilters) > 0 && !sliceContains(name, c.InputFilters) { return nil diff --git a/plugins/filters/all/all.go b/plugins/filters/all/all.go new file mode 100644 index 0000000000000..7d6a9242b5b0b --- /dev/null +++ b/plugins/filters/all/all.go @@ -0,0 +1,5 @@ +package all + +import ( + _ "github.com/influxdata/telegraf/plugins/filters/histogram" +) diff --git a/plugins/filters/histogram/histogram.go b/plugins/filters/histogram/histogram.go new file mode 100644 index 0000000000000..524cb528c54cc --- /dev/null +++ b/plugins/filters/histogram/histogram.go @@ -0,0 +1,110 @@ +package histogram + +import ( + "github.com/VividCortex/gohistogram" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/plugins/filters" + "log" + "strconv" + "sync" + "time" +) + +type Histogram struct { + sync.RWMutex + Bucketsize int + Metrics map[string][]float64 + fieldMap map[string]map[string]*gohistogram.NumericHistogram + metricTags map[string]map[string]string +} + +func (h *Histogram) Description() string { + return "Read Histogram-formatted JSON metrics from one or more HTTP endpoints" +} + +func (h *Histogram) SampleConfig() string { + return ` + ## Histogram Filter + ## This filter can be used to generate + ## (mean varince percentile count) + ## values generated are approxmation please refer to + ## Ben-Haim & Yom-Tov's A Streaming Parallel Decision Tree Algorithm + ## http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf + ##[filter.histogram] + ## bucket size if increase it will increase accuracy + ## but it will increase memory usage + ## bucketsize = 20 + ## [filter.histogram.metrics] + ## metric name = [percentiles] ## if array is empty only count mean + ## and variance will be cacluated + ## tail = [0.90] +` +} + +func (h *Histogram) AddMetric(metric telegraf.Metric) { + name := metric.Name() + h.Lock() + defer h.Unlock() + if h.fieldMap[name] == nil { + h.fieldMap[name] = make(map[string]*gohistogram.NumericHistogram) + } + if h.metricTags[name] == nil { + h.metricTags[name] = make(map[string]string) + } + h.metricTags[name] = metric.Tags() + for key, val := range metric.Fields() { + switch v := val.(type) { + case float64: + if h.fieldMap[name][key] == nil { + h.fieldMap[name][key] = gohistogram.NewHistogram(h.Bucketsize) + } + hist := h.fieldMap[name][key] + hist.Add(v) + default: + log.Printf("When stats enabled all the fields should be of type float64 [field name %s]", key) + } + } +} + +func (h *Histogram) IsEnabled(name string) bool { + _, ok := h.Metrics[name] + return ok +} + +func (h *Histogram) OutputMetric(output interface{}) { + h.RLock() + defer h.RUnlock() + for name, fields := range h.fieldMap { + mFields := make(map[string]interface{}) + for key, val := range fields { + for _, perc := range h.Metrics[name] { + p := strconv.FormatFloat(perc*100, 'f', 0, 64) + mFields[key+"_p"+p] = val.Quantile(perc) + } + mFields[key+"_variance"] = val.Variance() + mFields[key+"_mean"] = val.Mean() + mFields[key+"_count"] = val.Count() + } + metric, _ := telegraf.NewMetric(name, h.metricTags[name], mFields, time.Now().UTC()) + if out, ok := output.(*internal_models.RunningOutput); ok { + out.AddMetric(metric) + } + } +} + +func (h *Histogram) Reset() { + h.Lock() + defer h.Unlock() + h.fieldMap = make(map[string]map[string]*gohistogram.NumericHistogram) + h.metricTags = make(map[string]map[string]string) +} + +func init() { + filters.Add("histogram", func() telegraf.Filter { + return &Histogram{ + fieldMap: make(map[string]map[string]*gohistogram.NumericHistogram), + metricTags: make(map[string]map[string]string), + } + }) +} diff --git a/plugins/filters/registry.go b/plugins/filters/registry.go new file mode 100644 index 0000000000000..e391762849899 --- /dev/null +++ b/plugins/filters/registry.go @@ -0,0 +1,11 @@ +package filters + +import "github.com/influxdata/telegraf" + +type Creator func() telegraf.Filter + +var Filters = map[string]Creator{} + +func Add(name string, creator Creator) { + Filters[name] = creator +}