Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve prometheus plugin #707

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions plugins/inputs/prometheus/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Prometheus Input Plugin

The prometheus input plugin gathers metrics from any webpage
exposing metrics with Prometheus format

### Configuration:

Example for Kubernetes apiserver
```toml
# Get all metrics from Kube-apiserver
[[inputs.prometheus]]
# An array of urls to scrape metrics from.
urls = ["http://my-kube-apiserver:8080/metrics"]
```

You can use more complex configuration
to filter and some tags

```toml
# Get all metrics from Kube-apiserver
[[inputs.prometheus]]
# An array of urls to scrape metrics from.
urls = ["http://my-kube-apiserver:8080/metrics"]
# Get only metrics with "apiserver_" string is in metric name
namepass = ["apiserver_"]
# Add a metric name prefix
name_prefix = "k8s_"
# Add tags to be able to make beautiful dashboards
[inputs.prometheus.tags]
kubeservice = "kube-apiserver"
```

### Measurements & Fields & Tags:

Measurements and fields could be any thing.
It just depends of what you're quering.

Example:

```
# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.00010425500000000001
go_gc_duration_seconds{quantile="0.25"} 0.000139108
go_gc_duration_seconds{quantile="0.5"} 0.00015749400000000002
go_gc_duration_seconds{quantile="0.75"} 0.000331463
go_gc_duration_seconds{quantile="1"} 0.000667154
go_gc_duration_seconds_sum 0.0018183950000000002
go_gc_duration_seconds_count 7
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 15
```

- go_goroutines,
- gauge (integer, unit)
- go_gc_duration_seconds
- field3 (integer, bytes)

- All measurements have the following tags:
- url=http://my-kube-apiserver:8080/metrics
- go_goroutines has the following tags:
- kubeservice=kube-apiserver
- go_gc_duration_seconds has the following tags:
- kubeservice=kube-apiserver

### Example Output:

Example of output with configuration given above:

```
$ ./telegraf -config telegraf.conf -test
k8s_go_goroutines,kubeservice=kube-apiserver,url=http://my-kube-apiserver:8080/metrics gauge=536 1456857329391929813
k8s_go_gc_duration_seconds,kubeservice=kube-apiserver,url=http://my-kube-apiserver:8080/metrics 0=0.038002142,0.25=0.041732467,0.5=0.04336492,0.75=0.047271799,1=0.058295811,count=0,sum=208.334617406 1456857329391929813
```
171 changes: 171 additions & 0 deletions plugins/inputs/prometheus/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package prometheus

// Parser inspired from
// https://github.com/prometheus/prom2json/blob/master/main.go

import (
"bufio"
"bytes"
"fmt"
"io"
"math"
"mime"

"github.com/influxdata/telegraf"

"github.com/matttproud/golang_protobuf_extensions/pbutil"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)

// PrometheusParser is an object for Parsing incoming metrics.
type PrometheusParser struct {
// PromFormat
PromFormat map[string]string
// DefaultTags will be added to every parsed metric
// DefaultTags map[string]string
}

// Parse returns a slice of Metrics from a text representation of a
// metrics
func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) {
var metrics []telegraf.Metric
var parser expfmt.TextParser
// parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n"))
// Read raw data
buffer := bytes.NewBuffer(buf)
reader := bufio.NewReader(buffer)

// Get format
mediatype, params, err := mime.ParseMediaType(p.PromFormat["Content-Type"])
// Prepare output
metricFamilies := make(map[string]*dto.MetricFamily)
if err == nil && mediatype == "application/vnd.google.protobuf" &&
params["encoding"] == "delimited" &&
params["proto"] == "io.prometheus.client.MetricFamily" {
for {
metricFamily := &dto.MetricFamily{}
if _, err = pbutil.ReadDelimited(reader, metricFamily); err != nil {
if err == io.EOF {
break
}
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", err)
}
metricFamilies[metricFamily.GetName()] = metricFamily
}
} else {
metricFamilies, err = parser.TextToMetricFamilies(reader)
if err != nil {
return nil, fmt.Errorf("reading text format failed: %s", err)
}
// read metrics
for metricName, mf := range metricFamilies {
for _, m := range mf.Metric {
// reading tags
tags := makeLabels(m)
/*
for key, value := range p.DefaultTags {
tags[key] = value
}
*/
// reading fields
fields := make(map[string]interface{})
if mf.GetType() == dto.MetricType_SUMMARY {
// summary metric
fields = makeQuantiles(m)
fields["count"] = float64(m.GetHistogram().GetSampleCount())
fields["sum"] = float64(m.GetSummary().GetSampleSum())
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
// historgram metric
fields = makeBuckets(m)
fields["count"] = float64(m.GetHistogram().GetSampleCount())
fields["sum"] = float64(m.GetSummary().GetSampleSum())

} else {
// standard metric
fields = getNameAndValue(m)
}
// converting to telegraf metric
if len(fields) > 0 {
metric, err := telegraf.NewMetric(metricName, tags, fields)
if err == nil {
metrics = append(metrics, metric)
}
}
}
}
}
return metrics, err
}

// Parse one line
func (p *PrometheusParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line + "\n"))

if err != nil {
return nil, err
}

if len(metrics) < 1 {
return nil, fmt.Errorf(
"Can not parse the line: %s, for data format: prometheus", line)
}

return metrics[0], nil
}

/*
// Set default tags
func (p *PrometheusParser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
*/

// Get Quantiles from summary metric
func makeQuantiles(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
for _, q := range m.GetSummary().Quantile {
if !math.IsNaN(q.GetValue()) {
fields[fmt.Sprint(q.GetQuantile())] = float64(q.GetValue())
}
}
return fields
}

// Get Buckets from histogram metric
func makeBuckets(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
for _, b := range m.GetHistogram().Bucket {
fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount())
}
return fields
}

// Get labels from metric
func makeLabels(m *dto.Metric) map[string]string {
result := map[string]string{}
for _, lp := range m.Label {
result[lp.GetName()] = lp.GetValue()
}
return result
}

// Get name and value from metric
func getNameAndValue(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
if m.Gauge != nil {
if !math.IsNaN(m.GetGauge().GetValue()) {
fields["gauge"] = float64(m.GetGauge().GetValue())
}
} else if m.Counter != nil {
if !math.IsNaN(m.GetGauge().GetValue()) {
fields["counter"] = float64(m.GetCounter().GetValue())
}
} else if m.Untyped != nil {
if !math.IsNaN(m.GetGauge().GetValue()) {
fields["value"] = float64(m.GetUntyped().GetValue())
}
}
return fields
}
Loading