Skip to content

Commit

Permalink
Add Histogram support aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
Ali Alrahahleh committed Jul 1, 2016
1 parent 06cb5a0 commit d09497a
Show file tree
Hide file tree
Showing 9 changed files with 325 additions and 6 deletions.
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9
github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc
github.com/VividCortex/gohistogram da38b6e56f2f7dc1999a037141441e50d6213f5d
github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687
github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857
github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4
Expand Down
21 changes: 18 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,28 +242,43 @@ func (a *Agent) flush() {
}
}(o)
}

wg.Wait()
}

// flusher monitors the metrics input channel and flushes on the minimum interval
func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error {
var metricStream chan telegraf.Metric
stopFilter := make(chan struct{}, len(a.Config.Filters))
// Inelegant, but this sleep is to allow the Gather threads to run, so that
// the flusher will flush after metrics are collected.
time.Sleep(time.Millisecond * 200)

ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
metricStream = metricC
if len(a.Config.Filters) != 0 {
for _, name := range a.Config.FiltersOrder {
filter := a.Config.Filters[name]
log.Printf("Filter %s is enabled", name)
metricStream = filter.Pipe(metricStream)
go func(f telegraf.Filter) {
f.Start(stopFilter)
}(filter)
}
}

for {
select {
case <-shutdown:
//sending shutdown signal for all filters
for range a.Config.Filters {
stopFilter <- struct{}{}
}
log.Println("Hang on, flushing any cached metrics before shutdown")
a.flush()
return nil
case <-ticker.C:
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
a.flush()
case m := <-metricC:
case m := <-metricStream:
for _, o := range a.Config.Outputs {
o.AddMetric(m)
}
Expand Down
1 change: 1 addition & 0 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/internal/config"
_ "github.com/influxdata/telegraf/plugins/filters/all"
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
"github.com/influxdata/telegraf/plugins/outputs"
Expand Down
15 changes: 15 additions & 0 deletions filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package telegraf

type Filter interface {
// SampleConfig returns the default configuration of the Input
SampleConfig() string

// Description returns a one-sentence description on the Input
Description() string

//create pipe for filter
Pipe(in chan Metric) chan Metric

// start the filter
Start(shutdown chan struct{})
}
49 changes: 46 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/plugins/filters"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/parsers"
Expand Down Expand Up @@ -46,9 +47,11 @@ type Config struct {
InputFilters []string
OutputFilters []string

Agent *AgentConfig
Inputs []*internal_models.RunningInput
Outputs []*internal_models.RunningOutput
Agent *AgentConfig
Inputs []*internal_models.RunningInput
Outputs []*internal_models.RunningOutput
Filters map[string]telegraf.Filter
FiltersOrder []string
}

func NewConfig() *Config {
Expand All @@ -63,6 +66,8 @@ func NewConfig() *Config {
Tags: make(map[string]string),
Inputs: make([]*internal_models.RunningInput, 0),
Outputs: make([]*internal_models.RunningOutput, 0),
Filters: make(map[string]telegraf.Filter),
FiltersOrder: make([]string, 0),
InputFilters: make([]string, 0),
OutputFilters: make([]string, 0),
}
Expand Down Expand Up @@ -516,6 +521,25 @@ func (c *Config) LoadConfig(path string) error {
pluginName, path)
}
}
case "filter":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
case *ast.Table:
if err = c.addFilter(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addFilter(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
}
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
}
}

// Assume it's an input input for legacy config file support if no other
// identifiers are present
default:
Expand Down Expand Up @@ -583,6 +607,25 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
return nil
}

func (c *Config) addFilter(name string, table *ast.Table) error {
creator, ok := filters.Filters[name]
if !ok {
return fmt.Errorf("Undefined but requested filter: %s", name)
}
filter := creator()

if err := config.UnmarshalTable(table, filter); err != nil {
return err
}

if _, ok = c.Filters[name]; ok {
return fmt.Errorf("Filter already defined %s", name)
}
c.Filters[name] = filter
c.FiltersOrder = append(c.FiltersOrder, name)
return nil
}

