diff --git a/Godeps b/Godeps index 2ac95a90455eb..0550ddcc85cdd 100644 --- a/Godeps +++ b/Godeps @@ -1,5 +1,6 @@ github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9 github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc +github.com/VividCortex/gohistogram da38b6e56f2f7dc1999a037141441e50d6213f5d github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687 github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857 github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4 diff --git a/agent/agent.go b/agent/agent.go index 1423ef773eeee..a213bb5ec723c 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -242,28 +242,43 @@ func (a *Agent) flush() { } }(o) } - wg.Wait() } // flusher monitors the metrics input channel and flushes on the minimum interval func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error { + var metricStream chan telegraf.Metric + stopFilter := make(chan struct{}, len(a.Config.Filters)) // Inelegant, but this sleep is to allow the Gather threads to run, so that // the flusher will flush after metrics are collected. time.Sleep(time.Millisecond * 200) - ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) + metricStream = metricC + if len(a.Config.Filters) != 0 { + for _, name := range a.Config.FiltersOrder { + filter := a.Config.Filters[name] + log.Printf("Filter %s is enabled", name) + metricStream = filter.Pipe(metricStream) + go func(f telegraf.Filter) { + f.Start(stopFilter) + }(filter) + } + } for { select { case <-shutdown: + //sending shutdown signal for all filters + for range a.Config.Filters { + stopFilter <- struct{}{} + } log.Println("Hang on, flushing any cached metrics before shutdown") a.flush() return nil case <-ticker.C: internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) a.flush() - case m := <-metricC: + case m := <-metricStream: for _, o := range a.Config.Outputs { o.AddMetric(m) } diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 6681ad0739ec4..e12b2cd0aa34a 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -11,6 +11,8 @@ import ( "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal/config" + "github.com/influxdata/telegraf/plugins/filters" + _ "github.com/influxdata/telegraf/plugins/filters/all" "github.com/influxdata/telegraf/plugins/inputs" _ "github.com/influxdata/telegraf/plugins/inputs/all" "github.com/influxdata/telegraf/plugins/outputs" @@ -31,12 +33,16 @@ var fSampleConfig = flag.Bool("sample-config", false, var fPidfile = flag.String("pidfile", "", "file to write our pid to") var fInputFilters = flag.String("input-filter", "", "filter the inputs to enable, separator is :") +var fFilterFilters = flag.String("filter-filter", "", + "filter to be enabled, separator is :") var fInputList = flag.Bool("input-list", false, "print available input plugins.") var fOutputFilters = flag.String("output-filter", "", "filter the outputs to enable, separator is :") var fOutputList = flag.Bool("output-list", false, "print available output plugins.") +var fFilterList = flag.Bool("filter-list", false, + "print available filter plugins.") var fUsage = flag.String("usage", "", "print usage for a plugin, ie, 'telegraf -usage mysql'") var fInputFiltersLegacy = flag.String("filter", "", @@ -70,6 +76,8 @@ The flags are: -input-list print all the plugins inputs -output-filter filter the output plugins to enable, separator is : -output-list print all the available outputs + -filter-filter filter the filter plugins to enable, separator is : + -filter-list print all the available filters -usage print usage for a plugin, ie, 'telegraf -usage mysql' -debug print metrics as they're generated to stdout -quiet run in quiet mode @@ -121,6 +129,11 @@ func main() { inputFilters = strings.Split(":"+inputFilter+":", ":") } + var filterFilters []string + if *fFilterFilters != "" { + filterFilters = strings.Split(":"+strings.TrimSpace(*fFilterFilters)+":", ":") + } + var outputFilters []string if *fOutputFiltersLegacy != "" { fmt.Printf("WARNING '--outputfilter' flag is deprecated, please use" + @@ -140,7 +153,7 @@ func main() { fmt.Println(v) return case "config": - config.PrintSampleConfig(inputFilters, outputFilters) + config.PrintSampleConfig(inputFilters, filterFilters, outputFilters) return } } @@ -153,6 +166,14 @@ func main() { return } + if *fFilterList { + fmt.Println("Available Filter Plugins:") + for k, _ := range filters.Filters { + fmt.Printf(" %s\n", k) + } + return + } + if *fInputList { fmt.Println("Available Input Plugins:") for k, _ := range inputs.Inputs { @@ -168,7 +189,7 @@ func main() { } if *fSampleConfig { - config.PrintSampleConfig(inputFilters, outputFilters) + config.PrintSampleConfig(inputFilters, filterFilters, outputFilters) return } diff --git a/filter.go b/filter.go new file mode 100644 index 0000000000000..410114d6846ef --- /dev/null +++ b/filter.go @@ -0,0 +1,15 @@ +package telegraf + +type Filter interface { + // SampleConfig returns the default configuration of the Input + SampleConfig() string + + // Description returns a one-sentence description on the Input + Description() string + + //create pipe for filter + Pipe(in chan Metric) chan Metric + + // start the filter + Start(shutdown chan struct{}) +} diff --git a/internal/config/config.go b/internal/config/config.go index fdc9a8753e283..0b9b8bc602554 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -16,6 +16,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/plugins/filters" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/parsers" @@ -46,9 +47,11 @@ type Config struct { InputFilters []string OutputFilters []string - Agent *AgentConfig - Inputs []*internal_models.RunningInput - Outputs []*internal_models.RunningOutput + Agent *AgentConfig + Inputs []*internal_models.RunningInput + Outputs []*internal_models.RunningOutput + Filters map[string]telegraf.Filter + FiltersOrder []string } func NewConfig() *Config { @@ -63,6 +66,8 @@ func NewConfig() *Config { Tags: make(map[string]string), Inputs: make([]*internal_models.RunningInput, 0), Outputs: make([]*internal_models.RunningOutput, 0), + Filters: make(map[string]telegraf.Filter), + FiltersOrder: make([]string, 0), InputFilters: make([]string, 0), OutputFilters: make([]string, 0), } @@ -77,6 +82,14 @@ type AgentConfig struct { // ie, if Interval=10s then always collect on :00, :10, :20, etc. RoundInterval bool + // By default, precision will be set to the same timestamp order as the + // collection interval, with the maximum being 1s. + // ie, when interval = "10s", precision will be "1s" + // when interval = "250ms", precision will be "1ms" + // Precision will NOT be used for service inputs. It is up to each individual + // service input to set the timestamp at the appropriate precision. + Precision internal.Duration + // CollectionJitter is used to jitter the collection by a random amount. // Each plugin will sleep for a random time within jitter before collecting. // This can be used to avoid many plugins querying things like sysfs at the @@ -108,11 +121,10 @@ type AgentConfig struct { // does _not_ deactivate FlushInterval. FlushBufferWhenFull bool - // TODO(cam): Remove UTC and Precision parameters, they are no longer + // TODO(cam): Remove UTC and parameter, they are no longer // valid for the agent config. Leaving them here for now for backwards- // compatability - UTC bool `toml:"utc"` - Precision string + UTC bool `toml:"utc"` // Debug is the option for running in debug mode Debug bool @@ -209,6 +221,11 @@ var header = `# Telegraf Configuration ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s flush_jitter = "0s" + ## By default, precision will be set to the same timestamp order as the + ## collection interval, with the maximum being 1s. + ## Precision will NOT be used for service inputs, such as logparser and statsd. + ## Valid values are "Nns", "Nus" (or "Nµs"), "Nms", "Ns". + precision = "" ## Run telegraf in debug mode debug = false ## Run telegraf in quiet mode @@ -239,7 +256,7 @@ var serviceInputHeader = ` ` // PrintSampleConfig prints the sample config -func PrintSampleConfig(inputFilters []string, outputFilters []string) { +func PrintSampleConfig(inputFilters []string, filterFilters []string, outputFilters []string) { fmt.Printf(header) if len(outputFilters) != 0 { @@ -257,6 +274,17 @@ func PrintSampleConfig(inputFilters []string, outputFilters []string) { printFilteredOutputs(pnames, true) } + if len(filterFilters) != 0 { + printFilteredFilters(filterFilters, false) + } else { + var pnames []string + for pname := range filters.Filters { + pnames = append(pnames, pname) + } + sort.Strings(pnames) + printFilteredFilters(pnames, true) + } + fmt.Printf(inputHeader) if len(inputFilters) != 0 { printFilteredInputs(inputFilters, false) @@ -315,6 +343,20 @@ func printFilteredInputs(inputFilters []string, commented bool) { } } +func printFilteredFilters(printOnly []string, commented bool) { + var names []string + for item := range filters.Filters { + if sliceContains(item, printOnly) { + names = append(names, item) + } + } + sort.Strings(names) + for _, name := range names { + creator := filters.Filters[name] + filter := creator() + printConfig(name, filter, "filters", commented) + } +} func printFilteredOutputs(outputFilters []string, commented bool) { // Filter outputs var onames []string @@ -516,6 +558,25 @@ func (c *Config) LoadConfig(path string) error { pluginName, path) } } + case "filter": + for pluginName, pluginVal := range subTable.Fields { + switch pluginSubTable := pluginVal.(type) { + case *ast.Table: + if err = c.addFilter(pluginName, pluginSubTable); err != nil { + return fmt.Errorf("Error parsing %s, %s", path, err) + } + case []*ast.Table: + for _, t := range pluginSubTable { + if err = c.addFilter(pluginName, t); err != nil { + return fmt.Errorf("Error parsing %s, %s", path, err) + } + } + default: + return fmt.Errorf("Unsupported config format: %s, file %s", + pluginName, path) + } + } + // Assume it's an input input for legacy config file support if no other // identifiers are present default: @@ -527,6 +588,13 @@ func (c *Config) LoadConfig(path string) error { return nil } +// trimBOM trims the Byte-Order-Marks from the beginning of the file. +// this is for Windows compatability only. +// see https://github.com/influxdata/telegraf/issues/1378 +func trimBOM(f []byte) []byte { + return bytes.TrimPrefix(f, []byte("\xef\xbb\xbf")) +} + // parseFile loads a TOML configuration from a provided path and // returns the AST produced from the TOML parser. When loading the file, it // will find environment variables and replace them. @@ -535,6 +603,8 @@ func parseFile(fpath string) (*ast.Table, error) { if err != nil { return nil, err } + // ugh windows why + contents = trimBOM(contents) env_vars := envVarRe.FindAll(contents, -1) for _, env_var := range env_vars { @@ -583,6 +653,25 @@ func (c *Config) addOutput(name string, table *ast.Table) error { return nil } +func (c *Config) addFilter(name string, table *ast.Table) error { + creator, ok := filters.Filters[name] + if !ok { + return fmt.Errorf("Undefined but requested filter: %s", name) + } + filter := creator() + + if err := config.UnmarshalTable(table, filter); err != nil { + return err + } + + if _, ok = c.Filters[name]; ok { + return fmt.Errorf("Filter already defined %s", name) + } + c.Filters[name] = filter + c.FiltersOrder = append(c.FiltersOrder, name) + return nil +} + func (c *Config) addInput(name string, table *ast.Table) error { if len(c.InputFilters) > 0 && !sliceContains(name, c.InputFilters) { return nil diff --git a/plugins/filters/all/all.go b/plugins/filters/all/all.go new file mode 100644 index 0000000000000..7d6a9242b5b0b --- /dev/null +++ b/plugins/filters/all/all.go @@ -0,0 +1,5 @@ +package all + +import ( + _ "github.com/influxdata/telegraf/plugins/filters/histogram" +) diff --git a/plugins/filters/histogram/aggregate.go b/plugins/filters/histogram/aggregate.go new file mode 100644 index 0000000000000..2b233a72b28c0 --- /dev/null +++ b/plugins/filters/histogram/aggregate.go @@ -0,0 +1,65 @@ +package histogram + +import ( + "github.com/VividCortex/gohistogram" + "math" +) + +type Aggregate struct { + hist *gohistogram.NumericHistogram + sum float64 + max float64 + min float64 +} + +func (a *Aggregate) Add(n float64) { + a.sum += n + if a.max < n { + a.max = n + } + if a.min > n { + a.min = n + } + a.hist.Add(n) +} + +func (a *Aggregate) Quantile(q float64) float64 { + return a.hist.Quantile(q) +} + +func (a *Aggregate) Sum() float64 { + return a.sum +} + +func (a *Aggregate) CDF(x float64) float64 { + return a.hist.CDF(x) +} + +func (a *Aggregate) Mean() float64 { + return a.hist.Mean() +} + +func (a *Aggregate) Variance() float64 { + return a.hist.Variance() +} + +func (a *Aggregate) Count() float64 { + return a.hist.Count() +} + +func (a *Aggregate) Max() float64 { + return a.max +} + +func (a *Aggregate) Min() float64 { + return a.min +} + +func NewAggregate(n int) *Aggregate { + return &Aggregate{ + hist: gohistogram.NewHistogram(n), + max: math.SmallestNonzeroFloat64, + min: math.MaxFloat64, + sum: 0, + } +} diff --git a/plugins/filters/histogram/histogram.go b/plugins/filters/histogram/histogram.go new file mode 100644 index 0000000000000..bd824b753b3da --- /dev/null +++ b/plugins/filters/histogram/histogram.go @@ -0,0 +1,290 @@ +package histogram + +import ( + "crypto/sha1" + "github.com/gobwas/glob" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/filters" + "io" + "log" + "regexp" + "sort" + "strconv" + "strings" + "time" +) + +const field_sep = "." + +type metricID struct { + Name string + TagHash [sha1.Size]byte +} + +type rollup struct { + Name string + Measurements []string + Tags map[string]string + Functions []string + Pass bool +} +type Histogram struct { + inch chan telegraf.Metric + outch chan telegraf.Metric + FlushInterval string + interval time.Duration + Bucketsize int + Rollup []string + rules []rollup + rollupMap map[metricID]*rollup + fieldMap map[metricID]map[string]*Aggregate + metricTags map[metricID]map[string]string + matchGlobs map[string]glob.Glob +} + +func (h *Histogram) Description() string { + return "Histogram: read metrics from inputs and create histogram for output" +} + +func (h *Histogram) SampleConfig() string { + return ` + ## Histogram Filter + ## This filter can be used to generate + ## values generated are approxmation please refer to + ## Ben-Haim & Yom-Tov's A Streaming Parallel Decision Tree Algorithm + ## http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf + [[filter.histogram]] + ## bucket size if increase it will increase accuracy of mean, variance and percentile + ## but it will increase memory usage + bucketsize = 20 + ## rollup to create + ## Name is the name of rollup. + ## Tag is list of tags to match against metric ex (Tag key1 value1) (Tag key2 value2) + ## Tag value does support glob matching + ## Measurements list of mesurment to match against metrics (Measurements cpu* en*) + ## Functions list to be applied on matched metrics + ## Pass not to drop the original metric default false ex (Pass) + ## supported functions sum, min, max, mean, variance, numbers for percentile ex 0.90 + rollup = [ + "(Name new) (Tag interface en*) (Functions mean 0.90)", + "(Name cpu_value) (Measurements cpu) (Functions mean sum)", + ] +` +} + +func (h *Histogram) Pipe(in chan telegraf.Metric) chan telegraf.Metric { + h.inch = in + h.outch = make(chan telegraf.Metric, 10000) + return h.outch +} + +func (h *Histogram) processLine(list []string) { + var r rollup + re := regexp.MustCompile("\\(|\\)") + for _, item := range list { + item = re.ReplaceAllString(item, "") + item = strings.Trim(item, " ") + match := strings.Split(item, " ") + if len(match) == 0 { + match = []string{item} + } + switch match[0] { + case "Name": + r.Name = match[1] + case "Tag": + if len(match) < 3 { + log.Printf("Each Tag should consist of a name and a value (Tag tag-name tag-value)") + continue + } + if r.Tags == nil { + r.Tags = make(map[string]string) + } + r.Tags[match[1]] = match[2] + case "Measurements": + r.Measurements = match[1:] + case "Functions": + r.Functions = match[1:] + case "Pass": + r.Pass = true + default: + log.Printf("Unkown command (%s)", match[0]) + } + } + if r.Name != "" { + h.rules = append(h.rules, r) + } else { + log.Printf("Each rollup should have a name (Name [rollup name])") + } +} + +func (h *Histogram) parseRollup() { + re := regexp.MustCompile("([^)]+)") + for _, item := range h.Rollup { + list := re.FindAllString(item, -1) + if list == nil { + log.Printf("Please make sure that rollup well formated (%s).", item) + continue + } + h.processLine(list) + } +} + +func (h *Histogram) Start(shutdown chan struct{}) { + interval, _ := time.ParseDuration(h.FlushInterval) + ticker := time.NewTicker(interval) + h.parseRollup() + for { + select { + case m := <-h.inch: + r, ok := h.matchMetric(m) + if ok { + h.AddMetric(m, r) + if r.Pass { + h.outch <- m + } + } else { + h.outch <- m + } + case <-shutdown: + log.Printf("Shuting down filters, All metric in the queue will be lost.") + return + case <-ticker.C: + h.OutputMetric() + } + } +} + +func (h *Histogram) hashTags(m map[string]string) (result [sha1.Size]byte) { + hash := sha1.New() + keys := []string{} + for key := range m { + keys = append(keys, key) + } + sort.Strings(keys) + for _, item := range keys { + io.WriteString(hash, item+m[item]) + } + copy(result[:], hash.Sum(nil)) + return result +} + +func (h *Histogram) AddMetric(metric telegraf.Metric, r *rollup) { + mID := metricID{ + Name: r.Name, + TagHash: h.hashTags(metric.Tags()), + } + h.rollupMap[mID] = r + if h.fieldMap[mID] == nil { + h.fieldMap[mID] = make(map[string]*Aggregate) + } + if h.metricTags[mID] == nil { + h.metricTags[mID] = make(map[string]string) + } + h.metricTags[mID] = metric.Tags() + for key, val := range metric.Fields() { + switch v := val.(type) { + case float64: + if h.fieldMap[mID][key] == nil { + h.fieldMap[mID][key] = NewAggregate(h.Bucketsize) + } + hist := h.fieldMap[mID][key] + hist.Add(v) + case int: + if h.fieldMap[mID][key] == nil { + h.fieldMap[mID][key] = NewAggregate(h.Bucketsize) + } + hist := h.fieldMap[mID][key] + hist.Add(float64(v)) + case int64: + if h.fieldMap[mID][key] == nil { + h.fieldMap[mID][key] = NewAggregate(h.Bucketsize) + } + hist := h.fieldMap[mID][key] + hist.Add(float64(v)) + default: + log.Printf("When Histogram enabled all the fields should be of type float64 [field name %s]", key) + } + } +} + +func (h *Histogram) matchMetric(metric telegraf.Metric) (*rollup, bool) { + glob := func(toMatch string, matchTo []string) bool { + for _, item := range matchTo { + g, ok := h.matchGlobs[item] + if !ok { + h.matchGlobs[item] = glob.MustCompile(item) + g = h.matchGlobs[item] + } + if g.Match(toMatch) { + return true + } + } + return false + } + for _, y := range h.rules { + mTags := metric.Tags() + for k, v := range y.Tags { + tValue, ok := mTags[k] + if ok { + l := []string{v} + if glob(tValue, l) { + return &y, true + } + } + } + if glob(metric.Name(), y.Measurements) { + return &y, true + } + } + return nil, false +} +func (h *Histogram) OutputMetric() { + for mID, fields := range h.fieldMap { + mFields := make(map[string]interface{}) + for key, val := range fields { + for _, x := range h.rollupMap[mID].Functions { + p, err := strconv.ParseFloat(x, 64) + if err == nil { + mFields[key+field_sep+"p"+x] = val.Quantile(p) + continue + } + switch x { + case "variance": + mFields[key+field_sep+"variance"] = val.Variance() + case "mean": + mFields[key+field_sep+"mean"] = val.Mean() + case "count": + mFields[key+field_sep+"count"] = val.Count() + case "sum": + mFields[key+field_sep+"sum"] = val.Sum() + case "max": + mFields[key+field_sep+"max"] = val.Max() + case "min": + mFields[key+field_sep+"min"] = val.Min() + } + } + } + metric, _ := telegraf.NewMetric(mID.Name, h.metricTags[mID], mFields, time.Now().UTC()) + h.outch <- metric + delete(h.rollupMap, mID) + delete(h.fieldMap, mID) + delete(h.metricTags, mID) + } +} + +func (h *Histogram) Reset() { + h.fieldMap = make(map[metricID]map[string]*Aggregate) + h.metricTags = make(map[metricID]map[string]string) + h.rollupMap = make(map[metricID]*rollup) +} + +func init() { + filters.Add("histogram", func() telegraf.Filter { + return &Histogram{ + fieldMap: make(map[metricID]map[string]*Aggregate), + metricTags: make(map[metricID]map[string]string), + rollupMap: make(map[metricID]*rollup), + matchGlobs: make(map[string]glob.Glob), + } + }) +} diff --git a/plugins/filters/histogram/histogram_test.go b/plugins/filters/histogram/histogram_test.go new file mode 100644 index 0000000000000..d48b682be665a --- /dev/null +++ b/plugins/filters/histogram/histogram_test.go @@ -0,0 +1,132 @@ +package histogram + +import ( + "github.com/gobwas/glob" + "github.com/influxdata/telegraf" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestHistFirstPass(t *testing.T) { + hist := &Histogram{ + fieldMap: make(map[metricID]map[string]*Aggregate), + metricTags: make(map[metricID]map[string]string), + rollupMap: make(map[metricID]*rollup), + matchGlobs: make(map[string]glob.Glob), + FlushInterval: "1s", + Bucketsize: 20, + Rollup: []string{ + "(Name new) (Tag interface en*) (Functions mean 0.90)", + }, + } + shutdown := make(chan struct{}) + in := make(chan telegraf.Metric) + out := hist.Pipe(in) + go func() { + hist.Start(shutdown) + }() + metric, _ := telegraf.NewMetric("metric_1", map[string]string{ + "name": "ali", + }, map[string]interface{}{ + "ed": 1.1, + }, time.Now().UTC()) + in <- metric + item := <-out + shutdown <- struct{}{} + mTags := item.Tags() + fields := metric.Fields() + assert.Equal(t, item.Name(), "metric_1", "Metric name match") + ed, ok := fields["ed"].(float64) + assert.True(t, ok, "Field value is not flaot") + assert.Equal(t, ed, 1.1, "Field match") + assert.Equal(t, mTags["name"], mTags["name"], "Field match") +} + +func TestHistFirstAggregate(t *testing.T) { + hist := &Histogram{ + fieldMap: make(map[metricID]map[string]*Aggregate), + metricTags: make(map[metricID]map[string]string), + rollupMap: make(map[metricID]*rollup), + matchGlobs: make(map[string]glob.Glob), + FlushInterval: "1s", + Bucketsize: 20, + Rollup: []string{ + "(Name rollup_1) (Measurements metric_1) (Functions mean 0.90)", + }, + } + shutdown := make(chan struct{}) + in := make(chan telegraf.Metric) + out := hist.Pipe(in) + go func() { + hist.Start(shutdown) + }() + metric, _ := telegraf.NewMetric("metric_1", map[string]string{ + "name": "ali", + }, map[string]interface{}{ + "ed": 1.1, + }, time.Now().UTC()) + in <- metric + item := <-out + shutdown <- struct{}{} + mTags := item.Tags() + fields := item.Fields() + assert.Equal(t, item.Name(), "rollup_1", "Metric name match") + if assert.NotNil(t, fields["ed.mean"], "Mean is present") { + ed, ok := fields["ed.mean"].(float64) + assert.True(t, ok, "Field value is not flaot") + assert.Equal(t, ed, 1.1, "Field match") + assert.Equal(t, mTags["name"], mTags["name"], "Field match") + } + if assert.NotNil(t, fields["ed.p0.90"], "0.90 perc not present") { + ed, ok := fields["ed.p0.90"].(float64) + assert.True(t, ok, "Field value is not flaot") + assert.Equal(t, ed, 1.1, "Field match") + assert.Equal(t, mTags["name"], mTags["name"], "Field match") + } +} + +func TestHistPassOldAggregate(t *testing.T) { + hist := &Histogram{ + fieldMap: make(map[metricID]map[string]*Aggregate), + metricTags: make(map[metricID]map[string]string), + rollupMap: make(map[metricID]*rollup), + matchGlobs: make(map[string]glob.Glob), + FlushInterval: "1s", + Bucketsize: 20, + Rollup: []string{ + "(Name rollup_1) (Measurements metric_1) (Functions mean 0.90) (Pass)", + }, + } + shutdown := make(chan struct{}) + in := make(chan telegraf.Metric) + out := hist.Pipe(in) + go func() { + hist.Start(shutdown) + }() + metric, _ := telegraf.NewMetric("metric_1", map[string]string{ + "name": "ali", + }, map[string]interface{}{ + "ed": 1.1, + }, time.Now().UTC()) + in <- metric + originalMetric := <-out + assert.Equal(t, originalMetric.Name(), "metric_1", "Original matric should present (Pass flag exists)") + item := <-out + shutdown <- struct{}{} + mTags := item.Tags() + fields := item.Fields() + assert.Equal(t, item.Name(), "rollup_1", "Metric name match") + if assert.NotNil(t, fields["ed.mean"], "Mean is present") { + ed, ok := fields["ed.mean"].(float64) + assert.True(t, ok, "Field value is not flaot") + assert.Equal(t, ed, 1.1, "Field match") + assert.Equal(t, mTags["name"], mTags["name"], "Field match") + } + if assert.NotNil(t, fields["ed.p0.90"], "0.90 perc not present") { + ed, ok := fields["ed.p0.90"].(float64) + assert.True(t, ok, "Field value is not flaot") + assert.Equal(t, ed, 1.1, "Field match") + assert.Equal(t, mTags["name"], mTags["name"], "Field match") + } +} diff --git a/plugins/filters/registry.go b/plugins/filters/registry.go new file mode 100644 index 0000000000000..e391762849899 --- /dev/null +++ b/plugins/filters/registry.go @@ -0,0 +1,11 @@ +package filters + +import "github.com/influxdata/telegraf" + +type Creator func() telegraf.Filter + +var Filters = map[string]Creator{} + +func Add(name string, creator Creator) { + Filters[name] = creator +}