-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Valuecounter aggregator plugin #3523
Merged
Merged
Changes from 7 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
da53b95
A valuecounter aggregator plugin to count values in specified fields.
piotr1212 ac6aabe
add valuecounter to README.md
piotr1212 3981abc
Merge remote-tracking branch 'origin/master' into valuecounter
piotr1212 2a149d4
valuecounter: implement suggestions
piotr1212 ce22ce4
go fmt
piotr1212 b69e125
Merge remote-tracking branch 'origin/master' into valuecounter
piotr1212 6c77240
valuecounter: only operate on int/str/bool fields
piotr1212 ffcabc3
only check for int64, uint64, bool, and string.
piotr1212 e584089
Merge remote-tracking branch 'origin/master' into valuecounter
piotr1212 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
# ValueCounter Aggregator Plugin | ||
|
||
The valuecounter plugin counts the occurrence of values in fields and emits the | ||
counter once every 'period' seconds. | ||
|
||
A use case for the valuecounter plugin is when you are processing a HTTP access | ||
log (with the logparser input) and want to count the HTTP status codes. | ||
|
||
The fields which will be counted must be configured with the `fields` | ||
configuration directive. When no `fields` is provided the plugin will not count | ||
any fields. The results are emitted in fields in the format: | ||
`originalfieldname_fieldvalue = count`. | ||
|
||
Valuecounter only works on fields of the type int, bool or string. Float fields | ||
are being dropped to prevent the creating of too many fields. | ||
|
||
### Configuration: | ||
|
||
```toml | ||
[[aggregators.valuecounter]] | ||
## General Aggregator Arguments: | ||
## The period on which to flush & clear the aggregator. | ||
period = "30s" | ||
## If true, the original metric will be dropped by the | ||
## aggregator and will not get sent to the output plugins. | ||
drop_original = false | ||
## The fields for which the values will be counted | ||
fields = ["status"] | ||
``` | ||
|
||
### Measurements & Fields: | ||
|
||
- measurement1 | ||
- field_value1 | ||
- field_value2 | ||
|
||
### Tags: | ||
|
||
No tags are applied by this aggregator. | ||
|
||
### Example Output: | ||
|
||
Example for parsing a HTTP access log. | ||
|
||
telegraf.conf: | ||
``` | ||
[[inputs.logparser]] | ||
files = ["/tmp/tst.log"] | ||
[inputs.logparser.grok] | ||
patterns = ['%{DATA:url:tag} %{NUMBER:response:string}'] | ||
measurement = "access" | ||
|
||
[[aggregators.valuecounter]] | ||
namepass = ["access"] | ||
fields = ["response"] | ||
``` | ||
|
||
/tmp/tst.log | ||
``` | ||
/some/path 200 | ||
/some/path 401 | ||
/some/path 200 | ||
``` | ||
|
||
``` | ||
$ telegraf --config telegraf.conf --quiet | ||
|
||
access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="200" 1511948755991487011 | ||
access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="401" 1511948755991522282 | ||
access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="200" 1511948755991531697 | ||
|
||
access,path=/tmp/tst.log,host=localhost.localdomain,url=/some/path response_200=2i,response_401=1i 1511948761000000000 | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
package valuecounter | ||
|
||
import ( | ||
"fmt" | ||
"log" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/plugins/aggregators" | ||
) | ||
|
||
type aggregate struct { | ||
name string | ||
tags map[string]string | ||
fieldCount map[string]int | ||
} | ||
|
||
// ValueCounter an aggregation plugin | ||
type ValueCounter struct { | ||
cache map[uint64]aggregate | ||
Fields []string | ||
} | ||
|
||
// NewValueCounter create a new aggregation plugin which counts the occurances | ||
// of fields and emits the count. | ||
func NewValueCounter() telegraf.Aggregator { | ||
vc := &ValueCounter{} | ||
vc.Reset() | ||
return vc | ||
} | ||
|
||
var sampleConfig = ` | ||
## General Aggregator Arguments: | ||
## The period on which to flush & clear the aggregator. | ||
period = "30s" | ||
## If true, the original metric will be dropped by the | ||
## aggregator and will not get sent to the output plugins. | ||
drop_original = false | ||
## The fields for which the values will be counted | ||
fields = [] | ||
` | ||
|
||
// SampleConfig generates a sample config for the ValueCounter plugin | ||
func (vc *ValueCounter) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
// Description returns the description of the ValueCounter plugin | ||
func (vc *ValueCounter) Description() string { | ||
return "Count the occurance of values in fields." | ||
} | ||
|
||
// Add is run on every metric which passes the plugin | ||
func (vc *ValueCounter) Add(in telegraf.Metric) { | ||
id := in.HashID() | ||
|
||
// Check if the cache already has an entry for this metric, if not create it | ||
if _, ok := vc.cache[id]; !ok { | ||
a := aggregate{ | ||
name: in.Name(), | ||
tags: in.Tags(), | ||
fieldCount: make(map[string]int), | ||
} | ||
vc.cache[id] = a | ||
} | ||
|
||
// Check if this metric has fields which we need to count, if so increment | ||
// the count. | ||
for fk, fv := range in.Fields() { | ||
for _, cf := range vc.Fields { | ||
if fk == cf { | ||
// Do not process float types to prevent memory from blowing up | ||
switch fv.(type) { | ||
default: | ||
log.Printf("I! Valuecounter: Unsupported field type. " + | ||
"Must be an int, string or bool. Ignoring.") | ||
continue | ||
case uint8, uint16, uint32, uint64, int8, | ||
int16, int32, int64, string, bool: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only need to support int64, uint64, bool, and string. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, thanks, updated. |
||
} | ||
fn := fmt.Sprintf("%v_%v", fk, fv) | ||
vc.cache[id].fieldCount[fn]++ | ||
} | ||
} | ||
} | ||
} | ||
|
||
// Push emits the counters | ||
func (vc *ValueCounter) Push(acc telegraf.Accumulator) { | ||
for _, agg := range vc.cache { | ||
fields := map[string]interface{}{} | ||
|
||
for field, count := range agg.fieldCount { | ||
fields[field] = count | ||
} | ||
|
||
acc.AddFields(agg.name, fields, agg.tags) | ||
} | ||
} | ||
|
||
// Reset the cache, executed after each push | ||
func (vc *ValueCounter) Reset() { | ||
vc.cache = make(map[uint64]aggregate) | ||
} | ||
|
||
func init() { | ||
aggregators.Add("valuecounter", func() telegraf.Aggregator { | ||
return NewValueCounter() | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
package valuecounter | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/metric" | ||
"github.com/influxdata/telegraf/testutil" | ||
) | ||
|
||
// Create a valuecounter with config | ||
func NewTestValueCounter(fields []string) telegraf.Aggregator { | ||
vc := &ValueCounter{ | ||
Fields: fields, | ||
} | ||
vc.Reset() | ||
|
||
return vc | ||
} | ||
|
||
var m1, _ = metric.New("m1", | ||
map[string]string{"foo": "bar"}, | ||
map[string]interface{}{ | ||
"status": 200, | ||
"somefield": 20.1, | ||
"foobar": "bar", | ||
}, | ||
time.Now(), | ||
) | ||
|
||
var m2, _ = metric.New("m1", | ||
map[string]string{"foo": "bar"}, | ||
map[string]interface{}{ | ||
"status": "OK", | ||
"ignoreme": "string", | ||
"andme": true, | ||
"boolfield": false, | ||
}, | ||
time.Now(), | ||
) | ||
|
||
func BenchmarkApply(b *testing.B) { | ||
vc := NewTestValueCounter([]string{"status"}) | ||
|
||
for n := 0; n < b.N; n++ { | ||
vc.Add(m1) | ||
vc.Add(m2) | ||
} | ||
} | ||
|
||
// Test basic functionality | ||
func TestBasic(t *testing.T) { | ||
vc := NewTestValueCounter([]string{"status"}) | ||
acc := testutil.Accumulator{} | ||
|
||
vc.Add(m1) | ||
vc.Add(m2) | ||
vc.Add(m1) | ||
vc.Push(&acc) | ||
|
||
expectedFields := map[string]interface{}{ | ||
"status_200": 2, | ||
"status_OK": 1, | ||
} | ||
expectedTags := map[string]string{ | ||
"foo": "bar", | ||
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
} | ||
|
||
// Test with multiple fields to count | ||
func TestMultipleFields(t *testing.T) { | ||
vc := NewTestValueCounter([]string{"status", "somefield", "boolfield"}) | ||
acc := testutil.Accumulator{} | ||
|
||
vc.Add(m1) | ||
vc.Add(m2) | ||
vc.Add(m2) | ||
vc.Add(m1) | ||
vc.Push(&acc) | ||
|
||
expectedFields := map[string]interface{}{ | ||
"status_200": 2, | ||
"status_OK": 2, | ||
"boolfield_false": 2, | ||
} | ||
expectedTags := map[string]string{ | ||
"foo": "bar", | ||
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
} | ||
|
||
// Test with a reset between two runs | ||
func TestWithReset(t *testing.T) { | ||
vc := NewTestValueCounter([]string{"status"}) | ||
acc := testutil.Accumulator{} | ||
|
||
vc.Add(m1) | ||
vc.Add(m1) | ||
vc.Add(m2) | ||
vc.Push(&acc) | ||
|
||
expectedFields := map[string]interface{}{ | ||
"status_200": 2, | ||
"status_OK": 1, | ||
} | ||
expectedTags := map[string]string{ | ||
"foo": "bar", | ||
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
|
||
acc.ClearMetrics() | ||
vc.Reset() | ||
|
||
vc.Add(m2) | ||
vc.Add(m2) | ||
vc.Add(m1) | ||
vc.Push(&acc) | ||
|
||
expectedFields = map[string]interface{}{ | ||
"status_200": 1, | ||
"status_OK": 2, | ||
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should skip floats? One thing that would be bad is if there is a frequently changing float value, which could result in many new fields.
If we don't disallow this, maybe we should add a warning in the README.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good suggestion, will check if I can implement this