Skip to content

Commit

Permalink
Merge pull request #42 from csmoe/fix_deadlock_during_prevote_migrati…
Browse files Browse the repository at this point in the history
…on_process

fix deadlock during prevote migration process
  • Loading branch information
A. Hobden authored May 14, 2018
2 parents 8453663 + 7dd78e9 commit 444e99e
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 2 deletions.
27 changes: 25 additions & 2 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ pub struct Raft<T: Storage> {
heartbeat_elapsed: usize,

pub check_quorum: bool,
pre_vote: bool,
#[doc(hidden)]
pub pre_vote: bool,
skip_bcast_commit: bool,

heartbeat_timeout: usize,
Expand Down Expand Up @@ -938,7 +939,7 @@ impl<T: Storage> Raft<T> {
}
}
} else if m.get_term() < self.term {
if self.check_quorum
if (self.check_quorum || self.pre_vote)
&& (m.get_msg_type() == MessageType::MsgHeartbeat
|| m.get_msg_type() == MessageType::MsgAppend)
{
Expand All @@ -965,6 +966,28 @@ impl<T: Storage> Raft<T> {
// fresh election. This can be prevented with Pre-Vote phase.
let to_send = new_message(m.get_from(), MessageType::MsgAppendResponse, None);
self.send(to_send);
} else if m.get_msg_type() == MessageType::MsgRequestPreVote {
// Before pre_vote enable, there may be a recieving candidate with higher term,
// but less log. After update to pre_vote, the cluster may deadlock if
// we drop messages with a lower term.
info!(
"{} [log_term: {}, index: {}, vote: {}] rejected {:?} from {} [log_term: {}, index: {}] at term {}",
self.id,
self.raft_log.last_term(),
self.raft_log.last_index(),
self.vote,
m.get_msg_type(),
m.get_from(),
m.get_log_term(),
m.get_index(),
self.term,
);

let mut to_send =
new_message(m.get_from(), MessageType::MsgRequestPreVoteResponse, None);
to_send.set_term(self.term);
to_send.set_reject(true);
self.send(to_send);
} else {
// ignore other cases
info!(
Expand Down
108 changes: 108 additions & 0 deletions tests/cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3950,6 +3950,114 @@ fn test_remove_learner() {
assert!(n1.prs().learner_nodes().is_empty());
}

// simulate rolling update a cluster for Pre-Vote. cluster has 3 nodes [n1, n2, n3].
// n1 is leader with term 2
// n2 is follower with term 2
// n3 is partitioned, with term 4 and less log, state is candidate
fn new_prevote_migration_cluster() -> Network {
// We intentionally do not enable pre_vote for n3, this is done so in order
// to simulate a rolling restart process where it's possible to have a mixed
// version cluster with replicas with pre_vote enabled, and replicas without.
let mut n1 = new_test_raft_with_prevote(1, vec![1, 2, 3], 10, 1, new_storage(), true);
let mut n2 = new_test_raft_with_prevote(2, vec![1, 2, 3], 10, 1, new_storage(), true);
let mut n3 = new_test_raft_with_prevote(3, vec![1, 2, 3], 10, 1, new_storage(), false);

n1.become_follower(1, INVALID_ID);
n2.become_follower(1, INVALID_ID);
n3.become_follower(1, INVALID_ID);

let mut nt = Network::new(vec![Some(n1), Some(n2), Some(n3)]);

nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

// Cause a network partition to isolate n3.
nt.isolate(3);
nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]);

nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]);
nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]);

// check state
// n1.state == Leader
// n2.state == Follower
// n3.state == Candidate
assert_eq!(nt.peers[&1].state, StateRole::Leader);
assert_eq!(nt.peers[&2].state, StateRole::Follower);
assert_eq!(nt.peers[&3].state, StateRole::Candidate);

// check term
// n1.Term == 2
// n2.Term == 2
// n3.Term == 4
assert_eq!(nt.peers[&1].term, 2);
assert_eq!(nt.peers[&2].term, 2);
assert_eq!(nt.peers[&3].term, 4);

// Enable prevote on n3, then recover the network
nt.peers.get_mut(&3).unwrap().pre_vote = true;
nt.recover();

nt
}

#[test]
fn test_prevote_migration_can_complete_election() {
// n1 is leader with term 2
// n2 is follower with term 2
// n3 is pre-candidate with term 4, and less log
let mut nt = new_prevote_migration_cluster();

// simulate leader down
nt.isolate(1);

// Call for elections from both n2 and n3.
nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]);
nt.send(vec![new_message(2, 2, MessageType::MsgHup, 0)]);

// check state
// n2.state == Follower
// n3.state == PreCandidate
assert_eq!(nt.peers[&2].state, StateRole::Follower);
assert_eq!(nt.peers[&3].state, StateRole::PreCandidate);

nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]);
nt.send(vec![new_message(2, 2, MessageType::MsgHup, 0)]);

// Do we have a leader?
assert!(
(nt.peers[&2].state == StateRole::Leader) || (nt.peers[&3].state == StateRole::Follower)
);
}

#[test]
fn test_prevote_migration_with_free_stuck_pre_candidate() {
let mut nt = new_prevote_migration_cluster();

// n1 is leader with term 2
// n2 is follower with term 2
// n3 is pre-candidate with term 4, and less log
nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]);

assert_eq!(nt.peers[&1].state, StateRole::Leader);
assert_eq!(nt.peers[&2].state, StateRole::Follower);
assert_eq!(nt.peers[&3].state, StateRole::PreCandidate);

// Pre-Vote again for safety
nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]);
assert_eq!(nt.peers[&1].state, StateRole::Leader);
assert_eq!(nt.peers[&2].state, StateRole::Follower);
assert_eq!(nt.peers[&3].state, StateRole::PreCandidate);

let mut to_send = new_message(1, 3, MessageType::MsgHeartbeat, 0);
to_send.set_term(nt.peers[&1].term);
nt.send(vec![to_send]);

// Disrupt the leader so that the stuck peer is freed
assert_eq!(nt.peers[&1].state, StateRole::Follower);

assert_eq!(nt.peers[&3].term, nt.peers[&1].term);
}

#[test]
fn test_learner_respond_vote() {
let mut n1 = new_test_learner_raft(1, vec![1, 2], vec![3], 10, 1, new_storage());
Expand Down

0 comments on commit 444e99e

Please sign in to comment.