Skip to content

Commit

Permalink
Merge pull request #739 from oderwat/oderwat-translations
Browse files Browse the repository at this point in the history
Adding the possibility to translate the msg.Data before output.
  • Loading branch information
ripienaar authored Mar 31, 2023
2 parents 2d555bf + 0ddb1b4 commit a63e6fd
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 25 deletions.
61 changes: 61 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,67 @@ newyork: ☀️ +2°C
Now the `nats` CLI parses the subject, extracts the `{london,newyork}` from the subjects and calls `curl`, replacing
`{{2}}` with the body of the 2nd subject token - `{london,newyork}`.

## Translating message data using a converter command

Additional to the raw output of messages using `nats sub` and `nats stream view` you can also translate the message data by running it through a command.

The command receives the message data as raw bytes through stdin and the output of the command will be the shown output for the message. There is the additional possibility to add the filter subject by using `{{Subject}}` as part of the arguments for the tranlation command.

#### Examples for using the translation feature:

Here we use the [jq](https://github.com/stedolan/jq) tool to format our json message paylot into a more readable format:

We subscribe to a subject that will receive json data.
```
nats sub --translate 'jq .' cli.json
```
Now we publish some example data.
```
nats pub cli.json '{"task":"demo","duration":60}'
```

The Output will show the message formatted.
```
23:54:35 Subscribing on cli.json
[#1] Received on "cli.json"
{
"task": "demo",
"duration": 60
}
```

Another example is creating hex dumps from any message to avoid terminal corruption.

By changing the subscription into:

```
go run ./nats/ sub --translate 'xxd' cli.json
```

We will get the following output for the same published msg:
```
00:02:56 Subscribing on cli.json
[#1] Received on "cli.json"
00000000: 7b22 7461 736b 223a 2264 656d 6f22 2c22 {"task":"demo","
00000010: 6475 7261 7469 6f6e 223a 3630 7d duration":60}
```

#### Examples for using the translation feature with template:

A somewhat artificial example using the subject as argument would be:
```
nats sub --translate "sed 's/\(.*\)/{{Subject}}: \1/'" cli.json
```

Output
```
00:22:19 Subscribing on cli.json
[#1] Received on "cli.json"
cli.json: {"task":"demo","duration":60}
```

The translation feature makes it possible to write specialized or universal translators to aid in debugging messages in streams or core nats.

## Benchmarking and Latency Testing

Benchmarking and latency testing is key requirement for evaluating the production preparedness of your NATS network.
Expand Down
62 changes: 62 additions & 0 deletions cli/filter_data_through_cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package cli

import (
"bytes"
"fmt"
"github.com/google/shlex"
"os/exec"
"strings"
"text/template"
)

func outPutMSGBody(data []byte, filter, subject, stream string) {
if len(data) == 0 {
fmt.Println("nil body")
return
}

data, err := filterDataThroughCmd(data, filter, subject, stream)
if err != nil {
// using q here so raw binary data will be escaped
fmt.Printf("%q\nError while translating msg body: %s\n\n", data, err.Error())
return
}
output := string(data)
fmt.Println(output)
if !strings.HasSuffix(output, "\n") {
fmt.Println()
}
}

func filterDataThroughCmd(data []byte, filter, subject, stream string) ([]byte, error) {
if filter == "" {
return data, nil
}
funcMap := template.FuncMap{
"Subject": func() string { return subject },
"Stream": func() string { return stream },
}

tmpl, err := template.New("translate").Funcs(funcMap).Parse(filter)
if err != nil {
return nil, err
}
var builder strings.Builder
err = tmpl.Execute(&builder, nil)
if err != nil {
return nil, err
}

parts, err := shlex.Split(builder.String())
if err != nil {
return nil, fmt.Errorf("the filter command line could not be parsed: %w", err)
}
cmd := parts[0]
args := parts[1:]

runner := exec.Command(cmd, args...)
// pass the message as string to stdin
runner.Stdin = bytes.NewReader(data)
// maybe we want to do something on error?
return runner.CombinedOutput()
}
17 changes: 5 additions & 12 deletions cli/stream_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ type streamCmd struct {
vwStartDelta time.Duration
vwPageSize int
vwRaw bool
vwTranslate string
vwSubject string

dryRun bool
Expand Down Expand Up @@ -300,13 +301,15 @@ func configureStreamCommand(app commandHost) {
strView.Flag("id", "Start at a specific message Sequence").IntVar(&c.vwStartId)
strView.Flag("since", "Delivers messages received since a duration like 1d3h5m2s").DurationVar(&c.vwStartDelta)
strView.Flag("raw", "Show the raw data received").UnNegatableBoolVar(&c.vwRaw)
strView.Flag("translate", "Translate the message data by running it through the given command before output").StringVar(&c.vwTranslate)
strView.Flag("subject", "Filter the stream using a subject").StringVar(&c.vwSubject)

strGet := str.Command("get", "Retrieves a specific message from a Stream").Action(c.getAction)
strGet.Arg("stream", "Stream name").StringVar(&c.stream)
strGet.Arg("id", "Message Sequence to retrieve").Int64Var(&c.msgID)
strGet.Flag("last-for", "Retrieves the message for a specific subject").Short('S').PlaceHolder("SUBJECT").StringVar(&c.filterSubject)
strGet.Flag("json", "Produce JSON output").Short('j').UnNegatableBoolVar(&c.json)
strGet.Flag("translate", "Translate the message data by running it through the given command before output").StringVar(&c.vwTranslate)

strBackup := str.Command("backup", "Creates a backup of a Stream over the NATS network").Alias("snapshot").Action(c.backupAction)
strBackup.Arg("stream", "Stream to backup").Required().StringVar(&c.stream)
Expand Down Expand Up @@ -702,15 +705,7 @@ func (c *streamCmd) viewAction(_ *fisk.ParseContext) error {
}

fmt.Println()
if len(msg.Data) == 0 {
fmt.Println("nil body")
} else {
fmt.Println(string(msg.Data))
if !strings.HasSuffix(string(msg.Data), "\n") {
fmt.Println()
}
}

outPutMSGBody(msg.Data, c.vwTranslate, msg.Subject, meta.Stream())
}

if last {
Expand Down Expand Up @@ -2705,9 +2700,7 @@ func (c *streamCmd) getAction(_ *fisk.ParseContext) (err error) {
}
fmt.Println()
}

fmt.Println(string(item.Data))
fmt.Println()
outPutMSGBody(item.Data, c.vwTranslate, item.Subject, c.stream)
return nil
}

Expand Down
13 changes: 6 additions & 7 deletions cli/sub_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type subCmd struct {
queue string
durable string
raw bool
translate string
jsAck bool
inbox bool
match bool
Expand Down Expand Up @@ -67,6 +68,7 @@ func configureSubCommand(app commandHost) {
act.Flag("queue", "Subscribe to a named queue group").StringVar(&c.queue)
act.Flag("durable", "Use a durable consumer (requires JetStream)").StringVar(&c.durable)
act.Flag("raw", "Show the raw data received").Short('r').UnNegatableBoolVar(&c.raw)
act.Flag("translate", "Translate the message data by running it through the given command before output").StringVar(&c.translate)
act.Flag("ack", "Acknowledge JetStream message that have the correct metadata").BoolVar(&c.jsAck)
act.Flag("match-replies", "Match replies to requests").UnNegatableBoolVar(&c.match)
act.Flag("inbox", "Subscribes to a generate inbox").Short('i').UnNegatableBoolVar(&c.inbox)
Expand Down Expand Up @@ -499,7 +501,7 @@ func printMsg(c *subCmd, msg *nats.Msg, reply *nats.Msg, ctr uint) {
fmt.Printf("[#%d] Received JetStream message: consumer: %s > %s / subject: %s / delivered: %d / consumer seq: %d / stream seq: %d\n", ctr, info.Stream(), info.Consumer(), msg.Subject, info.Delivered(), info.ConsumerSequence(), info.StreamSequence())
}

prettyPrintMsg(msg, c.headersOnly)
prettyPrintMsg(msg, c.headersOnly, c.translate)

if reply != nil {
if info == nil {
Expand All @@ -510,7 +512,7 @@ func printMsg(c *subCmd, msg *nats.Msg, reply *nats.Msg, ctr uint) {
fmt.Printf("[#%d] Matched reply JetStream message: consumer: %s > %s / subject: %s / delivered: %d / consumer seq: %d / stream seq: %d\n", ctr, info.Stream(), info.Consumer(), reply.Subject, info.Delivered(), info.ConsumerSequence(), info.StreamSequence())
}

prettyPrintMsg(reply, c.headersOnly)
prettyPrintMsg(reply, c.headersOnly, c.translate)

}

Expand Down Expand Up @@ -541,7 +543,7 @@ func dumpMsg(msg *nats.Msg, stdout bool, filepath string, ctr uint) {
}
}

func prettyPrintMsg(msg *nats.Msg, headersOnly bool) {
func prettyPrintMsg(msg *nats.Msg, headersOnly bool, filter string) {
if len(msg.Header) > 0 {
for h, vals := range msg.Header {
for _, val := range vals {
Expand All @@ -553,9 +555,6 @@ func prettyPrintMsg(msg *nats.Msg, headersOnly bool) {
}

if !headersOnly {
fmt.Println(string(msg.Data))
if !strings.HasSuffix(string(msg.Data), "\n") {
fmt.Println()
}
outPutMSGBody(msg.Data, filter, msg.Subject, "")
}
}
1 change: 1 addition & 0 deletions dependencies.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ This file lists the dependencies used in this repository.
| github.com/nats-io/nats.go | Apache License 2.0 |
| github.com/nats-io/nkeys | Apache License 2.0 |
| github.com/nats-io/nuid | Apache License 2.0 |
| github.com/google/shlex | Apache License 2.0 |
| github.com/tylertreat/hdrhistogram-writer | Apache License 2.0 |
| github.com/xeipuuv/gojsonpointer | Apache License 2.0 |
| github.com/xeipuuv/gojsonreference | Apache License 2.0 |
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/fatih/color v1.15.0
github.com/ghodss/yaml v1.0.0
github.com/google/go-cmp v0.5.9
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/gosuri/uiprogress v0.0.1
github.com/guptarohit/asciigraph v0.5.5
github.com/jedib0t/go-pretty/v6 v6.4.6
Expand All @@ -33,8 +34,8 @@ require (

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/gosuri/uilive v0.0.4 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2/go.mod h1:HBCaDe
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/choria-io/fisk v0.4.0 h1:V+mh3OpcmE0uBcihUA18dRJeL1J8YUCCLbZZQRa7GMY=
github.com/choria-io/fisk v0.4.0/go.mod h1:3Rc9XxqKC4y9wBf2GfQ4ovJ1VKELAWcU0J33M/Zgjvs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand All @@ -34,12 +34,14 @@ github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGw
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/gosuri/uilive v0.0.4 h1:hUEBpQDj8D8jXgtCdBu7sWsy5sbW/5GhuO8KBwJ2jyY=
github.com/gosuri/uilive v0.0.4/go.mod h1:V/epo5LjjlDE5RJUcqx8dbw+zc93y5Ya3yg8tfZ74VI=
github.com/gosuri/uiprogress v0.0.1 h1:0kpv/XY/qTmFWl/SkaJykZXrBBzwwadmW8fRb7RJSxw=
Expand Down

0 comments on commit a63e6fd

Please sign in to comment.