Skip to content

Commit

Permalink
Merge pull request #3 from puckpuck/wavefront-output
Browse files Browse the repository at this point in the history
Wavefront output
  • Loading branch information
ezeev authored Dec 8, 2016
2 parents 3cca45c + 91fdbe3 commit cbec765
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 90 deletions.
117 changes: 117 additions & 0 deletions plugins/outputs/wavefront/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Wavefront Output Plugin

This plugin writes to a [Wavefront](https://www.wavefront.com) proxy, in Wavefront data format over TCP.


## Wavefront Data format

The expected input for Wavefront is specified in the following way:

```
<metric> <value> [<timestamp>] <source|host>=<soureTagValue> [tagk1=tagv1 ...tagkN=tagvN]
```

More information about the Wavefront data format is available [here](https://community.wavefront.com/docs/DOC-1031)


By default, to ease Metrics browsing in the Wavefront UI, metrics are grouped by converting any `_` characters to `.` in the final name.
This behavior can be altered by changing the `metric_separator` and/or the `convert_paths` settings.
Most illegal characters in the metric name are automatically converted to `-`.
The `use_regex` setting can be used to ensure all illegal characters are properly handled, but can lead to performance degradation.

## Configuration:

```toml
# Configuration for Wavefront output
[[outputs.wavefront]]
## prefix for metrics keys
prefix = "my.specific.prefix."

## DNS name of the wavefront proxy server
host = "wavefront.example.com"

## Port that the Wavefront proxy server listens on
port = 2878

## wether to use "value" for name of simple fields
simple_fields = false

## character to use between metric and field name. defaults to . (dot)
metric_separator = "."

## Convert metric name paths to use metricSeperator character
## When true (default) will convert all _ (underscore) chartacters in final metric name
convert_paths = true

## Use Regex to sanitize metric and tag names from invalid characters
## Regex is more thorough, but significantly slower
use_regex = false

## point tags to use as the source name for Wavefront (if none found, host will be used)
source_tags = ["hostname", "snmp_host", "node_host"]

## Print additional debug information requires debug = true at the agent level
debug_all = false
```

Parameters:

Prefix string
Host string
Port int
SimpleFields bool
MetricSeparator string
ConvertPaths bool
UseRegex bool
SourceTags string
DebugAll bool

* `prefix`: String to use as a prefix for all sent metrics.
* `host`: Name of Wavefront proxy server
* `port`: Port that Wavefront proxy server is configured for `pushListenerPorts`
* `simple_fields`: if false (default) metric field names called `value` are converted to empty strings
* `metric_separator`: character to use to separate metric and field names. (default is `_`)
* `convert_paths`: if true (default) will convert all `_` in metric and field names to `metric_seperator`
* `use_regex`: if true (default is false) will use regex to ensure all illegal characters are converted to `-`. Regex is much slower than the default mode which will catch most illegal characters. Use with caution.
* `source_tags`: ordered list of point tags to use as the source name for Wavefront. Once a match a found that tag is used for that point. If no tags are found the host tag will be used.
* `debug_all`: Will output additional debug information. Requires `debug = true` to be configured at the agent level


##

The Wavefront proxy interface can be simulated with this reader:

```
// wavefront_proxy_mock.go
package main
import (
"io"
"log"
"net"
"os"
)
func main() {
l, err := net.Listen("tcp", "localhost:2878")
if err != nil {
log.Fatal(err)
}
defer l.Close()
for {
conn, err := l.Accept()
if err != nil {
log.Fatal(err)
}
go func(c net.Conn) {
defer c.Close()
io.Copy(os.Stdout, c)
}(conn)
}
}
```

## Allowed values for metrics

Wavefront allows `integers` and `floats` as input values
72 changes: 45 additions & 27 deletions plugins/outputs/wavefront/wavefront.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@


package wavefront

import (
Expand All @@ -16,15 +14,14 @@ import (
)

type Wavefront struct {
Prefix string
Host string
Port int
Prefix string
SimpleFields bool
MetricSeparator string
ConvertPaths bool
UseRegex bool

Debug bool
SourceTags []string
DebugAll bool
}

Expand All @@ -33,16 +30,18 @@ var sanitizedChars = strings.NewReplacer(
"!", "-", "@", "-", "#", "-", "$", "-", "%", "-", "^", "-", "&", "-",
"*", "-", "(", "-", ")", "-", "+", "-", "`", "-", "'", "-", "\"", "-",
"[", "-", "]", "-", "{", "-", "}", "-", ":", "-", ";", "-", "<", "-",
">", "-", ",", "-", "?", "-", "\\", "-", "|", "-", " ", "-",
">", "-", ",", "-", "?", "-", "/", "-", "\\", "-", "|", "-", " ", "-",
)
// instead of Replacer which may miss some special characters we can use a regex pattern, but this is significantly slower than Replacer
var sanitizedRegex, _ = regexp.Compile("[^a-zA-Z\\d_.-]")

var tagValueReplacer = strings.NewReplacer("\"", "\\\"", "*", "-")

var pathReplacer = strings.NewReplacer("_", "_")

var sampleConfig = `
## prefix for metrics keys
prefix = "my.specific.prefix."
#prefix = "my.specific.prefix."
## DNS name of the wavefront proxy server
host = "wavefront.example.com"
Expand All @@ -51,21 +50,24 @@ var sampleConfig = `
port = 2878
## wether to use "value" for name of simple fields
simple_fields = false
#simple_fields = false
## character to use between metric and field name. defaults to . (dot)
metric_separator = "."
#metric_separator = "."
## Convert metric name paths to use metricSeperator character
## When true (edfault) will convert all _ (underscore) chartacters in final metric name
convert_paths = true
## When true (default) will convert all _ (underscore) chartacters in final metric name
#convert_paths = true
## Use Regex to sanitize metric and tag names from invalid characters
## Regex is more thorough, but significantly slower
use_regex = false
#use_regex = false
## point tags to use as the source name for Wavefront (if none found, host will be used)
#source_tags = ["hostname", "snmp_host", "node_host"]
## Print all Wavefront communication
debug = false
## Print additional debug information requires debug = true at the agent level
#debug_all = false
`

type MetricLine struct {
Expand Down Expand Up @@ -115,9 +117,7 @@ func (w *Wavefront) Write(metrics []telegraf.Metric) error {
for _, m := range metrics {
for _, metric := range buildMetrics(m, w) {
messageLine := fmt.Sprintf("%s %s %v %s\n", metric.Metric, metric.Value, metric.Timestamp, metric.Tags)
if w.Debug {
log.Printf("DEBUG: output [wavefront] %s", messageLine)
}
log.Printf("D! Output [wavefront] %s", messageLine)
_, err := connection.Write([]byte(messageLine))
if err != nil {
return fmt.Errorf("Wavefront: TCP writing error %s", err.Error())
Expand All @@ -129,34 +129,53 @@ func (w *Wavefront) Write(metrics []telegraf.Metric) error {
}

func buildTags(mTags map[string]string, w *Wavefront) []string {
sourceTagFound := false

for _, s := range w.SourceTags {
for k, v := range mTags {
if k == s {
mTags["source"] = v
sourceTagFound = true
delete(mTags, k)
break
}
}
if sourceTagFound {
break
}
}

if !sourceTagFound {
mTags["source"] = mTags["host"]
}
mTags["telegraf_host"] = mTags["host"]
delete(mTags, "host")

tags := make([]string, len(mTags))
index := 0
for k, v := range mTags {
if k == "host" {
k = "source"
}

if w.UseRegex {
tags[index] = fmt.Sprintf("%s=\"%s\"", sanitizedRegex.ReplaceAllString(k, "-"), sanitizedRegex.ReplaceAllString(v, "-"))
tags[index] = fmt.Sprintf("%s=\"%s\"", sanitizedRegex.ReplaceAllString(k, "-"), tagValueReplacer.Replace(v))
} else {
tags[index] = fmt.Sprintf("%s=\"%s\"", sanitizedChars.Replace(k), sanitizedChars.Replace(v))
tags[index] = fmt.Sprintf("%s=\"%s\"", sanitizedChars.Replace(k), tagValueReplacer.Replace(v))
}

index++
}

sort.Strings(tags)
return tags
}

func buildMetrics(m telegraf.Metric, w *Wavefront) []*MetricLine {
if w.DebugAll {
log.Printf("DEBUG: output [wavefront] original name: %s\n", m.Name())
log.Printf("D! Output [wavefront] original name: %s\n", m.Name())
}

ret := []*MetricLine{}
for fieldName, value := range m.Fields() {
if w.DebugAll {
log.Printf("DEBUG: output [wavefront] original field: %s\n", fieldName)
log.Printf("D! Output [wavefront] original field: %s\n", fieldName)
}

var name string
Expand All @@ -182,7 +201,7 @@ func buildMetrics(m telegraf.Metric, w *Wavefront) []*MetricLine {
}
metricValue, buildError := buildValue(value, metric.Metric)
if buildError != nil {
log.Printf("ERROR: output [wavefront] %s\n", buildError.Error())
log.Printf("E! Output [wavefront] %s\n", buildError.Error())
continue
}
metric.Value = metricValue
Expand Down Expand Up @@ -240,4 +259,3 @@ func init() {
}
})
}

67 changes: 48 additions & 19 deletions plugins/outputs/wavefront/wavefront_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,39 @@ func defaultWavefront() *Wavefront {
MetricSeparator: ".",
ConvertPaths: true,
UseRegex: false,
Debug: true,
}
}

func TestSourceTags(t *testing.T) {
w := defaultWavefront()
w.SourceTags = []string{"snmp_host", "hostagent"}

var tagtests = []struct {
ptIn map[string]string
outTags []string
}{
{
map[string]string{"snmp_host": "realHost", "host": "origHost"},
[]string{"source=\"realHost\"", "telegraf_host=\"origHost\""},
},
{
map[string]string{"hostagent": "realHost", "host": "origHost"},
[]string{"source=\"realHost\"", "telegraf_host=\"origHost\""},
},
{
map[string]string{"hostagent": "abc", "snmp_host": "realHost", "host": "origHost"},
[]string{"hostagent=\"abc\"", "source=\"realHost\"", "telegraf_host=\"origHost\""},
},
{
map[string]string{"something": "abc", "host": "realHost"},
[]string{"something=\"abc\"", "source=\"realHost\"", "telegraf_host=\"realHost\""},
},
}
for _, tt := range tagtests {
tags := buildTags(tt.ptIn, w)
if !reflect.DeepEqual(tags, tt.outTags) {
t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outTags, tags)
}
}
}

Expand All @@ -38,9 +70,9 @@ func TestBuildMetricsNoSimpleFields(t *testing.T) {
)

var metricTests = []struct {
metric telegraf.Metric
metric telegraf.Metric
metricLines []MetricLine
} {
}{
{
testutil.TestMetric(float64(1.0), "testing_just*a%metric:float"),
[]MetricLine{{Metric: w.Prefix + "testing.just-a-metric-float", Value: "1.000000"}},
Expand Down Expand Up @@ -78,9 +110,9 @@ func TestBuildMetricsWithSimpleFields(t *testing.T) {
)

var metricTests = []struct {
metric telegraf.Metric
metric telegraf.Metric
metricLines []MetricLine
} {
}{
{
testutil.TestMetric(float64(1.0), "testing_just*a%metric:float"),
[]MetricLine{{Metric: w.Prefix + "testing.just-a-metric-float.value", Value: "1.000000"}},
Expand Down Expand Up @@ -111,28 +143,24 @@ func TestBuildTags(t *testing.T) {
outTags []string
}{
{
map[string]string{"one": "two", "three": "four"},
[]string{"one=\"two\"", "three=\"four\""},
map[string]string{"one": "two", "three": "four", "host": "testHost"},
[]string{"one=\"two\"", "source=\"testHost\"", "telegraf_host=\"testHost\"", "three=\"four\""},
},
{
map[string]string{"aaa": "bbb"},
[]string{"aaa=\"bbb\""},
map[string]string{"aaa": "bbb", "host": "testHost"},
[]string{"aaa=\"bbb\"", "source=\"testHost\"", "telegraf_host=\"testHost\""},
},
{
map[string]string{"bbb": "789", "aaa": "123"},
[]string{"aaa=\"123\"", "bbb=\"789\""},
map[string]string{"bbb": "789", "aaa": "123", "host": "testHost"},
[]string{"aaa=\"123\"", "bbb=\"789\"", "source=\"testHost\"", "telegraf_host=\"testHost\""},
},
{
map[string]string{"host": "aaa", "dc": "bbb"},
[]string{"dc=\"bbb\"", "source=\"aaa\""},
[]string{"dc=\"bbb\"", "source=\"aaa\"", "telegraf_host=\"aaa\""},
},
{
map[string]string{"Sp%ci@l Chars": "\"g$t repl#ced"},
[]string{"Sp-ci-l-Chars=\"-g-t-repl-ced\""},
},
{
map[string]string{},
[]string{},
map[string]string{"Sp%ci@l Chars": "\"g*t repl#ced", "host": "testHost"},
[]string{"Sp-ci-l-Chars=\"\\\"g-t repl#ced\"", "source=\"testHost\"", "telegraf_host=\"testHost\""},
},
}
for _, tt := range tagtests {
Expand Down Expand Up @@ -179,4 +207,5 @@ func TestBuildTags(t *testing.T) {

// err = w.Write(metrics)
// require.NoError(t, err)
// }
// }

Loading

0 comments on commit cbec765

Please sign in to comment.