Skip to content

Commit

Permalink
NRG: Only continue if aligned with pindex/pterm (#6271)
Browse files Browse the repository at this point in the history
If truncation fails we would spin on retrying. Now only continue if
`pindex/pterm` are aligned, otherwise responds non-success to the
leader.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
derekcollison authored Dec 17, 2024
2 parents e425a9d + 39ebc97 commit d323e23
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 3 deletions.
9 changes: 6 additions & 3 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3401,7 +3401,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.updateLeadChange(false)
}

RETRY:
if ae.pterm != n.pterm || ae.pindex != n.pindex {
// Check if this is a lower or equal index than what we were expecting.
if ae.pindex <= n.pindex {
Expand All @@ -3427,8 +3426,11 @@ RETRY:
}
} else if eae.term == ae.pterm {
// If terms match we can delete all entries past this one, and then continue storing the current entry.
n.truncateWAL(eae.term, eae.pindex+1)
goto RETRY
n.truncateWAL(ae.pterm, ae.pindex)
// Only continue if truncation was successful, and we ended up such that we can safely continue.
if ae.pterm == n.pterm && ae.pindex == n.pindex {
goto CONTINUE
}
} else {
// If terms mismatched, delete that entry and all others past it.
// Make sure to cancel any catchups in progress.
Expand Down Expand Up @@ -3512,6 +3514,7 @@ RETRY:
return
}

CONTINUE:
// Save to our WAL if we have entries.
if ae.shouldStore() {
// Only store if an original which will have sub != nil
Expand Down
55 changes: 55 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1634,6 +1634,61 @@ func TestNRGTruncateDownToCommitted(t *testing.T) {
require_Equal(t, n.commit, 2)
}

type mockWALTruncateAlwaysFails struct {
WAL
}

func (m mockWALTruncateAlwaysFails) Truncate(seq uint64) error {
return errors.New("test: truncate always fails")
}

func TestNRGTruncateDownToCommittedWhenTruncateFails(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

n.Lock()
n.wal = mockWALTruncateAlwaysFails{n.wal}
n.Unlock()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
nats1 := "yrzKKRBu" // "nats-1"

// Timeline, we are leader
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries})

// Timeline, after leader change
aeMsg3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 0, pterm: 1, pindex: 1, entries: entries})

// Simply receive first message.
n.processAppendEntry(aeMsg1, n.aesub)
require_Equal(t, n.commit, 0)
require_Equal(t, n.wal.State().Msgs, 1)
entry, err := n.loadEntry(1)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// Receive second message, which commits the first message.
n.processAppendEntry(aeMsg2, n.aesub)
require_Equal(t, n.commit, 1)
require_Equal(t, n.wal.State().Msgs, 2)
entry, err = n.loadEntry(2)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// We receive an entry from another leader, should truncate down to commit / remove the second message.
// But, truncation fails so should register that and not change pindex/pterm.
bindex, bterm := n.pindex, n.pterm
n.processAppendEntry(aeMsg3, n.aesub)
require_Error(t, n.werr, errors.New("test: truncate always fails"))
require_Equal(t, bindex, n.pindex)
require_Equal(t, bterm, n.pterm)
}

func TestNRGForwardProposalResponse(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
Expand Down

0 comments on commit d323e23

Please sign in to comment.