Skip to content

Commit

Permalink
Add Kibana input plugin (influxdata#4585)
Browse files Browse the repository at this point in the history
  • Loading branch information
lpic10 authored and Jean-Louis Dupond committed Apr 22, 2019
1 parent b3ead9e commit 8cd7a6f
Show file tree
Hide file tree
Showing 5 changed files with 559 additions and 0 deletions.
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/kapacitor"
_ "github.com/influxdata/telegraf/plugins/inputs/kernel"
_ "github.com/influxdata/telegraf/plugins/inputs/kernel_vmstat"
_ "github.com/influxdata/telegraf/plugins/inputs/kibana"
_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
_ "github.com/influxdata/telegraf/plugins/inputs/leofs"
_ "github.com/influxdata/telegraf/plugins/inputs/linux_sysctl_fs"
Expand Down
63 changes: 63 additions & 0 deletions plugins/inputs/kibana/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Kibana input plugin

The [kibana](https://www.elastic.co/) plugin queries Kibana status API to
obtain the health status of Kibana and some useful metrics.

This plugin has been tested and works on Kibana 6.x versions.

### Configuration

```toml
[[inputs.kibana]]
## specify a list of one or more Kibana servers
servers = ["http://localhost:5601"]

## Timeout for HTTP requests
timeout = "5s"

## HTTP Basic Auth credentials
# username = "username"
# password = "pa$$word"

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
```

### Status mappings

When reporting health (green/yellow/red), additional field `status_code`
is reported. Field contains mapping from status:string to status_code:int
with following rules:

- `green` - 1
- `yellow` - 2
- `red` - 3
- `unknown` - 0

### Measurements & Fields

- kibana
- status_code: integer (1, 2, 3, 0)
- heap_max_bytes: integer
- heap_used_bytes: integer
- uptime_ms: integer
- response_time_avg_ms: float
- response_time_max_ms: integer
- concurrent_connections: integer
- requests_per_sec: float

### Tags

- status (Kibana health: green, yellow, red)
- name (Kibana reported name)
- uuid (Kibana reported UUID)
- version (Kibana version)
- source (Kibana server hostname or IP)

### Example Output

kibana,host=myhost,name=my-kibana,source=localhost:5601,version=6.3.2 concurrent_connections=0i,heap_max_bytes=136478720i,heap_used_bytes=119231088i,response_time_avg_ms=0i,response_time_max_ms=0i,status="green",status_code=1i,uptime_ms=2187428019i 1534864502000000000
230 changes: 230 additions & 0 deletions plugins/inputs/kibana/kibana.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package kibana

import (
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)

const statusPath = "/api/status"

type kibanaStatus struct {
Name string `json:"name"`
Version version `json:"version"`
Status status `json:"status"`
Metrics metrics `json:"metrics"`
}

type version struct {
Number string `json:"number"`
BuildHash string `json:"build_hash"`
BuildNumber int `json:"build_number"`
BuildSnapshot bool `json:"build_snapshot"`
}

type status struct {
Overall overallStatus `json:"overall"`
Statuses interface{} `json:"statuses"`
}

type overallStatus struct {
State string `json:"state"`
}

type metrics struct {
UptimeInMillis int64 `json:"uptime_in_millis"`
ConcurrentConnections int64 `json:"concurrent_connections"`
CollectionIntervalInMilles int64 `json:"collection_interval_in_millis"`
ResponseTimes responseTimes `json:"response_times"`
Process process `json:"process"`
Requests requests `json:"requests"`
}

type responseTimes struct {
AvgInMillis float64 `json:"avg_in_millis"`
MaxInMillis int64 `json:"max_in_millis"`
}

type process struct {
Mem mem `json:"mem"`
}

type requests struct {
Total int64 `json:"total"`
}

type mem struct {
HeapMaxInBytes int64 `json:"heap_max_in_bytes"`
HeapUsedInBytes int64 `json:"heap_used_in_bytes"`
}

const sampleConfig = `
## specify a list of one or more Kibana servers
servers = ["http://localhost:5601"]
## Timeout for HTTP requests
timeout = "5s"
## HTTP Basic Auth credentials
# username = "username"
# password = "pa$$word"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
`

type Kibana struct {
Local bool
Servers []string
Username string
Password string
Timeout internal.Duration
tls.ClientConfig

client *http.Client
}

func NewKibana() *Kibana {
return &Kibana{
Timeout: internal.Duration{Duration: time.Second * 5},
}
}

// perform status mapping
func mapHealthStatusToCode(s string) int {
switch strings.ToLower(s) {
case "green":
return 1
case "yellow":
return 2
case "red":
return 3
}
return 0
}

// SampleConfig returns sample configuration for this plugin.
func (k *Kibana) SampleConfig() string {
return sampleConfig
}

// Description returns the plugin description.
func (k *Kibana) Description() string {
return "Read status information from one or more Kibana servers"
}

func (k *Kibana) Gather(acc telegraf.Accumulator) error {
if k.client == nil {
client, err := k.createHttpClient()

if err != nil {
return err
}
k.client = client
}

var wg sync.WaitGroup
wg.Add(len(k.Servers))

for _, serv := range k.Servers {
go func(baseUrl string, acc telegraf.Accumulator) {
defer wg.Done()
if err := k.gatherKibanaStatus(baseUrl, acc); err != nil {
acc.AddError(fmt.Errorf("[url=%s]: %s", baseUrl, err))
return
}
}(serv, acc)
}

wg.Wait()
return nil
}

func (k *Kibana) createHttpClient() (*http.Client, error) {
tlsCfg, err := k.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}

client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
},
Timeout: k.Timeout.Duration,
}

