From 39ebc976c8661931d0145d195479eccc0a744f64 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 17 Dec 2024 12:00:20 +0100 Subject: [PATCH] NRG: Only continue if aligned with pindex/pterm Signed-off-by: Maurice van Veen --- server/raft.go | 9 +++++--- server/raft_test.go | 55 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/server/raft.go b/server/raft.go index 23aa15d923..519e673258 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3401,7 +3401,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.updateLeadChange(false) } -RETRY: if ae.pterm != n.pterm || ae.pindex != n.pindex { // Check if this is a lower or equal index than what we were expecting. if ae.pindex <= n.pindex { @@ -3427,8 +3426,11 @@ RETRY: } } else if eae.term == ae.pterm { // If terms match we can delete all entries past this one, and then continue storing the current entry. - n.truncateWAL(eae.term, eae.pindex+1) - goto RETRY + n.truncateWAL(ae.pterm, ae.pindex) + // Only continue if truncation was successful, and we ended up such that we can safely continue. + if ae.pterm == n.pterm && ae.pindex == n.pindex { + goto CONTINUE + } } else { // If terms mismatched, delete that entry and all others past it. // Make sure to cancel any catchups in progress. @@ -3512,6 +3514,7 @@ RETRY: return } +CONTINUE: // Save to our WAL if we have entries. if ae.shouldStore() { // Only store if an original which will have sub != nil diff --git a/server/raft_test.go b/server/raft_test.go index a785971303..fbce5251e6 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1634,6 +1634,61 @@ func TestNRGTruncateDownToCommitted(t *testing.T) { require_Equal(t, n.commit, 2) } +type mockWALTruncateAlwaysFails struct { + WAL +} + +func (m mockWALTruncateAlwaysFails) Truncate(seq uint64) error { + return errors.New("test: truncate always fails") +} + +func TestNRGTruncateDownToCommittedWhenTruncateFails(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + n.Lock() + n.wal = mockWALTruncateAlwaysFails{n.wal} + n.Unlock() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + nats1 := "yrzKKRBu" // "nats-1" + + // Timeline, we are leader + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + + // Timeline, after leader change + aeMsg3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 0, pterm: 1, pindex: 1, entries: entries}) + + // Simply receive first message. + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.commit, 0) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Receive second message, which commits the first message. + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.commit, 1) + require_Equal(t, n.wal.State().Msgs, 2) + entry, err = n.loadEntry(2) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // We receive an entry from another leader, should truncate down to commit / remove the second message. + // But, truncation fails so should register that and not change pindex/pterm. + bindex, bterm := n.pindex, n.pterm + n.processAppendEntry(aeMsg3, n.aesub) + require_Error(t, n.werr, errors.New("test: truncate always fails")) + require_Equal(t, bindex, n.pindex) + require_Equal(t, bterm, n.pterm) +} + func TestNRGForwardProposalResponse(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown()