diff --git a/api/api.go b/api/api.go
index f216be02f2..a120313580 100644
--- a/api/api.go
+++ b/api/api.go
@@ -49,6 +49,7 @@ type Server struct {
 	Cache           cache.Cache
 	shutdown        chan struct{}
 	Tracer          opentracing.Tracer
+	prioritySetters []PrioritySetter
 }
 
 func (s *Server) BindMetricIndex(i idx.MetricIndex) {
@@ -73,6 +74,14 @@ func (s *Server) BindPromQueryEngine() {
 	s.PromQueryEngine = promql.NewEngine(s, nil)
 }
 
+type PrioritySetter interface {
+	ExplainPriority() interface{}
+}
+
+func (s *Server) BindPrioritySetter(p PrioritySetter) {
+	s.prioritySetters = append(s.prioritySetters, p)
+}
+
 func NewServer() (*Server, error) {
 
 	m := macaron.New()
diff --git a/api/cluster.go b/api/cluster.go
index 28c7d1727f..6e518de0d6 100644
--- a/api/cluster.go
+++ b/api/cluster.go
@@ -18,6 +18,14 @@ import (
 
 var NotFoundErr = errors.New("not found")
 
+func (s *Server) explainPriority(ctx *middleware.Context) {
+	var data []interface{}
+	for _, p := range s.prioritySetters {
+		data = append(data, p.ExplainPriority())
+	}
+	response.Write(ctx, response.NewJson(200, data, ""))
+}
+
 func (s *Server) getNodeStatus(ctx *middleware.Context) {
 	response.Write(ctx, response.NewJson(200, cluster.Manager.ThisNode(), ""))
 }
diff --git a/api/routes.go b/api/routes.go
index afb55be0c4..1864d22e89 100644
--- a/api/routes.go
+++ b/api/routes.go
@@ -27,6 +27,7 @@ func (s *Server) RegisterRoutes() {
 	r.Get("/", s.appStatus)
 	r.Get("/node", s.getNodeStatus)
 	r.Post("/node", bind(models.NodeStatus{}), s.setNodeStatus)
+	r.Get("/priority", s.explainPriority)
 
 	r.Get("/debug/pprof/block", blockHandler)
 	r.Get("/debug/pprof/mutex", mutexHandler)
diff --git a/docs/http-api.md b/docs/http-api.md
index 169e0d77c8..b42386d21e 100644
--- a/docs/http-api.md
+++ b/docs/http-api.md
@@ -160,6 +160,46 @@ Sets the primary status to this node to true or false.
 curl --data primary=true "http://localhost:6060/node"
 ```
 
+## Analyze instance priority
+
+```
+GET /priority
+```
+
+Returns, for each enabled input plugin, an explanation of how
+it computes its priority score.
+
+#### Example
+
+```bash
+curl -s http://localhost:6060/priority | jsonpp
+[
+  "carbon-in: priority=0 (always in sync)",
+  {
+    "Title": "kafka-mdm:",
+    "Explanation": {
+      "Explanation": {
+        "Status": {
+          "0": {
+            "Lag": 1,
+            "Rate": 26494,
+            "Priority": 0
+          },
+          "1": {
+            "Lag": 2,
+            "Rate": 24989,
+            "Priority": 0
+          }
+        },
+        "Priority": 0,
+        "Updated": "2018-06-06T11:56:24.399840121Z"
+      },
+      "Now": "2018-06-06T11:56:25.016645631Z"
+    }
+  }
+]
+```
+
 ## Misc
 
 ### Tspec
diff --git a/input/carbon/carbon.go b/input/carbon/carbon.go
index 258f53c33c..d3cd9dd957 100644
--- a/input/carbon/carbon.go
+++ b/input/carbon/carbon.go
@@ -126,6 +126,10 @@ func (c *Carbon) MaintainPriority() {
 	cluster.Manager.SetPriority(0)
 }
 
+func (c *Carbon) ExplainPriority() interface{} {
+	return "carbon-in: priority=0 (always in sync)"
+}
+
 func (c *Carbon) accept() {
 	for {
 		conn, err := c.listener.AcceptTCP()
diff --git a/input/kafkamdm/kafkamdm.go b/input/kafkamdm/kafkamdm.go
index d611ce64bf..2428e7d7ac 100644
--- a/input/kafkamdm/kafkamdm.go
+++ b/input/kafkamdm/kafkamdm.go
@@ -399,3 +399,13 @@ func (k *KafkaMdm) MaintainPriority() {
 		}
 	}()
 }
+
+func (k *KafkaMdm) ExplainPriority() interface{} {
+	return struct {
+		Title       string
+		Explanation interface{}
+	}{
+		"kafka-mdm:",
+		k.lagMonitor.Explain(),
+	}
+}
diff --git a/input/kafkamdm/lag_monitor.go b/input/kafkamdm/lag_monitor.go
index 2238be6fb6..73e94ec9ef 100644
--- a/input/kafkamdm/lag_monitor.go
+++ b/input/kafkamdm/lag_monitor.go
@@ -8,7 +8,6 @@ import (
 // lagLogger maintains a set of most recent lag measurements
 // and is able to provide the lowest value seen.
 type lagLogger struct {
-	sync.Mutex
 	pos          int
 	measurements []int
 }
@@ -30,8 +29,6 @@ func (l *lagLogger) Store(lag int) {
 	if lag < 0 {
 		return
 	}
-	l.Lock()
-	defer l.Unlock()
 	l.pos++
 	if len(l.measurements) < cap(l.measurements) {
 		l.measurements = append(l.measurements, lag)
@@ -48,8 +45,6 @@
 // note: values may be slightly out of date if negative values were reported
 // (see Store())
 func (l *lagLogger) Min() int {
-	l.Lock()
-	defer l.Unlock()
 	if len(l.measurements) == 0 {
 		return -1
 	}
@@ -63,7 +58,6 @@
 }
 
 type rateLogger struct {
-	sync.Mutex
 	lastOffset int64
 	lastTs     time.Time
 	rate       int64
 }
 
 func newRateLogger() *rateLogger {
@@ -76,8 +70,6 @@
 // Store saves the current offset and updates the rate if it is confident
 // offset must be concrete values, not logical values like -2 (oldest) or -1 (newest)
 func (o *rateLogger) Store(offset int64, ts time.Time) {
-	o.Lock()
-	defer o.Unlock()
 	if o.lastTs.IsZero() {
 		// first measurement
 		o.lastOffset = offset
@@ -122,8 +114,6 @@
 // * exceptionally, it's an old measurement (if you keep adjusting the system clock)
 // after startup, reported rate may be 0 if we haven't been up long enough to determine it yet.
 func (o *rateLogger) Rate() int64 {
-	o.Lock()
-	defer o.Unlock()
 	return o.rate
 }
 
@@ -133,8 +123,22 @@
 // * ingest rate
 // We then combine this data into a score, see the Metric() method.
 type LagMonitor struct {
-	lag  map[int32]*lagLogger
-	rate map[int32]*rateLogger
+	sync.Mutex
+	lag         map[int32]*lagLogger
+	rate        map[int32]*rateLogger
+	explanation Explanation
+}
+
+type Explanation struct {
+	Status   map[int32]Status
+	Priority int
+	Updated  time.Time
+}
+
+type Status struct {
+	Lag      int
+	Rate     int
+	Priority int
 }
 
 func NewLagMonitor(size int, partitions []int32) *LagMonitor {
@@ -167,33 +171,61 @@
 // - trouble querying the partition for latest offset
 // - consumePartition() has called StoreOffset() but the code hasn't advanced yet to StoreLag()
 func (l *LagMonitor) Metric() int {
+	l.Lock()
+	defer l.Unlock()
+	l.explanation = Explanation{
+		Status:  make(map[int32]Status),
+		Updated: time.Now(),
+	}
 	max := 0
 	for p, lag := range l.lag {
-		rate := l.rate[p]
-		l := lag.Min()   // accurate lag, -1 if unknown
-		r := rate.Rate() // accurate rate, or 0 if we're not sure.
-		if r == 0 {
-			r = 1
+		status := Status{
+			Lag:  lag.Min(),             // accurate lag, -1 if unknown
+			Rate: int(l.rate[p].Rate()), // accurate rate, or 0 if we're not sure
 		}
-		var val int
-		if l == -1 {
+		if status.Lag == -1 {
 			// if we have no lag measurements yet,
 			// just assign a priority of 10k for this partition
-			val = 10000
+			status.Priority = 10000
		} else {
-			val = l / int(r)
+			// if we're not sure of the rate, we don't want to divide by zero,
+			// so instead assume the rate is super low
+			if status.Rate == 0 {
+				status.Priority = status.Lag
+			} else {
+				status.Priority = status.Lag / status.Rate
+			}
 		}
-		if val > max {
-			max = val
+		if status.Priority > max {
+			max = status.Priority
 		}
+		l.explanation.Status[p] = status
 	}
+	l.explanation.Updated = time.Now()
+	l.explanation.Priority = max
 	return max
 }
 
+func (l *LagMonitor) Explain() interface{} {
+	l.Lock()
+	defer l.Unlock()
+	return struct {
+		Explanation Explanation
+		Now         time.Time
+	}{
+		Explanation: l.explanation,
+		Now:         time.Now(),
+	}
+}
+
 func (l *LagMonitor) StoreLag(partition int32, val int) {
+	l.Lock()
 	l.lag[partition].Store(val)
+	l.Unlock()
 }
 
 func (l *LagMonitor) StoreOffset(partition int32, offset int64, ts time.Time) {
+	l.Lock()
 	l.rate[partition].Store(offset, ts)
+	l.Unlock()
 }
diff --git a/input/plugin.go b/input/plugin.go
index 1d1f18dcc2..ec12d245c5 100644
--- a/input/plugin.go
+++ b/input/plugin.go
@@ -9,5 +9,6 @@ type Plugin interface {
 	// Note that upon fatal close, metrictank will call Stop() on all plugins, also the one that triggered it.
 	Start(handler Handler, fatal chan struct{}) error
 	MaintainPriority()
+	ExplainPriority() interface{}
 	Stop() // Should block until shutdown is complete.
 }
diff --git a/input/prometheus/prometheus.go b/input/prometheus/prometheus.go
index 1e751630fd..3a0f21bf04 100644
--- a/input/prometheus/prometheus.go
+++ b/input/prometheus/prometheus.go
@@ -55,6 +55,10 @@ func (p *prometheusWriteHandler) MaintainPriority() {
 	cluster.Manager.SetPriority(0)
 }
 
+func (p *prometheusWriteHandler) ExplainPriority() interface{} {
+	return "prometheus-in: priority=0 (always in sync)"
+}
+
 func (p *prometheusWriteHandler) Stop() {
 	log.Info("prometheus-in: shutting down")
 }
diff --git a/metrictank.go b/metrictank.go
index 82be3eece6..ed8c7299cb 100644
--- a/metrictank.go
+++ b/metrictank.go
@@ -421,6 +421,7 @@ func main() {
 			return
 		}
 		plugin.MaintainPriority()
+		apiServer.BindPrioritySetter(plugin)
 	}
 
 	// metric cluster.self.promotion_wait is how long a candidate (secondary node) has to wait until it can become a primary
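
For readers who want the whole contract in one place, here is a minimal, self-contained sketch (not part of the patch) of how the pieces above fit together: each input plugin returns an arbitrary JSON-serializable explanation from `ExplainPriority()`, and the `/priority` handler simply collects every registered setter's explanation into a slice and serves it as JSON. The `alwaysInSync` and `laggyInput` types and the numbers below are hypothetical stand-ins for the real carbon/prometheus and kafka-mdm inputs.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// PrioritySetter mirrors the interface added to api/api.go: anything that can
// report how it computed its priority score.
type PrioritySetter interface {
	ExplainPriority() interface{}
}

// alwaysInSync is a hypothetical stand-in for inputs like carbon or prometheus,
// which are always considered in sync and return a fixed string.
type alwaysInSync struct{ name string }

func (a alwaysInSync) ExplainPriority() interface{} {
	return a.name + ": priority=0 (always in sync)"
}

// laggyInput is a hypothetical stand-in for an input like kafka-mdm, which
// derives priority from lag and ingest rate (roughly priority = lag / rate,
// falling back to the lag itself when the rate is unknown).
type laggyInput struct {
	lag  int
	rate int
}

func (k laggyInput) ExplainPriority() interface{} {
	prio := k.lag
	if k.rate > 0 {
		prio = k.lag / k.rate
	}
	return struct {
		Title    string
		Lag      int
		Rate     int
		Priority int
	}{"kafka-mdm:", k.lag, k.rate, prio}
}

func main() {
	// The /priority handler does essentially this: collect every registered
	// setter's explanation into a slice and serve it as JSON.
	setters := []PrioritySetter{
		alwaysInSync{name: "carbon-in"},
		laggyInput{lag: 50000, rate: 25000},
	}
	var data []interface{}
	for _, p := range setters {
		data = append(data, p.ExplainPriority())
	}
	out, _ := json.MarshalIndent(data, "", "  ")
	fmt.Println(string(out))
}
```

Because `ExplainPriority()` returns `interface{}`, each plugin is free to pick whatever shape is most useful, which is why the real `/priority` response shown in the docs above mixes a plain string (carbon) with a nested object (kafka-mdm).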