From caad4709f80b179cbfbe643d491a8e04d0735574 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 9 Oct 2023 06:23:26 +0000 Subject: [PATCH] Bump github.com/nats-io/nats-server/v2 from 2.10.1 to 2.10.2 Bumps [github.com/nats-io/nats-server/v2](https://github.com/nats-io/nats-server) from 2.10.1 to 2.10.2. - [Release notes](https://github.com/nats-io/nats-server/releases) - [Changelog](https://github.com/nats-io/nats-server/blob/main/.goreleaser.yml) - [Commits](https://github.com/nats-io/nats-server/compare/v2.10.1...v2.10.2) --- updated-dependencies: - dependency-name: github.com/nats-io/nats-server/v2 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> --- go.mod | 4 +- go.sum | 8 +- .../nats-io/nats-server/v2/server/accounts.go | 50 ++--- .../nats-io/nats-server/v2/server/auth.go | 2 +- .../nats-io/nats-server/v2/server/client.go | 40 +++- .../nats-io/nats-server/v2/server/const.go | 2 +- .../nats-io/nats-server/v2/server/consumer.go | 38 +++- .../nats-io/nats-server/v2/server/events.go | 10 +- .../nats-server/v2/server/filestore.go | 173 ++++++++++-------- .../nats-io/nats-server/v2/server/gateway.go | 12 +- .../nats-server/v2/server/jetstream.go | 76 +++----- .../nats-server/v2/server/jetstream_api.go | 87 +++++---- .../v2/server/jetstream_cluster.go | 134 +++++++------- .../nats-io/nats-server/v2/server/leafnode.go | 8 +- .../nats-io/nats-server/v2/server/monitor.go | 31 ++-- .../nats-io/nats-server/v2/server/mqtt.go | 20 +- .../nats-io/nats-server/v2/server/opts.go | 8 +- .../nats-io/nats-server/v2/server/raft.go | 26 +-- .../nats-io/nats-server/v2/server/reload.go | 18 +- .../nats-io/nats-server/v2/server/route.go | 79 ++++++-- .../nats-io/nats-server/v2/server/server.go | 148 +++++++++------ .../nats-io/nats-server/v2/server/stream.go | 111 ++++++++--- .../nats-io/nats-server/v2/server/sublist.go | 17 +- .../nats-server/v2/server/websocket.go | 43 ++--- vendor/github.com/nats-io/nats.go/.travis.yml | 21 +-- vendor/github.com/nats-io/nats.go/README.md | 2 +- vendor/github.com/nats-io/nats.go/go_test.mod | 14 +- vendor/github.com/nats-io/nats.go/go_test.sum | 29 +-- vendor/github.com/nats-io/nats.go/js.go | 173 ++++++++++++++++-- vendor/github.com/nats-io/nats.go/jserrors.go | 49 ++++- vendor/github.com/nats-io/nats.go/jsm.go | 136 ++++++++++---- vendor/github.com/nats-io/nats.go/kv.go | 13 +- vendor/github.com/nats-io/nats.go/nats.go | 8 +- vendor/github.com/nats-io/nats.go/object.go | 18 +- vendor/modules.txt | 6 +- 35 files changed, 1063 insertions(+), 551 deletions(-) diff --git a/go.mod b/go.mod index e40cd74ab60..fc53df6fd38 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/mna/pigeon v1.1.0 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 - github.com/nats-io/nats-server/v2 v2.10.1 + github.com/nats-io/nats-server/v2 v2.10.2 github.com/oklog/run v1.1.0 github.com/olekukonko/tablewriter v0.0.5 github.com/onsi/ginkgo v1.16.5 @@ -268,7 +268,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mschoch/smat v0.2.0 // indirect github.com/nats-io/jwt/v2 v2.5.2 // indirect - github.com/nats-io/nats.go v1.29.0 // indirect + github.com/nats-io/nats.go v1.30.2 // indirect github.com/nats-io/nkeys v0.4.5 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/nxadm/tail v1.4.8 // indirect diff --git a/go.sum b/go.sum index fced304f7a3..022c159c921 100644 --- a/go.sum +++ b/go.sum @@ -1717,10 +1717,10 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8= github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU= github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= -github.com/nats-io/nats-server/v2 v2.10.1 h1:MIJ614dhOIdo71iSzY8ln78miXwrYvlvXHUyS+XdKZQ= -github.com/nats-io/nats-server/v2 v2.10.1/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto= -github.com/nats-io/nats.go v1.29.0 h1:dSXZ+SZeGyTdHVYeXimeq12FsIpb9dM8CJ2IZFiHcyE= -github.com/nats-io/nats.go v1.29.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= +github.com/nats-io/nats-server/v2 v2.10.2 h1:2o/OOyc/dxeMCQtrF1V/9er0SU0A3LKhDlv/+rqreBM= +github.com/nats-io/nats-server/v2 v2.10.2/go.mod h1:lzrskZ/4gyMAh+/66cCd+q74c6v7muBypzfWhP/MAaM= +github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY= +github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM= github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/vendor/github.com/nats-io/nats-server/v2/server/accounts.go b/vendor/github.com/nats-io/nats-server/v2/server/accounts.go index 5d1ba4502f0..c26c09a108e 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/accounts.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/accounts.go @@ -30,6 +30,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/nats-io/jwt/v2" @@ -72,6 +73,7 @@ type Account struct { lqws map[string]int32 usersRevoked map[string]int64 mappings []*mapping + hasMapped atomic.Bool lmu sync.RWMutex lleafs []*client leafClusters map[string]uint64 @@ -291,6 +293,8 @@ func (a *Account) shallowCopy(na *Account) { if len(na.mappings) > 0 && na.prand == nil { na.prand = rand.New(rand.NewSource(time.Now().UnixNano())) } + na.hasMapped.Store(len(na.mappings) > 0) + // JetStream na.jsLimits = a.jsLimits // Server config account limits. @@ -703,6 +707,7 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error { } // If we did not replace add to the end. a.mappings = append(a.mappings, m) + a.hasMapped.Store(len(a.mappings) > 0) // If we have connected leafnodes make sure to update. if a.nleafs > 0 { @@ -729,6 +734,7 @@ func (a *Account) RemoveMapping(src string) bool { a.mappings[i] = a.mappings[len(a.mappings)-1] a.mappings[len(a.mappings)-1] = nil // gc a.mappings = a.mappings[:len(a.mappings)-1] + a.hasMapped.Store(len(a.mappings) > 0) return true } } @@ -740,28 +746,17 @@ func (a *Account) hasMappings() bool { if a == nil { return false } - a.mu.RLock() - hm := a.hasMappingsLocked() - a.mu.RUnlock() - return hm -} - -// Indicates we have mapping entries. -// The account has been verified to be non-nil. -// Read or Write lock held on entry. -func (a *Account) hasMappingsLocked() bool { - return len(a.mappings) > 0 + return a.hasMapped.Load() } // This performs the logic to map to a new dest subject based on mappings. // Should only be called from processInboundClientMsg or service import processing. func (a *Account) selectMappedSubject(dest string) (string, bool) { - a.mu.RLock() - if len(a.mappings) == 0 { - a.mu.RUnlock() + if !a.hasMappings() { return dest, false } + a.mu.RLock() // In case we have to tokenize for subset matching. tsa := [32]string{} tts := tsa[:0] @@ -1707,29 +1702,38 @@ func (a *Account) addReverseRespMapEntry(acc *Account, reply, from string) { // This will be called from checkForReverseEntry when the reply arg is a wildcard subject. // This will usually be called in a go routine since we need to walk all the entries. func (a *Account) checkForReverseEntries(reply string, checkInterest, recursed bool) { - a.mu.RLock() - if len(a.imports.rrMap) == 0 { - a.mu.RUnlock() + if subjectIsLiteral(reply) { + a._checkForReverseEntry(reply, nil, checkInterest, recursed) return } - if subjectIsLiteral(reply) { + a.mu.RLock() + if len(a.imports.rrMap) == 0 { a.mu.RUnlock() - a._checkForReverseEntry(reply, nil, checkInterest, recursed) return } var _rs [64]string rs := _rs[:0] + if n := len(a.imports.rrMap); n > cap(rs) { + rs = make([]string, 0, n) + } + for k := range a.imports.rrMap { - if subjectIsSubsetMatch(k, reply) { - rs = append(rs, k) - } + rs = append(rs, k) } a.mu.RUnlock() + tsa := [32]string{} + tts := tokenizeSubjectIntoSlice(tsa[:0], reply) + + rsa := [32]string{} for _, r := range rs { - a._checkForReverseEntry(r, nil, checkInterest, recursed) + rts := tokenizeSubjectIntoSlice(rsa[:0], r) + // isSubsetMatchTokenized is heavy so make sure we do this without the lock. + if isSubsetMatchTokenized(rts, tts) { + a._checkForReverseEntry(r, nil, checkInterest, recursed) + } } } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/auth.go b/vendor/github.com/nats-io/nats-server/v2/server/auth.go index 23564312a01..b8e82abe415 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/auth.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/auth.go @@ -983,7 +983,7 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au acc.mu.RLock() c.Debugf("Authenticated JWT: %s %q (claim-name: %q, claim-tags: %q) "+ "signed with %q by Account %q (claim-name: %q, claim-tags: %q) signed with %q has mappings %t accused %p", - c.kindString(), juc.Subject, juc.Name, juc.Tags, juc.Issuer, issuer, acc.nameTag, acc.tags, acc.Issuer, acc.hasMappingsLocked(), acc) + c.kindString(), juc.Subject, juc.Name, juc.Tags, juc.Issuer, issuer, acc.nameTag, acc.tags, acc.Issuer, acc.hasMappings(), acc) acc.mu.RUnlock() return true } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/client.go b/vendor/github.com/nats-io/nats-server/v2/server/client.go index 1cdde1cfb28..e3364c8a80f 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/client.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/client.go @@ -3014,6 +3014,10 @@ func queueMatches(queue string, qsubs [][]*subscription) bool { // Low level unsubscribe for a given client. func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool) { + if s := c.srv; s != nil && s.isShuttingDown() { + return + } + c.mu.Lock() if !force && sub.max > 0 && sub.nm < sub.max { c.Debugf( @@ -3067,7 +3071,8 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool } // Now check to see if this was part of a respMap entry for service imports. - if acc != nil { + // We can skip subscriptions on reserved replies. + if acc != nil && !isReservedReply(sub.subject) { acc.checkForReverseEntry(string(sub.subject), nil, true) } } @@ -3735,7 +3740,7 @@ func (c *client) processInboundMsg(msg []byte) { } } -// selectMappedSubject will chose the mapped subject based on the client's inbound subject. +// selectMappedSubject will choose the mapped subject based on the client's inbound subject. func (c *client) selectMappedSubject() bool { nsubj, changed := c.acc.selectMappedSubject(string(c.pa.subject)) if changed { @@ -4558,6 +4563,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, continue } + // If we are a spoke leaf node make sure to not forward across routes. + // This mimics same behavior for normal subs above. + if c.kind == LEAF && c.isSpokeLeafNode() && sub.client.kind == ROUTER { + continue + } + // We have taken care of preferring local subs for a message from a route above. // Here we just care about a client or leaf and skipping a leaf and preferring locals. if dst := sub.client.kind; dst == ROUTER || dst == LEAF { @@ -5071,6 +5082,23 @@ func (c *client) closeConnection(reason ClosedState) { c.out.stc = nil } + // If we have remote latency tracking running shut that down. + if c.rrTracking != nil { + c.rrTracking.ptmr.Stop() + c.rrTracking = nil + } + + // If we are shutting down, no need to do all the accounting on subs, etc. + if reason == ServerShutdown { + s := c.srv + c.mu.Unlock() + if s != nil { + // Unregister + s.removeClient(c) + } + return + } + var ( kind = c.kind srv = c.srv @@ -5095,12 +5123,6 @@ func (c *client) closeConnection(reason ClosedState) { spoke = c.isSpokeLeafNode() } - // If we have remote latency tracking running shut that down. - if c.rrTracking != nil { - c.rrTracking.ptmr.Stop() - c.rrTracking = nil - } - c.mu.Unlock() // Remove client's or leaf node or jetstream subscriptions. @@ -5219,7 +5241,7 @@ func (c *client) reconnect() { // It is possible that the server is being shutdown. // If so, don't try to reconnect - if !srv.running { + if !srv.isRunning() { return } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/const.go b/vendor/github.com/nats-io/nats-server/v2/server/const.go index 30d7ea61c92..64ec6b6267f 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/const.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.10.1" + VERSION = "2.10.2" // PROTO is the currently supported protocol. // 0 was the original diff --git a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go index a5fd72a29ef..15fbd1286df 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go @@ -1255,15 +1255,32 @@ func (o *consumer) setLeader(isLeader bool) { // Snapshot initial info. o.infoWithSnap(true) + // These are the labels we will use to annotate our goroutines. + labels := pprofLabels{ + "type": "consumer", + "account": mset.accName(), + "stream": mset.name(), + "consumer": o.name, + } + // Now start up Go routine to deliver msgs. - go o.loopAndGatherMsgs(qch) + go func() { + setGoRoutineLabels(labels) + o.loopAndGatherMsgs(qch) + }() // Now start up Go routine to process acks. - go o.processInboundAcks(qch) + go func() { + setGoRoutineLabels(labels) + o.processInboundAcks(qch) + }() if pullMode { // Now start up Go routine to process inbound next message requests. - go o.processInboundNextMsgReqs(qch) + go func() { + setGoRoutineLabels(labels) + o.processInboundNextMsgReqs(qch) + }() } // If we are R>1 spin up our proposal loop. @@ -1272,7 +1289,10 @@ func (o *consumer) setLeader(isLeader bool) { // They must be on server versions >= 2.7.1 o.checkAndSetPendingRequestsOk() o.checkPendingRequests() - go o.loopAndForwardProposals(qch) + go func() { + setGoRoutineLabels(labels) + o.loopAndForwardProposals(qch) + }() } } else { @@ -1536,7 +1556,7 @@ func (o *consumer) deleteNotActive() { } } - s, js := o.mset.srv, o.mset.srv.js + s, js := o.mset.srv, o.srv.js.Load() acc, stream, name, isDirect := o.acc.Name, o.stream, o.name, o.cfg.Direct o.mu.Unlock() @@ -1564,7 +1584,7 @@ func (o *consumer) deleteNotActive() { // Don't think this needs to be a monitored go routine. go func() { const ( - startInterval = 5 * time.Second + startInterval = 30 * time.Second maxInterval = 5 * time.Minute ) jitter := time.Duration(rand.Int63n(int64(startInterval))) @@ -1573,6 +1593,10 @@ func (o *consumer) deleteNotActive() { defer ticker.Stop() for range ticker.C { js.mu.RLock() + if js.shuttingDown { + js.mu.RUnlock() + return + } nca := js.consumerAssignment(acc, stream, name) js.mu.RUnlock() // Make sure this is not a new consumer with the same name. @@ -2473,7 +2497,7 @@ func (o *consumer) infoWithSnap(snap bool) *ConsumerInfo { func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo { o.mu.Lock() mset := o.mset - if mset == nil || mset.srv == nil { + if o.closed || mset == nil || mset.srv == nil { o.mu.Unlock() return nil } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/events.go b/vendor/github.com/nats-io/nats-server/v2/server/events.go index 3b157cef937..0f761a47c5f 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/events.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/events.go @@ -715,7 +715,7 @@ func (s *Server) eventsRunning() bool { return false } s.mu.RLock() - er := s.running && s.eventsEnabled() + er := s.isRunning() && s.eventsEnabled() s.mu.RUnlock() return er } @@ -739,7 +739,7 @@ func (s *Server) eventsEnabled() bool { func (s *Server) TrackedRemoteServers() int { s.mu.RLock() defer s.mu.RUnlock() - if !s.running || !s.eventsEnabled() { + if !s.isRunning() || !s.eventsEnabled() { return -1 } return len(s.sys.servers) @@ -875,7 +875,7 @@ func (s *Server) sendStatsz(subj string) { m.Stats.ActiveServers = len(s.sys.servers) + 1 // JetStream - if js := s.js; js != nil { + if js := s.js.Load(); js != nil { jStat := &JetStreamVarz{} s.mu.RUnlock() js.mu.RLock() @@ -1484,7 +1484,7 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su // Should do normal updates before bailing if wrong domain. s.mu.Lock() - if s.running && s.eventsEnabled() && ssm.Server.ID != s.info.ID { + if s.isRunning() && s.eventsEnabled() && ssm.Server.ID != s.info.ID { s.updateRemoteServer(&si) } s.mu.Unlock() @@ -1943,7 +1943,7 @@ func (s *Server) remoteConnsUpdate(sub *subscription, c *client, _ *Account, sub s.mu.Lock() // check again here if we have been shutdown. - if !s.running || !s.eventsEnabled() { + if !s.isRunning() || !s.eventsEnabled() { s.mu.Unlock() return } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go index 8e054e1206c..78e4481112d 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go @@ -60,6 +60,9 @@ type FileStoreConfig struct { Cipher StoreCipher // Compression is the algorithm to use when compressing. Compression StoreCompression + + // Internal reference to our server. + srv *Server } // FileStreamInfo allows us to remember created time. @@ -387,6 +390,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim qch: make(chan struct{}), fch: make(chan struct{}, 1), fsld: make(chan struct{}), + srv: fcfg.srv, } // Set flush in place to AsyncFlush which by default is false. @@ -527,12 +531,6 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim return fs, nil } -func (fs *fileStore) registerServer(s *Server) { - fs.mu.Lock() - defer fs.mu.Unlock() - fs.srv = s -} - // Lock all existing message blocks. // Lock held on entry. func (fs *fileStore) lockAllMsgBlocks() { @@ -1436,6 +1434,16 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { return nil, tombstones, nil } +// For doing warn logging. +// Lock should be held. +func (fs *fileStore) warn(format string, args ...any) { + // No-op if no server configured. + if fs.srv == nil { + return + } + fs.srv.Warnf(fmt.Sprintf("Filestore [%s] %s", fs.cfg.Name, format), args...) +} + // recoverFullState will attempt to receover our last full state and re-process any state changes // that happened afterwards. func (fs *fileStore) recoverFullState() (rerr error) { @@ -1455,12 +1463,16 @@ func (fs *fileStore) recoverFullState() (rerr error) { dios <- struct{}{} if err != nil { + if !os.IsNotExist(err) { + fs.warn("Could not read stream state file: %v", err) + } return err } const minLen = 32 if len(buf) < minLen { os.Remove(fn) + fs.warn("Stream state too short (%d bytes)", len(buf)) return errCorruptState } @@ -1471,6 +1483,7 @@ func (fs *fileStore) recoverFullState() (rerr error) { fs.hh.Write(buf) if !bytes.Equal(h, fs.hh.Sum(nil)) { os.Remove(fn) + fs.warn("Stream state checksum did not match") return errCorruptState } @@ -1482,6 +1495,7 @@ func (fs *fileStore) recoverFullState() (rerr error) { ns := fs.aek.NonceSize() buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:], nil) if err != nil { + fs.warn("Stream state error reading encryption key: %v", err) return err } } @@ -1489,6 +1503,7 @@ func (fs *fileStore) recoverFullState() (rerr error) { if buf[0] != fullStateMagic || buf[1] != fullStateVersion { os.Remove(fn) + fs.warn("Stream state magic and version mismatch") return errCorruptState } @@ -1543,6 +1558,7 @@ func (fs *fileStore) recoverFullState() (rerr error) { if lsubj := int(readU64()); lsubj > 0 { if bi+lsubj > len(buf) { os.Remove(fn) + fs.warn("Stream state bad subject len (%d)", lsubj) return errCorruptState } subj := fs.subjString(buf[bi : bi+lsubj]) @@ -1573,10 +1589,15 @@ func (fs *fileStore) recoverFullState() (rerr error) { dmap, n, err := avl.Decode(buf[bi:]) if err != nil { os.Remove(fn) + fs.warn("Stream state error decoding avl dmap: %v", err) return errCorruptState } mb.dmap = *dmap - mb.msgs -= numDeleted + if mb.msgs > numDeleted { + mb.msgs -= numDeleted + } else { + mb.msgs = 0 + } bi += n } // Only add in if not empty or the lmb. @@ -1601,6 +1622,7 @@ func (fs *fileStore) recoverFullState() (rerr error) { // Check if we had any errors. if bi < 0 { os.Remove(fn) + fs.warn("Stream state has no checksum present") return errCorruptState } @@ -1610,20 +1632,22 @@ func (fs *fileStore) recoverFullState() (rerr error) { // First let's check the happy path, open the blk file that was the lmb when we created the full state. // See if we have the last block available. var matched bool - var mb *msgBlock - if mb = fs.bim[blkIndex]; mb != nil { - if _, err := os.Stat(mb.mfn); err != nil && os.IsNotExist(err) { - // If our saved state is past what we see on disk, fallback and rebuild. - if ld, _, _ := mb.rebuildState(); ld != nil { - fs.addLostData(ld) - } - return errPriorState - } - - if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched { - // Remove the last message block since we will re-process below. - fs.removeMsgBlockFromList(mb) + mb := fs.lmb + if mb == nil || mb.index != blkIndex { + fs.warn("Stream state block does not exist or index mismatch") + return errCorruptState + } + if _, err := os.Stat(mb.mfn); err != nil && os.IsNotExist(err) { + // If our saved state is past what we see on disk, fallback and rebuild. + if ld, _, _ := mb.rebuildState(); ld != nil { + fs.addLostData(ld) } + fs.warn("Stream state detected prior state, could not locate msg block %d", blkIndex) + return errPriorState + } + if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched { + // Remove the last message block since we will re-process below. + fs.removeMsgBlockFromList(mb) } // We may need to check other blocks. Even if we matched last checksum we will see if there is another block. @@ -1640,12 +1664,14 @@ func (fs *fileStore) recoverFullState() (rerr error) { return nil } os.Remove(fn) + fs.warn("Stream state could not recover msg block %d", bi) return err } if nmb != nil { // Check if we have to account for a partial message block. if !matched && mb != nil && mb.index == nmb.index { if err := fs.adjustAccounting(mb, nmb); err != nil { + fs.warn("Stream state could not adjust accounting") return err } } @@ -1677,8 +1703,13 @@ func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) error { } nmb.ensurePerSubjectInfoLoaded() - lookupAndAdjust := func(seq uint64) error { - var smv StoreMsg + // Walk all the original mb's sequences that were included in the stream state. + var smv StoreMsg + for seq := mb.first.seq; seq <= mb.last.seq; seq++ { + // If we had already declared it deleted we can move on since you can not undelete. + if mb.dmap.Exists(seq) { + continue + } // Lookup the message. sm, err := nmb.cacheLookup(seq, &smv) if err != nil { @@ -1690,29 +1721,10 @@ func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) error { if len(sm.subj) > 0 && fs.psim != nil { fs.removePerSubject(sm.subj) } - return nil - } - - // Walk all the original mb's sequences that were included in the stream state. - for seq := mb.first.seq; seq <= mb.last.seq; seq++ { - // If we had already declared it deleted we can move on since you can not undelete. - if mb.dmap.Exists(seq) { - continue - } - // Lookup the message. - if err := lookupAndAdjust(seq); err != nil { - return err - } } // Now check to see if we had a higher first for the recovered state mb vs nmb. if nmb.first.seq < mb.first.seq { - for seq := nmb.first.seq; seq < mb.first.seq; seq++ { - // Lookup the message. - if err := lookupAndAdjust(seq); err != nil { - return err - } - } // Now set first for nmb. nmb.first = mb.first } @@ -1837,7 +1849,10 @@ func (fs *fileStore) recoverMsgs() error { } } for _, mb := range emptyBlks { + // Need the mb lock here. + mb.mu.Lock() fs.removeMsgBlock(mb) + mb.mu.Unlock() } } @@ -2031,6 +2046,12 @@ func (fs *fileStore) expireMsgsOnRecover() { // Clear any global subject state. fs.psim = make(map[string]*psi) } + + // If we purged anything, make sure we kick flush state loop. + if purged > 0 { + fs.dirty++ + fs.kickFlushStateLoop() + } } func copyMsgBlocks(src []*msgBlock) []*msgBlock { @@ -3086,7 +3107,9 @@ func (fs *fileStore) rebuildFirst() { isEmpty := fmb.msgs == 0 fmb.mu.RUnlock() if isEmpty { + fmb.mu.Lock() fs.removeMsgBlock(fmb) + fmb.mu.Unlock() } fs.selectNextFirst() fs.rebuildStateLocked(ld) @@ -3190,6 +3213,7 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() { // We had an issue with a use case where psim (and hence fss) were correct but idx was not and was not properly being caught. // So do a quick sanity check here. If we detect a skew do a rebuild then re-check. if numMsgs != fs.state.Msgs { + fs.warn("Detected skew in subject-based total (%d) vs raw total (%d), rebuilding", numMsgs, fs.state.Msgs) // Clear any global subject state. fs.psim = make(map[string]*psi) for _, mb := range fs.blks { @@ -3293,7 +3317,9 @@ func (fs *fileStore) removePerSubject(subj string) { // We do not update sense of fblk here but will do so when we resolve during lookup. if info, ok := fs.psim[subj]; ok { info.total-- - if info.total == 0 { + if info.total == 1 { + info.fblk = info.lblk + } else if info.total == 0 { delete(fs.psim, subj) } } @@ -3463,10 +3489,6 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( } } } else if !isEmpty { - if mb.dmap.IsEmpty() { - // Mark initial base for delete set. - mb.dmap.SetInitialMin(mb.first.seq) - } // Out of order delete. mb.dmap.Insert(seq) // Check if <25% utilization and minimum size met. @@ -3475,6 +3497,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( rbytes := mb.rbytes - uint64(mb.dmap.Size()*emptyRecordLen) if rbytes>>2 > mb.bytes { mb.compact() + fs.kickFlushStateLoop() } } } @@ -3555,10 +3578,7 @@ func (mb *msgBlock) compact() { var firstSet bool isDeleted := func(seq uint64) bool { - if seq == 0 || seq&ebit != 0 || seq < mb.first.seq { - return true - } - return mb.dmap.Exists(seq) + return seq == 0 || seq&ebit != 0 || seq < mb.first.seq || mb.dmap.Exists(seq) } for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; { @@ -3601,6 +3621,12 @@ func (mb *msgBlock) compact() { index += rl } + // Handle compression + var err error + if nbuf, err = mb.cmp.Compress(nbuf); err != nil { + return + } + // Check for encryption. if mb.bek != nil && len(nbuf) > 0 { // Recreate to reset counter. @@ -3615,7 +3641,7 @@ func (mb *msgBlock) compact() { mb.closeFDsLocked() // We will write to a new file and mv/rename it in case of failure. - mfn := filepath.Join(filepath.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index)) + mfn := filepath.Join(mb.fs.fcfg.StoreDir, msgDir, fmt.Sprintf(newScan, mb.index)) if err := os.WriteFile(mfn, nbuf, defaultFilePerms); err != nil { os.Remove(mfn) return @@ -3625,8 +3651,8 @@ func (mb *msgBlock) compact() { return } - // Remove index file and wipe delete map, then rebuild. - mb.deleteDmap() + // Wipe dmap and rebuild here. + mb.dmap.Empty() mb.rebuildStateLocked() // If we entered with the msgs loaded make sure to reload them. @@ -3635,11 +3661,6 @@ func (mb *msgBlock) compact() { } } -// Empty out our dmap. -func (mb *msgBlock) deleteDmap() { - mb.dmap.Empty() -} - // Grab info from a slot. // Lock should be held. func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) { @@ -4697,29 +4718,30 @@ func (fs *fileStore) syncBlocks() { mb.mu.Unlock() continue } + // See if we can close FDs due to being idle. + if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle { + mb.dirtyCloseWithRemove(false) + } + // Check if we need to sync. We will not hold lock during actual sync. + var fn string if mb.needSync { // Flush anything that may be pending. if mb.pendingWriteSizeLocked() > 0 { mb.flushPendingMsgsLocked() } - if mb.mfd != nil { - mb.mfd.Sync() - } else { - fd, err := os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms) - if err != nil { - mb.mu.Unlock() - continue - } + fn = mb.mfn + mb.needSync = false + } + mb.mu.Unlock() + + // Check if we need to sync. + // This is done not holding any locks. + if fn != _EMPTY_ { + if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil { fd.Sync() fd.Close() } - mb.needSync = false - } - // See if we can close FDs due to being idle. - if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle { - mb.dirtyCloseWithRemove(false) } - mb.mu.Unlock() } fs.mu.Lock() @@ -6350,6 +6372,12 @@ func (fs *fileStore) reset() error { fs.psim = make(map[string]*psi) fs.bim = make(map[uint32]*msgBlock) + // If we purged anything, make sure we kick flush state loop. + if purged > 0 { + fs.dirty++ + fs.kickFlushStateLoop() + } + fs.mu.Unlock() if cb != nil { @@ -6473,6 +6501,7 @@ func (fs *fileStore) removeMsgBlockFromList(mb *msgBlock) { // Remove from list. for i, omb := range fs.blks { if mb == omb { + fs.dirty++ blks := append(fs.blks[:i], fs.blks[i+1:]...) fs.blks = copyMsgBlocks(blks) if fs.bim != nil { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/gateway.go b/vendor/github.com/nats-io/nats-server/v2/server/gateway.go index b7307e47ec6..715a2c1dc5b 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/gateway.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/gateway.go @@ -473,6 +473,10 @@ func (s *Server) startGateways() { // This starts the gateway accept loop in a go routine, unless it // is detected that the server has already been shutdown. func (s *Server) startGatewayAcceptLoop() { + if s.isShuttingDown() { + return + } + // Snapshot server options. opts := s.getOpts() @@ -482,10 +486,6 @@ func (s *Server) startGatewayAcceptLoop() { } s.mu.Lock() - if s.shutdown { - s.mu.Unlock() - return - } hp := net.JoinHostPort(opts.Gateway.Host, strconv.Itoa(port)) l, e := natsListen("tcp", hp) s.gatewayListenerErr = e @@ -1128,8 +1128,8 @@ func (c *client) processGatewayInfo(info *Info) { // connect events to switch those accounts into interest only mode. s.mu.Lock() s.ensureGWsInterestOnlyForLeafNodes() - js := s.js s.mu.Unlock() + js := s.js.Load() // If running in some tests, maintain the original behavior. if gwDoNotForceInterestOnlyMode && js != nil { @@ -1575,7 +1575,7 @@ func (s *Server) addGatewayURL(urlStr string) bool { // Returns true if the URL has been removed, false otherwise. // Server lock held on entry func (s *Server) removeGatewayURL(urlStr string) bool { - if s.shutdown { + if s.isShuttingDown() { return false } s.gateway.Lock() diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go index 04b7430ac6c..756e75a5628 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go @@ -117,9 +117,11 @@ type jetStream struct { // Some bools regarding general state. metaRecovering bool standAlone bool - disabled bool oos bool shuttingDown bool + + // Atomic versions + disabled atomic.Bool } type remoteUsage struct { @@ -372,9 +374,7 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error { } s.gcbMu.Unlock() - s.mu.Lock() - s.js = js - s.mu.Unlock() + s.js.Store(js) // FIXME(dlc) - Allow memory only operation? if stat, err := os.Stat(cfg.StoreDir); os.IsNotExist(err) { @@ -530,10 +530,7 @@ func (s *Server) setupJetStreamExports() { } func (s *Server) jetStreamOOSPending() (wasPending bool) { - s.mu.Lock() - js := s.js - s.mu.Unlock() - if js != nil { + if js := s.getJetStream(); js != nil { js.mu.Lock() wasPending = js.oos js.oos = true @@ -543,13 +540,8 @@ func (s *Server) jetStreamOOSPending() (wasPending bool) { } func (s *Server) setJetStreamDisabled() { - s.mu.Lock() - js := s.js - s.mu.Unlock() - if js != nil { - js.mu.Lock() - js.disabled = true - js.mu.Unlock() + if js := s.getJetStream(); js != nil { + js.disabled.Store(true) } } @@ -738,16 +730,15 @@ func (s *Server) configAllJetStreamAccounts() error { // a non-default system account. s.checkJetStreamExports() - // Snapshot into our own list. Might not be needed. - s.mu.Lock() // Bail if server not enabled. If it was enabled and a reload turns it off // that will be handled elsewhere. - js := s.js + js := s.getJetStream() if js == nil { - s.mu.Unlock() return nil } + // Snapshot into our own list. Might not be needed. + s.mu.RLock() if s.sys != nil { // clustered stream removal will perform this cleanup as well // this is mainly for initial cleanup @@ -764,12 +755,12 @@ func (s *Server) configAllJetStreamAccounts() error { } var jsAccounts []*Account - s.accounts.Range(func(k, v interface{}) bool { + s.accounts.Range(func(k, v any) bool { jsAccounts = append(jsAccounts, v.(*Account)) return true }) accounts := &s.accounts - s.mu.Unlock() + s.mu.RUnlock() // Process any jetstream enabled accounts here. These will be accounts we are // already aware of at startup etc. @@ -809,9 +800,7 @@ func (js *jetStream) isEnabled() bool { if js == nil { return false } - js.mu.RLock() - defer js.mu.RUnlock() - return !js.disabled + return !js.disabled.Load() } // Mark that we will be in standlone mode. @@ -821,9 +810,9 @@ func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) { } js.mu.Lock() defer js.mu.Unlock() - js.standAlone = isStandAlone - - if isStandAlone { + if js.standAlone = isStandAlone; js.standAlone { + // Update our server atomic. + js.srv.isMetaLeader.Store(true) js.accountPurge, _ = js.srv.systemSubscribe(JSApiAccountPurge, _EMPTY_, false, nil, js.srv.jsLeaderAccountPurgeRequest) } else if js.accountPurge != nil { js.srv.sysUnsubscribe(js.accountPurge) @@ -832,11 +821,7 @@ func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) { // JetStreamEnabled reports if jetstream is enabled for this server. func (s *Server) JetStreamEnabled() bool { - var js *jetStream - s.mu.RLock() - js = s.js - s.mu.RUnlock() - return js.isEnabled() + return s.getJetStream().isEnabled() } // JetStreamEnabledForDomain will report if any servers have JetStream enabled within this domain. @@ -909,10 +894,7 @@ func (js *jetStream) isShuttingDown() bool { // Shutdown jetstream for this server. func (s *Server) shutdownJetStream() { - s.mu.RLock() - js := s.js - s.mu.RUnlock() - + js := s.getJetStream() if js == nil { return } @@ -951,9 +933,7 @@ func (s *Server) shutdownJetStream() { a.removeJetStream() } - s.mu.Lock() - s.js = nil - s.mu.Unlock() + s.js.Store(nil) js.mu.Lock() js.accounts = nil @@ -994,23 +974,20 @@ func (s *Server) shutdownJetStream() { // created a dynamic configuration. A copy is returned. func (s *Server) JetStreamConfig() *JetStreamConfig { var c *JetStreamConfig - s.mu.Lock() - if s.js != nil { - copy := s.js.config + if js := s.getJetStream(); js != nil { + copy := js.config c = &(copy) } - s.mu.Unlock() return c } // StoreDir returns the current JetStream directory. func (s *Server) StoreDir() string { - s.mu.Lock() - defer s.mu.Unlock() - if s.js == nil { + js := s.getJetStream() + if js == nil { return _EMPTY_ } - return s.js.config.StoreDir + return js.config.StoreDir } // JetStreamNumAccounts returns the number of enabled accounts this server is tracking. @@ -1036,10 +1013,7 @@ func (s *Server) JetStreamReservedResources() (int64, int64, error) { } func (s *Server) getJetStream() *jetStream { - s.mu.RLock() - js := s.js - s.mu.RUnlock() - return js + return s.js.Load() } func (a *Account) assignJetStreamLimits(limits map[string]JetStreamAccountLimits) { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go index 53736a1b935..7a3b9203c75 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go @@ -1731,14 +1731,13 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s var clusterWideConsCount int + js, cc := s.getJetStreamCluster() + if js == nil { + return + } // If we are in clustered mode we need to be the stream leader to proceed. - if s.JetStreamIsClustered() { + if cc != nil { // Check to make sure the stream is assigned. - js, cc := s.getJetStreamCluster() - if js == nil || cc == nil { - return - } - js.mu.RLock() isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, streamName) var offline bool @@ -1833,15 +1832,23 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s } mset, err := acc.lookupStream(streamName) + // Error is not to be expected at this point, but could happen if same stream trying to be created. if err != nil { - resp.Error = NewJSStreamNotFoundError(Unless(err)) - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return + if cc != nil { + // This could be inflight, pause for a short bit and try again. + // This will not be inline, so ok. + time.Sleep(10 * time.Millisecond) + mset, err = acc.lookupStream(streamName) + } + // Check again. + if err != nil { + resp.Error = NewJSStreamNotFoundError(Unless(err)) + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } } config := mset.config() - js, _ := s.getJetStreamCluster() - resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.stateWithDetail(details), @@ -2307,14 +2314,15 @@ func (s *Server) peerSetToNames(ps []string) []string { // looks up the peer id for a given server name. Cluster and domain name are optional filter criteria func (s *Server) nameToPeer(js *jetStream, serverName, clusterName, domainName string) string { js.mu.RLock() - cc := js.cluster defer js.mu.RUnlock() - for _, p := range cc.meta.Peers() { - si, ok := s.nodeToInfo.Load(p.ID) - if ok && si.(nodeInfo).name == serverName { - if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster { - if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain { - return p.ID + if cc := js.cluster; cc != nil { + for _, p := range cc.meta.Peers() { + si, ok := s.nodeToInfo.Load(p.ID) + if ok && si.(nodeInfo).name == serverName { + if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster { + if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain { + return p.ID + } } } } @@ -4156,9 +4164,20 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, js.mu.RLock() isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, streamName), js.consumerAssignment(acc.Name, streamName, consumerName) ourID := cc.meta.ID() - var offline bool + var rg *raftGroup + var offline, isMember bool if ca != nil { - offline = s.allPeersOffline(ca.Group) + if rg = ca.Group; rg != nil { + offline = s.allPeersOffline(rg) + isMember = rg.isMember(ourID) + } + } + // Capture consumer leader here. + isConsumerLeader := cc.isConsumerLeader(acc.Name, streamName, consumerName) + // Also capture if we think there is no meta leader. + var isLeaderLess bool + if !isLeader { + isLeaderLess = cc.meta.GroupLeader() == _EMPTY_ && time.Since(cc.meta.Created()) > lostQuorumIntervalDefault } js.mu.RUnlock() @@ -4181,7 +4200,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } else if ca == nil { - if js.isLeaderless() { + if isLeaderLess { resp.Error = NewJSClusterNotAvailError() // Delaying an error response gives the leader a chance to respond before us s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil) @@ -4194,38 +4213,36 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, } // Check to see if we are a member of the group and if the group has no leader. - if js.isGroupLeaderless(ca.Group) { + if isMember && js.isGroupLeaderless(ca.Group) { resp.Error = NewJSClusterNotAvailError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } // We have the consumer assigned and a leader, so only the consumer leader should answer. - if !acc.JetStreamIsConsumerLeader(streamName, consumerName) { - if js.isLeaderless() { + if !isConsumerLeader { + if isLeaderLess { resp.Error = NewJSClusterNotAvailError() // Delaying an error response gives the leader a chance to respond before us s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), ca.Group) return } - // We have a consumer assignment. - js.mu.RLock() var node RaftNode var leaderNotPartOfGroup bool - var isMember bool - rg := ca.Group - if rg != nil && rg.isMember(ourID) { - isMember = true + // We have a consumer assignment. + if isMember { + js.mu.RLock() if rg.node != nil { node = rg.node if gl := node.GroupLeader(); gl != _EMPTY_ && !rg.isMember(gl) { leaderNotPartOfGroup = true } } + js.mu.RUnlock() } - js.mu.RUnlock() + // Check if we should ignore all together. if node == nil { // We have been assigned but have not created a node yet. If we are a member return @@ -4279,7 +4296,13 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } - resp.ConsumerInfo = obs.info() + + if resp.ConsumerInfo = obs.info(); resp.ConsumerInfo == nil { + // This consumer returned nil which means it's closed. Respond with not found. + resp.Error = NewJSConsumerNotFoundError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go index 311dd0ecd39..67278f20c09 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go @@ -183,7 +183,7 @@ const ( func (s *Server) trackedJetStreamServers() (js, total int) { s.mu.RLock() defer s.mu.RUnlock() - if !s.running || !s.eventsEnabled() { + if !s.isRunning() || !s.eventsEnabled() { return -1, -1 } s.nodeToInfo.Range(func(k, v interface{}) bool { @@ -198,11 +198,12 @@ func (s *Server) trackedJetStreamServers() (js, total int) { } func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) { - s.mu.RLock() - shutdown, js := s.shutdown, s.js - s.mu.RUnlock() + if s.isShuttingDown() { + return nil, nil + } - if shutdown || js == nil { + js := s.getJetStream() + if js == nil { return nil, nil } @@ -219,13 +220,7 @@ func (s *Server) JetStreamIsClustered() bool { } func (s *Server) JetStreamIsLeader() bool { - js := s.getJetStream() - if js == nil { - return false - } - js.mu.RLock() - defer js.mu.RUnlock() - return js.cluster.isLeader() + return s.isMetaLeader.Load() } func (s *Server) JetStreamIsCurrent() bool { @@ -233,9 +228,20 @@ func (s *Server) JetStreamIsCurrent() bool { if js == nil { return false } + // Grab what we need and release js lock. js.mu.RLock() - defer js.mu.RUnlock() - return js.cluster.isCurrent() + var meta RaftNode + cc := js.cluster + if cc != nil { + meta = cc.meta + } + js.mu.RUnlock() + + if cc == nil { + // Non-clustered mode + return true + } + return meta.Current() } func (s *Server) JetStreamSnapshotMeta() error { @@ -381,19 +387,6 @@ func (cc *jetStreamCluster) isLeader() bool { return cc.meta != nil && cc.meta.Leader() } -// isCurrent will determine if this node is a leader or an up to date follower. -// Read lock should be held. -func (cc *jetStreamCluster) isCurrent() bool { - if cc == nil { - // Non-clustered mode - return true - } - if cc.meta == nil { - return false - } - return cc.meta.Current() -} - // isStreamCurrent will determine if the stream is up to date. // For R1 it will make sure the stream is present on this server. // Read lock should be held. @@ -643,9 +636,8 @@ func (a *Account) getJetStreamFromAccount() (*Server, *jetStream, *jsAccount) { if js == nil { return nil, nil, nil } - js.mu.RLock() + // Lock not needed, set on creation. s := js.srv - js.mu.RUnlock() return s, js, jsa } @@ -751,7 +743,7 @@ func (js *jetStream) setupMetaGroup() error { storeDir := filepath.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, defaultMetaGroupName) fs, err := newFileStoreWithCreated( - FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMetaFSBlkSize, AsyncFlush: false}, + FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMetaFSBlkSize, AsyncFlush: false, srv: s}, StreamConfig{Name: defaultMetaGroupName, Storage: FileStorage}, time.Now().UTC(), s.jsKeyGen(s.getOpts().JetStreamKey, defaultMetaGroupName), @@ -762,9 +754,6 @@ func (js *jetStream) setupMetaGroup() error { return err } - // Register our server. - fs.registerServer(s) - cfg := &RaftConfig{Name: defaultMetaGroupName, Store: storeDir, Log: fs} // If we are soliciting leafnode connections and we are sharing a system account and do not disable it with a hint, @@ -859,10 +848,8 @@ func (js *jetStream) getMetaGroup() RaftNode { } func (js *jetStream) server() *Server { - js.mu.RLock() - s := js.srv - js.mu.RUnlock() - return s + // Lock not needed, only set once on creation. + return js.srv } // Will respond if we do not think we have a metacontroller leader. @@ -1240,6 +1227,7 @@ func (js *jetStream) monitorCluster() { // Make sure to stop the raft group on exit to prevent accidental memory bloat. defer n.Stop() + defer s.isMetaLeader.Store(false) const compactInterval = time.Minute t := time.NewTicker(compactInterval) @@ -1727,6 +1715,11 @@ func (js *jetStream) processAddPeer(peer string) { } func (js *jetStream) processRemovePeer(peer string) { + // We may be already disabled. + if js == nil || js.disabled.Load() { + return + } + js.mu.Lock() s, cc := js.srv, js.cluster if cc == nil || cc.meta == nil { @@ -1736,14 +1729,8 @@ func (js *jetStream) processRemovePeer(peer string) { isLeader := cc.isLeader() // All nodes will check if this is them. isUs := cc.meta.ID() == peer - disabled := js.disabled js.mu.Unlock() - // We may be already disabled. - if disabled { - return - } - if isUs { s.Errorf("JetStream being DISABLED, our server was removed from the cluster") adv := &JSServerRemovedAdvisory{ @@ -2028,7 +2015,7 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor var store StreamStore if storage == FileStorage { fs, err := newFileStoreWithCreated( - FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute}, + FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute, srv: s}, StreamConfig{Name: rg.Name, Storage: FileStorage}, time.Now().UTC(), s.jsKeyGen(s.getOpts().JetStreamKey, rg.Name), @@ -2038,8 +2025,6 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor s.Errorf("Error creating filestore WAL: %v", err) return err } - // Register our server. - fs.registerServer(s) store = fs } else { ms, err := newMemStore(&StreamConfig{Name: rg.Name, Storage: MemoryStorage}) @@ -2229,7 +2214,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil { lastState, lastSnapTime = curState, time.Now() } else if err != errNoSnapAvailable && err != errNodeClosed { - s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err) + s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err) } } @@ -2423,6 +2408,10 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // We are not current, but current means exactly caught up. Under heavy publish // loads we may never reach this, so check if we are within 90% caught up. _, c, a := mset.node.Progress() + if c == 0 { + mset.mu.Unlock() + continue + } if p := float64(a) / float64(c) * 100.0; p < syncThreshold { mset.mu.Unlock() continue @@ -2716,6 +2705,11 @@ func (mset *stream) resetClusteredState(err error) bool { if sa != nil { js.mu.Lock() + if js.shuttingDown { + js.mu.Unlock() + return + } + s.Warnf("Resetting stream cluster state for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name) // Now wipe groups from assignments. sa.Group.node = nil @@ -2929,6 +2923,10 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco // If we are the leader or recovering, meaning we own the snapshot, // we should stepdown and clear our raft state since our snapshot is bad. if isRecovering || mset.IsLeader() { + mset.mu.RLock() + s, accName, streamName := mset.srv, mset.acc.GetName(), mset.cfg.Name + mset.mu.RUnlock() + s.Warnf("Detected bad stream state, resetting '%s > %s'", accName, streamName) mset.resetClusteredState(err) } } @@ -3900,6 +3898,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { s, cc := js.srv, js.cluster accName, stream, consumerName := ca.Client.serviceAccount(), ca.Stream, ca.Name noMeta := cc == nil || cc.meta == nil + shuttingDown := js.shuttingDown var ourID string if !noMeta { ourID = cc.meta.ID() @@ -3910,7 +3909,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { } js.mu.RUnlock() - if s == nil || noMeta { + if s == nil || noMeta || shuttingDown { return } @@ -5275,21 +5274,31 @@ func (js *jetStream) stopUpdatesSub() { } func (js *jetStream) processLeaderChange(isLeader bool) { + if js == nil { + return + } + s := js.srv + if s == nil { + return + } + // Update our server atomic. + s.isMetaLeader.Store(isLeader) + if isLeader { - js.srv.Noticef("Self is new JetStream cluster metadata leader") + s.Noticef("Self is new JetStream cluster metadata leader") } else { var node string if meta := js.getMetaGroup(); meta != nil { node = meta.GroupLeader() } if node == _EMPTY_ { - js.srv.Noticef("JetStream cluster no metadata leader") + s.Noticef("JetStream cluster no metadata leader") } else if srv := js.srv.serverNameForNode(node); srv == _EMPTY_ { - js.srv.Noticef("JetStream cluster new remote metadata leader") + s.Noticef("JetStream cluster new remote metadata leader") } else if clst := js.srv.clusterNameForNode(node); clst == _EMPTY_ { - js.srv.Noticef("JetStream cluster new metadata leader: %s", srv) + s.Noticef("JetStream cluster new metadata leader: %s", srv) } else { - js.srv.Noticef("JetStream cluster new metadata leader: %s/%s", srv, clst) + s.Noticef("JetStream cluster new metadata leader: %s/%s", srv, clst) } } @@ -5310,7 +5319,7 @@ func (js *jetStream) processLeaderChange(isLeader bool) { for acc, asa := range cc.streams { for _, sa := range asa { if sa.Sync == _EMPTY_ { - js.srv.Warnf("Stream assigment corrupt for stream '%s > %s'", acc, sa.Config.Name) + s.Warnf("Stream assigment corrupt for stream '%s > %s'", acc, sa.Config.Name) nsa := &streamAssignment{Group: sa.Group, Config: sa.Config, Subject: sa.Subject, Reply: sa.Reply, Client: sa.Client} nsa.Sync = syncSubjForStream() cc.meta.Propose(encodeUpdateStreamAssignment(nsa)) @@ -7719,8 +7728,8 @@ func (mset *stream) isCurrent() bool { return mset.node.Current() && !mset.catchup } -// Maximum requests for the whole server that can be in flight. -const maxConcurrentSyncRequests = 8 +// Maximum requests for the whole server that can be in flight at the same time. +const maxConcurrentSyncRequests = 16 var ( errCatchupCorruptSnapshot = errors.New("corrupt stream snapshot detected") @@ -7897,11 +7906,11 @@ RETRY: // Grab sync request again on failures. if sreq == nil { - mset.mu.Lock() + mset.mu.RLock() var state StreamState mset.store.FastState(&state) sreq = mset.calculateSyncRequest(&state, snap) - mset.mu.Unlock() + mset.mu.RUnlock() if sreq == nil { return nil } @@ -8134,19 +8143,18 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo { s := js.srv if rg == nil || rg.node == nil { return &ClusterInfo{ - Name: s.ClusterName(), + Name: s.cachedClusterName(), Leader: s.Name(), } } - n := rg.node + n := rg.node ci := &ClusterInfo{ - Name: s.ClusterName(), + Name: s.cachedClusterName(), Leader: s.serverNameForNode(n.GroupLeader()), } now := time.Now() - id, peers := n.ID(), n.Peers() // If we are leaderless, do not suppress putting us in the peer list. @@ -8267,7 +8275,7 @@ func (mset *stream) handleClusterStreamInfoRequest(_ *subscription, c *client, _ func (mset *stream) processClusterStreamInfoRequest(reply string) { mset.mu.RLock() - sysc, js, sa, config := mset.sysc, mset.srv.js, mset.sa, mset.cfg + sysc, js, sa, config := mset.sysc, mset.srv.js.Load(), mset.sa, mset.cfg isLeader := mset.isLeader() mset.mu.RUnlock() diff --git a/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go b/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go index 47bb8087a37..d5d41c53362 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go @@ -681,11 +681,11 @@ func (s *Server) startLeafNodeAcceptLoop() { port = 0 } - s.mu.Lock() - if s.shutdown { - s.mu.Unlock() + if s.isShuttingDown() { return } + + s.mu.Lock() hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port)) l, e := natsListen("tcp", hp) s.leafNodeListenerErr = e @@ -878,7 +878,7 @@ func (s *Server) addLeafNodeURL(urlStr string) bool { func (s *Server) removeLeafNodeURL(urlStr string) bool { // Don't need to do this if we are removing the route connection because // we are shuting down... - if s.shutdown { + if s.isShuttingDown() { return false } if s.leafURLsMap.removeUrl(urlStr) { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/monitor.go b/vendor/github.com/nats-io/nats-server/v2/server/monitor.go index 7a01cfe21b5..66f5e81a363 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/monitor.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/monitor.go @@ -1465,14 +1465,14 @@ func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error) { // We want to do that outside of the lock. pse.ProcUsage(&pcpu, &rss, &vss) - s.mu.Lock() - js := s.js + s.mu.RLock() // We need to create a new instance of Varz (with no reference // whatsoever to anything stored in the server) since the user // has access to the returned value. v := s.createVarz(pcpu, rss) - s.mu.Unlock() - if js != nil { + s.mu.RUnlock() + + if js := s.getJetStream(); js != nil { s.updateJszVarz(js, &v.JetStream, true) } @@ -1798,7 +1798,6 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { // Use server lock to create/update the server's varz object. s.mu.Lock() var created bool - js := s.js s.httpReqStats[VarzPath]++ if s.varz == nil { s.varz = s.createVarz(pcpu, rss) @@ -1809,19 +1808,20 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { s.mu.Unlock() // Since locking is jetStream -> Server, need to update jetstream // varz outside of server lock. - if js != nil { + + if js := s.getJetStream(); js != nil { var v JetStreamVarz // Work on stack variable s.updateJszVarz(js, &v, created) // Now update server's varz - s.mu.Lock() + s.mu.RLock() sv := &s.varz.JetStream if created { sv.Config = v.Config } sv.Stats = v.Stats sv.Meta = v.Meta - s.mu.Unlock() + s.mu.RUnlock() } // Do the marshaling outside of server lock, but under varzMu lock. @@ -2835,10 +2835,10 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, } jsa.mu.RUnlock() - if optStreams { + if js := s.getJetStream(); js != nil && optStreams { for _, stream := range streams { rgroup := stream.raftGroup() - ci := s.js.clusterInfo(rgroup) + ci := js.clusterInfo(rgroup) var cfg *StreamConfig if optCfg { c := stream.config() @@ -2884,7 +2884,8 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, } func (s *Server) JszAccount(opts *JSzOptions) (*AccountDetail, error) { - if s.js == nil { + js := s.getJetStream() + if js == nil { return nil, fmt.Errorf("jetstream not enabled") } acc := opts.Account @@ -2892,9 +2893,9 @@ func (s *Server) JszAccount(opts *JSzOptions) (*AccountDetail, error) { if !ok { return nil, fmt.Errorf("account %q not found", acc) } - s.js.mu.RLock() - jsa, ok := s.js.accounts[account.(*Account).Name] - s.js.mu.RUnlock() + js.mu.RLock() + jsa, ok := js.accounts[account.(*Account).Name] + js.mu.RUnlock() if !ok { return nil, fmt.Errorf("account %q not jetstream enabled", acc) } @@ -2916,7 +2917,7 @@ func (s *Server) raftNodeToClusterInfo(node RaftNode) *ClusterInfo { Peers: peerList, node: node, } - return s.js.clusterInfo(group) + return s.getJetStream().clusterInfo(group) } // Jsz returns a Jsz structure containing information about JetStream. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go b/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go index bc93ca5ddaf..c0d17f26fd3 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go @@ -425,6 +425,10 @@ type mqttParsedPublishNATSHeader struct { } func (s *Server) startMQTT() { + if s.isShuttingDown() { + return + } + sopts := s.getOpts() o := &sopts.MQTT @@ -437,10 +441,6 @@ func (s *Server) startMQTT() { } hp := net.JoinHostPort(o.Host, strconv.Itoa(port)) s.mu.Lock() - if s.shutdown { - s.mu.Unlock() - return - } s.mqtt.sessmgr.sessions = make(map[string]*mqttAccountSessionManager) hl, err = net.Listen("tcp", hp) s.mqtt.listenerErr = err @@ -499,8 +499,8 @@ func (s *Server) createMQTTClient(conn net.Conn, ws *websocket) *client { c.mu.Unlock() s.mu.Lock() - if !s.running || s.ldm { - if s.shutdown { + if !s.isRunning() || s.ldm { + if s.isShuttingDown() { conn.Close() } s.mu.Unlock() @@ -3915,6 +3915,14 @@ func (c *client) mqttEnqueuePubResponse(packetType byte, pi uint16, trace bool) proto := [4]byte{packetType, 0x2, 0, 0} proto[2] = byte(pi >> 8) proto[3] = byte(pi) + + // Bits 3,2,1 and 0 of the fixed header in the PUBREL Control Packet are + // reserved and MUST be set to 0,0,1 and 0 respectively. The Server MUST treat + // any other value as malformed and close the Network Connection [MQTT-3.6.1-1]. + if packetType == mqttPacketPubRel { + proto[0] |= 0x2 + } + c.mu.Lock() c.enqueueProto(proto[:4]) c.mu.Unlock() diff --git a/vendor/github.com/nats-io/nats-server/v2/server/opts.go b/vendor/github.com/nats-io/nats-server/v2/server/opts.go index 508587bb4cf..039f982c027 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/opts.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/opts.go @@ -307,6 +307,7 @@ type Options struct { Websocket WebsocketOpts `json:"-"` MQTT MQTTOpts `json:"-"` ProfPort int `json:"-"` + ProfBlockRate int `json:"-"` PidFile string `json:"-"` PortsFileDir string `json:"-"` LogFile string `json:"-"` @@ -394,6 +395,9 @@ type Options struct { // OCSP Cache config enables next-gen cache for OCSP features OCSPCacheConfig *OCSPResponseCacheConfig + + // Used to mark that we had a top level authorization block. + authBlockDefined bool } // WebsocketOpts are options for websocket @@ -884,7 +888,7 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error *errors = append(*errors, err) return } - + o.authBlockDefined = true o.Username = auth.user o.Password = auth.pass o.Authorization = auth.token @@ -1013,6 +1017,8 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error o.PortsFileDir = v.(string) case "prof_port": o.ProfPort = int(v.(int64)) + case "prof_block_rate": + o.ProfBlockRate = int(v.(int64)) case "max_control_line": if v.(int64) > 1<<31-1 { err := &configErr{tk, fmt.Sprintf("%s value is too big", k)} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/raft.go b/vendor/github.com/nats-io/nats-server/v2/server/raft.go index 076b8e0df94..7baa949dfc4 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/raft.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/raft.go @@ -134,6 +134,7 @@ type raft struct { track bool werr error state RaftState + isLeader atomic.Bool hh hash.Hash64 snapfile string csz int @@ -588,14 +589,15 @@ func (s *Server) stepdownRaftNodes() { s.Debugf("Stepping down all leader raft nodes") } for _, n := range s.raftNodes { - if n.Leader() { - nodes = append(nodes, n) - } + nodes = append(nodes, n) } s.rnMu.RUnlock() for _, node := range nodes { - node.StepDown() + if node.Leader() { + node.StepDown() + } + node.SetObserver(true) } } @@ -652,9 +654,9 @@ func (s *Server) transferRaftLeaders() bool { // This should only be called on the leader. func (n *raft) Propose(data []byte) error { n.RLock() - if n.state != Leader { + if state := n.state; state != Leader { n.RUnlock() - n.debug("Proposal ignored, not leader (state: %v)", n.state) + n.debug("Proposal ignored, not leader (state: %v)", state) return errNotLeader } // Error if we had a previous write error. @@ -1157,14 +1159,12 @@ func (n *raft) loadLastSnapshot() (*snapshot, error) { } // Leader returns if we are the leader for our group. +// We use an atomic here now vs acquiring the read lock. func (n *raft) Leader() bool { if n == nil { return false } - n.RLock() - isLeader := n.state == Leader - n.RUnlock() - return isLeader + return n.isLeader.Load() } func (n *raft) isCatchingUp() bool { @@ -1687,8 +1687,7 @@ func (n *raft) run() { // We want to wait for some routing to be enabled, so we will wait for // at least a route, leaf or gateway connection to be established before // starting the run loop. - gw := s.gateway - for { + for gw := s.gateway; ; { s.mu.Lock() ready := s.numRemotes()+len(s.leafs) > 0 if !ready && gw.enabled { @@ -3830,6 +3829,9 @@ func (n *raft) quorumNeeded() int { // Lock should be held. func (n *raft) updateLeadChange(isLeader bool) { + // Update our atomic about being the leader. + n.isLeader.Store(isLeader) + // We don't care about values that have not been consumed (transitory states), // so we dequeue any state that is pending and push the new one. for { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/reload.go b/vendor/github.com/nats-io/nats-server/v2/server/reload.go index ec22911bf70..239881715e1 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/reload.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/reload.go @@ -805,6 +805,16 @@ func (o *mqttInactiveThresholdReload) Apply(s *Server) { s.Noticef("Reloaded: MQTT consumer_inactive_threshold = %v", o.newValue) } +type profBlockRateReload struct { + noopOption + newValue int +} + +func (o *profBlockRateReload) Apply(s *Server) { + s.setBlockProfileRate(o.newValue) + s.Noticef("Reloaded: block_prof_rate = %v", o.newValue) +} + type leafNodeOption struct { noopOption tlsFirstChanged bool @@ -1589,6 +1599,12 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { diffOpts = append(diffOpts, &ocspOption{newValue: newValue.(*OCSPConfig)}) case "ocspcacheconfig": diffOpts = append(diffOpts, &ocspResponseCacheOption{newValue: newValue.(*OCSPResponseCacheConfig)}) + case "profblockrate": + new := newValue.(int) + old := oldValue.(int) + if new != old { + diffOpts = append(diffOpts, &profBlockRateReload{newValue: new}) + } default: // TODO(ik): Implement String() on those options to have a nice print. // %v is difficult to figure what's what, %+v print private fields and @@ -1857,7 +1873,7 @@ func (s *Server) reloadAuthorization() { awcsti, _ = s.configureAccounts(true) s.configureAuthorization() // Double check any JetStream configs. - checkJetStream = s.js != nil + checkJetStream = s.getJetStream() != nil } else if opts.AccountResolver != nil { s.configureResolver() if _, ok := s.accResolver.(*MemAccResolver); ok { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/route.go b/vendor/github.com/nats-io/nats-server/v2/server/route.go index dfda145bfdc..493f03e4748 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/route.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/route.go @@ -653,11 +653,14 @@ func (c *client) processRouteInfo(info *Info) { // We receive an INFO from a server that informs us about another server, // so the info.ID in the INFO protocol does not match the ID of this route. if remoteID != _EMPTY_ && remoteID != info.ID { + // We want to know if the existing route supports pooling/pinned-account + // or not when processing the implicit route. + noPool := c.route.noPool c.mu.Unlock() // Process this implicit route. We will check that it is not an explicit // route and/or that it has not been connected already. - s.processImplicitRoute(info) + s.processImplicitRoute(info, noPool) return } @@ -812,10 +815,14 @@ func (c *client) processRouteInfo(info *Info) { } } // For accounts that are configured to have their own route: - // If this is a solicit route, we already have c.route.accName set in createRoute. + // If this is a solicited route, we already have c.route.accName set in createRoute. // For non solicited route (the accept side), we will set the account name that // is present in the INFO protocol. - if !didSolicit { + if didSolicit && len(c.route.accName) > 0 { + // Set it in the info.RouteAccount so that addRoute can use that + // and we properly gossip that this is a route for an account. + info.RouteAccount = string(c.route.accName) + } else if !didSolicit && info.RouteAccount != _EMPTY_ { c.route.accName = []byte(info.RouteAccount) } accName := string(c.route.accName) @@ -977,7 +984,7 @@ func (s *Server) updateRemoteRoutePerms(c *client, info *Info) { func (s *Server) sendAsyncInfoToClients(regCli, wsCli bool) { // If there are no clients supporting async INFO protocols, we are done. // Also don't send if we are shutting down... - if s.cproto == 0 || s.shutdown { + if s.cproto == 0 || s.isShuttingDown() { return } info := s.copyInfo() @@ -1002,7 +1009,7 @@ func (s *Server) sendAsyncInfoToClients(regCli, wsCli bool) { // This will process implicit route information received from another server. // We will check to see if we have configured or are already connected, // and if so we will ignore. Otherwise we will attempt to connect. -func (s *Server) processImplicitRoute(info *Info) { +func (s *Server) processImplicitRoute(info *Info, routeNoPool bool) { remoteID := info.ID s.mu.Lock() @@ -1012,8 +1019,16 @@ func (s *Server) processImplicitRoute(info *Info) { if remoteID == s.info.ID { return } + + // Snapshot server options. + opts := s.getOpts() + // Check if this route already exists if accName := info.RouteAccount; accName != _EMPTY_ { + // If we don't support pooling/pinned account, bail. + if opts.Cluster.PoolSize <= 0 { + return + } if remotes, ok := s.accRoutes[accName]; ok { if r := remotes[remoteID]; r != nil { return @@ -1034,13 +1049,22 @@ func (s *Server) processImplicitRoute(info *Info) { return } - // Snapshot server options. - opts := s.getOpts() - if info.AuthRequired { r.User = url.UserPassword(opts.Cluster.Username, opts.Cluster.Password) } s.startGoRoutine(func() { s.connectToRoute(r, false, true, info.RouteAccount) }) + // If we are processing an implicit route from a route that does not + // support pooling/pinned-accounts, we won't receive an INFO for each of + // the pinned-accounts that we would normally receive. In that case, just + // initiate routes for all our configured pinned accounts. + if routeNoPool && info.RouteAccount == _EMPTY_ && len(opts.Cluster.PinnedAccounts) > 0 { + // Copy since we are going to pass as closure to a go routine. + rURL := r + for _, an := range opts.Cluster.PinnedAccounts { + accName := an + s.startGoRoutine(func() { s.connectToRoute(rURL, false, true, accName) }) + } + } } // hasThisRouteConfigured returns true if info.Host:info.Port is present @@ -1071,7 +1095,10 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) { s.forEachRemote(func(r *client) { r.mu.Lock() - if r.route.remoteID != info.ID { + // If this is a new route for a given account, do not send to a server + // that does not support pooling/pinned-accounts. + if r.route.remoteID != info.ID && + (info.RouteAccount == _EMPTY_ || (info.RouteAccount != _EMPTY_ && !r.route.noPool)) { r.enqueueProto(infoJSON) } r.mu.Unlock() @@ -1834,7 +1861,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string id := info.ID s.mu.Lock() - if !s.running || s.routesReject { + if !s.isRunning() || s.routesReject { s.mu.Unlock() return false } @@ -1855,7 +1882,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string // server and need to handle things differently. if info.RoutePoolSize <= 0 || opts.Cluster.PoolSize < 0 { if accName != _EMPTY_ { - invProtoErr = fmt.Sprintf("Not possible to have a dedicate route for account %q between those servers", accName) + invProtoErr = fmt.Sprintf("Not possible to have a dedicated route for account %q between those servers", accName) // In this case, make sure this route does not attempt to reconnect c.setNoReconnect() } else { @@ -2302,6 +2329,10 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del // is detected that the server has already been shutdown. // It will also start soliciting explicit routes. func (s *Server) startRouteAcceptLoop() { + if s.isShuttingDown() { + return + } + // Snapshot server options. opts := s.getOpts() @@ -2316,10 +2347,6 @@ func (s *Server) startRouteAcceptLoop() { clusterName := s.ClusterName() s.mu.Lock() - if s.shutdown { - s.mu.Unlock() - return - } s.Noticef("Cluster name is %s", clusterName) if s.isClusterNameDynamic() { s.Warnf("Cluster name was dynamically generated, consider setting one") @@ -2654,7 +2681,9 @@ func (c *client) processRouteConnect(srv *Server, arg []byte, lang string) error // We will take on their name since theirs is configured or higher then ours. srv.setClusterName(proto.Cluster) if !proto.Dynamic { - srv.getOpts().Cluster.Name = proto.Cluster + srv.optsMu.Lock() + srv.opts.Cluster.Name = proto.Cluster + srv.optsMu.Unlock() } c.mu.Lock() remoteID := c.opts.Name @@ -2729,6 +2758,7 @@ func (s *Server) removeRoute(c *client) { opts = s.getOpts() rURL *url.URL noPool bool + didSolicit bool ) c.mu.Lock() cid := c.cid @@ -2747,6 +2777,7 @@ func (s *Server) removeRoute(c *client) { connectURLs = r.connectURLs wsConnectURLs = r.wsConnURLs rURL = r.url + didSolicit = r.didSolicit } c.mu.Unlock() if accName != _EMPTY_ { @@ -2805,10 +2836,18 @@ func (s *Server) removeRoute(c *client) { if lnURL != _EMPTY_ && s.removeLeafNodeURL(lnURL) { s.sendAsyncLeafNodeInfo() } - // If this server has pooling and the route for this remote - // was a "no pool" route, attempt to reconnect. - if s.routesPoolSize > 1 && noPool { - s.startGoRoutine(func() { s.connectToRoute(rURL, true, true, _EMPTY_) }) + // If this server has pooling/pinned accounts and the route for + // this remote was a "no pool" route, attempt to reconnect. + if noPool { + if s.routesPoolSize > 1 { + s.startGoRoutine(func() { s.connectToRoute(rURL, didSolicit, true, _EMPTY_) }) + } + if len(opts.Cluster.PinnedAccounts) > 0 { + for _, an := range opts.Cluster.PinnedAccounts { + accName := an + s.startGoRoutine(func() { s.connectToRoute(rURL, didSolicit, true, accName) }) + } + } } } // This is for gateway code. Remove this route from a map that uses diff --git a/vendor/github.com/nats-io/nats-server/v2/server/server.go b/vendor/github.com/nats-io/nats-server/v2/server/server.go index 5664e77c0e4..d1d0d109d49 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/server.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/server.go @@ -131,13 +131,14 @@ type Server struct { configFile string optsMu sync.RWMutex opts *Options - running bool - shutdown bool + running atomic.Bool + shutdown atomic.Bool listener net.Listener listenerErr error gacc *Account sys *internal - js *jetStream + js atomic.Pointer[jetStream] + isMetaLeader atomic.Bool accounts sync.Map tmpAccounts sync.Map // Temporarily stores accounts that are being built activeAccounts int32 @@ -572,13 +573,18 @@ func selectS2AutoModeBasedOnRTT(rtt time.Duration, rttThresholds []time.Duration // with a nil []s2.WriterOption, but not with a nil s2.WriterOption, so // this is more versatile. func s2WriterOptions(cm string) []s2.WriterOption { + _opts := [2]s2.WriterOption{} + opts := append( + _opts[:0], + s2.WriterConcurrency(1), // Stop asynchronous flushing in separate goroutines + ) switch cm { case CompressionS2Uncompressed: - return []s2.WriterOption{s2.WriterUncompressed()} + return append(opts, s2.WriterUncompressed()) case CompressionS2Best: - return []s2.WriterOption{s2.WriterBestCompression()} + return append(opts, s2.WriterBestCompression()) case CompressionS2Better: - return []s2.WriterOption{s2.WriterBetterCompression()} + return append(opts, s2.WriterBetterCompression()) default: return nil } @@ -1234,8 +1240,8 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error) // If we have defined a system account here check to see if its just us and the $G account. // We would do this to add user/pass to the system account. If this is the case add in // no-auth-user for $G. - // Only do this if non-operator mode. - if len(opts.TrustedOperators) == 0 && numAccounts == 2 && opts.NoAuthUser == _EMPTY_ { + // Only do this if non-operator mode and we did not have an authorization block defined. + if len(opts.TrustedOperators) == 0 && numAccounts == 2 && opts.NoAuthUser == _EMPTY_ && !opts.authBlockDefined { // If we come here from config reload, let's not recreate the fake user name otherwise // it will cause currently clients to be disconnected. uname := s.sysAccOnlyNoAuthUser @@ -1267,6 +1273,7 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error) // Setup the account resolver. For memory resolver, make sure the JWTs are // properly formed but do not enforce expiration etc. +// Lock is held on entry, but may be released/reacquired during this call. func (s *Server) configureResolver() error { opts := s.getOpts() s.accResolver = opts.AccountResolver @@ -1281,7 +1288,12 @@ func (s *Server) configureResolver() error { } } if len(opts.resolverPreloads) > 0 { - if s.accResolver.IsReadOnly() { + // Lock ordering is account resolver -> server, so we need to release + // the lock and reacquire it when done with account resolver's calls. + ar := s.accResolver + s.mu.Unlock() + defer s.mu.Lock() + if ar.IsReadOnly() { return fmt.Errorf("resolver preloads only available for writeable resolver types MEM/DIR/CACHE_DIR") } for k, v := range opts.resolverPreloads { @@ -1289,7 +1301,7 @@ func (s *Server) configureResolver() error { if err != nil { return fmt.Errorf("preload account error for %q: %v", k, err) } - s.accResolver.Store(k, v) + ar.Store(k, v) } } } @@ -1477,10 +1489,7 @@ func (s *Server) Running() bool { // Protected check on running state func (s *Server) isRunning() bool { - s.mu.RLock() - running := s.running - s.mu.RUnlock() - return running + return s.running.Load() } func (s *Server) logPid() error { @@ -2078,8 +2087,8 @@ func (s *Server) Start() { s.checkAuthforWarnings() // Avoid RACE between Start() and Shutdown() + s.running.Store(true) s.mu.Lock() - s.running = true // Update leafNodeEnabled in case options have changed post NewServer() // and before Start() (we should not be able to allow that, but server has // direct reference to user-provided options - at least before a Reload() is @@ -2096,6 +2105,10 @@ func (s *Server) Start() { // Pprof http endpoint for the profiler. if opts.ProfPort != 0 { s.StartProfiler() + } else { + // It's still possible to access this profile via a SYS endpoint, so set + // this anyway. (Otherwise StartProfiler would have called it.) + s.setBlockProfileRate(opts.ProfBlockRate) } if opts.ConfigFile != _EMPTY_ { @@ -2338,6 +2351,10 @@ func (s *Server) Start() { s.startOCSPResponseCache() } +func (s *Server) isShuttingDown() bool { + return s.shutdown.Load() +} + // Shutdown will shutdown the server instance by kicking out the AcceptLoop // and closing all associated clients. func (s *Server) Shutdown() { @@ -2357,20 +2374,20 @@ func (s *Server) Shutdown() { // eventing items associated with accounts. s.shutdownEventing() - s.mu.Lock() // Prevent issues with multiple calls. - if s.shutdown { - s.mu.Unlock() + if s.isShuttingDown() { return } + + s.mu.Lock() s.Noticef("Initiating Shutdown...") accRes := s.accResolver opts := s.getOpts() - s.shutdown = true - s.running = false + s.shutdown.Store(true) + s.running.Store(false) s.grMu.Lock() s.grRunning = false s.grMu.Unlock() @@ -2380,7 +2397,7 @@ func (s *Server) Shutdown() { accRes.Close() } - // Now check jetstream. + // Now check and shutdown jetstream. s.shutdownJetStream() // Now shutdown the nodes @@ -2533,16 +2550,15 @@ func (s *Server) AcceptLoop(clr chan struct{}) { } }() + if s.isShuttingDown() { + return + } + // Snapshot server options. opts := s.getOpts() // Setup state that can enable shutdown s.mu.Lock() - if s.shutdown { - s.mu.Unlock() - return - } - hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port)) l, e := natsListen("tcp", hp) s.listenerErr = e @@ -2665,6 +2681,10 @@ func (s *Server) setInfoHostPort() error { // StartProfiler is called to enable dynamic profiling. func (s *Server) StartProfiler() { + if s.isShuttingDown() { + return + } + // Snapshot server options. opts := s.getOpts() @@ -2676,12 +2696,7 @@ func (s *Server) StartProfiler() { } s.mu.Lock() - if s.shutdown { - s.mu.Unlock() - return - } hp := net.JoinHostPort(opts.Host, strconv.Itoa(port)) - l, err := net.Listen("tcp", hp) if err != nil { @@ -2699,14 +2714,13 @@ func (s *Server) StartProfiler() { s.profiler = l s.profilingServer = srv + s.setBlockProfileRate(opts.ProfBlockRate) + go func() { // if this errors out, it's probably because the server is being shutdown err := srv.Serve(l) if err != nil { - s.mu.Lock() - shutdown := s.shutdown - s.mu.Unlock() - if !shutdown { + if !s.isShuttingDown() { s.Fatalf("error starting profiler: %s", err) } } @@ -2716,6 +2730,15 @@ func (s *Server) StartProfiler() { s.mu.Unlock() } +func (s *Server) setBlockProfileRate(rate int) { + // Passing i ProfBlockRate <= 0 here will disable or > 0 will enable. + runtime.SetBlockProfileRate(rate) + + if rate > 0 { + s.Warnf("Block profiling is enabled (rate %d), this may have a performance impact", rate) + } +} + // StartHTTPMonitoring will enable the HTTP monitoring port. // DEPRECATED: Should use StartMonitoring. func (s *Server) StartHTTPMonitoring() { @@ -2804,6 +2827,10 @@ func (s *Server) getMonitoringTLSConfig(_ *tls.ClientHelloInfo) (*tls.Config, er // Start the monitoring server func (s *Server) startMonitoring(secure bool) error { + if s.isShuttingDown() { + return nil + } + // Snapshot server options. opts := s.getOpts() @@ -2885,11 +2912,6 @@ func (s *Server) startMonitoring(secure bool) error { ErrorLog: log.New(&captureHTTPServerLog{s, "monitoring: "}, _EMPTY_, 0), } s.mu.Lock() - if s.shutdown { - httpListener.Close() - s.mu.Unlock() - return nil - } s.http = httpListener s.httpHandler = mux s.monitoringServer = srv @@ -2897,10 +2919,7 @@ func (s *Server) startMonitoring(secure bool) error { go func() { if err := srv.Serve(httpListener); err != nil { - s.mu.Lock() - shutdown := s.shutdown - s.mu.Unlock() - if !shutdown { + if !s.isShuttingDown() { s.Fatalf("Error starting monitor on %q: %v", hp, err) } } @@ -3036,13 +3055,13 @@ func (s *Server) createClientEx(conn net.Conn, inProcess bool) *client { // list of connections to close. It won't contain this one, so we need // to bail out now otherwise the readLoop started down there would not // be interrupted. Skip also if in lame duck mode. - if !s.running || s.ldm { + if !s.isRunning() || s.ldm { // There are some tests that create a server but don't start it, // and use "async" clients and perform the parsing manually. Such // clients would branch here (since server is not running). However, // when a server was really running and has been shutdown, we must // close this connection. - if s.shutdown { + if s.isShuttingDown() { conn.Close() } s.mu.Unlock() @@ -3590,22 +3609,28 @@ func (s *Server) String() string { type pprofLabels map[string]string +func setGoRoutineLabels(tags ...pprofLabels) { + var labels []string + for _, m := range tags { + for k, v := range m { + labels = append(labels, k, v) + } + } + if len(labels) > 0 { + pprof.SetGoroutineLabels( + pprof.WithLabels(context.Background(), pprof.Labels(labels...)), + ) + } +} + func (s *Server) startGoRoutine(f func(), tags ...pprofLabels) bool { var started bool s.grMu.Lock() defer s.grMu.Unlock() if s.grRunning { - var labels []string - for _, m := range tags { - for k, v := range m { - labels = append(labels, k, v) - } - } s.grWG.Add(1) go func() { - pprof.SetGoroutineLabels( - pprof.WithLabels(context.Background(), pprof.Labels(labels...)), - ) + setGoRoutineLabels(tags...) f() }() started = true @@ -3944,11 +3969,12 @@ func (s *Server) isLameDuckMode() bool { } // This function will close the client listener then close the clients -// at some interval to avoid a reconnecting storm. +// at some interval to avoid a reconnect storm. +// We will also transfer any raft leaders and shutdown JetStream. func (s *Server) lameDuckMode() { s.mu.Lock() // Check if there is actually anything to do - if s.shutdown || s.ldm || s.listener == nil { + if s.isShuttingDown() || s.ldm || s.listener == nil { s.mu.Unlock() return } @@ -3985,6 +4011,12 @@ func (s *Server) lameDuckMode() { } } + // Now check and shutdown jetstream. + s.shutdownJetStream() + + // Now shutdown the nodes + s.shutdownRaftNodes() + // Wait for accept loops to be done to make sure that no new // client can connect for i := 0; i < expected; i++ { @@ -3993,7 +4025,7 @@ func (s *Server) lameDuckMode() { s.mu.Lock() // Need to recheck few things - if s.shutdown || len(s.clients) == 0 { + if s.isShuttingDown() || len(s.clients) == 0 { s.mu.Unlock() // If there is no client, we need to call Shutdown() to complete // the LDMode. If server has been shutdown while lock was released, diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stream.go b/vendor/github.com/nats-io/nats-server/v2/server/stream.go index 2180dc960a3..72ca5f354d1 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/stream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/stream.go @@ -246,8 +246,9 @@ type stream struct { mirror *sourceInfo // Sources - sources map[string]*sourceInfo - sourceRetries map[string]*time.Timer + sources map[string]*sourceInfo + sourceRetries map[string]*time.Timer + sourcesConsumerSetup *time.Timer // Indicates we have direct consumers. directs int @@ -404,12 +405,21 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt } // Make sure we are ok when these are done in parallel. - v, loaded := jsa.inflight.LoadOrStore(cfg.Name, &sync.WaitGroup{}) + // We used to call Add(1) in the "else" clause of the "if loaded" + // statement. This caused a data race because it was possible + // that one go routine stores (with count==0) and another routine + // gets "loaded==true" and calls wg.Wait() while the other routine + // then calls wg.Add(1). It also could mean that two routines execute + // the rest of the code concurrently. + swg := &sync.WaitGroup{} + swg.Add(1) + v, loaded := jsa.inflight.LoadOrStore(cfg.Name, swg) wg := v.(*sync.WaitGroup) if loaded { wg.Wait() + // This waitgroup is "thrown away" (since there was an existing one). + swg.Done() } else { - wg.Add(1) defer func() { jsa.inflight.Delete(cfg.Name) wg.Done() @@ -812,6 +822,11 @@ func (mset *stream) setLeader(isLeader bool) error { return err } } else { + // cancel timer to create the source consumers if not fired yet + if mset.sourcesConsumerSetup != nil { + mset.sourcesConsumerSetup.Stop() + mset.sourcesConsumerSetup = nil + } // Stop responding to sync requests. mset.stopClusterSubs() // Unsubscribe from direct stream. @@ -2380,7 +2395,7 @@ func (mset *stream) scheduleSetupMirrorConsumerRetryAsap() { } // To make *sure* that the next request will not fail, add a bit of buffer // and some randomness. - next += time.Duration(rand.Intn(50)) + 10*time.Millisecond + next += time.Duration(rand.Intn(int(10*time.Millisecond))) + 10*time.Millisecond time.AfterFunc(next, func() { mset.mu.Lock() mset.setupMirrorConsumer() @@ -2620,9 +2635,10 @@ func (mset *stream) setupMirrorConsumer() error { if !mset.srv.startGoRoutine( func() { mset.processMirrorMsgs(mirror, &ready) }, pprofLabels{ - "type": "mirror", - "account": mset.acc.Name, - "stream": mset.cfg.Name, + "type": "mirror", + "account": mset.acc.Name, + "stream": mset.cfg.Name, + "consumer": mirror.cname, }, ) { ready.Done() @@ -2727,7 +2743,7 @@ func (mset *stream) scheduleSetSourceConsumerRetryAsap(si *sourceInfo, seq uint6 } // To make *sure* that the next request will not fail, add a bit of buffer // and some randomness. - next += time.Duration(rand.Intn(50)) + 10*time.Millisecond + next += time.Duration(rand.Intn(int(10*time.Millisecond))) + 10*time.Millisecond mset.scheduleSetSourceConsumerRetry(si.iname, seq, next, startTime) } @@ -2950,9 +2966,10 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T if !mset.srv.startGoRoutine( func() { mset.processSourceMsgs(si, &ready) }, pprofLabels{ - "type": "source", - "account": mset.acc.Name, - "stream": mset.cfg.Name, + "type": "source", + "account": mset.acc.Name, + "stream": mset.cfg.Name, + "consumer": si.cname, }, ) { ready.Done() @@ -3286,16 +3303,9 @@ func (mset *stream) setStartingSequenceForSource(iName string, external *Externa } } -// Lock should be held. -// This will do a reverse scan on startup or leader election -// searching for the starting sequence number. -// This can be slow in degenerative cases. -// Lock should be held. -func (mset *stream) startingSequenceForSources() { - if len(mset.cfg.Sources) == 0 { - return - } - // Always reset here. +// lock should be held. +// Resets the SourceInfo for all the sources +func (mset *stream) resetSourceInfo() { mset.sources = make(map[string]*sourceInfo) for _, ssi := range mset.cfg.Sources { @@ -3322,6 +3332,20 @@ func (mset *stream) startingSequenceForSources() { } mset.sources[ssi.iname] = si } +} + +// Lock should be held. +// This will do a reverse scan on startup or leader election +// searching for the starting sequence number. +// This can be slow in degenerative cases. +// Lock should be held. +func (mset *stream) startingSequenceForSources() { + if len(mset.cfg.Sources) == 0 { + return + } + + // Always reset here. + mset.resetSourceInfo() var state StreamState mset.store.FastState(&state) @@ -3405,6 +3429,11 @@ func (mset *stream) setupSourceConsumers() error { } } + // If we are no longer the leader, give up + if !mset.isLeader() { + return nil + } + mset.startingSequenceForSources() // Setup our consumers at the proper starting position. @@ -3430,13 +3459,35 @@ func (mset *stream) subscribeToStream() error { } // Check if we need to setup mirroring. if mset.cfg.Mirror != nil { - if err := mset.setupMirrorConsumer(); err != nil { - return err + // setup the initial mirror sourceInfo + mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name} + sfs := make([]string, len(mset.cfg.Mirror.SubjectTransforms)) + trs := make([]*subjectTransform, len(mset.cfg.Mirror.SubjectTransforms)) + + for i, tr := range mset.cfg.Mirror.SubjectTransforms { + // will not fail as already checked before that the transform will work + subjectTransform, err := NewSubjectTransform(tr.Source, tr.Destination) + if err != nil { + mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err) + } + + sfs[i] = tr.Source + trs[i] = subjectTransform } + mset.mirror.sfs = sfs + mset.mirror.trs = trs + // delay the actual mirror consumer creation for after a delay + mset.scheduleSetupMirrorConsumerRetryAsap() } else if len(mset.cfg.Sources) > 0 { - if err := mset.setupSourceConsumers(); err != nil { - return err - } + // Setup the initial source infos for the sources + mset.resetSourceInfo() + // Delay the actual source consumer(s) creation(s) for after a delay + + mset.sourcesConsumerSetup = time.AfterFunc(time.Duration(rand.Intn(int(10*time.Millisecond)))+10*time.Millisecond, func() { + mset.mu.Lock() + mset.setupSourceConsumers() + mset.mu.Unlock() + }) } // Check for direct get access. // We spin up followers for clustered streams in monitorStream(). @@ -3656,14 +3707,14 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error { fsCfg.Cipher = s.getOpts().JetStreamCipher } oldprf := s.jsKeyGen(s.getOpts().JetStreamOldKey, mset.acc.Name) - fs, err := newFileStoreWithCreated(*fsCfg, mset.cfg, mset.created, prf, oldprf) + cfg := *fsCfg + cfg.srv = s + fs, err := newFileStoreWithCreated(cfg, mset.cfg, mset.created, prf, oldprf) if err != nil { mset.mu.Unlock() return err } mset.store = fs - // Register our server. - fs.registerServer(s) } // This will fire the callback but we do not require the lock since md will be 0 here. mset.store.RegisterStorageUpdates(mset.storeUpdates) diff --git a/vendor/github.com/nats-io/nats-server/v2/server/sublist.go b/vendor/github.com/nats-io/nats-server/v2/server/sublist.go index 6f12f89148b..a78ff86c3c3 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/sublist.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/sublist.go @@ -540,6 +540,7 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult { if doLock { s.RLock() } + cacheEnabled := s.cache != nil r, ok := s.cache[subject] if doLock { s.RUnlock() @@ -574,7 +575,11 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult { var n int if doLock { - s.Lock() + if cacheEnabled { + s.Lock() + } else { + s.RLock() + } } matchLevel(s.root, tokens, result) @@ -582,16 +587,20 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult { if len(result.psubs) == 0 && len(result.qsubs) == 0 { result = emptyResult } - if s.cache != nil { + if cacheEnabled { s.cache[subject] = result n = len(s.cache) } if doLock { - s.Unlock() + if cacheEnabled { + s.Unlock() + } else { + s.RUnlock() + } } // Reduce the cache count if we have exceeded our set maximum. - if n > slCacheMax && atomic.CompareAndSwapInt32(&s.ccSweep, 0, 1) { + if cacheEnabled && n > slCacheMax && atomic.CompareAndSwapInt32(&s.ccSweep, 0, 1) { go s.reduceCacheCount() } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/websocket.go b/vendor/github.com/nats-io/nats-server/v2/server/websocket.go index 6bf82305c1c..014a1d72fc8 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/websocket.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/websocket.go @@ -1050,6 +1050,10 @@ func (s *Server) wsConfigAuth(opts *WebsocketOpts) { } func (s *Server) startWebsocketServer() { + if s.isShuttingDown() { + return + } + sopts := s.getOpts() o := &sopts.Websocket @@ -1071,10 +1075,6 @@ func (s *Server) startWebsocketServer() { // avoid the possibility of it being "intercepted". s.mu.Lock() - if s.shutdown { - s.mu.Unlock() - return - } // Do not check o.NoTLS here. If a TLS configuration is available, use it, // regardless of NoTLS. If we don't have a TLS config, it means that the // user has configured NoTLS because otherwise the server would have failed @@ -1220,8 +1220,8 @@ func (s *Server) createWSClient(conn net.Conn, ws *websocket) *client { c.mu.Unlock() s.mu.Lock() - if !s.running || s.ldm { - if s.shutdown { + if !s.isRunning() || s.ldm { + if s.isShuttingDown() { conn.Close() } s.mu.Unlock() @@ -1295,8 +1295,7 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if mfs > 0 && c.ws.nocompfrag { mfs = 0 } - buf := &bytes.Buffer{} - + buf := bytes.NewBuffer(nbPoolGet(usz)) cp := c.ws.compressor if cp == nil { c.ws.compressor, _ = flate.NewWriter(buf, flate.BestSpeed) @@ -1331,9 +1330,7 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if mask { wsMaskBuf(key, p[:lp]) } - new := nbPoolGet(wsFrameSizeForBrowsers) - lp = copy(new[:wsFrameSizeForBrowsers], p[:lp]) - bufs = append(bufs, fh[:n], new[:lp]) + bufs = append(bufs, fh[:n], p[:lp]) csz += n + lp p = p[lp:] } @@ -1343,15 +1340,16 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if mask { wsMaskBuf(key, p) } - bufs = append(bufs, h) - for len(p) > 0 { - new := nbPoolGet(len(p)) - n := copy(new[:cap(new)], p) - bufs = append(bufs, new[:n]) - p = p[n:] + if ol > 0 { + bufs = append(bufs, h, p) } csz = len(h) + ol } + // Make sure that the compressor no longer holds a reference to + // the bytes.Buffer, so that the underlying memory gets cleaned + // up after flushOutbound/flushAndClose. For this to be safe, we + // always cp.Reset(...) before reusing the compressor again. + cp.Reset(nil) // Add to pb the compressed data size (including headers), but // remove the original uncompressed data size that was added // during the queueing. @@ -1362,14 +1360,15 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if mfs > 0 { // We are limiting the frame size. startFrame := func() int { - bufs = append(bufs, nbPoolGet(wsMaxFrameHeaderSize)[:wsMaxFrameHeaderSize]) + bufs = append(bufs, nbPoolGet(wsMaxFrameHeaderSize)) return len(bufs) - 1 } endFrame := func(idx, size int) { + bufs[idx] = bufs[idx][:wsMaxFrameHeaderSize] n, key := wsFillFrameHeader(bufs[idx], mask, wsFirstFrame, wsFinalFrame, wsUncompressedFrame, wsBinaryMessage, size) + bufs[idx] = bufs[idx][:n] c.out.pb += int64(n) c.ws.fs += int64(n + size) - bufs[idx] = bufs[idx][:n] if mask { wsMaskBufs(key, bufs[idx+1:]) } @@ -1395,10 +1394,8 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if endStart { fhIdx = startFrame() } - new := nbPoolGet(total) - n := copy(new[:cap(new)], b[:total]) - bufs = append(bufs, new[:n]) - b = b[n:] + bufs = append(bufs, b[:total]) + b = b[total:] } } if total > 0 { diff --git a/vendor/github.com/nats-io/nats.go/.travis.yml b/vendor/github.com/nats-io/nats.go/.travis.yml index 962109d907a..368797051b0 100644 --- a/vendor/github.com/nats-io/nats.go/.travis.yml +++ b/vendor/github.com/nats-io/nats.go/.travis.yml @@ -1,12 +1,12 @@ language: go go: +- "1.21.x" - "1.20.x" -- "1.19.x" go_import_path: github.com/nats-io/nats.go install: - go get -t ./... - curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin -- if [[ "$TRAVIS_GO_VERSION" =~ 1.20 ]]; then +- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then go install github.com/mattn/goveralls@latest; go install github.com/wadey/gocovmerge@latest; go install honnef.co/go/tools/cmd/staticcheck@latest; @@ -15,27 +15,22 @@ install: before_script: - $(exit $(go fmt ./... | wc -l)) - go vet -modfile=go_test.mod ./... -- if [[ "$TRAVIS_GO_VERSION" =~ 1.20 ]]; then +- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then find . -type f -name "*.go" | xargs misspell -error -locale US; GOFLAGS="-mod=mod -modfile=go_test.mod" staticcheck ./...; fi - golangci-lint run ./jetstream/... script: - go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast -vet=off -- if [[ "$TRAVIS_GO_VERSION" =~ 1.20 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off; fi +- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off; fi after_success: -- if [[ "$TRAVIS_GO_VERSION" =~ 1.20 ]]; then $HOME/gopath/bin/goveralls -coverprofile=acc.out -service travis-ci; fi +- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then $HOME/gopath/bin/goveralls -coverprofile=acc.out -service travis-ci; fi jobs: include: - - name: "Go: 1.20.x (nats-server@dev)" - go: "1.20.x" - before_script: - - go get -modfile go_test.mod github.com/nats-io/nats-server/v2@dev - - name: "Go: 1.20.x (nats-server@main)" - go: "1.20.x" + - name: "Go: 1.21.x (nats-server@main)" + go: "1.21.x" before_script: - go get -modfile go_test.mod github.com/nats-io/nats-server/v2@main allow_failures: - - name: "Go: 1.20.x (nats-server@dev)" - - name: "Go: 1.20.x (nats-server@main)" + - name: "Go: 1.21.x (nats-server@main)" diff --git a/vendor/github.com/nats-io/nats.go/README.md b/vendor/github.com/nats-io/nats.go/README.md index b4978701b2d..108db4e35c3 100644 --- a/vendor/github.com/nats-io/nats.go/README.md +++ b/vendor/github.com/nats-io/nats.go/README.md @@ -29,7 +29,7 @@ When using or transitioning to Go modules support: ```bash # Go client latest or explicit version go get github.com/nats-io/nats.go/@latest -go get github.com/nats-io/nats.go/@v1.29.0 +go get github.com/nats-io/nats.go/@v1.30.2 # For latest NATS Server, add /v2 at the end go get github.com/nats-io/nats-server/v2 diff --git a/vendor/github.com/nats-io/nats.go/go_test.mod b/vendor/github.com/nats-io/nats.go/go_test.mod index af69c5ab23f..8902c1edd77 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.mod +++ b/vendor/github.com/nats-io/nats.go/go_test.mod @@ -4,19 +4,19 @@ go 1.19 require ( github.com/golang/protobuf v1.4.2 - github.com/klauspost/compress v1.16.5 - github.com/nats-io/nats-server/v2 v2.9.19 - github.com/nats-io/nkeys v0.4.4 + github.com/klauspost/compress v1.17.0 + github.com/nats-io/nats-server/v2 v2.10.0 + github.com/nats-io/nkeys v0.4.5 github.com/nats-io/nuid v1.0.1 go.uber.org/goleak v1.2.1 - golang.org/x/text v0.9.0 + golang.org/x/text v0.13.0 google.golang.org/protobuf v1.23.0 ) require ( github.com/minio/highwayhash v1.0.2 // indirect - github.com/nats-io/jwt/v2 v2.4.1 // indirect - golang.org/x/crypto v0.9.0 // indirect - golang.org/x/sys v0.8.0 // indirect + github.com/nats-io/jwt/v2 v2.5.2 // indirect + golang.org/x/crypto v0.13.0 // indirect + golang.org/x/sys v0.12.0 // indirect golang.org/x/time v0.3.0 // indirect ) diff --git a/vendor/github.com/nats-io/nats.go/go_test.sum b/vendor/github.com/nats-io/nats.go/go_test.sum index b9c9fd47681..ce4ba9205c3 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.sum +++ b/vendor/github.com/nats-io/nats.go/go_test.sum @@ -10,29 +10,30 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= -github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= +github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= -github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= -github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= -github.com/nats-io/nats-server/v2 v2.9.19 h1:OF9jSKZGo425C/FcVVIvNgpd36CUe7aVTTXEZRJk6kA= -github.com/nats-io/nats-server/v2 v2.9.19/go.mod h1:aTb/xtLCGKhfTFLxP591CMWfkdgBmcUUSkiSOe5A3gw= -github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= -github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU= +github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= +github.com/nats-io/nats-server/v2 v2.10.0 h1:rcU++Hzo+wARxtJugrV3J5z5iGdHeVG8tT8Chb3bKDg= +github.com/nats-io/nats-server/v2 v2.10.0/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto= +github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= +github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= -golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= -golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= +golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= +golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index a8deb98d6e9..7fdb0131c36 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -1118,6 +1118,7 @@ type ConsumerConfig struct { MaxDeliver int `json:"max_deliver,omitempty"` BackOff []time.Duration `json:"backoff,omitempty"` FilterSubject string `json:"filter_subject,omitempty"` + FilterSubjects []string `json:"filter_subjects,omitempty"` ReplayPolicy ReplayPolicy `json:"replay_policy"` RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec SampleFrequency string `json:"sample_freq,omitempty"` @@ -1143,6 +1144,11 @@ type ConsumerConfig struct { Replicas int `json:"num_replicas"` // Force memory storage. MemoryStorage bool `json:"mem_storage,omitempty"` + + // Metadata is additional metadata for the Consumer. + // Keys starting with `_nats` are reserved. + // NOTE: Metadata requires nats-server v2.10.0+ + Metadata map[string]string `json:"metadata,omitempty"` } // ConsumerInfo is the info from a JetStream consumer. @@ -1176,10 +1182,11 @@ type SequencePair struct { // nextRequest is for getting next messages for pull based consumers from JetStream. type nextRequest struct { - Expires time.Duration `json:"expires,omitempty"` - Batch int `json:"batch,omitempty"` - NoWait bool `json:"no_wait,omitempty"` - MaxBytes int `json:"max_bytes,omitempty"` + Expires time.Duration `json:"expires,omitempty"` + Batch int `json:"batch,omitempty"` + NoWait bool `json:"no_wait,omitempty"` + MaxBytes int `json:"max_bytes,omitempty"` + Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` } // jsSub includes JetStream subscription info. @@ -2469,6 +2476,7 @@ func EnableFlowControl() SubOpt { } // IdleHeartbeat enables push based consumers to have idle heartbeats delivered. +// For pull consumers, idle heartbeat has to be set on each [Fetch] call. func IdleHeartbeat(duration time.Duration) SubOpt { return subOptFn(func(opts *subOpts) error { opts.cfg.Heartbeat = duration @@ -2568,6 +2576,16 @@ func ConsumerName(name string) SubOpt { }) } +// ConsumerFilterSubjects can be used to set multiple subject filters on the consumer. +// It has to be used in conjunction with [nats.BindStream] and +// with empty 'subject' parameter. +func ConsumerFilterSubjects(subjects ...string) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.FilterSubjects = subjects + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. @@ -2588,6 +2606,7 @@ type pullOpts struct { maxBytes int ttl time.Duration ctx context.Context + hb time.Duration } // PullOpt are the options that can be passed when pulling a batch of messages. @@ -2603,6 +2622,16 @@ func PullMaxWaiting(n int) SubOpt { }) } +type PullHeartbeat time.Duration + +func (h PullHeartbeat) configurePull(opts *pullOpts) error { + if h <= 0 { + return fmt.Errorf("%w: idle heartbeat has to be greater than 0", ErrInvalidArg) + } + opts.hb = time.Duration(h) + return nil +} + // PullMaxBytes defines the max bytes allowed for a fetch request. type PullMaxBytes int @@ -2646,6 +2675,11 @@ func checkMsg(msg *Msg, checkSts, isNoWait bool) (usrMsg bool, err error) { if !checkSts { return } + + // if it's a heartbeat message, report as not user msg + if isHb, _ := isJSControlMessage(msg); isHb { + return + } switch val { case noResponders: err = ErrNoResponders @@ -2732,7 +2766,6 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { ) if ctx == nil { ctx, cancel = context.WithTimeout(context.Background(), ttl) - defer cancel() } else if _, hasDeadline := ctx.Deadline(); !hasDeadline { // Prevent from passing the background context which will just block // and cannot be canceled either. @@ -2743,7 +2776,17 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { // If the context did not have a deadline, then create a new child context // that will use the default timeout from the JS context. ctx, cancel = context.WithTimeout(ctx, ttl) - defer cancel() + } else { + ctx, cancel = context.WithCancel(ctx) + } + defer cancel() + + // if heartbeat is set, validate it against the context timeout + if o.hb > 0 { + deadline, _ := ctx.Deadline() + if 2*o.hb >= time.Until(deadline) { + return nil, fmt.Errorf("%w: idle heartbeat value too large", ErrInvalidArg) + } } // Check if context not done already before making the request. @@ -2783,6 +2826,8 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { msgs = append(msgs, msg) } } + var hbTimer *time.Timer + var hbErr error if err == nil && len(msgs) < batch { // For batch real size of 1, it does not make sense to set no_wait in // the request. @@ -2813,8 +2858,26 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { nr.Expires = expires nr.NoWait = noWait nr.MaxBytes = o.maxBytes + if 2*o.hb < expires { + nr.Heartbeat = o.hb + } else { + nr.Heartbeat = 0 + } req, _ := json.Marshal(nr) - return nc.PublishRequest(nms, rply, req) + if err := nc.PublishRequest(nms, rply, req); err != nil { + return err + } + if o.hb > 0 { + if hbTimer == nil { + hbTimer = time.AfterFunc(2*o.hb, func() { + hbErr = ErrNoHeartbeat + cancel() + }) + } else { + hbTimer.Reset(2 * o.hb) + } + } + return nil } err = sendReq() @@ -2822,6 +2885,9 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { // Ask for next message and wait if there are no messages msg, err = sub.nextMsgWithContext(ctx, true, true) if err == nil { + if hbTimer != nil { + hbTimer.Reset(2 * o.hb) + } var usrMsg bool usrMsg, err = checkMsg(msg, true, noWait) @@ -2840,9 +2906,15 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { } } } + if hbTimer != nil { + hbTimer.Stop() + } } // If there is at least a message added to msgs, then need to return OK and no error if err != nil && len(msgs) == 0 { + if hbErr != nil { + return nil, hbErr + } return nil, o.checkCtxErr(err) } return msgs, nil @@ -2970,14 +3042,24 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e // If the context did not have a deadline, then create a new child context // that will use the default timeout from the JS context. ctx, cancel = context.WithTimeout(ctx, ttl) + } else { + ctx, cancel = context.WithCancel(ctx) } defer func() { // only cancel the context here if we are sure the fetching goroutine has not been started yet - if cancel != nil && cancelContext { + if cancelContext { cancel() } }() + // if heartbeat is set, validate it against the context timeout + if o.hb > 0 { + deadline, _ := ctx.Deadline() + if 2*o.hb >= time.Until(deadline) { + return nil, fmt.Errorf("%w: idle heartbeat value too large", ErrInvalidArg) + } + } + // Check if context not done already before making the request. select { case <-ctx.Done(): @@ -3031,9 +3113,10 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e requestBatch := batch - len(result.msgs) req := nextRequest{ - Expires: expires, - Batch: requestBatch, - MaxBytes: o.maxBytes, + Expires: expires, + Batch: requestBatch, + MaxBytes: o.maxBytes, + Heartbeat: o.hb, } reqJSON, err := json.Marshal(req) if err != nil { @@ -3051,11 +3134,17 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e result.err = err return result, nil } + var hbTimer *time.Timer + var hbErr error + if o.hb > 0 { + hbTimer = time.AfterFunc(2*o.hb, func() { + hbErr = ErrNoHeartbeat + cancel() + }) + } cancelContext = false go func() { - if cancel != nil { - defer cancel() - } + defer cancel() var requestMsgs int for requestMsgs < requestBatch { // Ask for next message and wait if there are no messages @@ -3063,6 +3152,9 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e if err != nil { break } + if hbTimer != nil { + hbTimer.Reset(2 * o.hb) + } var usrMsg bool usrMsg, err = checkMsg(msg, true, false) @@ -3082,7 +3174,11 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e } } if err != nil { - result.err = o.checkCtxErr(err) + if hbErr != nil { + result.err = hbErr + } else { + result.err = o.checkCtxErr(err) + } } close(result.msgs) result.done <- struct{}{} @@ -3654,6 +3750,53 @@ func (st *StorageType) UnmarshalJSON(data []byte) error { return nil } +type StoreCompression uint8 + +const ( + NoCompression StoreCompression = iota + S2Compression +) + +func (alg StoreCompression) String() string { + switch alg { + case NoCompression: + return "None" + case S2Compression: + return "S2" + default: + return "Unknown StoreCompression" + } +} + +func (alg StoreCompression) MarshalJSON() ([]byte, error) { + var str string + switch alg { + case S2Compression: + str = "s2" + case NoCompression: + str = "none" + default: + return nil, fmt.Errorf("unknown compression algorithm") + } + return json.Marshal(str) +} + +func (alg *StoreCompression) UnmarshalJSON(b []byte) error { + var str string + if err := json.Unmarshal(b, &str); err != nil { + return err + } + switch str { + case "s2": + *alg = S2Compression + case "none": + *alg = NoCompression + default: + return fmt.Errorf("unknown compression algorithm") + } + return nil +} + // Length of our hash used for named consumers. const nameHashLen = 8 diff --git a/vendor/github.com/nats-io/nats.go/jserrors.go b/vendor/github.com/nats-io/nats.go/jserrors.go index d7959ca8387..c8b1f5fc627 100644 --- a/vendor/github.com/nats-io/nats.go/jserrors.go +++ b/vendor/github.com/nats-io/nats.go/jserrors.go @@ -33,6 +33,26 @@ var ( // ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration. ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}} + // ErrStreamSubjectTransformNotSupported is returned when the connected nats-server version does not support setting + // the stream subject transform. If this error is returned when executing AddStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"} + + // ErrStreamSourceSubjectTransformNotSupported is returned when the connected nats-server version does not support setting + // the stream source subject transform. If this error is returned when executing AddStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSourceSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"} + + // ErrStreamSourceNotSupported is returned when the connected nats-server version does not support setting + // the stream sources. If this error is returned when executing AddStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSourceNotSupported JetStreamError = &jsError{message: "stream sourcing is not supported by nats-server"} + + // ErrStreamSourceMultipleSubjectTransformsNotSupported is returned when the connected nats-server version does not support setting + // the stream sources. If this error is returned when executing AddStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSourceMultipleSubjectTransformsNotSupported JetStreamError = &jsError{message: "stream sourceing with multiple subject transforms not supported by nats-server"} + // ErrConsumerNotFound is an error returned when consumer with given name does not exist. ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}} @@ -42,6 +62,15 @@ var ( // ErrBadRequest is returned when invalid request is sent to JetStream API. ErrBadRequest JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeBadRequest, Description: "bad request", Code: 400}} + // ErrDuplicateFilterSubjects is returned when both FilterSubject and FilterSubjects are specified when creating consumer. + ErrDuplicateFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeDuplicateFilterSubjects, Description: "consumer cannot have both FilterSubject and FilterSubjects specified", Code: 500}} + + // ErrDuplicateFilterSubjects is returned when filter subjects overlap when creating consumer. + ErrOverlappingFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeOverlappingFilterSubjects, Description: "consumer subject filters cannot overlap", Code: 500}} + + // ErrEmptyFilter is returned when a filter in FilterSubjects is empty. + ErrEmptyFilter JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerEmptyFilter, Description: "consumer filter in FilterSubjects cannot be empty", Code: 500}} + // Client errors // ErrConsumerNameAlreadyInUse is an error returned when consumer with given name already exists. @@ -62,6 +91,11 @@ var ( // ErrConsumerNameRequired is returned when the provided consumer durable name is empty. ErrConsumerNameRequired JetStreamError = &jsError{message: "consumer name is required"} + // ErrConsumerMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting + // multiple filter subjects with filter_subjects field. If this error is returned when executing AddConsumer(), the consumer with invalid + // configuration was already created in the server. + ErrConsumerMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "multiple consumer filter subjects not supported by nats-server"} + // ErrConsumerConfigRequired is returned when empty consumer consuguration is supplied to add/update consumer. ErrConsumerConfigRequired JetStreamError = &jsError{message: "consumer configuration is required"} @@ -104,6 +138,9 @@ var ( // ErrConsumerLeadershipChanged is returned when pending requests are no longer valid after leadership has changed ErrConsumerLeadershipChanged JetStreamError = &jsError{message: "Leadership Changed"} + // ErrNoHeartbeat is returned when no heartbeat is received from server when sending requests with pull consumer. + ErrNoHeartbeat JetStreamError = &jsError{message: "no heartbeat received"} + // DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases. // Use ErrInvalidConsumerName instead. ErrInvalidDurableName = errors.New("nats: invalid durable name") @@ -120,13 +157,17 @@ const ( JSErrCodeStreamNotFound ErrorCode = 10059 JSErrCodeStreamNameInUse ErrorCode = 10058 - JSErrCodeConsumerNotFound ErrorCode = 10014 - JSErrCodeConsumerNameExists ErrorCode = 10013 - JSErrCodeConsumerAlreadyExists ErrorCode = 10105 + JSErrCodeConsumerNotFound ErrorCode = 10014 + JSErrCodeConsumerNameExists ErrorCode = 10013 + JSErrCodeConsumerAlreadyExists ErrorCode = 10105 + JSErrCodeDuplicateFilterSubjects ErrorCode = 10136 + JSErrCodeOverlappingFilterSubjects ErrorCode = 10138 + JSErrCodeConsumerEmptyFilter ErrorCode = 10139 JSErrCodeMessageNotFound ErrorCode = 10037 - JSErrCodeBadRequest ErrorCode = 10003 + JSErrCodeBadRequest ErrorCode = 10003 + JSStreamInvalidConfig ErrorCode = 10052 JSErrCodeStreamWrongLastSequence ErrorCode = 10071 ) diff --git a/vendor/github.com/nats-io/nats.go/jsm.go b/vendor/github.com/nats-io/nats.go/jsm.go index c6684692b5f..266bf0665f7 100644 --- a/vendor/github.com/nats-io/nats.go/jsm.go +++ b/vendor/github.com/nats-io/nats.go/jsm.go @@ -102,30 +102,35 @@ type JetStreamManager interface { // There are sensible defaults for most. If no subjects are // given the name will be used as the only subject. type StreamConfig struct { - Name string `json:"name"` - Description string `json:"description,omitempty"` - Subjects []string `json:"subjects,omitempty"` - Retention RetentionPolicy `json:"retention"` - MaxConsumers int `json:"max_consumers"` - MaxMsgs int64 `json:"max_msgs"` - MaxBytes int64 `json:"max_bytes"` - Discard DiscardPolicy `json:"discard"` - DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"` - MaxAge time.Duration `json:"max_age"` - MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"` - MaxMsgSize int32 `json:"max_msg_size,omitempty"` - Storage StorageType `json:"storage"` - Replicas int `json:"num_replicas"` - NoAck bool `json:"no_ack,omitempty"` - Template string `json:"template_owner,omitempty"` - Duplicates time.Duration `json:"duplicate_window,omitempty"` - Placement *Placement `json:"placement,omitempty"` - Mirror *StreamSource `json:"mirror,omitempty"` - Sources []*StreamSource `json:"sources,omitempty"` - Sealed bool `json:"sealed,omitempty"` - DenyDelete bool `json:"deny_delete,omitempty"` - DenyPurge bool `json:"deny_purge,omitempty"` - AllowRollup bool `json:"allow_rollup_hdrs,omitempty"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + Subjects []string `json:"subjects,omitempty"` + Retention RetentionPolicy `json:"retention"` + MaxConsumers int `json:"max_consumers"` + MaxMsgs int64 `json:"max_msgs"` + MaxBytes int64 `json:"max_bytes"` + Discard DiscardPolicy `json:"discard"` + DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"` + MaxAge time.Duration `json:"max_age"` + MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"` + MaxMsgSize int32 `json:"max_msg_size,omitempty"` + Storage StorageType `json:"storage"` + Replicas int `json:"num_replicas"` + NoAck bool `json:"no_ack,omitempty"` + Template string `json:"template_owner,omitempty"` + Duplicates time.Duration `json:"duplicate_window,omitempty"` + Placement *Placement `json:"placement,omitempty"` + Mirror *StreamSource `json:"mirror,omitempty"` + Sources []*StreamSource `json:"sources,omitempty"` + Sealed bool `json:"sealed,omitempty"` + DenyDelete bool `json:"deny_delete,omitempty"` + DenyPurge bool `json:"deny_purge,omitempty"` + AllowRollup bool `json:"allow_rollup_hdrs,omitempty"` + Compression StoreCompression `json:"compression"` + FirstSeq uint64 `json:"first_seq,omitempty"` + + // Allow applying a subject transform to incoming messages before doing anything else. + SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"` // Allow republish of the message after being sequenced and stored. RePublish *RePublish `json:"republish,omitempty"` @@ -134,6 +139,20 @@ type StreamConfig struct { AllowDirect bool `json:"allow_direct"` // Allow higher performance and unified direct access for mirrors as well. MirrorDirect bool `json:"mirror_direct"` + + // Limits for consumers on this stream. + ConsumerLimits StreamConsumerLimits `json:"consumer_limits,omitempty"` + + // Metadata is additional metadata for the Stream. + // Keys starting with `_nats` are reserved. + // NOTE: Metadata requires nats-server v2.10.0+ + Metadata map[string]string `json:"metadata,omitempty"` +} + +// SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received. +type SubjectTransformConfig struct { + Source string `json:"src,omitempty"` + Destination string `json:"dest"` } // RePublish is for republishing messages once committed to a stream. The original @@ -152,12 +171,13 @@ type Placement struct { // StreamSource dictates how streams can source from other streams. type StreamSource struct { - Name string `json:"name"` - OptStartSeq uint64 `json:"opt_start_seq,omitempty"` - OptStartTime *time.Time `json:"opt_start_time,omitempty"` - FilterSubject string `json:"filter_subject,omitempty"` - External *ExternalStream `json:"external,omitempty"` - Domain string `json:"-"` + Name string `json:"name"` + OptStartSeq uint64 `json:"opt_start_seq,omitempty"` + OptStartTime *time.Time `json:"opt_start_time,omitempty"` + FilterSubject string `json:"filter_subject,omitempty"` + SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` + External *ExternalStream `json:"external,omitempty"` + Domain string `json:"-"` } // ExternalStream allows you to qualify access to a stream source in another @@ -167,6 +187,13 @@ type ExternalStream struct { DeliverPrefix string `json:"deliver,omitempty"` } +// StreamConsumerLimits are the limits for a consumer on a stream. +// These can be overridden on a per consumer basis. +type StreamConsumerLimits struct { + InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` + MaxAckPending int `json:"max_ack_pending,omitempty"` +} + // Helper for copying when we do not want to change user's version. func (ss *StreamSource) copy() *StreamSource { nss := *ss @@ -407,6 +434,11 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o } return nil, info.Error } + + // check whether multiple filter subjects (if used) are reflected in the returned ConsumerInfo + if len(cfg.FilterSubjects) != 0 && len(info.Config.FilterSubjects) == 0 { + return nil, ErrConsumerMultipleFilterSubjectsNotSupported + } return info.ConsumerInfo, nil } @@ -780,6 +812,21 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { return nil, resp.Error } + // check that input subject transform (if used) is reflected in the returned ConsumerInfo + if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil { + return nil, ErrStreamSubjectTransformNotSupported + } + if len(cfg.Sources) != 0 { + if len(cfg.Sources) != len(resp.Config.Sources) { + return nil, ErrStreamSourceNotSupported + } + for i := range cfg.Sources { + if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 { + return nil, ErrStreamSourceMultipleSubjectTransformsNotSupported + } + } + } + return resp.StreamInfo, nil } @@ -897,11 +944,13 @@ type StreamAlternate struct { // StreamSourceInfo shows information about an upstream stream source. type StreamSourceInfo struct { - Name string `json:"name"` - Lag uint64 `json:"lag"` - Active time.Duration `json:"active"` - External *ExternalStream `json:"external"` - Error *APIError `json:"error"` + Name string `json:"name"` + Lag uint64 `json:"lag"` + Active time.Duration `json:"active"` + External *ExternalStream `json:"external"` + Error *APIError `json:"error"` + FilterSubject string `json:"filter_subject,omitempty"` + SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` } // StreamState is information about the given stream. @@ -973,6 +1022,23 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error } return nil, resp.Error } + + // check that input subject transform (if used) is reflected in the returned StreamInfo + if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil { + return nil, ErrStreamSubjectTransformNotSupported + } + + if len(cfg.Sources) != 0 { + if len(cfg.Sources) != len(resp.Config.Sources) { + return nil, ErrStreamSourceNotSupported + } + for i := range cfg.Sources { + if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 { + return nil, ErrStreamSourceMultipleSubjectTransformsNotSupported + } + } + } + return resp.StreamInfo, nil } diff --git a/vendor/github.com/nats-io/nats.go/kv.go b/vendor/github.com/nats-io/nats.go/kv.go index 17c9e2fd5f9..7382f4d872c 100644 --- a/vendor/github.com/nats-io/nats.go/kv.go +++ b/vendor/github.com/nats-io/nats.go/kv.go @@ -432,14 +432,21 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { scfg.Mirror = m scfg.MirrorDirect = true } else if len(cfg.Sources) > 0 { - // For now we do not allow direct subjects for sources. If that is desired a user could use stream API directly. for _, ss := range cfg.Sources { - if !strings.HasPrefix(ss.Name, kvBucketNamePre) { - ss = ss.copy() + var sourceBucketName string + if strings.HasPrefix(ss.Name, kvBucketNamePre) { + sourceBucketName = ss.Name[len(kvBucketNamePre):] + } else { + sourceBucketName = ss.Name ss.Name = fmt.Sprintf(kvBucketNameTmpl, ss.Name) } + + if ss.External == nil || sourceBucketName != cfg.Bucket { + ss.SubjectTransforms = []SubjectTransformConfig{{Source: fmt.Sprintf(kvSubjectsTmpl, sourceBucketName), Destination: fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}} + } scfg.Sources = append(scfg.Sources, ss) } + scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)} } else { scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)} } diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index 36fb9df294e..82b79730a9c 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -47,7 +47,7 @@ import ( // Default Constants const ( - Version = "1.29.0" + Version = "1.30.2" DefaultURL = "nats://127.0.0.1:4222" DefaultPort = 4222 DefaultMaxReconnect = 60 @@ -3168,8 +3168,10 @@ func (nc *Conn) processMsg(data []byte) { } } - // Skip processing if this is a control message. - if !ctrlMsg { + // Skip processing if this is a control message and + // if not a pull consumer heartbeat. For pull consumers, + // heartbeats have to be handled on per request basis. + if !ctrlMsg || (jsi != nil && jsi.pull) { var chanSubCheckFC bool // Subscription internal stats (applicable only for non ChanSubscription's) if sub.typ != ChanSubscription { diff --git a/vendor/github.com/nats-io/nats.go/object.go b/vendor/github.com/nats-io/nats.go/object.go index 86b72abddd3..f6ba8fb1642 100644 --- a/vendor/github.com/nats-io/nats.go/object.go +++ b/vendor/github.com/nats-io/nats.go/object.go @@ -149,6 +149,10 @@ type ObjectStoreConfig struct { Storage StorageType `json:"storage,omitempty"` Replicas int `json:"num_replicas,omitempty"` Placement *Placement `json:"placement,omitempty"` + + // Bucket-specific metadata + // NOTE: Metadata requires nats-server v2.10.0+ + Metadata map[string]string `json:"metadata,omitempty"` } type ObjectStoreStatus interface { @@ -168,6 +172,8 @@ type ObjectStoreStatus interface { Size() uint64 // BackingStore provides details about the underlying storage BackingStore() string + // Metadata is the user supplied metadata for the bucket + Metadata() map[string]string } // ObjectMetaOptions @@ -178,9 +184,10 @@ type ObjectMetaOptions struct { // ObjectMeta is high level information about an object. type ObjectMeta struct { - Name string `json:"name"` - Description string `json:"description,omitempty"` - Headers Header `json:"headers,omitempty"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + Headers Header `json:"headers,omitempty"` + Metadata map[string]string `json:"metadata,omitempty"` // Optional options. Opts *ObjectMetaOptions `json:"options,omitempty"` @@ -272,6 +279,7 @@ func (js *js) CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) { Discard: DiscardNew, AllowRollup: true, AllowDirect: true, + Metadata: cfg.Metadata, } // Create our stream. @@ -966,6 +974,7 @@ func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error { info.Name = meta.Name info.Description = meta.Description info.Headers = meta.Headers + info.Metadata = meta.Metadata // Prepare the meta message if err = publishMeta(info, obs.js); err != nil { @@ -1189,6 +1198,9 @@ func (s *ObjectBucketStatus) Size() uint64 { return s.nfo.State.Bytes } // BackingStore indicates what technology is used for storage of the bucket func (s *ObjectBucketStatus) BackingStore() string { return "JetStream" } +// Metadata is the metadata supplied when creating the bucket +func (s *ObjectBucketStatus) Metadata() map[string]string { return s.nfo.Config.Metadata } + // StreamInfo is the stream info retrieved to create the status func (s *ObjectBucketStatus) StreamInfo() *StreamInfo { return s.nfo } diff --git a/vendor/modules.txt b/vendor/modules.txt index 450a6ec7800..c75bb39854d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1345,7 +1345,7 @@ github.com/mschoch/smat # github.com/nats-io/jwt/v2 v2.5.2 ## explicit; go 1.18 github.com/nats-io/jwt/v2 -# github.com/nats-io/nats-server/v2 v2.10.1 +# github.com/nats-io/nats-server/v2 v2.10.2 ## explicit; go 1.20 github.com/nats-io/nats-server/v2/conf github.com/nats-io/nats-server/v2/internal/ldap @@ -1356,8 +1356,8 @@ github.com/nats-io/nats-server/v2/server/certidp github.com/nats-io/nats-server/v2/server/certstore github.com/nats-io/nats-server/v2/server/pse github.com/nats-io/nats-server/v2/server/sysmem -# github.com/nats-io/nats.go v1.29.0 -## explicit; go 1.19 +# github.com/nats-io/nats.go v1.30.2 +## explicit; go 1.20 github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin github.com/nats-io/nats.go/internal/parser