Valuecounter aggregator plugin #3523

Merged
merged 9 commits into from
Jun 19, 2018
1 change: 1 addition & 0 deletions README.md
@@ -262,6 +262,7 @@ formats may be used with input plugins supporting the `data_format` option:
* [basicstats](./plugins/aggregators/basicstats)
* [minmax](./plugins/aggregators/minmax)
* [histogram](./plugins/aggregators/histogram)
* [valuecounter](./plugins/aggregators/valuecounter)

## Output Plugins

1 change: 1 addition & 0 deletions plugins/aggregators/all/all.go
@@ -4,4 +4,5 @@ import (
	_ "github.com/influxdata/telegraf/plugins/aggregators/basicstats"
	_ "github.com/influxdata/telegraf/plugins/aggregators/histogram"
	_ "github.com/influxdata/telegraf/plugins/aggregators/minmax"
	_ "github.com/influxdata/telegraf/plugins/aggregators/valuecounter"
)
70 changes: 70 additions & 0 deletions plugins/aggregators/valuecounter/README.md
@@ -0,0 +1,70 @@
# ValueCounter Aggregator Plugin

The valuecounter plugin counts the occurrence of values in fields and emits the
counter once every 'preriod' seconds.
Contributor

typo: period


A use case for the valuecounter plugin is processing an HTTP access
log (with the logparser input) and counting the HTTP status codes.

The fields which will be counted must be configured with the `field_names`
configuration directive. When no `field_names` is provided the plugin will not
count any fields. The results are emitted in fields in the format:
`originalfieldname_fieldvalue = count`.
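
As a rough illustration of this naming scheme, the standalone sketch below builds result field names with the same `fmt.Sprintf("%v_%v", ...)` format that the plugin's `Add` method uses in this PR. The helpers `fieldName` and `countValues` are made up for this example and are not part of Telegraf.

```go
package main

import "fmt"

// fieldName joins the original field name and the observed value with an
// underscore. Hypothetical helper; it mirrors the "%v_%v" format used in Add.
func fieldName(field string, value interface{}) string {
	return fmt.Sprintf("%v_%v", field, value)
}

// countValues counts how often each value of a single field occurs.
func countValues(field string, values []interface{}) map[string]int {
	counts := make(map[string]int)
	for _, v := range values {
		counts[fieldName(field, v)]++
	}
	return counts
}

func main() {
	counts := countValues("status", []interface{}{200, "OK", 200})
	fmt.Println(counts["status_200"], counts["status_OK"])
}
```

Because the value is formatted with `%v`, integers, strings and booleans all end up as plain text in the resulting field name.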

### Configuration:

```toml
[[aggregators.valuecounter]]
  ## General Aggregator Arguments:
  ## The period on which to flush & clear the aggregator.
  period = "30s"
  ## If true, the original metric will be dropped by the
  ## aggregator and will not get sent to the output plugins.
  drop_original = false
  ## The fields for which the values will be counted
  field_names = ["status"]
```

### Measurements & Fields:

- measurement1
    - field_value1
    - field_value2

### Tags:

No tags are applied by this aggregator.

### Example Output:

Example for parsing an HTTP access log.

telegraf.conf:
```
[[inputs.logparser]]
  files = ["/tmp/tst.log"]
  [inputs.logparser.grok]
    patterns = ['%{DATA:url:tag} %{NUMBER:response:string}']
    measurement = "access"

[[aggregators.valuecounter]]
  namepass = ["access"]
  field_names = ["response"]
```

/tmp/tst.log
```
/some/path 200
/some/path 401
/some/path 200
```

```
$ telegraf --config telegraf.conf --quiet

access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="200" 1511948755991487011
access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="401" 1511948755991522282
access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="200" 1511948755991531697

access,path=/tmp/tst.log,host=localhost.localdomain,url=/some/path response_200=2i,response_401=1i 1511948761000000000
```
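
The aggregated line in the example output can be reproduced by hand. The sketch below is only an approximation under simplifying assumptions: it splits each log line on whitespace instead of using grok, ignores tags and timestamps, and sorts the field names so the output is deterministic. `countResponses` and `formatFields` are hypothetical names, not Telegraf functions.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// countResponses counts the second whitespace-separated column of each log
// line and returns the result as "response_<code>" -> count.
func countResponses(lines []string) map[string]int {
	counts := make(map[string]int)
	for _, line := range lines {
		cols := strings.Fields(line)
		if len(cols) != 2 {
			continue // skip lines that do not look like "<url> <code>"
		}
		counts["response_"+cols[1]]++
	}
	return counts
}

// formatFields renders the counts as line-protocol style integer fields
// (note the trailing "i"), sorted by field name.
func formatFields(counts map[string]int) string {
	names := make([]string, 0, len(counts))
	for name := range counts {
		names = append(names, name)
	}
	sort.Strings(names)

	parts := make([]string, 0, len(names))
	for _, name := range names {
		parts = append(parts, fmt.Sprintf("%s=%di", name, counts[name]))
	}
	return strings.Join(parts, ",")
}

func main() {
	lines := []string{"/some/path 200", "/some/path 401", "/some/path 200"}
	fmt.Println(formatFields(countResponses(lines)))
}
```

For the three sample log lines this yields the same field set as the last line of the output above: `response_200=2i,response_401=1i`.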
94 changes: 94 additions & 0 deletions plugins/aggregators/valuecounter/valuecounter.go
@@ -0,0 +1,94 @@
package valuecounter

import (
	"fmt"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/aggregators"
)

type aggregate struct {
	name       string
	tags       map[string]string
	fieldCount map[string]int
}

type ValueCounter struct {
	cache       map[uint64]aggregate
	Field_names []string
Contributor

Call this Fields, which will match with some other aggregator work.

Strictly speaking, we could probably use the normal filtering options, since this is essentially fieldpass, but I think it is important to require this to be opt in.

}
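
One way to read the review suggestion on the struct (rename to `Fields`, keep counting opt-in) is sketched below. This is a stripped-down stand-in rather than the plugin code: the `cache` member is omitted and `shouldCount` is a hypothetical helper introduced only to show that an empty list matches nothing.

```go
package main

import "fmt"

// ValueCounter is a simplified stand-in for the plugin struct, using the
// field name proposed in the review (Fields instead of Field_names).
type ValueCounter struct {
	Fields []string
}

// shouldCount reports whether a field was explicitly listed.
// With an empty list nothing is counted, which keeps the plugin opt-in.
func (vc *ValueCounter) shouldCount(field string) bool {
	for _, f := range vc.Fields {
		if f == field {
			return true
		}
	}
	return false
}

func main() {
	optIn := &ValueCounter{Fields: []string{"status"}}
	empty := &ValueCounter{}
	fmt.Println(optIn.shouldCount("status"), optIn.shouldCount("somefield"), empty.shouldCount("status"))
}
```

Unlike a `fieldpass`-style filter, which typically passes everything when left unset, this check requires each counted field to be named explicitly.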

func NewValueCounter() telegraf.Aggregator {
	vc := &ValueCounter{}
	vc.Reset()
	return vc
}

var sampleConfig = `
  ## General Aggregator Arguments:
  ## The period on which to flush & clear the aggregator.
  period = "30s"
  ## If true, the original metric will be dropped by the
  ## aggregator and will not get sent to the output plugins.
  drop_original = false
  ## The fields for which the values will be counted
  field_names = ""
Contributor

Should be a list: fields = []

`

func (vc *ValueCounter) SampleConfig() string {
	return sampleConfig
}

func (vc *ValueCounter) Description() string {
	return "Count the occurrence of values in fields."
}

// Run on every metric which passes the plugin
func (vc *ValueCounter) Add(in telegraf.Metric) {
	id := in.HashID()

	// Check if the cache already has an entry for this metric, if not create it
	if _, ok := vc.cache[id]; !ok {
		a := aggregate{
			name:       in.Name(),
			tags:       in.Tags(),
			fieldCount: make(map[string]int),
		}
		vc.cache[id] = a
	}

	// Check if this metric has fields which we need to count, if so increment
	// the count.
	for fk, fv := range in.Fields() {
Contributor

I wonder if we should skip floats? One thing that would be bad is if there is a frequently changing float value, which could result in many new fields.

If we don't disallow this, maybe we should add a warning in the README.

Contributor Author

good suggestion, will check if I can implement this

		for _, cf := range vc.Field_names {
			if fk == cf {
				fn := fmt.Sprintf("%v_%v", fk, fv)
				vc.cache[id].fieldCount[fn] += 1
			}
		}
	}
}
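
The float concern raised in the review thread inside `Add` could be addressed with a type switch before counting. The following is a minimal sketch of that idea, not the change that was eventually made in the PR; `countable` and `add` are hypothetical helpers that operate on a plain map instead of the plugin's cache.

```go
package main

import "fmt"

// countable reports whether a field value should be counted. Floats are
// rejected because a frequently changing float would create a new result
// field for almost every sample.
func countable(value interface{}) bool {
	switch value.(type) {
	case float32, float64:
		return false
	default:
		return true
	}
}

// add counts value under "<field>_<value>" unless it is a float.
func add(counts map[string]int, field string, value interface{}) {
	if !countable(value) {
		return
	}
	counts[fmt.Sprintf("%v_%v", field, value)]++
}

func main() {
	counts := make(map[string]int)
	add(counts, "status", 200)
	add(counts, "somefield", 20.1) // skipped: float64
	add(counts, "status", 200)
	fmt.Println(counts)
}
```

With a check like this in place, the `somefield_20.1` expectation in `TestMultipleFields` would no longer hold, so that test would need to change along with the plugin.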

// Emmit the counters
Contributor

typo: Emit

Hi Daniel,
My requirement is to filter the logs by response code: a log entry should be inserted into InfluxDB only if its response code is 500.

Please suggest an approach.

func (vc *ValueCounter) Push(acc telegraf.Accumulator) {
	for _, agg := range vc.cache {
		fields := map[string]interface{}{}

		for field, count := range agg.fieldCount {
			fields[field] = count
		}

		acc.AddFields(agg.name, fields, agg.tags)
	}
}

// Reset the cache, executed after each push
func (vc *ValueCounter) Reset() {
	vc.cache = make(map[uint64]aggregate)
}
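
The Add, Push and Reset cycle can be modelled without any Telegraf types. The sketch below is a simplified stand-in under stated assumptions: the cache is keyed by a plain series string where the plugin uses the metric's `HashID()`, tags are left out, and `push` returns a copy of the counts instead of writing to an accumulator. All names in it are hypothetical.

```go
package main

import "fmt"

// aggregate holds the counters for one series.
type aggregate struct {
	name       string
	fieldCount map[string]int
}

// counter is a simplified stand-in for ValueCounter. The plugin keys its
// cache by the metric's HashID; a plain string key is assumed here.
type counter struct {
	fields []string
	cache  map[string]aggregate
}

func newCounter(fields ...string) *counter {
	c := &counter{fields: fields}
	c.reset()
	return c
}

// add increments the counter for every configured field present in the sample.
func (c *counter) add(series string, sample map[string]interface{}) {
	if _, ok := c.cache[series]; !ok {
		c.cache[series] = aggregate{name: series, fieldCount: make(map[string]int)}
	}
	for _, f := range c.fields {
		if v, ok := sample[f]; ok {
			c.cache[series].fieldCount[fmt.Sprintf("%v_%v", f, v)]++
		}
	}
}

// push returns a copy of the current counts per series.
func (c *counter) push() map[string]map[string]int {
	out := make(map[string]map[string]int)
	for key, agg := range c.cache {
		fields := make(map[string]int, len(agg.fieldCount))
		for name, n := range agg.fieldCount {
			fields[name] = n
		}
		out[key] = fields
	}
	return out
}

// reset drops all cached counts, as happens after each push.
func (c *counter) reset() {
	c.cache = make(map[string]aggregate)
}

func main() {
	c := newCounter("status")
	c.add("m1", map[string]interface{}{"status": 200})
	c.add("m1", map[string]interface{}{"status": "OK"})
	c.add("m1", map[string]interface{}{"status": 200})
	fmt.Println(c.push()["m1"])

	c.reset()
	c.add("m1", map[string]interface{}{"status": "OK"})
	fmt.Println(c.push()["m1"])
}
```

Since the cache is rebuilt from scratch on reset, counts from one period never leak into the next, which matches the "flush & clear" wording in the sample configuration.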

func init() {
	aggregators.Add("valuecounter", func() telegraf.Aggregator {
		return NewValueCounter()
	})
}
128 changes: 128 additions & 0 deletions plugins/aggregators/valuecounter/valuecounter_test.go
@@ -0,0 +1,128 @@
package valuecounter

import (
	"testing"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/metric"
	"github.com/influxdata/telegraf/testutil"
)

// Create a valuecounter with config
func NewTestValueCounter(field_names []string) telegraf.Aggregator {
	vc := &ValueCounter{
		//Field_names: []string{"status"},
		Field_names: field_names,
	}
	vc.Reset()

	return vc
}

var m1, _ = metric.New("m1",
	map[string]string{"foo": "bar"},
	map[string]interface{}{
		"status":    200,
		"somefield": 20.1,
		"foobar":    "bar",
	},
	time.Now(),
)

var m2, _ = metric.New("m1",
	map[string]string{"foo": "bar"},
	map[string]interface{}{
		"status":    "OK",
		"ignoreme":  "string",
		"andme":     true,
		"boolfield": false,
	},
	time.Now(),
)

func BenchmarkApply(b *testing.B) {
	vc := NewTestValueCounter([]string{"status"})

	for n := 0; n < b.N; n++ {
		vc.Add(m1)
		vc.Add(m2)
	}
}

// Test basic functionality
func TestBasic(t *testing.T) {
	vc := NewTestValueCounter([]string{"status"})
	acc := testutil.Accumulator{}

	vc.Add(m1)
	vc.Add(m2)
	vc.Add(m1)
	vc.Push(&acc)

	expectedFields := map[string]interface{}{
		"status_200": 2,
		"status_OK":  1,
	}
	expectedTags := map[string]string{
		"foo": "bar",
	}
	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test with multiple fields to count
func TestMultipleFields(t *testing.T) {
	vc := NewTestValueCounter([]string{"status", "somefield", "boolfield"})
	acc := testutil.Accumulator{}

	vc.Add(m1)
	vc.Add(m2)
	vc.Add(m2)
	vc.Add(m1)
	vc.Push(&acc)

	expectedFields := map[string]interface{}{
		"status_200":      2,
		"status_OK":       2,
		"somefield_20.1":  2,
		"boolfield_false": 2,
	}
	expectedTags := map[string]string{
		"foo": "bar",
	}
	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test with a reset between two runs
func TestWithReset(t *testing.T) {
	vc := NewTestValueCounter([]string{"status"})
	acc := testutil.Accumulator{}

	vc.Add(m1)
	vc.Add(m1)
	vc.Add(m2)
	vc.Push(&acc)

	expectedFields := map[string]interface{}{
		"status_200": 2,
		"status_OK":  1,
	}
	expectedTags := map[string]string{
		"foo": "bar",
	}
	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)

	acc.ClearMetrics()
	vc.Reset()

	vc.Add(m2)
	vc.Add(m2)
	vc.Add(m1)
	vc.Push(&acc)

	expectedFields = map[string]interface{}{
		"status_200": 1,
		"status_OK":  2,
	}
	acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}