Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kibana input plugin #4585

Merged
merged 4 commits into from
Aug 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/kapacitor"
_ "github.com/influxdata/telegraf/plugins/inputs/kernel"
_ "github.com/influxdata/telegraf/plugins/inputs/kernel_vmstat"
_ "github.com/influxdata/telegraf/plugins/inputs/kibana"
_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
_ "github.com/influxdata/telegraf/plugins/inputs/leofs"
_ "github.com/influxdata/telegraf/plugins/inputs/linux_sysctl_fs"
Expand Down
63 changes: 63 additions & 0 deletions plugins/inputs/kibana/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Kibana input plugin

The [kibana](https://www.elastic.co/) plugin queries Kibana status API to
obtain the health status of Kibana and some useful metrics.

This plugin has been tested and works on Kibana 6.x versions.

### Configuration

```toml
[[inputs.kibana]]
## specify a list of one or more Kibana servers
servers = ["http://localhost:5601"]

## Timeout for HTTP requests
timeout = "5s"

## HTTP Basic Auth credentials
# username = "username"
# password = "pa$$word"

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
```

### Status mappings

When reporting health (green/yellow/red), additional field `status_code`
is reported. Field contains mapping from status:string to status_code:int
with following rules:

- `green` - 1
- `yellow` - 2
- `red` - 3
- `unknown` - 0

### Measurements & Fields

- kibana
- status_code: integer (1, 2, 3, 0)
- heap_max_bytes: integer
- heap_used_bytes: integer
- uptime_ms: integer
- response_time_avg_ms: float
- response_time_max_ms: integer
- concurrent_connections: integer
- requests_per_sec: float

### Tags

- status (Kibana health: green, yellow, red)
- name (Kibana reported name)
- uuid (Kibana reported UUID)
- version (Kibana version)
- source (Kibana server hostname or IP)

### Example Output

kibana,host=myhost,name=my-kibana,source=localhost:5601,version=6.3.2 concurrent_connections=0i,heap_max_bytes=136478720i,heap_used_bytes=119231088i,response_time_avg_ms=0i,response_time_max_ms=0i,status="green",status_code=1i,uptime_ms=2187428019i 1534864502000000000
230 changes: 230 additions & 0 deletions plugins/inputs/kibana/kibana.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package kibana

import (
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)

const statusPath = "/api/status"

type kibanaStatus struct {
Name string `json:"name"`
Version version `json:"version"`
Status status `json:"status"`
Metrics metrics `json:"metrics"`
}

type version struct {
Number string `json:"number"`
BuildHash string `json:"build_hash"`
BuildNumber int `json:"build_number"`
BuildSnapshot bool `json:"build_snapshot"`
}

type status struct {
Overall overallStatus `json:"overall"`
Statuses interface{} `json:"statuses"`
}

type overallStatus struct {
State string `json:"state"`
}

type metrics struct {
UptimeInMillis int64 `json:"uptime_in_millis"`
ConcurrentConnections int64 `json:"concurrent_connections"`
CollectionIntervalInMilles int64 `json:"collection_interval_in_millis"`
ResponseTimes responseTimes `json:"response_times"`
Process process `json:"process"`
Requests requests `json:"requests"`
}

type responseTimes struct {
AvgInMillis float64 `json:"avg_in_millis"`
MaxInMillis int64 `json:"max_in_millis"`
}

type process struct {
Mem mem `json:"mem"`
}

type requests struct {
Total int64 `json:"total"`
}

type mem struct {
HeapMaxInBytes int64 `json:"heap_max_in_bytes"`
HeapUsedInBytes int64 `json:"heap_used_in_bytes"`
}

const sampleConfig = `
## specify a list of one or more Kibana servers
servers = ["http://localhost:5601"]

## Timeout for HTTP requests
timeout = "5s"

## HTTP Basic Auth credentials
# username = "username"
# password = "pa$$word"

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
`

type Kibana struct {
Local bool
Servers []string
Username string
Password string
Timeout internal.Duration
tls.ClientConfig

client *http.Client
}

func NewKibana() *Kibana {
return &Kibana{
Timeout: internal.Duration{Duration: time.Second * 5},
}
}

// perform status mapping
func mapHealthStatusToCode(s string) int {
switch strings.ToLower(s) {
case "green":
return 1
case "yellow":
return 2
case "red":
return 3
}
return 0
}

// SampleConfig returns sample configuration for this plugin.
func (k *Kibana) SampleConfig() string {
return sampleConfig
}

// Description returns the plugin description.
func (k *Kibana) Description() string {
return "Read status information from one or more Kibana servers"
}

func (k *Kibana) Gather(acc telegraf.Accumulator) error {
if k.client == nil {
client, err := k.createHttpClient()

if err != nil {
return err
}
k.client = client
}

var wg sync.WaitGroup
wg.Add(len(k.Servers))

for _, serv := range k.Servers {
go func(baseUrl string, acc telegraf.Accumulator) {
defer wg.Done()
if err := k.gatherKibanaStatus(baseUrl, acc); err != nil {
acc.AddError(fmt.Errorf("[url=%s]: %s", baseUrl, err))
return
}
}(serv, acc)
}

wg.Wait()
return nil
}

func (k *Kibana) createHttpClient() (*http.Client, error) {
tlsCfg, err := k.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}

client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
},
Timeout: k.Timeout.Duration,
}

