From 3f4af16f6b62ef1ee4ff27ec0c3788190d2ab269 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Wed, 6 May 2020 15:20:28 -0400 Subject: [PATCH] [FIXEd] subsz monitoring endpoint did not account for accounts. Fixes #1371 and #1357 by adding up stats and collecting subscriptions from all accounts. Signed-off-by: Matthias Hanel --- server/monitor.go | 22 +++++--- server/monitor_test.go | 112 ++++++++++++++++++++++++++++++++++++++++- server/sublist.go | 24 +++++++++ 3 files changed, 150 insertions(+), 8 deletions(-) diff --git a/server/monitor.go b/server/monitor.go index 2a99404accb..691b96d3d96 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -752,6 +752,8 @@ func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) { // Subsz represents detail information on current connections. type Subsz struct { + ID string `json:"server_id"` + Now time.Time `json:"now"` *SublistStats Total int `json:"total"` Offset int `json:"offset"` @@ -827,19 +829,21 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) { } } - s.mu.Lock() - gaccSl := s.gacc.sl - s.mu.Unlock() + slStats := &SublistStats{} // FIXME(dlc) - Make account aware. - sz := &Subsz{gaccSl.Stats(), 0, offset, limit, nil} + sz := &Subsz{s.info.ID, time.Now(), slStats, 0, offset, limit, nil} if subdetail { - // Now add in subscription's details var raw [4096]*subscription subs := raw[:0] + s.accounts.Range(func(k, v interface{}) bool { + acc := v.(*Account) + slStats.add(acc.sl.Stats()) + acc.sl.localSubs(&subs) + return true + }) - gaccSl.localSubs(&subs) details := make([]SubDetail, len(subs)) i := 0 // TODO(dlc) - may be inefficient and could just do normal match when total subs is large and filtering. @@ -870,6 +874,12 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) { } sz.Subs = details[minoff:maxoff] sz.Total = len(sz.Subs) + } else { + s.accounts.Range(func(k, v interface{}) bool { + acc := v.(*Account) + slStats.add(acc.sl.Stats()) + return true + }) } return sz, nil diff --git a/server/monitor_test.go b/server/monitor_test.go index ed21d57f757..ca641424f87 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -59,6 +59,18 @@ func runMonitorServer() *Server { return RunServer(opts) } +func runMonitorServerWithAccounts() *Server { + resetPreviousHTTPConnections() + opts := DefaultMonitorOptions() + aA := &Account{Name: "A"} + aB := &Account{Name: "B"} + opts.Accounts = append(opts.Accounts, aA, aB) + opts.Users = append(opts.Users, + &User{Username: "a", Password: "a", Account: aA}, + &User{Username: "b", Password: "b", Account: aB}) + return RunServer(opts) +} + func runMonitorServerNoHTTPPort() *Server { resetPreviousHTTPConnections() opts := DefaultMonitorOptions() @@ -1471,6 +1483,93 @@ func TestSubszTestPubSubject(t *testing.T) { readBodyEx(t, testUrl+"test=foo..bar", http.StatusBadRequest, textPlain) } +func TestSubszMultiAccount(t *testing.T) { + s := runMonitorServerWithAccounts() + defer s.Shutdown() + + ncA := createClientConnWithUserSubscribeAndPublish(t, s, "a", "a") + defer ncA.Close() + + ncA.Subscribe("foo.*", func(m *nats.Msg) {}) + ncA.Subscribe("foo.bar", func(m *nats.Msg) {}) + ncA.Subscribe("foo.foo", func(m *nats.Msg) {}) + + ncA.Publish("foo.bar", []byte("Hello")) + ncA.Publish("foo.baz", []byte("Hello")) + ncA.Publish("foo.foo", []byte("Hello")) + + ncA.Flush() + + ncB := createClientConnWithUserSubscribeAndPublish(t, s, "b", "b") + defer ncB.Close() + + ncB.Subscribe("foo.*", func(m *nats.Msg) {}) + ncB.Subscribe("foo.bar", func(m *nats.Msg) {}) + ncB.Subscribe("foo.foo", func(m *nats.Msg) {}) + + ncB.Publish("foo.bar", []byte("Hello")) + ncB.Publish("foo.baz", []byte("Hello")) + ncB.Publish("foo.foo", []byte("Hello")) + + ncB.Flush() + + url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port) + + for mode := 0; mode < 2; mode++ { + sl := pollSubsz(t, s, mode, url+"subsz?subs=1", &SubszOptions{Subscriptions: true}) + if sl.NumSubs != 6 { + t.Fatalf("Expected NumSubs of 6, got %d\n", sl.NumSubs) + } + if sl.Total != 6 { + t.Fatalf("Expected Total of 6, got %d\n", sl.Total) + } + if len(sl.Subs) != 6 { + t.Fatalf("Expected subscription details for 6 subs, got %d\n", len(sl.Subs)) + } + } +} + +func TestSubszMultiAccountWithOffsetAndLimit(t *testing.T) { + s := runMonitorServer() + defer s.Shutdown() + + ncA := createClientConnWithUserSubscribeAndPublish(t, s, "a", "a") + defer ncA.Close() + + for i := 0; i < 200; i++ { + ncA.Subscribe(fmt.Sprintf("foo.%d", i), func(m *nats.Msg) {}) + } + ncA.Flush() + + ncB := createClientConnWithUserSubscribeAndPublish(t, s, "b", "b") + defer ncB.Close() + + for i := 0; i < 200; i++ { + ncB.Subscribe(fmt.Sprintf("foo.%d", i), func(m *nats.Msg) {}) + } + ncB.Flush() + + url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port) + for mode := 0; mode < 2; mode++ { + sl := pollSubsz(t, s, mode, url+"subsz?subs=1&offset=10&limit=100", &SubszOptions{Subscriptions: true, Offset: 10, Limit: 100}) + if sl.NumSubs != 400 { + t.Fatalf("Expected NumSubs of 200, got %d\n", sl.NumSubs) + } + if sl.Total != 100 { + t.Fatalf("Expected Total of 100, got %d\n", sl.Total) + } + if sl.Offset != 10 { + t.Fatalf("Expected Offset of 10, got %d\n", sl.Offset) + } + if sl.Limit != 100 { + t.Fatalf("Expected Total of 100, got %d\n", sl.Limit) + } + if len(sl.Subs) != 100 { + t.Fatalf("Expected subscription details for 100 subs, got %d\n", len(sl.Subs)) + } + } +} + // Tests handle root func TestHandleRoot(t *testing.T) { s := runMonitorServer() @@ -1735,8 +1834,13 @@ func TestConnzClosedConnsBadTLSClient(t *testing.T) { } // Create a connection to test ConnInfo -func createClientConnSubscribeAndPublish(t *testing.T, s *Server) *nats.Conn { - natsURL := fmt.Sprintf("nats://127.0.0.1:%d", s.Addr().(*net.TCPAddr).Port) +func createClientConnWithUserSubscribeAndPublish(t *testing.T, s *Server, user, pwd string) *nats.Conn { + natsURL := "" + if user == "" { + natsURL = fmt.Sprintf("nats://127.0.0.1:%d", s.Addr().(*net.TCPAddr).Port) + } else { + natsURL = fmt.Sprintf("nats://%s:%s@127.0.0.1:%d", user, pwd, s.Addr().(*net.TCPAddr).Port) + } client := nats.DefaultOptions client.Servers = []string{natsURL} nc, err := client.Connect() @@ -1759,6 +1863,10 @@ func createClientConnSubscribeAndPublish(t *testing.T, s *Server) *nats.Conn { return nc } +func createClientConnSubscribeAndPublish(t *testing.T, s *Server) *nats.Conn { + return createClientConnWithUserSubscribeAndPublish(t, s, "", "") +} + func createClientConnWithName(t *testing.T, name string, s *Server) *nats.Conn { natsURI := fmt.Sprintf("nats://127.0.0.1:%d", s.Addr().(*net.TCPAddr).Port) diff --git a/server/sublist.go b/server/sublist.go index 7e772e655ff..ddd2d69c74e 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -700,6 +700,28 @@ type SublistStats struct { CacheHitRate float64 `json:"cache_hit_rate"` MaxFanout uint32 `json:"max_fanout"` AvgFanout float64 `json:"avg_fanout"` + totFanout int + cacheCnt int +} + +func (s *SublistStats) add(stat *SublistStats) { + s.NumSubs += stat.NumSubs + s.NumCache += stat.NumCache + s.NumInserts += stat.NumInserts + s.NumRemoves += stat.NumRemoves + s.NumMatches += stat.NumMatches + s.CacheHitRate += stat.CacheHitRate + if s.MaxFanout < stat.MaxFanout { + s.MaxFanout = stat.MaxFanout + } + + // ignore slStats.AvgFanout, collect the values + // it's based on instead + s.totFanout += stat.totFanout + s.cacheCnt += stat.cacheCnt + if s.totFanout > 0 { + s.AvgFanout = float64(s.totFanout) / float64(s.cacheCnt) + } } // Stats will return a stats structure for the current state. @@ -735,6 +757,8 @@ func (s *Sublist) Stats() *SublistStats { } return true }) + st.totFanout = tot + st.cacheCnt = clen st.MaxFanout = uint32(max) if tot > 0 { st.AvgFanout = float64(tot) / float64(clen)