raftstore: clean callback when peer is destroyed #1683

Merged 5 commits on Mar 17, 2017

Changes from 2 commits
38 changes: 26 additions & 12 deletions src/raftstore/store/peer.rs
@@ -63,6 +63,14 @@ struct ReadIndexRequest {
     renew_lease_time: Timespec,
 }
 
+impl Drop for ReadIndexRequest {
+    fn drop(&mut self) {
+        if !self.cmds.is_empty() {
+            panic!("callback of index read {} is leak.", self.uuid);
+        }
+    }
+}
+
 #[derive(Default)]
 struct ReadIndexQueue {
     reads: VecDeque<ReadIndexRequest>,
@@ -71,8 +79,8 @@ struct ReadIndexQueue {
 
 impl ReadIndexQueue {
     fn clear_uncommitted(&mut self, tag: &str, term: u64) {
-        for read in self.reads.drain(self.ready_cnt..) {
-            for (req, cb) in read.cmds {
+        for mut read in self.reads.drain(self.ready_cnt..) {
+            for (req, cb) in read.cmds.drain(..) {
                 let uuid = util::get_uuid_from_req(&req).unwrap();
                 apply::notify_stale_req(tag, term, uuid, cb);
             }

Member: Is this just refactoring, with nothing to do with this PR?

Member (author): It is required by the new Drop impl: once a type implements Drop, its fields can no longer be moved out of it (partial moves are forbidden), so the commands have to be drained in place.

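A minimal standalone sketch (not from the PR) of the Rust rule in play: once a type implements Drop, its fields can no longer be moved out of it (error E0509), so the loop must drain the vector in place:

    struct ReadIndexRequest {
        cmds: Vec<u32>,
    }

    impl Drop for ReadIndexRequest {
        fn drop(&mut self) {
            // Mirrors the leak check added above.
            if !self.cmds.is_empty() {
                panic!("callbacks leaked");
            }
        }
    }

    fn main() {
        let mut read = ReadIndexRequest { cmds: vec![1, 2, 3] };
        // `for c in read.cmds { ... }` would move `cmds` out of `read`,
        // which the compiler rejects for types with a Drop impl.
        // drain(..) only borrows mutably and leaves an empty Vec behind,
        // so the Drop check above passes.
        for c in read.cmds.drain(..) {
            println!("{}", c);
        }
    }
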
@@ -362,6 +370,14 @@ impl Peer {
                 error!("{} failed to schedule clear data task: {:?}", self.tag, e);
             }
         }
 
+        for mut read in self.pending_reads.reads.drain(..) {
+            for (req, cb) in read.cmds.drain(..) {
+                let uuid = util::get_uuid_from_req(&req).unwrap();
+                apply::notify_req_region_removed(region.get_id(), self.peer.get_id(), uuid, cb);
+            }
+        }
+
         self.coprocessor_host.shutdown();
         info!("{} destroy itself, takes {:?}", self.tag, t.elapsed());

@@ -739,9 +755,9 @@ impl Peer {
         let mut propose_time = None;
         if self.ready_to_handle_read() {
             for state in &ready.read_states {
-                let read = self.pending_reads.reads.pop_front().unwrap();
+                let mut read = self.pending_reads.reads.pop_front().unwrap();
                 assert_eq!(state.request_ctx.as_slice(), read.uuid.as_bytes());
-                for (req, cb) in read.cmds {
+                for (req, cb) in read.cmds.drain(..) {
                     // TODO: we should add test case that a split happens before pending
                     // read-index is handled. To do this we need to control async-apply
                     // procedure precisely.
@@ … @@
 
         // Note that only after handle read_states can we identify what requests are
         // actually stale.
-        if let Some(ref ss) = ready.ss {
-            if ss.raft_state != StateRole::Leader {
-                let term = self.term();
-                // all uncommitted reads will be dropped silently in raft.
-                self.pending_reads.clear_uncommitted(&self.tag, term);
-            }
+        if ready.ss.is_some() {
+            let term = self.term();
+            // all uncommitted reads will be dropped silently in raft.
+            self.pending_reads.clear_uncommitted(&self.tag, term);
         }

Member: Why not clear for the leader before?

Member (author): When the peer was not leader before, there is nothing uncommitted in its pending reads, so removing the role check just makes the code simpler.

         if let Some(Either::Right(_)) = self.leader_lease_expired_time {
 
@@ -809,8 +823,8 @@

         if self.pending_reads.ready_cnt > 0 && self.ready_to_handle_read() {
             for _ in 0..self.pending_reads.ready_cnt {
-                let read = self.pending_reads.reads.pop_front().unwrap();
-                for (req, cb) in read.cmds {
+                let mut read = self.pending_reads.reads.pop_front().unwrap();
+                for (req, cb) in read.cmds.drain(..) {
                     self.handle_read(req, cb);
                 }
             }

21 changes: 9 additions & 12 deletions src/raftstore/store/worker/apply.rs
@@ -47,17 +47,10 @@ pub struct PendingCmd {
     pub cb: Option<Callback>,
 }
 
-impl PendingCmd {
-    #[inline]
-    fn call(&mut self, resp: RaftCmdResponse) {
-        self.cb.take().unwrap().call_box((resp,));
-    }
-}
-
 impl Drop for PendingCmd {
     fn drop(&mut self) {
         if self.cb.is_some() {
-            panic!("callback of {} is leak.", self.uuid);
+            panic!("callback of pending command {} is leak.", self.uuid);
         }
     }
 }
@@ -119,15 +112,19 @@ pub enum ExecResult {
 }
 
 /// Call the callback of `cmd` that the region is removed.
-pub fn notify_region_removed(region_id: u64, peer_id: u64, mut cmd: PendingCmd) {
+fn notify_region_removed(region_id: u64, peer_id: u64, mut cmd: PendingCmd) {
+    notify_req_region_removed(region_id, peer_id, cmd.uuid, cmd.cb.take().unwrap());
+}
+
+pub fn notify_req_region_removed(region_id: u64, peer_id: u64, uuid: Uuid, cb: Callback) {
     let region_not_found = Error::RegionNotFound(region_id);
     let mut resp = cmd_resp::new_error(region_not_found);
-    cmd_resp::bind_uuid(&mut resp, cmd.uuid);
+    cmd_resp::bind_uuid(&mut resp, uuid);
     debug!("[region {}] {} is removed, notify {}.",
            region_id,
            peer_id,
-           cmd.uuid);
-    cmd.call(resp);
+           uuid);
+    cb(resp);
 }
 
 /// Call the callback of `cmd` when it can not be processed further.
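
The Option-wrapped callback is what makes the Drop-based leak check work: taking the callback out marks the command as handled. Here is a standalone sketch of the pattern with simplified types (the PR's real Callback is a boxed closure over RaftCmdResponse, and the removed call_box was the pre-Rust-1.35 workaround for invoking a boxed FnOnce):

    struct PendingCmd {
        uuid: u64,
        cb: Option<Box<dyn FnOnce(String)>>,
    }

    impl Drop for PendingCmd {
        fn drop(&mut self) {
            // Dropping a command with its callback intact would leave the
            // client waiting forever, so fail loudly.
            if self.cb.is_some() {
                panic!("callback of pending command {} is leaked.", self.uuid);
            }
        }
    }

    fn notify(mut cmd: PendingCmd, resp: String) {
        // take() moves the callback out and leaves None, so Drop stays quiet.
        (cmd.cb.take().unwrap())(resp);
    }

    fn main() {
        let cmd = PendingCmd {
            uuid: 42,
            cb: Some(Box::new(|resp| println!("responded: {}", resp))),
        };
        notify(cmd, "region not found".to_owned());
    }
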
2 changes: 1 addition & 1 deletion src/util/mod.rs
@@ -329,7 +329,7 @@ impl<T: FnOnce()> Drop for DeferContext<T> {
 }
 
 /// Represents a value of one of two possible types (a more generic Result.)
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub enum Either<L, R> {
     Left(L),
     Right(R),

Either needs Clone here presumably because RegionPacketFilter (below) now stores its blocking state as an Either, and test filters must stay cloneable for CloneFilterFactory.

53 changes: 46 additions & 7 deletions tests/raftstore/test_lease_read.rs
@@ -15,12 +15,13 @@
 
 use std::thread;
 use std::sync::{Arc, RwLock};
-use std::time::Duration;
+use std::sync::atomic::*;
+use std::time::*;
 use std::collections::HashSet;
 
 use time::Duration as TimeDuration;
 
-use kvproto::eraftpb::MessageType;
+use kvproto::eraftpb::{MessageType, ConfChangeType};
 use kvproto::metapb::{Peer, Region};
 use kvproto::raft_cmdpb::CmdType;
 use kvproto::raft_serverpb::{RaftMessage, RaftLocalState};
@@ -37,18 +38,22 @@ use super::transport_simulate::*;
 use super::util::*;
 
 #[derive(Clone, Default)]
-struct LeaseReadDetector {
+struct LeaseReadFilter {
     ctx: Arc<RwLock<HashSet<Vec<u8>>>>,
+    take: bool,
 }
 
-impl Filter<RaftMessage> for LeaseReadDetector {
+impl Filter<RaftMessage> for LeaseReadFilter {
     fn before(&self, msgs: &mut Vec<RaftMessage>) -> Result<()> {
         let mut ctx = self.ctx.wl();
         for m in msgs {
-            let msg = m.get_message();
+            let msg = m.mut_message();
             if msg.get_msg_type() == MessageType::MsgHeartbeat && !msg.get_context().is_empty() {
                 ctx.insert(msg.get_context().to_owned());
             }
+            if self.take {
+                msg.take_context();
+            }
         }
         Ok(())
     }
@@ -153,7 +158,7 @@ fn test_renew_lease<T: Simulator>(cluster: &mut Cluster<T>) {
     let state: RaftLocalState = engine.get_msg_cf(storage::CF_RAFT, &state_key).unwrap().unwrap();
     let last_index = state.get_last_index();
 
-    let detector = LeaseReadDetector::default();
+    let detector = LeaseReadFilter::default();
     cluster.add_send_filter(CloneFilterFactory(detector.clone()));
 
     // Issue a read request and check the value on response.
@@ -299,7 +304,7 @@ fn test_lease_unsafe_during_leader_transfers<T: Simulator>(cluster: &mut Cluster<T>) {
     let peer3 = new_peer(peer3_store_id, 3);
     cluster.run();
 
-    let detector = LeaseReadDetector::default();
+    let detector = LeaseReadFilter::default();
     cluster.add_send_filter(CloneFilterFactory(detector.clone()));
 
     // write the initial value for a key.
@@ -380,3 +385,37 @@ fn test_server_lease_unsafe_during_leader_transfers() {
     let mut cluster = new_node_cluster(0, count);
     test_lease_unsafe_during_leader_transfers(&mut cluster);
 }
+
+#[test]

Contributor: Add a brief description of what this test case is for.
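
What the test added below is for, in brief: it blocks MsgAppendResponse delivery so that a RemoveNode conf change aimed at the leader cannot commit, and strips read-index contexts from heartbeats so that a subsequent read stays parked in the pending-reads queue; once the block is lifted the conf change commits, the leader peer destroys itself, and the assertions check that the parked read-index callback is answered with a region-not-found error rather than leaked.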

+fn test_node_callback_when_destroyed() {
+    let count = 3;
+    let mut cluster = new_node_cluster(0, count);
+    cluster.run();
+    cluster.must_put(b"k1", b"v1");
+    let leader = cluster.leader_of_region(1).unwrap();
+    let cc = new_change_peer_request(ConfChangeType::RemoveNode, leader.clone());
+    let epoch = cluster.get_region_epoch(1);
+    let req = new_admin_request(1, &epoch, cc);
+    // so the leader can't commit the conf change yet.
+    let block = Arc::new(AtomicBool::new(true));
+    cluster.add_send_filter(CloneFilterFactory(RegionPacketFilter::new(1, leader.get_store_id())
+        .msg_type(MessageType::MsgAppendResponse)
+        .direction(Direction::Recv)
+        .when(block.clone())));
+    let mut filter = LeaseReadFilter::default();
+    filter.take = true;
+    // so the leader can't perform read index.
+    cluster.add_send_filter(CloneFilterFactory(filter.clone()));
+    // it always timeout, no need to wait.
+    let _ = cluster.call_command_on_leader(req, Duration::from_millis(500));
+
+    let get = new_get_cmd(b"k1");
+    let req = new_request(1, epoch, vec![get], true);
+    block.store(false, Ordering::SeqCst);
+    let resp = cluster.call_command_on_leader(req, Duration::from_secs(3)).unwrap();
+    assert!(!filter.ctx.rl().is_empty(),
+            "read index should be performed");
+    assert!(resp.get_header().get_error().has_region_not_found(),
+            "{:?}",
+            resp);
+}

35 changes: 24 additions & 11 deletions tests/raftstore/transport_simulate.rs
@@ -16,8 +16,7 @@ use kvproto::eraftpb::MessageType;
 use tikv::raftstore::{Result, Error};
 use tikv::raftstore::store::{Msg as StoreMsg, Transport};
 use tikv::server::transport::*;
-use tikv::util::HandyRwLock;
-use tikv::util::transport;
+use tikv::util::{HandyRwLock, transport, Either};
 
 use rand;
 use std::collections::HashMap;

@@ -26,7 +25,7 @@ use std::sync::mpsc::Sender;
 use std::marker::PhantomData;
 use std::{time, usize, thread};
 use std::vec::Vec;
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::atomic::*;
 
 pub trait Channel<M>: Send + Clone {
     fn send(&self, m: M) -> Result<()>;
@@ -279,7 +278,7 @@ pub struct RegionPacketFilter {
     region_id: u64,
     store_id: u64,
     direction: Direction,
-    allow: Arc<AtomicUsize>,
+    block: Either<Arc<AtomicUsize>, Arc<AtomicBool>>,
     msg_type: Option<MessageType>,
 }
 
@@ -294,11 +293,20 @@ impl Filter<RaftMessage> for RegionPacketFilter {
                (self.direction.is_send() && self.store_id == from_store_id ||
                 self.direction.is_recv() && self.store_id == to_store_id) &&
                self.msg_type.as_ref().map_or(true, |t| t == &m.get_message().get_msg_type()) {
-                if self.allow.load(Ordering::Relaxed) > 0 {
-                    self.allow.fetch_sub(1, Ordering::Relaxed);
-                    return true;
-                }
-                return false;
+                return match self.block {
+                    Either::Left(ref count) => {
+                        loop {
+                            let left = count.load(Ordering::SeqCst);
+                            if left == 0 {
+                                return false;
+                            }
+                            if count.compare_and_swap(left, left - 1, Ordering::SeqCst) == left {
+                                return true;
+                            }
+                        }
+                    }
+                    Either::Right(ref block) => !block.load(Ordering::SeqCst),
+                };
             }
             true
         });

Member: Can we add some annotations in this file? It's hard for me to understand.

Member (author): Sure, but that's not related to this PR.

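An annotated standalone sketch of the compare-and-swap loop above, for readers who find it opaque (illustrative function name; newer Rust deprecates compare_and_swap in favor of compare_exchange, but the logic is the same):

    use std::sync::atomic::{AtomicUsize, Ordering};

    // Atomically consume one unit of allowance; true means a unit was
    // available, i.e. the message may pass. Mirrors the Either::Left arm.
    fn try_consume(count: &AtomicUsize) -> bool {
        loop {
            let left = count.load(Ordering::SeqCst);
            if left == 0 {
                // Allowance exhausted: the message should be dropped.
                return false;
            }
            // compare_and_swap returns the value it actually observed.
            // If another thread raced us, that value differs from `left`
            // and the swap did not happen, so retry with a fresh load.
            if count.compare_and_swap(left, left - 1, Ordering::SeqCst) == left {
                return true;
            }
        }
    }

    fn main() {
        let count = AtomicUsize::new(2);
        assert!(try_consume(&count));
        assert!(try_consume(&count));
        assert!(!try_consume(&count), "third message must be dropped");
    }
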
@@ … @@
             store_id: store_id,
             direction: Direction::Both,
             msg_type: None,
-            allow: Arc::new(AtomicUsize::new(0)),
+            block: Either::Right(Arc::new(AtomicBool::new(true))),
         }
     }

@@ … @@
     }
 
     pub fn allow(mut self, number: usize) -> RegionPacketFilter {
-        self.allow = Arc::new(AtomicUsize::new(number));
+        self.block = Either::Left(Arc::new(AtomicUsize::new(number)));
         self
     }
+
+    pub fn when(mut self, condition: Arc<AtomicBool>) -> RegionPacketFilter {
+        self.block = Either::Right(condition);
+        self
+    }
 }

In short: allow(n) lets the next n matching messages through and then starts dropping them, while when(flag) drops matching messages for as long as the flag reads true; the latter is what lets test_node_callback_when_destroyed hold back append responses until it flips the flag.