Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Native histogram performance #4244

Merged
merged 6 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion modules/generator/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func BenchmarkCollect(b *testing.B) {
spanMetricsDimensions: []string{"k8s.cluster.name", "k8s.namespace.name"},
spanMetricsEnableTargetInfo: true,
spanMetricsTargetInfoExcludedDimensions: []string{"excluded}"},
// nativeHistograms: overrides.HistogramMethodBoth,
nativeHistograms: overrides.HistogramMethodBoth,
}
)

Expand Down
3 changes: 2 additions & 1 deletion modules/generator/registry/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (c *counter) collectMetrics(appender storage.Appender, timeMs int64, extern
// add metric name
baseLabels = append(baseLabels, labels.Label{Name: labels.MetricName, Value: c.metricName})

// TODO: avoid allocation on each collection
lb := labels.NewBuilder(baseLabels)

for _, s := range c.series {
Expand Down Expand Up @@ -166,7 +167,7 @@ func (c *counter) collectMetrics(appender storage.Appender, timeMs int64, extern
return
}

// TODO support exemplars
// TODO: support exemplars
}

return
Expand Down
3 changes: 2 additions & 1 deletion modules/generator/registry/gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (g *gauge) collectMetrics(appender storage.Appender, timeMs int64, external
baseLabels = append(baseLabels, labels.Label{Name: name, Value: value})
}

// TODO: avoid allocation on each collection
lb := labels.NewBuilder(baseLabels)

for _, s := range g.series {
Expand All @@ -166,7 +167,7 @@ func (g *gauge) collectMetrics(appender storage.Appender, timeMs int64, external
if err != nil {
return
}
// TODO support exemplars
// TODO: support exemplars
}

return
Expand Down
122 changes: 72 additions & 50 deletions modules/generator/registry/native_histogram.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package registry

import (
"fmt"
"math"
"sync"
"time"
Expand Down Expand Up @@ -39,11 +40,19 @@ type nativeHistogram struct {
// generate. A diff in the configured value on the processors will cause a
// reload of the process, and a new instance of the histogram to be created.
histogramOverride HistogramMode

externalLabels map[string]string

// classic
nameCount string
nameSum string
nameBucket string
}

type nativeHistogramSeries struct {
// labels should not be modified after creation
labels LabelPair
lb *labels.Builder
labels labels.Labels
promHistogram prometheus.Histogram
lastUpdated int64
histogram *dto.Histogram
Expand All @@ -53,6 +62,11 @@ type nativeHistogramSeries struct {
// This avoids Prometheus throwing away the first value in the series,
// due to the transition from null -> x.
firstSeries *atomic.Bool

// classic
countLabels labels.Labels
sumLabels labels.Labels
// bucketLabels []labels.Labels
}

func (hs *nativeHistogramSeries) isNew() bool {
Expand All @@ -68,7 +82,7 @@ var (
_ metric = (*nativeHistogram)(nil)
)

func newNativeHistogram(name string, buckets []float64, onAddSeries func(uint32) bool, onRemoveSeries func(count uint32), traceIDLabelName string, histogramOverride HistogramMode) *nativeHistogram {
func newNativeHistogram(name string, buckets []float64, onAddSeries func(uint32) bool, onRemoveSeries func(count uint32), traceIDLabelName string, histogramOverride HistogramMode, externalLabels map[string]string) *nativeHistogram {
if onAddSeries == nil {
onAddSeries = func(uint32) bool {
return true
Expand All @@ -90,6 +104,12 @@ func newNativeHistogram(name string, buckets []float64, onAddSeries func(uint32)
traceIDLabelName: traceIDLabelName,
buckets: buckets,
histogramOverride: histogramOverride,
externalLabels: externalLabels,

// classic
nameCount: fmt.Sprintf("%s_count", name),
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
nameSum: fmt.Sprintf("%s_sum", name),
nameBucket: fmt.Sprintf("%s_bucket", name),
}
}

Expand All @@ -109,19 +129,15 @@ func (h *nativeHistogram) ObserveWithExemplar(labelValueCombo *LabelValueCombo,
return
}

newSeries := h.newSeries(labelValueCombo, value, traceID, multiplier)
h.series[hash] = newSeries
h.series[hash] = h.newSeries(labelValueCombo, value, traceID, multiplier)
}

func (h *nativeHistogram) newSeries(labelValueCombo *LabelValueCombo, value float64, traceID string, multiplier float64) *nativeHistogramSeries {
newSeries := &nativeHistogramSeries{
// TODO move these labels in HistogramOpts.ConstLabels?
labels: labelValueCombo.getLabelPair(),
promHistogram: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: h.name(),
Help: "Native histogram for metric " + h.name(),
Buckets: h.buckets,
// TODO check if these values are sensible and break them out
Name: h.name(),
Help: "Native histogram for metric " + h.name(),
Buckets: h.buckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 15 * time.Minute,
Expand All @@ -132,6 +148,31 @@ func (h *nativeHistogram) newSeries(labelValueCombo *LabelValueCombo, value floa

h.updateSeries(newSeries, value, traceID, multiplier)

lbls := labelValueCombo.getLabelPair()
lb := labels.NewBuilder(make(labels.Labels, 1+len(lbls.names)))

// set series labels
for i, name := range lbls.names {
lb.Set(name, lbls.values[i])
}
// set external labels
for name, value := range h.externalLabels {
lb.Set(name, value)
}

lb.Set(labels.MetricName, h.metricName)

newSeries.labels = lb.Labels()
newSeries.lb = lb
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved

// _count
lb.Set(labels.MetricName, h.nameCount)
newSeries.countLabels = lb.Labels()

// _sum
lb.Set(labels.MetricName, h.nameSum)
newSeries.sumLabels = lb.Labels()

return newSeries
}

Expand All @@ -149,28 +190,13 @@ func (h *nativeHistogram) name() string {
return h.metricName
}

func (h *nativeHistogram) collectMetrics(appender storage.Appender, timeMs int64, externalLabels map[string]string) (activeSeries int, err error) {
func (h *nativeHistogram) collectMetrics(appender storage.Appender, timeMs int64, _ map[string]string) (activeSeries int, err error) {
h.seriesMtx.Lock()
defer h.seriesMtx.Unlock()

lbls := make(labels.Labels, 1+len(externalLabels))
lb := labels.NewBuilder(lbls)
activeSeries = 0

lb.Set(labels.MetricName, h.metricName)

// set external labels
for name, value := range externalLabels {
lb.Set(name, value)
}

for _, s := range h.series {

// Set series-specific labels
for i, name := range s.labels.names {
lb.Set(name, s.labels.values[i])
}

// Extract histogram
encodedMetric := &dto.Metric{}

Expand All @@ -180,14 +206,15 @@ func (h *nativeHistogram) collectMetrics(appender storage.Appender, timeMs int64
return activeSeries, err
}

// NOTE: Store the encoded histogram here so we can keep track of the exemplars
// that have been sent. The value is updated here, but the pointers remain
// the same, and so Reset() call below can be used to clear the exemplars.
// NOTE: Store the encoded histogram here so we can keep track of the
// exemplars that have been sent. The value is updated here, but the
// pointers remain the same, and so Reset() call below can be used to clear
// the exemplars.
s.histogram = encodedMetric.GetHistogram()

// If we are in "both" or "classic" mode, also emit classic histograms.
if hasClassicHistograms(h.histogramOverride) {
classicSeries, classicErr := h.classicHistograms(appender, lb, timeMs, s)
classicSeries, classicErr := h.classicHistograms(appender, timeMs, s)
if classicErr != nil {
return activeSeries, classicErr
}
Expand All @@ -196,7 +223,7 @@ func (h *nativeHistogram) collectMetrics(appender storage.Appender, timeMs int64

// If we are in "both" or "native" mode, also emit native histograms.
if hasNativeHistograms(h.histogramOverride) {
nativeErr := h.nativeHistograms(appender, lb, timeMs, s)
nativeErr := h.nativeHistograms(appender, s.labels, timeMs, s)
if nativeErr != nil {
return activeSeries, nativeErr
}
Expand All @@ -215,7 +242,7 @@ func (h *nativeHistogram) collectMetrics(appender storage.Appender, timeMs int64
Ts: convertTimestampToMs(encodedExemplar.GetTimestamp()),
HasTs: true,
}
_, err = appender.AppendExemplar(0, lb.Labels(), e)
_, err = appender.AppendExemplar(0, s.labels, e)
if err != nil {
return activeSeries, err
}
Expand Down Expand Up @@ -243,7 +270,7 @@ func (h *nativeHistogram) activeSeriesPerHistogramSerie() uint32 {
return 1
}

func (h *nativeHistogram) nativeHistograms(appender storage.Appender, lb *labels.Builder, timeMs int64, s *nativeHistogramSeries) (err error) {
func (h *nativeHistogram) nativeHistograms(appender storage.Appender, lbls labels.Labels, timeMs int64, s *nativeHistogramSeries) (err error) {
// Decode to Prometheus representation
hist := promhistogram.Histogram{
Schema: s.histogram.GetSchema(),
Expand Down Expand Up @@ -273,66 +300,61 @@ func (h *nativeHistogram) nativeHistograms(appender storage.Appender, lb *labels
}
hist.NegativeBuckets = s.histogram.NegativeDelta

lb.Set(labels.MetricName, h.metricName)
_, err = appender.AppendHistogram(0, lb.Labels(), timeMs, &hist, nil)
_, err = appender.AppendHistogram(0, lbls, timeMs, &hist, nil)
if err != nil {
return err
}

return
}

func (h *nativeHistogram) classicHistograms(appender storage.Appender, lb *labels.Builder, timeMs int64, s *nativeHistogramSeries) (activeSeries int, err error) {
func (h *nativeHistogram) classicHistograms(appender storage.Appender, timeMs int64, s *nativeHistogramSeries) (activeSeries int, err error) {
if s.isNew() {
lb.Set(labels.MetricName, h.metricName+"_count")
endOfLastMinuteMs := getEndOfLastMinuteMs(timeMs)
_, err = appender.Append(0, lb.Labels(), endOfLastMinuteMs, 0)
_, err = appender.Append(0, s.countLabels, endOfLastMinuteMs, 0)
if err != nil {
return activeSeries, err
}
s.registerSeenSeries()
}

// sum
lb.Set(labels.MetricName, h.metricName+"_sum")
_, err = appender.Append(0, lb.Labels(), timeMs, s.histogram.GetSampleSum())
_, err = appender.Append(0, s.sumLabels, timeMs, s.histogram.GetSampleSum())
if err != nil {
return activeSeries, err
}
activeSeries++

// count
lb.Set(labels.MetricName, h.metricName+"_count")
_, err = appender.Append(0, lb.Labels(), timeMs, getIfGreaterThenZeroOr(s.histogram.GetSampleCountFloat(), s.histogram.GetSampleCount()))
_, err = appender.Append(0, s.countLabels, timeMs, getIfGreaterThenZeroOr(s.histogram.GetSampleCountFloat(), s.histogram.GetSampleCount()))
if err != nil {
return activeSeries, err
}
activeSeries++

// bucket
lb.Set(labels.MetricName, h.metricName+"_bucket")
s.lb.Set(labels.MetricName, h.metricName+"_bucket")

// the Prometheus histogram will sometimes add the +Inf bucket, it depends on whether there is an exemplar
// for that bucket or not. To avoid adding it twice, keep track of it with this boolean.
infBucketWasAdded := false

for _, bucket := range s.histogram.Bucket {
// add "le" label
lb.Set(labels.BucketLabel, formatFloat(bucket.GetUpperBound()))
s.lb.Set(labels.BucketLabel, formatFloat(bucket.GetUpperBound()))

if bucket.GetUpperBound() == math.Inf(1) {
infBucketWasAdded = true
}

ref, appendErr := appender.Append(0, lb.Labels(), timeMs, getIfGreaterThenZeroOr(bucket.GetCumulativeCountFloat(), bucket.GetCumulativeCount()))
ref, appendErr := appender.Append(0, s.lb.Labels(), timeMs, getIfGreaterThenZeroOr(bucket.GetCumulativeCountFloat(), bucket.GetCumulativeCount()))
if appendErr != nil {
return activeSeries, appendErr
}
activeSeries++

if bucket.Exemplar != nil && len(bucket.Exemplar.Label) > 0 {
// TODO are we appending the same exemplar twice?
_, err = appender.AppendExemplar(ref, lb.Labels(), exemplar.Exemplar{
_, err = appender.AppendExemplar(ref, s.lb.Labels(), exemplar.Exemplar{
Labels: convertLabelPairToLabels(bucket.Exemplar.GetLabel()),
Value: bucket.Exemplar.GetValue(),
Ts: timeMs,
Expand All @@ -346,17 +368,17 @@ func (h *nativeHistogram) classicHistograms(appender storage.Appender, lb *label

if !infBucketWasAdded {
// Add +Inf bucket
lb.Set(labels.BucketLabel, "+Inf")
s.lb.Set(labels.BucketLabel, "+Inf")

_, err = appender.Append(0, lb.Labels(), timeMs, getIfGreaterThenZeroOr(s.histogram.GetSampleCountFloat(), s.histogram.GetSampleCount()))
_, err = appender.Append(0, s.lb.Labels(), timeMs, getIfGreaterThenZeroOr(s.histogram.GetSampleCountFloat(), s.histogram.GetSampleCount()))
if err != nil {
return activeSeries, err
}
activeSeries++
}

// drop "le" label again
lb.Del(labels.BucketLabel)
s.lb.Del(labels.BucketLabel)

return
}
Expand Down
4 changes: 2 additions & 2 deletions modules/generator/registry/native_histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func Test_ObserveWithExemplar_duplicate(t *testing.T) {
return true
}

h := newNativeHistogram("my_histogram", []float64{0.1, 0.2}, onAdd, nil, "trace_id", HistogramModeBoth)
h := newNativeHistogram("my_histogram", []float64{0.1, 0.2}, onAdd, nil, "trace_id", HistogramModeBoth, nil)

lv := newLabelValueCombo([]string{"label"}, []string{"value-1"})

Expand Down Expand Up @@ -463,7 +463,7 @@ func Test_Histograms(t *testing.T) {
}

onAdd := func(uint32) bool { return true }
h := newNativeHistogram("test_histogram", tc.buckets, onAdd, nil, "trace_id", HistogramModeBoth)
h := newNativeHistogram("test_histogram", tc.buckets, onAdd, nil, "trace_id", HistogramModeBoth, nil)
testHistogram(t, h, tc.collections)
})
})
Expand Down
2 changes: 1 addition & 1 deletion modules/generator/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (r *ManagedRegistry) NewHistogram(name string, buckets []float64, histogram
// are disabled, eventually the new implementation can handle all cases

if hasNativeHistograms(histogramOverride) {
h = newNativeHistogram(name, buckets, r.onAddMetricSeries, r.onRemoveMetricSeries, traceIDLabelName, histogramOverride)
h = newNativeHistogram(name, buckets, r.onAddMetricSeries, r.onRemoveMetricSeries, traceIDLabelName, histogramOverride, r.externalLabels)
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
} else {
h = newHistogram(name, buckets, r.onAddMetricSeries, r.onRemoveMetricSeries, traceIDLabelName, r.externalLabels)
}
Expand Down