Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] subsz monitoring endpoint did not account for accounts. #1377

Merged
merged 1 commit into from
May 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,8 @@ func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) {

// Subsz represents detail information on current connections.
type Subsz struct {
ID string `json:"server_id"`
matthiashanel marked this conversation as resolved.
Show resolved Hide resolved
Now time.Time `json:"now"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry a bit late to the party here - but as we use timestamp elsewhere for the typed events can we use the same here given its a new field?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though this is an addition, all of the other monitoring endpoints use now so I would prefer to keep that for consistency with the HTTP endpoints.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ripienaar I kept it the same way to be identical with the rest. If we expose the monitoring endpoints via nats, these are wrapped and the outer json will adhere to what we discussed.

*SublistStats
Total int `json:"total"`
Offset int `json:"offset"`
Expand Down Expand Up @@ -827,19 +829,21 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) {
}
}

s.mu.Lock()
gaccSl := s.gacc.sl
s.mu.Unlock()
slStats := &SublistStats{}

// FIXME(dlc) - Make account aware.
sz := &Subsz{gaccSl.Stats(), 0, offset, limit, nil}
sz := &Subsz{s.info.ID, time.Now(), slStats, 0, offset, limit, nil}

if subdetail {
// Now add in subscription's details
var raw [4096]*subscription
subs := raw[:0]
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
slStats.add(acc.sl.Stats())
acc.sl.localSubs(&subs)
return true
})

gaccSl.localSubs(&subs)
details := make([]SubDetail, len(subs))
i := 0
// TODO(dlc) - may be inefficient and could just do normal match when total subs is large and filtering.
Expand Down Expand Up @@ -870,6 +874,12 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) {
}
sz.Subs = details[minoff:maxoff]
sz.Total = len(sz.Subs)
} else {
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
slStats.add(acc.sl.Stats())
return true
})
}

return sz, nil
Expand Down
112 changes: 110 additions & 2 deletions server/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ func runMonitorServer() *Server {
return RunServer(opts)
}

func runMonitorServerWithAccounts() *Server {
resetPreviousHTTPConnections()
opts := DefaultMonitorOptions()
aA := NewAccount("A")
aB := NewAccount("B")
opts.Accounts = append(opts.Accounts, aA, aB)
opts.Users = append(opts.Users,
&User{Username: "a", Password: "a", Account: aA},
&User{Username: "b", Password: "b", Account: aB})
return RunServer(opts)
}

func runMonitorServerNoHTTPPort() *Server {
resetPreviousHTTPConnections()
opts := DefaultMonitorOptions()
Expand Down Expand Up @@ -1471,6 +1483,93 @@ func TestSubszTestPubSubject(t *testing.T) {
readBodyEx(t, testUrl+"test=foo..bar", http.StatusBadRequest, textPlain)
}

func TestSubszMultiAccount(t *testing.T) {
s := runMonitorServerWithAccounts()
defer s.Shutdown()

ncA := createClientConnWithUserSubscribeAndPublish(t, s, "a", "a")
defer ncA.Close()

ncA.Subscribe("foo.*", func(m *nats.Msg) {})
ncA.Subscribe("foo.bar", func(m *nats.Msg) {})
ncA.Subscribe("foo.foo", func(m *nats.Msg) {})

ncA.Publish("foo.bar", []byte("Hello"))
ncA.Publish("foo.baz", []byte("Hello"))
ncA.Publish("foo.foo", []byte("Hello"))

ncA.Flush()

ncB := createClientConnWithUserSubscribeAndPublish(t, s, "b", "b")
defer ncB.Close()

ncB.Subscribe("foo.*", func(m *nats.Msg) {})
ncB.Subscribe("foo.bar", func(m *nats.Msg) {})
ncB.Subscribe("foo.foo", func(m *nats.Msg) {})

ncB.Publish("foo.bar", []byte("Hello"))
ncB.Publish("foo.baz", []byte("Hello"))
ncB.Publish("foo.foo", []byte("Hello"))

ncB.Flush()

url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)

for mode := 0; mode < 2; mode++ {
sl := pollSubsz(t, s, mode, url+"subsz?subs=1", &SubszOptions{Subscriptions: true})
if sl.NumSubs != 6 {
t.Fatalf("Expected NumSubs of 6, got %d\n", sl.NumSubs)
}
if sl.Total != 6 {
t.Fatalf("Expected Total of 6, got %d\n", sl.Total)
}
if len(sl.Subs) != 6 {
t.Fatalf("Expected subscription details for 6 subs, got %d\n", len(sl.Subs))
}
}
}

func TestSubszMultiAccountWithOffsetAndLimit(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()

ncA := createClientConnWithUserSubscribeAndPublish(t, s, "a", "a")
defer ncA.Close()

for i := 0; i < 200; i++ {
ncA.Subscribe(fmt.Sprintf("foo.%d", i), func(m *nats.Msg) {})
}
ncA.Flush()

ncB := createClientConnWithUserSubscribeAndPublish(t, s, "b", "b")
defer ncB.Close()

for i := 0; i < 200; i++ {
ncB.Subscribe(fmt.Sprintf("foo.%d", i), func(m *nats.Msg) {})
}
ncB.Flush()

url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
for mode := 0; mode < 2; mode++ {
sl := pollSubsz(t, s, mode, url+"subsz?subs=1&offset=10&limit=100", &SubszOptions{Subscriptions: true, Offset: 10, Limit: 100})
if sl.NumSubs != 400 {
t.Fatalf("Expected NumSubs of 200, got %d\n", sl.NumSubs)
}
if sl.Total != 100 {
t.Fatalf("Expected Total of 100, got %d\n", sl.Total)
}
if sl.Offset != 10 {
t.Fatalf("Expected Offset of 10, got %d\n", sl.Offset)
}
if sl.Limit != 100 {
t.Fatalf("Expected Total of 100, got %d\n", sl.Limit)
}
if len(sl.Subs) != 100 {
t.Fatalf("Expected subscription details for 100 subs, got %d\n", len(sl.Subs))
}
}
}

// Tests handle root
func TestHandleRoot(t *testing.T) {
s := runMonitorServer()
Expand Down Expand Up @@ -1735,8 +1834,13 @@ func TestConnzClosedConnsBadTLSClient(t *testing.T) {
}

// Create a connection to test ConnInfo
func createClientConnSubscribeAndPublish(t *testing.T, s *Server) *nats.Conn {
natsURL := fmt.Sprintf("nats://127.0.0.1:%d", s.Addr().(*net.TCPAddr).Port)
func createClientConnWithUserSubscribeAndPublish(t *testing.T, s *Server, user, pwd string) *nats.Conn {
natsURL := ""
if user == "" {
natsURL = fmt.Sprintf("nats://127.0.0.1:%d", s.Addr().(*net.TCPAddr).Port)
} else {
natsURL = fmt.Sprintf("nats://%s:%s@127.0.0.1:%d", user, pwd, s.Addr().(*net.TCPAddr).Port)
}
client := nats.DefaultOptions
client.Servers = []string{natsURL}
nc, err := client.Connect()
Expand All @@ -1759,6 +1863,10 @@ func createClientConnSubscribeAndPublish(t *testing.T, s *Server) *nats.Conn {
return nc
}

func createClientConnSubscribeAndPublish(t *testing.T, s *Server) *nats.Conn {
return createClientConnWithUserSubscribeAndPublish(t, s, "", "")
}

func createClientConnWithName(t *testing.T, name string, s *Server) *nats.Conn {
natsURI := fmt.Sprintf("nats://127.0.0.1:%d", s.Addr().(*net.TCPAddr).Port)

Expand Down
24 changes: 24 additions & 0 deletions server/sublist.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,28 @@ type SublistStats struct {
CacheHitRate float64 `json:"cache_hit_rate"`
MaxFanout uint32 `json:"max_fanout"`
AvgFanout float64 `json:"avg_fanout"`
totFanout int
cacheCnt int
}

func (s *SublistStats) add(stat *SublistStats) {
s.NumSubs += stat.NumSubs
s.NumCache += stat.NumCache
s.NumInserts += stat.NumInserts
s.NumRemoves += stat.NumRemoves
s.NumMatches += stat.NumMatches
s.CacheHitRate += stat.CacheHitRate
if s.MaxFanout < stat.MaxFanout {
s.MaxFanout = stat.MaxFanout
}

// ignore slStats.AvgFanout, collect the values
// it's based on instead
s.totFanout += stat.totFanout
s.cacheCnt += stat.cacheCnt
if s.totFanout > 0 {
s.AvgFanout = float64(s.totFanout) / float64(s.cacheCnt)
}
}

// Stats will return a stats structure for the current state.
Expand Down Expand Up @@ -735,6 +757,8 @@ func (s *Sublist) Stats() *SublistStats {
}
return true
})
st.totFanout = tot
st.cacheCnt = clen
st.MaxFanout = uint32(max)
if tot > 0 {
st.AvgFanout = float64(tot) / float64(clen)
Expand Down