diff --git a/.travis.yml b/.travis.yml
index 28aa8fd393..f1337e3b75 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -8,8 +8,8 @@ language: go
 go:
 # This should be quoted or use .x, but should not be unquoted.
 # Remember that a YAML bare float drops trailing zeroes.
-  - "1.23.4"
-  - "1.22.10"
+  - "1.23.5"
+  - "1.22.11"

 go_import_path: github.com/nats-io/nats-server
diff --git a/server/consumer.go b/server/consumer.go
index f267718361..54033d2680 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -1373,6 +1373,8 @@ func (o *consumer) setLeader(isLeader bool) {
         // If we were the leader make sure to drain queued up acks.
         if wasLeader {
             o.ackMsgs.drain()
+            // Reset amount of acks that need to be processed.
+            atomic.StoreInt64(&o.awl, 0)
             // Also remove any pending replies since we should not be the one to respond at this point.
             o.replies = nil
         }
@@ -2860,7 +2862,8 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
     // no-op
     if dseq <= o.adflr || sseq <= o.asflr {
         o.mu.Unlock()
-        return ackInPlace
+        // Return true to let caller respond back to the client.
+        return true
     }
     if o.maxp > 0 && len(o.pending) >= o.maxp {
         needSignal = true
@@ -3600,17 +3603,21 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
             }
             continue
         }
-        if seq > 0 {
-            pmsg := getJSPubMsgFromPool()
-            sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg)
-            if sm == nil || err != nil {
-                pmsg.returnToPool()
-                pmsg, dc = nil, 0
-                // Adjust back deliver count.
-                o.decDeliveryCount(seq)
-            }
-            return pmsg, dc, err
+        pmsg := getJSPubMsgFromPool()
+        sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg)
+        if sm == nil || err != nil {
+            pmsg.returnToPool()
+            pmsg, dc = nil, 0
+            // Adjust back deliver count.
+            o.decDeliveryCount(seq)
+        }
+        // Message was scheduled for redelivery but was removed in the meantime.
+        if err == ErrStoreMsgNotFound || err == errDeletedMsg {
+            delete(o.pending, seq)
+            delete(o.rdc, seq)
+            continue
         }
+        return pmsg, dc, err
     }
 }
diff --git a/server/filestore.go b/server/filestore.go
index 245c68a7a2..4e0c28ca4a 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -495,7 +495,10 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim

     // Do age checks too, make sure to call in place.
     if fs.cfg.MaxAge != 0 {
-        fs.expireMsgsOnRecover()
+        err := fs.expireMsgsOnRecover()
+        if isPermissionError(err) {
+            return nil, err
+        }
         fs.startAgeChk()
     }

