Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing the Librato output-plugin #722

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 53 additions & 9 deletions plugins/outputs/librato/librato.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,24 @@ import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
)

type Librato struct {
ApiUser string
ApiToken string
SourceTag string
Timeout internal.Duration
ApiUser string
ApiToken string
Debug bool
NameFromTags bool
SourceTag string
Timeout internal.Duration

apiUrl string
client *http.Client
Expand All @@ -32,9 +37,12 @@ var sampleConfig = `
## Librato API token
api_token = "my-secret-token" # required.

## Tag Field to populate source attribute (optional)
## This is typically the _hostname_ from which the metric was obtained.
source_tag = "hostname"
### Debug
# debug = false

### Tag Field to populate source attribute (optional)
### This is typically the _hostname_ from which the metric was obtained.
source_tag = "host"

## Connection timeout.
# timeout = "5s"
Expand Down Expand Up @@ -82,17 +90,27 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
for _, gauge := range gauges {
tempGauges = append(tempGauges, gauge)
metricCounter++
if l.Debug {
log.Printf("[DEBUG] Got a gauge: %v\n", gauge)
}
}
} else {
log.Printf("unable to build Gauge for %s, skipping\n", m.Name())
if l.Debug {
log.Printf("[DEBUG] Couldn't build gauge: %v\n", err)
}
}
}

lmetrics.Gauges = make([]*Gauge, metricCounter)
copy(lmetrics.Gauges, tempGauges[0:])
metricsBytes, err := json.Marshal(metrics)
metricsBytes, err := json.Marshal(lmetrics)
if err != nil {
return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error())
} else {
if l.Debug {
log.Printf("[DEBUG] Librato request: %v\n", string(metricsBytes))
}
}
req, err := http.NewRequest("POST", l.apiUrl, bytes.NewBuffer(metricsBytes))
if err != nil {
Expand All @@ -103,8 +121,21 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {

resp, err := l.client.Do(req)
if err != nil {
if l.Debug {
log.Printf("[DEBUG] Error POSTing metrics: %v\n", err.Error())
}
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
} else {
if l.Debug {
htmlData, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("[DEBUG] Couldn't get response! (%v)\n", err)
} else {
log.Printf("[DEBUG] Librato response: %v\n", string(htmlData))
}
}
}

defer resp.Body.Close()

if resp.StatusCode != 200 {
Expand All @@ -122,11 +153,20 @@ func (l *Librato) Description() string {
return "Configuration for Librato API to send metrics to."
}

func (l *Librato) buildGaugeName(m telegraf.Metric, fieldName string) string {
// Use the GraphiteSerializer
graphiteSerializer := graphite.GraphiteSerializer{}
serializedMetric := graphiteSerializer.SerializeBucketName(m, fieldName)

// Deal with slash characters:
return strings.Replace(serializedMetric, "/", "-", -1)
}

func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
gauges := []*Gauge{}
for fieldName, value := range m.Fields() {
gauge := &Gauge{
Name: m.Name() + "_" + fieldName,
Name: l.buildGaugeName(m, fieldName),
MeasureTime: m.Time().Unix(),
}
if err := gauge.setValue(value); err != nil {
Expand All @@ -142,6 +182,10 @@ func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
l.SourceTag)
}
}
gauges = append(gauges, gauge)
}
if l.Debug {
fmt.Printf("[DEBUG] Built gauges: %v\n", gauges)
}
return gauges, nil
}
Expand Down
34 changes: 21 additions & 13 deletions plugins/outputs/librato/librato_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"testing"
"time"

"github.com/influxdata/telegraf/testutil"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

Expand All @@ -28,6 +28,14 @@ func fakeLibrato() *Librato {
return l
}

func BuildTags(t *testing.T) {
testMetric := testutil.TestMetric(0.0, "test1")
graphiteSerializer := graphite.GraphiteSerializer{}
tags, err := graphiteSerializer.Serialize(testMetric)
fmt.Printf("Tags: %v", tags)
require.NoError(t, err)
}

func TestUriOverride(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
Expand Down Expand Up @@ -78,7 +86,7 @@ func TestBuildGauge(t *testing.T) {
{
testutil.TestMetric(0.0, "test1"),
&Gauge{
Name: "test1",
Name: "value1.test1.value",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 0.0,
},
Expand All @@ -87,7 +95,7 @@ func TestBuildGauge(t *testing.T) {
{
testutil.TestMetric(1.0, "test2"),
&Gauge{
Name: "test2",
Name: "value1.test2.value",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 1.0,
},
Expand All @@ -96,7 +104,7 @@ func TestBuildGauge(t *testing.T) {
{
testutil.TestMetric(10, "test3"),
&Gauge{
Name: "test3",
Name: "value1.test3.value",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 10.0,
},
Expand All @@ -105,7 +113,7 @@ func TestBuildGauge(t *testing.T) {
{
testutil.TestMetric(int32(112345), "test4"),
&Gauge{
Name: "test4",
Name: "value1.test4.value",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 112345.0,
},
Expand All @@ -114,7 +122,7 @@ func TestBuildGauge(t *testing.T) {
{
testutil.TestMetric(int64(112345), "test5"),
&Gauge{
Name: "test5",
Name: "value1.test5.value",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 112345.0,
},
Expand All @@ -123,7 +131,7 @@ func TestBuildGauge(t *testing.T) {
{
testutil.TestMetric(float32(11234.5), "test6"),
&Gauge{
Name: "test6",
Name: "value1.test6.value",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 11234.5,
},
Expand All @@ -132,7 +140,7 @@ func TestBuildGauge(t *testing.T) {
{
testutil.TestMetric("11234.5", "test7"),
&Gauge{
Name: "test7",
Name: "value1.test7.value",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 11234.5,
},
Expand Down Expand Up @@ -163,13 +171,13 @@ func TestBuildGauge(t *testing.T) {
func TestBuildGaugeWithSource(t *testing.T) {
pt1, _ := telegraf.NewMetric(
"test1",
map[string]string{"hostname": "192.168.0.1"},
map[string]string{"hostname": "192.168.0.1", "tag1": "value1"},
map[string]interface{}{"value": 0.0},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
pt2, _ := telegraf.NewMetric(
"test2",
map[string]string{"hostnam": "192.168.0.1"},
map[string]string{"hostnam": "192.168.0.1", "tag1": "value1"},
map[string]interface{}{"value": 1.0},
time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC),
)
Expand All @@ -182,7 +190,7 @@ func TestBuildGaugeWithSource(t *testing.T) {
{
pt1,
&Gauge{
Name: "test1",
Name: "192_168_0_1.value1.test1.value",
MeasureTime: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 0.0,
Source: "192.168.0.1",
Expand All @@ -192,7 +200,7 @@ func TestBuildGaugeWithSource(t *testing.T) {
{
pt2,
&Gauge{
Name: "test2",
Name: "192_168_0_1.value1.test1.value",
MeasureTime: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 1.0,
},
Expand Down
50 changes: 30 additions & 20 deletions plugins/serializers/graphite/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,49 @@ type GraphiteSerializer struct {

func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) {
out := []string{}
// Get name
name := metric.Name()

// Convert UnixNano to Unix timestamps
timestamp := metric.UnixNano() / 1000000000
tag_str := buildTags(metric)

for field_name, value := range metric.Fields() {
// Convert value
value_str := fmt.Sprintf("%#v", value)
// Write graphite metric
var graphitePoint string
if name == field_name {
graphitePoint = fmt.Sprintf("%s.%s %s %d",
tag_str,
strings.Replace(name, ".", "_", -1),
value_str,
timestamp)
} else {
graphitePoint = fmt.Sprintf("%s.%s.%s %s %d",
tag_str,
strings.Replace(name, ".", "_", -1),
strings.Replace(field_name, ".", "_", -1),
value_str,
timestamp)
}
if s.Prefix != "" {
graphitePoint = fmt.Sprintf("%s.%s", s.Prefix, graphitePoint)
}
graphitePoint = fmt.Sprintf("%s %s %d",
s.SerializeBucketName(metric, field_name),
value_str,
timestamp)
out = append(out, graphitePoint)
}
return out, nil
}

func (s *GraphiteSerializer) SerializeBucketName(metric telegraf.Metric, field_name string) string {
// Get the metric name
name := metric.Name()

// Convert UnixNano to Unix timestamps
tag_str := buildTags(metric)

// Write graphite metric
var serializedBucketName string
if name == field_name {
serializedBucketName = fmt.Sprintf("%s.%s",
tag_str,
strings.Replace(name, ".", "_", -1))
} else {
serializedBucketName = fmt.Sprintf("%s.%s.%s",
tag_str,
strings.Replace(name, ".", "_", -1),
strings.Replace(field_name, ".", "_", -1))
}
if s.Prefix != "" {
serializedBucketName = fmt.Sprintf("%s.%s", s.Prefix, serializedBucketName)
}
return serializedBucketName
}

func buildTags(metric telegraf.Metric) string {
var keys []string
tags := metric.Tags()
Expand Down
59 changes: 59 additions & 0 deletions plugins/serializers/graphite/graphite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,62 @@ func TestSerializeMetricPrefix(t *testing.T) {
sort.Strings(expS)
assert.Equal(t, expS, mS)
}

func TestSerializeBucketNameNoHost(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
"datacenter": "us-west-2",
}
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)

s := GraphiteSerializer{}
mS := s.SerializeBucketName(m, "usage_idle")

expS := fmt.Sprintf("cpu0.us-west-2.cpu.usage_idle")
assert.Equal(t, expS, mS)
}

func TestSerializeBucketNameHost(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)

s := GraphiteSerializer{}
mS := s.SerializeBucketName(m, "usage_idle")

expS := fmt.Sprintf("localhost.cpu0.us-west-2.cpu.usage_idle")
assert.Equal(t, expS, mS)
}

func TestSerializeBucketNamePrefix(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)

s := GraphiteSerializer{Prefix: "prefix"}
mS := s.SerializeBucketName(m, "usage_idle")

expS := fmt.Sprintf("prefix.localhost.cpu0.us-west-2.cpu.usage_idle")
assert.Equal(t, expS, mS)
}