Skip to content

Commit

Permalink
Merge pull request #3192 from owncloud/nats-check-for-error
Browse files Browse the repository at this point in the history
[full-ci] use NATS JetStream
  • Loading branch information
wkloucek authored Mar 24, 2022
2 parents b0745e1 + 90d5ef3 commit 4f13ac6
Show file tree
Hide file tree
Showing 13 changed files with 93 additions and 115 deletions.
7 changes: 5 additions & 2 deletions audit/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"

"github.com/asim/go-micro/plugins/events/nats/v4"
"github.com/asim/go-micro/plugins/events/natsjs/v4"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/server"
"github.com/owncloud/ocis/audit/pkg/config"
Expand Down Expand Up @@ -35,7 +35,10 @@ func Server(cfg *config.Config) *cli.Command {
defer cancel()

evtsCfg := cfg.Events
client, err := server.NewNatsStream(nats.Address(evtsCfg.Endpoint), nats.ClusterID(evtsCfg.Cluster))
client, err := server.NewNatsStream(
natsjs.Address(evtsCfg.Endpoint),
natsjs.ClusterID(evtsCfg.Cluster),
)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion audit/pkg/config/defaults/defaultconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func DefaultConfig() *config.Config {
},
Events: config.Events{
Endpoint: "127.0.0.1:9233",
Cluster: "test-cluster",
Cluster: "ocis-cluster",
ConsumerGroup: "audit",
},
Auditlog: config.Auditlog{
Expand Down
6 changes: 6 additions & 0 deletions changelog/unreleased/change-nats-backend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Change: Switch NATS backend

We've switched the NATS backend from Streaming to JetStream, since NATS Streaming is depreciated.

https://github.com/owncloud/ocis/pull/3192
https://github.com/cs3org/reva/pull/2574
11 changes: 4 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/GeertJohan/yubigo v0.0.0-20190917122436-175bc097e60e
github.com/ReneKroon/ttlcache/v2 v2.11.0
github.com/asim/go-micro/plugins/client/grpc/v4 v4.0.0-20220118152736-9e0be6c85d75
github.com/asim/go-micro/plugins/events/nats/v4 v4.0.0-20220118152736-9e0be6c85d75
github.com/asim/go-micro/plugins/events/natsjs/v4 v4.0.0-20220311080335-e5a35d38f931
github.com/asim/go-micro/plugins/logger/zerolog/v4 v4.0.0-20220118152736-9e0be6c85d75
github.com/asim/go-micro/plugins/registry/consul/v4 v4.0.0-20220118152736-9e0be6c85d75
github.com/asim/go-micro/plugins/registry/etcd/v4 v4.0.0-20220118152736-9e0be6c85d75
Expand All @@ -22,7 +22,7 @@ require (
github.com/blevesearch/bleve/v2 v2.3.1
github.com/coreos/go-oidc/v3 v3.1.0
github.com/cs3org/go-cs3apis v0.0.0-20220126114148-64c025ccdd19
github.com/cs3org/reva/v2 v2.0.0-20220324071330-5bbdbf17c339
github.com/cs3org/reva/v2 v2.0.0-20220324071614-82800d7ef768
github.com/disintegration/imaging v1.6.2
github.com/glauth/glauth/v2 v2.0.0-20211021011345-ef3151c28733
github.com/go-chi/chi/v5 v5.0.7
Expand All @@ -47,7 +47,6 @@ require (
github.com/mitchellh/mapstructure v1.4.3
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.7.4
github.com/nats-io/nats-streaming-server v0.24.3
github.com/nmcclain/asn1-ber v0.0.0-20170104154839-2661553a0484
github.com/nmcclain/ldap v0.0.0-20210720162743-7f8d1e44eeba
github.com/oklog/run v1.1.0
Expand Down Expand Up @@ -171,7 +170,6 @@ require (
github.com/hashicorp/go-plugin v1.4.3 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/raft v1.3.6 // indirect
github.com/hashicorp/serf v0.9.6 // indirect
github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect
github.com/huandu/xstrings v1.3.2 // indirect
Expand All @@ -194,7 +192,7 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/maxymania/go-system v0.0.0-20170110133659-647cc364bf0b // indirect
github.com/mendsley/gojwk v0.0.0-20141217222730-4d5ec6e58103 // indirect
github.com/miekg/dns v1.1.44 // indirect
github.com/miekg/dns v1.1.46 // indirect
github.com/mileusna/useragent v1.0.2 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
Expand All @@ -212,7 +210,6 @@ require (
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/nats-io/stan.go v0.10.2 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/orcaman/concurrent-map v1.0.0 // indirect
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect
Expand Down Expand Up @@ -260,7 +257,7 @@ require (
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
golang.org/x/tools v0.1.8 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
49 changes: 11 additions & 38 deletions go.sum

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion nats/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ func Server(cfg *config.Config) *cli.Command {

natsServer, err := nats.NewNATSServer(
ctx,
logging.NewLogWrapper(logger),
nats.Host(cfg.Nats.Host),
nats.Port(cfg.Nats.Port),
nats.Logger(logging.NewLogWrapper(logger)),
nats.ClusterID(cfg.Nats.ClusterID),
nats.StoreDir(cfg.Nats.StoreDir),
)
if err != nil {
return err
Expand Down
6 changes: 4 additions & 2 deletions nats/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type Config struct {

// Nats is the nats config
type Nats struct {
Host string `ocisConfig:"host" env:"NATS_NATS_HOST"`
Port int `ocisConfig:"port" env:"NATS_NATS_PORT"`
Host string `ocisConfig:"host" env:"NATS_NATS_HOST"`
Port int `ocisConfig:"port" env:"NATS_NATS_PORT"`
ClusterID string `ocisConfig:"clusterid" env:"NATS_NATS_CLUSTER_ID"`
StoreDir string `ocisConfig:"store_dir" env:"NATS_NATS_STORE_DIR"`
}
13 changes: 10 additions & 3 deletions nats/pkg/config/defaults/defaultconfig.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package defaults

import "github.com/owncloud/ocis/nats/pkg/config"
import (
"path"

"github.com/owncloud/ocis/nats/pkg/config"
"github.com/owncloud/ocis/ocis-pkg/config/defaults"
)

// NOTE: Most of this configuration is not needed to keep it as simple as possible
// TODO: Clean up unneeded configuration
Expand All @@ -20,8 +25,10 @@ func DefaultConfig() *config.Config {
Name: "nats",
},
Nats: config.Nats{
Host: "127.0.0.1",
Port: 9233,
Host: "127.0.0.1",
Port: 9233,
ClusterID: "ocis-cluster",
StoreDir: path.Join(defaults.BaseDataPath(), "nats"),
},
}
}
Expand Down
64 changes: 22 additions & 42 deletions nats/pkg/server/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,64 +4,44 @@ import (
"context"
"time"

natsServer "github.com/nats-io/nats-server/v2/server"
stanServer "github.com/nats-io/nats-streaming-server/server"
nserver "github.com/nats-io/nats-server/v2/server"
)

var NATSListenAndServeLoopTimer = 1 * time.Second

type NATSServer struct {
ctx context.Context

natsOpts *natsServer.Options
stanOpts *stanServer.Options

server *stanServer.StanServer
ctx context.Context
server *nserver.Server
}

// NewNATSServer returns a new NATSServer
func NewNATSServer(ctx context.Context, opts ...Option) (*NATSServer, error) {

server := &NATSServer{
ctx: ctx,
natsOpts: &stanServer.DefaultNatsServerOptions,
stanOpts: stanServer.GetDefaultOptions(),
}
func NewNATSServer(ctx context.Context, logger nserver.Logger, opts ...NatsOption) (*NATSServer, error) {
natsOpts := &nserver.Options{}

for _, o := range opts {
o(server.natsOpts, server.stanOpts)
o(natsOpts)
}

return server, nil
}
// enable JetStream
natsOpts.JetStream = true

// ListenAndServe runs the NATSServer in a blocking way until the server is shutdown or an error occurs
func (n *NATSServer) ListenAndServe() (err error) {
n.server, err = stanServer.RunServerWithOpts(
n.stanOpts,
n.natsOpts,
)
server, err := nserver.NewServer(natsOpts)
if err != nil {
return err
return nil, err
}

defer n.Shutdown()
server.SetLoggerV2(logger, true, true, false)

for {
// check if NATs server has an encountered an error
if err := n.server.LastError(); err != nil {
return err
}
// check if the NATs server is still running
if n.server.State() == stanServer.Shutdown {
return nil
}
// check if context was cancelled
if n.ctx.Err() != nil {
return nil
}
time.Sleep(NATSListenAndServeLoopTimer)
}
return &NATSServer{
ctx: ctx,
server: server,
}, nil
}

// ListenAndServe runs the NATSServer in a blocking way until the server is shutdown or an error occurs
func (n *NATSServer) ListenAndServe() (err error) {
go n.server.Start()
<-n.ctx.Done()
return nil
}

func (n *NATSServer) Shutdown() {
Expand Down
35 changes: 20 additions & 15 deletions nats/pkg/server/nats/options.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,36 @@
package nats

import (
natsServer "github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats-streaming-server/logger"
stanServer "github.com/nats-io/nats-streaming-server/server"
nserver "github.com/nats-io/nats-server/v2/server"
)

// Option configures the nats server
type Option func(*natsServer.Options, *stanServer.Options)
// NatsOption configures the nats server
type NatsOption func(*nserver.Options)

// Host sets the host URL for the nats server
func Host(url string) Option {
return func(no *natsServer.Options, _ *stanServer.Options) {
no.Host = url
func Host(url string) NatsOption {
return func(o *nserver.Options) {
o.Host = url
}
}

// Port sets the host URL for the nats server
func Port(port int) Option {
return func(no *natsServer.Options, _ *stanServer.Options) {
no.Port = port
func Port(port int) NatsOption {
return func(o *nserver.Options) {
o.Port = port
}
}

// Port sets the host URL for the nats server
func Logger(logger logger.Logger) Option {
return func(no *natsServer.Options, so *stanServer.Options) {
so.CustomLogger = logger
// ClusterID sets the name for the nats cluster
func ClusterID(clusterID string) NatsOption {
return func(o *nserver.Options) {
o.Cluster.Name = clusterID
}
}

// StoreDir sets the folder for persistence
func StoreDir(StoreDir string) NatsOption {
return func(o *nserver.Options) {
o.StoreDir = StoreDir
}
}
7 changes: 5 additions & 2 deletions notifications/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package command
import (
"fmt"

"github.com/asim/go-micro/plugins/events/nats/v4"
"github.com/asim/go-micro/plugins/events/natsjs/v4"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/server"
"github.com/owncloud/ocis/notifications/pkg/channels"
Expand Down Expand Up @@ -31,7 +31,10 @@ func Server(cfg *config.Config) *cli.Command {
}

evtsCfg := cfg.Notifications.Events
client, err := server.NewNatsStream(nats.Address(evtsCfg.Endpoint), nats.ClusterID(evtsCfg.Cluster))
client, err := server.NewNatsStream(
natsjs.Address(evtsCfg.Endpoint),
natsjs.ClusterID(evtsCfg.Cluster),
)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion notifications/pkg/config/defaults/defaultconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func DefaultConfig() *config.Config {
},
Events: config.Events{
Endpoint: "127.0.0.1:9233",
Cluster: "test-cluster",
Cluster: "ocis-cluster",
ConsumerGroup: "notifications",
},
RevaGateway: "127.0.0.1:9142",
Expand Down
2 changes: 1 addition & 1 deletion storage/pkg/config/defaults/defaultconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func DefaultConfig() *config.Config {
UserStorageMountID: "",
Events: config.Events{
Address: "127.0.0.1:9233",
ClusterID: "test-cluster",
ClusterID: "ocis-cluster",
},
},
StorageShares: config.StoragePort{
Expand Down

0 comments on commit 4f13ac6

Please sign in to comment.