NRG: Don't run catchup when behind on applies
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
MauriceVanVeen committed Dec 12, 2024
1 parent daf3e34 commit 3392957
Showing 4 changed files with 115 additions and 9 deletions.
29 changes: 23 additions & 6 deletions server/jetstream_cluster.go
@@ -3303,7 +3303,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
}

if isRecovering || !mset.IsLeader() {
if err := mset.processSnapshot(ss); err != nil {
if err := mset.processSnapshot(ss, ce.Index); err != nil {
return err
}
}
@@ -8343,11 +8343,12 @@ type streamSyncRequest struct {
FirstSeq uint64 `json:"first_seq"`
LastSeq uint64 `json:"last_seq"`
DeleteRangesOk bool `json:"delete_ranges"`
MinApplied uint64 `json:"min_applied"`
}

// Given a stream state that represents a snapshot, calculate the sync request based on our current state.
// Stream lock must be held.
func (mset *stream) calculateSyncRequest(state *StreamState, snap *StreamReplicatedState) *streamSyncRequest {
func (mset *stream) calculateSyncRequest(state *StreamState, snap *StreamReplicatedState, index uint64) *streamSyncRequest {
// Shouldn't happen, but consequences are pretty bad if we have the lock held and
// our caller tries to take the lock again on panic defer, as in processSnapshot.
if state == nil || snap == nil || mset.node == nil {
@@ -8357,7 +8358,7 @@ func (mset *stream) calculateSyncRequest(state *StreamState, snap *StreamReplica
if state.LastSeq >= snap.LastSeq {
return nil
}
return &streamSyncRequest{FirstSeq: state.LastSeq + 1, LastSeq: snap.LastSeq, Peer: mset.node.ID(), DeleteRangesOk: true}
return &streamSyncRequest{FirstSeq: state.LastSeq + 1, LastSeq: snap.LastSeq, Peer: mset.node.ID(), DeleteRangesOk: true, MinApplied: index}
}

// processSnapshotDeletes will update our current store based on the snapshot
@@ -8493,15 +8494,15 @@ var (
)

// Process a stream snapshot.
func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
func (mset *stream) processSnapshot(snap *StreamReplicatedState, index uint64) (e error) {
// Update any deletes, etc.
mset.processSnapshotDeletes(snap)
mset.setCLFS(snap.Failed)

mset.mu.Lock()
var state StreamState
mset.store.FastState(&state)
sreq := mset.calculateSyncRequest(&state, snap)
sreq := mset.calculateSyncRequest(&state, snap, index)

s, js, subject, n, st := mset.srv, mset.js, mset.sa.Sync, mset.node, mset.cfg.Storage
qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name)
@@ -8639,7 +8640,7 @@ RETRY:
mset.mu.RLock()
var state StreamState
mset.store.FastState(&state)
sreq = mset.calculateSyncRequest(&state, snap)
sreq = mset.calculateSyncRequest(&state, snap, index)
mset.mu.RUnlock()
if sreq == nil {
return nil
@@ -9187,6 +9188,22 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {

// Setup sequences to walk through.
seq, last := sreq.FirstSeq, sreq.LastSeq

// The follower received a snapshot from another leader, and we've become leader since.
// We have an up-to-date log but could be behind on applies. We must wait until we've reached the minimum required.
// The follower will automatically retry after a timeout, so we can safely return here.
if node := mset.raftNode(); node != nil {
index, _, applied := node.Progress()
// Only skip if our log has enough entries, and they could be applied in the future.
if index >= sreq.MinApplied && applied < sreq.MinApplied {
return
}
// We know here we've either applied enough entries, or our log doesn't have enough entries.
// In the latter case the request expects us to have more. Just continue and value availability here.
// This should only be possible if the logs have already desynced, and we shouldn't have become leader
// in the first place. Not much we can do here in this (hypothetical) scenario.
}

mset.setCatchupPeer(sreq.Peer, last-seq)

// Check if we can compress during this.
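For reference, here is a minimal standalone sketch of the gate added to runCatchup above. The function name shouldSkipCatchup and the example numbers are illustrative only and not part of the commit; the real check reads index and applied from node.Progress() and MinApplied from the incoming streamSyncRequest.

```go
package main

import "fmt"

// shouldSkipCatchup distills the new gate: the catchup source defers (returns true)
// only when its log already contains the entries the follower requires
// (index >= minApplied) but it has not yet applied them (applied < minApplied).
// The follower retries after a timeout, so deferring is safe.
// If the log itself is too short, the request is still served to favor availability,
// matching the behavior in runCatchup.
func shouldSkipCatchup(index, applied, minApplied uint64) bool {
	return index >= minApplied && applied < minApplied
}

func main() {
	// Log covers the follower's snapshot, but applies lag behind: defer.
	fmt.Println(shouldSkipCatchup(10, 5, 8)) // true

	// Applies have caught up to the required point: serve the catchup.
	fmt.Println(shouldSkipCatchup(10, 9, 8)) // false

	// Log is too short (the hypothetical desync case in the comments above): still serve.
	fmt.Println(shouldSkipCatchup(6, 5, 8)) // false
}
```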
88 changes: 88 additions & 0 deletions server/jetstream_cluster_1_test.go
@@ -6815,6 +6815,94 @@ func TestJetStreamClusterCatchupLoadNextMsgTooManyDeletes(t *testing.T) {
}
}

func TestJetStreamClusterCatchupMustStallWhenBehindOnApplies(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

_, err = js.Publish("foo", nil)
require_NoError(t, err)

// Reconnect to stream leader.
l := c.streamLeader(globalAccountName, "TEST")
nc.Close()
nc, _ = jsClientConnect(t, l, nats.UserInfo("admin", "s3cr3t!"))
defer nc.Close()

// Setup wiretap and grab stream.
sendSubject := "test-wiretap"
sub, err := nc.SubscribeSync(sendSubject)
require_NoError(t, err)
err = nc.Flush() // Must flush, otherwise our subscription could be too late.
require_NoError(t, err)
acc, err := l.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)

// We have a message at sequence 1, so expect a successful catchup.
sreq1 := &streamSyncRequest{Peer: "peer", FirstSeq: 1, LastSeq: 1, DeleteRangesOk: true}
require_True(t, mset.srv.startGoRoutine(func() { mset.runCatchup(sendSubject, sreq1) }))
// Expect the message at sequence 1.
msg, err := sub.NextMsg(time.Second)
require_NoError(t, err)
require_Equal(t, entryOp(msg.Data[0]), streamMsgOp)
subj, _, _, _, seq, _, err := decodeStreamMsg(msg.Data[1:])
require_NoError(t, err)
require_Equal(t, seq, 1)
require_Equal(t, subj, "foo")
// And end with EOF.
msg, err = sub.NextMsg(time.Second)
require_NoError(t, err)
require_Len(t, len(msg.Data), 0)

// Add one additional entry into the log that's not applied yet.
n := mset.node.(*raft)
n.Lock()
ae := n.buildAppendEntry(nil)
err = n.storeToWAL(ae)
n.Unlock()
index, commit, applied := n.Progress()
require_NoError(t, err)
require_LessThan(t, applied, index)
require_Equal(t, commit, applied)
// We have a message at sequence 1, but we haven't applied as many append entries.
// We can't fulfill the request right now as we don't know yet if
// that message will be deleted as part of upcoming append entries.
sreq2 := &streamSyncRequest{Peer: "peer", FirstSeq: 1, LastSeq: 1, DeleteRangesOk: true, MinApplied: index}
require_True(t, mset.srv.startGoRoutine(func() { mset.runCatchup(sendSubject, sreq2) }))
_, err = sub.NextMsg(time.Second)
require_Error(t, err, nats.ErrTimeout)

// We have a message at sequence 1, but we haven't applied as many append entries.
// Also, we seem to have a log that doesn't contain enough entries, even though we became leader.
// Something has already gone wrong and got the logs to desync.
// Value availability here and just fulfill the request.
sreq3 := &streamSyncRequest{Peer: "peer", FirstSeq: 1, LastSeq: 1, DeleteRangesOk: true, MinApplied: 100}
require_True(t, mset.srv.startGoRoutine(func() { mset.runCatchup(sendSubject, sreq3) }))
// Expect the message at sequence 1.
msg, err = sub.NextMsg(time.Second)
require_NoError(t, err)
require_Equal(t, entryOp(msg.Data[0]), streamMsgOp)
subj, _, _, _, seq, _, err = decodeStreamMsg(msg.Data[1:])
require_NoError(t, err)
require_Equal(t, seq, 1)
require_Equal(t, subj, "foo")
// And end with EOF.
msg, err = sub.NextMsg(time.Second)
require_NoError(t, err)
require_Len(t, len(msg.Data), 0)
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
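The test above fabricates streamSyncRequest values directly; on a real follower, MinApplied comes from the apply index of the committed entry that delivered the snapshot (ce.Index), threaded through processSnapshot into calculateSyncRequest. A simplified sketch of that derivation, with the types reduced to just the fields involved (no Peer, no nil checks):

```go
package main

import "fmt"

// Reduced stand-ins for StreamState, StreamReplicatedState and streamSyncRequest.
type streamState struct{ LastSeq uint64 }
type snapshot struct{ LastSeq uint64 }
type syncRequest struct {
	FirstSeq, LastSeq uint64
	DeleteRangesOk    bool
	MinApplied        uint64
}

// calculateSyncRequest mirrors the server-side logic: no request if we are already
// caught up, otherwise ask for the missing range and record the apply index the
// catchup source must reach before answering.
func calculateSyncRequest(state *streamState, snap *snapshot, index uint64) *syncRequest {
	if state.LastSeq >= snap.LastSeq {
		return nil
	}
	return &syncRequest{
		FirstSeq:       state.LastSeq + 1,
		LastSeq:        snap.LastSeq,
		DeleteRangesOk: true,
		MinApplied:     index, // apply index of the entry that carried the snapshot
	}
}

func main() {
	// Follower is at stream sequence 3, the snapshot says the leader reached 10,
	// and the snapshot arrived in committed entry index 42.
	sreq := calculateSyncRequest(&streamState{LastSeq: 3}, &snapshot{LastSeq: 10}, 42)
	fmt.Printf("%+v\n", sreq) // &{FirstSeq:4 LastSeq:10 DeleteRangesOk:true MinApplied:42}
}
```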
5 changes: 3 additions & 2 deletions server/jetstream_cluster_4_test.go
@@ -3997,8 +3997,9 @@ func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) {
// Processing a snapshot while there's no leader elected is considered a cluster reset.
// If a leader is temporarily unavailable we shouldn't blow away our state.
var snap StreamReplicatedState
snap.LastSeq = 1_000 // ensure we can catchup based on the snapshot
err := mset.processSnapshot(&snap)
snap.LastSeq = 1_000 // ensure we can catchup based on the snapshot
appliedIndex := uint64(0) // incorrect index, but doesn't matter for this test
err := mset.processSnapshot(&snap, appliedIndex)
require_True(t, errors.Is(err, errCatchupAbortedNoLeader))
require_True(t, isClusterResetErr(err))
mset.resetClusteredState(err)
2 changes: 1 addition & 1 deletion server/raft.go
@@ -1651,7 +1651,7 @@ func (n *raft) State() RaftState {
func (n *raft) Progress() (index, commit, applied uint64) {
n.RLock()
defer n.RUnlock()
return n.pindex + 1, n.commit, n.applied
return n.pindex, n.commit, n.applied
}

// Size returns number of entries and total bytes for our WAL.
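As a small illustration of the ordering the test above relies on after this change (applied <= commit <= index, assuming pindex tracks the last entry stored in the WAL), here is a sketch using made-up values rather than a live raft node:

```go
package main

import "fmt"

// checkProgress verifies the ordering callers of Progress() expect:
// applied never exceeds commit, and commit never exceeds the last stored log index.
// The MinApplied gate in runCatchup compares against these values, so reporting
// the index consistently (pindex rather than pindex+1) keeps that comparison exact.
func checkProgress(index, commit, applied uint64) error {
	if applied > commit {
		return fmt.Errorf("applied %d ahead of commit %d", applied, commit)
	}
	if commit > index {
		return fmt.Errorf("commit %d ahead of log index %d", commit, index)
	}
	return nil
}

func main() {
	// Mirrors the test scenario: one extra entry stored in the WAL but not yet
	// committed or applied.
	fmt.Println(checkProgress(2, 1, 1)) // <nil>
}
```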