func (c *Config) addInput(name string, table *ast.Table) error {
if len(c.InputFilters) > 0 && !sliceContains(name, c.InputFilters) {
return nil
Expand Down
5 changes: 5 additions & 0 deletions plugins/filters/all/all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package all

import (
_ "github.com/influxdata/telegraf/plugins/filters/histogram"
)
65 changes: 65 additions & 0 deletions plugins/filters/histogram/aggregate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package histogram

import (
"github.com/VividCortex/gohistogram"
"math"
)

type Aggregate struct {
hist *gohistogram.NumericHistogram
sum float64
max float64
min float64
}

func (a *Aggregate) Add(n float64) {
a.sum += n
if a.max < n {
a.max = n
}
if a.min > n {
a.min = n
}
a.hist.Add(n)
}

func (a *Aggregate) Quantile(q float64) float64 {
return a.hist.Quantile(q)
}

func (a *Aggregate) Sum() float64 {
return a.sum
}

func (a *Aggregate) CDF(x float64) float64 {
return a.hist.CDF(x)
}

func (a *Aggregate) Mean() float64 {
return a.hist.Mean()
}

func (a *Aggregate) Variance() float64 {
return a.hist.Variance()
}

func (a *Aggregate) Count() float64 {
return a.hist.Count()
}

func (a *Aggregate) Max() float64 {
return a.max
}

func (a *Aggregate) Min() float64 {
return a.min
}

func NewAggregate(n int) *Aggregate {
return &Aggregate{
hist: gohistogram.NewHistogram(n),
max: math.SmallestNonzeroFloat64,
min: math.MaxFloat64,
sum: 0,
}
}
163 changes: 163 additions & 0 deletions plugins/filters/histogram/histogram.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package histogram

import (
"crypto/sha1"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/filters"
"io"
"log"
"sort"
"strconv"
"time"
)

const field_sep = "."

type metricID struct {
Name string
TagHash [sha1.Size]byte
}
type Histogram struct {
inch chan telegraf.Metric
outch chan telegraf.Metric
FlushInterval string
interval time.Duration
Bucketsize int
Metrics map[string][]float64
fieldMap map[metricID]map[string]*Aggregate
metricTags map[metricID]map[string]string
}

func (h *Histogram) Description() string {
return "Histogram: read metrics from inputs and create histogram for output"
}

func (h *Histogram) SampleConfig() string {
return `
## Histogram Filter
## This filter can be used to generate
## (mean varince percentile count)
## values generated are approxmation please refer to
## Ben-Haim & Yom-Tov's A Streaming Parallel Decision Tree Algorithm
## http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf
##[[filter.histogram]]
## bucket size if increase it will increase accuracy
## but it will increase memory usage
## bucketsize = 20
## [filter.histogram.metrics]
## if array is empty only count mean
## and variance will be cacluated. _ALL_METRIC special constanct
## can be used instead of metric name this will aggregate all the
## the merrtics for this filer
## metric name = [percentiles]
## tail = [0.90]
`
}

func (h *Histogram) Pipe(in chan telegraf.Metric) chan telegraf.Metric {
h.inch = in
h.outch = make(chan telegraf.Metric, 10000)
return h.outch
}

func (h *Histogram) Start(shutdown chan struct{}) {
interval, _ := time.ParseDuration(h.FlushInterval)
ticker := time.NewTicker(interval)
for {
select {
case m := <-h.inch:
if h.IsEnabled(m.Name()) {
h.AddMetric(m)
} else {
h.outch <- m
}
case <-shutdown:
log.Printf("Shuting down filters, All metric in the queue will be lost.")
return
case <-ticker.C:
h.OutputMetric()
}
}
}

func (h *Histogram) hashTags(m map[string]string) (result [sha1.Size]byte) {
hash := sha1.New()
keys := []string{}
for key := range m {
keys = append(keys, key)
}
sort.Strings(keys)
for _, item := range keys {
io.WriteString(hash, item+m[item])
}
copy(result[:], hash.Sum(nil))
return result
}

func (h *Histogram) AddMetric(metric telegraf.Metric) {
mID := metricID{
Name: metric.Name(),
TagHash: h.hashTags(metric.Tags()),
}
if h.fieldMap[mID] == nil {
h.fieldMap[mID] = make(map[string]*Aggregate)
}
if h.metricTags[mID] == nil {
h.metricTags[mID] = make(map[string]string)
}
h.metricTags[mID] = metric.Tags()
for key, val := range metric.Fields() {
switch v := val.(type) {
case float64:
if h.fieldMap[mID][key] == nil {
h.fieldMap[mID][key] = NewAggregate(h.Bucketsize)
}
hist := h.fieldMap[mID][key]
hist.Add(v)
default:
log.Printf("When stats enabled all the fields should be of type float64 [field name %s]", key)
}
}
}

func (h *Histogram) IsEnabled(name string) bool {
_, isAllEnabled := h.Metrics["_ALL_METRIC"]
_, ok := h.Metrics[name]
return ok || isAllEnabled
}

func (h *Histogram) OutputMetric() {
for mID, fields := range h.fieldMap {
mFields := make(map[string]interface{})
for key, val := range fields {
for _, perc := range h.Metrics[mID.Name] {
p := strconv.FormatFloat(perc*100, 'f', 0, 64)
mFields[key+field_sep+"p"+p] = val.Quantile(perc)
}
mFields[key+field_sep+"variance"] = val.Variance()
mFields[key+field_sep+"mean"] = val.Mean()
mFields[key+field_sep+"count"] = val.Count()
mFields[key+field_sep+"sum"] = val.Sum()
mFields[key+field_sep+"max"] = val.Max()
mFields[key+field_sep+"min"] = val.Min()
}
metric, _ := telegraf.NewMetric(mID.Name, h.metricTags[mID], mFields, time.Now().UTC())
h.outch <- metric
delete(h.fieldMap, mID)
delete(h.metricTags, mID)
}
}

func (h *Histogram) Reset() {
h.fieldMap = make(map[metricID]map[string]*Aggregate)
h.metricTags = make(map[metricID]map[string]string)
}

func init() {
filters.Add("histogram", func() telegraf.Filter {
return &Histogram{
fieldMap: make(map[metricID]map[string]*Aggregate),
metricTags: make(map[metricID]map[string]string),
}
})
}
Loading

0 comments on commit d09497a

Please sign in to comment.