Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add NATS JetStream events plugin #2433

Merged
merged 3 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions plugins/events/nats/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# NATS Streaming

This plugin uses NATS Streaming to send and receive events.

Please not that [NATS Streaming is deprecated](https://docs.nats.io/legacy/stan) and will be no longer receive security fixes after June of 2023.

You should instead [NATS JetStream](https://docs.nats.io/nats-concepts/jetstream) and the [go-micro natsjs plugin](https://github.com/asim/go-micro/tree/master/plugins/events/natsjs).
3 changes: 3 additions & 0 deletions plugins/events/natsjs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# NATS JetStream

This plugin uses NATS with JetStream to send and receive events.
40 changes: 40 additions & 0 deletions plugins/events/natsjs/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
module github.com/asim/go-micro/plugins/events/natsjs/v4

go 1.17

require (
github.com/google/uuid v1.3.0
github.com/nats-io/nats-server/v2 v2.6.2
github.com/nats-io/nats.go v1.13.0
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
go-micro.dev/v4 v4.6.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/klauspost/compress v1.14.3 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/miekg/dns v1.1.46 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
golang.org/x/mod v0.4.2 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220209214540-3681064d5158 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
869 changes: 869 additions & 0 deletions plugins/events/natsjs/go.sum

Large diffs are not rendered by default.

89 changes: 89 additions & 0 deletions plugins/events/natsjs/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package natsjs_test

import (
"context"
"fmt"
"net"
"path/filepath"
"testing"

nserver "github.com/nats-io/nats-server/v2/server"
"github.com/stretchr/testify/assert"
)

func getFreeLocalhostAddress() string {
l, _ := net.Listen("tcp", "127.0.0.1:0")
defer l.Close()
return l.Addr().String()

}

func natsServer(ctx context.Context, t *testing.T, opts *nserver.Options) {
server, err := nserver.NewServer(
opts,
)
assert.NoError(t, err)
if err != nil {
return
}

server.SetLoggerV2(
NewLogWrapper(),
true, true, false,
)

// first start NATS
go server.Start()

jsConf := &nserver.JetStreamConfig{
StoreDir: filepath.Join(t.TempDir(), "nats-js"),
}

// second start JetStream
err = server.EnableJetStream(jsConf)
assert.NoError(t, err)
if err != nil {
return
}

<-ctx.Done()

server.Shutdown()
}

func NewLogWrapper() *LogWrapper {
return &LogWrapper{}
}

type LogWrapper struct {
}

// Noticef logs a notice statement
func (l *LogWrapper) Noticef(format string, v ...interface{}) {
fmt.Printf(format+"\n", v...)
}

// Warnf logs a warning statement
func (l *LogWrapper) Warnf(format string, v ...interface{}) {
fmt.Printf(format+"\n", v...)
}

// Fatalf logs a fatal statement
func (l *LogWrapper) Fatalf(format string, v ...interface{}) {
fmt.Printf(format+"\n", v...)
}

// Errorf logs an error statement
func (l *LogWrapper) Errorf(format string, v ...interface{}) {
fmt.Printf(format+"\n", v...)
}

// Debugf logs a debug statement
func (l *LogWrapper) Debugf(format string, v ...interface{}) {
fmt.Printf(format+"\n", v...)
}

// Tracef logs a trace statement
func (l *LogWrapper) Tracef(format string, v ...interface{}) {
fmt.Printf(format+"\n", v...)
}
233 changes: 233 additions & 0 deletions plugins/events/natsjs/nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
package natsjs

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/google/uuid"
nats "github.com/nats-io/nats.go"
"github.com/pkg/errors"

"go-micro.dev/v4/events"
"go-micro.dev/v4/logger"
)

const (
defaultClusterID = "micro"
reconnectLoopDuration = 10 * time.Second
)

// NewStream returns an initialized nats stream or an error if the connection to the nats
// server could not be established
func NewStream(opts ...Option) (events.Stream, error) {
// parse the options
options := Options{
ClientID: uuid.New().String(),
ClusterID: defaultClusterID,
}
for _, o := range opts {
o(&options)
}

s := &stream{opts: options}
natsJetStreamCtx, err := connectToNatsJetStream(options)
if err != nil {
return nil, fmt.Errorf("error connecting to nats cluster %v: %v", options.ClusterID, err)
}
s.natsJetStreamCtx = natsJetStreamCtx
return s, nil
}

type stream struct {
opts Options
natsJetStreamCtx nats.JetStreamContext
}

func connectToNatsJetStream(options Options) (nats.JetStreamContext, error) {
nopts := nats.GetDefaultOptions()
if options.TLSConfig != nil {
nopts.Secure = true
nopts.TLSConfig = options.TLSConfig
}
if len(options.Address) > 0 {
nopts.Servers = strings.Split(options.Address, ",")
}

conn, err := nopts.Connect()
if err != nil {
return nil, fmt.Errorf("error connecting to nats at %v with tls enabled (%v): %v", options.Address, nopts.TLSConfig != nil, err)
}

js, err := conn.JetStream()
if err != nil {
return nil, fmt.Errorf("error while obtaining JetStream context: %v", err)
}

return js, nil
}

// Publish a message to a topic
func (s *stream) Publish(topic string, msg interface{}, opts ...events.PublishOption) error {
// validate the topic
if len(topic) == 0 {
return events.ErrMissingTopic
}

// parse the options
options := events.PublishOptions{
Timestamp: time.Now(),
}
for _, o := range opts {
o(&options)
}

// encode the message if it's not already encoded
var payload []byte
if p, ok := msg.([]byte); ok {
payload = p
} else {
p, err := json.Marshal(msg)
if err != nil {
return events.ErrEncodingMessage
}
payload = p
}

// construct the event
event := &events.Event{
ID: uuid.New().String(),
Topic: topic,
Timestamp: options.Timestamp,
Metadata: options.Metadata,
Payload: payload,
}

// serialize the event to bytes
bytes, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "Error encoding event")
}

pubOpts := []nats.PubOpt{
// TODO: to make de-duplication work, we need to pass the event from the outside as an option
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@asim would be de-duplication a thing we want to support? I'm not sure if it can be supported by other event implementations.

https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#message-deduplication

// nats.MsgId(event.ID), // event de-duplication
}

// publish the event to the topic's channel
if _, err := s.natsJetStreamCtx.PublishAsync(event.Topic, bytes, pubOpts...); err != nil {
return errors.Wrap(err, "Error publishing message to topic")
}

return nil
}

