Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: port disruptive test #41

Merged
merged 4 commits into from
Mar 23, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ pub struct Raft<T: Storage> {
heartbeat_elapsed: usize,

pub check_quorum: bool,
pre_vote: bool,
pub pre_vote: bool,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems it doesn't have to be pub now?

skip_bcast_commit: bool,

heartbeat_timeout: usize,
Expand Down Expand Up @@ -952,7 +952,16 @@ impl<T: Storage> Raft<T> {
// two features is to minimize the disruption caused by nodes that have been
// removed from the cluster's configuration: a removed node will send MsgVotes
// which will be ignored, but it will not receive MsgApp or MsgHeartbeat, so it
// will not create disruptive term increases
// will not create disruptive term increases, by notifying leader of this node's
// activeness.
// The above comments are also true for Pre-Vote.
//
// When a follower gets isolated, it soon starts an election and ends
// up with a higher term than the leader, although it won't receive enough
// votes to win the election. When it regains connectivity, this response
// with "pb.MsgAppResp" of a higher term would force the leader to step down.
// However, this disruption is inevitable in order to free the stuck node
// with a fresh election. It can be prevented with the Pre-Vote phase.
let to_send = new_message(m.get_from(), MessageType::MsgAppendResponse, None);
self.send(to_send);
} else {
Expand Down
145 changes: 145 additions & 0 deletions tests/cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2260,6 +2260,151 @@ fn test_non_promotable_voter_which_check_quorum() {
assert_eq!(nt.peers[&2].leader_id, 1);
}

/// `test_disruptive_follower` exercises a follower that hears nothing from
/// the leader (slow incoming network) until its election timer fires, turning
/// it into a candidate with a bumped term. When the leader's late heartbeat
/// finally reaches that candidate, the candidate's reply forces the leader
/// to step down.
#[test]
fn test_disruptive_follower() {
    // Build a three-node cluster: check-quorum enabled everywhere and every
    // node explicitly started as a term-1 follower with no known leader.
    let mut nodes: Vec<_> = (1..4)
        .map(|id| new_test_raft(id, vec![1, 2, 3], 10, 1, new_storage()))
        .collect();
    for node in nodes.iter_mut() {
        node.check_quorum = true;
    }
    for node in nodes.iter_mut() {
        node.become_follower(1, INVALID_ID);
    }

    let mut nt = Network::new(nodes.into_iter().map(Some).collect());

    // Node 1 campaigns and wins.
    nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

    // check state
    for (id, role) in vec![
        (1, StateRole::Leader),
        (2, StateRole::Follower),
        (3, StateRole::Follower),
    ] {
        assert_eq!(nt.peers[&id].state, role);
    }

    // Drive n3's election clock by hand. This mimics etcd server's
    // "advanceTicksForElection" on restart (used to expedite campaigning with
    // large election timeouts, e.g. multi-datacenter deploys), or simply
    // leader messages being delayed while ticks keep elapsing.
    {
        let n3 = nt.peers.get_mut(&3).unwrap();
        let base_timeout = n3.get_election_timeout();
        n3.set_randomized_election_timeout(base_timeout + 2);
        let ticks = n3.get_randomized_election_timeout();
        for _ in 0..ticks - 1 {
            n3.tick();
        }

        // Ideally n3 would receive "pb.MsgApp" or "pb.MsgHeartbeat" from
        // leader n1 before this final election tick and reset its
        // "electionElapsed". Here the tick elapses first, so n3 campaigns.
        n3.tick();
    }

    // n1 is still the leader for now: its heartbeat to candidate n3 is
    // still being delayed.
    // check state
    for (id, role) in vec![
        (1, StateRole::Leader),
        (2, StateRole::Follower),
        (3, StateRole::Candidate),
    ] {
        assert_eq!(nt.peers[&id].state, role);
    }

    // check term
    // n1.Term == 2
    // n2.Term == 2
    // n3.Term == 3
    for (id, term) in vec![(1, 2), (2, 2), (3, 3)] {
        assert_eq!(nt.peers[&id].term, term);
    }

    // While n3's outgoing vote requests are still queued, the leader's
    // heartbeat finally arrives. Because of the delay it carries the
    // leader's term, which is lower than the candidate's.
    let leader_term = nt.peers[&1].term;
    let mut heartbeat = new_message(1, 3, MessageType::MsgHeartbeat, 0);
    heartbeat.set_term(leader_term);
    nt.send(vec![heartbeat]);

    // Candidate n3 answers with a "pb.MsgAppResp" of higher term, and the
    // leader steps down on seeing it. The disruption is what lets the
    // higher-term candidate be freed by the following election.

    // check state
    for (id, role) in vec![
        (1, StateRole::Follower),
        (2, StateRole::Follower),
        (3, StateRole::Candidate),
    ] {
        assert_eq!(nt.peers[&id].state, role);
    }

    // check term
    // n1.Term == 3
    // n2.Term == 2
    // n3.Term == 3
    for (id, term) in vec![(1, 3), (2, 2), (3, 3)] {
        assert_eq!(nt.peers[&id].term, term);
    }
}

/// `test_disruptive_follower_pre_vote` exercises an isolated follower that
/// misses several proposals and then campaigns as a pre-candidate with a
/// shorter log than the current leader. With the pre-vote phase enabled, the
/// node cannot force the current leader to step down, so the cluster sees
/// fewer disruptions.
#[test]
fn test_disruptive_follower_pre_vote() {
    // Build a three-node cluster with pre-vote and check-quorum enabled, and
    // every node explicitly started as a term-1 follower with no known leader.
    let mut nodes: Vec<_> = (1..4)
        .map(|id| new_test_raft_with_prevote(id, vec![1, 2, 3], 10, 1, new_storage(), true))
        .collect();
    for node in nodes.iter_mut() {
        node.check_quorum = true;
    }
    for node in nodes.iter_mut() {
        node.become_follower(1, INVALID_ID);
    }

    let mut nt = Network::new(nodes.into_iter().map(Some).collect());

    // Node 1 campaigns and wins.
    nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

    // check state
    for (id, role) in vec![
        (1, StateRole::Leader),
        (2, StateRole::Follower),
        (3, StateRole::Follower),
    ] {
        assert_eq!(nt.peers[&id].state, role);
    }

    // Cut n3 off and let the leader take three proposals that n3 misses.
    nt.isolate(3);
    for _ in 0..3 {
        nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]);
    }

    // Restore connectivity, then have the lagging n3 start a campaign.
    nt.recover();
    nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]);

    // check state
    for (id, role) in vec![
        (1, StateRole::Leader),
        (2, StateRole::Follower),
        (3, StateRole::PreCandidate),
    ] {
        assert_eq!(nt.peers[&id].state, role);
    }

    // check term
    // n1.Term == 2
    // n2.Term == 2
    // n3.Term == 2
    for (id, term) in vec![(1, 2), (2, 2), (3, 2)] {
        assert_eq!(nt.peers[&id].term, term);
    }

    // delayed leader heartbeat does not force current leader to step down
    let leader_term = nt.peers[&1].term;
    let mut heartbeat = new_message(1, 3, MessageType::MsgHeartbeat, 0);
    heartbeat.set_term(leader_term);
    nt.send(vec![heartbeat]);
    assert_eq!(nt.peers[&1].state, StateRole::Leader);
}

#[test]
fn test_read_only_option_safe() {
let a = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage());
Expand Down