-
Notifications
You must be signed in to change notification settings - Fork 5.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
c483e16
commit e12d9f9
Showing
4 changed files
with
252 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
# Graphite Output Plugin | ||
|
||
This plugin writes to [Graphite](http://graphite.readthedocs.org/en/latest/index.html) via raw TCP. | ||
|
||
Parameters: | ||
|
||
Servers []string | ||
Prefix string | ||
Timeout int | ||
|
||
* `servers`: List of strings, ["mygraphiteserver:2003"]. | ||
* `prefix`: String use to prefix all sent metrics. | ||
* `timeout`: Connection timeout in second. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
package graphite | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"github.com/influxdb/influxdb/client/v2" | ||
"github.com/influxdb/telegraf/plugins/outputs" | ||
"log" | ||
"math/rand" | ||
"net" | ||
"strings" | ||
"time" | ||
) | ||
|
||
type Graphite struct { | ||
// URL is only for backwards compatability | ||
Servers []string | ||
Prefix string | ||
Timeout int | ||
conns []net.Conn | ||
} | ||
|
||
var sampleConfig = ` | ||
# TCP raw endpoint for your graphite instance. | ||
servers = ["mygraphiteserver:2003"] # default "localhost:2003" | ||
# Prefix metrics name | ||
prefix = "" # default "" | ||
# Connection timeout in second (for the connection with Carbon(Graphite)) | ||
timeout = 2 # default 2s | ||
` | ||
|
||
func (g *Graphite) Connect() error { | ||
// Set default values | ||
if g.Timeout <= 0 { | ||
g.Timeout = 2 | ||
} | ||
if len(g.Servers) == 0 { | ||
g.Servers = append(g.Servers, "localhost:2003") | ||
} | ||
// Get Connections | ||
var conns []net.Conn | ||
for _, server := range g.Servers { | ||
conn, err := net.DialTimeout("tcp", server, time.Duration(g.Timeout)*time.Second) | ||
if err == nil { | ||
conns = append(conns, conn) | ||
} | ||
} | ||
g.conns = conns | ||
return nil | ||
} | ||
|
||
func (g *Graphite) Close() error { | ||
// Closing all connections | ||
for _, conn := range g.conns { | ||
conn.Close() | ||
} | ||
return nil | ||
} | ||
|
||
func (g *Graphite) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
func (g *Graphite) Description() string { | ||
return "Configuration for Graphite server to send metrics to using TCP raw protocol" | ||
} | ||
|
||
// Choose a random server in the cluster to write to until a successful write | ||
// occurs, logging each unsuccessful. If all servers fail, return error. | ||
func (g *Graphite) Write(points []*client.Point) error { | ||
// Prepare data | ||
var bp []string | ||
for _, point := range points { | ||
// Get name | ||
name := point.Name() | ||
// Convert UnixNano to Unix timestamps | ||
timestamp := point.UnixNano() / 1000000000 | ||
|
||
for field_name, value := range point.Fields() { | ||
// Convert value | ||
value_str := fmt.Sprintf("%#v", value) | ||
// Write graphite point | ||
var graphitePoint string | ||
if name == field_name { | ||
graphitePoint = fmt.Sprintf("%s.%s %s %d\n", | ||
strings.Replace(point.Tags()["host"], ".", "_", -1), | ||
strings.Replace(name, ".", "_", -1), | ||
value_str, | ||
timestamp) | ||
} else { | ||
graphitePoint = fmt.Sprintf("%s.%s.%s %s %d\n", | ||
strings.Replace(point.Tags()["host"], ".", "_", -1), | ||
strings.Replace(name, ".", "_", -1), | ||
strings.Replace(field_name, ".", "_", -1), | ||
value_str, | ||
timestamp) | ||
} | ||
if g.Prefix != "" { | ||
graphitePoint = fmt.Sprintf("%s.%s", g.Prefix, graphitePoint) | ||
} | ||
bp = append(bp, graphitePoint) | ||
//fmt.Printf(graphitePoint) | ||
} | ||
} | ||
graphitePoints := strings.Join(bp, "") | ||
|
||
// This will get set to nil if a successful write occurs | ||
err := errors.New("Could not write to any Graphite server in cluster\n") | ||
|
||
// Send data to a random server | ||
p := rand.Perm(len(g.conns)) | ||
for _, n := range p { | ||
if _, e := fmt.Fprintf(g.conns[n], graphitePoints); e != nil { | ||
// Error | ||
log.Println("ERROR: " + err.Error()) | ||
// Let's try the next one | ||
} else { | ||
// Success | ||
err = nil | ||
break | ||
} | ||
} | ||
// try to reconnect | ||
if err != nil { | ||
g.Connect() | ||
} | ||
return err | ||
} | ||
|
||
func init() { | ||
outputs.Add("graphite", func() outputs.Output { | ||
return &Graphite{} | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
package graphite | ||
|
||
import ( | ||
"bufio" | ||
"net" | ||
"net/textproto" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"github.com/influxdb/influxdb/client/v2" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestGraphiteError(t *testing.T) { | ||
// Init plugin | ||
g := Graphite{ | ||
Servers: []string{"127.0.0.1:2003", "127.0.0.1:12003"}, | ||
Prefix: "my.prefix", | ||
} | ||
// Init points | ||
pt1, _ := client.NewPoint( | ||
"mymeasurement", | ||
map[string]string{"host": "192.168.0.1"}, | ||
map[string]interface{}{"mymeasurement": float64(3.14)}, | ||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), | ||
) | ||
// Prepare point list | ||
var points []*client.Point | ||
points = append(points, pt1) | ||
// Error | ||
err1 := g.Connect() | ||
require.NoError(t, err1) | ||
err2 := g.Write(points) | ||
require.Error(t, err2) | ||
assert.Equal(t, "Could not write to any Graphite server in cluster\n", err2.Error()) | ||
} | ||
|
||
func TestGraphiteOK(t *testing.T) { | ||
var wg sync.WaitGroup | ||
// Init plugin | ||
g := Graphite{ | ||
Prefix: "my.prefix", | ||
} | ||
// Init points | ||
pt1, _ := client.NewPoint( | ||
"mymeasurement", | ||
map[string]string{"host": "192.168.0.1"}, | ||
map[string]interface{}{"mymeasurement": float64(3.14)}, | ||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), | ||
) | ||
pt2, _ := client.NewPoint( | ||
"mymeasurement", | ||
map[string]string{"host": "192.168.0.1"}, | ||
map[string]interface{}{"value": float64(3.14)}, | ||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), | ||
) | ||
pt3, _ := client.NewPoint( | ||
"my_measurement", | ||
map[string]string{"host": "192.168.0.1"}, | ||
map[string]interface{}{"value": float64(3.14)}, | ||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), | ||
) | ||
// Prepare point list | ||
var points []*client.Point | ||
points = append(points, pt1) | ||
points = append(points, pt2) | ||
points = append(points, pt3) | ||
// Start TCP server | ||
wg.Add(1) | ||
go TCPServer(t, &wg) | ||
wg.Wait() | ||
// Connect | ||
wg.Add(1) | ||
err1 := g.Connect() | ||
wg.Wait() | ||
require.NoError(t, err1) | ||
// Send Data | ||
err2 := g.Write(points) | ||
require.NoError(t, err2) | ||
wg.Add(1) | ||
// Waiting TCPserver | ||
wg.Wait() | ||
g.Close() | ||
} | ||
|
||
func TCPServer(t *testing.T, wg *sync.WaitGroup) { | ||
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") | ||
wg.Done() | ||
conn, _ := tcpServer.Accept() | ||
wg.Done() | ||
reader := bufio.NewReader(conn) | ||
tp := textproto.NewReader(reader) | ||
data1, _ := tp.ReadLine() | ||
assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data1) | ||
data2, _ := tp.ReadLine() | ||
assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.value 3.14 1289430000", data2) | ||
data3, _ := tp.ReadLine() | ||
assert.Equal(t, "my.prefix.192_168_0_1.my_measurement.value 3.14 1289430000", data3) | ||
conn.Close() | ||
wg.Done() | ||
} |