Skip to content

Commit

Permalink
Merge branch 'main' into oderwat-translations
Browse files Browse the repository at this point in the history
  • Loading branch information
ripienaar authored Mar 31, 2023
2 parents aa23bb2 + 520c0e6 commit b39e5dc
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 12 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ Output
Next we publish 5 messages with a counter and timestamp in the format `message 5 @ 2020-12-03T12:33:18+01:00`:

```
nats pub cli.demo "message {{.Count}} @ {{.TimeStamp}}" --count=10
nats pub cli.demo "message {{.Count}} @ {{.TimeStamp}}" --count=5
```
Output
```
Expand Down Expand Up @@ -191,7 +191,7 @@ hello headers
### match requests and replies
We can print matching replay-requests together
```
sub --match-replies subject.name
nats sub --match-replies cli.demo
```
Output
```
Expand Down Expand Up @@ -246,7 +246,7 @@ Output
In another shell we can send a request to this service:

```
nats request cli.weather.london
nats request "cli.weather.london" ''
```
Output
```
Expand Down
3 changes: 3 additions & 0 deletions cli/cheats/sub.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ nats sub subject --dump=- | xargs -0 -n 1 -I "{}" sh -c "echo '{}' | wc -c"

# To receive new messages received in a stream with the subject ORDERS.new
nats sub ORDERS.new --next

# To report the number of subjects with message and byte count. The default `--report-top` is 10
nats sub ">" --report-subjects --report-top=20
4 changes: 4 additions & 0 deletions cli/consumer_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,10 @@ func (c *consumerCmd) editAction(pc *fisk.ParseContext) error {
}
}

if len(ncfg.BackOff) > 0 && ncfg.AckWait != t.AckWait {
return fmt.Errorf("consumers with backoff policies do not support editing Ack Wait")
}

// sort strings to subject lists that only differ in ordering is considered equal
sorter := cmp.Transformer("Sort", func(in []string) []string {
out := append([]string(nil), in...)
Expand Down
7 changes: 2 additions & 5 deletions cli/server_run_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ logtime: false
{{- if .JetStream }}
jetstream {
store_dir: {{ .StoreDir }}
store_dir: "{{ .StoreDir | escape }}"
{{- if .JSDomain }}
domain: {{ .JSDomain }}
{{- end }}
Expand Down Expand Up @@ -258,10 +258,7 @@ func (c *SrvRunCmd) prepareConfig() error {
return err
}
c.config.StoreDir = filepath.Join(parent, "nats", c.config.Name)
if runtime.GOOS == "windows" {
// escape path separator in file
c.config.StoreDir = strings.ReplaceAll(c.config.StoreDir, "\\", "\\\\")
}

if c.config.ExtendWithContext || c.config.ExtendDemoNetwork {
c.config.JSDomain = strings.ToUpper(c.config.Name)
}
Expand Down
105 changes: 101 additions & 4 deletions cli/sub_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
"time"

"github.com/choria-io/fisk"
"github.com/dustin/go-humanize"
"github.com/nats-io/jsm.go"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
Expand All @@ -44,6 +47,8 @@ type subCmd struct {
sseq uint64
deliverAll bool
deliverNew bool
reportSubjects bool
reportSubjectsCount int
deliverLast bool
deliverSince string
deliverLastPerSubject bool
Expand Down Expand Up @@ -79,12 +84,76 @@ func configureSubCommand(app commandHost) {
act.Flag("stream", "Subscribe to a specific stream (required JetStream)").PlaceHolder("STREAM").StringVar(&c.stream)
act.Flag("ignore-subject", "Subjects for which corresponding messages will be ignored and therefore not shown in the output").Short('I').PlaceHolder("SUBJECT").StringsVar(&c.ignoreSubjects)
act.Flag("wait", "Max time to wait before unsubscribing.").DurationVar(&c.wait)
act.Flag("report-subjects", "Subscribes to a subject pattern and builds a de-duplicated report of active subjects receiving data").UnNegatableBoolVar(&c.reportSubjects)
act.Flag("report-top", "Number of subjects to show when doing 'report-subjects'. Default is 10.").Default("10").IntVar(&c.reportSubjectsCount)
}

func init() {
registerCommand("sub", 17, configureSubCommand)
}

func startSubjectReporting(ctx context.Context, subjMu *sync.Mutex, subjectReportMap map[string]int64, subjectBytesReportMap map[string]int64, subjCount int) {

go func() {
ticker := time.NewTicker(time.Second)

for {
select {
case <-ctx.Done():
ticker.Stop()
case <-ticker.C:

subjectRows := [][]any{}

if runtime.GOOS != "windows" {
fmt.Print("\033[2J")
fmt.Print("\033[H")
}

totalBytes := int64(0)
totalCount := int64(0)

subjMu.Lock()
keys := make([]string, 0, len(subjectReportMap))

for k := range subjectReportMap {
keys = append(keys, k)
}
// sort.Strings(keys)
// sort by count in descending order
sort.Slice(keys, func(i, j int) bool {
return subjectReportMap[keys[i]] > subjectReportMap[keys[j]]
})

for count, k := range keys {

subjectRows = append(subjectRows, []any{k, humanize.Comma(subjectReportMap[k]), humanize.IBytes(uint64(subjectBytesReportMap[k]))})
totalCount += subjectReportMap[k]
totalBytes += subjectBytesReportMap[k]
if (count + 1) == subjCount {
break
}
}
subjMu.Unlock()

tableHeaderString := ""
if subjCount == 1 {
tableHeaderString = "Top Subject Report"
} else {
tableHeaderString = fmt.Sprintf("Top %d Active Subjects Report", subjCount)
}
table := newTableWriter(tableHeaderString)
table.AddHeaders("Subject", "Message Count", "Bytes")
table.AddFooter("Totals", humanize.Comma(totalCount), humanize.IBytes(uint64(totalBytes)))
for i := range subjectRows {
table.AddRow(subjectRows[i]...)
}
fmt.Println(table.Render())
}
}
}()
}

func (c *subCmd) subscribe(p *fisk.ParseContext) error {
nc, err := newNatsConn("", natsOpts()...)
if err != nil {
Expand All @@ -111,16 +180,24 @@ func (c *subCmd) subscribe(p *fisk.ParseContext) error {
return fmt.Errorf("queue group subscriptions are not supported with JetStream")
}

if c.reportSubjects && c.reportSubjectsCount == 0 {
return fmt.Errorf("subject count must be at least one")
}

var (
sub *nats.Subscription
mu = sync.Mutex{}
subjMu = sync.Mutex{}
dump = c.dump != ""
ctr = uint(0)
ignoreSubjects = splitCLISubjects(c.ignoreSubjects)
ctx, cancel = context.WithCancel(ctx)

replySub *nats.Subscription
matchMap map[string]*nats.Msg

subjectReportMap map[string]int64
subjectBytesReportMap map[string]int64
)
defer cancel()

Expand Down Expand Up @@ -170,10 +247,20 @@ func (c *subCmd) subscribe(p *fisk.ParseContext) error {
}

ctr++
if c.match && m.Reply != "" {
matchMap[m.Reply] = m
} else {
printMsg(c, m, nil, ctr)
if c.reportSubjects {
subjMu.Lock()
subjectReportMap[m.Subject]++
subjectBytesReportMap[m.Subject] += int64(len(m.Data))
subjMu.Unlock()
}

// if we're not reporting on subjects, then print the message
if !c.reportSubjects {
if c.match && m.Reply != "" {
matchMap[m.Reply] = m
} else {
printMsg(c, m, nil, ctr)
}
}

if ctr == c.limit {
Expand Down Expand Up @@ -228,6 +315,11 @@ func (c *subCmd) subscribe(p *fisk.ParseContext) error {
}
}

if c.reportSubjects {
subjectReportMap = make(map[string]int64)
subjectBytesReportMap = make(map[string]int64)
}

var ignoredSubjInfo string
if len(ignoreSubjects) > 0 {
ignoredSubjInfo = fmt.Sprintf("\nIgnored subjects: %s", strings.Join(ignoreSubjects, ", "))
Expand All @@ -245,6 +337,10 @@ func (c *subCmd) subscribe(p *fisk.ParseContext) error {
}

switch {
case c.reportSubjects:
sub, err = nc.Subscribe(c.subject, handler)
startSubjectReporting(ctx, &subjMu, subjectReportMap, subjectBytesReportMap, c.reportSubjectsCount)

case c.jetStream:
var js nats.JetStreamContext
js, err = nc.JetStream()
Expand Down Expand Up @@ -329,6 +425,7 @@ func (c *subCmd) subscribe(p *fisk.ParseContext) error {

case c.queue != "":
sub, err = nc.QueueSubscribe(c.subject, c.queue, handler)

default:
sub, err = nc.Subscribe(c.subject, handler)
}
Expand Down
1 change: 1 addition & 0 deletions cli/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ func prepareJSHelper() (*nats.Conn, nats.JetStreamContext, error) {
jso := []nats.JSOpt{
nats.Domain(opts.JsDomain),
nats.APIPrefix(opts.JsApiPrefix),
nats.MaxWait(opts.Timeout),
}

if opts.Trace {
Expand Down

0 comments on commit b39e5dc

Please sign in to comment.