Skip to content

Commit

Permalink
NRG: Add single RAFT node test
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen committed Oct 14, 2024
1 parent 0012d42 commit b62c058
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 4 deletions.
18 changes: 14 additions & 4 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@ func (s *Server) bootstrapRaftNode(cfg *RaftConfig, knownPeers []string, allPeer
return writePeerState(cfg.Store, &peerState{knownPeers, expected, extUndetermined})
}

// startRaftNode will start the raft node.
func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabels) (RaftNode, error) {
// initRaftNode will initialize the raft node, to be used by startRaftNode or when testing to not run the Go routine.
func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabels) (*raft, error) {
if cfg == nil {
return nil, errNilCfg
}
Expand Down Expand Up @@ -520,6 +520,16 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
labels["group"] = n.group
s.registerRaftNode(n.group, n)

return n, nil
}

// startRaftNode will start the raft node.
func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabels) (RaftNode, error) {
n, err := s.initRaftNode(accName, cfg, labels)
if err != nil {
return nil, err
}

// Start the run goroutine for the Raft state machine.
s.startGoRoutine(n.run, labels)

Expand Down Expand Up @@ -3374,8 +3384,8 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}

if (isNew && ae.pterm != n.pterm) || ae.pindex != n.pindex {
// Check if this is a lower or equal index than what we were expecting.
if ae.pindex <= n.pindex {
// Check if this is a lower index than what we were expecting.
if ae.pindex < n.pindex {
n.debug("AppendEntry detected pindex less than ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex)
var ar *appendEntryResponse

Expand Down
127 changes: 127 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1076,3 +1076,130 @@ func TestNRGTermNoDecreaseAfterWALReset(t *testing.T) {
}
}
}

func TestNRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
s := c.servers[0] // RunBasicJetStreamServer not available

ms, err := newMemStore(&StreamConfig{Name: "TEST", Storage: MemoryStorage})
require_NoError(t, err)
cfg := &RaftConfig{Name: "TEST", Store: t.TempDir(), Log: ms}

err = s.bootstrapRaftNode(cfg, nil, false)
require_NoError(t, err)
n, err := s.initRaftNode(globalAccountName, cfg, pprofLabels{})
require_NoError(t, err)

encode := func(ae *appendEntry) *appendEntry {
buf, err := ae.encode(nil)
require_NoError(t, err)
ae.buf = buf
return ae
}

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
nats1 := "yrzKKRBu" // "nats-1"

// Timeline, for first leader
aeInitial := encode(&appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeHeartbeat := encode(&appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})
aeUncommitted := encode(&appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries})
aeNoQuorum := encode(&appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries})

// Timeline, after leader change
aeMissed := encode(&appendEntry{leader: nats1, term: 2, commit: 1, pterm: 1, pindex: 2, entries: entries})
aeCatchupTrigger := encode(&appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 3, entries: entries})
aeHeartbeat2 := encode(&appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 4, entries: nil})
aeHeartbeat3 := encode(&appendEntry{leader: nats1, term: 2, commit: 4, pterm: 2, pindex: 4, entries: nil})

// Initial case is simple, just store the entry.
n.processAppendEntry(aeInitial, n.aesub)
require_Equal(t, n.wal.State().Msgs, 1)
entry, err := n.loadEntry(1)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// Heartbeat, makes sure commit moves up.
n.processAppendEntry(aeHeartbeat, n.aesub)
require_Equal(t, n.commit, 1)

// We get one entry that has quorum (but we don't know that yet), so it stays uncommitted for a bit.
n.processAppendEntry(aeUncommitted, n.aesub)
require_Equal(t, n.wal.State().Msgs, 2)
entry, err = n.loadEntry(2)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// We get one entry that has NO quorum (but we don't know that yet).
n.processAppendEntry(aeNoQuorum, n.aesub)
require_Equal(t, n.wal.State().Msgs, 3)
entry, err = n.loadEntry(3)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// We've just had a leader election, and we missed one message from the previous leader, we should catchup.
n.processAppendEntry(aeCatchupTrigger, n.aesub)
require_Equal(t, n.wal.State().Msgs, 3)
require_True(t, n.catchup != nil)
require_Equal(t, n.catchup.pterm, 1) // n.pterm
require_Equal(t, n.catchup.pindex, 1) // n.commit

// Make sure our WAL was not truncated, and we're still catching up.
aeUncommitted.leader = nats1
aeUncommitted = encode(aeUncommitted)
n.processAppendEntry(aeUncommitted, n.catchup.sub)
require_Equal(t, n.wal.State().Msgs, 3)
require_True(t, n.catchup != nil)
// Our entry should not be touched, so the 'leader' should've stayed the same.
entry, err = n.loadEntry(2)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// We now notice the leader indicated a different entry at the (no quorum) index, should truncate.
n.processAppendEntry(aeMissed, n.catchup.sub)
require_Equal(t, n.wal.State().Msgs, 2)
require_True(t, n.catchup == nil)

// We get a heartbeat that prompts us to catchup again.
n.processAppendEntry(aeHeartbeat2, n.aesub)
require_Equal(t, n.wal.State().Msgs, 2)
require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item.
require_True(t, n.catchup != nil)
require_Equal(t, n.catchup.pterm, 1) // n.pterm
require_Equal(t, n.catchup.pindex, 1) // n.commit

// We get the uncommitted entry again, it should stay the same.
n.processAppendEntry(aeUncommitted, n.catchup.sub)
require_Equal(t, n.wal.State().Msgs, 2)
require_True(t, n.catchup != nil)
// Our entry should still stay the same.
entry, err = n.loadEntry(2)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// We now get the missed append entry, store it.
n.processAppendEntry(aeMissed, n.catchup.sub)
require_Equal(t, n.wal.State().Msgs, 3)
require_True(t, n.catchup != nil)
entry, err = n.loadEntry(3)
require_NoError(t, err)
require_Equal(t, entry.leader, nats1)

// We now get the entry that initially triggered us to catchup again, it should be added.
n.processAppendEntry(aeCatchupTrigger, n.catchup.sub)
require_Equal(t, n.wal.State().Msgs, 4)
require_True(t, n.catchup != nil)
entry, err = n.loadEntry(4)
require_NoError(t, err)
require_Equal(t, entry.leader, nats1)

// Heartbeat, makes sure we commit (and reset catchup, as we're now up-to-date).
n.processAppendEntry(aeHeartbeat3, n.aesub)
require_Equal(t, n.commit, 4)
require_True(t, n.catchup == nil)
}

0 comments on commit b62c058

Please sign in to comment.