return client, nil
}

func (k *Kibana) gatherKibanaStatus(baseUrl string, acc telegraf.Accumulator) error {

kibanaStatus := &kibanaStatus{}
url := baseUrl + statusPath

host, err := k.gatherJsonData(url, kibanaStatus)
if err != nil {
return err
}

fields := make(map[string]interface{})
tags := make(map[string]string)

tags["name"] = kibanaStatus.Name
tags["source"] = host
tags["version"] = kibanaStatus.Version.Number
tags["status"] = kibanaStatus.Status.Overall.State

fields["status_code"] = mapHealthStatusToCode(kibanaStatus.Status.Overall.State)

fields["uptime_ms"] = kibanaStatus.Metrics.UptimeInMillis
fields["concurrent_connections"] = kibanaStatus.Metrics.ConcurrentConnections
fields["heap_max_bytes"] = kibanaStatus.Metrics.Process.Mem.HeapMaxInBytes
fields["heap_used_bytes"] = kibanaStatus.Metrics.Process.Mem.HeapUsedInBytes
fields["response_time_avg_ms"] = kibanaStatus.Metrics.ResponseTimes.AvgInMillis
fields["response_time_max_ms"] = kibanaStatus.Metrics.ResponseTimes.MaxInMillis
fields["requests_per_sec"] = float64(kibanaStatus.Metrics.Requests.Total) / float64(kibanaStatus.Metrics.CollectionIntervalInMilles) * 1000

acc.AddFields("kibana", fields, tags)

return nil
}

func (k *Kibana) gatherJsonData(url string, v interface{}) (host string, err error) {

request, err := http.NewRequest("GET", url, nil)

if (k.Username != "") || (k.Password != "") {
request.SetBasicAuth(k.Username, k.Password)
}

response, err := k.client.Do(request)
if err != nil {
return "", err
}

defer response.Body.Close()

if err = json.NewDecoder(response.Body).Decode(v); err != nil {
return request.Host, err
}

return request.Host, nil
}

func init() {
inputs.Add("kibana", func() telegraf.Input {
return NewKibana()
})
}
66 changes: 66 additions & 0 deletions plugins/inputs/kibana/kibana_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package kibana

import (
"io/ioutil"
"net/http"
"strings"
"testing"

"github.com/influxdata/telegraf/testutil"
)

func defaultTags() map[string]string {
return map[string]string{
"name": "my-kibana",
"source": "example.com:5601",
"version": "6.3.2",
"status": "green",
}
}

type transportMock struct {
statusCode int
body string
}

func newTransportMock(statusCode int, body string) http.RoundTripper {
return &transportMock{
statusCode: statusCode,
body: body,
}
}

func (t *transportMock) RoundTrip(r *http.Request) (*http.Response, error) {
res := &http.Response{
Header: make(http.Header),
Request: r,
StatusCode: t.statusCode,
}
res.Header.Set("Content-Type", "application/json")
res.Body = ioutil.NopCloser(strings.NewReader(t.body))
return res, nil
}

func checkKibanaStatusResult(t *testing.T, acc *testutil.Accumulator) {
tags := defaultTags()
acc.AssertContainsTaggedFields(t, "kibana", kibanaStatusExpected, tags)
}

func TestGather(t *testing.T) {
ks := newKibanahWithClient()
ks.Servers = []string{"http://example.com:5601"}
ks.client.Transport = newTransportMock(http.StatusOK, kibanaStatusResponse)

var acc testutil.Accumulator
if err := acc.GatherError(ks.Gather); err != nil {
t.Fatal(err)
}

checkKibanaStatusResult(t, &acc)
}

func newKibanahWithClient() *Kibana {
ks := NewKibana()
ks.client = &http.Client{}
return ks
}
Loading