From ba09566d92ae0daa6eacf0ba32e0403cbe0c8056 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Patryk=20Ma=C5=82ek?=
Date: Thu, 22 Jul 2021 16:28:34 +0200
Subject: [PATCH] Add carbon2 parser (#33)

---
 plugins/parsers/carbon2/README.md      |  21 ++
 plugins/parsers/carbon2/parser.go      | 180 ++++++++++
 plugins/parsers/carbon2/parser_test.go | 475 +++++++++++++++++++++++++
 plugins/parsers/registry.go            |   7 +
 plugins/serializers/carbon2/carbon2.go |  14 +
 5 files changed, 697 insertions(+)
 create mode 100644 plugins/parsers/carbon2/README.md
 create mode 100644 plugins/parsers/carbon2/parser.go
 create mode 100644 plugins/parsers/carbon2/parser_test.go

diff --git a/plugins/parsers/carbon2/README.md b/plugins/parsers/carbon2/README.md
new file mode 100644
index 0000000000000..72bc2828e463d
--- /dev/null
+++ b/plugins/parsers/carbon2/README.md
@@ -0,0 +1,21 @@
+# Carbon2
+
+The carbon2 parser parses the incoming metrics in [`carbon2` format][metrics20].
+
+**NOTE:** All tags (both `intrinsic_tags` and `meta_tags`) are treated as telegraf
+tags, hence parsing and then serializing a metric will yield a different metric
+than was ingested because of the data model.
+
+[metrics20]: http://metrics20.org/implementations/
+
+## Configuration
+
+```toml
+[[inputs.file]]
+  files = ["example_carbon2_file"]
+
+  ## Data format to consume.
+ ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "carbon2" diff --git a/plugins/parsers/carbon2/parser.go b/plugins/parsers/carbon2/parser.go new file mode 100644 index 0000000000000..3e5b3359321c5 --- /dev/null +++ b/plugins/parsers/carbon2/parser.go @@ -0,0 +1,180 @@ +package carbon2 + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "io" + "strconv" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +const ( + tagNameField = "field" + tagNameMetric = "metric" +) + +type Parser struct { +} + +// Parse takes a byte buffer separated by newlines +// ie, `cpu.usage.idle 90\ncpu.usage.busy 10` and parses it into telegraf metrics. +// +// Must be thread-safe. +func (p Parser) Parse(buf []byte) ([]telegraf.Metric, error) { + // parse even if the buffer begins with a newline + buf = bytes.TrimPrefix(buf, []byte("\n")) + + var ( + buffer = bytes.NewBuffer(buf) + reader = bufio.NewReader(buffer) + metrics []telegraf.Metric + ) + + for { + line, err := reader.ReadBytes('\n') + if err != nil && err != io.EOF { + return nil, err + } + if err == io.EOF && len(line) == 0 { + break + } + + m, err := parseLine(line) + if err != nil { + return nil, fmt.Errorf("failed to parse line: %s, err: %w", line, err) + } + + metrics = append(metrics, m) + } + + return metrics, nil +} + +func parseLine(line []byte) (telegraf.Metric, error) { + var ( + buf = bytes.NewBuffer(line) + name string + tim time.Time + tags = make(map[string]string) + fields = make(map[string]interface{}) + gotValue bool + ) + + for { + bb, err := buf.ReadBytes(' ') + if err != nil && err != io.EOF { + return nil, err + } + + if bytes.HasPrefix(bb, []byte(" ")) { + continue + } + + idx := bytes.IndexByte(bb, '=') + if idx == -1 { + if !gotValue { + // It's a value so parse it + v, err := parseBytesForValue(bb) + if err != 
nil { + return nil, err + } + + // Note: this works around the fact that carbon2 serializer can + // either: + // * stitch together using '_' telegraf's metric Name and field + // name (taken from 'field' tag) and take that to be used + // as metric name, e.g. metric=memory_available + // * use a separate 'field' tag, e.g. metric=memory field=available + // + // Hence parsing and serializing a metric would yield a different + // result then ingested. Because of that reason we + fields[""] = v + gotValue = true + continue + } else { + // It's a timestamp so parse it + t, err := parseBytesForTimestamp(bb) + if err != nil { + return nil, err + } + + tim = t + break + } + } + + tag, value, err := getTag(bb, idx) + if err != nil { + return nil, err + } + + if bytes.Compare(tag, []byte(tagNameMetric)) == 0 { + // If it's a 'metric' tag then set it as metric's name + name = string(value) + } else { + tags[string(tag)] = string(value) + } + } + + if name == "" { + return nil, errors.New("metric without 'metric' tag") + } + + return metric.New(name, tags, fields, tim, telegraf.Gauge), nil +} + +func parseBytesForTimestamp(b []byte) (time.Time, error) { + s := string(bytes.TrimSpace(b)) + + i, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return time.Time{}, err + } + + return time.Unix(i, 0), nil +} + +func parseBytesForValue(b []byte) (interface{}, error) { + trimmed := bytes.TrimSpace(b) + + if bytes.Contains(trimmed, []byte(".")) { + vf, err := strconv.ParseFloat(string(trimmed), 64) + if err != nil { + return nil, fmt.Errorf("failed to parse value: %s, err: %w", b, err) + } + return vf, nil + } + + vi, err := strconv.ParseInt(string(trimmed), 10, 64) + if err != nil { + return nil, err + } + return vi, nil +} + +func getTag(bb []byte, equalSignIdx int) ([]byte, []byte, error) { + field := bb[:equalSignIdx] + value := bytes.TrimSpace(bb[equalSignIdx+1:]) + return field, value, nil +} + +// ParseLine takes a single string metric ie, "cpu.usage.idle 90" +// and 
parses it into a telegraf metric. +// +// Must be thread-safe. +// This function is only called by plugins that expect line based protocols +// Doesn't need to be implemented by non-linebased parsers (e.g. json, xml) +func (p Parser) ParseLine(line string) (telegraf.Metric, error) { + return parseLine([]byte(line)) +} + +// SetDefaultTags tells the parser to add all of the given tags +// to each parsed metric. +// NOTE: do _not_ modify the map after you've passed it here!! +func (p Parser) SetDefaultTags(tags map[string]string) { +} diff --git a/plugins/parsers/carbon2/parser_test.go b/plugins/parsers/carbon2/parser_test.go new file mode 100644 index 0000000000000..cd59218345bc0 --- /dev/null +++ b/plugins/parsers/carbon2/parser_test.go @@ -0,0 +1,475 @@ +package carbon2 + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/serializers/carbon2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParse(t *testing.T) { + testcases := []struct { + name string + input []byte + wantErr bool + wantedFunc func() []telegraf.Metric + }{ + { + name: "basic1", + input: []byte("className=HealthTrackerKafkaDataQueueWriter cluster=open-receiver deployment=nite fullClassName=com.sumologic.health.io.HealthTrackerKafkaDataQueueWriter metric=kafka.queue.alpha_health_tracker_incidents.offer.timer mtype=count node=nite-open-receiver-1 service=open-receiver stat=p75 _primaryMetricType=carbon 0.00 1625855958"), + wantedFunc: func() []telegraf.Metric { + tags := map[string]string{ + "className": "HealthTrackerKafkaDataQueueWriter", + "cluster": "open-receiver", + "deployment": "nite", + "fullClassName": "com.sumologic.health.io.HealthTrackerKafkaDataQueueWriter", + "mtype": "count", + "node": "nite-open-receiver-1", + "service": "open-receiver", + "stat": "p75", + // meta tags + "_primaryMetricType": "carbon", + } + fields := 
map[string]interface{}{ + // TODO reconsider this hack + "": 0.0, + } + + return []telegraf.Metric{ + metric.New( + "kafka.queue.alpha_health_tracker_incidents.offer.timer", + tags, + fields, + time.Unix(1625855958, 0), + telegraf.Gauge, + ), + } + }, + }, + { + name: "basic2", + input: []byte("className=KafkaDataQueueWriter cluster=open-receiver deployment=nite fullClassName=com.sumologic.interchange.kafka.queue.KafkaDataQueueWriter metric=kafka.queue.beta_trace_ingest_traces.offer.timer mtype=count node=nite-open-receiver-1 service=open-receiver stat=p75 _primaryMetricType=carbon 0.00 1625855958"), + wantedFunc: func() []telegraf.Metric { + tags := map[string]string{ + "className": "KafkaDataQueueWriter", + "cluster": "open-receiver", + "deployment": "nite", + "fullClassName": "com.sumologic.interchange.kafka.queue.KafkaDataQueueWriter", + "mtype": "count", + "node": "nite-open-receiver-1", + "service": "open-receiver", + "stat": "p75", + "_primaryMetricType": "carbon", + } + fields := map[string]interface{}{ + // TODO reconsider this hack + "": 0.0, + } + + return []telegraf.Metric{ + metric.New( + "kafka.queue.beta_trace_ingest_traces.offer.timer", + tags, + fields, + time.Unix(1625855958, 0), + telegraf.Gauge, + ), + } + }, + }, + { + name: "basic3", + input: []byte("_rawName=nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space.max cluster=open-receiver deployment=nite metric=nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space.max node=nite-open-receiver-1 service=health _1=nite _2=nite-open-receiver-1 _3=health _4=jmx _5=memoryUsage _6=pools _7=Compressed-Class-Space _8=max name=Compressed-Class-Space 1073741824 1625855945"), + wantedFunc: func() []telegraf.Metric { + tags := map[string]string{ + "_rawName": "nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space.max", + "cluster": "open-receiver", + "deployment": "nite", + "node": "nite-open-receiver-1", + "service": "health", + // 
meta tags + "_1": "nite", + "_2": "nite-open-receiver-1", + "_3": "health", + "_4": "jmx", + "_5": "memoryUsage", + "_6": "pools", + "_7": "Compressed-Class-Space", + "_8": "max", + "name": "Compressed-Class-Space", + } + fields := map[string]interface{}{ + // TODO reconsider this hack + "": 1073741824, + } + + return []telegraf.Metric{ + metric.New( + "nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space.max", + tags, + fields, + time.Unix(1625855945, 0), + telegraf.Gauge, + ), + } + }, + }, + { + name: "basic3_with_new_line_at_the_end", + input: []byte("_rawName=nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space.max cluster=open-receiver deployment=nite metric=nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space.max node=nite-open-receiver-1 service=health _1=nite _2=nite-open-receiver-1 _3=health _4=jmx _5=memoryUsage _6=pools _7=Compressed-Class-Space _8=max name=Compressed-Class-Space 1073741824 1625855945\n"), + wantedFunc: func() []telegraf.Metric { + tags := map[string]string{ + "_rawName": "nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space.max", + "cluster": "open-receiver", + "deployment": "nite", + "node": "nite-open-receiver-1", + "service": "health", + // meta tags + "_1": "nite", + "_2": "nite-open-receiver-1", + "_3": "health", + "_4": "jmx", + "_5": "memoryUsage", + "_6": "pools", + "_7": "Compressed-Class-Space", + "_8": "max", + "name": "Compressed-Class-Space", + } + fields := map[string]interface{}{ + // TODO reconsider this hack + "": 1073741824, + } + + return []telegraf.Metric{ + metric.New( + "nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space.max", + tags, + fields, + time.Unix(1625855945, 0), + telegraf.Gauge, + ), + } + }, + }, + { + name: "multiple_metrics", + input: []byte(`_rawName=nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space-1.max cluster=open-receiver deployment=nite 
metric=nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space-1.max node=nite-open-receiver-1 service=health _1=nite _2=nite-open-receiver-1 _3=health _4=jmx _5=memoryUsage _6=pools _7=Compressed-Class-Space _8=max name=Compressed-Class-Space 1073741824 1625855945 +_rawName=nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space-2.max cluster=open-receiver deployment=nite metric=nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space-2.max node=nite-open-receiver-1 service=health _1=nite _2=nite-open-receiver-1 _3=health _4=jmx _5=memoryUsage _6=pools _7=Compressed-Class-Space _8=max name=Compressed-Class-Space 1073741827 1625855949`), + wantedFunc: func() []telegraf.Metric { + return []telegraf.Metric{ + metric.New( + "nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space-1.max", + map[string]string{ + "_rawName": "nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space-1.max", + "cluster": "open-receiver", + "deployment": "nite", + "node": "nite-open-receiver-1", + "service": "health", + // meta tags + "_1": "nite", + "_2": "nite-open-receiver-1", + "_3": "health", + "_4": "jmx", + "_5": "memoryUsage", + "_6": "pools", + "_7": "Compressed-Class-Space", + "_8": "max", + "name": "Compressed-Class-Space", + }, + map[string]interface{}{ + // TODO reconsider this hack + "": 1073741824, + }, + time.Unix(1625855945, 0), + telegraf.Gauge, + ), + metric.New( + "nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space-2.max", + map[string]string{ + "_rawName": "nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space-2.max", + "cluster": "open-receiver", + "deployment": "nite", + "node": "nite-open-receiver-1", + "service": "health", + // meta tags + "_1": "nite", + "_2": "nite-open-receiver-1", + "_3": "health", + "_4": "jmx", + "_5": "memoryUsage", + "_6": "pools", + "_7": "Compressed-Class-Space", + "_8": "max", + 
"name": "Compressed-Class-Space", + }, + map[string]interface{}{ + // TODO reconsider this hack + "": 1073741827, + }, + time.Unix(1625855949, 0), + telegraf.Gauge, + ), + } + }, + }, + { + name: "without_metric", + input: []byte("className=HealthTrackerKafkaDataQueueWriter cluster=open-receiver deployment=nite fullClassName=com.sumologic.health.io.HealthTrackerKafkaDataQueueWriter mtype=count node=nite-open-receiver-1 service=open-receiver stat=p75 _primaryMetricType=carbon 0.00 1625855958"), + wantErr: true, + }, + } + + p := Parser{} + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + m, err := p.Parse(tc.input) + if tc.wantErr { + assert.Error(t, err) + return + } + + require.NoError(t, err) + + for i, e := range tc.wantedFunc() { + assert.Equalf(t, e.Name(), m[i].Name(), "%d: Metric name not as expected", i) + assert.Equal(t, e.Fields(), m[i].Fields(), i) + assert.Equal(t, e.Tags(), m[i].Tags(), i) + assert.Equal(t, e.Time(), m[i].Time(), i) + } + }) + } +} + +func TestParseLine(t *testing.T) { + testcases := []struct { + name string + input string + wantErr bool + wantedFunc func() telegraf.Metric + }{ + { + name: "basic1", + input: "className=HealthTrackerKafkaDataQueueWriter cluster=open-receiver deployment=nite fullClassName=com.sumologic.health.io.HealthTrackerKafkaDataQueueWriter metric=kafka.queue.alpha_health_tracker_incidents.offer.timer mtype=count node=nite-open-receiver-1 service=open-receiver stat=p75 _primaryMetricType=carbon 0.00 1625855958", + wantedFunc: func() telegraf.Metric { + tags := map[string]string{ + "className": "HealthTrackerKafkaDataQueueWriter", + "cluster": "open-receiver", + "deployment": "nite", + "fullClassName": "com.sumologic.health.io.HealthTrackerKafkaDataQueueWriter", + "mtype": "count", + "node": "nite-open-receiver-1", + "service": "open-receiver", + "stat": "p75", + // meta tags + "_primaryMetricType": "carbon", + } + fields := map[string]interface{}{ + // TODO reconsider this hack + "": 0.0, + } + 
+ return metric.New( + "kafka.queue.alpha_health_tracker_incidents.offer.timer", + tags, + fields, + time.Unix(1625855958, 0), + telegraf.Gauge, + ) + }, + }, + { + name: "basic2", + input: "className=KafkaDataQueueWriter cluster=open-receiver deployment=nite fullClassName=com.sumologic.interchange.kafka.queue.KafkaDataQueueWriter metric=kafka.queue.beta_trace_ingest_traces.offer.timer mtype=count node=nite-open-receiver-1 service=open-receiver stat=p75 _primaryMetricType=carbon 0.00 1625855958", + wantedFunc: func() telegraf.Metric { + tags := map[string]string{ + "className": "KafkaDataQueueWriter", + "cluster": "open-receiver", + "deployment": "nite", + "fullClassName": "com.sumologic.interchange.kafka.queue.KafkaDataQueueWriter", + "mtype": "count", + "node": "nite-open-receiver-1", + "service": "open-receiver", + "stat": "p75", + "_primaryMetricType": "carbon", + } + fields := map[string]interface{}{ + // TODO reconsider this hack + "": 0.0, + } + + return metric.New( + "kafka.queue.beta_trace_ingest_traces.offer.timer", + tags, + fields, + time.Unix(1625855958, 0), + telegraf.Gauge, + ) + }, + }, + { + name: "basic3", + input: "_rawName=nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space.max cluster=open-receiver deployment=nite metric=nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space.max node=nite-open-receiver-1 service=health _1=nite _2=nite-open-receiver-1 _3=health _4=jmx _5=memoryUsage _6=pools _7=Compressed-Class-Space _8=max name=Compressed-Class-Space 1073741824 1625855945", + wantedFunc: func() telegraf.Metric { + tags := map[string]string{ + "_rawName": "nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space.max", + "cluster": "open-receiver", + "deployment": "nite", + "node": "nite-open-receiver-1", + "service": "health", + // meta tags + "_1": "nite", + "_2": "nite-open-receiver-1", + "_3": "health", + "_4": "jmx", + "_5": "memoryUsage", + "_6": "pools", + "_7": 
"Compressed-Class-Space", + "_8": "max", + "name": "Compressed-Class-Space", + } + fields := map[string]interface{}{ + // TODO reconsider this hack + "": 1073741824, + } + + return metric.New( + "nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space.max", + tags, + fields, + time.Unix(1625855945, 0), + telegraf.Gauge, + ) + }, + }, + { + name: "without_metric", + input: "className=HealthTrackerKafkaDataQueueWriter cluster=open-receiver deployment=nite fullClassName=com.sumologic.health.io.HealthTrackerKafkaDataQueueWriter mtype=count node=nite-open-receiver-1 service=open-receiver stat=p75 _primaryMetricType=carbon 0.00 1625855958", + wantErr: true, + }, + } + + p := Parser{} + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + m, err := p.ParseLine(tc.input) + if tc.wantErr { + assert.Error(t, err) + return + } + + require.NoError(t, err) + + expected := tc.wantedFunc() + assert.Equalf(t, expected.Name(), m.Name(), "Metric name not as expected") + assert.Equal(t, expected.Fields(), m.Fields()) + assert.Equal(t, expected.Tags(), m.Tags()) + assert.Equal(t, expected.Time(), m.Time()) + }) + } +} + +func TestSerializeAndParseReturnsTheSameMetric(t *testing.T) { + getTestMetrics := func() []telegraf.Metric { + metrics := make([]telegraf.Metric, 0, 2) + + { + tags := map[string]string{ + "className": "KafkaDataQueueWriter", + "cluster": "open-receiver", + "deployment": "nite", + "fullClassName": "com.sumologic.interchange.kafka.queue.KafkaDataQueueWriter", + "mtype": "count", + "node": "nite-open-receiver-1", + "service": "open-receiver", + "stat": "p75", + "_primaryMetricType": "carbon", + } + fields := map[string]interface{}{ + // TODO reconsider this hack + "": 0.0, + } + + metrics = append(metrics, metric.New( + "kafka.queue.beta_trace_ingest_traces.offer.timer", + tags, + fields, + time.Unix(1625855958, 0), + telegraf.Gauge, + ), + ) + } + { + tags := map[string]string{ + "_rawName": 
"nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space.max", + "cluster": "open-receiver", + "deployment": "nite", + "node": "nite-open-receiver-1", + "service": "health", + // meta tags + "_1": "nite", + "_2": "nite-open-receiver-1", + "_3": "health", + "_4": "jmx", + "_5": "memoryUsage", + "_6": "pools", + "_7": "Compressed-Class-Space", + "_8": "max", + "name": "Compressed-Class-Space", + } + fields := map[string]interface{}{ + // TODO reconsider this hack + "": 1073741824, + } + + metrics = append(metrics, metric.New( + "nite.nite-open-receiver-1.health.jmx.memoryUsage.pools.Compressed-Class-Space.max", + tags, + fields, + time.Unix(1625855945, 0), + telegraf.Gauge, + ), + ) + } + return metrics + } + + testcases := []struct { + serializerFormat string + }{ + { + serializerFormat: string(carbon2.Carbon2FormatMetricIncludesField), + }, + { + serializerFormat: string(carbon2.Carbon2FormatFieldSeparate), + }, + } + + for _, tc := range testcases { + t.Run(tc.serializerFormat, func(t *testing.T) { + s, err := carbon2.NewSerializer( + tc.serializerFormat, + carbon2.DefaultSanitizeReplaceChar, + ) + require.NoError(t, err) + + expected := getTestMetrics() + b, err := s.SerializeBatch(expected) + require.NoError(t, err) + + t.Logf("\n%s", b) + + p := Parser{} + metrics, err := p.Parse(b) + require.NoError(t, err) + require.Len(t, metrics, len(expected)) + for i, m := range metrics { + assert.Equal(t, expected[i], m) + } + + }) + } +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 4e01fb0a630fe..ceb2b3b2da329 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/parsers/carbon2" "github.com/influxdata/telegraf/plugins/parsers/collectd" "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/plugins/parsers/dropwizard" @@ -265,6 +266,8 @@ func NewParser(config 
*Config) (Parser, error) { parser, err = NewXMLParser(config.MetricName, config.DefaultTags, config.XMLConfig) case "json_v2": parser, err = NewJSONPathParser(config.JSONV2Config) + case "carbon2": + parser, err = NewCarbon2Parser() default: err = fmt.Errorf("Invalid data format: %s", config.DataFormat) } @@ -427,3 +430,7 @@ func NewJSONPathParser(jsonv2config []JSONV2Config) (Parser, error) { Configs: configs, }, nil } + +func NewCarbon2Parser() (Parser, error) { + return carbon2.Parser{}, nil +} diff --git a/plugins/serializers/carbon2/carbon2.go b/plugins/serializers/carbon2/carbon2.go index 4eb5798d64a69..34b3dcfe9da30 100644 --- a/plugins/serializers/carbon2/carbon2.go +++ b/plugins/serializers/carbon2/carbon2.go @@ -125,6 +125,12 @@ func (s *Serializer) IsMetricsFormatUnset() bool { } func serializeMetricFieldSeparate(name, fieldName string) string { + if fieldName == "" { + return fmt.Sprintf("metric=%s ", + strings.Replace(name, " ", "_", -1), + ) + } + return fmt.Sprintf("metric=%s field=%s ", strings.Replace(name, " ", "_", -1), strings.Replace(fieldName, " ", "_", -1), @@ -132,6 +138,12 @@ func serializeMetricFieldSeparate(name, fieldName string) string { } func serializeMetricIncludeField(name, fieldName string) string { + if fieldName == "" { + return fmt.Sprintf("metric=%s ", + strings.Replace(name, " ", "_", -1), + ) + } + return fmt.Sprintf("metric=%s_%s ", strings.Replace(name, " ", "_", -1), strings.Replace(fieldName, " ", "_", -1), @@ -143,6 +155,8 @@ func formatValue(fieldValue interface{}) string { case bool: // Print bools as 0s and 1s return fmt.Sprintf("%d", bool2int(v)) + case float64, float32: + return fmt.Sprintf("%f", v) default: return fmt.Sprintf("%v", v) }