Skip to content

Commit

Permalink
use NATS JetStream (#2574)
Browse files Browse the repository at this point in the history
* use nats jetstream

* update github.com/asim/go-micro/plugins/events/natsjs

* add changelog

* incorporate review feedback

* check EnableJetStream error
  • Loading branch information
wkloucek authored Mar 24, 2022
1 parent 5bbdbf1 commit 82800d7
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 100 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/change-nats-backend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Change: Switch NATS backend

We've switched the NATS backend from Streaming to JetStream, since NATS Streaming is depreciated.

https://github.com/cs3org/reva/pull/2574
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/Masterminds/sprig v2.22.0+incompatible
github.com/ReneKroon/ttlcache/v2 v2.11.0
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/asim/go-micro/plugins/events/nats/v4 v4.0.0-20220118152736-9e0be6c85d75
github.com/asim/go-micro/plugins/events/natsjs/v4 v4.0.0-20220311080335-e5a35d38f931
github.com/aws/aws-sdk-go v1.42.39
github.com/beevik/etree v1.1.0
github.com/bluele/gcache v0.0.2
Expand Down Expand Up @@ -53,13 +53,13 @@ require (
github.com/mitchellh/go-testing-interface v1.14.1 // indirect
github.com/mitchellh/mapstructure v1.4.3
github.com/nats-io/nats-server/v2 v2.7.2
github.com/nats-io/nats-streaming-server v0.24.1
github.com/onsi/ginkgo/v2 v2.0.0
github.com/onsi/gomega v1.18.1
github.com/pkg/errors v0.9.1
github.com/pkg/xattr v0.4.4
github.com/pquerna/cachecontrol v0.1.0 // indirect
github.com/prometheus/alertmanager v0.23.0
github.com/prometheus/procfs v0.7.3 // indirect
github.com/rs/cors v1.8.2
github.com/rs/zerolog v1.26.1
github.com/sciencemesh/meshdirectory-web v1.0.4
Expand All @@ -70,19 +70,19 @@ require (
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tus/tusd v1.8.0
github.com/wk8/go-ordered-map v0.2.0
go-micro.dev/v4 v4.3.1-0.20211108085239-0c2041e43908
go-micro.dev/v4 v4.6.0
go.mongodb.org/mongo-driver v1.7.2 // indirect
go.opencensus.io v0.23.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.28.0
go.opentelemetry.io/otel v1.3.0
go.opentelemetry.io/otel/exporters/jaeger v1.3.0
go.opentelemetry.io/otel/sdk v1.3.0
go.opentelemetry.io/otel/trace v1.3.0
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce
golang.org/x/crypto v0.0.0-20220214200702-86341886e292
golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9
golang.org/x/term v0.0.0-20210916214954-140adaaadfaf
golang.org/x/sys v0.0.0-20220209214540-3681064d5158
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211
google.golang.org/genproto v0.0.0-20211021150943-2b146023228c
google.golang.org/grpc v1.42.0
google.golang.org/protobuf v1.27.1
Expand Down
83 changes: 24 additions & 59 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions internal/grpc/interceptors/eventsmiddleware/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"go-micro.dev/v4/util/log"
"google.golang.org/grpc"

"github.com/asim/go-micro/plugins/events/nats/v4"
"github.com/asim/go-micro/plugins/events/natsjs/v4"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
v1beta12 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
Expand Down Expand Up @@ -188,6 +188,6 @@ func publisherFromConfig(m map[string]interface{}) (events.Publisher, error) {
case "nats":
address := m["address"].(string)
cid := m["clusterID"].(string)
return server.NewNatsStream(nats.Address(address), nats.ClusterID(cid))
return server.NewNatsStream(natsjs.Address(address), natsjs.ClusterID(cid))
}
}
17 changes: 13 additions & 4 deletions pkg/events/example/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"log"
"time"

"github.com/asim/go-micro/plugins/events/nats/v4"
"github.com/asim/go-micro/plugins/events/natsjs/v4"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/example/consumer"
"github.com/cs3org/reva/v2/pkg/events/example/publisher"
Expand All @@ -32,7 +32,9 @@ import (
// Simple example of an event workflow
func main() {
// start a server
Server()
go Server()

time.Sleep(5 * time.Second)

// obtain a client
c := Client()
Expand All @@ -53,15 +55,22 @@ func main() {

// Server generates a nats server
func Server() {
err := server.RunNatsServer()
err := server.RunNatsServer(
server.ClusterID("test-cluster"),
server.Host("127.0.0.1"),
server.Port(9233),
)
if err != nil {
log.Fatal(err)
}
}

// Client builds a nats client
func Client() events.Stream {
c, err := server.NewNatsStream(nats.Address("127.0.0.1:9233"), nats.ClusterID("test-cluster"))
c, err := server.NewNatsStream(
natsjs.Address("127.0.0.1:9233"),
natsjs.ClusterID("test-cluster"),
)
if err != nil {
log.Fatal(err)
}
Expand Down
34 changes: 23 additions & 11 deletions pkg/events/server/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,48 @@ package server
import (
"time"

"github.com/asim/go-micro/plugins/events/nats/v4"
"github.com/cenkalti/backoff"
"github.com/cs3org/reva/v2/pkg/logger"
"go-micro.dev/v4/events"

stanServer "github.com/nats-io/nats-streaming-server/server"
"github.com/asim/go-micro/plugins/events/natsjs/v4"
nserver "github.com/nats-io/nats-server/v2/server"
)

// RunNatsServer runs the nats streaming server
// RunNatsServer starts the nats server and blocks
func RunNatsServer(opts ...Option) error {
natsOpts := stanServer.DefaultNatsServerOptions
stanOpts := stanServer.GetDefaultOptions()
options := &nserver.Options{}

for _, o := range opts {
o(&natsOpts, stanOpts)
o(options)
}
_, err := stanServer.RunServerWithOpts(stanOpts, &natsOpts)
return err

server, err := nserver.NewServer(options)
if err != nil {
return err
}

c := &nserver.JetStreamConfig{}

err = server.EnableJetStream(c)
if err != nil {
return err
}

server.Start()
return nil
}

// NewNatsStream returns a streaming client used by `Consume` and `Publish` methods
// retries exponentially to connect to a nats server
func NewNatsStream(opts ...nats.Option) (events.Stream, error) {
func NewNatsStream(opts ...natsjs.Option) (events.Stream, error) {
b := backoff.NewExponentialBackOff()
var stream events.Stream
o := func() error {
n := b.NextBackOff()
s, err := nats.NewStream(opts...)
s, err := natsjs.NewStream(opts...)
if err != nil && n > time.Second {
logger.New().Error().Err(err).Msgf("can't connect to nats (stan) server, retrying in %s", n)
logger.New().Error().Err(err).Msgf("can't connect to nats (jetstream) server, retrying in %s", n)
}
stream = s
return err
Expand Down
28 changes: 10 additions & 18 deletions pkg/events/server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,29 @@
package server

import (
natsServer "github.com/nats-io/nats-server/v2/server"
stanServer "github.com/nats-io/nats-streaming-server/server"
nserver "github.com/nats-io/nats-server/v2/server"
)

// Option configures the nats server
type Option func(*natsServer.Options, *stanServer.Options)
type Option func(*nserver.Options)

// Host sets the host URL for the nats server
func Host(url string) Option {
return func(no *natsServer.Options, _ *stanServer.Options) {
no.Host = url
return func(o *nserver.Options) {
o.Host = url
}
}

// Port sets the host URL for the nats server
func Port(port int) Option {
return func(no *natsServer.Options, _ *stanServer.Options) {
no.Port = port
return func(o *nserver.Options) {
o.Port = port
}
}

// NatsOpts allows setting Options from nats package directly
func NatsOpts(opt func(*natsServer.Options)) Option {
return func(no *natsServer.Options, _ *stanServer.Options) {
opt(no)
}
}

// StanOpts allows setting Options from stan package directly
func StanOpts(opt func(*stanServer.Options)) Option {
return func(_ *natsServer.Options, so *stanServer.Options) {
opt(so)
// ClusterID sets the name for the nats cluster
func ClusterID(clusterID string) Option {
return func(o *nserver.Options) {
o.Cluster.Name = clusterID
}
}

0 comments on commit 82800d7

Please sign in to comment.