Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NRG (2.11): Fix term handling in candidate state and use higher term from vote request #5671

Merged
merged 1 commit into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3201,11 +3201,11 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// If the append entry term is newer than the current term, erase our
// vote.
if ae.term > n.term {
n.term = ae.term
n.vote = noVote
n.writeTermVote()
}
n.debug("Received append entry in candidate state from %q, converting to follower", ae.leader)
n.term = ae.term
n.writeTermVote()
n.stepdown.push(ae.leader)
}
}
Expand Down Expand Up @@ -3974,8 +3974,8 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
n.debug("Stepping down from %s, detected higher term: %d vs %d",
strings.ToLower(n.State().String()), vr.term, n.term)
n.stepdown.push(noLeader)
n.term = vr.term
}
n.term = vr.term
n.vote = noVote
n.writeTermVote()
}
Expand Down
89 changes: 67 additions & 22 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func TestNRGInvalidTAVDoesntPanic(t *testing.T) {
c.waitOnAllCurrent()
}

func TestNRGCandidateStepsDownAfterAE(t *testing.T) {
func TestNRGAssumeHighTermAfterCandidateIsolation(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
c.waitOnLeader()
Expand All @@ -441,32 +441,38 @@ func TestNRGCandidateStepsDownAfterAE(t *testing.T) {
rg := c.createRaftGroup("TEST", 3, newStateAdder)
rg.waitOnLeader()

// Pick a random follower node. Bump the term up by a considerable
// Bump the term up on one of the follower nodes by a considerable
// amount and force it into the candidate state. This is what happens
// after a period of time in isolation.
n := rg.nonLeader().node().(*raft)
n.Lock()
n.term += 100
n.switchState(Candidate)
n.Unlock()
follower := rg.nonLeader().node().(*raft)
follower.Lock()
follower.term += 100
follower.switchState(Candidate)
follower.Unlock()

follower.requestVote()
time.Sleep(time.Millisecond * 100)

// The candidate will shortly send a vote request. When that happens,
// the rest of the nodes in the cluster should move up to that term,
// even though they will not grant the vote.
nterm := follower.term
for _, n := range rg {
require_Equal(t, n.node().Term(), nterm)
}

// Have the leader push through something on the current term just
// for good measure, although the heartbeats probably work too.
// Have the leader send out a proposal, which will force the candidate
// back into follower state.
rg.waitOnLeader()
rg.leader().(*stateAdder).proposeDelta(1)
rg.waitOnTotal(t, 1)

// Wait for the leader to receive the next append entry from the
// current leader. What should happen is that the node steps down
// and starts following the leader, as nothing in the log of the
// follower is newer than the term of the leader.
checkFor(t, time.Second, 50*time.Millisecond, func() error {
if n.State() == Candidate {
return fmt.Errorf("shouldn't still be candidate state")
}
if nterm, lterm := n.Term(), rg.leader().node().Term(); nterm != lterm {
return fmt.Errorf("follower term %d should match leader term %d", nterm, lterm)
}
return nil
})
// The candidate should have switched to a follower on a term equal to
// or newer than the candidate had.
for _, n := range rg {
require_NotEqual(t, n.node().State(), Candidate)
require_True(t, n.node().Term() >= nterm)
}
}

// Test to make sure this does not cause us to truncate our wal or enter catchup state.
Expand Down Expand Up @@ -588,3 +594,42 @@ func TestNRGLeavesObserverAfterPause(t *testing.T) {
n.ResumeApply()
checkState(false, false)
}

func TestNRGCandidateDoesntRevertTermAfterOldAE(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
rg.waitOnLeader()

// Bump the term up a few times.
for i := 0; i < 3; i++ {
rg.leader().node().StepDown()
time.Sleep(time.Millisecond * 50) // Needed because stepdowns not synchronous
rg.waitOnLeader()
}

leader := rg.leader().node().(*raft)
follower := rg.nonLeader().node().(*raft)

// Sanity check that we are where we expect to be.
require_Equal(t, leader.term, 4)
require_Equal(t, follower.term, 4)

// At this point the active term is 4 and pterm is 4, force the
// term up to 9. This won't bump the pterm.
rg.lockAll()
for _, n := range rg {
n.node().(*raft).term += 5
}
rg.unlockAll()

// Build an AE that has a term newer than the pterm but older than
// the term. Give it to the follower in candidate state.
ae := newAppendEntry(leader.id, 6, leader.commit, leader.pterm, leader.pindex, nil)
follower.switchToCandidate()
follower.processAppendEntry(ae, nil)

// The candidate must not have reverted back to term 6.
require_NotEqual(t, follower.term, 6)
}