Skip to content

Commit

Permalink
enhanced example
Browse files Browse the repository at this point in the history
  • Loading branch information
Florian Meiberg committed Apr 21, 2022
1 parent ee98272 commit 36682fa
Showing 1 changed file with 22 additions and 20 deletions.
42 changes: 22 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ For using vnats you will need to

### Publisher

The publisher sends a slice of bytes `[]byte` to a subject. If a struct or different type should be sent,
the user has to (un-)marshal the payload.
The publisher sends a slice of bytes `[]byte` to a subject. If a struct or different type should be sent, the user has
to (un-)marshal the payload.

#### Example

Expand Down Expand Up @@ -46,6 +46,12 @@ func main() {
if err != nil {
log.Errorf(err.Error())
}
// Close NATS connection deferred
defer func(conn vnats.Connection) {
if err := conn.Close(); err != nil {
log.Errorf("NATS connection could not be closed: %v", err)
}
}(conn)

// Create publisher bound to stream `PRODUCTS`
pub, err := conn.NewPublisher(vnats.NewPublisherArgs{StreamName: "PRODUCTS"})
Expand All @@ -58,13 +64,13 @@ func main() {
Price: "12,34",
LastUpdated: time.Now(),
}

// Since vnats needs a slice of bytes, the products is converted via the json marshaller
productToBytes, err := json.Marshal(p)
if err != nil {
panic(err)
panic(err)
}

// Publish message to stream `PRODUCTS.PRICES` with a context bound, unique message ID
// msgID is used for deduplication
msgID := fmt.Sprintf("%s-%s", p.Name, p.LastUpdated)
Expand All @@ -75,11 +81,6 @@ func main() {
}); err != nil {
log.Errorf("Could not publish %v: %v", p, err)
}

// Close NATS connection
if err := conn.Close(); err != nil {
log.Errorf("NATS connection could not be closed: %v", err)
}
}
```

Expand All @@ -88,8 +89,8 @@ func main() {
### Subscriber

We use a pull-based subscriber by default, which scales horizontally. The subscriber is asynchronous and pulls
continuously for new messages. A message handler is needed to process each message. The message will be passed as
a slice of bytes `[]byte`.
continuously for new messages. A message handler is needed to process each message. The message will be passed as a
slice of bytes `[]byte`.

**Important**: The `MsgHandler` **MUST** finish its task under 30 seconds. Longer tasks must be only triggered and
executed asynchronously.
Expand Down Expand Up @@ -125,6 +126,13 @@ func main() {
if err != nil {
log.Errorf(err.Error())
}

// Unsubscribe to all open subscriptions and close NATS connection deferred
defer func(conn vnats.Connection) {
if err := conn.Close(); err != nil {
log.Errorf("NATS connection could not be closed: %v", err)
}
}(conn)

// Create Pull-Subscriber bound to consumer `EXAMPLE_CONSUMER`
// and the subject `PRODUCTS.PRICES`
Expand All @@ -143,19 +151,13 @@ func main() {

// Wait for stop signal (e.g. ctrl-C)
waitForStopSignal()

// Unsubscribe to all open subscriptions and close NATS connection
if err := conn.Close(); err != nil {
log.Errorf("NATS connection could not be closed: %v", err)
}

}

// MsgHandler returns the data in a slice of bytes inside the InMsg struct.
func msgHandler(msg vnats.InMsg) error {
var p Product
if err := json.Unmarshal(msg.Data(), &p); err != nil {
return err
if err := json.Unmarshal(msg.Data(), &p); err != nil {
return err
}
log.Debugf("Received product: %v", p)
return nil
Expand Down

0 comments on commit 36682fa

Please sign in to comment.