// Consume from a topic
func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan events.Event, error) {
// validate the topic
if len(topic) == 0 {
return nil, events.ErrMissingTopic
}

// parse the options
options := events.ConsumeOptions{
Group: uuid.New().String(),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Group name should be defineable. I want multiple services listening in the same group

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's just a "default" group if you don't provide one via the opts.

If you provide one, it'll be set in the next few lines:

	for _, o := range opts {
		o(&options)
	}

}
for _, o := range opts {
o(&options)
}

// setup the subscriber
c := make(chan events.Event)
handleMsg := func(m *nats.Msg) {

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

// TODO: not supported by go-micro interface
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@asim NATS Jetstream supports "stretching" the message ACK timeout (options.AckWait) but that would need exposing a InProgressFunc to the events channel consumer. Is that something we want and can support with other implementations? see https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive -> AckProgress

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its best to do it in an isolated case specifically for this. You can see the rabbitmq broker plugin for implementation specific options that can be passed through. It's not clear that other implementations would support it so we'd inherit an option nothing can deal with.

// would need a event.InProgressFunc{} to be
// called periodically
//err := m.InProgress(nats.Context(ctx))
//if err != nil {
// return
}

// decode the message
var evt events.Event
if err := json.Unmarshal(m.Data, &evt); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Error decoding message: %v", err)
}
// not acknowledging the message is the way to indicate an error occurred
return
}

if !options.AutoAck {
// set up the ack funcs
evt.SetAckFunc(func() error {
return m.Ack()
})
evt.SetNackFunc(func() error {
return m.Nak()
})
}

// push onto the channel and wait for the consumer to take the event off before we acknowledge it.
c <- evt

if !options.AutoAck {
return
}
if err := m.Ack(nats.Context(ctx)); err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Error acknowledging message: %v", err)
}
}

// ensure that a stream exists for that topic
_, err := s.natsJetStreamCtx.StreamInfo(topic)
if err != nil {
cfg := &nats.StreamConfig{
Name: topic,
}

_, err = s.natsJetStreamCtx.AddStream(cfg)
if err != nil {
return nil, errors.Wrap(err, "Stream did not exist and adding a stream failed")
}
}

// setup the options
subOpts := []nats.SubOpt{
nats.Durable(topic),
}

if options.CustomRetries {
subOpts = append(subOpts, nats.MaxDeliver(options.GetRetryLimit()))
}

if options.AutoAck {
subOpts = append(subOpts, nats.AckNone())
} else {
subOpts = append(subOpts, nats.AckExplicit())
}

if !options.Offset.IsZero() {
subOpts = append(subOpts, nats.StartTime(options.Offset))
} else {
subOpts = append(subOpts, nats.DeliverNew())
}

if options.AckWait > 0 {
subOpts = append(subOpts, nats.AckWait(options.AckWait))
}

// connect the subscriber
_, err = s.natsJetStreamCtx.QueueSubscribe(topic, options.Group, handleMsg, subOpts...)
if err != nil {
return nil, errors.Wrap(err, "Error subscribing to topic")
}

return c, nil
}
Loading