Skip to content

Commit

Permalink
Merge pull request #10 from wallyqs/toggle-subscriptions
Browse files Browse the repository at this point in the history
Display subscriptions from a connection
  • Loading branch information
derekcollison committed Aug 20, 2015
2 parents 5bd5db2 + f3b8051 commit 7e0b040
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 18 deletions.
64 changes: 55 additions & 9 deletions nats-top.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"os"
"sort"
"strings"
"time"

ui "github.com/gizak/termui"
Expand Down Expand Up @@ -139,13 +140,35 @@ func generateParagraph(
outMsgs, outBytes, outMsgsRate, outBytesRate)
text += fmt.Sprintf("\n\nConnections: %d\n", numConns)

connHeader := " %-20s %-8s %-6s %-10s %-10s %-10s %-10s %-10s %-7s %-7s\n"
var displaySubs bool
if val, ok := opts["subs"]; ok {
displaySubs = val.(bool)
}

connRows := fmt.Sprintf(connHeader, "HOST", "CID", "SUBS", "PENDING",
"MSGS_TO", "MSGS_FROM", "BYTES_TO", "BYTES_FROM",
"LANG", "VERSION")
connHeader := " %-20s %-8s %-6s %-10s %-10s %-10s %-10s %-10s %-7s %-7s "
if displaySubs {
connHeader += "%13s"
}
connHeader += "\n"

var connRows string
var connValues string
if displaySubs {
connRows = fmt.Sprintf(connHeader, "HOST", "CID", "SUBS", "PENDING",
"MSGS_TO", "MSGS_FROM", "BYTES_TO", "BYTES_FROM",
"LANG", "VERSION", "SUBSCRIPTIONS")
} else {
connRows = fmt.Sprintf(connHeader, "HOST", "CID", "SUBS", "PENDING",
"MSGS_TO", "MSGS_FROM", "BYTES_TO", "BYTES_FROM",
"LANG", "VERSION")
}
text += connRows
connValues := " %-20s %-8d %-6d %-10s %-10s %-10s %-10s %-10s %-7s %-7s\n"

connValues = " %-20s %-8d %-6d %-10s %-10s %-10s %-10s %-10s %-7s %-7s "
if displaySubs {
connValues += "%s"
}
connValues += "\n"

