Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix prometheus histogram rate overflows #17753

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix "ID" event generator of Google Cloud module {issue}17160[17160] {pull}17608[17608]
- Add privileged option for Auditbeat in Openshift {pull}17637[17637]
- Fix storage metricset to allow config without region/zone. {issue}17623[17623] {pull}17624[17624]
- Add a switch to the driver definition on SQL module to use pretty names {pull}17378[17378]
- Add a switch to the driver definition on SQL module to use pretty names. {pull}17378[17378]
- Fix overflow on Prometheus rates when new buckets are added on the go. {pull}17753[17753]

*Packetbeat*

Expand Down
32 changes: 18 additions & 14 deletions x-pack/metricbeat/module/prometheus/collector/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ type CounterCache interface {
Stop()

// RateUint64 returns, for a given counter name, the difference between the given value
// and the value that was given in a previous call. It will return 0 on the first call
RateUint64(counterName string, value uint64) uint64
// and the value that was given in a previous call, and true if a previous value existed.
// It will return 0 and false on the first call.
RateUint64(counterName string, value uint64) (uint64, bool)

// RateFloat64 returns, for a given counter name, the difference between the given value
// and the value that was given in a previous call. It will return 0.0 on the first call
RateFloat64(counterName string, value float64) float64
// and the value that was given in a previous call, and true if a previous value existed.
// It will return 0 and false on the first call.
RateFloat64(counterName string, value float64) (float64, bool)
}

type counterCache struct {
Expand All @@ -47,35 +49,37 @@ func NewCounterCache(timeout time.Duration) CounterCache {
}

// RateUint64 returns, for a given counter name, the difference between the given value
// and the value that was given in a previous call. It will return 0 on the first call
func (c *counterCache) RateUint64(counterName string, value uint64) uint64 {
// and the value that was given in a previous call, and true if a previous value existed.
// It will return 0 and false on the first call.
func (c *counterCache) RateUint64(counterName string, value uint64) (uint64, bool) {
prev := c.ints.PutWithTimeout(counterName, value, c.timeout)
if prev != nil {
if prev.(uint64) > value {
// counter reset
return 0
return 0, true
}
return value - prev.(uint64)
return value - prev.(uint64), true
}

// first put for this value, return rate of 0
return 0
return 0, false
}

// RateFloat64 returns, for a given counter name, the difference between the given value
// and the value that was given in a previous call. It will return 0.0 on the first call
func (c *counterCache) RateFloat64(counterName string, value float64) float64 {
// and the value that was given in a previous call, and true if a previous value existed.
// It will return 0 and false on the first call.
func (c *counterCache) RateFloat64(counterName string, value float64) (float64, bool) {
prev := c.floats.PutWithTimeout(counterName, value, c.timeout)
if prev != nil {
if prev.(float64) > value {
// counter reset
return 0
return 0, true
}
return value - prev.(float64)
return value - prev.(float64), true
}

// first put for this value, return rate of 0
return 0
return 0, false
}

// Start the cache cleanup worker. It must be called once before start using
Expand Down
4 changes: 2 additions & 2 deletions x-pack/metricbeat/module/prometheus/collector/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ func Test_CounterCache(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
for i, val := range tt.valuesUint64 {
want := tt.expectedUin64[i]
if got := tt.counterCache.RateUint64(tt.counterName, val); got != want {
if got, _ := tt.counterCache.RateUint64(tt.counterName, val); got != want {
t.Errorf("counterCache.RateUint64() = %v, want %v", got, want)
}
}
for i, val := range tt.valuesFloat64 {
want := tt.expectedFloat64[i]
if got := tt.counterCache.RateFloat64(tt.counterName, val); got != want {
if got, _ := tt.counterCache.RateFloat64(tt.counterName, val); got != want {
t.Errorf("counterCache.RateFloat64() = %v, want %v", got, want)
}
}
Expand Down
4 changes: 2 additions & 2 deletions x-pack/metricbeat/module/prometheus/collector/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (g *typedGenerator) rateCounterUint64(name string, labels common.MapStr, va
}

if g.rateCounters {
d["rate"] = g.counterCache.RateUint64(name+labels.String(), value)
d["rate"], _ = g.counterCache.RateUint64(name+labels.String(), value)
}

return d
Expand All @@ -184,7 +184,7 @@ func (g *typedGenerator) rateCounterFloat64(name string, labels common.MapStr, v
}

if g.rateCounters {
d["rate"] = g.counterCache.RateFloat64(name+labels.String(), value)
d["rate"], _ = g.counterCache.RateFloat64(name+labels.String(), value)
}

return d
Expand Down
25 changes: 20 additions & 5 deletions x-pack/metricbeat/module/prometheus/collector/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func promHistogramToES(cc CounterCache, name string, labels common.MapStr, histo

// calculate centroids and rated counts
var lastUpper, prevUpper float64
var sumCount uint64
var sumCount, prevCount uint64
for _, bucket := range histogram.GetBucket() {
// Ignore non-numbers
if bucket.GetCumulativeCount() == uint64(math.NaN()) || bucket.GetCumulativeCount() == uint64(math.Inf(0)) {
Expand All @@ -50,10 +50,25 @@ func promHistogramToES(cc CounterCache, name string, labels common.MapStr, histo
lastUpper = bucket.GetUpperBound()
}

// take count for this period (rate) + deaccumulate
count := cc.RateUint64(name+labels.String()+fmt.Sprintf("%f", bucket.GetUpperBound()), bucket.GetCumulativeCount()) - sumCount
counts = append(counts, count)
sumCount += count
// Take count for this period (rate)
countRate, found := cc.RateUint64(name+labels.String()+fmt.Sprintf("%f", bucket.GetUpperBound()), bucket.GetCumulativeCount())

switch {
case !found:
// This is a new bucket, consider it zero for now, but still increase the
// sum so that following buckets that are not new don't deviate.
counts = append(counts, 0)
sumCount += bucket.GetCumulativeCount() - prevCount
case countRate < sumCount:
// This should never happen; it means something is wrong in the
// Prometheus response. Handle it to avoid overflowing when deaccumulating.
counts = append(counts, 0)
default:
// Store the deaccumulated count.
counts = append(counts, countRate-sumCount)
sumCount = countRate
}
prevCount = bucket.GetCumulativeCount()
}

res := common.MapStr{
Expand Down
Loading