@@ -1978,9 +1981,9 @@ func (fs *fileStore) recoverMsgs() error {
 // We will treat this differently in case we have a recovery
 // that will expire alot of messages on startup.
 // Should only be called on startup.
-func (fs *fileStore) expireMsgsOnRecover() {
+func (fs *fileStore) expireMsgsOnRecover() error {
     if fs.state.Msgs == 0 {
-        return
+        return nil
     }

     var minAge = time.Now().UnixNano() - int64(fs.cfg.MaxAge)
@@ -1992,7 +1995,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
     // usually taken care of by fs.removeMsgBlock() but we do not call that here.
     var last msgId

-    deleteEmptyBlock := func(mb *msgBlock) {
+    deleteEmptyBlock := func(mb *msgBlock) error {
         // If we are the last keep state to remember first/last sequence.
         // Do this part by hand since not deleting one by one.
         if mb == fs.lmb {
@@ -2008,8 +2011,12 @@ func (fs *fileStore) expireMsgsOnRecover() {
             }
             return true
         })
-        mb.dirtyCloseWithRemove(true)
+        err := mb.dirtyCloseWithRemove(true)
+        if isPermissionError(err) {
+            return err
+        }
         deleted++
+        return nil
     }

     for _, mb := range fs.blks {
@@ -2023,8 +2030,11 @@ func (fs *fileStore) expireMsgsOnRecover() {
         if mb.last.ts <= minAge {
             purged += mb.msgs
             bytes += mb.bytes
-            deleteEmptyBlock(mb)
+            err := deleteEmptyBlock(mb)
             mb.mu.Unlock()
+            if isPermissionError(err) {
+                return err
+            }
             continue
         }

@@ -2148,6 +2158,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
     if purged > 0 {
         fs.dirty++
     }
+    return nil
 }

 func copyMsgBlocks(src []*msgBlock) []*msgBlock {
@@ -3463,6 +3474,9 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
     dios <- struct{}{}

     if err != nil {
+        if isPermissionError(err) {
+            return nil, err
+        }
         mb.dirtyCloseWithRemove(true)
         return nil, fmt.Errorf("Error creating msg block file: %v", err)
     }
@@ -5469,16 +5483,23 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
     <-dios
     tmpFD, err := os.OpenFile(tmpFN, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, defaultFilePerms)
     dios <- struct{}{}
+
     if err != nil {
         return fmt.Errorf("failed to create temporary file: %w", err)
     }

+    errorCleanup := func(err error) error {
+        tmpFD.Close()
+        os.Remove(tmpFN)
+        return err
+    }
+
     // The original buffer at this point is uncompressed, so we will now compress
     // it if needed. Note that if the selected algorithm is NoCompression, the
     // Compress function will just return the input buffer unmodified.
     cmpBuf, err := alg.Compress(origBuf)
     if err != nil {
-        return fmt.Errorf("failed to compress block: %w", err)
+        return errorCleanup(fmt.Errorf("failed to compress block: %w", err))
     }

     // We only need to write out the metadata header if compression is enabled.
@@ -5496,7 +5517,7 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
     if mb.bek != nil && len(cmpBuf) > 0 {
         bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
         if err != nil {
-            return err
+            return errorCleanup(err)
         }
         mb.bek = bek
         mb.bek.XORKeyStream(cmpBuf, cmpBuf)
@@ -5504,11 +5525,6 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {

     // Write the new block data (which might be compressed or encrypted) to the
     // temporary file.
-    errorCleanup := func(err error) error {
-        tmpFD.Close()
-        os.Remove(tmpFN)
-        return err
-    }
     if n, err := tmpFD.Write(cmpBuf); err != nil {
         return errorCleanup(fmt.Errorf("failed to write to temporary file: %w", err))
     } else if n != len(cmpBuf) {
@@ -7779,9 +7795,9 @@ func (mb *msgBlock) dirtyClose() {
 }

 // Should be called with lock held.
-func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
+func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error {
     if mb == nil {
-        return
+        return nil
     }
     // Stop cache expiration timer.
     if mb.ctmr != nil {
@@ -7803,13 +7819,20 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
         // Clear any tracking by subject if we are removing.
         mb.fss = nil
         if mb.mfn != _EMPTY_ {
-            os.Remove(mb.mfn)
+            err := os.Remove(mb.mfn)
+            if isPermissionError(err) {
+                return err
+            }
             mb.mfn = _EMPTY_
         }
         if mb.kfn != _EMPTY_ {
-            os.Remove(mb.kfn)
+            err := os.Remove(mb.kfn)
+            if isPermissionError(err) {
+                return err
+            }
         }
     }
+    return nil
 }

 // Remove a seq from the fss and select new first.
@@ -8222,7 +8245,15 @@ func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) {
     for {
         select {
         case <-t.C:
-            fs.writeFullState()
+            err := fs.writeFullState()
+            if isPermissionError(err) && fs.srv != nil {
+                fs.warn("File system permission denied when flushing stream state, disabling JetStream: %v", err)
+                // Messages in the block cache could be lost in the worst case.
+                // In clustered mode this is highly unlikely thanks to replication.
+                fs.srv.DisableJetStream()
+                return
+            }
+
         case <-qch:
             return
         }
@@ -8430,7 +8461,11 @@ func (fs *fileStore) _writeFullState(force bool) error {
     // Protect with dios.
     <-dios
     err := os.WriteFile(fn, buf, defaultFilePerms)
+    // If the file system is not writable, err will be a permission error.
     dios <- struct{}{}
+    if isPermissionError(err) {
+        return err
+    }

     // Update dirty if successful.
     if err == nil {
diff --git a/server/filestore_test.go b/server/filestore_test.go
index 458cef7a74..18493b6101 100644
--- a/server/filestore_test.go
+++ b/server/filestore_test.go
@@ -28,6 +28,7 @@ import (
     "errors"
     "fmt"
     "io"
+    "io/fs"
     "math/bits"
     "math/rand"
     "os"
@@ -143,9 +144,9 @@ func TestFileStoreBasics(t *testing.T) {
 func TestFileStoreMsgHeaders(t *testing.T) {
     testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
         fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil)
+        require_NoError(t, err)
         defer fs.Stop()
-
         subj, hdr, msg := "foo", []byte("name:derek"), []byte("Hello World")
         elen := 22 + len(subj) + 4 + len(hdr) + len(msg) + 8
         if sz := int(fileStoreMsgSize(subj, hdr, msg)); sz != elen {
@@ -8194,3 +8195,81 @@ func TestFileStoreNumPendingMulti(t *testing.T) {
     }
     require_Equal(t, total, checkTotal)
 }
+
+func TestFileStoreStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) {
+    cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond}
+    fs, err := newFileStore(
+        FileStoreConfig{StoreDir: t.TempDir()},
+        cfg)
+
+    require_NoError(t, err)
+    defer fs.Stop()
+
+    msg := bytes.Repeat([]byte("Z"), 1024)
+    directory := fs.fcfg.StoreDir
+    ORIGINAL_FILE_MODE, _ := os.Stat(directory)
+    READONLY_MODE := os.FileMode(0o555)
+    changeDirectoryPermission(directory, READONLY_MODE)
+    require_NoError(t, err)
+    totalMsgs := 10000
+    i := 0
+    for i = 0; i < totalMsgs; i++ {
+        _, _, err = fs.StoreMsg("ev.1", nil, msg)
+        if err != nil {
+            break
+        }
+    }
+    changeDirectoryPermission(directory, ORIGINAL_FILE_MODE.Mode())
+    require_Error(t, err, os.ErrPermission)
+}
+
+func TestFileStoreWriteFullStateThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) {
+    cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond}
+    fs, err := newFileStore(
+        FileStoreConfig{StoreDir: t.TempDir()},
+        cfg)
+
+    require_NoError(t, err)
+    defer fs.Stop()
+
+    msg := bytes.Repeat([]byte("Z"), 1024)
+    directory := fs.fcfg.StoreDir
+    ORIGINAL_FILE_MODE, _ := os.Stat(directory)
+    READONLY_MODE := os.FileMode(0o555)
+    require_NoError(t, err)
+    totalMsgs := 10000
+    i := 0
+    for i = 0; i < totalMsgs; i++ {
+        _, _, err = fs.StoreMsg("ev.1", nil, msg)
+        if err != nil {
+            break
+        }
+    }
+    changeDirectoryPermission(directory, READONLY_MODE)
+    err = fs.writeFullState()
+    changeDirectoryPermission(directory, ORIGINAL_FILE_MODE.Mode())
+    require_Error(t, err, os.ErrPermission)
+}
+
+func changeDirectoryPermission(directory string, mode fs.FileMode) error {
+    err := filepath.Walk(directory, func(path string, info os.FileInfo, err error) error {
+        if err != nil {
+            return fmt.Errorf("error accessing path %q: %w", path, err)
+        }
+
+        // Check if the path is a directory or file and set permissions accordingly
+        if info.IsDir() {
+            err = os.Chmod(path, mode)
+            if err != nil {
+                return fmt.Errorf("error changing directory permissions for %q: %w", path, err)
+            }
+        } else {
+            err = os.Chmod(path, mode)
+            if err != nil {
+                return fmt.Errorf("error changing file permissions for %q: %w", path, err)
+            }
+        }
+        return nil
+    })
+    return err
+}
diff --git a/server/ipqueue.go b/server/ipqueue.go
index b26a749ed7..95bf27457e 100644
--- a/server/ipqueue.go
+++ b/server/ipqueue.go
@@ -190,14 +190,16 @@ func (q *ipQueue[T]) len() int {
 }

 // Empty the queue and consumes the notification signal if present.
+// Returns the number of items that were drained from the queue.
 // Note that this could cause a reader go routine that has been
 // notified that there is something in the queue (reading from queue's `ch`)
 // may then get nothing if `drain()` is invoked before the `pop()` or `popOne()`.
-func (q *ipQueue[T]) drain() {
+func (q *ipQueue[T]) drain() int {
     if q == nil {
-        return
+        return 0
     }
     q.Lock()
+    olen := len(q.elts)
     if q.elts != nil {
         q.resetAndReturnToPool(&q.elts)
         q.elts, q.pos = nil, 0
@@ -209,6 +211,7 @@ func (q *ipQueue[T]) drain() {
     default:
     }
     q.Unlock()
+    return olen
 }

 // Since the length of the queue goes to 0 after a pop(), it is good to
diff --git a/server/jetstream.go b/server/jetstream.go
index 2e606e6a6f..c1e709a19e 100644
--- a/server/jetstream.go
+++ b/server/jetstream.go
@@ -2974,3 +2974,14 @@ func fixCfgMirrorWithDedupWindow(cfg *StreamConfig) {
         cfg.Duplicates = 0
     }
 }
+
+func (s *Server) handleWritePermissionError() {
+    //TODO Check if we should add s.jetStreamOOSPending in condition
+    if s.JetStreamEnabled() {
+        s.Errorf("File system permission denied while writing, disabling JetStream")
+
+        go s.DisableJetStream()
+
+        //TODO Send respective advisory if needed, same as in handleOutOfSpace
+    }
+}
diff --git a/server/jetstream_api.go b/server/jetstream_api.go
index d47b6dcc90..e7fb21c1a1 100644
--- a/server/jetstream_api.go
+++ b/server/jetstream_api.go
@@ -836,7 +836,8 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
     limit := atomic.LoadInt64(&js.queueLimit)
     if pending >= int(limit) {
         s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
-        s.jsAPIRoutedReqs.drain()
+        drained := int64(s.jsAPIRoutedReqs.drain())
+        atomic.AddInt64(&js.apiInflight, -drained)

         s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
             TypedEvent: TypedEvent{
@@ -846,7 +847,7 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
             },
             Server:  s.Name(),
             Domain:  js.config.Domain,
-            Dropped: int64(pending),
+            Dropped: drained,
         })
     }
 }
diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go
index 8e62dbb12a..4f1b4ad310 100644
--- a/server/jetstream_consumer_test.go
+++ b/server/jetstream_consumer_test.go
@@ -1526,3 +1526,159 @@ func TestJetStreamConsumerBackoffWhenBackoffLengthIsEqualToMaxDeliverConfig(t *t
     require_NoError(t, err)
     require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(3))
 }
+
+func TestJetStreamConsumerRetryAckAfterTimeout(t *testing.T) {
+    for _, ack := range []struct {
+        title  string
+        policy nats.SubOpt
+    }{
+        {title: "AckExplicit", policy: nats.AckExplicit()},
+        {title: "AckAll", policy: nats.AckAll()},
+    } {
+        t.Run(ack.title, func(t *testing.T) {
+            s := RunBasicJetStreamServer(t)
+            defer s.Shutdown()
+
+            nc, js := jsClientConnect(t, s)
+            defer nc.Close()
+
+            _, err := js.AddStream(&nats.StreamConfig{
+                Name:     "TEST",
+                Subjects: []string{"foo"},
+            })
+            require_NoError(t, err)
+
+            _, err = js.Publish("foo", nil)
+            require_NoError(t, err)
+
+            sub, err := js.PullSubscribe("foo", "CONSUMER", ack.policy)
+            require_NoError(t, err)
+
+            msgs, err := sub.Fetch(1)
+            require_NoError(t, err)
+            require_Len(t, len(msgs), 1)
+
+            msg := msgs[0]
+            // Send core request so the client is unaware of the ack being sent.
+            _, err = nc.Request(msg.Reply, nil, time.Second)
+            require_NoError(t, err)
+
+            // It could be we have already acked this specific message, but we haven't received the success response.
+            // Retrying the ack should not time out and still signal success.
+            err = msg.AckSync()
+            require_NoError(t, err)
+        })
+    }
+}
+
+func TestJetStreamConsumerSwitchLeaderDuringInflightAck(t *testing.T) {
+    s := RunBasicJetStreamServer(t)
+    defer s.Shutdown()
+
+    nc, js := jsClientConnect(t, s)
+    defer nc.Close()
+
+    _, err := js.AddStream(&nats.StreamConfig{
+        Name:     "TEST",
+        Subjects: []string{"foo"},
+    })
+    require_NoError(t, err)
+
+    for i := 0; i < 2_000; i++ {
+        _, err = js.Publish("foo", nil)
+        require_NoError(t, err)
+    }
+
+    sub, err := js.PullSubscribe(
+        "foo",
+        "CONSUMER",
+        nats.MaxAckPending(2_000),
+        nats.ManualAck(),
+        nats.AckExplicit(),
+        nats.AckWait(2*time.Second),
+    )
+    require_NoError(t, err)
+
+    acc, err := s.lookupAccount(globalAccountName)
+    require_NoError(t, err)
+    mset, err := acc.lookupStream("TEST")
+    require_NoError(t, err)
+    o := mset.lookupConsumer("CONSUMER")
+    require_NotNil(t, o)
+
+    msgs, err := sub.Fetch(2_000)
+    require_NoError(t, err)
+    require_Len(t, len(msgs), 2_000)
+
+    // Simulate an ack being pushed, and o.setLeader(false) being called before the ack is processed and resets o.awl
+    atomic.AddInt64(&o.awl, 1)
+    o.setLeader(false)
+    o.setLeader(true)
+
+    msgs, err = sub.Fetch(1, nats.MaxWait(5*time.Second))
+    require_NoError(t, err)
+    require_Len(t, len(msgs), 1)
+}
+
+func TestJetStreamConsumerMessageDeletedDuringRedelivery(t *testing.T) {
+    storageTypes := []nats.StorageType{nats.MemoryStorage, nats.FileStorage}
+    for _, storageType := range storageTypes {
+        t.Run(storageType.String(), func(t *testing.T) {
+            s := RunBasicJetStreamServer(t)
+            defer s.Shutdown()
+
+            nc, js := jsClientConnect(t, s)
+            defer nc.Close()
+
+            _, err := js.AddStream(&nats.StreamConfig{
+                Name:     "TEST",
+                Subjects: []string{"foo"},
+                Storage:  storageType,
+            })
+            require_NoError(t, err)
+
+            for i := 0; i < 3; i++ {
+                _, err = js.Publish("foo", nil)
+                require_NoError(t, err)
+            }
+
+            sub, err := js.PullSubscribe(
+                "foo",
+                "CONSUMER",
+                nats.ManualAck(),
+                nats.AckExplicit(),
+                nats.AckWait(time.Second),
+            )
+            require_NoError(t, err)
+
+            acc, err := s.lookupAccount(globalAccountName)
+            require_NoError(t, err)
+            mset, err := acc.lookupStream("TEST")
+            require_NoError(t, err)
+            o := mset.lookupConsumer("CONSUMER")
+            require_NotNil(t, o)
+
+            msgs, err := sub.Fetch(3)
+            require_NoError(t, err)
+            require_Len(t, len(msgs), 3)
+
+            err = js.DeleteMsg("TEST", 2)
+            require_NoError(t, err)
+
+            o.mu.Lock()
+            defer o.mu.Unlock()
+            for seq := range o.rdc {
+                o.removeFromRedeliverQueue(seq)
+            }
+
+            o.pending = make(map[uint64]*Pending)
+            o.pending[2] = &Pending{}
+            o.addToRedeliverQueue(2)
+
+            _, _, err = o.getNextMsg()
+            require_Error(t, err, ErrStoreEOF)
+            require_Len(t, len(o.pending), 0)
+            require_Len(t, len(o.rdc), 0)
+        })
+    }
+}
diff --git a/server/leafnode.go b/server/leafnode.go
index 5f3fa4583f..6cd4b3c02f 100644
--- a/server/leafnode.go
+++ b/server/leafnode.go
@@ -2443,7 +2443,6 @@ func (c *client) processLeafSub(argo []byte) (err error) {
     }
     key := bytesToString(sub.sid)
     osub := c.subs[key]
-    updateGWs := false
     if osub == nil {
         c.subs[key] = sub
         // Now place into the account sl.
@@ -2454,7 +2453,6 @@ func (c *client) processLeafSub(argo []byte) (err error) {
             c.sendErr("Invalid Subscription")
             return nil
         }
-        updateGWs = srv.gateway.enabled
     } else if sub.queue != nil {
         // For a queue we need to update the weight.
         delta = sub.qw - atomic.LoadInt32(&osub.qw)
@@ -2477,7 +2475,7 @@
     if !spoke {
         // If we are routing add to the route map for the associated account.
         srv.updateRouteSubscriptionMap(acc, sub, delta)
-        if updateGWs {
+        if srv.gateway.enabled {
             srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
         }
     }
@@ -2519,27 +2517,27 @@ func (c *client) processLeafUnsub(arg []byte) error {
         return nil
     }

-    updateGWs := false
     spoke := c.isSpokeLeafNode()
     // We store local subs by account and subject and optionally queue name.
     // LS- will have the arg exactly as the key.
     sub, ok := c.subs[string(arg)]
+    if !ok {
+        // If not found, don't try to update routes/gws/leaf nodes.
+        c.mu.Unlock()
+        return nil
+    }
     delta := int32(1)
-    if ok && len(sub.queue) > 0 {
+    if len(sub.queue) > 0 {
         delta = sub.qw
     }
     c.mu.Unlock()

-    if ok {
-        c.unsubscribe(acc, sub, true, true)
-        updateGWs = srv.gateway.enabled
-    }
-
+    c.unsubscribe(acc, sub, true, true)
     if !spoke {
         // If we are routing subtract from the route map for the associated account.
         srv.updateRouteSubscriptionMap(acc, sub, -delta)
         // Gateways
-        if updateGWs {
+        if srv.gateway.enabled {
             srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
         }
     }
diff --git a/server/leafnode_test.go b/server/leafnode_test.go
index 47d84eb299..ceede72ecc 100644
--- a/server/leafnode_test.go
+++ b/server/leafnode_test.go
@@ -4554,6 +4554,349 @@ func TestLeafNodeQueueGroupDistributionWithDaisyChainAndGateway(t *testing.T) {
     }
 }

+func TestLeafNodeQueueGroupWeightCorrectOnConnectionCloseInSuperCluster(t *testing.T) {
+    SetGatewaysSolicitDelay(0)
+    defer ResetGatewaysSolicitDelay()
+
+    //
+    //                            D
+    //                            |
+    //                           Leaf
+    //                            |
+    //                            v
+    //                            C
+    //                           ^ ^
+    //                          /   \
+    //                        GW     GW
+    //                        /       \
+    //                       v         \
+    // B1 <--- route ---> B2 <----*----------*
+    //  ^ <---*                   |          |
+    //  |     |                  Leaf       Leaf
+    // Leaf   *-- Leaf ---*       |          |
+    //  |                 |       |          |
+    // A1 <--- route ---> A2    OTHER1     OTHER2
+    //
+
+    accs := `
+        accounts {
+            SYS: {users: [{user:sys, password: pwd}]}
+            USER: {users: [{user:user, password: pwd}]}
+        }
+        system_account: SYS
+    `
+    bConf := `
+        %s
+        server_name: %s
+        listen: "127.0.0.1:-1"
+        cluster {
+            name: "B"
+            listen: "127.0.0.1:-1"
+            no_advertise: true
+            %s
+        }
+        gateway {
+            name: "B"
+            listen: "127.0.0.1:-1"
+        }
+        leafnodes {
+            listen: "127.0.0.1:-1"
+        }
+    `
+    sb1Conf := createConfFile(t, []byte(fmt.Sprintf(bConf, accs, "B1", _EMPTY_)))
+    sb1, sb1o := RunServerWithConfig(sb1Conf)
+    defer sb1.Shutdown()
+
+    sb2Conf := createConfFile(t, []byte(fmt.Sprintf(bConf, accs, "B2",
+        fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", sb1o.Cluster.Port))))
+    sb2, sb2o := RunServerWithConfig(sb2Conf)
+    defer sb2.Shutdown()
+
+    checkClusterFormed(t, sb1, sb2)
+
+    cConf := `
+        %s
+        server_name: C
+        listen: "127.0.0.1:-1"
+        cluster {
+            name: "C"
+            listen: "127.0.0.1:-1"
+        }
+        gateway {
+            name: "C"
+            listen: "127.0.0.1:-1"
+            gateways [
+                {
+                    name: B
+                    url: "nats://127.0.0.1:%d"
+                }
+            ]
+        }
+        leafnodes {
+            listen: "127.0.0.1:-1"
+        }
+    `
+    scConf := createConfFile(t, []byte(fmt.Sprintf(cConf, accs, sb1o.Gateway.Port)))
+    sc, sco := RunServerWithConfig(scConf)
+    defer sc.Shutdown()
+
+    waitForOutboundGateways(t, sc, 1, 2*time.Second)
+    waitForOutboundGateways(t, sb1, 1, 2*time.Second)
+    waitForOutboundGateways(t, sb2, 1, 2*time.Second)
+    waitForInboundGateways(t, sc, 2, 2*time.Second)
+    waitForInboundGateways(t, sb1, 1, 2*time.Second)
+
+    dConf := `
+        %s
+        server_name: D
+        listen: "127.0.0.1:-1"
+        cluster {
+            name: "D"
+            listen: "127.0.0.1:-1"
+        }
+        leafnodes {
+            remotes [
+                {
+                    url: "nats://user:pwd@127.0.0.1:%d"
+                    account: USER
+                }
+            ]
+        }
+    `
+    sdConf := createConfFile(t, []byte(fmt.Sprintf(dConf, accs, sco.LeafNode.Port)))
+    sd, _ := RunServerWithConfig(sdConf)
+    defer sd.Shutdown()
+
+    checkLeafNodeConnected(t, sc)
+    checkLeafNodeConnected(t, sd)
+
+    aConf := `
+        %s
+        server_name: %s
+        listen: "127.0.0.1:-1"
+        cluster {
+            name: A
+            listen: "127.0.0.1:-1"
+            no_advertise: true
+            %s
+        }
+        leafnodes {
+            remotes [
+                {
+                    url: "nats://user:pwd@127.0.0.1:%d"
+                    account: USER
+                }
+            ]
+        }
+    `
+    a1Conf := createConfFile(t, []byte(fmt.Sprintf(aConf, accs, "A1", _EMPTY_, sb1o.LeafNode.Port)))
+    sa1, sa1o := RunServerWithConfig(a1Conf)
+    defer sa1.Shutdown()
+
+    checkLeafNodeConnected(t, sa1)
+    checkLeafNodeConnected(t, sb1)
+
+    a2Conf := createConfFile(t, []byte(fmt.Sprintf(aConf, accs, "A2",
+        fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", sa1o.Cluster.Port), sb1o.LeafNode.Port)))
+    sa2, _ := RunServerWithConfig(a2Conf)
+    defer sa2.Shutdown()
+
+    checkClusterFormed(t, sa1, sa2)
+    checkLeafNodeConnected(t, sa2)
+    checkLeafNodeConnectedCount(t, sb1, 2)
+
+    otherLeafsConf := `
+        %s
+        server_name: %s
+        listen: "127.0.0.1:-1"
+        leafnodes {
+            remotes [
+                {
+                    url: "nats://user:pwd@127.0.0.1:%d"
+                    account: USER
+                }
+            ]
+        }
+    `
+    o1Conf := createConfFile(t, []byte(fmt.Sprintf(otherLeafsConf, accs, "OTHERLEAF1", sb2o.LeafNode.Port)))
+    so1, _ := RunServerWithConfig(o1Conf)
+    defer so1.Shutdown()
+    checkLeafNodeConnected(t, so1)
+    checkLeafNodeConnectedCount(t, sb2, 1)
+
+    o2Conf := createConfFile(t, []byte(fmt.Sprintf(otherLeafsConf, accs, "OTHERLEAF2", sb2o.LeafNode.Port)))
+    so2, _ := RunServerWithConfig(o2Conf)
+    defer so2.Shutdown()
+    checkLeafNodeConnected(t, so2)
+    checkLeafNodeConnectedCount(t, sb2, 2)
+
+    // Helper to check that the interest is propagated to all servers
+    checkInterest := func(t *testing.T, expected []int, expectedGW int32) {
+        t.Helper()
+        subj := "foo"
+        for i, s := range []*Server{sa1, sa2, so1, so2, sb1, sb2, sc, sd} {
+            if s == sc || !s.isRunning() {
+                continue
+            }
+            acc, err := s.LookupAccount("USER")
+            require_NoError(t, err)
+            checkFor(t, 2*time.Second, 10*time.Millisecond, func() error {
+                n := acc.Interest(subj)
+                if n == expected[i] {
+                    return nil
+                }
+                return fmt.Errorf("Expected interest count for server %q to be %v, got %v", s, expected[i], n)
+            })
+        }
+        // For server C, need to check in gateway's account.
+        checkForRegisteredQSubInterest(t, sc, "B", "USER", "foo", expected[6], time.Second)
+
+        // For server B1 and B2, check that we have the proper counts in the map.
+        for _, s := range []*Server{sb1, sb2} {
+            if !s.isRunning() {
+                continue
+            }
+            checkFor(t, 2*time.Second, 10*time.Millisecond, func() error {
+                s.gateway.pasi.Lock()
+                accMap := s.gateway.pasi.m
+                st := accMap["USER"]
+                var n int32
+                entry, ok := st["foo bar"]
+                if ok {
+                    n = entry.n
+                }
+                s.gateway.pasi.Unlock()
+                if n == expectedGW {
+                    return nil
+                }
+                return fmt.Errorf("Expected GW interest count for server %q to be %v, got %v", s, expectedGW, n)
+            })
+        }
+    }
+
+    ncA1 := natsConnect(t, sa1.ClientURL(), nats.UserInfo("user", "pwd"))
+    defer ncA1.Close()
+    for i := 0; i < 3; i++ {
+        natsQueueSubSync(t, ncA1, "foo", "bar")
+    }
+    natsFlush(t, ncA1)
+    // With 3 queue subs on A1, we should have for servers (in order checked in checkInterest)
+    // for A1: 3 locals, for all others, 1 for the remote sub from A1.
+    // B1 and B2 GW map will be 3 (1 for each sub)
+    checkInterest(t, []int{3, 1, 1, 1, 1, 1, 1, 1}, 3)
+
+    ncA2 := natsConnect(t, sa2.ClientURL(), nats.UserInfo("user", "pwd"))
+    defer ncA2.Close()
+    ncA2qsub1 := natsQueueSubSync(t, ncA2, "foo", "bar")
+    ncA2qsub2 := natsQueueSubSync(t, ncA2, "foo", "bar")
+    natsFlush(t, ncA2)
+    // A1 will have 1 more for remote sub, same for A2 (2 locals + 1 remote).
+    // B1 will have 2 interest (1 per leaf connection)
+    // B1 and B2 GW map goes to 5.
+    checkInterest(t, []int{4, 3, 1, 1, 2, 1, 1, 1}, 5)
+
+    ncOther1 := natsConnect(t, so1.ClientURL(), nats.UserInfo("user", "pwd"))
+    defer ncOther1.Close()
+    natsQueueSubSync(t, ncOther1, "foo", "bar")
+    natsQueueSubSync(t, ncOther1, "foo", "bar")
+    natsFlush(t, ncOther1)
+    // A1, A2 will have one more because of routed interest
+    // O1 will have 3 (2 locals + 1 for remote interest)
+    // O2 has still 1 for remote interest
+    // B1 has 1 more because of new leaf interest and B2 because of routed interest.
+    // B1 and B2 GW map goes to 7.
+    checkInterest(t, []int{5, 4, 3, 1, 3, 2, 1, 1}, 7)
+
+    ncOther2 := natsConnect(t, so2.ClientURL(), nats.UserInfo("user", "pwd"))
+    defer ncOther2.Close()
+    natsQueueSubSync(t, ncOther2, "foo", "bar")
+    natsFlush(t, ncOther2)
+    // O2 1 more for local interest
+    // B2 1 more for the new leaf interest
+    // B1 and B2 GW map goes to 8.
+    checkInterest(t, []int{5, 4, 3, 2, 3, 3, 1, 1}, 8)
+
+    // Stop the server so1.
+    so1.Shutdown()
+    so1.WaitForShutdown()
+    checkLeafNodeConnectedCount(t, sb2, 1)
+    // Now check interest still valid, but wait a little bit to make sure that
+    // even with the bug where we would send an RS- through the gateway, there
+    // would be enough time for it to propagate before we check for interest.
+    time.Sleep(250 * time.Millisecond)
+    // O1 is stopped, so expect 0
+    // B2 has 1 less because leaf connection went away.
+    // B1 and B2 GW map goes down to 6.
+    checkInterest(t, []int{5, 4, 0, 2, 3, 2, 1, 1}, 6)
+
+    // Stop the server sa1.
+    sa1.Shutdown()
+    sa1.WaitForShutdown()
+    checkLeafNodeConnectedCount(t, sb1, 1)
+    time.Sleep(250 * time.Millisecond)
+    // A1 and O1 are gone, so 0
+    // A2 has 1 less due to loss of routed interest
+    // B1 has 1 less because 1 leaf connection went away.
+    // B1 and B2 GW map goes down to 3.
+    checkInterest(t, []int{0, 3, 0, 2, 2, 2, 1, 1}, 3)
+
+    // Now remove the queue subs from A2
+    ncA2qsub1.Unsubscribe()
+    natsFlush(t, ncA2)
+    // A2 has 1 less
+    checkInterest(t, []int{0, 2, 0, 2, 2, 2, 1, 1}, 2)
+
+    ncA2qsub2.Unsubscribe()
+    natsFlush(t, ncA2)
+    // A2 has 1 (no more locals but still interest for O2).
+    // O2 has 1 (no more for remote interest, only local).
+    // B1, B2 has 1 less since no interest from any of its leaf connections.
+    checkInterest(t, []int{0, 1, 0, 1, 1, 1, 1, 1}, 1)
+
+    // Removing (closing connection) of the sub on O2 will remove
+    // interest globally.
+    ncOther2.Close()
+    checkInterest(t, []int{0, 0, 0, 0, 0, 0, 0, 0}, 0)
+
+    // Resubscribe now, and again, interest should be propagated.
+    natsQueueSubSync(t, ncA2, "foo", "bar")
+    natsFlush(t, ncA2)
+    checkInterest(t, []int{0, 1, 0, 1, 1, 1, 1, 1}, 1)
+
+    natsQueueSubSync(t, ncA2, "foo", "bar")
+    natsFlush(t, ncA2)
+    checkInterest(t, []int{0, 2, 0, 1, 1, 1, 1, 1}, 2)
+
+    // Close the client connection that has the 2 queue subs.
+    ncA2.Close()
+    checkInterest(t, []int{0, 0, 0, 0, 0, 0, 0, 0}, 0)
+
+    // Now we will test when a route is lost on a server that has gateway enabled
+    // that we update counts properly.
+    ncB2 := natsConnect(t, sb2.ClientURL(), nats.UserInfo("user", "pwd"))
+    defer ncB2.Close()
+    natsQueueSubSync(t, ncB2, "foo", "bar")
+    natsQueueSubSync(t, ncB2, "foo", "bar")
+    natsQueueSubSync(t, ncB2, "foo", "bar")
+    natsFlush(t, ncB2)
+    checkInterest(t, []int{0, 1, 0, 1, 1, 3, 1, 1}, 3)
+
+    ncB1 := natsConnect(t, sb1.ClientURL(), nats.UserInfo("user", "pwd"))
+    defer ncB1.Close()
+    natsQueueSubSync(t, ncB1, "foo", "bar")
+    natsQueueSubSync(t, ncB1, "foo", "bar")
+    checkInterest(t, []int{0, 1, 0, 1, 3, 4, 1, 1}, 5)
+
+    // Now shutdown B2
+    sb2.Shutdown()
+    sb2.WaitForShutdown()
+    time.Sleep(250 * time.Millisecond)
+    checkInterest(t, []int{0, 1, 0, 0, 2, 0, 1, 1}, 2)
+
+    ncB1.Close()
+    checkInterest(t, []int{0, 0, 0, 0, 0, 0, 0, 0}, 0)
+}
+
 func TestLeafNodeQueueInterestAndWeightCorrectAfterServerRestartOrConnectionClose(t *testing.T) {
     // Note that this is not what a normal configuration should be. Users should
diff --git a/server/raft.go b/server/raft.go
index fb70ee776b..427e8ce677 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -1848,7 +1848,7 @@ runner:
     // just will remove them from the central monitoring map
     queues := []interface {
         unregister()
-        drain()
+        drain() int
     }{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply}
     for _, q := range queues {
         q.drain()
@@ -3861,6 +3861,10 @@ func (n *raft) setWriteErrLocked(err error) {
     n.error("Critical write error: %v", err)
     n.werr = err

+    if isPermissionError(err) {
+        go n.s.handleWritePermissionError()
+    }
+
     if isOutOfSpaceErr(err) {
         // For now since this can be happening all under the covers, we will call up and disable JetStream.
         go n.s.handleOutOfSpace(nil)
diff --git a/server/route.go b/server/route.go
index 0c455547c9..a865122e61 100644
--- a/server/route.go
+++ b/server/route.go
@@ -1348,8 +1348,6 @@ func (c *client) processRemoteUnsub(arg []byte, leafUnsub bool) (err error) {
         return nil
     }

-    updateGWs := false
-
     _keya := [128]byte{}
     _key := _keya[:0]

@@ -1373,19 +1371,21 @@ func (c *client) processRemoteUnsub(arg []byte, leafUnsub bool) (err error) {
     if ok {
         delete(c.subs, key)
         acc.sl.Remove(sub)
-        updateGWs = srv.gateway.enabled
         if len(sub.queue) > 0 {
             delta = sub.qw
         }
     }
     c.mu.Unlock()

-    if updateGWs {
-        srv.gatewayUpdateSubInterest(accountName, sub, -delta)
-    }
+    // Update gateways and leaf nodes only if the subscription was found.
+    if ok {
+        if srv.gateway.enabled {
+            srv.gatewayUpdateSubInterest(accountName, sub, -delta)
+        }

-    // Now check on leafnode updates.
-    acc.updateLeafNodes(sub, -delta)
+        // Now check on leafnode updates.
+        acc.updateLeafNodes(sub, -delta)
+    }

     if c.opts.Verbose {
         c.sendOK()
@@ -1600,7 +1600,6 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
     // We use the sub.sid for the key of the c.subs map.
     key := bytesToString(sub.sid)
     osub := c.subs[key]
-    updateGWs := false
     if osub == nil {
         c.subs[key] = sub
         // Now place into the account sl.
@@ -1611,7 +1610,6 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
             c.sendErr("Invalid Subscription")
             return nil
         }
-        updateGWs = srv.gateway.enabled
     } else if sub.queue != nil {
         // For a queue we need to update the weight.
         delta = sub.qw - atomic.LoadInt32(&osub.qw)
@@ -1620,7 +1618,7 @@
     }
     c.mu.Unlock()

-    if updateGWs {
+    if srv.gateway.enabled {
         srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
     }
diff --git a/server/store.go b/server/store.go
index 1c8f7f7ec1..2d72f69474 100644
--- a/server/store.go
+++ b/server/store.go
@@ -18,6 +18,7 @@ import (
     "errors"
     "fmt"
     "io"
+    "os"
     "strings"
     "time"
     "unsafe"
@@ -780,3 +781,7 @@ func copyString(s string) string {
     copy(b, s)
     return bytesToString(b)
 }
+
+func isPermissionError(err error) bool {
+    return err != nil && os.IsPermission(err)
+}
diff --git a/server/stream.go b/server/stream.go
index e34f8cd4ba..a2883631d4 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -4640,6 +4640,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
     }

     if err != nil {
+        if isPermissionError(err) {
+            mset.mu.Unlock()
+            // Messages in the block cache could be lost in the worst case.
+            // In clustered mode this is highly unlikely thanks to replication.
+            mset.srv.DisableJetStream()
+            mset.srv.Warnf("Filesystem permission denied while writing msg, disabling JetStream: %v", err)
+            return err
+        }
         // If we did not succeed put those values back and increment clfs in case we are clustered.
         var state StreamState
         mset.store.FastState(&state)