Skip to content

Commit

Permalink
NRG: Truncate down to commit
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen committed Nov 4, 2024
1 parent f981ac3 commit 91eae7f
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 17 deletions.
20 changes: 9 additions & 11 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3381,17 +3381,15 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
var ar *appendEntryResponse
var success bool

if n.commit > 0 && ae.pindex <= n.commit {
// Check if only our terms do not match here.
if ae.pindex == n.pindex {
// Make sure pterms match and we take on the leader's.
// This prevents constant spinning.
n.truncateWAL(ae.pterm, ae.pindex)
} else {
// If we have already committed this entry, just mark success.
success = true
}
} else if eae, _ := n.loadEntry(ae.pindex); eae == nil {
seq := ae.pindex
// We have an entry stored at this index, move sequence up so we truncate to after applying the commit.
if n.commit > 0 && seq == n.commit {
seq++
}
if seq < n.commit {
// If we have already committed this entry, just mark success.
success = true
} else if eae, _ := n.loadEntry(seq); eae == nil {
// If terms are equal, and we are not catching up, we have simply already processed this message.
// So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots.
if ae.pterm == n.pterm && !catchingUp {
Expand Down
66 changes: 60 additions & 6 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1271,8 +1271,8 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T)

// Timeline.
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})
aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries})
aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: nil})
aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries})
aeHeartbeat2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil})

Expand All @@ -1283,17 +1283,17 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// Heartbeat, makes sure commit moves up.
n.processAppendEntry(aeHeartbeat1, n.aesub)
require_Equal(t, n.commit, 1)

// Deliver a message.
n.processAppendEntry(aeMsg2, n.aesub)
require_Equal(t, n.wal.State().Msgs, 2)
entry, err = n.loadEntry(2)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// Heartbeat, makes sure commit moves up.
n.processAppendEntry(aeHeartbeat1, n.aesub)
require_Equal(t, n.commit, 2)

// Deliver another message.
n.processAppendEntry(aeMsg3, n.aesub)
require_Equal(t, n.wal.State().Msgs, 3)
Expand All @@ -1303,7 +1303,7 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T)

// Simulate receiving an old entry as a redelivery. We should not truncate as that lowers our commit.
n.processAppendEntry(aeMsg1, n.aesub)
require_Equal(t, n.commit, 1)
require_Equal(t, n.commit, 2)

// Heartbeat, makes sure we commit.
n.processAppendEntry(aeHeartbeat2, n.aesub)
Expand Down Expand Up @@ -1617,3 +1617,57 @@ func TestNRGRecoverPindexPtermOnlyIfLogNotEmpty(t *testing.T) {
require_Equal(t, rn.pterm, 0)
require_Equal(t, rn.pindex, 0)
}

// TestNRGTruncateDownToCommitted drives a single in-memory raft node with
// append entries from one leader and then from a second leader in a later
// term. After each step it asserts on the node's commit index and on the
// number of messages held in its WAL.
func TestNRGTruncateDownToCommitted(t *testing.T) {
	n, cleanup := initSingleMemRaftNode(t)
	defer cleanup()

	// One normal entry is reused as the payload of every append entry below;
	// its content is irrelevant, it only needs to be stored.
	payload := []*Entry{
		newEntry(EntryNormal, encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)),
	}

	firstLeader := "S1Nunr6R"  // "nats-0"
	secondLeader := "yrzKKRBu" // "nats-1"

	// Entries sent in term 1, naming nats-0 as the leader.
	term1First := encode(t, &appendEntry{leader: firstLeader, term: 1, commit: 0, pterm: 0, pindex: 0, entries: payload})
	term1Second := encode(t, &appendEntry{leader: firstLeader, term: 1, commit: 1, pterm: 1, pindex: 1, entries: payload})

	// Entries sent in term 2, naming nats-1 as the leader.
	term2Replacement := encode(t, &appendEntry{leader: secondLeader, term: 2, commit: 0, pterm: 1, pindex: 1, entries: payload})
	term2Heartbeat := encode(t, &appendEntry{leader: secondLeader, term: 2, commit: 2, pterm: 2, pindex: 2, entries: nil})

	// Step 1: the first message is stored, nothing is committed yet.
	n.processAppendEntry(term1First, n.aesub)
	require_Equal(t, n.commit, 0)
	require_Equal(t, n.wal.State().Msgs, 1)
	storedFirst, err := n.loadEntry(1)
	require_NoError(t, err)
	require_Equal(t, storedFirst.leader, firstLeader)

	// Step 2: the second message is stored and carries commit=1, committing the first.
	n.processAppendEntry(term1Second, n.aesub)
	require_Equal(t, n.commit, 1)
	require_Equal(t, n.wal.State().Msgs, 2)
	storedSecond, err := n.loadEntry(2)
	require_NoError(t, err)
	require_Equal(t, storedSecond.leader, firstLeader)

	// Step 3: an entry from the other leader arrives for the same position.
	// The log is expected to be truncated down to the commit, dropping the second message.
	n.processAppendEntry(term2Replacement, n.aesub)
	require_Equal(t, n.commit, 1)
	require_Equal(t, n.wal.State().Msgs, 1)

	// Step 4: replay the same entry so that it gets stored this time.
	// (A catchup would normally do this; replaying directly keeps the test short.)
	n.processAppendEntry(term2Replacement, n.aesub)
	require_Equal(t, n.commit, 1)
	require_Equal(t, n.wal.State().Msgs, 2)
	storedReplacement, err := n.loadEntry(2)
	require_NoError(t, err)
	require_Equal(t, storedReplacement.leader, secondLeader)

	// Step 5: the heartbeat advances the commit to 2.
	n.processAppendEntry(term2Heartbeat, n.aesub)
	require_Equal(t, n.commit, 2)
}

0 comments on commit 91eae7f

Please sign in to comment.