From 3a2634e6bc0427d6bc4d4e1c1972a0a1770041c2 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck <dieter@grafana.com> Date: Wed, 6 Jun 2018 13:53:14 +0200 Subject: [PATCH] endpoint for analyzing priority calculation we sometimes see instances coming online and after replaying their backlog, not being able to become ready. Or sometimes we want to analyze the priority calculation --- api/api.go | 9 ++++ api/cluster.go | 8 ++++ api/routes.go | 1 + docs/http-api.md | 40 +++++++++++++++++ input/carbon/carbon.go | 4 ++ input/kafkamdm/kafkamdm.go | 10 +++++ input/kafkamdm/lag_monitor.go | 78 ++++++++++++++++++++++++---------- input/plugin.go | 1 + input/prometheus/prometheus.go | 4 ++ metrictank.go | 1 + 10 files changed, 133 insertions(+), 23 deletions(-) diff --git a/api/api.go b/api/api.go index f216be02f2..a120313580 100644 --- a/api/api.go +++ b/api/api.go @@ -49,6 +49,7 @@ type Server struct { Cache cache.Cache shutdown chan struct{} Tracer opentracing.Tracer + prioritySetters []PrioritySetter } func (s *Server) BindMetricIndex(i idx.MetricIndex) { @@ -73,6 +74,14 @@ func (s *Server) BindPromQueryEngine() { s.PromQueryEngine = promql.NewEngine(s, nil) } +type PrioritySetter interface { + ExplainPriority() interface{} +} + +func (s *Server) BindPrioritySetter(p PrioritySetter) { + s.prioritySetters = append(s.prioritySetters, p) +} + func NewServer() (*Server, error) { m := macaron.New() diff --git a/api/cluster.go b/api/cluster.go index 28c7d1727f..6e518de0d6 100644 --- a/api/cluster.go +++ b/api/cluster.go @@ -18,6 +18,14 @@ import ( var NotFoundErr = errors.New("not found") +func (s *Server) explainPriority(ctx *middleware.Context) { + var data []interface{} + for _, p := range s.prioritySetters { + data = append(data, p.ExplainPriority()) + } + response.Write(ctx, response.NewJson(200, data, "")) +} + func (s *Server) getNodeStatus(ctx *middleware.Context) { response.Write(ctx, response.NewJson(200, cluster.Manager.ThisNode(), "")) } diff --git a/api/routes.go b/api/routes.go index afb55be0c4..1864d22e89 100644 --- a/api/routes.go +++ b/api/routes.go @@ -27,6 +27,7 @@ func (s *Server) RegisterRoutes() { r.Get("/", s.appStatus) r.Get("/node", s.getNodeStatus) r.Post("/node", bind(models.NodeStatus{}), s.setNodeStatus) + r.Get("/priority", s.explainPriority) r.Get("/debug/pprof/block", blockHandler) r.Get("/debug/pprof/mutex", mutexHandler) diff --git a/docs/http-api.md b/docs/http-api.md index 169e0d77c8..b42386d21e 100644 --- a/docs/http-api.md +++ b/docs/http-api.md @@ -160,6 +160,46 @@ Sets the primary status to this node to true or false. curl --data primary=true "http://localhost:6060/node" ``` +## Analyze instance priority + +``` +GET /priority +``` + +Gets all enabled plugins to declare how they compute their +priority scores. + +#### Example + +```bash +curl -s http://localhost:6060/priority | jsonpp +[ + "carbon-in: priority=0 (always in sync)", + { + "Title": "kafka-mdm:", + "Explanation": { + "Explanation": { + "Status": { + "0": { + "Lag": 1, + "Rate": 26494, + "Priority": 0 + }, + "1": { + "Lag": 2, + "Rate": 24989, + "Priority": 0 + } + }, + "Priority": 0, + "Updated": "2018-06-06T11:56:24.399840121Z" + }, + "Now": "2018-06-06T11:56:25.016645631Z" + } + } +] +``` + ## Misc ### Tspec diff --git a/input/carbon/carbon.go b/input/carbon/carbon.go index 258f53c33c..d3cd9dd957 100644 --- a/input/carbon/carbon.go +++ b/input/carbon/carbon.go @@ -126,6 +126,10 @@ func (c *Carbon) MaintainPriority() { cluster.Manager.SetPriority(0) } +func (c *Carbon) ExplainPriority() interface{} { + return "carbon-in: priority=0 (always in sync)" +} + func (c *Carbon) accept() { for { conn, err := c.listener.AcceptTCP() diff --git a/input/kafkamdm/kafkamdm.go b/input/kafkamdm/kafkamdm.go index d611ce64bf..2428e7d7ac 100644 --- a/input/kafkamdm/kafkamdm.go +++ b/input/kafkamdm/kafkamdm.go @@ -399,3 +399,13 @@ func (k *KafkaMdm) MaintainPriority() { } }() } + +func (k *KafkaMdm) ExplainPriority() interface{} { + return struct { + Title string + Explanation interface{} + }{ + "kafka-mdm:", + k.lagMonitor.Explain(), + } +} diff --git a/input/kafkamdm/lag_monitor.go b/input/kafkamdm/lag_monitor.go index 2238be6fb6..73e94ec9ef 100644 --- a/input/kafkamdm/lag_monitor.go +++ b/input/kafkamdm/lag_monitor.go @@ -8,7 +8,6 @@ import ( // lagLogger maintains a set of most recent lag measurements // and is able to provide the lowest value seen. type lagLogger struct { - sync.Mutex pos int measurements []int } @@ -30,8 +29,6 @@ func (l *lagLogger) Store(lag int) { if lag < 0 { return } - l.Lock() - defer l.Unlock() l.pos++ if len(l.measurements) < cap(l.measurements) { l.measurements = append(l.measurements, lag) @@ -48,8 +45,6 @@ func (l *lagLogger) Store(lag int) { // note: values may be slightly out of date if negative values were reported // (see Store()) func (l *lagLogger) Min() int { - l.Lock() - defer l.Unlock() if len(l.measurements) == 0 { return -1 } @@ -63,7 +58,6 @@ func (l *lagLogger) Min() int { } type rateLogger struct { - sync.Mutex lastOffset int64 lastTs time.Time rate int64 @@ -76,8 +70,6 @@ func newRateLogger() *rateLogger { // Store saves the current offset and updates the rate if it is confident // offset must be concrete values, not logical values like -2 (oldest) or -1 (newest) func (o *rateLogger) Store(offset int64, ts time.Time) { - o.Lock() - defer o.Unlock() if o.lastTs.IsZero() { // first measurement o.lastOffset = offset @@ -122,8 +114,6 @@ func (o *rateLogger) Store(offset int64, ts time.Time) { // * exceptionally, it's an old measurement (if you keep adjusting the system clock) // after startup, reported rate may be 0 if we haven't been up long enough to determine it yet. func (o *rateLogger) Rate() int64 { - o.Lock() - defer o.Unlock() return o.rate } @@ -133,8 +123,22 @@ func (o *rateLogger) Rate() int64 { // * ingest rate // We then combine this data into a score, see the Metric() method. type LagMonitor struct { - lag map[int32]*lagLogger - rate map[int32]*rateLogger + sync.Mutex + lag map[int32]*lagLogger + rate map[int32]*rateLogger + explanation Explanation +} + +type Explanation struct { + Status map[int32]Status + Priority int + Updated time.Time +} + +type Status struct { + Lag int + Rate int + Priority int } func NewLagMonitor(size int, partitions []int32) *LagMonitor { @@ -167,33 +171,61 @@ func NewLagMonitor(size int, partitions []int32) *LagMonitor { // - trouble querying the partition for latest offset // - consumePartition() has called StoreOffset() but the code hasn't advanced yet to StoreLag() func (l *LagMonitor) Metric() int { + l.Lock() + defer l.Unlock() + l.explanation = Explanation{ + Status: make(map[int32]Status), + Updated: time.Now(), + } max := 0 for p, lag := range l.lag { - rate := l.rate[p] - l := lag.Min() // accurate lag, -1 if unknown - r := rate.Rate() // accurate rate, or 0 if we're not sure. - if r == 0 { - r = 1 + status := Status{ + Lag: lag.Min(), // accurate lag, -1 if unknown + Rate: int(l.rate[p].Rate()), // accurate rate, or 0 if we're not sure } - var val int - if l == -1 { + if status.Lag == -1 { // if we have no lag measurements yet, // just assign a priority of 10k for this partition - val = 10000 + status.Priority = 10000 } else { - val = l / int(r) + // if we're not sure of rate, we don't want divide by zero + // instead assume rate is super low + if status.Rate == 0 { + status.Priority = status.Lag + } else { + status.Priority = status.Lag / status.Rate + } } - if val > max { - max = val + if status.Priority > max { + max = status.Priority } + l.explanation.Status[p] = status } + l.explanation.Updated = time.Now() + l.explanation.Priority = max return max } +func (l *LagMonitor) Explain() interface{} { + l.Lock() + defer l.Unlock() + return struct { + Explanation Explanation + Now time.Time + }{ + Explanation: l.explanation, + Now: time.Now(), + } +} + func (l *LagMonitor) StoreLag(partition int32, val int) { + l.Lock() l.lag[partition].Store(val) + l.Unlock() } func (l *LagMonitor) StoreOffset(partition int32, offset int64, ts time.Time) { + l.Lock() l.rate[partition].Store(offset, ts) + l.Unlock() } diff --git a/input/plugin.go b/input/plugin.go index 1d1f18dcc2..ec12d245c5 100644 --- a/input/plugin.go +++ b/input/plugin.go @@ -9,5 +9,6 @@ type Plugin interface { // Note that upon fatal close, metrictank will call Stop() on all plugins, also the one that triggered it. Start(handler Handler, fatal chan struct{}) error MaintainPriority() + ExplainPriority() interface{} Stop() // Should block until shutdown is complete. } diff --git a/input/prometheus/prometheus.go b/input/prometheus/prometheus.go index 1e751630fd..3a0f21bf04 100644 --- a/input/prometheus/prometheus.go +++ b/input/prometheus/prometheus.go @@ -55,6 +55,10 @@ func (p *prometheusWriteHandler) MaintainPriority() { cluster.Manager.SetPriority(0) } +func (p *prometheusWriteHandler) ExplainPriority() interface{} { + return "prometheus-in: priority=0 (always in sync)" +} + func (p *prometheusWriteHandler) Stop() { log.Info("prometheus-in: shutting down") } diff --git a/metrictank.go b/metrictank.go index 82be3eece6..ed8c7299cb 100644 --- a/metrictank.go +++ b/metrictank.go @@ -421,6 +421,7 @@ func main() { return } plugin.MaintainPriority() + apiServer.BindPrioritySetter(plugin) } // metric cluster.self.promotion_wait is how long a candidate (secondary node) has to wait until it can become a primary