Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not update leader last-contact when granting a pre-vote request #609

Merged
merged 2 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package raft

import (
"bytes"
"context"
"fmt"
"os"
"sync/atomic"
Expand Down Expand Up @@ -490,3 +491,98 @@ func TestRaft_RestartFollower_LongInitialHeartbeat(t *testing.T) {
})
}
}

// TestRaft_PreVote_LeaderSpam test that when a leader spam the followers
// with pre-vote requests they can still transition to candidate.
// The reason this test need to live in here is that we need the transport heartbeat fast-path
// to use as a trick to avoid heartbeat keeping the cluster stable.
// That fast-path only exists in the net transport.
func TestRaft_PreVote_LeaderSpam(t *testing.T) {
CheckInteg(t)
conf := DefaultConfig()
conf.LocalID = ServerID("first")
conf.HeartbeatTimeout = 50 * time.Millisecond
conf.ElectionTimeout = 50 * time.Millisecond
conf.LeaderLeaseTimeout = 50 * time.Millisecond
conf.CommitTimeout = 5 * time.Second
conf.SnapshotThreshold = 100
conf.TrailingLogs = 10

// Create a single node
leader := MakeRaft(t, conf, true)
NoErr(WaitFor(leader, Leader), t)

// Join a few nodes!
var followers []*RaftEnv
for i := 0; i < 2; i++ {
conf.LocalID = ServerID(fmt.Sprintf("next-batch-%d", i))
env := MakeRaft(t, conf, false)
addr := env.trans.LocalAddr()
NoErr(WaitFuture(leader.raft.AddVoter(conf.LocalID, addr, 0, 0)), t)
followers = append(followers, env)
}

// Wait for a leader
_, err := WaitForAny(Leader, append([]*RaftEnv{leader}, followers...))
NoErr(err, t)

CheckConsistent(append([]*RaftEnv{leader}, followers...), t)

leaderT := leader.raft.trans

// spam all the followers with pre-vote requests from the leader
// those requests should be granted as long as the leader haven't changed.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for {
ticker := time.NewTicker(conf.HeartbeatTimeout / 2)
for _, f := range followers {
rsp := RequestPreVoteResponse{}
reqPreVote := RequestPreVoteRequest{
RPCHeader: leader.raft.getRPCHeader(),
Term: leader.raft.getCurrentTerm() + 1,
LastLogIndex: leader.raft.getLastIndex(),
LastLogTerm: leader.raft.getCurrentTerm(),
}
// We don't need to check the error here because when leader change
// it will start failing with "rejecting pre-vote request since we have a leader"
_ = leaderT.(WithPreVote).RequestPreVote(f.raft.localID, f.raft.localAddr, &reqPreVote, &rsp)
}
select {
case <-ticker.C:
case <-ctx.Done():
return
}
}
}()
time.Sleep(time.Second)

// for all followers ignore heartbeat from current leader, so we can transition to candidate state.
// the purpose of this test is to verify that spamming nodes with pre-votes don't cause them to never
// transition to Candidates.
for _, f := range followers {
//copy f to avoid data race
f1 := f
f1.trans.SetHeartbeatHandler(func(rpc RPC) {
if a, ok := rpc.Command.(*AppendEntriesRequest); ok {
if ServerID(a.GetRPCHeader().ID) == leader.raft.localID {
resp := &AppendEntriesResponse{
RPCHeader: f1.raft.getRPCHeader(),
Term: f1.raft.getCurrentTerm(),
LastLog: f1.raft.getLastIndex(),
Success: false,
NoRetryBackoff: false,
}
rpc.Respond(resp, nil)
} else {
f.raft.processHeartbeat(rpc)
}
}
})
}
time.Sleep(1 * time.Second)
// New leader should be one of the former followers.
_, err = WaitForAny(Leader, followers)
NoErr(err, t)
}
1 change: 0 additions & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1805,7 +1805,6 @@ func (r *Raft) requestPreVote(rpc RPC, req *RequestPreVoteRequest) {
}

resp.Granted = true
r.setLastContact()
}

// installSnapshot is invoked when we get a InstallSnapshot RPC call.
Expand Down
Loading