Skip to content

Commit

Permalink
add librato output plugin, update datadog plugin to skip non-number m…
Browse files Browse the repository at this point in the history
…etrics
  • Loading branch information
jipperinbham committed Oct 26, 2015
1 parent cb951eb commit 8df0c63
Show file tree
Hide file tree
Showing 6 changed files with 443 additions and 5 deletions.
1 change: 1 addition & 0 deletions outputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
_ "github.com/influxdb/telegraf/outputs/datadog"
_ "github.com/influxdb/telegraf/outputs/influxdb"
_ "github.com/influxdb/telegraf/outputs/kafka"
_ "github.com/influxdb/telegraf/outputs/librato"
_ "github.com/influxdb/telegraf/outputs/mqtt"
_ "github.com/influxdb/telegraf/outputs/opentsdb"
)
9 changes: 9 additions & 0 deletions outputs/datadog/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Datadog Output Plugin

This plugin writes to the [Datadog Metrics API](http://docs.datadoghq.com/api/#metrics)
and requires an `apikey` which can be obtained [here](https://app.datadoghq.com/account/settings#api)
for the account.

If the point value being sent cannot be converted to a float64, the metric is skipped.

Metrics are grouped by converting any `_` characters to `.` in the Point Name.
16 changes: 11 additions & 5 deletions outputs/datadog/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"
"sort"
Expand Down Expand Up @@ -65,20 +66,25 @@ func (d *Datadog) Write(points []*client.Point) error {
if len(points) == 0 {
return nil
}
ts := TimeSeries{
Series: make([]*Metric, len(points)),
}
for index, pt := range points {
ts := TimeSeries{}
var tempSeries = make([]*Metric, len(points))
var acceptablePoints = 0
for _, pt := range points {
metric := &Metric{
Metric: strings.Replace(pt.Name(), "_", ".", -1),
Tags: buildTags(pt.Tags()),
Host: pt.Tags()["host"],
}
if p, err := buildPoint(pt); err == nil {
metric.Points[0] = p
tempSeries[acceptablePoints] = metric
acceptablePoints += 1
} else {
log.Printf("unable to build Metric for %s, skipping\n", pt.Name())
}
ts.Series[index] = metric
}
ts.Series = make([]*Metric, acceptablePoints)
copy(ts.Series, tempSeries[0:])
tsBytes, err := json.Marshal(ts)
if err != nil {
return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error())
Expand Down
12 changes: 12 additions & 0 deletions outputs/librato/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Librato Output Plugin

This plugin writes to the [Librato Metrics API](http://dev.librato.com/v1/metrics#metrics)
and requires an `api_user` and `api_token` which can be obtained [here](https://metrics.librato.com/account/api_tokens)
for the account.

The `source_tag` option in the Configuration file is used to send contextual information from
Point Tags to the API.

If the point value being sent cannot be converted to a float64, the metric is skipped.

Currently, the plugin does not send any associated Point Tags.
165 changes: 165 additions & 0 deletions outputs/librato/librato.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package librato

import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"

"github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/telegraf/duration"
"github.com/influxdb/telegraf/outputs"
)

type Librato struct {
ApiUser string
ApiToken string
SourceTag string
Timeout duration.Duration

apiUrl string
client *http.Client
}

var sampleConfig = `
# Librator API Docs
# http://dev.librato.com/v1/metrics-authentication
# Librato API user
api_user = "telegraf@influxdb.com" # required.
# Librato API token
api_token = "my-secret-token" # required.
# Tag Field to populate source attribute (optional)
# This is typically the _hostname_ from which the metric was obtained.
source_tag = "hostname"
# Connection timeout.
# timeout = "5s"
`

type Metrics struct {
Gauges []*Gauge `json:"gauges"`
}

type Gauge struct {
Name string `json:"name"`
Value float64 `json:"value"`
Source string `json:"source"`
MeasureTime int64 `json:"measure_time"`
}

const librato_api = "https://metrics-api.librato.com/v1/metrics"

func NewLibrato(apiUrl string) *Librato {
return &Librato{
apiUrl: apiUrl,
}
}

func (l *Librato) Connect() error {
if l.ApiUser == "" || l.ApiToken == "" {
return fmt.Errorf("api_user and api_token are required fields for librato output")
}
l.client = &http.Client{
Timeout: l.Timeout.Duration,
}
return nil
}

func (l *Librato) Write(points []*client.Point) error {
if len(points) == 0 {
return nil
}
metrics := Metrics{}
var tempGauges = make([]*Gauge, len(points))
var acceptablePoints = 0
for _, pt := range points {
if gauge, err := l.buildGauge(pt); err == nil {
tempGauges[acceptablePoints] = gauge
acceptablePoints += 1
} else {
log.Printf("unable to build Gauge for %s, skipping\n", pt.Name())
}
}
metrics.Gauges = make([]*Gauge, acceptablePoints)
copy(metrics.Gauges, tempGauges[0:])
metricsBytes, err := json.Marshal(metrics)
if err != nil {
return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error())
}
req, err := http.NewRequest("POST", l.apiUrl, bytes.NewBuffer(metricsBytes))
if err != nil {
return fmt.Errorf("unable to create http.Request, %s\n", err.Error())
}
req.Header.Add("Content-Type", "application/json")
req.SetBasicAuth(l.ApiUser, l.ApiToken)

resp, err := l.client.Do(req)
if err != nil {
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
return fmt.Errorf("received bad status code, %d\n", resp.StatusCode)
}

return nil
}

func (l *Librato) SampleConfig() string {
return sampleConfig
}

func (l *Librato) Description() string {
return "Configuration for Librato API to send metrics to."
}

func (l *Librato) buildGauge(pt *client.Point) (*Gauge, error) {
gauge := &Gauge{
Name: pt.Name(),
MeasureTime: pt.Time().Unix(),
}
if err := gauge.setValue(pt.Fields()["value"]); err != nil {
return gauge, fmt.Errorf("unable to extract value from Fields, %s\n", err.Error())
}
if l.SourceTag != "" {
if source, ok := pt.Tags()[l.SourceTag]; ok {
gauge.Source = source
} else {
return gauge, fmt.Errorf("undeterminable Source type from Field, %s\n", l.SourceTag)
}
}
return gauge, nil
}

func (g *Gauge) setValue(v interface{}) error {
switch d := v.(type) {
case int:
g.Value = float64(int(d))
case int32:
g.Value = float64(int32(d))
case int64:
g.Value = float64(int64(d))
case float32:
g.Value = float64(d)
case float64:
g.Value = float64(d)
default:
return fmt.Errorf("undeterminable type %+v", d)
}
return nil
}

func (l *Librato) Close() error {
return nil
}

func init() {
outputs.Add("librato", func() outputs.Output {
return NewLibrato(librato_api)
})
}
Loading

0 comments on commit 8df0c63

Please sign in to comment.