Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[full-ci] use NATS JetStream #3192

Merged
merged 10 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions audit/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"

"github.com/asim/go-micro/plugins/events/nats/v4"
"github.com/asim/go-micro/plugins/events/natsjs/v4"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/server"
"github.com/owncloud/ocis/audit/pkg/config"
Expand Down Expand Up @@ -35,7 +35,10 @@ func Server(cfg *config.Config) *cli.Command {
defer cancel()

evtsCfg := cfg.Events
client, err := server.NewNatsStream(nats.Address(evtsCfg.Endpoint), nats.ClusterID(evtsCfg.Cluster))
client, err := server.NewNatsStream(
natsjs.Address(evtsCfg.Endpoint),
natsjs.ClusterID(evtsCfg.Cluster),
)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion audit/pkg/config/defaults/defaultconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func DefaultConfig() *config.Config {
},
Events: config.Events{
Endpoint: "127.0.0.1:9233",
Cluster: "test-cluster",
Cluster: "ocis-cluster",
ConsumerGroup: "audit",
},
Auditlog: config.Auditlog{
Expand Down
6 changes: 6 additions & 0 deletions changelog/unreleased/change-nats-backend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Change: Switch NATS backend

We've switched the NATS backend from Streaming to JetStream, since NATS Streaming is depreciated.

https://github.com/owncloud/ocis/pull/3192
https://github.com/cs3org/reva/pull/2574
11 changes: 4 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/GeertJohan/yubigo v0.0.0-20190917122436-175bc097e60e
github.com/ReneKroon/ttlcache/v2 v2.11.0
github.com/asim/go-micro/plugins/client/grpc/v4 v4.0.0-20220118152736-9e0be6c85d75
github.com/asim/go-micro/plugins/events/nats/v4 v4.0.0-20220118152736-9e0be6c85d75
github.com/asim/go-micro/plugins/events/natsjs/v4 v4.0.0-20220311080335-e5a35d38f931
github.com/asim/go-micro/plugins/logger/zerolog/v4 v4.0.0-20220118152736-9e0be6c85d75
github.com/asim/go-micro/plugins/registry/consul/v4 v4.0.0-20220118152736-9e0be6c85d75
github.com/asim/go-micro/plugins/registry/etcd/v4 v4.0.0-20220118152736-9e0be6c85d75
Expand All @@ -22,7 +22,7 @@ require (
github.com/blevesearch/bleve/v2 v2.3.1
github.com/coreos/go-oidc/v3 v3.1.0
github.com/cs3org/go-cs3apis v0.0.0-20220126114148-64c025ccdd19
github.com/cs3org/reva/v2 v2.0.0-20220321093112-25cedab9f739
github.com/cs3org/reva/v2 v2.0.0-20220324071614-82800d7ef768
github.com/disintegration/imaging v1.6.2
github.com/glauth/glauth/v2 v2.0.0-20211021011345-ef3151c28733
github.com/go-chi/chi/v5 v5.0.7
Expand All @@ -47,7 +47,6 @@ require (
github.com/mitchellh/mapstructure v1.4.3
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.7.4
github.com/nats-io/nats-streaming-server v0.24.3
github.com/nmcclain/asn1-ber v0.0.0-20170104154839-2661553a0484
github.com/nmcclain/ldap v0.0.0-20210720162743-7f8d1e44eeba
github.com/oklog/run v1.1.0
Expand Down Expand Up @@ -171,7 +170,6 @@ require (
github.com/hashicorp/go-plugin v1.4.3 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/raft v1.3.6 // indirect
github.com/hashicorp/serf v0.9.6 // indirect
github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect
github.com/huandu/xstrings v1.3.2 // indirect
Expand All @@ -194,7 +192,7 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/maxymania/go-system v0.0.0-20170110133659-647cc364bf0b // indirect
github.com/mendsley/gojwk v0.0.0-20141217222730-4d5ec6e58103 // indirect
github.com/miekg/dns v1.1.44 // indirect
github.com/miekg/dns v1.1.46 // indirect
github.com/mileusna/useragent v1.0.2 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
Expand All @@ -212,7 +210,6 @@ require (
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/nats-io/stan.go v0.10.2 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/orcaman/concurrent-map v1.0.0 // indirect
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect
Expand Down Expand Up @@ -260,7 +257,7 @@ require (
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
golang.org/x/tools v0.1.8 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
49 changes: 11 additions & 38 deletions go.sum

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion nats/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ func Server(cfg *config.Config) *cli.Command {

natsServer, err := nats.NewNATSServer(
ctx,
logging.NewLogWrapper(logger),
nats.Host(cfg.Nats.Host),
nats.Port(cfg.Nats.Port),
nats.Logger(logging.NewLogWrapper(logger)),
nats.ClusterID(cfg.Nats.ClusterID),
nats.StoreDir(cfg.Nats.StoreDir),
)
if err != nil {
return err
Expand Down
6 changes: 4 additions & 2 deletions nats/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type Config struct {

// Nats is the nats config
type Nats struct {
Host string `ocisConfig:"host" env:"NATS_NATS_HOST"`
Port int `ocisConfig:"port" env:"NATS_NATS_PORT"`
Host string `ocisConfig:"host" env:"NATS_NATS_HOST"`
Port int `ocisConfig:"port" env:"NATS_NATS_PORT"`
ClusterID string `ocisConfig:"clusterid" env:"NATS_NATS_CLUSTER_ID"`
StoreDir string `ocisConfig:"store_dir" env:"NATS_NATS_STORE_DIR"`
}
13 changes: 10 additions & 3 deletions nats/pkg/config/defaults/defaultconfig.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package defaults

import "github.com/owncloud/ocis/nats/pkg/config"
import (
"path"

"github.com/owncloud/ocis/nats/pkg/config"
"github.com/owncloud/ocis/ocis-pkg/config/defaults"
)

// NOTE: Most of this configuration is not needed to keep it as simple as possible
// TODO: Clean up unneeded configuration
Expand All @@ -20,8 +25,10 @@ func DefaultConfig() *config.Config {
Name: "nats",
},
Nats: config.Nats{
Host: "127.0.0.1",
Port: 9233,
Host: "127.0.0.1",
Port: 9233,
ClusterID: "ocis-cluster",
StoreDir: path.Join(defaults.BaseDataPath(), "nats"),
},
}
}
Expand Down
64 changes: 22 additions & 42 deletions nats/pkg/server/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,64 +4,44 @@ import (
"context"
"time"

natsServer "github.com/nats-io/nats-server/v2/server"
stanServer "github.com/nats-io/nats-streaming-server/server"
nserver "github.com/nats-io/nats-server/v2/server"
)

var NATSListenAndServeLoopTimer = 1 * time.Second

type NATSServer struct {
ctx context.Context

natsOpts *natsServer.Options
stanOpts *stanServer.Options

server *stanServer.StanServer
ctx context.Context
server *nserver.Server
}

// NewNATSServer returns a new NATSServer
func NewNATSServer(ctx context.Context, opts ...Option) (*NATSServer, error) {

server := &NATSServer{
ctx: ctx,
natsOpts: &stanServer.DefaultNatsServerOptions,
stanOpts: stanServer.GetDefaultOptions(),
}
func NewNATSServer(ctx context.Context, logger nserver.Logger, opts ...NatsOption) (*NATSServer, error) {
natsOpts := &nserver.Options{}

for _, o := range opts {
o(server.natsOpts, server.stanOpts)
o(natsOpts)
}

return server, nil
}
// enable JetStream
natsOpts.JetStream = true

// ListenAndServe runs the NATSServer in a blocking way until the server is shutdown or an error occurs
func (n *NATSServer) ListenAndServe() (err error) {
n.server, err = stanServer.RunServerWithOpts(
n.stanOpts,
n.natsOpts,
)
server, err := nserver.NewServer(natsOpts)
if err != nil {
return err
return nil, err
}

defer n.Shutdown()
server.SetLoggerV2(logger, true, true, false)

for {
// check if NATs server has an encountered an error
if err := n.server.LastError(); err != nil {
return err
}
// check if the NATs server is still running
if n.server.State() == stanServer.Shutdown {
return nil
}
// check if context was cancelled
if n.ctx.Err() != nil {
return nil
}
time.Sleep(NATSListenAndServeLoopTimer)
}
return &NATSServer{
ctx: ctx,
server: server,
}, nil
}

// ListenAndServe runs the NATSServer in a blocking way until the server is shutdown or an error occurs
func (n *NATSServer) ListenAndServe() (err error) {
go n.server.Start()
<-n.ctx.Done()
return nil
}

func (n *NATSServer) Shutdown() {
Expand Down
35 changes: 20 additions & 15 deletions nats/pkg/server/nats/options.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,36 @@
package nats

import (
natsServer "github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats-streaming-server/logger"
stanServer "github.com/nats-io/nats-streaming-server/server"
nserver "github.com/nats-io/nats-server/v2/server"
)

// Option configures the nats server
type Option func(*natsServer.Options, *stanServer.Options)
// NatsOption configures the nats server
type NatsOption func(*nserver.Options)

// Host sets the host URL for the nats server
func Host(url string) Option {
return func(no *natsServer.Options, _ *stanServer.Options) {
no.Host = url
func Host(url string) NatsOption {
return func(o *nserver.Options) {
o.Host = url
}
}

// Port sets the host URL for the nats server
func Port(port int) Option {
return func(no *natsServer.Options, _ *stanServer.Options) {
no.Port = port
func Port(port int) NatsOption {
return func(o *nserver.Options) {
o.Port = port
}
}

// Port sets the host URL for the nats server
func Logger(logger logger.Logger) Option {
return func(no *natsServer.Options, so *stanServer.Options) {
so.CustomLogger = logger
// ClusterID sets the name for the nats cluster
func ClusterID(clusterID string) NatsOption {
return func(o *nserver.Options) {
o.Cluster.Name = clusterID
}
}

// StoreDir sets the folder for persistence
func StoreDir(StoreDir string) NatsOption {
return func(o *nserver.Options) {
o.StoreDir = StoreDir
}
}
7 changes: 5 additions & 2 deletions notifications/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package command
import (
"fmt"

"github.com/asim/go-micro/plugins/events/nats/v4"
"github.com/asim/go-micro/plugins/events/natsjs/v4"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/server"
"github.com/owncloud/ocis/notifications/pkg/channels"
Expand Down Expand Up @@ -31,7 +31,10 @@ func Server(cfg *config.Config) *cli.Command {
}

evtsCfg := cfg.Notifications.Events
client, err := server.NewNatsStream(nats.Address(evtsCfg.Endpoint), nats.ClusterID(evtsCfg.Cluster))
client, err := server.NewNatsStream(
natsjs.Address(evtsCfg.Endpoint),
natsjs.ClusterID(evtsCfg.Cluster),
)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion notifications/pkg/config/defaults/defaultconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func DefaultConfig() *config.Config {
},
Events: config.Events{
Endpoint: "127.0.0.1:9233",
Cluster: "test-cluster",
Cluster: "ocis-cluster",
ConsumerGroup: "notifications",
},
RevaGateway: "127.0.0.1:9142",
Expand Down
2 changes: 1 addition & 1 deletion storage/pkg/config/defaults/defaultconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func DefaultConfig() *config.Config {
UserStorageMountID: "",
Events: config.Events{
Address: "127.0.0.1:9233",
ClusterID: "test-cluster",
ClusterID: "ocis-cluster",
},
},
StorageShares: config.StoragePort{
Expand Down