Skip to content

Commit

Permalink
Add Histogram support aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
Ali Alrahahleh committed Jun 14, 2016
1 parent 3a9c6ba commit 710f97c
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 92 deletions.
82 changes: 21 additions & 61 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package agent

import (
"fmt"
"github.com/VividCortex/gohistogram"
"log"
"os"
"runtime"
"strconv"
"sync"
"time"

Expand All @@ -18,17 +16,13 @@ import (

// Agent runs telegraf and collects data based on the given config
type Agent struct {
Config *config.Config
fieldMap map[string]map[string]*gohistogram.NumericHistogram
metricTags map[string]map[string]string
Config *config.Config
}

// NewAgent returns an Agent struct based off the given Config
func NewAgent(config *config.Config) (*Agent, error) {
a := &Agent{
Config: config,
fieldMap: make(map[string]map[string]*gohistogram.NumericHistogram),
metricTags: make(map[string]map[string]string),
Config: config,
}

if !a.Config.Agent.OmitHostname {
Expand Down Expand Up @@ -241,25 +235,38 @@ func (a *Agent) flush() {
for _, o := range a.Config.Outputs {
go func(output *internal_models.RunningOutput) {
defer wg.Done()
a.AddHistMetricToOutput(output)
err := output.Write()
if err != nil {
log.Printf("Error writing to output [%s]: %s\n",
output.Name, err.Error())
}
}(o)
}

wg.Wait()
if a.Config.Filter != nil {
a.Config.Filter.Reset()
}
}

// flusher monitors the metrics input channel and flushes on the minimum interval
func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error {
var metricStream chan telegraf.Metric
// Inelegant, but this sleep is to allow the Gather threads to run, so that
// the flusher will flush after metrics are collected.
time.Sleep(time.Millisecond * 200)

ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
if a.Config.Filter != nil {
log.Printf("Filter is enabled")
metricStream = make(chan telegraf.Metric)
filter := a.Config.Filter
go func() {
filter.InputChannel(metricC)
filter.OutputChannel(metricStream)
filter.Start(shutdown)
}()
} else {
metricStream = metricC
}

for {
select {
Expand All @@ -270,61 +277,14 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
case <-ticker.C:
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
a.flush()
case m := <-metricC:
if _, ok := a.Config.Agent.Histogram[m.Name()]; ok {
a.AddMetric(m)
} else {
for _, o := range a.Config.Outputs {
o.AddMetric(m)
}
case m := <-metricStream:
for _, o := range a.Config.Outputs {
o.AddMetric(m)
}
}
}
}

func (a *Agent) AddMetric(metric telegraf.Metric) {
name := metric.Name()
if a.fieldMap[name] == nil {
a.fieldMap[name] = make(map[string]*gohistogram.NumericHistogram)
}
if a.metricTags[name] == nil {
a.metricTags[name] = make(map[string]string)
}
a.metricTags[name] = metric.Tags()
for key, val := range metric.Fields() {
switch v := val.(type) {
case float64:
if a.fieldMap[name][key] == nil {
a.fieldMap[name][key] = gohistogram.NewHistogram(a.Config.Agent.HistogramBuckSize)
}
hist := a.fieldMap[name][key]
hist.Add(v)
default:
log.Printf("When stats enabled all the fields should be of type float64 [field name %s]", key)
}
}
}

func (a *Agent) AddHistMetricToOutput(output *internal_models.RunningOutput) {
for name, fields := range a.fieldMap {
mFields := make(map[string]interface{})
for key, val := range fields {
for _, perc := range a.Config.Agent.Histogram[name] {
p := strconv.FormatFloat(perc*100, 'f', 0, 64)
mFields[key+"_p"+p] = val.Quantile(perc)
}
mFields[key+"_variance"] = val.Variance()
mFields[key+"_mean"] = val.Mean()
mFields[key+"_count"] = val.Count()
}
m, _ := telegraf.NewMetric(name, a.metricTags[name], mFields, time.Now().UTC())
output.AddMetric(m)

delete(a.fieldMap, name)
delete(a.metricTags, name)
}
}

// Run runs the agent daemon, gathering every Interval
func (a *Agent) Run(shutdown chan struct{}) error {
var wg sync.WaitGroup
Expand Down
1 change: 1 addition & 0 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/internal/config"
_ "github.com/influxdata/telegraf/plugins/filters/all"
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
"github.com/influxdata/telegraf/plugins/outputs"
Expand Down
18 changes: 18 additions & 0 deletions filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package telegraf

type Filter interface {
// SampleConfig returns the default configuration of the Input
SampleConfig() string

// Description returns a one-sentence description on the Input
Description() string

//set input channel for filter
InputChannel(in chan Metric)
//set input channel for filter
OutputChannel(out chan Metric)
// rest filter
Reset()
// start the filter
Start(shutdown chan struct{})
}
67 changes: 36 additions & 31 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/plugins/filters"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/parsers"
Expand Down Expand Up @@ -49,16 +50,16 @@ type Config struct {
Agent *AgentConfig
Inputs []*internal_models.RunningInput
Outputs []*internal_models.RunningOutput
Filter telegraf.Filter
}

func NewConfig() *Config {
c := &Config{
// Agent defaults:
Agent: &AgentConfig{
Interval: internal.Duration{Duration: 10 * time.Second},
RoundInterval: true,
FlushInterval: internal.Duration{Duration: 10 * time.Second},
HistogramBuckSize: 20,
Interval: internal.Duration{Duration: 10 * time.Second},
RoundInterval: true,
FlushInterval: internal.Duration{Duration: 10 * time.Second},
},

Tags: make(map[string]string),
Expand Down Expand Up @@ -122,15 +123,6 @@ type AgentConfig struct {
Quiet bool
Hostname string
OmitHostname bool

// Supported Histogram method
// (metric_name) = list of percintil (0.95, 0.39)
// Note if Histogram is enabled for a metric All field will be
// Sampled
Histogram map[string][]float64

// Histogram bucketsize
HistogramBuckSize int
}

// Inputs returns a list of strings of the configured inputs.
Expand Down Expand Up @@ -227,24 +219,6 @@ var header = `# Telegraf Configuration
hostname = ""
## If set to true, do no set the "host" tag in the telegraf agent.
omit_hostname = false
## Supported Histogram method
## (metric_name) = list of percintile (0.95, 0.39)
## Note if Histogram is enabled for a metric. All field will
## be Sampled
## value generated are approxmation please refer to
## Ben-Haim & Yom-Tov's A Streaming Parallel Decision Tree Algorithm
## http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf
##[agent.histogram]
## (metric_name) = (percintile list ex [0.95, 0.50])
## Empty array will generate count median and variance
## (metric_name) = []
##Histogram bucket size
##A larger bin size yields more accurate approximations at the
##cost of increased memory utilization and performance
##histogram_buck_size = 20
###############################################################################
Expand Down Expand Up @@ -544,6 +518,21 @@ func (c *Config) LoadConfig(path string) error {
pluginName, path)
}
}
case "filter":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
case *ast.Table:
if err = c.addFilter(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
case []*ast.Table:
return fmt.Errorf("You can only specify one filter in plugin name %s, file %s.", pluginName, path)
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
}
}

// Assume it's an input input for legacy config file support if no other
// identifiers are present
default:
Expand Down Expand Up @@ -611,6 +600,22 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
return nil
}

func (c *Config) addFilter(name string, table *ast.Table) error {
fmt.Printf("%v", filters.Filters)
creator, ok := filters.Filters[name]
if !ok {
return fmt.Errorf("Undefined but requested filter: %s", name)
}
filter := creator()

if err := config.UnmarshalTable(table, filter); err != nil {
return err
}

c.Filter = filter
return nil
}

func (c *Config) addInput(name string, table *ast.Table) error {
if len(c.InputFilters) > 0 && !sliceContains(name, c.InputFilters) {
return nil
Expand Down
5 changes: 5 additions & 0 deletions plugins/filters/all/all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package all

import (
_ "github.com/influxdata/telegraf/plugins/filters/histogram"
)
Loading

0 comments on commit 710f97c

Please sign in to comment.