diff --git a/README.md b/README.md index 5db1dcb07e782..8dd42c6d57261 100644 --- a/README.md +++ b/README.md @@ -222,6 +222,7 @@ found by running `telegraf -sample-config`. * opentsdb * amqp (rabbitmq) * mqtt +* riemann ## Contributing diff --git a/outputs/all/all.go b/outputs/all/all.go index 6538af0d2a71f..b949382875224 100644 --- a/outputs/all/all.go +++ b/outputs/all/all.go @@ -7,4 +7,5 @@ import ( _ "github.com/influxdb/telegraf/outputs/kafka" _ "github.com/influxdb/telegraf/outputs/mqtt" _ "github.com/influxdb/telegraf/outputs/opentsdb" + _ "github.com/influxdb/telegraf/outputs/riemann" ) diff --git a/outputs/riemann/riemann.go b/outputs/riemann/riemann.go new file mode 100644 index 0000000000000..3d36ca3d80bea --- /dev/null +++ b/outputs/riemann/riemann.go @@ -0,0 +1,96 @@ +package riemann + +import ( + "errors" + "fmt" + "os" + + "github.com/influxdb/influxdb/client/v2" + "github.com/influxdb/telegraf/outputs" + "github.com/allenj/raidman" +) + +type Riemann struct { + URL string + Transport string + + client raidman.Client +} + +var sampleConfig = ` + # URL of server + url = "localhost:5555" + # transport protocol to use either tcp or udp + transport = "tcp" +` + +func (r *Riemann) Connect() error { + c, err := raidman.Dial(r.Transport, r.URL) + + if err != nil { + return err + } + + r.client = *c + return nil +} + +func (r *Riemann) Close() error { + r.client.Close() + return nil +} + +func (r *Riemann) SampleConfig() string { + return sampleConfig +} + +func (r *Riemann) Description() string { + return "Configuration for the Riemann server to send metrics to" +} + +func (r *Riemann) Write(points []*client.Point) error { + if len(points) == 0 { + return nil + } + + var events []*raidman.Event + for _, p := range points { + ev := buildEvent(p) + events = append(events, &ev) + } + + var senderr = r.client.SendMulti(events) + if senderr != nil { + return errors.New(fmt.Sprintf("FAILED to send riemann message: %s\n", + senderr)) + } + + return nil +} + +func buildEvent(p *client.Point) (raidman.Event) { + host := p.Tags()["host"] + + if len(host) == 0 { + hostname, err := os.Hostname() + if err != nil { + host = "unknown" + } else { + host = hostname + } + } + + var event = &raidman.Event{ + Host: host, + Service: p.Name(), + Metric: p.Fields()["value"], + } + + return *event +} + +func init() { + outputs.Add("riemann", func() outputs.Output { + return &Riemann{} + }) +} diff --git a/outputs/riemann/riemann_test.go b/outputs/riemann/riemann_test.go new file mode 100644 index 0000000000000..2d56f41916721 --- /dev/null +++ b/outputs/riemann/riemann_test.go @@ -0,0 +1,27 @@ +package riemann + +import ( + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestConnectAndWrite(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + url := testutil.GetLocalHost() + ":5555" + + r := &Riemann{ + URL: url, + Transport: "tcp", + } + + err := r.Connect() + require.NoError(t, err) + + err = r.Write(testutil.MockBatchPoints().Points()) + require.NoError(t, err) +}