-
Notifications
You must be signed in to change notification settings - Fork 5.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add valuecounter aggregator plugin (#3523)
- Loading branch information
1 parent
3626a52
commit a059689
Showing
5 changed files
with
309 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
# ValueCounter Aggregator Plugin | ||
|
||
The valuecounter plugin counts the occurrence of values in fields and emits the | ||
counter once every 'period' seconds. | ||
|
||
A use case for the valuecounter plugin is when you are processing a HTTP access | ||
log (with the logparser input) and want to count the HTTP status codes. | ||
|
||
The fields which will be counted must be configured with the `fields` | ||
configuration directive. When no `fields` is provided the plugin will not count | ||
any fields. The results are emitted in fields in the format: | ||
`originalfieldname_fieldvalue = count`. | ||
|
||
Valuecounter only works on fields of the type int, bool or string. Float fields | ||
are being dropped to prevent the creating of too many fields. | ||
|
||
### Configuration: | ||
|
||
```toml | ||
[[aggregators.valuecounter]] | ||
## General Aggregator Arguments: | ||
## The period on which to flush & clear the aggregator. | ||
period = "30s" | ||
## If true, the original metric will be dropped by the | ||
## aggregator and will not get sent to the output plugins. | ||
drop_original = false | ||
## The fields for which the values will be counted | ||
fields = ["status"] | ||
``` | ||
|
||
### Measurements & Fields: | ||
|
||
- measurement1 | ||
- field_value1 | ||
- field_value2 | ||
|
||
### Tags: | ||
|
||
No tags are applied by this aggregator. | ||
|
||
### Example Output: | ||
|
||
Example for parsing a HTTP access log. | ||
|
||
telegraf.conf: | ||
``` | ||
[[inputs.logparser]] | ||
files = ["/tmp/tst.log"] | ||
[inputs.logparser.grok] | ||
patterns = ['%{DATA:url:tag} %{NUMBER:response:string}'] | ||
measurement = "access" | ||
[[aggregators.valuecounter]] | ||
namepass = ["access"] | ||
fields = ["response"] | ||
``` | ||
|
||
/tmp/tst.log | ||
``` | ||
/some/path 200 | ||
/some/path 401 | ||
/some/path 200 | ||
``` | ||
|
||
``` | ||
$ telegraf --config telegraf.conf --quiet | ||
access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="200" 1511948755991487011 | ||
access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="401" 1511948755991522282 | ||
access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="200" 1511948755991531697 | ||
access,path=/tmp/tst.log,host=localhost.localdomain,url=/some/path response_200=2i,response_401=1i 1511948761000000000 | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
package valuecounter | ||
|
||
import ( | ||
"fmt" | ||
"log" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/plugins/aggregators" | ||
) | ||
|
||
type aggregate struct { | ||
name string | ||
tags map[string]string | ||
fieldCount map[string]int | ||
} | ||
|
||
// ValueCounter an aggregation plugin | ||
type ValueCounter struct { | ||
cache map[uint64]aggregate | ||
Fields []string | ||
} | ||
|
||
// NewValueCounter create a new aggregation plugin which counts the occurances | ||
// of fields and emits the count. | ||
func NewValueCounter() telegraf.Aggregator { | ||
vc := &ValueCounter{} | ||
vc.Reset() | ||
return vc | ||
} | ||
|
||
var sampleConfig = ` | ||
## General Aggregator Arguments: | ||
## The period on which to flush & clear the aggregator. | ||
period = "30s" | ||
## If true, the original metric will be dropped by the | ||
## aggregator and will not get sent to the output plugins. | ||
drop_original = false | ||
## The fields for which the values will be counted | ||
fields = [] | ||
` | ||
|
||
// SampleConfig generates a sample config for the ValueCounter plugin | ||
func (vc *ValueCounter) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
// Description returns the description of the ValueCounter plugin | ||
func (vc *ValueCounter) Description() string { | ||
return "Count the occurance of values in fields." | ||
} | ||
|
||
// Add is run on every metric which passes the plugin | ||
func (vc *ValueCounter) Add(in telegraf.Metric) { | ||
id := in.HashID() | ||
|
||
// Check if the cache already has an entry for this metric, if not create it | ||
if _, ok := vc.cache[id]; !ok { | ||
a := aggregate{ | ||
name: in.Name(), | ||
tags: in.Tags(), | ||
fieldCount: make(map[string]int), | ||
} | ||
vc.cache[id] = a | ||
} | ||
|
||
// Check if this metric has fields which we need to count, if so increment | ||
// the count. | ||
for fk, fv := range in.Fields() { | ||
for _, cf := range vc.Fields { | ||
if fk == cf { | ||
// Do not process float types to prevent memory from blowing up | ||
switch fv.(type) { | ||
default: | ||
log.Printf("I! Valuecounter: Unsupported field type. " + | ||
"Must be an int, string or bool. Ignoring.") | ||
continue | ||
case uint64, int64, string, bool: | ||
} | ||
fn := fmt.Sprintf("%v_%v", fk, fv) | ||
vc.cache[id].fieldCount[fn]++ | ||
} | ||
} | ||
} | ||
} | ||
|
||
// Push emits the counters | ||
func (vc *ValueCounter) Push(acc telegraf.Accumulator) { | ||
for _, agg := range vc.cache { | ||
fields := map[string]interface{}{} | ||
|
||
for field, count := range agg.fieldCount { | ||
fields[field] = count | ||
} | ||
|
||
acc.AddFields(agg.name, fields, agg.tags) | ||
} | ||
} | ||
|
||
// Reset the cache, executed after each push | ||
func (vc *ValueCounter) Reset() { | ||
vc.cache = make(map[uint64]aggregate) | ||
} | ||
|
||
func init() { | ||
aggregators.Add("valuecounter", func() telegraf.Aggregator { | ||
return NewValueCounter() | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
package valuecounter | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/metric" | ||
"github.com/influxdata/telegraf/testutil" | ||
) | ||
|
||
// Create a valuecounter with config | ||
func NewTestValueCounter(fields []string) telegraf.Aggregator { | ||
vc := &ValueCounter{ | ||
Fields: fields, | ||
} | ||
vc.Reset() | ||
|
||
return vc | ||
} | ||
|
||
var m1, _ = metric.New("m1", | ||
map[string]string{"foo": "bar"}, | ||
map[string]interface{}{ | ||
"status": 200, | ||
"somefield": 20.1, | ||
"foobar": "bar", | ||
}, | ||
time.Now(), | ||
) | ||
|
||
var m2, _ = metric.New("m1", | ||
map[string]string{"foo": "bar"}, | ||
map[string]interface{}{ | ||
"status": "OK", | ||
"ignoreme": "string", | ||
"andme": true, | ||
"boolfield": false, | ||
}, | ||
time.Now(), | ||
) | ||
|
||
func BenchmarkApply(b *testing.B) { | ||
vc := NewTestValueCounter([]string{"status"}) | ||
|
||
for n := 0; n < b.N; n++ { | ||
vc.Add(m1) | ||
vc.Add(m2) | ||
} | ||
} | ||
|
||
// Test basic functionality | ||
func TestBasic(t *testing.T) { | ||
vc := NewTestValueCounter([]string{"status"}) | ||
acc := testutil.Accumulator{} | ||
|
||
vc.Add(m1) | ||
vc.Add(m2) | ||
vc.Add(m1) | ||
vc.Push(&acc) | ||
|
||
expectedFields := map[string]interface{}{ | ||
"status_200": 2, | ||
"status_OK": 1, | ||
} | ||
expectedTags := map[string]string{ | ||
"foo": "bar", | ||
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
} | ||
|
||
// Test with multiple fields to count | ||
func TestMultipleFields(t *testing.T) { | ||
vc := NewTestValueCounter([]string{"status", "somefield", "boolfield"}) | ||
acc := testutil.Accumulator{} | ||
|
||
vc.Add(m1) | ||
vc.Add(m2) | ||
vc.Add(m2) | ||
vc.Add(m1) | ||
vc.Push(&acc) | ||
|
||
expectedFields := map[string]interface{}{ | ||
"status_200": 2, | ||
"status_OK": 2, | ||
"boolfield_false": 2, | ||
} | ||
expectedTags := map[string]string{ | ||
"foo": "bar", | ||
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
} | ||
|
||
// Test with a reset between two runs | ||
func TestWithReset(t *testing.T) { | ||
vc := NewTestValueCounter([]string{"status"}) | ||
acc := testutil.Accumulator{} | ||
|
||
vc.Add(m1) | ||
vc.Add(m1) | ||
vc.Add(m2) | ||
vc.Push(&acc) | ||
|
||
expectedFields := map[string]interface{}{ | ||
"status_200": 2, | ||
"status_OK": 1, | ||
} | ||
expectedTags := map[string]string{ | ||
"foo": "bar", | ||
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
|
||
acc.ClearMetrics() | ||
vc.Reset() | ||
|
||
vc.Add(m2) | ||
vc.Add(m2) | ||
vc.Add(m1) | ||
vc.Push(&acc) | ||
|
||
expectedFields = map[string]interface{}{ | ||
"status_200": 1, | ||
"status_OK": 2, | ||
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
} |