diff --git a/nsqd/http.go b/nsqd/http.go index a12e09bc9..6657d48f0 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -18,6 +18,7 @@ import ( "strings" "time" + "github.com/hashicorp/serf/serf" "github.com/julienschmidt/httprouter" "github.com/nsqio/nsq/internal/http_api" "github.com/nsqio/nsq/internal/protocol" @@ -132,6 +133,9 @@ func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps ht if !s.ctx.nsqd.IsHealthy() { return nil, http_api.Err{500, health} } + if s.ctx.nsqd.serf != nil && (s.ctx.nsqd.serf.State() == serf.SerfAlive || len(s.ctx.nsqd.serf.Members()) < 2) { + return nil, http_api.Err{500, "NOK - gossip unhealthy"} + } return health, nil } @@ -198,6 +202,10 @@ func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, e } func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { + if s.ctx.nsqd.serf == nil || s.ctx.nsqd.serf.State() != serf.SerfAlive { + return nil, http_api.Err{400, "GOSSIP_NOT_ENABLED"} + } + reqParams, err := http_api.NewReqParams(req) if err != nil { return nil, http_api.Err{400, "INVALID_REQUEST"} @@ -544,8 +552,13 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro } } + var serfStats map[string]string + if s.ctx.nsqd.serf != nil { + serfStats = s.ctx.nsqd.serf.Stats() + } + if !jsonFormat { - return s.printStats(stats, health, startTime, uptime), nil + return s.printStats(stats, health, startTime, uptime, serfStats), nil } return struct { @@ -556,18 +569,17 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro }{version.Binary, health, startTime.Unix(), stats}, nil } -func (s *httpServer) printStats(stats []TopicStats, health string, startTime time.Time, uptime time.Duration) []byte { - var buf bytes.Buffer - w := &buf +func (s *httpServer) printStats(stats []TopicStats, health string, startTime time.Time, uptime time.Duration, gossip map[string]string) []byte { + w := &bytes.Buffer{} now := time.Now() - io.WriteString(w, fmt.Sprintf("%s\n", version.String("nsqd"))) - io.WriteString(w, fmt.Sprintf("start_time %v\n", startTime.Format(time.RFC3339))) - io.WriteString(w, fmt.Sprintf("uptime %s\n", uptime)) + fmt.Fprintf(w, "%s\n", version.String("nsqd")) + fmt.Fprintf(w, "start_time %v\n", startTime.Format(time.RFC3339)) + fmt.Fprintf(w, "uptime %s\n", uptime) if len(stats) == 0 { - io.WriteString(w, "\nNO_TOPICS\n") - return buf.Bytes() + w.WriteString("\nNO_TOPICS\n") + return w.Bytes() } - io.WriteString(w, fmt.Sprintf("\nHealth: %s\n", health)) + fmt.Fprintf(w, "\nHealth: %s\n", health) for _, t := range stats { var pausedPrefix string if t.Paused { @@ -575,37 +587,37 @@ func (s *httpServer) printStats(stats []TopicStats, health string, startTime tim } else { pausedPrefix = " " } - io.WriteString(w, fmt.Sprintf("\n%s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s\n", + fmt.Fprintf(w, "\n%s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s\n", pausedPrefix, t.TopicName, t.Depth, t.BackendDepth, t.MessageCount, - t.E2eProcessingLatency)) + t.E2eProcessingLatency) for _, c := range t.Channels { if c.Paused { pausedPrefix = " *P " } else { pausedPrefix = " " } - io.WriteString(w, - fmt.Sprintf("%s[%-25s] depth: %-5d be-depth: %-5d inflt: %-4d def: %-4d re-q: %-5d timeout: %-5d msgs: %-8d e2e%%: %s\n", - pausedPrefix, - c.ChannelName, - c.Depth, - c.BackendDepth, - c.InFlightCount, - c.DeferredCount, - c.RequeueCount, - c.TimeoutCount, - c.MessageCount, - c.E2eProcessingLatency)) + fmt.Fprintf(w, + "%s[%-25s] depth: %-5d be-depth: %-5d inflt: %-4d def: %-4d re-q: %-5d timeout: %-5d msgs: %-8d e2e%%: %s\n", + pausedPrefix, + c.ChannelName, + c.Depth, + c.BackendDepth, + c.InFlightCount, + c.DeferredCount, + c.RequeueCount, + c.TimeoutCount, + c.MessageCount, + c.E2eProcessingLatency) for _, client := range c.Clients { connectTime := time.Unix(client.ConnectTime, 0) // truncate to the second duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second _, port, _ := net.SplitHostPort(client.RemoteAddress) - io.WriteString(w, fmt.Sprintf(" [%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s\n", + fmt.Fprintf(w, " [%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s\n", client.Version, fmt.Sprintf("%s:%s", client.Name, port), client.State, @@ -615,11 +627,19 @@ func (s *httpServer) printStats(stats []TopicStats, health string, startTime tim client.RequeueCount, client.MessageCount, duration, - )) + ) } } } - return buf.Bytes() + + if gossip != nil { + fmt.Fprintf(w, "\nGossip:\n") + for k, v := range gossip { + fmt.Fprintf(w, " %s: %s\n", k, v) + } + } + + return w.Bytes() } func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {