
Commit 3a2634e

endpoint for analyzing priority calculation
We sometimes see instances come online and, after replaying their backlog, fail to become ready. Other times we simply want to analyze the priority calculation.
1 parent: 5baf51d

File tree

10 files changed (+133, -23 lines)

api/api.go (+9)

@@ -49,6 +49,7 @@ type Server struct {
     Cache cache.Cache
     shutdown chan struct{}
     Tracer opentracing.Tracer
+    prioritySetters []PrioritySetter
 }

 func (s *Server) BindMetricIndex(i idx.MetricIndex) {
@@ -73,6 +74,14 @@ func (s *Server) BindPromQueryEngine() {
     s.PromQueryEngine = promql.NewEngine(s, nil)
 }

+type PrioritySetter interface {
+    ExplainPriority() interface{}
+}
+
+func (s *Server) BindPrioritySetter(p PrioritySetter) {
+    s.prioritySetters = append(s.prioritySetters, p)
+}
+
 func NewServer() (*Server, error) {

     m := macaron.New()
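
For orientation, here is a minimal sketch of how the new hook is meant to be used: anything with an `ExplainPriority()` method satisfies `PrioritySetter` and can be registered on the server. The `fakeInput` type, its message, and the `wireUp` helper are hypothetical, purely for illustration.

```go
// Hypothetical plugin: it satisfies PrioritySetter because it has an
// ExplainPriority() method returning any JSON-serializable value.
type fakeInput struct{}

func (f *fakeInput) ExplainPriority() interface{} {
    return "fake-in: priority=0 (always in sync)"
}

// After binding, the plugin's explanation is included in GET /priority responses.
func wireUp(s *Server) {
    s.BindPrioritySetter(&fakeInput{})
}
```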

api/cluster.go (+8)

@@ -18,6 +18,14 @@ import (

 var NotFoundErr = errors.New("not found")

+func (s *Server) explainPriority(ctx *middleware.Context) {
+    var data []interface{}
+    for _, p := range s.prioritySetters {
+        data = append(data, p.ExplainPriority())
+    }
+    response.Write(ctx, response.NewJson(200, data, ""))
+}
+
 func (s *Server) getNodeStatus(ctx *middleware.Context) {
     response.Write(ctx, response.NewJson(200, cluster.Manager.ThisNode(), ""))
 }

api/routes.go (+1)

@@ -27,6 +27,7 @@ func (s *Server) RegisterRoutes() {
     r.Get("/", s.appStatus)
     r.Get("/node", s.getNodeStatus)
     r.Post("/node", bind(models.NodeStatus{}), s.setNodeStatus)
+    r.Get("/priority", s.explainPriority)
     r.Get("/debug/pprof/block", blockHandler)
     r.Get("/debug/pprof/mutex", mutexHandler)

docs/http-api.md (+40)

@@ -160,6 +160,46 @@ Sets the primary status to this node to true or false.
 curl --data primary=true "http://localhost:6060/node"
 ```

+## Analyze instance priority
+
+```
+GET /priority
+```
+
+Gets all enabled plugins to declare how they compute their
+priority scores.
+
+#### Example
+
+```bash
+curl -s http://localhost:6060/priority | jsonpp
+[
+    "carbon-in: priority=0 (always in sync)",
+    {
+        "Title": "kafka-mdm:",
+        "Explanation": {
+            "Explanation": {
+                "Status": {
+                    "0": {
+                        "Lag": 1,
+                        "Rate": 26494,
+                        "Priority": 0
+                    },
+                    "1": {
+                        "Lag": 2,
+                        "Rate": 24989,
+                        "Priority": 0
+                    }
+                },
+                "Priority": 0,
+                "Updated": "2018-06-06T11:56:24.399840121Z"
+            },
+            "Now": "2018-06-06T11:56:25.016645631Z"
+        }
+    }
+]
+```
+
 ## Misc

 ### Tspec

input/carbon/carbon.go (+4)

@@ -126,6 +126,10 @@ func (c *Carbon) MaintainPriority() {
     cluster.Manager.SetPriority(0)
 }

+func (c *Carbon) ExplainPriority() interface{} {
+    return "carbon-in: priority=0 (always in sync)"
+}
+
 func (c *Carbon) accept() {
     for {
         conn, err := c.listener.AcceptTCP()

input/kafkamdm/kafkamdm.go (+10)

@@ -399,3 +399,13 @@ func (k *KafkaMdm) MaintainPriority() {
         }
     }()
 }
+
+func (k *KafkaMdm) ExplainPriority() interface{} {
+    return struct {
+        Title       string
+        Explanation interface{}
+    }{
+        "kafka-mdm:",
+        k.lagMonitor.Explain(),
+    }
+}

input/kafkamdm/lag_monitor.go (+55, -23)

@@ -8,7 +8,6 @@ import (
 // lagLogger maintains a set of most recent lag measurements
 // and is able to provide the lowest value seen.
 type lagLogger struct {
-    sync.Mutex
     pos          int
     measurements []int
 }
@@ -30,8 +29,6 @@ func (l *lagLogger) Store(lag int) {
     if lag < 0 {
         return
     }
-    l.Lock()
-    defer l.Unlock()
     l.pos++
     if len(l.measurements) < cap(l.measurements) {
         l.measurements = append(l.measurements, lag)
@@ -48,8 +45,6 @@ func (l *lagLogger) Store(lag int) {
 // note: values may be slightly out of date if negative values were reported
 // (see Store())
 func (l *lagLogger) Min() int {
-    l.Lock()
-    defer l.Unlock()
     if len(l.measurements) == 0 {
         return -1
     }
@@ -63,7 +58,6 @@ func (l *lagLogger) Min() int {
 }

 type rateLogger struct {
-    sync.Mutex
     lastOffset int64
     lastTs     time.Time
     rate       int64
@@ -76,8 +70,6 @@ func newRateLogger() *rateLogger {
 // Store saves the current offset and updates the rate if it is confident
 // offset must be concrete values, not logical values like -2 (oldest) or -1 (newest)
 func (o *rateLogger) Store(offset int64, ts time.Time) {
-    o.Lock()
-    defer o.Unlock()
     if o.lastTs.IsZero() {
         // first measurement
         o.lastOffset = offset
@@ -122,8 +114,6 @@ func (o *rateLogger) Store(offset int64, ts time.Time) {
 // * exceptionally, it's an old measurement (if you keep adjusting the system clock)
 // after startup, reported rate may be 0 if we haven't been up long enough to determine it yet.
 func (o *rateLogger) Rate() int64 {
-    o.Lock()
-    defer o.Unlock()
     return o.rate
 }

@@ -133,8 +123,22 @@ func (o *rateLogger) Rate() int64 {
 // * ingest rate
 // We then combine this data into a score, see the Metric() method.
 type LagMonitor struct {
-    lag  map[int32]*lagLogger
-    rate map[int32]*rateLogger
+    sync.Mutex
+    lag         map[int32]*lagLogger
+    rate        map[int32]*rateLogger
+    explanation Explanation
+}
+
+type Explanation struct {
+    Status   map[int32]Status
+    Priority int
+    Updated  time.Time
+}
+
+type Status struct {
+    Lag      int
+    Rate     int
+    Priority int
 }

 func NewLagMonitor(size int, partitions []int32) *LagMonitor {
@@ -167,33 +171,61 @@ func NewLagMonitor(size int, partitions []int32) *LagMonitor {
 // - trouble querying the partition for latest offset
 // - consumePartition() has called StoreOffset() but the code hasn't advanced yet to StoreLag()
 func (l *LagMonitor) Metric() int {
+    l.Lock()
+    defer l.Unlock()
+    l.explanation = Explanation{
+        Status:  make(map[int32]Status),
+        Updated: time.Now(),
+    }
     max := 0
     for p, lag := range l.lag {
-        rate := l.rate[p]
-        l := lag.Min()   // accurate lag, -1 if unknown
-        r := rate.Rate() // accurate rate, or 0 if we're not sure.
-        if r == 0 {
-            r = 1
+        status := Status{
+            Lag:  lag.Min(),             // accurate lag, -1 if unknown
+            Rate: int(l.rate[p].Rate()), // accurate rate, or 0 if we're not sure
         }
-        var val int
-        if l == -1 {
+        if status.Lag == -1 {
             // if we have no lag measurements yet,
             // just assign a priority of 10k for this partition
-            val = 10000
+            status.Priority = 10000
         } else {
-            val = l / int(r)
+            // if we're not sure of rate, we don't want divide by zero
+            // instead assume rate is super low
+            if status.Rate == 0 {
+                status.Priority = status.Lag
+            } else {
+                status.Priority = status.Lag / status.Rate
+            }
         }
-        if val > max {
-            max = val
+        if status.Priority > max {
+            max = status.Priority
         }
+        l.explanation.Status[p] = status
     }
+    l.explanation.Updated = time.Now()
+    l.explanation.Priority = max
     return max
 }

+func (l *LagMonitor) Explain() interface{} {
+    l.Lock()
+    defer l.Unlock()
+    return struct {
+        Explanation Explanation
+        Now         time.Time
+    }{
+        Explanation: l.explanation,
+        Now:         time.Now(),
+    }
+}
+
 func (l *LagMonitor) StoreLag(partition int32, val int) {
+    l.Lock()
     l.lag[partition].Store(val)
+    l.Unlock()
 }

 func (l *LagMonitor) StoreOffset(partition int32, offset int64, ts time.Time) {
+    l.Lock()
     l.rate[partition].Store(offset, ts)
+    l.Unlock()
 }
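
To make the scoring above easier to follow, here is a small standalone sketch of the per-partition calculation that `Metric()` now records in `Status.Priority`. The helper function and the 50000/25000 figures are illustrative only; the 1/26494 and 2/24989 pairs come from the example response in docs/http-api.md.

```go
// priority mirrors the per-partition score in LagMonitor.Metric():
// unknown lag (-1) gets a fixed 10000; with an unknown (zero) rate the raw
// lag is used; otherwise it is lag divided by rate, i.e. roughly how many
// seconds of backlog the consumer still has to work through.
func priority(lag, rate int) int {
    if lag == -1 {
        return 10000
    }
    if rate == 0 {
        return lag
    }
    return lag / rate
}

// priority(1, 26494) == 0 and priority(2, 24989) == 0, matching the example
// response above; priority(50000, 25000) == 2 would signal ~2s of backlog.
```

The instance-level priority reported to the cluster is then simply the maximum of these per-partition values, as the loop in `Metric()` shows.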

input/plugin.go (+1)

@@ -9,5 +9,6 @@ type Plugin interface {
     // Note that upon fatal close, metrictank will call Stop() on all plugins, also the one that triggered it.
     Start(handler Handler, fatal chan struct{}) error
     MaintainPriority()
+    ExplainPriority() interface{}
     Stop() // Should block until shutdown is complete.
 }

input/prometheus/prometheus.go (+4)

@@ -55,6 +55,10 @@ func (p *prometheusWriteHandler) MaintainPriority() {
     cluster.Manager.SetPriority(0)
 }

+func (p *prometheusWriteHandler) ExplainPriority() interface{} {
+    return "prometheus-in: priority=0 (always in sync)"
+}
+
 func (p *prometheusWriteHandler) Stop() {
     log.Info("prometheus-in: shutting down")
 }

metrictank.go (+1)

@@ -421,6 +421,7 @@ func main() {
             return
         }
         plugin.MaintainPriority()
+        apiServer.BindPrioritySetter(plugin)
     }

     // metric cluster.self.promotion_wait is how long a candidate (secondary node) has to wait until it can become a primary
