[prometheusexporter] Expose job and instance labels #9115

Merged: 3 commits, Apr 13, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@
- `jaegerremotesamplingextension`: Add local and remote sampling stores (#8818)
- `attributesprocessor`: Add support to filter on log body (#8996)
- `prometheusremotewriteexporter`: Translate resource attributes to the target info metric (#8493)
- `prometheusexporter`: Add `job` and `instance` labels to metrics so they can be scraped with `honor_labels: true` (#9115)
- `podmanreceiver`: Add API timeout configuration option (#9014)
- `cmd/mdatagen`: Add `sem_conv_version` field to metadata.yaml that is used to set metrics SchemaURL (#9010)

56 changes: 32 additions & 24 deletions exporter/prometheusexporter/accumulator.go
@@ -27,6 +27,10 @@ import (
type accumulatedValue struct {
// value contains a metric with exactly one aggregated datapoint.
value pdata.Metric

// resourceAttrs contain the resource attributes. They are used to output instance and job labels.
resourceAttrs pdata.Map

// updated indicates when metric was last changed.
updated time.Time

@@ -37,8 +41,9 @@ type accumulatedValue struct {
type accumulator interface {
// Accumulate stores aggregated metric values
Accumulate(resourceMetrics pdata.ResourceMetrics) (processed int)
// Collect returns a slice with relevant aggregated metrics
Collect() (metrics []pdata.Metric)
// Collect returns a slice with relevant aggregated metrics and their resource attributes.
// The number of metrics and attributes returned will be the same.
Collect() (metrics []pdata.Metric, resourceAttrs []pdata.Map)
}

// LastValueAccumulator keeps last value for accumulated metrics
@@ -64,31 +69,32 @@ func newAccumulator(logger *zap.Logger, metricExpiration time.Duration) accumula
func (a *lastValueAccumulator) Accumulate(rm pdata.ResourceMetrics) (n int) {
now := time.Now()
ilms := rm.ScopeMetrics()
resourceAttrs := rm.Resource().Attributes()

for i := 0; i < ilms.Len(); i++ {
ilm := ilms.At(i)

metrics := ilm.Metrics()
for j := 0; j < metrics.Len(); j++ {
n += a.addMetric(metrics.At(j), ilm.Scope(), now)
n += a.addMetric(metrics.At(j), ilm.Scope(), resourceAttrs, now)
}
}

return
}

func (a *lastValueAccumulator) addMetric(metric pdata.Metric, il pdata.InstrumentationScope, now time.Time) int {
func (a *lastValueAccumulator) addMetric(metric pdata.Metric, il pdata.InstrumentationScope, resourceAttrs pdata.Map, now time.Time) int {
a.logger.Debug(fmt.Sprintf("accumulating metric: %s", metric.Name()))

switch metric.DataType() {
case pdata.MetricDataTypeGauge:
return a.accumulateGauge(metric, il, now)
return a.accumulateGauge(metric, il, resourceAttrs, now)
case pdata.MetricDataTypeSum:
return a.accumulateSum(metric, il, now)
return a.accumulateSum(metric, il, resourceAttrs, now)
case pdata.MetricDataTypeHistogram:
return a.accumulateDoubleHistogram(metric, il, now)
return a.accumulateDoubleHistogram(metric, il, resourceAttrs, now)
case pdata.MetricDataTypeSummary:
return a.accumulateSummary(metric, il, now)
return a.accumulateSummary(metric, il, resourceAttrs, now)
default:
a.logger.With(
zap.String("data_type", string(metric.DataType())),
@@ -99,7 +105,7 @@ func (a *lastValueAccumulator) addMetric(metric pdata.Metric, il pdata.Instrumen
return 0
}

func (a *lastValueAccumulator) accumulateSummary(metric pdata.Metric, il pdata.InstrumentationScope, now time.Time) (n int) {
func (a *lastValueAccumulator) accumulateSummary(metric pdata.Metric, il pdata.InstrumentationScope, resourceAttrs pdata.Map, now time.Time) (n int) {
dps := metric.Summary().DataPoints()
for i := 0; i < dps.Len(); i++ {
ip := dps.At(i)
@@ -121,14 +127,14 @@ func (a *lastValueAccumulator) accumulateSummary(metric pdata.Metric, il pdata.I

mm := createMetric(metric)
ip.CopyTo(mm.Summary().DataPoints().AppendEmpty())
a.registeredMetrics.Store(signature, &accumulatedValue{value: mm, instrumentationLibrary: il, updated: now})
a.registeredMetrics.Store(signature, &accumulatedValue{value: mm, resourceAttrs: resourceAttrs, instrumentationLibrary: il, updated: now})
n++
}

return n
}

func (a *lastValueAccumulator) accumulateGauge(metric pdata.Metric, il pdata.InstrumentationScope, now time.Time) (n int) {
func (a *lastValueAccumulator) accumulateGauge(metric pdata.Metric, il pdata.InstrumentationScope, resourceAttrs pdata.Map, now time.Time) (n int) {
dps := metric.Gauge().DataPoints()
for i := 0; i < dps.Len(); i++ {
ip := dps.At(i)
@@ -143,7 +149,7 @@ func (a *lastValueAccumulator) accumulateGauge(metric pdata.Metric, il pdata.Ins
if !ok {
m := createMetric(metric)
ip.CopyTo(m.Gauge().DataPoints().AppendEmpty())
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, updated: now})
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, instrumentationLibrary: il, updated: now})
n++
continue
}
@@ -156,13 +162,13 @@ func (a *lastValueAccumulator) accumulateGauge(metric pdata.Metric, il pdata.Ins

m := createMetric(metric)
ip.CopyTo(m.Gauge().DataPoints().AppendEmpty())
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, updated: now})
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, instrumentationLibrary: il, updated: now})
n++
}
return
}

func (a *lastValueAccumulator) accumulateSum(metric pdata.Metric, il pdata.InstrumentationScope, now time.Time) (n int) {
func (a *lastValueAccumulator) accumulateSum(metric pdata.Metric, il pdata.InstrumentationScope, resourceAttrs pdata.Map, now time.Time) (n int) {
doubleSum := metric.Sum()

// Drop metrics with non-cumulative aggregations
@@ -186,7 +192,7 @@ func (a *lastValueAccumulator) accumulateSum(metric pdata.Metric, il pdata.Instr
m.Sum().SetIsMonotonic(metric.Sum().IsMonotonic())
m.Sum().SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative)
ip.CopyTo(m.Sum().DataPoints().AppendEmpty())
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, updated: now})
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, instrumentationLibrary: il, updated: now})
n++
continue
}
@@ -201,13 +207,13 @@ func (a *lastValueAccumulator) accumulateSum(metric pdata.Metric, il pdata.Instr
m.Sum().SetIsMonotonic(metric.Sum().IsMonotonic())
m.Sum().SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative)
ip.CopyTo(m.Sum().DataPoints().AppendEmpty())
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, updated: now})
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, instrumentationLibrary: il, updated: now})
n++
}
return
}

func (a *lastValueAccumulator) accumulateDoubleHistogram(metric pdata.Metric, il pdata.InstrumentationScope, now time.Time) (n int) {
func (a *lastValueAccumulator) accumulateDoubleHistogram(metric pdata.Metric, il pdata.InstrumentationScope, resourceAttrs pdata.Map, now time.Time) (n int) {
doubleHistogram := metric.Histogram()

// Drop metrics with non-cumulative aggregations
@@ -229,7 +235,7 @@ func (a *lastValueAccumulator) accumulateDoubleHistogram(metric pdata.Metric, il
if !ok {
m := createMetric(metric)
ip.CopyTo(m.Histogram().DataPoints().AppendEmpty())
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, updated: now})
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, instrumentationLibrary: il, updated: now})
n++
continue
}
@@ -243,17 +249,18 @@ func (a *lastValueAccumulator) accumulateDoubleHistogram(metric pdata.Metric, il
m := createMetric(metric)
ip.CopyTo(m.Histogram().DataPoints().AppendEmpty())
m.Histogram().SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative)
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, updated: now})
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, instrumentationLibrary: il, updated: now})
n++
}
return
}

// Collect returns a slice with relevant aggregated metrics
func (a *lastValueAccumulator) Collect() []pdata.Metric {
// Collect returns a slice with relevant aggregated metrics and their resource attributes.
func (a *lastValueAccumulator) Collect() ([]pdata.Metric, []pdata.Map) {
a.logger.Debug("Accumulator collect called")

var res []pdata.Metric
var metrics []pdata.Metric
var resourceAttrs []pdata.Map
expirationTime := time.Now().Add(-a.metricExpiration)

a.registeredMetrics.Range(func(key, value interface{}) bool {
@@ -264,11 +271,12 @@ func (a *lastValueAccumulator) Collect() []pdata.Metric {
return true
}

res = append(res, v.value)
metrics = append(metrics, v.value)
resourceAttrs = append(resourceAttrs, v.resourceAttrs)
return true
})

return res
return metrics, resourceAttrs
}

func timeseriesSignature(ilmName string, metric pdata.Metric, attributes pdata.Map) string {
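With these changes the accumulator keeps each metric's resource attributes next to its last value and returns the two as index-aligned slices from Collect. The sketch below illustrates that contract in isolation; it deliberately uses plain Go strings and maps instead of the exporter's pdata types, so every name in it is illustrative rather than the exporter's actual API.

```go
package main

import "fmt"

// accumulatedValue pairs a metric with the resource attributes of the
// resource it came from, mirroring the struct extended in this PR.
// The field types are stand-ins for pdata.Metric and pdata.Map.
type accumulatedValue struct {
	name  string
	attrs map[string]string
}

// collect returns metrics and their resource attributes as two slices of
// equal length; callers pair them by index, as collector.Collect does.
func collect(registered []accumulatedValue) (metrics []string, resourceAttrs []map[string]string) {
	for _, v := range registered {
		metrics = append(metrics, v.name)
		resourceAttrs = append(resourceAttrs, v.attrs)
	}
	return metrics, resourceAttrs
}

func main() {
	registered := []accumulatedValue{
		{name: "http_requests_total", attrs: map[string]string{"service.name": "checkout"}},
	}
	metrics, attrs := collect(registered)
	for i := range metrics {
		fmt.Println(metrics[i], attrs[i]) // same index -> same resource
	}
}
```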
2 changes: 1 addition & 1 deletion exporter/prometheusexporter/accumulator_test.go
@@ -29,7 +29,7 @@ func TestInvalidDataType(t *testing.T) {
a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)
metric := pdata.NewMetric()
metric.SetDataType(-100)
n := a.addMetric(metric, pdata.NewInstrumentationScope(), time.Now())
n := a.addMetric(metric, pdata.NewInstrumentationScope(), pdata.NewMap(), time.Now())
require.Zero(t, n)
}

58 changes: 39 additions & 19 deletions exporter/prometheusexporter/collector.go
@@ -19,7 +19,9 @@ import (
"sort"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"go.opentelemetry.io/collector/model/pdata"
conventions "go.opentelemetry.io/collector/model/semconv/v1.6.1"
"go.uber.org/zap"
)

@@ -57,16 +59,16 @@ func (c *collector) processMetrics(rm pdata.ResourceMetrics) (n int) {

var errUnknownMetricType = fmt.Errorf("unknown metric type")

func (c *collector) convertMetric(metric pdata.Metric) (prometheus.Metric, error) {
func (c *collector) convertMetric(metric pdata.Metric, resourceAttrs pdata.Map) (prometheus.Metric, error) {
switch metric.DataType() {
case pdata.MetricDataTypeGauge:
return c.convertGauge(metric)
return c.convertGauge(metric, resourceAttrs)
case pdata.MetricDataTypeSum:
return c.convertSum(metric)
return c.convertSum(metric, resourceAttrs)
case pdata.MetricDataTypeHistogram:
return c.convertDoubleHistogram(metric)
return c.convertDoubleHistogram(metric, resourceAttrs)
case pdata.MetricDataTypeSummary:
return c.convertSummary(metric)
return c.convertSummary(metric, resourceAttrs)
}

return nil, errUnknownMetricType
@@ -79,16 +81,31 @@ func (c *collector) metricName(namespace string, metric pdata.Metric) string {
return sanitize(metric.Name(), c.skipSanitizeLabel)
}

func (c *collector) getMetricMetadata(metric pdata.Metric, attributes pdata.Map) (*prometheus.Desc, []string) {
keys := make([]string, 0, attributes.Len())
values := make([]string, 0, attributes.Len())
func (c *collector) getMetricMetadata(metric pdata.Metric, attributes pdata.Map, resourceAttrs pdata.Map) (*prometheus.Desc, []string) {
keys := make([]string, 0, attributes.Len()+2) // +2 for job and instance labels.
values := make([]string, 0, attributes.Len()+2)

attributes.Range(func(k string, v pdata.Value) bool {
keys = append(keys, sanitize(k, c.skipSanitizeLabel))
values = append(values, v.AsString())
return true
})

// Map service.name + service.namespace to job
if serviceName, ok := resourceAttrs.Get(conventions.AttributeServiceName); ok {
val := serviceName.AsString()
if serviceNamespace, ok := resourceAttrs.Get(conventions.AttributeServiceNamespace); ok {
val = fmt.Sprintf("%s/%s", serviceNamespace.AsString(), val)
}
keys = append(keys, model.JobLabel)
values = append(values, val)
}
// Map service.instance.id to instance
if instance, ok := resourceAttrs.Get(conventions.AttributeServiceInstanceID); ok {
keys = append(keys, model.InstanceLabel)
values = append(values, instance.AsString())
}

return prometheus.NewDesc(
c.metricName(c.namespace, metric),
metric.Description(),
@@ -97,10 +114,10 @@ func (c *collector) getMetricMetadata(metric pdata.Metric, attributes pdata.Map)
), values
}

func (c *collector) convertGauge(metric pdata.Metric) (prometheus.Metric, error) {
func (c *collector) convertGauge(metric pdata.Metric, resourceAttrs pdata.Map) (prometheus.Metric, error) {
ip := metric.Gauge().DataPoints().At(0)

desc, attributes := c.getMetricMetadata(metric, ip.Attributes())
desc, attributes := c.getMetricMetadata(metric, ip.Attributes(), resourceAttrs)
var value float64
switch ip.ValueType() {
case pdata.MetricValueTypeInt:
@@ -119,15 +136,15 @@ func (c *collector) convertGauge(metric pdata.Metric) (prometheus.Metric, error)
return m, nil
}

func (c *collector) convertSum(metric pdata.Metric) (prometheus.Metric, error) {
func (c *collector) convertSum(metric pdata.Metric, resourceAttrs pdata.Map) (prometheus.Metric, error) {
ip := metric.Sum().DataPoints().At(0)

metricType := prometheus.GaugeValue
if metric.Sum().IsMonotonic() {
metricType = prometheus.CounterValue
}

desc, attributes := c.getMetricMetadata(metric, ip.Attributes())
desc, attributes := c.getMetricMetadata(metric, ip.Attributes(), resourceAttrs)
var value float64
switch ip.ValueType() {
case pdata.MetricValueTypeInt:
@@ -146,7 +163,7 @@ func (c *collector) convertSum(metric pdata.Metric) (prometheus.Metric, error) {
return m, nil
}

func (c *collector) convertSummary(metric pdata.Metric) (prometheus.Metric, error) {
func (c *collector) convertSummary(metric pdata.Metric, resourceAttrs pdata.Map) (prometheus.Metric, error) {
// TODO: In the off chance that we have multiple points
// within the same metric, how should we handle them?
point := metric.Summary().DataPoints().At(0)
@@ -159,7 +176,7 @@ func (c *collector) convertSummary(metric pdata.Metric, resourceAttrs pdata.Map) (prometheus.Metric, erro
quantiles[qvj.Quantile()] = qvj.Value()
}

desc, attributes := c.getMetricMetadata(metric, point.Attributes())
desc, attributes := c.getMetricMetadata(metric, point.Attributes(), resourceAttrs)
m, err := prometheus.NewConstSummary(desc, point.Count(), point.Sum(), quantiles, attributes...)
if err != nil {
return nil, err
@@ -170,9 +187,9 @@ func (c *collector) convertSummary(metric pdata.Metric) (prometheus.Metric, erro
return m, nil
}

func (c *collector) convertDoubleHistogram(metric pdata.Metric) (prometheus.Metric, error) {
func (c *collector) convertDoubleHistogram(metric pdata.Metric, resourceAttrs pdata.Map) (prometheus.Metric, error) {
ip := metric.Histogram().DataPoints().At(0)
desc, attributes := c.getMetricMetadata(metric, ip.Attributes())
desc, attributes := c.getMetricMetadata(metric, ip.Attributes(), resourceAttrs)

indicesMap := make(map[float64]int)
buckets := make([]float64, 0, len(ip.BucketCounts()))
@@ -214,10 +231,13 @@ func (c *collector) convertDoubleHistogram(metric pdata.Metric) (prometheus.Metr
func (c *collector) Collect(ch chan<- prometheus.Metric) {
c.logger.Debug("collect called")

inMetrics := c.accumulator.Collect()
inMetrics, resourceAttrs := c.accumulator.Collect()

for i := range inMetrics {
pMetric := inMetrics[i]
rAttr := resourceAttrs[i]

for _, pMetric := range inMetrics {
m, err := c.convertMetric(pMetric)
m, err := c.convertMetric(pMetric, rAttr)
if err != nil {
c.logger.Error(fmt.Sprintf("failed to convert metric %s: %s", pMetric.Name(), err.Error()))
continue
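The getMetricMetadata hunk above is where the new labels are derived: job comes from the service.name resource attribute, prefixed with service.namespace when that is also present, and instance comes from service.instance.id, using the label names from prometheus/common/model. Below is a minimal standalone sketch of that mapping, assuming plain string maps; jobInstanceLabels is a made-up helper, not part of the exporter.

```go
package main

import "fmt"

// jobInstanceLabels mirrors the mapping added to getMetricMetadata:
// job = "<service.namespace>/<service.name>" (namespace optional),
// instance = service.instance.id. A label is only emitted when the
// corresponding resource attribute exists.
func jobInstanceLabels(resourceAttrs map[string]string) map[string]string {
	labels := map[string]string{}
	if name, ok := resourceAttrs["service.name"]; ok {
		job := name
		if ns, ok := resourceAttrs["service.namespace"]; ok {
			job = fmt.Sprintf("%s/%s", ns, name)
		}
		labels["job"] = job
	}
	if id, ok := resourceAttrs["service.instance.id"]; ok {
		labels["instance"] = id
	}
	return labels
}

func main() {
	attrs := map[string]string{
		"service.namespace":   "shop",
		"service.name":        "checkout",
		"service.instance.id": "pod-42",
	}
	fmt.Println(jobInstanceLabels(attrs)) // map[instance:pod-42 job:shop/checkout]
}
```

With those labels present on the exported series, a Prometheus server that scrapes this exporter with honor_labels: true keeps the collector-supplied job and instance values instead of replacing them with the scrape target's own, which is what the changelog entry refers to.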