-
Notifications
You must be signed in to change notification settings - Fork 5.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This was primarily intended to consume InfluxDB-style expvars, particularly InfluxDB's `/debug/vars` endpoint. That endpoint follows a structure like ```json { "httpd::8086": { "name": "httpd", "tags": { "bind": ":8086" }, "values": { "pointsWrittenOK": 33756, "queryReq": 19, "queryRespBytes": 26973, "req": 428, "writeReq": 205, "writeReqBytes": 3939161 } } } ``` There are an arbitrary number of top-level keys in the JSON response at the configured URLs, and this plugin will iterate through all of their values looking for objects with keys "name", "tags", and "values" indicating a metric to be consumed by telegraf. Running this on current master of InfluxDB, I am able to record nearly the same information that is normally stored in the `_internal` database; the only measurement missing from `_internal` is `runtime`, which is present under the "memstats" key but does not follow the format and so is not consumed in this plugin. ``` $ influx -database=telegraf -execute 'SHOW FIELD KEYS FROM /influxdb/' name: influxdb_influxdb_engine ---------------------------- fieldKey blksWrite blksWriteBytes blksWriteBytesC pointsWrite pointsWriteDedupe name: influxdb_influxdb_httpd --------------------------- fieldKey pingReq pointsWrittenOK queryReq queryRespBytes req writeReq writeReqBytes name: influxdb_influxdb_shard --------------------------- fieldKey fieldsCreate seriesCreate writePointsOk writeReq name: influxdb_influxdb_subscriber -------------------------------- fieldKey pointsWritten name: influxdb_influxdb_wal ------------------------- fieldKey autoFlush flushDuration idleFlush memSize metaFlush pointsFlush pointsWrite pointsWriteReq seriesFlush name: influxdb_influxdb_write --------------------------- fieldKey pointReq pointReqLocal req subWriteOk writeOk ```
- Loading branch information
1 parent
34f0c59
commit 4f3d6dd
Showing
4 changed files
with
325 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
# influxdb plugin | ||
|
||
The influxdb plugin collects InfluxDB-formatted data from JSON endpoints. | ||
|
||
With a configuration of: | ||
|
||
```toml | ||
[[plugins.influxdb]] | ||
name = "produce" | ||
urls = [ | ||
"http://127.0.0.1:8086/debug/vars", | ||
"http://192.168.2.1:8086/debug/vars" | ||
] | ||
``` | ||
|
||
And if 127.0.0.1 responds with this JSON: | ||
|
||
```json | ||
{ | ||
"k1": { | ||
"name": "fruit", | ||
"tags": { | ||
"kind": "apple" | ||
}, | ||
"values": { | ||
"inventory": 371, | ||
"sold": 112 | ||
} | ||
}, | ||
"k2": { | ||
"name": "fruit", | ||
"tags": { | ||
"kind": "banana" | ||
}, | ||
"values": { | ||
"inventory": 1000, | ||
"sold": 403 | ||
} | ||
} | ||
} | ||
``` | ||
|
||
And if 192.168.2.1 responds like so: | ||
|
||
```json | ||
{ | ||
"k3": { | ||
"name": "transactions", | ||
"tags": {}, | ||
"values": { | ||
"total": 100, | ||
"balance": 184.75 | ||
} | ||
} | ||
} | ||
``` | ||
|
||
Then the collected metrics will be: | ||
|
||
``` | ||
influxdb_produce_fruit,url='http://127.0.0.1:8086/debug/vars',kind='apple' inventory=371.0,sold=112.0 | ||
influxdb_produce_fruit,url='http://127.0.0.1:8086/debug/vars',kind='banana' inventory=1000.0,sold=403.0 | ||
influxdb_produce_transactions,url='http://192.168.2.1:8086/debug/vars' total=100.0,balance=184.75 | ||
``` | ||
|
||
There are two important details to note about the collected metrics: | ||
|
||
1. Even though the values in JSON are being displayed as integers, the metrics are reported as floats. | ||
JSON encoders usually don't print the fractional part for round floats. | ||
Because you cannot change the type of an existing field in InfluxDB, we assume all numbers are floats. | ||
|
||
2. The top-level keys' names (in the example above, `"k1"`, `"k2"`, and `"k3"`) are not considered when recording the metrics. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
package influxdb | ||
|
||
import ( | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"net/http" | ||
"strings" | ||
"sync" | ||
|
||
"github.com/influxdb/telegraf/plugins" | ||
) | ||
|
||
type InfluxDB struct { | ||
Name string | ||
URLs []string `toml:"urls"` | ||
} | ||
|
||
func (*InfluxDB) Description() string { | ||
return "Read InfluxDB-formatted JSON metrics from one or more HTTP endpoints" | ||
} | ||
|
||
func (*InfluxDB) SampleConfig() string { | ||
return ` | ||
# Reads InfluxDB-formatted JSON from given URLs. | ||
# Works with InfluxDB debug endpoints out of the box, but other services can use this format too. | ||
# See the influxdb plugin's README for more details. | ||
[[plugins.influxdb]] | ||
# Name to use for measurement | ||
name = "influxdb" | ||
# Multiple URLs from which to read InfluxDB-formatted JSON | ||
urls = [ | ||
"http://localhost:8086/debug/vars" | ||
] | ||
` | ||
} | ||
|
||
func (i *InfluxDB) Gather(acc plugins.Accumulator) error { | ||
errorChannel := make(chan error, len(i.URLs)) | ||
|
||
var wg sync.WaitGroup | ||
for _, u := range i.URLs { | ||
wg.Add(1) | ||
go func(url string) { | ||
defer wg.Done() | ||
if err := i.gatherURL(acc, url); err != nil { | ||
errorChannel <- fmt.Errorf("[name=%s][url=%s]: %s", i.Name, url, err) | ||
} | ||
}(u) | ||
} | ||
|
||
wg.Wait() | ||
close(errorChannel) | ||
|
||
// If there weren't any errors, we can return nil now. | ||
if len(errorChannel) == 0 { | ||
return nil | ||
} | ||
|
||
// There were errors, so join them all together as one big error. | ||
errorStrings := make([]string, 0, len(errorChannel)) | ||
for err := range errorChannel { | ||
errorStrings = append(errorStrings, err.Error()) | ||
} | ||
|
||
return errors.New(strings.Join(errorStrings, "\n")) | ||
} | ||
|
||
type point struct { | ||
Name string `json:"name"` | ||
Tags map[string]string `json:"tags"` | ||
Values map[string]interface{} `json:"values"` | ||
} | ||
|
||
// Gathers data from a particular URL | ||
// Parameters: | ||
// acc : The telegraf Accumulator to use | ||
// url : endpoint to send request to | ||
// | ||
// Returns: | ||
// error: Any error that may have occurred | ||
func (i *InfluxDB) gatherURL( | ||
acc plugins.Accumulator, | ||
url string, | ||
) error { | ||
resp, err := http.Get(url) | ||
if err != nil { | ||
return err | ||
} | ||
defer resp.Body.Close() | ||
|
||
// It would be nice to be able to decode into a map[string]point, but | ||
// we'll get a decoder error like: | ||
// `json: cannot unmarshal array into Go value of type influxdb.point` | ||
// if any of the values aren't objects. | ||
// To avoid that error, we decode by hand. | ||
dec := json.NewDecoder(resp.Body) | ||
|
||
// Parse beginning of object | ||
if t, err := dec.Token(); err != nil { | ||
return err | ||
} else if t != json.Delim('{') { | ||
return errors.New("document root must be a JSON object") | ||
} | ||
|
||
// Loop through rest of object | ||
for { | ||
// Nothing left in this object, we're done | ||
if !dec.More() { | ||
break | ||
} | ||
|
||
// Read in a string key. We don't do anything with the top-level keys, so it's discarded. | ||
_, err := dec.Token() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Attempt to parse a whole object into a point. | ||
// It might be a non-object, like a string or array. | ||
// If we fail to decode it into a point, ignore it and move on. | ||
var p point | ||
if err := dec.Decode(&p); err != nil { | ||
continue | ||
} | ||
|
||
// If the object was a point, but was not fully initialized, ignore it and move on. | ||
if p.Name == "" || p.Tags == nil || p.Values == nil || len(p.Values) == 0 { | ||
continue | ||
} | ||
|
||
// Add a tag to indicate the source of the data. | ||
p.Tags["url"] = url | ||
|
||
acc.AddFields( | ||
i.Name+"_"+p.Name, | ||
p.Values, | ||
p.Tags, | ||
) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func init() { | ||
plugins.Add("influxdb", func() plugins.Plugin { | ||
return &InfluxDB{} | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
package influxdb_test | ||
|
||
import ( | ||
"net/http" | ||
"net/http/httptest" | ||
"testing" | ||
|
||
"github.com/influxdb/telegraf/plugins/influxdb" | ||
"github.com/influxdb/telegraf/testutil" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestBasic(t *testing.T) { | ||
js := ` | ||
{ | ||
"_1": { | ||
"name": "foo", | ||
"tags": { | ||
"id": "ex1" | ||
}, | ||
"values": { | ||
"i": -1, | ||
"f": 0.5, | ||
"b": true, | ||
"s": "string" | ||
} | ||
}, | ||
"ignored": { | ||
"willBeRecorded": false | ||
}, | ||
"ignoredAndNested": { | ||
"hash": { | ||
"is": "nested" | ||
} | ||
}, | ||
"array": [ | ||
"makes parsing more difficult than necessary" | ||
], | ||
"string": "makes parsing more difficult than necessary", | ||
"_2": { | ||
"name": "bar", | ||
"tags": { | ||
"id": "ex2" | ||
}, | ||
"values": { | ||
"x": "x" | ||
} | ||
}, | ||
"pointWithoutFields_willNotBeIncluded": { | ||
"name": "asdf", | ||
"tags": { | ||
"id": "ex3" | ||
}, | ||
"values": {} | ||
} | ||
} | ||
` | ||
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
if r.URL.Path == "/endpoint" { | ||
_, _ = w.Write([]byte(js)) | ||
} else { | ||
w.WriteHeader(http.StatusNotFound) | ||
} | ||
})) | ||
defer fakeServer.Close() | ||
|
||
plugin := &influxdb.InfluxDB{ | ||
Name: "test", | ||
URLs: []string{fakeServer.URL + "/endpoint"}, | ||
} | ||
|
||
var acc testutil.Accumulator | ||
require.NoError(t, plugin.Gather(&acc)) | ||
|
||
require.Len(t, acc.Points, 2) | ||
require.NoError(t, acc.ValidateTaggedFieldsValue( | ||
"test_foo", | ||
map[string]interface{}{ | ||
// JSON will truncate floats to integer representations. | ||
// Since there's no distinction in JSON, we can't assume it's an int. | ||
"i": -1.0, | ||
"f": 0.5, | ||
"b": true, | ||
"s": "string", | ||
}, | ||
map[string]string{ | ||
"id": "ex1", | ||
"url": fakeServer.URL + "/endpoint", | ||
}, | ||
)) | ||
require.NoError(t, acc.ValidateTaggedFieldsValue( | ||
"test_bar", | ||
map[string]interface{}{ | ||
"x": "x", | ||
}, | ||
map[string]string{ | ||
"id": "ex2", | ||
"url": fakeServer.URL + "/endpoint", | ||
}, | ||
)) | ||
} |