return client, nil
}

func (k *Kibana) gatherKibanaStatus(baseUrl string, acc telegraf.Accumulator) error {

kibanaStatus := &kibanaStatus{}
url := baseUrl + statusPath

host, err := k.gatherJsonData(url, kibanaStatus)
if err != nil {
return err
}

fields := make(map[string]interface{})
tags := make(map[string]string)

tags["name"] = kibanaStatus.Name
tags["source"] = host
tags["version"] = kibanaStatus.Version.Number
tags["status"] = kibanaStatus.Status.Overall.State

fields["status_code"] = mapHealthStatusToCode(kibanaStatus.Status.Overall.State)

fields["uptime_ms"] = kibanaStatus.Metrics.UptimeInMillis
fields["concurrent_connections"] = kibanaStatus.Metrics.ConcurrentConnections
fields["heap_max_bytes"] = kibanaStatus.Metrics.Process.Mem.HeapMaxInBytes
fields["heap_used_bytes"] = kibanaStatus.Metrics.Process.Mem.HeapUsedInBytes
fields["response_time_avg_ms"] = kibanaStatus.Metrics.ResponseTimes.AvgInMillis
fields["response_time_max_ms"] = kibanaStatus.Metrics.ResponseTimes.MaxInMillis
fields["requests_per_sec"] = float64(kibanaStatus.Metrics.Requests.Total) / float64(kibanaStatus.Metrics.CollectionIntervalInMilles) * 1000

acc.AddFields("kibana", fields, tags)

return nil
}

func (k *Kibana) gatherJsonData(url string, v interface{}) (host string, err error) {

request, err := http.NewRequest("GET", url, nil)

if (k.Username != "") || (k.Password != "") {
request.SetBasicAuth(k.Username, k.Password)
}

response, err := k.client.Do(request)
if err != nil {
return "", err
}

defer response.Body.Close()

if err = json.NewDecoder(response.Body).Decode(v); err != nil {
return request.Host, err
}

return request.Host, nil
}

func init() {
inputs.Add("kibana", func() telegraf.Input {
return NewKibana()
})
}
66 changes: 66 additions & 0 deletions plugins/inputs/kibana/kibana_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package kibana

import (
"io/ioutil"
"net/http"
"strings"
"testing"

"github.com/influxdata/telegraf/testutil"
)

func defaultTags() map[string]string {
return map[string]string{
"name": "my-kibana",
"source": "example.com:5601",
"version": "6.3.2",
"status": "green",
}
}

type transportMock struct {
statusCode int
body string
}

func newTransportMock(statusCode int, body string) http.RoundTripper {
return &transportMock{
statusCode: statusCode,
body: body,
}
}

func (t *transportMock) RoundTrip(r *http.Request) (*http.Response, error) {
res := &http.Response{
Header: make(http.Header),
Request: r,
StatusCode: t.statusCode,
}
res.Header.Set("Content-Type", "application/json")
res.Body = ioutil.NopCloser(strings.NewReader(t.body))
return res, nil
}

func checkKibanaStatusResult(t *testing.T, acc *testutil.Accumulator) {
tags := defaultTags()
acc.AssertContainsTaggedFields(t, "kibana", kibanaStatusExpected, tags)
}

func TestGather(t *testing.T) {
ks := newKibanahWithClient()
ks.Servers = []string{"http://example.com:5601"}
ks.client.Transport = newTransportMock(http.StatusOK, kibanaStatusResponse)

var acc testutil.Accumulator
if err := acc.GatherError(ks.Gather); err != nil {
t.Fatal(err)
}

checkKibanaStatusResult(t, &acc)
}

func newKibanahWithClient() *Kibana {
ks := NewKibana()
ks.client = &http.Client{}
return ks
}
Loading

0 comments on commit 8cd7a6f

Please sign in to comment.