Skip to content

Commit

Permalink
raftstore/hibernate: wake up for more cases (tikv#6672)
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
  • Loading branch information
BusyJay committed Feb 28, 2020
1 parent 1e25fa6 commit 792a1cb
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 7 deletions.
18 changes: 18 additions & 0 deletions components/test_raftstore/src/transport_simulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,3 +735,21 @@ impl Filter for LeaseReadFilter {
Ok(())
}
}

#[derive(Clone)]
pub struct DropMessageFilter {
ty: MessageType,
}

impl DropMessageFilter {
pub fn new(ty: MessageType) -> DropMessageFilter {
DropMessageFilter { ty }
}
}

impl Filter for DropMessageFilter {
fn before(&self, msgs: &mut Vec<RaftMessage>) -> Result<()> {
msgs.retain(|m| m.get_message().get_msg_type() != self.ty);
Ok(())
}
}
7 changes: 7 additions & 0 deletions components/test_raftstore/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,13 @@ pub fn create_test_engine(
(engines, path)
}

pub fn configure_for_hibernate<T: Simulator>(cluster: &mut Cluster<T>) {
// Uses long check interval to make leader keep sleeping during tests.
cluster.cfg.raft_store.abnormal_leader_missing_duration = ReadableDuration::secs(20);
cluster.cfg.raft_store.max_leader_missing_duration = ReadableDuration::secs(40);
cluster.cfg.raft_store.peer_stale_state_check_interval = ReadableDuration::secs(10);
}

