Skip to content

Commit

Permalink
Merge pull request #1377 from nats-io/subsz
Browse files Browse the repository at this point in the history
[FIXED] subsz monitoring endpoint did not account for accounts.
  • Loading branch information
kozlovic authored May 6, 2020
2 parents b1f1e87 + 136feb9 commit 7e60769
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 8 deletions.
22 changes: 16 additions & 6 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,8 @@ func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) {

// Subsz represents detail information on current connections.
type Subsz struct {
ID string `json:"server_id"`
Now time.Time `json:"now"`
*SublistStats
Total int `json:"total"`
Offset int `json:"offset"`
Expand Down Expand Up @@ -827,19 +829,21 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) {
}
}

s.mu.Lock()
gaccSl := s.gacc.sl
s.mu.Unlock()
slStats := &SublistStats{}

// FIXME(dlc) - Make account aware.
sz := &Subsz{gaccSl.Stats(), 0, offset, limit, nil}
sz := &Subsz{s.info.ID, time.Now(), slStats, 0, offset, limit, nil}

if subdetail {
// Now add in subscription's details
var raw [4096]*subscription
subs := raw[:0]
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
slStats.add(acc.sl.Stats())
acc.sl.localSubs(&subs)
return true
})

gaccSl.localSubs(&subs)
details := make([]SubDetail, len(subs))
i := 0
// TODO(dlc) - may be inefficient and could just do normal match when total subs is large and filtering.
Expand Down Expand Up @@ -870,6 +874,12 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) {
}
sz.Subs = details[minoff:maxoff]
sz.Total = len(sz.Subs)
} else {
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
slStats.add(acc.sl.Stats())
return true
})
}

return sz, nil
Expand Down
112 changes: 110 additions & 2 deletions server/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ func runMonitorServer() *Server {
return RunServer(opts)
}

func runMonitorServerWithAccounts() *Server {
resetPreviousHTTPConnections()
opts := DefaultMonitorOptions()
aA := NewAccount("A")
aB := NewAccount("B")
opts.Accounts = append(opts.Accounts, aA, aB)
opts.Users = append(opts.Users,
&User{Username: "a", Password: "a", Account: aA},
&User{Username: "b", Password: "b", Account: aB})
return RunServer(opts)
}

func runMonitorServerNoHTTPPort() *Server {
resetPreviousHTTPConnections()
opts := DefaultMonitorOptions()
Expand Down Expand Up @@ -1471,6 +1483,93 @@ func TestSubszTestPubSubject(t *testing.T) {
readBodyEx(t, testUrl+"test=foo..bar", http.StatusBadRequest, textPlain)
}

func TestSubszMultiAccount(t *testing.T) {
s := runMonitorServerWithAccounts()
defer s.Shutdown()

ncA := createClientConnWithUserSubscribeAndPublish(t, s, "a", "a")
defer ncA.Close()

ncA.Subscribe("foo.*", func(m *nats.Msg) {})
ncA.Subscribe("foo.bar", func(m *nats.Msg) {})
ncA.Subscribe("foo.foo", func(m *nats.Msg) {})

ncA.Publish("foo.bar", []byte("Hello"))
ncA.Publish("foo.baz", []byte("Hello"))
ncA.Publish("foo.foo", []byte("Hello"))

ncA.Flush()

ncB := createClientConnWithUserSubscribeAndPublish(t, s, "b", "b")
defer ncB.Close()

ncB.Subscribe("foo.*", func(m *nats.Msg) {})
ncB.Subscribe("foo.bar", func(m *nats.Msg) {})
ncB.Subscribe("foo.foo", func(m *nats.Msg) {})

ncB.Publish("foo.bar", []byte("Hello"))
ncB.Publish("foo.baz", []byte("Hello"))
ncB.Publish("foo.foo", []byte("Hello"))

ncB.Flush()

url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)

for mode := 0; mode < 2; mode++ {
sl := pollSubsz(t, s, mode, url+"subsz?subs=1", &SubszOptions{Subscriptions: true})
if sl.NumSubs != 6 {
t.Fatalf("Expected NumSubs of 6, got %d\n", sl.NumSubs)
}
if sl.Total != 6 {
t.Fatalf("Expected Total of 6, got %d\n", sl.Total)
}
if len(sl.Subs) != 6 {
t.Fatalf("Expected subscription details for 6 subs, got %d\n", len(sl.Subs))
}
}
}

