Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Processor & Aggregator Plugins #1777

Merged
merged 13 commits into from
Oct 12, 2016
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

### Release Notes

- Telegraf now supports two new types of plugins: processors & aggregators.

- On systemd Telegraf will no longer redirect it's stdout to /var/log/telegraf/telegraf.log.
On most systems, the logs will be directed to the systemd journal and can be
accessed by `journalctl -u telegraf.service`. Consult the systemd journal
Expand All @@ -11,6 +13,7 @@ continue sending logs to /var/log/telegraf/telegraf.log.

### Features

- [#1726](https://github.com/influxdata/telegraf/issues/1726): Processor & Aggregator plugin support.
- [#1861](https://github.com/influxdata/telegraf/pull/1861): adding the tags in the graylog output plugin
- [#1732](https://github.com/influxdata/telegraf/pull/1732): Telegraf systemd service, log to journal.
- [#1782](https://github.com/influxdata/telegraf/pull/1782): Allow numeric and non-string values for tag_keys.
Expand Down Expand Up @@ -62,6 +65,8 @@ continue sending logs to /var/log/telegraf/telegraf.log.
- [#1836](https://github.com/influxdata/telegraf/pull/1836): Fix snmp table field initialization for non-automatic table.
- [#1724](https://github.com/influxdata/telegraf/issues/1724): cgroups path being parsed as metric.
- [#1886](https://github.com/influxdata/telegraf/issues/1886): Fix phpfpm fcgi client panic when URL does not exist.
- [#1344](https://github.com/influxdata/telegraf/issues/1344): Fix config file parse error logging.
- [#1771](https://github.com/influxdata/telegraf/issues/1771): Delete nil fields in the metric maker.

## v1.0.1 [2016-09-26]

Expand Down
190 changes: 185 additions & 5 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

1. [Sign the CLA](http://influxdb.com/community/cla.html)
1. Make changes or write plugin (see below for details)
1. Add your plugin to `plugins/inputs/all/all.go` or `plugins/outputs/all/all.go`
1. Add your plugin to one of: `plugins/{inputs,outputs,aggregators,processors}/all/all.go`
1. If your plugin requires a new Go package,
[add it](https://github.com/influxdata/telegraf/blob/master/CONTRIBUTING.md#adding-a-dependency)
1. Write a README for your plugin, if it's an input plugin, it should be structured
Expand All @@ -16,8 +16,8 @@ for a good example.

## GoDoc

Public interfaces for inputs, outputs, metrics, and the accumulator can be found
on the GoDoc
Public interfaces for inputs, outputs, processors, aggregators, metrics,
and the accumulator can be found on the GoDoc

[![GoDoc](https://godoc.org/github.com/influxdata/telegraf?status.svg)](https://godoc.org/github.com/influxdata/telegraf)

Expand Down Expand Up @@ -46,7 +46,7 @@ and submit new inputs.

### Input Plugin Guidelines

* A plugin must conform to the `telegraf.Input` interface.
* A plugin must conform to the [`telegraf.Input`](https://godoc.org/github.com/influxdata/telegraf#Input) interface.
* Input Plugins should call `inputs.Add` in their `init` function to register themselves.
See below for a quick example.
* Input Plugins must be added to the
Expand Down Expand Up @@ -177,7 +177,7 @@ similar constructs.

### Output Plugin Guidelines

* An output must conform to the `outputs.Output` interface.
* An output must conform to the [`telegraf.Output`](https://godoc.org/github.com/influxdata/telegraf#Output) interface.
* Outputs should call `outputs.Add` in their `init` function to register themselves.
See below for a quick example.
* To be available within Telegraf itself, plugins must add themselves to the
Expand Down Expand Up @@ -275,6 +275,186 @@ and `Stop()` methods.
* Same as the `Output` guidelines, except that they must conform to the
`output.ServiceOutput` interface.

## Processor Plugins

This section is for developers who want to create a new processor plugin.

### Processor Plugin Guidelines

* A processor must conform to the [`telegraf.Processor`](https://godoc.org/github.com/influxdata/telegraf#Processor) interface.
* Processors should call `processors.Add` in their `init` function to register themselves.
See below for a quick example.
* To be available within Telegraf itself, plugins must add themselves to the
`github.com/influxdata/telegraf/plugins/processors/all/all.go` file.
* The `SampleConfig` function should return valid toml that describes how the
processor can be configured. This is include in `telegraf -sample-config`.
* The `Description` function should say in one line what this processor does.

### Processor Example

```go
package printer

// printer.go

import (
"fmt"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
)

type Printer struct {
}

var sampleConfig = `
`

func (p *Printer) SampleConfig() string {
return sampleConfig
}

func (p *Printer) Description() string {
return "Print all metrics that pass through this filter."
}

func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric {
for _, metric := range in {
fmt.Println(metric.String())
}
return in
}

func init() {
processors.Add("printer", func() telegraf.Processor {
return &Printer{}
})
}
```

## Aggregator Plugins

This section is for developers who want to create a new aggregator plugin.

### Aggregator Plugin Guidelines

* A aggregator must conform to the [`telegraf.Aggregator`](https://godoc.org/github.com/influxdata/telegraf#Aggregator) interface.
* Aggregators should call `aggregators.Add` in their `init` function to register themselves.
See below for a quick example.
* To be available within Telegraf itself, plugins must add themselves to the
`github.com/influxdata/telegraf/plugins/aggregators/all/all.go` file.
* The `SampleConfig` function should return valid toml that describes how the
aggregator can be configured. This is include in `telegraf -sample-config`.
* The `Description` function should say in one line what this aggregator does.
* The Aggregator plugin will need to keep caches of metrics that have passed
through it. This should be done using the builtin `HashID()` function of each
metric.
* When the `Reset()` function is called, all caches should be cleared.

### Aggregator Example

```go
package min

// min.go

import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/aggregators"
)

type Min struct {
// caches for metric fields, names, and tags
fieldCache map[uint64]map[string]float64
nameCache map[uint64]string
tagCache map[uint64]map[string]string
}

func NewMin() telegraf.Aggregator {
m := &Min{}
m.Reset()
return m
}

var sampleConfig = `
## period is the flush & clear interval of the aggregator.
period = "30s"
## If true drop_original will drop the original metrics and
## only send aggregates.
drop_original = false
`

func (m *Min) SampleConfig() string {
return sampleConfig
}

func (m *Min) Description() string {
return "Keep the aggregate min of each metric passing through."
}

func (m *Min) Add(in telegraf.Metric) {
id := in.HashID()
if _, ok := m.nameCache[id]; !ok {
// hit an uncached metric, create caches for first time:
m.nameCache[id] = in.Name()
m.tagCache[id] = in.Tags()
m.fieldCache[id] = make(map[string]float64)
for k, v := range in.Fields() {
if fv, ok := convert(v); ok {
m.fieldCache[id][k] = fv
}
}
} else {
for k, v := range in.Fields() {
if fv, ok := convert(v); ok {
if _, ok := m.fieldCache[id][k]; !ok {
// hit an uncached field of a cached metric
m.fieldCache[id][k] = fv
continue
}
if fv < m.fieldCache[id][k] {
// set new minimum
m.fieldCache[id][k] = fv
}
}
}
}
}

func (m *Min) Push(acc telegraf.Accumulator) {
for id, _ := range m.nameCache {
fields := map[string]interface{}{}
for k, v := range m.fieldCache[id] {
fields[k+"_min"] = v
}
acc.AddFields(m.nameCache[id], fields, m.tagCache[id])
}
}

func (m *Min) Reset() {
m.fieldCache = make(map[uint64]map[string]float64)
m.nameCache = make(map[uint64]string)
m.tagCache = make(map[uint64]map[string]string)
}

func convert(in interface{}) (float64, bool) {
switch v := in.(type) {
case float64:
return v, true
case int64:
return float64(v), true
default:
return 0, false
}
}

func init() {
aggregators.Add("min", func() telegraf.Aggregator {
return NewMin()
})
}
```

## Unit Tests

### Execute short tests
Expand Down
6 changes: 3 additions & 3 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ github.com/eclipse/paho.mqtt.golang 0f7a459f04f13a41b7ed752d47944528d4bf9a86
github.com/go-sql-driver/mysql 1fca743146605a172a266e1654e01e5cd5669bee
github.com/gobwas/glob 49571a1557cd20e6a2410adc6421f85b66c730b5
github.com/golang/protobuf 552c7b9542c194800fd493123b3798ef0a832032
github.com/golang/snappy 427fb6fc07997f43afa32f35e850833760e489a7
github.com/golang/snappy d9eb7a3d35ec988b8585d4a0068e462c27d28380
github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2
github.com/gorilla/context 1ea25387ff6f684839d82767c1733ff4d4d15d0a
github.com/gorilla/mux c9e326e2bdec29039a3761c07bece13133863e1e
github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
github.com/hashicorp/consul 5aa90455ce78d4d41578bafc86305e6e6b28d7d2
github.com/hpcloud/tail b2940955ab8b26e19d43a43c4da0475dd81bdb56
github.com/influxdata/config b79f6829346b8d6e78ba73544b1e1038f1f1c9da
github.com/influxdata/influxdb e094138084855d444195b252314dfee9eae34cab
github.com/influxdata/influxdb fc57c0f7c635df3873f3d64f0ed2100ddc94d5ae
github.com/influxdata/toml af4df43894b16e3fd2b788d01bd27ad0776ef2d0
github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec
github.com/kardianos/osext 29ae4ffbc9a6fe9fb2bc5029050ce6996ea1d3bc
Expand Down Expand Up @@ -56,7 +56,7 @@ github.com/wvanbergen/kafka 46f9a1cf3f670edec492029fadded9c2d9e18866
github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8
github.com/yuin/gopher-lua bf3808abd44b1e55143a2d7f08571aaa80db1808
github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363
golang.org/x/crypto 5dc8cb4b8a8eb076cbb5a06bc3b8682c15bdbbd3
golang.org/x/crypto c197bcf24cde29d3f73c7b4ac6fd41f4384e8af6
golang.org/x/net 6acef71eb69611914f7a30939ea9f6e194c78172
golang.org/x/text a71fd10341b064c10f4a81ceac72bcf70f26ea34
gopkg.in/dancannon/gorethink.v1 7d1af5be49cb5ecc7b177bf387d232050299d6ef
Expand Down
37 changes: 19 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,42 +86,43 @@ if you don't have it already. You also must build with golang version 1.5+.
## How to use it:

```console
$ telegraf -help
$ telegraf --help
Telegraf, The plugin-driven server agent for collecting and reporting metrics.

Usage:

telegraf <flags>
telegraf [commands|flags]

The flags are:
The commands & flags are:

-config <file> configuration file to load
-test gather metrics once, print them to stdout, and exit
-sample-config print out full sample configuration to stdout
-config-directory directory containing additional *.conf files
-input-filter filter the input plugins to enable, separator is :
-output-filter filter the output plugins to enable, separator is :
-usage print usage for a plugin, ie, 'telegraf -usage mysql'
-debug print metrics as they're generated to stdout
-quiet run in quiet mode
-version print the version to stdout
config print out full sample configuration to stdout
version print the version to stdout

--config <file> configuration file to load
--test gather metrics once, print them to stdout, and exit
--config-directory directory containing additional *.conf files
--input-filter filter the input plugins to enable, separator is :
--output-filter filter the output plugins to enable, separator is :
--usage print usage for a plugin, ie, 'telegraf -usage mysql'
--debug print metrics as they're generated to stdout
--quiet run in quiet mode

Examples:

# generate a telegraf config file:
telegraf -sample-config > telegraf.conf
telegraf config > telegraf.conf

# generate config with only cpu input & influxdb output plugins defined
telegraf -sample-config -input-filter cpu -output-filter influxdb
telegraf config -input-filter cpu -output-filter influxdb

# run a single telegraf collection, outputing metrics to stdout
telegraf -config telegraf.conf -test
telegraf --config telegraf.conf -test

# run telegraf with all plugins defined in config file
telegraf -config telegraf.conf
telegraf --config telegraf.conf

# run telegraf, enabling the cpu & memory input, and influxdb output plugins
telegraf -config telegraf.conf -input-filter cpu:mem -output-filter influxdb
telegraf --config telegraf.conf -input-filter cpu:mem -output-filter influxdb
```

## Configuration
Expand Down
12 changes: 3 additions & 9 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ package telegraf

import "time"

// Accumulator is an interface for "accumulating" metrics from input plugin(s).
// The metrics are sent down a channel shared between all input plugins and then
// flushed on the configured flush_interval.
// Accumulator is an interface for "accumulating" metrics from plugin(s).
// The metrics are sent down a channel shared between all plugins.
type Accumulator interface {
// AddFields adds a metric to the accumulator with the given measurement
// name, fields, and tags (and timestamp). If a timestamp is not provided,
Expand All @@ -29,12 +28,7 @@ type Accumulator interface {
tags map[string]string,
t ...time.Time)

AddError(err error)

Debug() bool
SetDebug(enabled bool)

SetPrecision(precision, interval time.Duration)

DisablePrecision()
AddError(err error)
}
Loading