raftstore/hibernate: wake up for more cases (tikv#6672) (tikv#6732)
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
BusyJay authored Feb 28, 2020
1 parent 99dbd23 commit 5dcac54
Showing 6 changed files with 198 additions and 6 deletions.
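
Background for the diff below: a hibernated region stops its raft ticks to save CPU, so any event that needs raft progress must explicitly wake it. This commit threads a new `should_wake_up` flag through `Peer`: it is set when a conf change is rejected because a peer lags behind and when a transfer-leader message is handled, and the FSM delegate resets the base tick whenever the flag is set after stepping a message or proposing a command. The leader also stays awake while reads are pending or a leadership transfer is in flight. A minimal sketch of the flag pattern, using simplified stand-in types rather than the real TiKV ones:

// Simplified stand-ins for the types touched by this commit.
struct Peer {
    should_wake_up: bool,
}

enum GroupState {
    Ordered,
    Idle,
}

struct PeerFsm {
    peer: Peer,
    group_state: GroupState,
    missing_ticks: usize,
}

impl PeerFsm {
    // Mirrors PeerFsmDelegate::reset_raft_tick below: resume active ticking
    // and clear the wake-up request so it fires at most once per event.
    fn reset_raft_tick(&mut self, state: GroupState) {
        self.group_state = state;
        self.missing_ticks = 0;
        self.peer.should_wake_up = false;
        // The real code re-registers the raft base tick here.
    }

    // Mirrors the checks added to on_raft_message and the propose path.
    fn maybe_wake_up(&mut self) {
        if self.peer.should_wake_up {
            self.reset_raft_tick(GroupState::Ordered);
        }
    }
}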
18 changes: 18 additions & 0 deletions components/test_raftstore/src/transport_simulate.rs
@@ -722,3 +722,21 @@ impl Filter for LeaseReadFilter {
Ok(())
}
}

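/// Drops all raft messages of the given type.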
#[derive(Clone)]
pub struct DropMessageFilter {
ty: MessageType,
}

impl DropMessageFilter {
pub fn new(ty: MessageType) -> DropMessageFilter {
DropMessageFilter { ty }
}
}

impl Filter for DropMessageFilter {
fn before(&self, msgs: &mut Vec<RaftMessage>) -> Result<()> {
msgs.retain(|m| m.get_message().get_msg_type() != self.ty);
Ok(())
}
}
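
Usage, as exercised by test_transfer_leader_delay below: install the filter on a node and every outgoing message of the chosen type is silently discarded.

cluster.add_send_filter(CloneFilterFactory(DropMessageFilter::new(
    MessageType::MsgTimeoutNow,
)));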
7 changes: 7 additions & 0 deletions components/test_raftstore/src/util.rs
@@ -546,6 +546,13 @@ pub fn create_test_engine(
(engines, path)
}

pub fn configure_for_hibernate<T: Simulator>(cluster: &mut Cluster<T>) {
// Use a long check interval so the leader keeps hibernating during tests.
cluster.cfg.raft_store.abnormal_leader_missing_duration = ReadableDuration::secs(20);
cluster.cfg.raft_store.max_leader_missing_duration = ReadableDuration::secs(40);
cluster.cfg.raft_store.peer_stale_state_check_interval = ReadableDuration::secs(10);
}

pub fn configure_for_snapshot<T: Simulator>(cluster: &mut Cluster<T>) {
// Truncate the log quickly so that we can force sending a snapshot.
cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(20);
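
The helper stretches the stale-state checks well past the election timeout so that hibernation, rather than the leader-missing machinery, drives the tests' timing. The new tests below apply it before bootstrapping the cluster, for example:

let mut cluster = new_node_cluster(0, 3);
configure_for_hibernate(&mut cluster);
cluster.run();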
12 changes: 9 additions & 3 deletions src/raftstore/store/fsm/peer.rs
@@ -835,10 +835,13 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
let from_peer_id = msg.get_from_peer().get_id();
self.fsm.peer.insert_peer_cache(msg.take_from_peer());
self.fsm.peer.step(self.ctx, msg.take_message())?;
if self.fsm.peer.should_wake_up {
self.reset_raft_tick(GroupState::Ordered);
}

if self.fsm.peer.any_new_peer_catch_up(from_peer_id) {
self.fsm.peer.heartbeat_pd(self.ctx);
self.register_raft_base_tick();
self.reset_raft_tick(GroupState::Ordered);
}

self.fsm.has_ready = true;
@@ -848,6 +851,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
fn reset_raft_tick(&mut self, state: GroupState) {
self.fsm.group_state = state;
self.fsm.missing_ticks = 0;
self.fsm.peer.should_wake_up = false;
self.register_raft_base_tick();
}

@@ -2248,8 +2252,10 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
bind_term(&mut resp, term);
if self.fsm.peer.propose(self.ctx, cb, msg, resp) {
self.fsm.has_ready = true;
self.fsm.group_state = GroupState::Ordered;
self.register_raft_base_tick();
}

if self.fsm.peer.should_wake_up {
self.reset_raft_tick(GroupState::Ordered);
}

self.register_pd_heartbeat_tick();
22 changes: 19 additions & 3 deletions src/raftstore/store/peer.rs
@@ -270,6 +270,8 @@ pub struct Peer {

/// Whether this peer fails to send messages to the leader.
pub leader_unreachable: bool,
/// Indicates whether the peer should be woken up.
pub should_wake_up: bool,
/// Whether this peer is destroyed asynchronously.
/// If it's true when merging, its data in storeMeta will be removed early by the target peer
pub pending_remove: bool,
@@ -376,6 +378,7 @@ impl Peer {
compaction_declined_bytes: 0,
leader_unreachable: false,
pending_remove: false,
should_wake_up: false,
pending_merge_state: None,
last_committed_prepare_merge_idx: 0,
leader_missing_time: Some(Instant::now()),
@@ -613,14 +616,20 @@ impl Peer {
return res;
}
}
if self.raft_group.raft.pending_read_count() > 0 {
return res;
}
if self.raft_group.raft.lead_transferee.is_some() {
return res;
}
// Unapplied entries can change the configuration of the group.
res.up_to_date = self.get_store().applied_index() == last_index;
res
}

pub fn check_after_tick(&self, state: GroupState, res: CheckTickResult) -> bool {
if res.leader {
res.up_to_date && self.is_leader() && self.raft_group.raft.pending_read_count() == 0
res.up_to_date && self.is_leader()
} else {
// If a follower keeps receiving data from the leader, then it's safe to stop
// ticking, as the leader will make sure it has the latest logs.
@@ -629,6 +638,8 @@
state != GroupState::Chaos
&& self.raft_group.raft.leader_id != raft::INVALID_ID
&& self.raft_group.raft.raft_log.last_term() == self.raft_group.raft.term
// If it becomes leader, the stats are not valid anymore.
&& !self.is_leader()
}
}

@@ -1722,7 +1733,7 @@ impl Peer {
/// need to be up to date for now. If `allow_remove_leader` is false then
/// the peer to be removed should not be the leader.
fn check_conf_change<T, C>(
&self,
&mut self,
ctx: &mut PollContext<T, C>,
cmd: &RaftCmdRequest,
) -> Result<()> {
@@ -1797,6 +1808,8 @@
"healthy" => healthy,
"quorum_after_change" => quorum_after_change,
);
// Wake it up to replicate logs to the candidate.
self.should_wake_up = true;
Err(box_err!(
"unsafe to perform conf change {:?}, total {}, healthy {}, quorum after \
change {}",
@@ -2196,7 +2209,10 @@
"last_index" => self.get_store().last_index(),
);
}
None => self.transfer_leader(&from),
None => {
self.transfer_leader(&from);
self.should_wake_up = true;
}
}
return;
}
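
Condensing the hibernation-check hunks above into one predicate (an illustrative paraphrase only; the real logic is split between the before-tick check in the @@ -613 hunk and check_after_tick):

// Whether a peer may stay hibernated after this commit, with the inputs read
// off the raft group as in the hunks above.
fn leader_may_sleep(up_to_date: bool, pending_reads: usize, transferee: Option<u64>) -> bool {
    // Followers caught up, no pending reads, no leader transfer in flight.
    up_to_date && pending_reads == 0 && transferee.is_none()
}

fn follower_may_sleep(in_chaos: bool, has_leader: bool, log_up_to_term: bool, is_leader: bool) -> bool {
    // A peer that has just won an election must keep ticking even though its
    // follower-side stats still look safe.
    !in_chaos && has_leader && log_up_to_term && !is_leader
}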
1 change: 1 addition & 0 deletions tests/integrations/raftstore/mod.rs
@@ -6,6 +6,7 @@ mod test_compact_after_delete;
mod test_compact_lock_cf;
mod test_compact_log;
mod test_conf_change;
mod test_hibernate;
mod test_lease_read;
mod test_merge;
mod test_multi;
144 changes: 144 additions & 0 deletions tests/integrations/raftstore/test_hibernate.rs
@@ -0,0 +1,144 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::sync::*;
use std::thread;
use std::time::*;

use raft::eraftpb::MessageType;

use test_raftstore::*;
use tikv_util::HandyRwLock;

/// Tests whether a single voter still replicates logs to a learner after restart.
///
/// A voter becomes leader in a single tick. The case checks whether the role
/// change is detected correctly.
#[test]
fn test_single_voter_restart() {
let mut cluster = new_server_cluster(0, 2);
configure_for_hibernate(&mut cluster);
cluster.pd_client.disable_default_operator();
cluster.run_conf_change();
cluster.pd_client.must_add_peer(1, new_learner_peer(2, 2));
cluster.must_put(b"k1", b"v1");
must_get_equal(&cluster.get_engine(2), b"k1", b"v1");
cluster.stop_node(2);
cluster.must_put(b"k2", b"v2");
cluster.stop_node(1);
// Restart the learner first to avoid network interference.
cluster.run_node(2).unwrap();
cluster.run_node(1).unwrap();
must_get_equal(&cluster.get_engine(2), b"k2", b"v2");
}

/// Tests whether an isolated learner can be promoted to voter.
#[test]
fn test_prompt_learner() {
let mut cluster = new_server_cluster(0, 4);
configure_for_hibernate(&mut cluster);
cluster.cfg.raft_store.raft_log_gc_count_limit = 20;
cluster.pd_client.disable_default_operator();
cluster.run_conf_change();
cluster.pd_client.must_add_peer(1, new_peer(2, 2));
cluster.pd_client.must_add_peer(1, new_peer(3, 3));

cluster.pd_client.must_add_peer(1, new_learner_peer(4, 4));
cluster.must_put(b"k1", b"v1");
must_get_equal(&cluster.get_engine(4), b"k1", b"v1");

// Simulate a one-way partition.
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(1, 3).direction(Direction::Send),
));
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(1, 4).direction(Direction::Send),
));
let idx = cluster.truncated_state(1, 1).get_index();
// Trigger a log compaction.
for i in 0..cluster.cfg.raft_store.raft_log_gc_count_limit * 2 {
cluster.must_put(format!("k{}", i).as_bytes(), format!("v{}", i).as_bytes());
}
let timer = Instant::now();
loop {
if cluster.truncated_state(1, 1).get_index() > idx {
break;
}
thread::sleep(Duration::from_millis(10));
if timer.elapsed() > Duration::from_secs(3) {
panic!("log is not compact after 3 seconds");
}
}
// Wait till leader peer goes to sleep again.
thread::sleep(
cluster.cfg.raft_store.raft_base_tick_interval.0
* 2
* cluster.cfg.raft_store.raft_election_timeout_ticks as u32,
);
cluster.clear_send_filters();
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(1, 3).direction(Direction::Send),
));
cluster.pd_client.must_add_peer(1, new_peer(4, 4));
}

/// Tests whether the leader resumes correctly when the pre-transfer-leader
/// response is delayed by more than an election timeout.
#[test]
fn test_transfer_leader_delay() {
let mut cluster = new_node_cluster(0, 3);
configure_for_hibernate(&mut cluster);
cluster.run();
cluster.must_transfer_leader(1, new_peer(1, 1));
cluster.must_put(b"k1", b"v1");
must_get_equal(&cluster.get_engine(3), b"k1", b"v1");

let messages = Arc::new(Mutex::new(vec![]));
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(1, 3)
.direction(Direction::Send)
.msg_type(MessageType::MsgTransferLeader)
.reserve_dropped(messages.clone()),
));
cluster.transfer_leader(1, new_peer(3, 3));
let timer = Instant::now();
while timer.elapsed() < Duration::from_secs(3) && messages.lock().unwrap().is_empty() {
thread::sleep(Duration::from_millis(10));
}
assert_eq!(messages.lock().unwrap().len(), 1);
// Wait till leader peer goes to sleep again.
thread::sleep(
cluster.cfg.raft_store.raft_base_tick_interval.0
* 2
* cluster.cfg.raft_store.raft_election_timeout_ticks as u32,
);
cluster.clear_send_filters();
cluster.add_send_filter(CloneFilterFactory(DropMessageFilter::new(
MessageType::MsgTimeoutNow,
)));
let router = cluster.sim.wl().get_router(1).unwrap();
router
.send_raft_message(messages.lock().unwrap().pop().unwrap())
.unwrap();
let timer = Instant::now();
while timer.elapsed() < Duration::from_secs(3) {
let resp = cluster.request(
b"k2",
vec![new_put_cmd(b"k2", b"v2")],
false,
Duration::from_secs(5),
);
let header = resp.get_header();
if !header.has_error() {
return;
}
if !header
.get_error()
.get_message()
.contains("proposal dropped")
{
panic!("response {:?} has error", resp);
}
thread::sleep(Duration::from_millis(10));
}
panic!("failed to request after 3 seconds");
}
