This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Modify lag rate calculations #1022

Closed · wants to merge 6 commits
3 changes: 1 addition & 2 deletions input/kafkamdm/kafkamdm.go
@@ -347,7 +347,6 @@ func (k *KafkaMdm) trackStats(topic string, partition int32) {
 			return
 		case ts := <-ticker.C:
 			currentOffset := int64(kafkaStats.Offset.Peek())
-			k.lagMonitor.StoreOffset(partition, currentOffset, ts)
 			newest, err := k.tryGetOffset(topic, partition, sarama.OffsetNewest, 1, 0)
 			if err != nil {
 				log.Errorf("kafkamdm: %s", err.Error())
@@ -356,7 +355,7 @@ func (k *KafkaMdm) trackStats(topic string, partition int32) {
 			kafkaStats.LogSize.Set(int(newest))
 			lag := int(newest - currentOffset)
 			kafkaStats.Lag.Set(lag)
-			k.lagMonitor.StoreLag(partition, lag)
+			k.lagMonitor.StoreOffsets(partition, currentOffset, newest, ts)
 		}
 	}
 }
146 changes: 63 additions & 83 deletions input/kafkamdm/lag_monitor.go
@@ -1,42 +1,57 @@
 package kafkamdm
 
 import (
+	"math"
 	"sync"
 	"time"
 )
 
+type offsetMeasurement struct {
+	readOffset, highWaterMark int64
+	ts                        time.Time
+}
+
 // lagLogger maintains a set of most recent lag measurements
 // and is able to provide the lowest value seen.
 type lagLogger struct {
 	pos          int
-	measurements []int
+	measurements []offsetMeasurement
 }
 
 func newLagLogger(size int) *lagLogger {
 	return &lagLogger{
 		pos:          0,
-		measurements: make([]int, 0, size),
+		measurements: make([]offsetMeasurement, 0, size),
 	}
 }
 
-// Store saves the current value, potentially overwriting an old value
+// Store saves the current offsets and timestamp, potentially overwriting an old value
 // if needed.
-// Note: negative values are ignored. We rely on previous data - if any - in such case.
+// Note: negative values of highWaterMark - readOffset are ignored. We rely on previous data - if any - in such case.
 // negative values can happen upon a rollover of the offset counter
-func (l *lagLogger) Store(lag int) {
+func (l *lagLogger) Store(readOffset, highWaterMark int64, ts time.Time) {
+	lag := highWaterMark - readOffset
 	if lag < 0 {
 		return
 	}
-	l.pos++
+
+	measurement := offsetMeasurement{
+		readOffset:    readOffset,
+		highWaterMark: highWaterMark,
+		ts:            ts,
+	}
+
 	if len(l.measurements) < cap(l.measurements) {
-		l.measurements = append(l.measurements, lag)
+		l.measurements = append(l.measurements, measurement)
 		l.pos = len(l.measurements) - 1
 		return
 	}
+
+	l.pos++
+
 	if l.pos >= cap(l.measurements) {
 		l.pos = 0
 	}
-	l.measurements[l.pos] = lag
+	l.measurements[l.pos] = measurement
}
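
To make the ring-buffer behavior of the new Store concrete, here is a minimal sketch, written as a hypothetical Go example test in the same package (it assumes this diff is applied and imports "fmt" and "time"):

func ExampleLagLogger() {
	l := newLagLogger(3)
	base := time.Now()
	for i, lag := range []int64{40, 30, 20, 10} {
		readOffset := int64(100 * i)
		// Store takes (readOffset, highWaterMark, ts); lag = highWaterMark - readOffset
		l.Store(readOffset, readOffset+lag, base.Add(time.Duration(i)*time.Second))
	}
	// capacity is 3, so the 4th Store overwrote the oldest slot (lag 40),
	// leaving lags 10, 30 and 20 in the buffer
	fmt.Println(l.Min())
	// Output: 10
}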

@@ -46,73 +61,47 @@ func (l *lagLogger) Min() int {
 // Min returns the lowest lag seen (0 or positive) or -1 if no lags reported yet
 func (l *lagLogger) Min() int {
 	if len(l.measurements) == 0 {
 		return -1
 	}
-	min := -1
-	for _, m := range l.measurements {
-		if min < 0 || m < min {
-			min = m
+	min := int(l.measurements[0].highWaterMark - l.measurements[0].readOffset)
+	for _, m := range l.measurements[1:] {
+		lag := int(m.highWaterMark - m.readOffset)
+		if lag < min {
+			min = lag
 		}
 	}
 	return min
 }

-type rateLogger struct {
-	lastOffset int64
-	lastTs     time.Time
-	rate       int64
-}
-
-func newRateLogger() *rateLogger {
-	return &rateLogger{}
-}
-
-// Store saves the current offset and updates the rate if it is confident
-// offset must be concrete values, not logical values like -2 (oldest) or -1 (newest)
-func (o *rateLogger) Store(offset int64, ts time.Time) {
-	if o.lastTs.IsZero() {
-		// first measurement
-		o.lastOffset = offset
-		o.lastTs = ts
-		return
-	}
-	duration := ts.Sub(o.lastTs)
-	if duration < time.Second && duration > 0 {
-		// too small difference. either due to clock adjustment or this method
-		// is called very frequently, e.g. due to a subsecond offset-commit-interval.
-		// We need to let more time pass to make an accurate calculation.
-		return
-	}
-	if duration <= 0 {
-		// current ts is <= last ts. This would only happen if clock went back in time
-		// in which case we can't reliably work out how
-		// long it has really been since we last took a measurement.
-		// but set a new baseline for next time
-		o.lastTs = ts
-		o.lastOffset = offset
-		return
-	}
-	metrics := offset - o.lastOffset
-	o.lastTs = ts
-	o.lastOffset = offset
-	if metrics < 0 {
-		// this is possible if our offset counter rolls over or is reset.
-		// If it was a rollover we could compute the rate, but it is safer
-		// to just keep using the last computed rate, and wait for the next
-		// measurement to compute a new rate based on the new baseline
-		return
-	}
-	// note the multiplication overflows if you have 9 billion metrics
-	// sice max int64 is 9 223 372 036 854 775 807 (not an issue in practice)
-	o.rate = (1e9 * metrics / int64(duration)) // metrics/ns -> metrics/s
-	return
-}
-
-// Rate returns the last reliable rate calculation
-// * generally, it's the last reported measurement
-// * occasionally, it's one report interval in the past (due to rollover)
-// * exceptionally, it's an old measurement (if you keep adjusting the system clock)
-// after startup, reported rate may be 0 if we haven't been up long enough to determine it yet.
-func (o *rateLogger) Rate() int64 {
-	return o.rate
+// Rate returns an average rate calculation over the last N measurements (configured at construction)
+// after startup, reported rate may be 0 if we haven't been up long enough to determine it yet.
+func (l *lagLogger) Rate() int64 {
+	if len(l.measurements) < 2 {
+		return 0
+	}
+
+	latestLag := l.measurements[l.pos]
+	var earliestLag offsetMeasurement
+
+	if len(l.measurements) == cap(l.measurements) {
+		earliestLag = l.measurements[(l.pos+1)%len(l.measurements)]
+	} else {
+		earliestLag = l.measurements[0]
+	}
+
+	high, low := latestLag.highWaterMark, earliestLag.highWaterMark
+
+	// If there are no longer any incoming messages, use our read offsets to compute rate
+	if high == low {
+		high, low = latestLag.readOffset, earliestLag.readOffset

Contributor: What's the reason why we wouldn't just always use the .readOffset, instead of only using it if the .highWaterMarks are equal? If I understand this correctly, it will now calculate the rate based on how fast Kafka receives new messages in this partition, unless it receives no new messages, in which case it will calculate the rate based on how fast MT consumes them, right? It seems a little surprising to me that the caller of .Rate() doesn't know which of those two rates it will actually get...

Collaborator (Author): The goal is to estimate the interval between now and the time this message was put into kafka. Using the rate of growth of the partition offsets in kafka is better for this than our read offset, since various things can impact our ingest rate (GC, Prune locks, etc). The old implementation relied on the read offset and was susceptible to over-correction due to short pauses in ingest.

As for the failover... it could be better, and I'm open to suggestions. I think that @Dieterbe was looking into a more complete refactor that split the computations from the usage.

I'm most interested in coming up with the most effective heuristic for estimating eligibility in cluster processing. If that means that a well-known fallback behavior is used, then so be it.

Contributor: Ok, makes sense 👍

+	}
+	if high < low {

Contributor: could be else if.

Collaborator (Author): It cannot, due to variable change. high/low are modified in the first branch.

+		// Offset must have wrapped around...seems unlikely. Estimate growth using MaxInt64
+		high = math.MaxInt64
+	}
+
+	totalGrowth := float64(high - low)
+	totalDurationSec := float64(latestLag.ts.UnixNano()-earliestLag.ts.UnixNano()) / float64(time.Second.Nanoseconds())
+
+	return int64(totalGrowth / totalDurationSec)
 }
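
To spell out the heuristic discussed in the thread above, here is a standalone sketch (a hypothetical helper, not part of the PR; it assumes the offsetMeasurement type from this diff) reducing Rate to its two-measurement core:

// rateBetween mirrors the core of lagLogger.Rate for a pair of measurements,
// omitting the ring-buffer lookup and the MaxInt64 wrap-around guard.
func rateBetween(earliest, latest offsetMeasurement) int64 {
	// prefer the growth of the partition's high water mark...
	high, low := latest.highWaterMark, earliest.highWaterMark
	if high == low {
		// ...but on an idle partition, fall back to our own consumption speed
		high, low = latest.readOffset, earliest.readOffset
	}
	seconds := latest.ts.Sub(earliest.ts).Seconds()
	return int64(float64(high-low) / seconds)
}

For example, over a 10s window with the high water mark growing from 1000 to 11000, the rate is 1000 msg/s no matter how fast we consumed; with the high water mark stuck at 11000 and our read offset moving from 10500 to 11000, the fallback yields 50 msg/s.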

@@ -122,8 +111,7 @@ func (o *rateLogger) Rate() int64 {
 // LagMonitor determines how upToDate this node is.
 // We then combine this data into a score, see the Metric() method.
 type LagMonitor struct {
 	sync.Mutex
-	lag         map[int32]*lagLogger
-	rate        map[int32]*rateLogger
+	monitors    map[int32]*lagLogger
 	explanation Explanation
 }

@@ -141,22 +129,20 @@ type Status struct {
 
 func NewLagMonitor(size int, partitions []int32) *LagMonitor {
 	m := &LagMonitor{
-		lag:      make(map[int32]*lagLogger),
-		rate:     make(map[int32]*rateLogger),
+		monitors: make(map[int32]*lagLogger),
 	}
 	for _, p := range partitions {
-		m.lag[p] = newLagLogger(size)
-		m.rate[p] = newRateLogger()
+		m.monitors[p] = newLagLogger(size)
 	}
 	return m
 }
 
 // Metric computes the overall score of up-to-date-ness of this node,
 // as an estimated number of seconds behind kafka.
 // We first compute the score for each partition like so:
-// (minimum lag seen in last N measurements) / ingest rate.
+// (minimum lag seen in last N measurements) / input rate.
 // example:
-// lag (in messages/metrics) ingest rate ---> score (seconds behind)
+// lag (in messages/metrics) input rate  ---> score (seconds behind)
 // 10k                       1k/second       10
 // 200                       1k/second       0 (less than 1s behind)
 // 0                         *               0 (perfectly in sync)
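
A worked sketch of that scoring formula (a hypothetical simplification; the real Metric() below also fills in a Status per partition and the explanation):

// partitionScore estimates how many seconds behind one partition is:
// minimum lag over the window divided by the measured rate.
func partitionScore(minLag, rate int) int {
	if rate <= 0 {
		// rate unknown: fall back to the raw lag as the score
		return minLag
	}
	return minLag / rate
}

So partitionScore(10000, 1000) == 10 and partitionScore(200, 1000) == 0, matching the table; the node-level metric then takes the max across partitions, as the loop below shows.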
@@ -176,10 +162,10 @@ func (l *LagMonitor) Metric() int {
 		Updated: time.Now(),
 	}
 	max := 0
-	for p, lag := range l.lag {
+	for p, lag := range l.monitors {
 		status := Status{
-			Lag:  lag.Min(),             // accurate lag, -1 if unknown
-			Rate: int(l.rate[p].Rate()), // accurate rate, or 0 if we're not sure
+			Lag:  lag.Min(),       // accurate lag, -1 if unknown
+			Rate: int(lag.Rate()), // accurate rate, or 0 if we're not sure
 		}
 		if status.Lag == -1 {
 			// if we have no lag measurements yet,
@@ -216,14 +202,8 @@ func (l *LagMonitor) Explain() interface{} {
 	}
 }
 
-func (l *LagMonitor) StoreLag(partition int32, val int) {
-	l.Lock()
-	l.lag[partition].Store(val)
-	l.Unlock()
-}
-
-func (l *LagMonitor) StoreOffset(partition int32, offset int64, ts time.Time) {
+func (l *LagMonitor) StoreOffsets(partition int32, readOffset, highWaterMark int64, ts time.Time) {
 	l.Lock()
-	l.rate[partition].Store(offset, ts)
+	l.monitors[partition].Store(readOffset, highWaterMark, ts)
 	l.Unlock()
 }
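
End to end, the new API turns the input plugin's per-tick bookkeeping into a single call (cf. the kafkamdm.go hunk at the top). A rough usage sketch, with made-up offsets standing in for kafkaStats.Offset.Peek() and tryGetOffset (hypothetical driver, same package, "time" imported; the 5s tick is illustrative):

func trackPartitionSketch(lagMonitor *LagMonitor, partition int32) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	var readOffset, highWaterMark int64
	for ts := range ticker.C {
		readOffset += 4900    // what we consumed since the last tick
		highWaterMark += 5000 // what kafka received since the last tick
		lagMonitor.StoreOffsets(partition, readOffset, highWaterMark, ts)
	}
}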