func TestSubszMultiAccountWithOffsetAndLimit(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()

ncA := createClientConnWithUserSubscribeAndPublish(t, s, "a", "a")
defer ncA.Close()

for i := 0; i < 200; i++ {
ncA.Subscribe(fmt.Sprintf("foo.%d", i), func(m *nats.Msg) {})
}
ncA.Flush()

ncB := createClientConnWithUserSubscribeAndPublish(t, s, "b", "b")
defer ncB.Close()

for i := 0; i < 200; i++ {
ncB.Subscribe(fmt.Sprintf("foo.%d", i), func(m *nats.Msg) {})
}
ncB.Flush()

url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
for mode := 0; mode < 2; mode++ {
sl := pollSubsz(t, s, mode, url+"subsz?subs=1&offset=10&limit=100", &SubszOptions{Subscriptions: true, Offset: 10, Limit: 100})
if sl.NumSubs != 400 {
t.Fatalf("Expected NumSubs of 200, got %d\n", sl.NumSubs)
}
if sl.Total != 100 {
t.Fatalf("Expected Total of 100, got %d\n", sl.Total)
}
if sl.Offset != 10 {
t.Fatalf("Expected Offset of 10, got %d\n", sl.Offset)
}
if sl.Limit != 100 {
t.Fatalf("Expected Total of 100, got %d\n", sl.Limit)
}
if len(sl.Subs) != 100 {
t.Fatalf("Expected subscription details for 100 subs, got %d\n", len(sl.Subs))
}
}
}

// Tests handle root
func TestHandleRoot(t *testing.T) {
s := runMonitorServer()
Expand Down Expand Up @@ -1735,8 +1834,13 @@ func TestConnzClosedConnsBadTLSClient(t *testing.T) {
}

// Create a connection to test ConnInfo
func createClientConnSubscribeAndPublish(t *testing.T, s *Server) *nats.Conn {
natsURL := fmt.Sprintf("nats://127.0.0.1:%d", s.Addr().(*net.TCPAddr).Port)
func createClientConnWithUserSubscribeAndPublish(t *testing.T, s *Server, user, pwd string) *nats.Conn {
natsURL := ""
if user == "" {
natsURL = fmt.Sprintf("nats://127.0.0.1:%d", s.Addr().(*net.TCPAddr).Port)
} else {
natsURL = fmt.Sprintf("nats://%s:%s@127.0.0.1:%d", user, pwd, s.Addr().(*net.TCPAddr).Port)
}
client := nats.DefaultOptions
client.Servers = []string{natsURL}
nc, err := client.Connect()
Expand All @@ -1759,6 +1863,10 @@ func createClientConnSubscribeAndPublish(t *testing.T, s *Server) *nats.Conn {
return nc
}

func createClientConnSubscribeAndPublish(t *testing.T, s *Server) *nats.Conn {
return createClientConnWithUserSubscribeAndPublish(t, s, "", "")
}

func createClientConnWithName(t *testing.T, name string, s *Server) *nats.Conn {
natsURI := fmt.Sprintf("nats://127.0.0.1:%d", s.Addr().(*net.TCPAddr).Port)

Expand Down
24 changes: 24 additions & 0 deletions server/sublist.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,28 @@ type SublistStats struct {
CacheHitRate float64 `json:"cache_hit_rate"`
MaxFanout uint32 `json:"max_fanout"`
AvgFanout float64 `json:"avg_fanout"`
totFanout int
cacheCnt int
}

func (s *SublistStats) add(stat *SublistStats) {
s.NumSubs += stat.NumSubs
s.NumCache += stat.NumCache
s.NumInserts += stat.NumInserts
s.NumRemoves += stat.NumRemoves
s.NumMatches += stat.NumMatches
s.CacheHitRate += stat.CacheHitRate
if s.MaxFanout < stat.MaxFanout {
s.MaxFanout = stat.MaxFanout
}

// ignore slStats.AvgFanout, collect the values
// it's based on instead
s.totFanout += stat.totFanout
s.cacheCnt += stat.cacheCnt
if s.totFanout > 0 {
s.AvgFanout = float64(s.totFanout) / float64(s.cacheCnt)
}
}

// Stats will return a stats structure for the current state.
Expand Down Expand Up @@ -735,6 +757,8 @@ func (s *Sublist) Stats() *SublistStats {
}
return true
})
st.totFanout = tot
st.cacheCnt = clen
st.MaxFanout = uint32(max)
if tot > 0 {
st.AvgFanout = float64(tot) / float64(clen)
Expand Down

0 comments on commit 7e60769

Please sign in to comment.