Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[full-ci] use NATS JetStream #3192

Merged
merged 10 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions audit/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"

"github.com/asim/go-micro/plugins/events/nats/v4"
"github.com/asim/go-micro/plugins/events/natsjs/v4"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/server"
"github.com/owncloud/ocis/audit/pkg/config"
Expand Down Expand Up @@ -35,7 +35,10 @@ func Server(cfg *config.Config) *cli.Command {
defer cancel()

evtsCfg := cfg.Events
client, err := server.NewNatsStream(nats.Address(evtsCfg.Endpoint), nats.ClusterID(evtsCfg.Cluster))
client, err := server.NewNatsStream(
natsjs.Address(evtsCfg.Endpoint),
natsjs.ClusterID(evtsCfg.Cluster),
)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion audit/pkg/config/defaults/defaultconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func DefaultConfig() *config.Config {
},
Events: config.Events{
Endpoint: "127.0.0.1:9233",
Cluster: "test-cluster",
Cluster: "ocis-cluster",
ConsumerGroup: "audit",
},
Auditlog: config.Auditlog{
Expand Down
6 changes: 6 additions & 0 deletions changelog/unreleased/change-nats-backend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Change: Switch NATS backend

We've switched the NATS backend from Streaming to JetStream, since NATS Streaming is depreciated.

https://github.com/owncloud/ocis/pull/3192
https://github.com/cs3org/reva/pull/2574
11 changes: 6 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/GeertJohan/yubigo v0.0.0-20190917122436-175bc097e60e
github.com/ReneKroon/ttlcache/v2 v2.11.0
github.com/asim/go-micro/plugins/client/grpc/v4 v4.0.0-20220118152736-9e0be6c85d75
github.com/asim/go-micro/plugins/events/nats/v4 v4.0.0-20220118152736-9e0be6c85d75
github.com/asim/go-micro/plugins/events/natsjs/v4 v4.0.0-20220311080335-e5a35d38f931
github.com/asim/go-micro/plugins/logger/zerolog/v4 v4.0.0-20220118152736-9e0be6c85d75
github.com/asim/go-micro/plugins/registry/consul/v4 v4.0.0-20220118152736-9e0be6c85d75
github.com/asim/go-micro/plugins/registry/etcd/v4 v4.0.0-20220118152736-9e0be6c85d75
Expand Down Expand Up @@ -47,7 +47,6 @@ require (
github.com/mitchellh/mapstructure v1.4.3
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.7.4
github.com/nats-io/nats-streaming-server v0.24.3
github.com/nmcclain/asn1-ber v0.0.0-20170104154839-2661553a0484
github.com/nmcclain/ldap v0.0.0-20210720162743-7f8d1e44eeba
github.com/oklog/run v1.1.0
Expand Down Expand Up @@ -99,6 +98,7 @@ require (
github.com/armon/go-metrics v0.3.10 // indirect
github.com/armon/go-radix v1.0.0 // indirect
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/asim/go-micro/plugins/events/nats/v4 v4.0.0-20220118152736-9e0be6c85d75 // indirect
github.com/aws/aws-sdk-go v1.42.39 // indirect
github.com/beevik/etree v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down Expand Up @@ -171,7 +171,7 @@ require (
github.com/hashicorp/go-plugin v1.4.3 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/raft v1.3.6 // indirect
github.com/hashicorp/raft v1.3.3 // indirect
github.com/hashicorp/serf v0.9.6 // indirect
github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect
github.com/huandu/xstrings v1.3.2 // indirect
Expand All @@ -194,7 +194,7 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/maxymania/go-system v0.0.0-20170110133659-647cc364bf0b // indirect
github.com/mendsley/gojwk v0.0.0-20141217222730-4d5ec6e58103 // indirect
github.com/miekg/dns v1.1.44 // indirect
github.com/miekg/dns v1.1.46 // indirect
github.com/mileusna/useragent v1.0.2 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
Expand All @@ -209,6 +209,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 // indirect
github.com/nats-io/nats-streaming-server v0.24.1 // indirect
github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
Expand Down Expand Up @@ -260,7 +261,7 @@ require (
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
golang.org/x/tools v0.1.8 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
20 changes: 11 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ github.com/asim/go-micro/plugins/client/grpc/v4 v4.0.0-20220118152736-9e0be6c85d
github.com/asim/go-micro/plugins/client/grpc/v4 v4.0.0-20220118152736-9e0be6c85d75/go.mod h1:P/Jjf1gCQqBAgpVerr3opyTU594ns1t0JZXsDAYh86c=
github.com/asim/go-micro/plugins/events/nats/v4 v4.0.0-20220118152736-9e0be6c85d75 h1:G5Degn+tmIBpRCD7vPVpCoAol2gd/S7s00z5CWpzp5U=
github.com/asim/go-micro/plugins/events/nats/v4 v4.0.0-20220118152736-9e0be6c85d75/go.mod h1:BxrcQ4TPPMevB2udKEAHenQxCUh1xXVItoU2CbvVdcQ=
github.com/asim/go-micro/plugins/events/natsjs/v4 v4.0.0-20220311080335-e5a35d38f931 h1:kNoNolWlG44eDbk/pH6CdYqhivcrSnGwASc38DvD/ik=
github.com/asim/go-micro/plugins/events/natsjs/v4 v4.0.0-20220311080335-e5a35d38f931/go.mod h1:QLCZPlk5wiDpP9BjY2PIogVIoIldUwFsM+5ktw0n4h4=
github.com/asim/go-micro/plugins/logger/zerolog/v4 v4.0.0-20220118152736-9e0be6c85d75 h1:xcCheUvtF9vb2DBuq4VUABnDGlwSKUOmutnXc1qiZ/I=
github.com/asim/go-micro/plugins/logger/zerolog/v4 v4.0.0-20220118152736-9e0be6c85d75/go.mod h1:K1WMlVyOCAte1WcMZoltdSXdTzOQkUgcqvQES6idRg8=
github.com/asim/go-micro/plugins/registry/consul/v4 v4.0.0-20220118152736-9e0be6c85d75 h1:56CkyiUTmBhuY4vsjvjCgF9jk9W02ReRKRp7yzBn2kY=
Expand Down Expand Up @@ -815,9 +817,8 @@ github.com/hashicorp/memberlist v0.2.4/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOn
github.com/hashicorp/memberlist v0.3.0 h1:8+567mCcFDnS5ADl7lrpxPMWiFCElyUEeW0gtj34fMA=
github.com/hashicorp/memberlist v0.3.0/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/hashicorp/raft v1.3.1/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
github.com/hashicorp/raft v1.3.3 h1:Xr6DSHC5cIM8kzxu+IgoT/+MeNeUNeWin3ie6nlSrMg=
github.com/hashicorp/raft v1.3.3/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
github.com/hashicorp/raft v1.3.6 h1:v5xW5KzByoerQlN/o31VJrFNiozgzGyDoMgDJgXpsto=
github.com/hashicorp/raft v1.3.6/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk=
github.com/hashicorp/serf v0.9.6 h1:uuEX1kLR6aoda1TBttmJQKDLZE1Ob7KN0NPdE7EtCDc=
Expand Down Expand Up @@ -890,6 +891,7 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.14.3/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4=
github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
Expand Down Expand Up @@ -1003,8 +1005,8 @@ github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKju
github.com/miekg/dns v1.1.40/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
github.com/miekg/dns v1.1.43/go.mod h1:+evo5L0630/F6ca/Z9+GAqzhjGyn8/c+TBaOyfEl0V4=
github.com/miekg/dns v1.1.44 h1:4rpqcegYPVkvIeVhITrKP1sRR3KjfRc1nrOPMUZmLyc=
github.com/miekg/dns v1.1.44/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME=
github.com/miekg/dns v1.1.46 h1:uzwpxRtSVxtcIZmz/4Uz6/Rn7G11DvsaslXoy5LxQio=
github.com/miekg/dns v1.1.46/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME=
github.com/mileusna/useragent v1.0.2 h1:DgVKtiPnjxlb73z9bCwgdUvU2nQNQ97uhgfO8l9uz/w=
github.com/mileusna/useragent v1.0.2/go.mod h1:3d8TOmwL/5I8pJjyVDteHtgDGcefrFUX4ccGOMKNYYc=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
Expand Down Expand Up @@ -1079,9 +1081,8 @@ github.com/nats-io/nats-server/v2 v2.7.2/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBB
github.com/nats-io/nats-server/v2 v2.7.4 h1:c+BZJ3rGzUKCBIM4IXO8uNT2u1vajGbD1kPA6wqCEaM=
github.com/nats-io/nats-server/v2 v2.7.4/go.mod h1:1vZ2Nijh8tcyNe8BDVyTviCd9NYzRbubQYiEHsvOQWc=
github.com/nats-io/nats-streaming-server v0.23.0/go.mod h1:1asNNRpUKbgwoPqRLEWbJE65uqmWjG1YN/Xlo3WgkTY=
github.com/nats-io/nats-streaming-server v0.24.1 h1:autzhooN72ELtqP3alC2OPzmrbiA6jIZaQmKdLQsckk=
github.com/nats-io/nats-streaming-server v0.24.1/go.mod h1:N2Q05hKD+aW2Ur1VYP85yUR2zUWHbqJG88CxAFLRrd4=
github.com/nats-io/nats-streaming-server v0.24.3 h1:uZez8jBkXscua++jaDsK7DhpSAkizdetar6yWbPMRco=
github.com/nats-io/nats-streaming-server v0.24.3/go.mod h1:rqWfyCbxlhKj//fAp8POdQzeADwqkVhZcoWlbhkuU5w=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.11.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
Expand Down Expand Up @@ -1554,7 +1555,7 @@ golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220307211146-efcb8507fb70/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220314234724-5d542ad81a58 h1:L8CkJyVoa0/NslN3RUMLgasK5+KatNvyRGQ9QyCYAfc=
golang.org/x/crypto v0.0.0-20220314234724-5d542ad81a58/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -1821,7 +1822,7 @@ golang.org/x/sys v0.0.0-20211110154304-99a53858aa08/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220307203707-22a9840ba4d7/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5 h1:y/woIyUBFbpQGKS0u1aHF/40WUDnek3fPOyD08H5Vng=
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201113234701-d7a72108b828/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
Expand Down Expand Up @@ -1849,8 +1850,9 @@ golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M=
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
12 changes: 9 additions & 3 deletions nats/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,15 @@ func Server(cfg *config.Config) *cli.Command {

natsServer, err := nats.NewNATSServer(
ctx,
nats.Host(cfg.Nats.Host),
nats.Port(cfg.Nats.Port),
nats.Logger(logging.NewLogWrapper(logger)),
logging.NewLogWrapper(logger),
[]nats.NatsOption{
nats.Host(cfg.Nats.Host),
nats.Port(cfg.Nats.Port),
nats.ClusterID(cfg.Nats.ClusterID),
},
[]nats.JetStreamOption{
nats.JetStreamStoreDir(cfg.Nats.StoreDir),
},
)
if err != nil {
return err
Expand Down
6 changes: 4 additions & 2 deletions nats/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type Config struct {

// Nats is the nats config
type Nats struct {
Host string `ocisConfig:"host" env:"NATS_NATS_HOST"`
Port int `ocisConfig:"port" env:"NATS_NATS_PORT"`
Host string `ocisConfig:"host" env:"NATS_NATS_HOST"`
Port int `ocisConfig:"port" env:"NATS_NATS_PORT"`
ClusterID string `ocisConfig:"clusterid" env:"NATS_NATS_CLUSTER_ID"`
StoreDir string `ocisConfig:"store_dir" env:"NATS_NATS_STORE_DIR"`
}
13 changes: 10 additions & 3 deletions nats/pkg/config/defaults/defaultconfig.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package defaults

import "github.com/owncloud/ocis/nats/pkg/config"
import (
"path"

"github.com/owncloud/ocis/nats/pkg/config"
"github.com/owncloud/ocis/ocis-pkg/config/defaults"
)

// NOTE: Most of this configuration is not needed to keep it as simple as possible
// TODO: Clean up unneeded configuration
Expand All @@ -20,8 +25,10 @@ func DefaultConfig() *config.Config {
Name: "nats",
},
Nats: config.Nats{
Host: "127.0.0.1",
Port: 9233,
Host: "127.0.0.1",
Port: 9233,
ClusterID: "ocis-cluster",
StoreDir: path.Join(defaults.BaseDataPath(), "nats"),
},
}
}
Expand Down
67 changes: 29 additions & 38 deletions nats/pkg/server/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,64 +4,55 @@ import (
"context"
"time"

natsServer "github.com/nats-io/nats-server/v2/server"
stanServer "github.com/nats-io/nats-streaming-server/server"
nserver "github.com/nats-io/nats-server/v2/server"
)

var NATSListenAndServeLoopTimer = 1 * time.Second

type NATSServer struct {
ctx context.Context

natsOpts *natsServer.Options
stanOpts *stanServer.Options

server *stanServer.StanServer
ctx context.Context
jetStreamConfig *nserver.JetStreamConfig
server *nserver.Server
}

// NewNATSServer returns a new NATSServer
func NewNATSServer(ctx context.Context, opts ...Option) (*NATSServer, error) {
func NewNATSServer(ctx context.Context, logger nserver.Logger, natsOpts []NatsOption, jetstreamOpts []JetStreamOption) (*NATSServer, error) {
natsOptions := &nserver.Options{}
jetStreamOptions := &nserver.JetStreamConfig{}

for _, o := range natsOpts {
o(natsOptions)
}

server := &NATSServer{
ctx: ctx,
natsOpts: &stanServer.DefaultNatsServerOptions,
stanOpts: stanServer.GetDefaultOptions(),
for _, o := range jetstreamOpts {
o(jetStreamOptions)
}

for _, o := range opts {
o(server.natsOpts, server.stanOpts)
server, err := nserver.NewServer(natsOptions)
if err != nil {
return nil, err
}

return server, nil
server.SetLoggerV2(logger, true, true, false)

return &NATSServer{
ctx: ctx,
jetStreamConfig: jetStreamOptions,
server: server,
}, nil
}

// ListenAndServe runs the NATSServer in a blocking way until the server is shutdown or an error occurs
func (n *NATSServer) ListenAndServe() (err error) {
n.server, err = stanServer.RunServerWithOpts(
n.stanOpts,
n.natsOpts,
)
// start NATS first
go n.server.Start()
// start NATS JetStream second
n.server.EnableJetStream(n.jetStreamConfig)
kobergj marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

defer n.Shutdown()

for {
// check if NATs server has an encountered an error
if err := n.server.LastError(); err != nil {
return err
}
// check if the NATs server is still running
if n.server.State() == stanServer.Shutdown {
return nil
}
// check if context was cancelled
if n.ctx.Err() != nil {
return nil
}
time.Sleep(NATSListenAndServeLoopTimer)
}
<-n.ctx.Done()
return nil
}

func (n *NATSServer) Shutdown() {
Expand Down
38 changes: 23 additions & 15 deletions nats/pkg/server/nats/options.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,39 @@
package nats

import (
natsServer "github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats-streaming-server/logger"
stanServer "github.com/nats-io/nats-streaming-server/server"
nserver "github.com/nats-io/nats-server/v2/server"
)

// Option configures the nats server
type Option func(*natsServer.Options, *stanServer.Options)
// NatsOption configures the nats server
type NatsOption func(*nserver.Options)

// Host sets the host URL for the nats server
func Host(url string) Option {
return func(no *natsServer.Options, _ *stanServer.Options) {
no.Host = url
func Host(url string) NatsOption {
return func(o *nserver.Options) {
o.Host = url
}
}

// Port sets the host URL for the nats server
func Port(port int) Option {
return func(no *natsServer.Options, _ *stanServer.Options) {
no.Port = port
func Port(port int) NatsOption {
return func(o *nserver.Options) {
o.Port = port
}
}

// Port sets the host URL for the nats server
func Logger(logger logger.Logger) Option {
return func(no *natsServer.Options, so *stanServer.Options) {
so.CustomLogger = logger
// ClusterID sets the name for the nats cluster
func ClusterID(clusterID string) NatsOption {
return func(o *nserver.Options) {
o.Cluster.Name = clusterID
}
}

// NatsOption configures the nats server
type JetStreamOption func(*nserver.JetStreamConfig)

// ClusterID sets the name for the nats cluster
func JetStreamStoreDir(StoreDir string) JetStreamOption {
return func(o *nserver.JetStreamConfig) {
o.StoreDir = StoreDir
}
}
Loading