pub fn configure_for_snapshot<T: Simulator>(cluster: &mut Cluster<T>) {
// Truncate the log quickly so that we can force sending snapshot.
cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(20);
Expand Down
12 changes: 9 additions & 3 deletions src/raftstore/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,10 +832,13 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
let from_peer_id = msg.get_from_peer().get_id();
self.fsm.peer.insert_peer_cache(msg.take_from_peer());
self.fsm.peer.step(self.ctx, msg.take_message())?;
if self.fsm.peer.should_wake_up {
self.reset_raft_tick(GroupState::Ordered);
}

if self.fsm.peer.any_new_peer_catch_up(from_peer_id) {
self.fsm.peer.heartbeat_pd(self.ctx);
self.register_raft_base_tick();
self.reset_raft_tick(GroupState::Ordered);
}

self.fsm.has_ready = true;
Expand All @@ -845,6 +848,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
fn reset_raft_tick(&mut self, state: GroupState) {
self.fsm.group_state = state;
self.fsm.missing_ticks = 0;
self.fsm.peer.should_wake_up = false;
self.register_raft_base_tick();
}

Expand Down Expand Up @@ -2271,8 +2275,10 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
bind_term(&mut resp, term);
if self.fsm.peer.propose(self.ctx, cb, msg, resp) {
self.fsm.has_ready = true;
self.fsm.group_state = GroupState::Ordered;
self.register_raft_base_tick();
}

if self.fsm.peer.should_wake_up {
self.reset_raft_tick(GroupState::Ordered);
}

self.register_pd_heartbeat_tick();
Expand Down
26 changes: 22 additions & 4 deletions src/raftstore/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ pub struct Peer {

/// If it fails to send messages to leader.
pub leader_unreachable: bool,
/// Indicates whether the peer should be woken up.
pub should_wake_up: bool,
/// Whether this peer is destroyed asynchronously.
/// If it's true when merging, its data in storeMeta will be removed early by the target peer
pub pending_remove: bool,
Expand Down Expand Up @@ -286,6 +288,7 @@ impl Peer {
compaction_declined_bytes: 0,
leader_unreachable: false,
pending_remove: false,
should_wake_up: false,
pending_merge_state: None,
last_committed_prepare_merge_idx: 0,
leader_missing_time: Some(Instant::now()),
Expand Down Expand Up @@ -518,14 +521,20 @@ impl Peer {
return res;
}
}
if self.raft_group.raft.pending_read_count() > 0 {
return res;
}
if self.raft_group.raft.lead_transferee.is_some() {
return res;
}
// Unapplied entries can change the configuration of the group.
res.up_to_date = self.get_store().applied_index() == last_index;
res
}

pub fn check_after_tick(&self, state: GroupState, res: CheckTickResult) -> bool {
if res.leader {
res.up_to_date && self.is_leader() && self.raft_group.raft.pending_read_count() == 0
res.up_to_date && self.is_leader()
} else {
// If follower keeps receiving data from leader, then it's safe to stop
// ticking, as leader will make sure it has the latest logs.
Expand All @@ -535,6 +544,8 @@ impl Peer {
&& self.raft_group.raft.leader_id != raft::INVALID_ID
&& self.raft_group.raft.raft_log.last_term() == self.raft_group.raft.term
&& !self.has_unresolved_reads()
// If it becomes leader, the stats is not valid anymore.
&& !self.is_leader()
}
}

Expand Down Expand Up @@ -711,7 +722,8 @@ impl Peer {
if msg_type == MessageType::MsgReadIndex && expected_term == self.raft_group.raft.term {
// If the leader hasn't committed any entries in its term, it can't response read only
// requests. Please also take a look at raft-rs.
if let LeaseState::Valid = self.inspect_lease() {
let state = self.inspect_lease();
if let LeaseState::Valid = state {
let mut resp = eraftpb::Message::default();
resp.set_msg_type(MessageType::MsgReadIndexResp);
resp.to = m.from;
Expand All @@ -721,6 +733,7 @@ impl Peer {
self.pending_messages.push(resp);
return Ok(());
}
self.should_wake_up = state == LeaseState::Expired;
}

if msg_type == MessageType::MsgTransferLeader {
Expand Down Expand Up @@ -1663,7 +1676,7 @@ impl Peer {
/// need to be up to date for now. If 'allow_remove_leader' is false then
/// the peer to be removed should not be the leader.
fn check_conf_change<T, C>(
&self,
&mut self,
ctx: &mut PollContext<T, C>,
cmd: &RaftCmdRequest,
) -> Result<()> {
Expand Down Expand Up @@ -1738,6 +1751,8 @@ impl Peer {
"healthy" => healthy,
"quorum_after_change" => quorum_after_change,
);
// Waking it up to replicate logs to candidate.
self.should_wake_up = true;
Err(box_err!(
"unsafe to perform conf change {:?}, total {}, healthy {}, quorum after \
change {}",
Expand Down Expand Up @@ -2167,7 +2182,10 @@ impl Peer {
"last_index" => self.get_store().last_index(),
);
}
None => self.transfer_leader(&from),
None => {
self.transfer_leader(&from);
self.should_wake_up = true;
}
}
return;
}
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/raftstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod test_compact_after_delete;
mod test_compact_lock_cf;
mod test_compact_log;
mod test_conf_change;
mod test_hibernate;
mod test_lease_read;
mod test_merge;
mod test_multi;
Expand Down
144 changes: 144 additions & 0 deletions tests/integrations/raftstore/test_hibernate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::sync::*;
use std::thread;
use std::time::*;

use raft::eraftpb::MessageType;

use test_raftstore::*;
use tikv_util::HandyRwLock;

/// Tests whether single voter still replicates log to learner after restart.
///
/// A voter will become leader in a single tick. The case check if the role
/// change is detected correctly.
#[test]
fn test_single_voter_restart() {
let mut cluster = new_server_cluster(0, 2);
configure_for_hibernate(&mut cluster);
cluster.pd_client.disable_default_operator();
cluster.run_conf_change();
cluster.pd_client.must_add_peer(1, new_learner_peer(2, 2));
cluster.must_put(b"k1", b"v1");
must_get_equal(&cluster.get_engine(2), b"k1", b"v1");
cluster.stop_node(2);
cluster.must_put(b"k2", b"v2");
cluster.stop_node(1);
// Restart learner first to avoid network influence.
cluster.run_node(2).unwrap();
cluster.run_node(1).unwrap();
must_get_equal(&cluster.get_engine(2), b"k2", b"v2");
}

/// Tests whether an isolated learner can be prompted to voter.
#[test]
fn test_prompt_learner() {
let mut cluster = new_server_cluster(0, 4);
configure_for_hibernate(&mut cluster);
cluster.cfg.raft_store.raft_log_gc_count_limit = 20;
cluster.pd_client.disable_default_operator();
cluster.run_conf_change();
cluster.pd_client.must_add_peer(1, new_peer(2, 2));
cluster.pd_client.must_add_peer(1, new_peer(3, 3));

cluster.pd_client.must_add_peer(1, new_learner_peer(4, 4));
cluster.must_put(b"k1", b"v1");
must_get_equal(&cluster.get_engine(4), b"k1", b"v1");

// Suppose there is only one way partition.
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(1, 3).direction(Direction::Send),
));
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(1, 4).direction(Direction::Send),
));
let idx = cluster.truncated_state(1, 1).get_index();
// Trigger a log compaction.
for i in 0..cluster.cfg.raft_store.raft_log_gc_count_limit * 2 {
cluster.must_put(format!("k{}", i).as_bytes(), format!("v{}", i).as_bytes());
}
let timer = Instant::now();
loop {
if cluster.truncated_state(1, 1).get_index() > idx {
break;
}
thread::sleep(Duration::from_millis(10));
if timer.elapsed() > Duration::from_secs(3) {
panic!("log is not compact after 3 seconds");
}
}
// Wait till leader peer goes to sleep again.
thread::sleep(
cluster.cfg.raft_store.raft_base_tick_interval.0
* 2
* cluster.cfg.raft_store.raft_election_timeout_ticks as u32,
);
cluster.clear_send_filters();
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(1, 3).direction(Direction::Send),
));
cluster.pd_client.must_add_peer(1, new_peer(4, 4));
}

/// Tests whether leader resumes correctly when pre-transfer
/// leader response is delayed more than an election timeout.
#[test]
fn test_transfer_leader_delay() {
let mut cluster = new_node_cluster(0, 3);
configure_for_hibernate(&mut cluster);
cluster.run();
cluster.must_transfer_leader(1, new_peer(1, 1));
cluster.must_put(b"k1", b"v1");
must_get_equal(&cluster.get_engine(3), b"k1", b"v1");

let messages = Arc::new(Mutex::new(vec![]));
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(1, 3)
.direction(Direction::Send)
.msg_type(MessageType::MsgTransferLeader)
.reserve_dropped(messages.clone()),
));
cluster.transfer_leader(1, new_peer(3, 3));
let timer = Instant::now();
while timer.elapsed() < Duration::from_secs(3) && messages.lock().unwrap().is_empty() {
thread::sleep(Duration::from_millis(10));
}
assert_eq!(messages.lock().unwrap().len(), 1);
// Wait till leader peer goes to sleep again.
thread::sleep(
cluster.cfg.raft_store.raft_base_tick_interval.0
* 2
* cluster.cfg.raft_store.raft_election_timeout_ticks as u32,
);
cluster.clear_send_filters();
cluster.add_send_filter(CloneFilterFactory(DropMessageFilter::new(
MessageType::MsgTimeoutNow,
)));
let router = cluster.sim.wl().get_router(1).unwrap();
router
.send_raft_message(messages.lock().unwrap().pop().unwrap())
.unwrap();
let timer = Instant::now();
while timer.elapsed() < Duration::from_secs(3) {
let resp = cluster.request(
b"k2",
vec![new_put_cmd(b"k2", b"v2")],
false,
Duration::from_secs(5),
);
let header = resp.get_header();
if !header.has_error() {
return;
}
if !header
.get_error()
.get_message()
.contains("proposal dropped")
{
panic!("response {:?} has error", resp);
}
thread::sleep(Duration::from_millis(10));
}
panic!("failed to request after 3 seconds");
}

0 comments on commit 792a1cb

Please sign in to comment.