Skip to content

Commit

Permalink
[FIXED] Catchup must not extend past requested sequence range (#6038)
Browse files Browse the repository at this point in the history
When we're asked to provide data within a range during catchup, we
should not extend that range and provide more data, especially since
that range was defined by a snapshot, which also specifies which RAFT
entries should be sent after processing that snapshot. Extending the
range would just result in duplicated work and a possible desync for the
follower, so these lines can safely be removed.

**Timeline of these lines:**
Previously, when receiving a catchup request, the `FirstSeq` could be
moved up to match what the state says:
[[IMPROVED] Catchup improvements
#3348](https://github.com/nats-io/nats-server/pull/3348/files#diff-5cb252c37caef12e7027803018861c82724b120ddb62cfedc2f36addf57f6970R7132-R7138)
(August, 2022)

Afterward this was removed in favor of only extending the `LastSeq`:
[[FIXED] KeyValue not found after server restarts
#5054](https://github.com/nats-io/nats-server/pull/5054/files#diff-5cb252c37caef12e7027803018861c82724b120ddb62cfedc2f36addf57f6970R8579-R8588)
(February, 2024)

This was done to solve "KeyValue not found" issues.
However, the following later change would also have fixed that case:
[[FIXED] Do not bump clfs on seq mismatch when before stream LastSeq
#5821](https://github.com/nats-io/nats-server/pull/5821/files#diff-2f4991438bb868a8587303cde9107f83127e88ad70bd19d5c6a31c238a20c299R4694-R4699)
(August, 2024)

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
derekcollison authored Oct 24, 2024
2 parents 2086858 + 98ef43a commit 3e8715d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 34 deletions.
34 changes: 4 additions & 30 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -8964,17 +8964,6 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
// mset.store never changes after being set, don't need lock.
mset.store.FastState(&state)

// Reset notion of first if this request wants sequences before our starting sequence
// and we would have nothing to send. If we have partial messages still need to send skips for those.
// We will keep sreq's first sequence to not create sequence mismatches on the follower, but we extend the last to our current state.
if sreq.FirstSeq < state.FirstSeq && state.FirstSeq > sreq.LastSeq {
s.Debugf("Catchup for stream '%s > %s' resetting request first sequence from %d to %d",
mset.account(), mset.name(), sreq.FirstSeq, state.FirstSeq)
if state.LastSeq > sreq.LastSeq {
sreq.LastSeq = state.LastSeq
}
}

// Setup sequences to walk through.
seq, last := sreq.FirstSeq, sreq.LastSeq
mset.setCatchupPeer(sreq.Peer, last-seq)
Expand Down Expand Up @@ -9138,25 +9127,10 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
if drOk && dr.First > 0 {
sendDR()
}
// Check for a condition where our state's first is now past the last that we could have sent.
// If so reset last and continue sending.
var state StreamState
mset.mu.RLock()
mset.store.FastState(&state)
mset.mu.RUnlock()
if last < state.FirstSeq {
last = state.LastSeq
}
// Recheck our exit condition.
if seq == last {
if drOk && dr.First > 0 {
sendDR()
}
s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
// EOF
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
return false
}
s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
// EOF
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
return false
}
select {
case <-remoteQuitCh:
Expand Down
20 changes: 16 additions & 4 deletions server/jetstream_cluster_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6720,12 +6720,24 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) {
return nil
})

// Make sure we only sent 1002 sync catchup msgs.
// This is for the new messages, the delete range, and the EOF.
// Make sure we only sent 2 sync catchup msgs.
// This is for the delete range, and the EOF.
nmsgs, _, _ := sub.Pending()
if nmsgs != 1002 {
t.Fatalf("Expected only 1002 sync catchup msgs to be sent signaling eof, but got %d", nmsgs)
if nmsgs != 2 {
t.Fatalf("Expected only 2 sync catchup msgs to be sent signaling eof, but got %d", nmsgs)
}

msg, err := sub.NextMsg(0)
require_NoError(t, err)
mbuf := msg.Data[1:]
dr, err := decodeDeleteRange(mbuf)
require_NoError(t, err)
require_Equal(t, dr.First, 1001)
require_Equal(t, dr.Num, 1000)

msg, err = sub.NextMsg(0)
require_NoError(t, err)
require_Equal(t, len(msg.Data), 0)
}

func TestJetStreamClusterStreamResetWithLargeFirstSeq(t *testing.T) {
Expand Down

0 comments on commit 3e8715d

Please sign in to comment.