8
8
// lagLogger maintains a set of most recent lag measurements
// and is able to provide the lowest value seen.
//
// It carries no lock of its own: the LagMonitor methods in this file take
// the monitor's mutex before calling into it.
type lagLogger struct {
	// pos is incremented on every accepted Store() call.
	// NOTE(review): presumably used to pick the slot to overwrite once
	// measurements is full — that part of Store() is not visible here.
	pos int
	// measurements holds the retained lag values; Store() appends to it
	// until its capacity is reached.
	measurements []int
}
@@ -30,8 +29,6 @@ func (l *lagLogger) Store(lag int) {
30
29
if lag < 0 {
31
30
return
32
31
}
33
- l .Lock ()
34
- defer l .Unlock ()
35
32
l .pos ++
36
33
if len (l .measurements ) < cap (l .measurements ) {
37
34
l .measurements = append (l .measurements , lag )
@@ -48,8 +45,6 @@ func (l *lagLogger) Store(lag int) {
48
45
// note: values may be slightly out of date if negative values were reported
49
46
// (see Store())
50
47
func (l * lagLogger ) Min () int {
51
- l .Lock ()
52
- defer l .Unlock ()
53
48
if len (l .measurements ) == 0 {
54
49
return - 1
55
50
}
@@ -63,7 +58,6 @@ func (l *lagLogger) Min() int {
63
58
}
64
59
65
60
type rateLogger struct {
66
- sync.Mutex
67
61
lastOffset int64
68
62
lastTs time.Time
69
63
rate int64
@@ -76,8 +70,6 @@ func newRateLogger() *rateLogger {
76
70
// Store saves the current offset and updates the rate if it is confident
77
71
// offset must be concrete values, not logical values like -2 (oldest) or -1 (newest)
78
72
func (o * rateLogger ) Store (offset int64 , ts time.Time ) {
79
- o .Lock ()
80
- defer o .Unlock ()
81
73
if o .lastTs .IsZero () {
82
74
// first measurement
83
75
o .lastOffset = offset
@@ -122,8 +114,6 @@ func (o *rateLogger) Store(offset int64, ts time.Time) {
122
114
// * exceptionally, it's an old measurement (if you keep adjusting the system clock)
123
115
// after startup, reported rate may be 0 if we haven't been up long enough to determine it yet.
124
116
func (o * rateLogger ) Rate () int64 {
125
- o .Lock ()
126
- defer o .Unlock ()
127
117
return o .rate
128
118
}
129
119
@@ -133,8 +123,22 @@ func (o *rateLogger) Rate() int64 {
133
123
// * ingest rate
134
124
// We then combine this data into a score, see the Metric() method.
135
125
type LagMonitor struct {
136
- lag map [int32 ]* lagLogger
137
- rate map [int32 ]* rateLogger
126
+ sync.Mutex
127
+ lag map [int32 ]* lagLogger
128
+ rate map [int32 ]* rateLogger
129
+ explanation Explanation
130
+ }
131
+
132
// Explanation records how a LagMonitor priority score was derived.
type Explanation struct {
	// Status holds the per-partition inputs and resulting priority.
	Status map[int32]Status
	// Priority is the overall score: the highest per-partition priority.
	Priority int
	// Updated is when the explanation was computed.
	Updated time.Time
}

// Status is the state of a single partition as seen by LagMonitor.Metric.
type Status struct {
	// Lag is the lowest recently measured lag, or -1 if unknown.
	Lag int
	// Rate is the measured ingest rate, or 0 if it is not known yet.
	Rate int
	// Priority is the score derived from Lag and Rate for this partition.
	Priority int
}
139
143
140
144
func NewLagMonitor (size int , partitions []int32 ) * LagMonitor {
@@ -167,33 +171,61 @@ func NewLagMonitor(size int, partitions []int32) *LagMonitor {
167
171
// - trouble querying the partition for latest offset
168
172
// - consumePartition() has called StoreOffset() but the code hasn't advanced yet to StoreLag()
169
173
func (l * LagMonitor ) Metric () int {
174
+ l .Lock ()
175
+ defer l .Unlock ()
176
+ l .explanation = Explanation {
177
+ Status : make (map [int32 ]Status ),
178
+ Updated : time .Now (),
179
+ }
170
180
max := 0
171
181
for p , lag := range l .lag {
172
- rate := l .rate [p ]
173
- l := lag .Min () // accurate lag, -1 if unknown
174
- r := rate .Rate () // accurate rate, or 0 if we're not sure.
175
- if r == 0 {
176
- r = 1
182
+ status := Status {
183
+ Lag : lag .Min (), // accurate lag, -1 if unknown
184
+ Rate : int (l .rate [p ].Rate ()), // accurate rate, or 0 if we're not sure
177
185
}
178
- var val int
179
- if l == - 1 {
186
+ if status .Lag == - 1 {
180
187
// if we have no lag measurements yet,
181
188
// just assign a priority of 10k for this partition
182
- val = 10000
189
+ status . Priority = 10000
183
190
} else {
184
- val = l / int (r )
191
+ // if we're not sure of rate, we don't want divide by zero
192
+ // instead assume rate is super low
193
+ if status .Rate == 0 {
194
+ status .Priority = status .Lag
195
+ } else {
196
+ status .Priority = status .Lag / status .Rate
197
+ }
185
198
}
186
- if val > max {
187
- max = val
199
+ if status . Priority > max {
200
+ max = status . Priority
188
201
}
202
+ l .explanation .Status [p ] = status
189
203
}
204
+ l .explanation .Updated = time .Now ()
205
+ l .explanation .Priority = max
190
206
return max
191
207
}
192
208
209
+ func (l * LagMonitor ) Explain () interface {} {
210
+ l .Lock ()
211
+ defer l .Unlock ()
212
+ return struct {
213
+ Explanation Explanation
214
+ Now time.Time
215
+ }{
216
+ Explanation : l .explanation ,
217
+ Now : time .Now (),
218
+ }
219
+ }
220
+
193
221
// StoreLag records a lag measurement for the given partition.
//
// The partition must be one the monitor tracks; for an unknown partition
// the map lookup yields a nil logger.
func (l *LagMonitor) StoreLag(partition int32, val int) {
	l.Lock()
	// Deferred so the mutex is released even if Store panics.
	defer l.Unlock()
	l.lag[partition].Store(val)
}
196
226
197
227
// StoreOffset records the offset observed for the given partition at time ts,
// feeding that partition's rate tracking.
//
// The partition must be one the monitor tracks; for an unknown partition
// the map lookup yields a nil logger.
func (l *LagMonitor) StoreOffset(partition int32, offset int64, ts time.Time) {
	l.Lock()
	// Deferred so the mutex is released even if Store panics.
	defer l.Unlock()
	l.rate[partition].Store(offset, ts)
}
0 commit comments