Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add parse_multivalue config to collectd parser #4403

Merged
merged 1 commit into from
Jul 12, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
@@ -479,6 +479,12 @@ You can also change the path to the typesdb or add additional typesdb using
collectd_security_level = "encrypt"
## Path of to TypesDB specifications
collectd_typesdb = ["/usr/share/collectd/types.db"]

# Multi-value plugins can be handled two ways.
# "split" will parse and store the multi-value plugin data into separate measurements
# "join" will parse and store the multi-value plugin as a single multi-value measurement.
# "split" is the default behavior for backward compatability with previous versions of influxdb.
collectd_parse_multivalue = "split"
```

# Dropwizard:
9 changes: 9 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
@@ -1285,6 +1285,14 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
}
}

if node, ok := tbl.Fields["collectd_parse_multivalue"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CollectdSplit = str.Value
}
}
}

if node, ok := tbl.Fields["collectd_typesdb"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
@@ -1348,6 +1356,7 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "collectd_auth_file")
delete(tbl.Fields, "collectd_security_level")
delete(tbl.Fields, "collectd_typesdb")
delete(tbl.Fields, "collectd_parse_multivalue")
delete(tbl.Fields, "dropwizard_metric_registry_path")
delete(tbl.Fields, "dropwizard_time_path")
delete(tbl.Fields, "dropwizard_time_format")
109 changes: 79 additions & 30 deletions plugins/parsers/collectd/parser.go
Original file line number Diff line number Diff line change
@@ -21,7 +21,10 @@ type CollectdParser struct {
// DefaultTags will be added to every parsed metric
DefaultTags map[string]string

popts network.ParseOpts
//whether or not to split multi value metric into multiple metrics
//default value is split
ParseMultiValue string
popts network.ParseOpts
}

func (p *CollectdParser) SetParseOpts(popts *network.ParseOpts) {
@@ -32,6 +35,7 @@ func NewCollectdParser(
authFile string,
securityLevel string,
typesDB []string,
split string,
) (*CollectdParser, error) {
popts := network.ParseOpts{}

@@ -64,7 +68,8 @@ func NewCollectdParser(
}
}

parser := CollectdParser{popts: popts}
parser := CollectdParser{popts: popts,
ParseMultiValue: split}
return &parser, nil
}

@@ -76,7 +81,7 @@ func (p *CollectdParser) Parse(buf []byte) ([]telegraf.Metric, error) {

metrics := []telegraf.Metric{}
for _, valueList := range valueLists {
metrics = append(metrics, UnmarshalValueList(valueList)...)
metrics = append(metrics, UnmarshalValueList(valueList, p.ParseMultiValue)...)
}

if len(p.DefaultTags) > 0 {
@@ -111,47 +116,91 @@ func (p *CollectdParser) SetDefaultTags(tags map[string]string) {
}

// UnmarshalValueList translates a ValueList into a Telegraf metric.
func UnmarshalValueList(vl *api.ValueList) []telegraf.Metric {
func UnmarshalValueList(vl *api.ValueList, multiValue string) []telegraf.Metric {
timestamp := vl.Time.UTC()

var metrics []telegraf.Metric
for i := range vl.Values {
var name string
name = fmt.Sprintf("%s_%s", vl.Identifier.Plugin, vl.DSName(i))
tags := make(map[string]string)
fields := make(map[string]interface{})

// Convert interface back to actual type, then to float64
switch value := vl.Values[i].(type) {
case api.Gauge:
fields["value"] = float64(value)
case api.Derive:
fields["value"] = float64(value)
case api.Counter:
fields["value"] = float64(value)
}
//set multiValue to default "split" if nothing is specified
if multiValue == "" {
multiValue = "split"
}
switch multiValue {
case "split":
for i := range vl.Values {
var name string
name = fmt.Sprintf("%s_%s", vl.Identifier.Plugin, vl.DSName(i))
tags := make(map[string]string)
fields := make(map[string]interface{})

// Convert interface back to actual type, then to float64
switch value := vl.Values[i].(type) {
case api.Gauge:
fields["value"] = float64(value)
case api.Derive:
fields["value"] = float64(value)
case api.Counter:
fields["value"] = float64(value)
}

if vl.Identifier.Host != "" {
tags["host"] = vl.Identifier.Host
}
if vl.Identifier.PluginInstance != "" {
tags["instance"] = vl.Identifier.PluginInstance
}
if vl.Identifier.Type != "" {
tags["type"] = vl.Identifier.Type
if vl.Identifier.Host != "" {
tags["host"] = vl.Identifier.Host
}
if vl.Identifier.PluginInstance != "" {
tags["instance"] = vl.Identifier.PluginInstance
}
if vl.Identifier.Type != "" {
tags["type"] = vl.Identifier.Type
}
if vl.Identifier.TypeInstance != "" {
tags["type_instance"] = vl.Identifier.TypeInstance
}

// Drop invalid points
m, err := metric.New(name, tags, fields, timestamp)
if err != nil {
log.Printf("E! Dropping metric %v: %v", name, err)
continue
}

metrics = append(metrics, m)
}
if vl.Identifier.TypeInstance != "" {
tags["type_instance"] = vl.Identifier.TypeInstance
case "join":
name := vl.Identifier.Plugin
tags := make(map[string]string)
fields := make(map[string]interface{})
for i := range vl.Values {
switch value := vl.Values[i].(type) {
case api.Gauge:
fields[vl.DSName(i)] = float64(value)
case api.Derive:
fields[vl.DSName(i)] = float64(value)
case api.Counter:
fields[vl.DSName(i)] = float64(value)
}

if vl.Identifier.Host != "" {
tags["host"] = vl.Identifier.Host
}
if vl.Identifier.PluginInstance != "" {
tags["instance"] = vl.Identifier.PluginInstance
}
if vl.Identifier.Type != "" {
tags["type"] = vl.Identifier.Type
}
if vl.Identifier.TypeInstance != "" {
tags["type_instance"] = vl.Identifier.TypeInstance
}
}

// Drop invalid points
m, err := metric.New(name, tags, fields, timestamp)
if err != nil {
log.Printf("E! Dropping metric %v: %v", name, err)
continue
}

metrics = append(metrics, m)
default:
log.Printf("parse-multi-value config can only be 'split' or 'join'")
}
return metrics
}
20 changes: 17 additions & 3 deletions plugins/parsers/collectd/parser_test.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import (

"collectd.org/api"
"collectd.org/network"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
@@ -76,7 +77,7 @@ var multiMetric = testCase{
api.Derive(42),
api.Gauge(42),
},
DSNames: []string(nil),
DSNames: []string{"t1", "t2"},
},
},
[]metricData{
@@ -108,7 +109,7 @@ var multiMetric = testCase{
}

func TestNewCollectdParser(t *testing.T) {
parser, err := NewCollectdParser("", "", []string{})
parser, err := NewCollectdParser("", "", []string{}, "join")
require.Nil(t, err)
require.Equal(t, parser.popts.SecurityLevel, network.None)
require.NotNil(t, parser.popts.PasswordLookup)
@@ -133,6 +134,19 @@ func TestParse(t *testing.T) {
}
}

func TestParseMultiValueSplit(t *testing.T) {
buf, err := writeValueList(multiMetric.vl)
require.Nil(t, err)
bytes, err := buf.Bytes()
require.Nil(t, err)

parser := &CollectdParser{ParseMultiValue: "split"}
metrics, err := parser.Parse(bytes)
require.Nil(t, err)

assert.Equal(t, 2, len(metrics))
}

func TestParse_DefaultTags(t *testing.T) {
buf, err := writeValueList(singleMetric.vl)
require.Nil(t, err)
@@ -266,7 +280,7 @@ func TestParseLine(t *testing.T) {
bytes, err := buf.Bytes()
require.Nil(t, err)

parser, err := NewCollectdParser("", "", []string{})
parser, err := NewCollectdParser("", "", []string{}, "split")
require.Nil(t, err)
metric, err := parser.ParseLine(string(bytes))
require.Nil(t, err)
8 changes: 6 additions & 2 deletions plugins/parsers/registry.go
Original file line number Diff line number Diff line change
@@ -66,6 +66,9 @@ type Config struct {
// Dataset specification for collectd
CollectdTypesDB []string

// whether to split or join multivalue metrics
CollectdSplit string

// DataType only applies to value, this will be the type to parse value to
DataType string

@@ -109,7 +112,7 @@ func NewParser(config *Config) (Parser, error) {
config.Templates, config.DefaultTags)
case "collectd":
parser, err = NewCollectdParser(config.CollectdAuthFile,
config.CollectdSecurityLevel, config.CollectdTypesDB)
config.CollectdSecurityLevel, config.CollectdTypesDB, config.CollectdSplit)
case "dropwizard":
parser, err = NewDropwizardParser(
config.DropwizardMetricRegistryPath,
@@ -172,8 +175,9 @@ func NewCollectdParser(
authFile string,
securityLevel string,
typesDB []string,
split string,
) (Parser, error) {
return collectd.NewCollectdParser(authFile, securityLevel, typesDB)
return collectd.NewCollectdParser(authFile, securityLevel, typesDB, split)
}

func NewDropwizardParser(