-
Notifications
You must be signed in to change notification settings - Fork 5.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
069cb97
commit 5c3379d
Showing
4 changed files
with
261 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
# Graphite Output Plugin | ||
|
||
This plugin writes to [Graphite](http://graphite.readthedocs.org/en/latest/index.html) via raw TCP. | ||
|
||
Parameters: | ||
|
||
Servers []string | ||
Prefix string | ||
Timeout int | ||
|
||
* `servers`: List of strings, ["mygraphiteserver:2003"]. | ||
* `prefix`: String use to prefix all sent metrics. | ||
* `timeout`: Connection timeout in second. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
package graphite | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"github.com/influxdb/influxdb/client/v2" | ||
"github.com/influxdb/telegraf/outputs" | ||
"log" | ||
"math/rand" | ||
"net" | ||
"strings" | ||
"time" | ||
) | ||
|
||
type Graphite struct { | ||
// URL is only for backwards compatability | ||
Servers []string | ||
Prefix string | ||
Timeout int | ||
conns []net.Conn | ||
} | ||
|
||
var sampleConfig = ` | ||
# TCP raw endpoint for your graphite instance. | ||
servers = ["mygraphiteserver:2003"] # default "localhost:2003" | ||
# Prefix metrics name | ||
prefix = "" # default "" | ||
# Timeout in second | ||
timeout = 2 # default 2 | ||
` | ||
|
||
func (g *Graphite) Connect() error { | ||
// Set default values | ||
if g.Timeout <= 0 { | ||
g.Timeout = 2 | ||
} | ||
if len(g.Servers) == 0 { | ||
g.Servers = append(g.Servers, "localhost:2003") | ||
} | ||
// Get Connections | ||
var conns []net.Conn | ||
for _, server := range g.Servers { | ||
conn, err := net.DialTimeout("tcp", server, time.Duration(g.Timeout)*time.Second) | ||
if err == nil { | ||
conns = append(conns, conn) | ||
} | ||
} | ||
g.conns = conns | ||
return nil | ||
} | ||
|
||
func (g *Graphite) Close() error { | ||
// Closing all connections | ||
for _, conn := range g.conns { | ||
conn.Close() | ||
} | ||
return nil | ||
} | ||
|
||
func (g *Graphite) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
func (g *Graphite) Description() string { | ||
return "Configuration for Graphite server to send metrics to using TCP raw protocol" | ||
} | ||
|
||
// Choose a random server in the cluster to write to until a successful write | ||
// occurs, logging each unsuccessful. If all servers fail, return error. | ||
func (g *Graphite) Write(points []*client.Point) error { | ||
// Prepare data | ||
var bp []string | ||
for _, point := range points { | ||
// Get name | ||
name := point.Name() | ||
// Convert UnixNano to Unix timestamps | ||
timestamp := point.UnixNano() / 1000000000 | ||
|
||
for field_name, value := range point.Fields() { | ||
// Convert value | ||
var value_str string | ||
switch value := value.(type) { | ||
case int64: | ||
value_str = fmt.Sprintf("%d", int64(value)) | ||
case float64: | ||
value_str = fmt.Sprintf("%f", float64(value)) | ||
default: | ||
} | ||
// Write graphite point | ||
var graphitePoint string | ||
if name == field_name { | ||
graphitePoint = fmt.Sprintf("%s.%s %s %d\n", | ||
strings.Replace(point.Tags()["host"], ".", "_", -1), | ||
strings.Replace(name, ".", "_", -1), | ||
value_str, | ||
timestamp) | ||
} else { | ||
graphitePoint = fmt.Sprintf("%s.%s.%s %s %d\n", | ||
strings.Replace(point.Tags()["host"], ".", "_", -1), | ||
strings.Replace(name, ".", "_", -1), | ||
strings.Replace(field_name, ".", "_", -1), | ||
value_str, | ||
timestamp) | ||
} | ||
if g.Prefix != "" { | ||
graphitePoint = fmt.Sprintf("%s.%s", g.Prefix, graphitePoint) | ||
} | ||
bp = append(bp, graphitePoint) | ||
//fmt.Printf(graphitePoint) | ||
} | ||
} | ||
graphitePoints := strings.Join(bp, "") | ||
|
||
// This will get set to nil if a successful write occurs | ||
err := errors.New("Could not write to any Graphite server in cluster\n") | ||
|
||
// Send data to a random server | ||
p := rand.Perm(len(g.conns)) | ||
for _, n := range p { | ||
if _, e := fmt.Fprintf(g.conns[n], graphitePoints); e != nil { | ||
// Error | ||
log.Println("ERROR: " + err.Error()) | ||
// Let's try the next one | ||
} else { | ||
// Success | ||
err = nil | ||
break | ||
} | ||
} | ||
|
||
return err | ||
} | ||
|
||
func init() { | ||
outputs.Add("graphite", func() outputs.Output { | ||
return &Graphite{} | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
package graphite | ||
|
||
import ( | ||
"bufio" | ||
"net/textproto" | ||
// "io" | ||
// "fmt" | ||
// "bytes" | ||
"net" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"github.com/influxdb/influxdb/client/v2" | ||
|
||
// "github.com/influxdb/telegraf/testutil" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestGraphiteError(t *testing.T) { | ||
// Init plugin | ||
g := Graphite{ | ||
Servers: []string{"127.0.0.1:2003", "127.0.0.1:12003"}, | ||
Prefix: "my.prefix", | ||
} | ||
// Init points | ||
pt1, _ := client.NewPoint( | ||
"mymeasurement", | ||
map[string]string{"host": "192.168.0.1"}, | ||
map[string]interface{}{"mymeasurement": float64(3.14)}, | ||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), | ||
) | ||
// Prepare point list | ||
var points []*client.Point | ||
points = append(points, pt1) | ||
// Error | ||
err1 := g.Connect() | ||
require.NoError(t, err1) | ||
err2 := g.Write(points) | ||
require.Error(t, err2) | ||
assert.Equal(t, "Could not write to any Graphite server in cluster\n", err2.Error()) | ||
} | ||
|
||
func TestGraphiteOK(t *testing.T) { | ||
var wg sync.WaitGroup | ||
// Init plugin | ||
g := Graphite{ | ||
Prefix: "my.prefix", | ||
} | ||
// Init points | ||
pt1, _ := client.NewPoint( | ||
"mymeasurement", | ||
map[string]string{"host": "192.168.0.1"}, | ||
map[string]interface{}{"mymeasurement": float64(3.14)}, | ||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), | ||
) | ||
pt2, _ := client.NewPoint( | ||
"mymeasurement", | ||
map[string]string{"host": "192.168.0.1"}, | ||
map[string]interface{}{"value": float64(3.14)}, | ||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), | ||
) | ||
pt3, _ := client.NewPoint( | ||
"my_measurement", | ||
map[string]string{"host": "192.168.0.1"}, | ||
map[string]interface{}{"value": float64(3.14)}, | ||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), | ||
) | ||
// Prepare point list | ||
var points []*client.Point | ||
points = append(points, pt1) | ||
points = append(points, pt2) | ||
points = append(points, pt3) | ||
// Start TCP server | ||
wg.Add(1) | ||
go TCPServer(t, &wg) | ||
wg.Wait() | ||
// Connect | ||
wg.Add(1) | ||
err1 := g.Connect() | ||
wg.Wait() | ||
require.NoError(t, err1) | ||
// Send Data | ||
err2 := g.Write(points) | ||
require.NoError(t, err2) | ||
wg.Add(1) | ||
// Waiting TCPserver | ||
wg.Wait() | ||
g.Close() | ||
} | ||
|
||
func TCPServer(t *testing.T, wg *sync.WaitGroup) { | ||
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") | ||
wg.Done() | ||
conn, _ := tcpServer.Accept() | ||
wg.Done() | ||
reader := bufio.NewReader(conn) | ||
tp := textproto.NewReader(reader) | ||
data1, _ := tp.ReadLine() | ||
assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.140000 1289430000", data1) | ||
data2, _ := tp.ReadLine() | ||
assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.value 3.140000 1289430000", data2) | ||
data3, _ := tp.ReadLine() | ||
assert.Equal(t, "my.prefix.192_168_0_1.measurement.value 3.140000 1289430000", data3) | ||
conn.Close() | ||
wg.Done() | ||
} |