Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Histogram support aggregation #1364

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9
github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc
github.com/VividCortex/gohistogram da38b6e56f2f7dc1999a037141441e50d6213f5d
github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687
github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857
github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4
Expand Down
21 changes: 18 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,28 +242,43 @@ func (a *Agent) flush() {
}
}(o)
}

wg.Wait()
}

// flusher monitors the metrics input channel and flushes on the minimum interval
func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error {
var metricStream chan telegraf.Metric
stopFilter := make(chan struct{}, len(a.Config.Filters))
// Inelegant, but this sleep is to allow the Gather threads to run, so that
// the flusher will flush after metrics are collected.
time.Sleep(time.Millisecond * 200)

ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
metricStream = metricC
if len(a.Config.Filters) != 0 {
for _, name := range a.Config.FiltersOrder {
filter := a.Config.Filters[name]
log.Printf("Filter %s is enabled", name)
metricStream = filter.Pipe(metricStream)
go func(f telegraf.Filter) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this in a goroutine?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filter suppose to work in is own context like other plugins it may have timer inside and i don't want to block the agent code from collecting metrics or flushing them. consider filter as pipe with input and output (black box). Histogram filter for example has timer for flushing metrics on specific intervals

f.Start(stopFilter)
}(filter)
}
}

for {
select {
case <-shutdown:
//sending shutdown signal for all filters
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't filters just use the shutdown signal directly?

Copy link
Contributor Author

@alimousazy alimousazy Jul 16, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can do that but I had problem with carrying the stop signal since the last receiver should not send the signal again or it will block for ever make it hard to shutdown telegraf process (clean shutdown)

for range a.Config.Filters {
stopFilter <- struct{}{}
}
log.Println("Hang on, flushing any cached metrics before shutdown")
a.flush()
return nil
case <-ticker.C:
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
a.flush()
case m := <-metricC:
case m := <-metricStream:
for _, o := range a.Config.Outputs {
o.AddMetric(m)
}
Expand Down
25 changes: 23 additions & 2 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/plugins/filters"
_ "github.com/influxdata/telegraf/plugins/filters/all"
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
"github.com/influxdata/telegraf/plugins/outputs"
Expand All @@ -31,12 +33,16 @@ var fSampleConfig = flag.Bool("sample-config", false,
var fPidfile = flag.String("pidfile", "", "file to write our pid to")
var fInputFilters = flag.String("input-filter", "",
"filter the inputs to enable, separator is :")
var fFilterFilters = flag.String("filter-filter", "",
"filter to be enabled, separator is :")
var fInputList = flag.Bool("input-list", false,
"print available input plugins.")
var fOutputFilters = flag.String("output-filter", "",
"filter the outputs to enable, separator is :")
var fOutputList = flag.Bool("output-list", false,
"print available output plugins.")
var fFilterList = flag.Bool("filter-list", false,
"print available filter plugins.")
var fUsage = flag.String("usage", "",
"print usage for a plugin, ie, 'telegraf -usage mysql'")
var fInputFiltersLegacy = flag.String("filter", "",
Expand Down Expand Up @@ -70,6 +76,8 @@ The flags are:
-input-list print all the plugins inputs
-output-filter filter the output plugins to enable, separator is :
-output-list print all the available outputs
-filter-filter filter the filter plugins to enable, separator is :
-filter-list print all the available filters
-usage print usage for a plugin, ie, 'telegraf -usage mysql'
-debug print metrics as they're generated to stdout
-quiet run in quiet mode
Expand Down Expand Up @@ -121,6 +129,11 @@ func main() {
inputFilters = strings.Split(":"+inputFilter+":", ":")
}

var filterFilters []string
if *fFilterFilters != "" {
filterFilters = strings.Split(":"+strings.TrimSpace(*fFilterFilters)+":", ":")
}

var outputFilters []string
if *fOutputFiltersLegacy != "" {
fmt.Printf("WARNING '--outputfilter' flag is deprecated, please use" +
Expand All @@ -140,7 +153,7 @@ func main() {
fmt.Println(v)
return
case "config":
config.PrintSampleConfig(inputFilters, outputFilters)
config.PrintSampleConfig(inputFilters, filterFilters, outputFilters)
return
}
}
Expand All @@ -153,6 +166,14 @@ func main() {
return
}

if *fFilterList {
fmt.Println("Available Filter Plugins:")
for k, _ := range filters.Filters {
fmt.Printf(" %s\n", k)
}
return
}

if *fInputList {
fmt.Println("Available Input Plugins:")
for k, _ := range inputs.Inputs {
Expand All @@ -168,7 +189,7 @@ func main() {
}

if *fSampleConfig {
config.PrintSampleConfig(inputFilters, outputFilters)
config.PrintSampleConfig(inputFilters, filterFilters, outputFilters)
return
}

Expand Down
15 changes: 15 additions & 0 deletions filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package telegraf

type Filter interface {
// SampleConfig returns the default configuration of the Input
SampleConfig() string

// Description returns a one-sentence description on the Input
Description() string

//create pipe for filter
Pipe(in chan Metric) chan Metric

// start the filter
Start(shutdown chan struct{})
}
103 changes: 96 additions & 7 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/plugins/filters"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/parsers"
Expand Down Expand Up @@ -46,9 +47,11 @@ type Config struct {
InputFilters []string
OutputFilters []string

Agent *AgentConfig
Inputs []*internal_models.RunningInput
Outputs []*internal_models.RunningOutput
Agent *AgentConfig
Inputs []*internal_models.RunningInput
Outputs []*internal_models.RunningOutput
Filters map[string]telegraf.Filter
FiltersOrder []string
}

func NewConfig() *Config {
Expand All @@ -63,6 +66,8 @@ func NewConfig() *Config {
Tags: make(map[string]string),
Inputs: make([]*internal_models.RunningInput, 0),
Outputs: make([]*internal_models.RunningOutput, 0),
Filters: make(map[string]telegraf.Filter),
FiltersOrder: make([]string, 0),
InputFilters: make([]string, 0),
OutputFilters: make([]string, 0),
}
Expand All @@ -77,6 +82,14 @@ type AgentConfig struct {
// ie, if Interval=10s then always collect on :00, :10, :20, etc.
RoundInterval bool

// By default, precision will be set to the same timestamp order as the
// collection interval, with the maximum being 1s.
// ie, when interval = "10s", precision will be "1s"
// when interval = "250ms", precision will be "1ms"
// Precision will NOT be used for service inputs. It is up to each individual
// service input to set the timestamp at the appropriate precision.
Precision internal.Duration

// CollectionJitter is used to jitter the collection by a random amount.
// Each plugin will sleep for a random time within jitter before collecting.
// This can be used to avoid many plugins querying things like sysfs at the
Expand Down Expand Up @@ -108,11 +121,10 @@ type AgentConfig struct {
// does _not_ deactivate FlushInterval.
FlushBufferWhenFull bool

// TODO(cam): Remove UTC and Precision parameters, they are no longer
// TODO(cam): Remove UTC and parameter, they are no longer
// valid for the agent config. Leaving them here for now for backwards-
// compatability
UTC bool `toml:"utc"`
Precision string
UTC bool `toml:"utc"`

// Debug is the option for running in debug mode
Debug bool
Expand Down Expand Up @@ -209,6 +221,11 @@ var header = `# Telegraf Configuration
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"

## By default, precision will be set to the same timestamp order as the
## collection interval, with the maximum being 1s.
## Precision will NOT be used for service inputs, such as logparser and statsd.
## Valid values are "Nns", "Nus" (or "Nµs"), "Nms", "Ns".
precision = ""
## Run telegraf in debug mode
debug = false
## Run telegraf in quiet mode
Expand Down Expand Up @@ -239,7 +256,7 @@ var serviceInputHeader = `
`

// PrintSampleConfig prints the sample config
func PrintSampleConfig(inputFilters []string, outputFilters []string) {
func PrintSampleConfig(inputFilters []string, filterFilters []string, outputFilters []string) {
fmt.Printf(header)

if len(outputFilters) != 0 {
Expand All @@ -257,6 +274,17 @@ func PrintSampleConfig(inputFilters []string, outputFilters []string) {
printFilteredOutputs(pnames, true)
}

if len(filterFilters) != 0 {
printFilteredFilters(filterFilters, false)
} else {
var pnames []string
for pname := range filters.Filters {
pnames = append(pnames, pname)
}
sort.Strings(pnames)
printFilteredFilters(pnames, true)
}

fmt.Printf(inputHeader)
if len(inputFilters) != 0 {
printFilteredInputs(inputFilters, false)
Expand Down Expand Up @@ -315,6 +343,20 @@ func printFilteredInputs(inputFilters []string, commented bool) {
}
}

func printFilteredFilters(printOnly []string, commented bool) {
var names []string
for item := range filters.Filters {
if sliceContains(item, printOnly) {
names = append(names, item)
}
}
sort.Strings(names)
for _, name := range names {
creator := filters.Filters[name]
filter := creator()
printConfig(name, filter, "filters", commented)
}
}
func printFilteredOutputs(outputFilters []string, commented bool) {
// Filter outputs
var onames []string
Expand Down Expand Up @@ -516,6 +558,25 @@ func (c *Config) LoadConfig(path string) error {
pluginName, path)
}
}
case "filter":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
case *ast.Table:
if err = c.addFilter(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addFilter(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
}
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
}
}

// Assume it's an input input for legacy config file support if no other
// identifiers are present
default:
Expand All @@ -527,6 +588,13 @@ func (c *Config) LoadConfig(path string) error {
return nil
}

// trimBOM trims the Byte-Order-Marks from the beginning of the file.
// this is for Windows compatability only.
// see https://github.com/influxdata/telegraf/issues/1378
func trimBOM(f []byte) []byte {
return bytes.TrimPrefix(f, []byte("\xef\xbb\xbf"))
}

// parseFile loads a TOML configuration from a provided path and
// returns the AST produced from the TOML parser. When loading the file, it
// will find environment variables and replace them.
Expand All @@ -535,6 +603,8 @@ func parseFile(fpath string) (*ast.Table, error) {
if err != nil {
return nil, err
}
// ugh windows why
contents = trimBOM(contents)

env_vars := envVarRe.FindAll(contents, -1)
for _, env_var := range env_vars {
Expand Down Expand Up @@ -583,6 +653,25 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
return nil
}

func (c *Config) addFilter(name string, table *ast.Table) error {
creator, ok := filters.Filters[name]
if !ok {
return fmt.Errorf("Undefined but requested filter: %s", name)
}
filter := creator()

if err := config.UnmarshalTable(table, filter); err != nil {
return err
}

if _, ok = c.Filters[name]; ok {
return fmt.Errorf("Filter already defined %s", name)
}
c.Filters[name] = filter
c.FiltersOrder = append(c.FiltersOrder, name)
return nil
}

func (c *Config) addInput(name string, table *ast.Table) error {
if len(c.InputFilters) > 0 && !sliceContains(name, c.InputFilters) {
return nil
Expand Down
5 changes: 5 additions & 0 deletions plugins/filters/all/all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package all

import (
_ "github.com/influxdata/telegraf/plugins/filters/histogram"
)
Loading