-
Notifications
You must be signed in to change notification settings - Fork 5.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
7f539c9
commit f05a14b
Showing
5 changed files
with
379 additions
and
44 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
package prometheus | ||
|
||
// Parser inspired from | ||
// https://github.com/prometheus/prom2json/blob/master/main.go | ||
|
||
import ( | ||
"bufio" | ||
"bytes" | ||
"fmt" | ||
"io" | ||
"math" | ||
"mime" | ||
|
||
"github.com/influxdata/telegraf" | ||
|
||
"github.com/matttproud/golang_protobuf_extensions/pbutil" | ||
dto "github.com/prometheus/client_model/go" | ||
"github.com/prometheus/common/expfmt" | ||
) | ||
|
||
// PrometheusParser is an object for Parsing incoming metrics. | ||
type PrometheusParser struct { | ||
// PromFormat | ||
PromFormat map[string]string | ||
// DefaultTags will be added to every parsed metric | ||
// DefaultTags map[string]string | ||
} | ||
|
||
// Parse returns a slice of Metrics from a text representation of a | ||
// metrics | ||
func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) { | ||
var metrics []telegraf.Metric | ||
var parser expfmt.TextParser | ||
// parse even if the buffer begins with a newline | ||
buf = bytes.TrimPrefix(buf, []byte("\n")) | ||
// Read raw data | ||
buffer := bytes.NewBuffer(buf) | ||
reader := bufio.NewReader(buffer) | ||
|
||
// Get format | ||
mediatype, params, err := mime.ParseMediaType(p.PromFormat["Content-Type"]) | ||
// Prepare output | ||
metricFamilies := make(map[string]*dto.MetricFamily) | ||
if err == nil && mediatype == "application/vnd.google.protobuf" && | ||
params["encoding"] == "delimited" && | ||
params["proto"] == "io.prometheus.client.MetricFamily" { | ||
for { | ||
metricFamily := &dto.MetricFamily{} | ||
if _, err = pbutil.ReadDelimited(reader, metricFamily); err != nil { | ||
if err == io.EOF { | ||
break | ||
} | ||
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", err) | ||
} | ||
metricFamilies[metricFamily.GetName()] = metricFamily | ||
} | ||
} else { | ||
metricFamilies, err = parser.TextToMetricFamilies(reader) | ||
if err != nil { | ||
return nil, fmt.Errorf("reading text format failed: %s", err) | ||
} | ||
// read metrics | ||
for metricName, mf := range metricFamilies { | ||
for _, m := range mf.Metric { | ||
// reading tags | ||
tags := makeLabels(m) | ||
/* | ||
for key, value := range p.DefaultTags { | ||
tags[key] = value | ||
} | ||
*/ | ||
// reading fields | ||
fields := make(map[string]interface{}) | ||
if mf.GetType() == dto.MetricType_SUMMARY { | ||
// summary metric | ||
fields = makeQuantiles(m) | ||
fields["count"] = float64(m.GetHistogram().GetSampleCount()) | ||
fields["sum"] = float64(m.GetSummary().GetSampleSum()) | ||
} else if mf.GetType() == dto.MetricType_HISTOGRAM { | ||
// historgram metric | ||
fields = makeBuckets(m) | ||
fields["count"] = float64(m.GetHistogram().GetSampleCount()) | ||
fields["sum"] = float64(m.GetSummary().GetSampleSum()) | ||
|
||
} else { | ||
// standard metric | ||
fields = getNameAndValue(m) | ||
} | ||
// converting to telegraf metric | ||
if len(fields) > 0 { | ||
metric, err := telegraf.NewMetric(metricName, tags, fields) | ||
if err == nil { | ||
metrics = append(metrics, metric) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
return metrics, err | ||
} | ||
|
||
// Parse one line | ||
func (p *PrometheusParser) ParseLine(line string) (telegraf.Metric, error) { | ||
metrics, err := p.Parse([]byte(line + "\n")) | ||
|
||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if len(metrics) < 1 { | ||
return nil, fmt.Errorf( | ||
"Can not parse the line: %s, for data format: prometheus", line) | ||
} | ||
|
||
return metrics[0], nil | ||
} | ||
|
||
/* | ||
// Set default tags | ||
func (p *PrometheusParser) SetDefaultTags(tags map[string]string) { | ||
p.DefaultTags = tags | ||
} | ||
*/ | ||
|
||
// Get Quantiles from summary metric | ||
func makeQuantiles(m *dto.Metric) map[string]interface{} { | ||
fields := make(map[string]interface{}) | ||
for _, q := range m.GetSummary().Quantile { | ||
if !math.IsNaN(q.GetValue()) { | ||
fields[fmt.Sprint(q.GetQuantile())] = float64(q.GetValue()) | ||
} | ||
} | ||
return fields | ||
} | ||
|
||
// Get Buckets from histogram metric | ||
func makeBuckets(m *dto.Metric) map[string]interface{} { | ||
fields := make(map[string]interface{}) | ||
for _, b := range m.GetHistogram().Bucket { | ||
fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount()) | ||
} | ||
return fields | ||
} | ||
|
||
// Get labels from metric | ||
func makeLabels(m *dto.Metric) map[string]string { | ||
result := map[string]string{} | ||
for _, lp := range m.Label { | ||
result[lp.GetName()] = lp.GetValue() | ||
} | ||
return result | ||
} | ||
|
||
// Get name and value from metric | ||
func getNameAndValue(m *dto.Metric) map[string]interface{} { | ||
fields := make(map[string]interface{}) | ||
if m.Gauge != nil { | ||
if !math.IsNaN(m.GetGauge().GetValue()) { | ||
fields["gauge"] = float64(m.GetGauge().GetValue()) | ||
} | ||
} else if m.Counter != nil { | ||
if !math.IsNaN(m.GetGauge().GetValue()) { | ||
fields["counter"] = float64(m.GetCounter().GetValue()) | ||
} | ||
} else if m.Untyped != nil { | ||
if !math.IsNaN(m.GetGauge().GetValue()) { | ||
fields["value"] = float64(m.GetUntyped().GetValue()) | ||
} | ||
} | ||
return fields | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
package prometheus | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
var exptime = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) | ||
|
||
const validUniqueGauge = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision. | ||
# TYPE cadvisor_version_info gauge | ||
cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1 | ||
` | ||
|
||
const validUniqueCounter = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source | ||
# TYPE get_token_fail_count counter | ||
get_token_fail_count 0 | ||
` | ||
|
||
const validUniqueLine = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source | ||
` | ||
|
||
const validUniqueSummary = `# HELP http_request_duration_microseconds The HTTP request latencies in microseconds. | ||
# TYPE http_request_duration_microseconds summary | ||
http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506 | ||
http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06 | ||
http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06 | ||
http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07 | ||
http_request_duration_microseconds_count{handler="prometheus"} 9 | ||
` | ||
|
||
const validUniqueHistogram = `# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client. | ||
# TYPE apiserver_request_latencies histogram | ||
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994 | ||
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997 | ||
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000 | ||
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005 | ||
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012 | ||
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017 | ||
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024 | ||
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025 | ||
apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08 | ||
apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025 | ||
` | ||
|
||
const validData = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision. | ||
# TYPE cadvisor_version_info gauge | ||
cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1 | ||
# HELP go_gc_duration_seconds A summary of the GC invocation durations. | ||
# TYPE go_gc_duration_seconds summary | ||
go_gc_duration_seconds{quantile="0"} 0.013534896000000001 | ||
go_gc_duration_seconds{quantile="0.25"} 0.02469263 | ||
go_gc_duration_seconds{quantile="0.5"} 0.033727822000000005 | ||
go_gc_duration_seconds{quantile="0.75"} 0.03840335 | ||
go_gc_duration_seconds{quantile="1"} 0.049956604 | ||
go_gc_duration_seconds_sum 1970.341293002 | ||
go_gc_duration_seconds_count 65952 | ||
# HELP http_request_duration_microseconds The HTTP request latencies in microseconds. | ||
# TYPE http_request_duration_microseconds summary | ||
http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506 | ||
http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06 | ||
http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06 | ||
http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07 | ||
http_request_duration_microseconds_count{handler="prometheus"} 9 | ||
# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source | ||
# TYPE get_token_fail_count counter | ||
get_token_fail_count 0 | ||
# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client. | ||
# TYPE apiserver_request_latencies histogram | ||
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994 | ||
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997 | ||
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000 | ||
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005 | ||
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012 | ||
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017 | ||
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024 | ||
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025 | ||
apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08 | ||
apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025 | ||
` | ||
|
||
const prometheusMulti = ` | ||
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 | ||
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 | ||
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 | ||
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 | ||
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 | ||
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 | ||
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 | ||
` | ||
|
||
const prometheusMultiSomeInvalid = ` | ||
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 | ||
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 | ||
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 | ||
cpu,cpu=cpu3, host=foo,datacenter=us-east usage_idle=99,usage_busy=1 | ||
cpu,cpu=cpu4 , usage_idle=99,usage_busy=1 | ||
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 | ||
` | ||
|
||
func TestParseValidPrometheus(t *testing.T) { | ||
parser := PrometheusParser{} | ||
|
||
// Gauge value | ||
metrics, err := parser.Parse([]byte(validUniqueGauge)) | ||
assert.NoError(t, err) | ||
assert.Len(t, metrics, 1) | ||
assert.Equal(t, "cadvisor_version_info", metrics[0].Name()) | ||
assert.Equal(t, map[string]interface{}{ | ||
"gauge": float64(1), | ||
}, metrics[0].Fields()) | ||
assert.Equal(t, map[string]string{ | ||
"osVersion": "CentOS Linux 7 (Core)", | ||
"dockerVersion": "1.8.2", | ||
"kernelVersion": "3.10.0-229.20.1.el7.x86_64", | ||
}, metrics[0].Tags()) | ||
|
||
// Counter value | ||
//parser.SetDefaultTags(map[string]string{"mytag": "mytagvalue"}) | ||
metrics, err = parser.Parse([]byte(validUniqueCounter)) | ||
assert.NoError(t, err) | ||
assert.Len(t, metrics, 1) | ||
assert.Equal(t, "get_token_fail_count", metrics[0].Name()) | ||
assert.Equal(t, map[string]interface{}{ | ||
"counter": float64(0), | ||
}, metrics[0].Fields()) | ||
assert.Equal(t, map[string]string{}, metrics[0].Tags()) | ||
|
||
// Summary data | ||
//parser.SetDefaultTags(map[string]string{}) | ||
metrics, err = parser.Parse([]byte(validUniqueSummary)) | ||
assert.NoError(t, err) | ||
assert.Len(t, metrics, 1) | ||
assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name()) | ||
assert.Equal(t, map[string]interface{}{ | ||
"0.5": 552048.506, | ||
"0.9": 5.876804288e+06, | ||
"0.99": 5.876804288e+06, | ||
"count": 0.0, | ||
"sum": 1.8909097205e+07, | ||
}, metrics[0].Fields()) | ||
assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags()) | ||
|
||
// histogram data | ||
metrics, err = parser.Parse([]byte(validUniqueHistogram)) | ||
assert.NoError(t, err) | ||
assert.Len(t, metrics, 1) | ||
assert.Equal(t, "apiserver_request_latencies", metrics[0].Name()) | ||
assert.Equal(t, map[string]interface{}{ | ||
"500000": 2000.0, | ||
"count": 2025.0, | ||
"sum": 0.0, | ||
"250000": 1997.0, | ||
"2e+06": 2012.0, | ||
"4e+06": 2017.0, | ||
"8e+06": 2024.0, | ||
"+Inf": 2025.0, | ||
"125000": 1994.0, | ||
"1e+06": 2005.0, | ||
}, metrics[0].Fields()) | ||
assert.Equal(t, | ||
map[string]string{"verb": "POST", "resource": "bindings"}, | ||
metrics[0].Tags()) | ||
|
||
} | ||
|
||
func TestParseLineInvalidPrometheus(t *testing.T) { | ||
parser := PrometheusParser{} | ||
metric, err := parser.ParseLine(validUniqueLine) | ||
assert.NotNil(t, err) | ||
assert.Nil(t, metric) | ||
|
||
} |
Oops, something went wrong.