switch opts["sort"] {
case SortByCid:
Expand All @@ -164,9 +187,19 @@ func generateParagraph(

for _, conn := range stats.Connz.Conns {
host := fmt.Sprintf("%s:%d", conn.IP, conn.Port)
connLine := fmt.Sprintf(connValues, host, conn.Cid, conn.NumSubs, Psize(int64(conn.Pending)),
Psize(conn.OutMsgs), Psize(conn.InMsgs), Psize(conn.OutBytes), Psize(conn.InBytes),
conn.Lang, conn.Version)

var connLine string
if displaySubs {
subs := strings.Join(conn.Subs, ", ")
connLine = fmt.Sprintf(connValues, host, conn.Cid, conn.NumSubs, Psize(int64(conn.Pending)),
Psize(conn.OutMsgs), Psize(conn.InMsgs), Psize(conn.OutBytes), Psize(conn.InBytes),
conn.Lang, conn.Version, subs)
} else {
connLine = fmt.Sprintf(connValues, host, conn.Cid, conn.NumSubs, Psize(int64(conn.Pending)),
Psize(conn.OutMsgs), Psize(conn.InMsgs), Psize(conn.OutBytes), Psize(conn.InBytes),
conn.Lang, conn.Version)
}

text += connLine
}

Expand Down Expand Up @@ -241,6 +274,7 @@ func StartUI(
// Flags for capturing options
waitingSortOption := false
waitingLimitOption := false
displaySubscriptions := false

optionBuf := ""
refreshOptionHeader := func() {
Expand Down Expand Up @@ -332,6 +366,16 @@ func StartUI(
cleanExit()
}

if e.Type == ui.EventKey && e.Ch == 's' && !(waitingLimitOption || waitingSortOption) {
if displaySubscriptions {
displaySubscriptions = false
opts["subs"] = false
} else {
displaySubscriptions = true
opts["subs"] = true
}
}

if e.Type == ui.EventKey && viewMode == HelpViewMode {
ui.Body.Rows = topViewGrid.Rows
viewMode = TopViewMode
Expand Down Expand Up @@ -390,7 +434,9 @@ n<limit> Set sample size of connections to request from the server.
would respect both options allowing queries like 'connection
with largest number of subscriptions': -n 1 -sort subs
q Quit nats-top
s Toggle displaying connection subscriptions.
q Quit nats-top.
Press any key to continue...
Expand Down
14 changes: 9 additions & 5 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ Server:
Out: Msgs: 68.3K Bytes: 6.0M Msgs/Sec: 75.8 Bytes/Sec: 6779.4

Connections: 4
HOST CID SUBS PENDING MSGS_TO MSGS_FROM BYTES_TO BYTES_FROM LANG VERSION
127.0.0.1:56134 2 5 0 11.6K 11.6K 1.1M 905.1K go 1.1.0
127.0.1.1:56138 3 1 0 34.2K 0 3.0M 0 go 1.1.0
127.0.0.1:56144 4 5 0 11.2K 11.1K 873.5K 1.1M go 1.1.0
127.0.0.1:56151 5 8 0 11.4K 11.5K 1014.6K 1.0M go 1.1.0
HOST CID SUBS PENDING MSGS_TO MSGS_FROM BYTES_TO BYTES_FROM LANG VERSION SUBSCRIPTIONS
127.0.0.1:56134 2 5 0 11.6K 11.6K 1.1M 905.1K go 1.1.0 foo, hello
127.0.1.1:56138 3 1 0 34.2K 0 3.0M 0 go 1.1.0 _INBOX.a96f3f6853616154d23d1b5072
127.0.0.1:56144 4 5 0 11.2K 11.1K 873.5K 1.1M go 1.1.0 foo, hello
127.0.0.1:56151 5 8 0 11.4K 11.5K 1014.6K 1.0M go 1.1.0 foo, hello
```

## Install
Expand Down Expand Up @@ -74,6 +74,10 @@ While in top view, it is possible to use the following commands:
both options enabling queries like _connection with largest number of subscriptions_:
`nats-top -n 1 -sort subs`

- **s**

Toggle displaying connection subscriptions.

- **?**

Show help message with options.
Expand Down
38 changes: 34 additions & 4 deletions test/toputils_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package test

import (
"fmt"
"net"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -52,6 +54,18 @@ func TestFetchingStatz(t *testing.T) {
t.Fatalf("Could not monitor number of cores. got: %v", got)
}

// Create simple subscription to gnatsd port to show subscriptions
go func() {
conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", GNATSD_PORT))
if err != nil {
t.Fatalf("could not create subcription to NATS: ", err)
}
fmt.Fprintf(conn, "SUB hello.world 90\r\n")
time.Sleep(5 * time.Second)
conn.Close()
}()
time.Sleep(1 * time.Second)

var connz *server.Connz
result, err = Request("/connz", params)
if err != nil {
Expand All @@ -62,10 +76,26 @@ func TestFetchingStatz(t *testing.T) {
connz = connzVal
}

// Check for default value of connections limit
got = connz.Limit
if got != 1024 {
t.Fatalf("Could not monitor limit of connections. got: %v", got)
// Check that we got connections
got = len(connz.Conns)
if got <= 0 {
t.Fatalf("Could not monitor with subscriptions option. expected non-nil conns, got: %v", got)
}

params["subs"] = true
result, err = Request("/connz", params)
if err != nil {
t.Fatalf("Failed getting /connz: %v", err)
}

if connzVal, ok := result.(*server.Connz); ok {
connz = connzVal
}

// Check that we got subscriptions
got = len(connz.Conns[0].Subs)
if got <= 0 {
t.Fatalf("Could not monitor with client subscriptions. expected client with subscriptions, got: %v", got)
}

s.Shutdown()
Expand Down
7 changes: 7 additions & 0 deletions util/toputils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
gnatsd "github.com/nats-io/gnatsd/server"
)

const DisplaySubscriptions = 1

// Request takes a path and options, and returns a Stats struct
// with with either connz or varz
func Request(path string, opts map[string]interface{}) (interface{}, error) {
Expand All @@ -22,6 +24,11 @@ func Request(path string, opts map[string]interface{}) (interface{}, error) {
case "/connz":
statz = &gnatsd.Connz{}
uri += fmt.Sprintf("?limit=%d&sort=%s", opts["conns"], opts["sort"])
if displaySubs, ok := opts["subs"]; ok {
if displaySubs.(bool) {
uri += fmt.Sprintf("&subs=%d", DisplaySubscriptions)
}
}
default:
return nil, fmt.Errorf("invalid path '%s' for stats server", path)
}
Expand Down

0 comments on commit 7e0b040

Please sign in to comment.