diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 93f681c323d..5bd3cf6c322 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -3303,7 +3303,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 			}
 
 			if isRecovering || !mset.IsLeader() {
-				if err := mset.processSnapshot(ss); err != nil {
+				if err := mset.processSnapshot(ss, ce.Index); err != nil {
 					return err
 				}
 			}
@@ -8343,11 +8343,12 @@ type streamSyncRequest struct {
 	FirstSeq       uint64 `json:"first_seq"`
 	LastSeq        uint64 `json:"last_seq"`
 	DeleteRangesOk bool   `json:"delete_ranges"`
+	MinApplied     uint64 `json:"min_applied"`
 }
 
 // Given a stream state that represents a snapshot, calculate the sync request based on our current state.
 // Stream lock must be held.
-func (mset *stream) calculateSyncRequest(state *StreamState, snap *StreamReplicatedState) *streamSyncRequest {
+func (mset *stream) calculateSyncRequest(state *StreamState, snap *StreamReplicatedState, index uint64) *streamSyncRequest {
 	// Shouldn't happen, but consequences are pretty bad if we have the lock held and
 	// our caller tries to take the lock again on panic defer, as in processSnapshot.
 	if state == nil || snap == nil || mset.node == nil {
@@ -8357,7 +8358,7 @@ func (mset *stream) calculateSyncRequest(state *StreamState, snap *StreamReplica
 	if state.LastSeq >= snap.LastSeq {
 		return nil
 	}
-	return &streamSyncRequest{FirstSeq: state.LastSeq + 1, LastSeq: snap.LastSeq, Peer: mset.node.ID(), DeleteRangesOk: true}
+	return &streamSyncRequest{FirstSeq: state.LastSeq + 1, LastSeq: snap.LastSeq, Peer: mset.node.ID(), DeleteRangesOk: true, MinApplied: index}
 }
 
 // processSnapshotDeletes will update our current store based on the snapshot
@@ -8493,7 +8494,7 @@ var (
 )
 
 // Process a stream snapshot.
-func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
+func (mset *stream) processSnapshot(snap *StreamReplicatedState, index uint64) (e error) {
 	// Update any deletes, etc.
 	mset.processSnapshotDeletes(snap)
 	mset.setCLFS(snap.Failed)
@@ -8501,7 +8502,7 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
 	mset.mu.Lock()
 	var state StreamState
 	mset.store.FastState(&state)
-	sreq := mset.calculateSyncRequest(&state, snap)
+	sreq := mset.calculateSyncRequest(&state, snap, index)
 
 	s, js, subject, n, st := mset.srv, mset.js, mset.sa.Sync, mset.node, mset.cfg.Storage
 	qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name)
@@ -8639,7 +8640,7 @@ RETRY:
 		mset.mu.RLock()
 		var state StreamState
 		mset.store.FastState(&state)
-		sreq = mset.calculateSyncRequest(&state, snap)
+		sreq = mset.calculateSyncRequest(&state, snap, index)
 		mset.mu.RUnlock()
 		if sreq == nil {
 			return nil
@@ -9187,6 +9188,22 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 
 	// Setup sequences to walk through.
 	seq, last := sreq.FirstSeq, sreq.LastSeq
+
+	// The follower received a snapshot from another leader, and we've become leader since.
+	// We have an up-to-date log but could be behind on applies. We must wait until we've reached the minimum required.
+	// The follower will automatically retry after a timeout, so we can safely return here.
+	if node := mset.raftNode(); node != nil {
+		index, _, applied := node.Progress()
+		// Only skip if our log has enough entries, and they could be applied in the future.
+		if index >= sreq.MinApplied && applied < sreq.MinApplied {
+			return
+		}
+		// We know here we've either applied enough entries, or our log doesn't have enough entries.
+		// In the latter case the request expects us to have more. Just continue and value availability here.
+		// This should only be possible if the logs have already desynced, and we shouldn't have become leader
+		// in the first place. Not much we can do here in this (hypothetical) scenario.
+	}
+
 	mset.setCatchupPeer(sreq.Peer, last-seq)
 
 	// Check if we can compress during this.
diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go
index 06adf875c5d..c882d5a9d0e 100644
--- a/server/jetstream_cluster_1_test.go
+++ b/server/jetstream_cluster_1_test.go
@@ -6815,6 +6815,94 @@ func TestJetStreamClusterCatchupLoadNextMsgTooManyDeletes(t *testing.T) {
 	}
 }
 
+func TestJetStreamClusterCatchupMustStallWhenBehindOnApplies(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	_, err = js.Publish("foo", nil)
+	require_NoError(t, err)
+
+	// Reconnect to stream leader.
+	l := c.streamLeader(globalAccountName, "TEST")
+	nc.Close()
+	nc, _ = jsClientConnect(t, l, nats.UserInfo("admin", "s3cr3t!"))
+	defer nc.Close()
+
+	// Setup wiretap and grab stream.
+	sendSubject := "test-wiretap"
+	sub, err := nc.SubscribeSync(sendSubject)
+	require_NoError(t, err)
+	err = nc.Flush() // Must flush, otherwise our subscription could be too late.
+	require_NoError(t, err)
+	acc, err := l.lookupAccount(globalAccountName)
+	require_NoError(t, err)
+	mset, err := acc.lookupStream("TEST")
+	require_NoError(t, err)
+
+	// We have a message at sequence 1, so expect a successful catchup.
+	sreq1 := &streamSyncRequest{Peer: "peer", FirstSeq: 1, LastSeq: 1, DeleteRangesOk: true}
+	require_True(t, mset.srv.startGoRoutine(func() { mset.runCatchup(sendSubject, sreq1) }))
+	// Expect the message at sequence 1.
+	msg, err := sub.NextMsg(time.Second)
+	require_NoError(t, err)
+	require_Equal(t, entryOp(msg.Data[0]), streamMsgOp)
+	subj, _, _, _, seq, _, err := decodeStreamMsg(msg.Data[1:])
+	require_NoError(t, err)
+	require_Equal(t, seq, 1)
+	require_Equal(t, subj, "foo")
+	// And end with EOF.
+	msg, err = sub.NextMsg(time.Second)
+	require_NoError(t, err)
+	require_Len(t, len(msg.Data), 0)
+
+	// Add one additional entry into the log that's not applied yet.
+	n := mset.node.(*raft)
+	n.Lock()
+	ae := n.buildAppendEntry(nil)
+	err = n.storeToWAL(ae)
+	n.Unlock()
+	index, commit, applied := n.Progress()
+	require_NoError(t, err)
+	require_LessThan(t, applied, index)
+	require_Equal(t, commit, applied)
+	// We have a message at sequence 1, but we haven't applied as many append entries.
+	// We can't fulfill the request right now as we don't know yet if
+	// that message will be deleted as part of upcoming append entries.
+	sreq2 := &streamSyncRequest{Peer: "peer", FirstSeq: 1, LastSeq: 1, DeleteRangesOk: true, MinApplied: index}
+	require_True(t, mset.srv.startGoRoutine(func() { mset.runCatchup(sendSubject, sreq2) }))
+	_, err = sub.NextMsg(time.Second)
+	require_Error(t, err, nats.ErrTimeout)
+
+	// We have a message at sequence 1, but we haven't applied as many append entries.
+	// Also, we seem to have a log that doesn't contain enough entries, even though we became leader.
+	// Something has already gone wrong and got the logs to desync.
+	// Value availability here and just fulfill the request.
+	sreq3 := &streamSyncRequest{Peer: "peer", FirstSeq: 1, LastSeq: 1, DeleteRangesOk: true, MinApplied: 100}
+	require_True(t, mset.srv.startGoRoutine(func() { mset.runCatchup(sendSubject, sreq3) }))
+	// Expect the message at sequence 1.
+	msg, err = sub.NextMsg(time.Second)
+	require_NoError(t, err)
+	require_Equal(t, entryOp(msg.Data[0]), streamMsgOp)
+	subj, _, _, _, seq, _, err = decodeStreamMsg(msg.Data[1:])
+	require_NoError(t, err)
+	require_Equal(t, seq, 1)
+	require_Equal(t, subj, "foo")
+	// And end with EOF.
+	msg, err = sub.NextMsg(time.Second)
+	require_NoError(t, err)
+	require_Len(t, len(msg.Data), 0)
+}
+
 //
 // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
 // Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go
index cd6465d6e5e..b4cbb57a59a 100644
--- a/server/jetstream_cluster_4_test.go
+++ b/server/jetstream_cluster_4_test.go
@@ -3997,8 +3997,9 @@ func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) {
 				// Processing a snapshot while there's no leader elected is considered a cluster reset.
 				// If a leader is temporarily unavailable we shouldn't blow away our state.
 				var snap StreamReplicatedState
-				snap.LastSeq = 1_000 // ensure we can catchup based on the snapshot
-				err := mset.processSnapshot(&snap)
+				snap.LastSeq = 1_000      // ensure we can catchup based on the snapshot
+				appliedIndex := uint64(0) // incorrect index, but doesn't matter for this test
+				err := mset.processSnapshot(&snap, appliedIndex)
 				require_True(t, errors.Is(err, errCatchupAbortedNoLeader))
 				require_True(t, isClusterResetErr(err))
 				mset.resetClusteredState(err)
diff --git a/server/raft.go b/server/raft.go
index 48b9db2c453..23aa15d9233 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -1651,7 +1651,7 @@ func (n *raft) State() RaftState {
 func (n *raft) Progress() (index, commit, applied uint64) {
 	n.RLock()
 	defer n.RUnlock()
-	return n.pindex + 1, n.commit, n.applied
+	return n.pindex, n.commit, n.applied
 }
 
 // Size returns number of entries and total bytes for our WAL.
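A minimal sketch of the stall decision this patch adds to runCatchup, shown in isolation. The standalone helper shouldStallCatchup, its package layout, and the example values are hypothetical and not part of the patch; in the actual change the check lives inline in runCatchup and reads index and applied from node.Progress().

package main

import "fmt"

// shouldStallCatchup mirrors the gate added to runCatchup: stall (and let the
// follower retry after its timeout) only when our log already contains the
// requested entries (index >= minApplied) but we have not yet applied them
// (applied < minApplied). If our log is too short we proceed anyway, valuing
// availability, since the logs must already have desynced in that case.
func shouldStallCatchup(index, applied, minApplied uint64) bool {
	return index >= minApplied && applied < minApplied
}

func main() {
	fmt.Println(shouldStallCatchup(10, 8, 9))  // true: entry 9 is in the log but not applied yet
	fmt.Println(shouldStallCatchup(10, 9, 9))  // false: applied enough, catchup can proceed
	fmt.Println(shouldStallCatchup(5, 5, 100)) // false: log too short; proceed and value availability
}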