diff --git a/Cargo.lock b/Cargo.lock
index 963fa839bd3..1948d2bf963 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5535,6 +5535,7 @@ dependencies = [
  "engine_traits",
  "futures 0.3.15",
  "grpcio",
+ "itertools",
  "keys",
  "kvproto",
  "lazy_static",
@@ -5553,6 +5554,7 @@ dependencies = [
  "tikv",
  "tikv_alloc",
  "tikv_util",
+ "tokio",
  "toml",
  "txn_types",
 ]
diff --git a/components/snap_recovery/Cargo.toml b/components/snap_recovery/Cargo.toml
index 985c7323af3..8b0b0ec4c3a 100644
--- a/components/snap_recovery/Cargo.toml
+++ b/components/snap_recovery/Cargo.toml
@@ -13,6 +13,7 @@ engine_rocks = { workspace = true }
 engine_traits = { workspace = true }
 futures = { version = "0.3", features = ["executor"] }
 grpcio = { workspace = true }
+itertools = "0.10"
 keys = { workspace = true }
 kvproto = { workspace = true }
 lazy_static = "1.4"
@@ -31,6 +32,7 @@ thiserror = "1.0"
 tikv = { workspace = true }
 tikv_alloc = { workspace = true }
 tikv_util = { workspace = true }
+tokio = { version = "1.17", features = ["rt"] }
 toml = "0.5"
 txn_types = { workspace = true }
diff --git a/components/snap_recovery/src/leader_keeper.rs b/components/snap_recovery/src/leader_keeper.rs
new file mode 100644
index 00000000000..417d5becca3
--- /dev/null
+++ b/components/snap_recovery/src/leader_keeper.rs
@@ -0,0 +1,260 @@
+// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::{
+    collections::HashSet,
+    marker::PhantomData,
+    sync::Mutex,
+    time::{Duration, Instant},
+};
+
+use engine_traits::KvEngine;
+use futures::compat::Future01CompatExt;
+use itertools::Itertools;
+use raftstore::{
+    errors::{Error, Result},
+    store::{Callback, CasualMessage, CasualRouter, SignificantMsg, SignificantRouter},
+};
+use tikv_util::{future::paired_future_callback, timer::GLOBAL_TIMER_HANDLE};
+
+/// Keeps the leadership of a set of regions: campaigns them and checks the
+/// election results repeatedly, until every region in the set has a leader.
+pub struct LeaderKeeper<EK, Router> {
+    router: Router,
+    not_leader: HashSet<u64>,
+
+    _ek: PhantomData<EK>,
+}
+
+#[derive(Default)]
+pub struct StepResult {
+    pub failed_leader: Vec<(u64, Error)>,
+    pub campaign_failed: Vec<(u64, Error)>,
+}
+
+fn ellipse<T: std::fmt::Debug>(ts: &[T], max_len: usize) -> String {
+    if ts.len() < max_len {
+        return format!("{:?}", &ts);
+    }
+    format!("{:?} (and {} more)", &ts[..max_len], ts.len() - max_len)
+}
+
+impl std::fmt::Debug for StepResult {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("StepResult")
+            .field(
+                "failed_leader",
+                &format_args!("{}", ellipse(&self.failed_leader, 8)),
+            )
+            .field(
+                "campaign_failed",
+                &format_args!("{}", ellipse(&self.campaign_failed, 8)),
+            )
+            .finish()
+    }
+}
+
+impl<EK, Router> LeaderKeeper<EK, Router>
+where
+    EK: KvEngine,
+    Router: CasualRouter<EK> + SignificantRouter<EK> + 'static,
+{
+    pub fn new(router: Router, to_keep: impl IntoIterator<Item = u64>) -> Self {
+        Self {
+            router,
+            not_leader: to_keep.into_iter().collect(),
+            _ek: PhantomData,
+        }
+    }
+
+    pub async fn elect_and_wait_all_ready(&mut self) {
+        loop {
+            let now = Instant::now();
+            let res = self.step().await;
+            info!("finished leader keeper stepping."; "result" => ?res, "take" => ?now.elapsed());
+            if res.failed_leader.is_empty() {
+                return;
+            }
+            // Pace the retries: give the elections some time to settle
+            // before stepping again.
+            GLOBAL_TIMER_HANDLE
+                .delay(now + Duration::from_secs(10))
+                .compat()
+                .await
+                .expect("global timer failed, cannot keep stepping.");
+        }
+    }
+
+    pub async fn step(&mut self) -> StepResult {
+        const CONCURRENCY: usize = 256;
+        let r = Mutex::new(StepResult::default());
+        let success = Mutex::new(HashSet::new());
+        // Check the regions in batches of `CONCURRENCY`, so a huge cluster
+        // won't flood the raftstore with messages.
+        for batch in &self.not_leader.iter().chunks(CONCURRENCY) {
+            let tasks = batch.map(|region_id| async {
+                match self.check_leader(*region_id).await {
+                    Ok(_) => {
+                        success.lock().unwrap().insert(*region_id);
+                        return;
+                    }
+                    Err(err) => r.lock().unwrap().failed_leader.push((*region_id, err)),
+                };
+
+                if let Err(err) = self.force_leader(*region_id) {
+                    r.lock().unwrap().campaign_failed.push((*region_id, err));
+                }
+            });
+            futures::future::join_all(tasks).await;
+        }
+        success.lock().unwrap().iter().for_each(|i| {
+            // Don't wrap the `remove` in `debug_assert!`: the pruning must
+            // also happen in release builds.
+            self.not_leader.remove(i);
+        });
+        r.into_inner().unwrap()
+    }
+
+    async fn check_leader(&self, region_id: u64) -> Result<()> {
+        let (cb, fut) = paired_future_callback();
+        let msg = SignificantMsg::LeaderCallback(Callback::<EK::Snapshot>::read(cb));
+        self.router.significant_send(region_id, msg)?;
+        let resp = fut
+            .await
+            .map_err(|_err| Error::Other("canceled by store".into()))?;
+        let header = resp.response.get_header();
+        if header.has_error() {
+            return Err(Error::Other(box_err!(
+                "got error: {:?}",
+                header.get_error()
+            )));
+        }
+        Ok(())
+    }
+
+    fn force_leader(&self, region_id: u64) -> Result<()> {
+        let msg = CasualMessage::Campaign;
+        self.router.send(region_id, msg)?;
+        // Nothing more to do here; the next `step` checks the election result.
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::{cell::RefCell, collections::HashSet};
+
+    use engine_rocks::RocksEngine;
+    use engine_traits::KvEngine;
+    use futures::executor::block_on;
+    use kvproto::raft_cmdpb;
+    use raftstore::store::{CasualRouter, SignificantRouter};
+
+    use super::LeaderKeeper;
+
+    #[derive(Default)]
+    struct MockStore {
+        regions: HashSet<u64>,
+        leaders: RefCell<HashSet<u64>>,
+    }
+
+    impl<EK, Router> LeaderKeeper<EK, Router> {
+        fn mut_router(&mut self) -> &mut Router {
+            &mut self.router
+        }
+    }
+
+    // impl SignificantRouter for MockStore, which only handles `LeaderCallback`:
+    // it returns success when the source region is a leader, and otherwise
+    // fills the error in the response header.
+    impl SignificantRouter<RocksEngine> for MockStore {
+        fn significant_send(
+            &self,
+            region_id: u64,
+            msg: raftstore::store::SignificantMsg<<RocksEngine as KvEngine>::Snapshot>,
+        ) -> raftstore::errors::Result<()> {
+            match msg {
+                raftstore::store::SignificantMsg::LeaderCallback(cb) => {
+                    let mut resp = raft_cmdpb::RaftCmdResponse::default();
+                    let mut header = raft_cmdpb::RaftResponseHeader::default();
+                    if !self.leaders.borrow().contains(&region_id) {
+                        let mut err = kvproto::errorpb::Error::new();
+                        err.set_not_leader(kvproto::errorpb::NotLeader::new());
+                        header.set_error(err);
+                    }
+                    resp.set_header(header);
+                    cb.invoke_with_response(resp);
+                    Ok(())
+                }
+                _ => panic!("unexpected msg"),
+            }
+        }
+    }
+
+    // impl CasualRouter for MockStore, which only handles `Campaign`:
+    // it adds the region to the leaders list when handling the message.
+    impl CasualRouter<RocksEngine> for MockStore {
+        fn send(
+            &self,
+            region_id: u64,
+            msg: raftstore::store::CasualMessage<RocksEngine>,
+        ) -> raftstore::errors::Result<()> {
+            match msg {
+                raftstore::store::CasualMessage::Campaign => {
+                    if !self.regions.contains(&region_id) {
+                        return Err(raftstore::Error::RegionNotFound(region_id));
+                    }
+                    self.leaders.borrow_mut().insert(region_id);
+                    Ok(())
+                }
+                _ => panic!("unexpected msg"),
+            }
+        }
+    }
+
+    #[test]
+    fn test_basic() {
+        let leaders = vec![1, 2, 3];
+        let mut store = MockStore::default();
+        store.regions = leaders.iter().copied().collect();
+        let mut lk = LeaderKeeper::<RocksEngine, _>::new(store, leaders);
+        let res = block_on(lk.step());
+        assert_eq!(res.failed_leader.len(), 3);
+        assert_eq!(res.campaign_failed.len(), 0);
+    }
+
+    #[test]
+    fn test_failure() {
+        let leaders = vec![1, 2, 3];
+        let mut store = MockStore::default();
+        store.regions = leaders.iter().copied().collect();
+        let mut lk = LeaderKeeper::<RocksEngine, _>::new(store, vec![1, 2, 3, 4]);
+        let res = block_on(lk.step());
+        assert_eq!(res.failed_leader.len(), 4);
+        assert_eq!(res.campaign_failed.len(), 1);
+        let res = block_on(lk.step());
+        assert_eq!(res.failed_leader.len(), 1);
+        assert_eq!(res.campaign_failed.len(), 1);
+        lk.mut_router().regions.insert(4);
+        let res = block_on(lk.step());
+        assert_eq!(res.failed_leader.len(), 1);
+        assert_eq!(res.campaign_failed.len(), 0);
+        let res = block_on(lk.step());
+        assert_eq!(res.failed_leader.len(), 0);
+        assert_eq!(res.campaign_failed.len(), 0);
+    }
+
+    #[test]
+    fn test_many_regions() {
+        let leaders = std::iter::repeat_with({
+            let mut x = 0;
+            move || {
+                x += 1;
+                x
+            }
+        })
+        .take(2049)
+        .collect::<Vec<_>>();
+        let mut store = MockStore::default();
+        store.regions = leaders.iter().copied().collect();
+        let mut lk = LeaderKeeper::<RocksEngine, _>::new(store, leaders);
+        let res = block_on(lk.step());
+        assert_eq!(res.failed_leader.len(), 2049);
+        assert_eq!(res.campaign_failed.len(), 0);
+        let res = block_on(lk.step());
+        assert_eq!(res.failed_leader.len(), 0);
+        assert_eq!(res.campaign_failed.len(), 0);
+    }
+}
diff --git a/components/snap_recovery/src/lib.rs b/components/snap_recovery/src/lib.rs
index 043cffb3c80..0baefb5eabe 100644
--- a/components/snap_recovery/src/lib.rs
+++ b/components/snap_recovery/src/lib.rs
@@ -9,5 +9,6 @@ pub use init_cluster::{enter_snap_recovery_mode, start_recovery};
 pub use services::RecoveryService;
 
 mod data_resolver;
+mod leader_keeper;
 mod metrics;
 mod region_meta_collector;
diff --git a/components/snap_recovery/src/services.rs b/components/snap_recovery/src/services.rs
index 4e2a4a2e5cf..06569deca26 100644
--- a/components/snap_recovery/src/services.rs
+++ b/components/snap_recovery/src/services.rs
@@ -27,7 +27,7 @@ use raftstore::{
     router::RaftStoreRouter,
     store::{
         fsm::RaftRouter,
-        msg::{Callback, CasualMessage, PeerMsg, SignificantMsg},
+        msg::{PeerMsg, SignificantMsg},
         transport::SignificantRouter,
         SnapshotRecoveryWaitApplySyncer,
     },
@@ -37,7 +37,8 @@ use tikv_util::sys::thread::{StdThreadBuildWrapper, ThreadBuildWrapper};
 
 use crate::{
     data_resolver::DataResolverManager,
-    metrics::{CURRENT_WAIT_APPLY_LEADER, CURRENT_WAIT_ELECTION_LEADER, REGION_EVENT_COUNTER},
+    leader_keeper::LeaderKeeper,
+    metrics::{CURRENT_WAIT_APPLY_LEADER, REGION_EVENT_COUNTER},
     region_meta_collector::RegionMetaCollector,
 };
@@ -240,50 +241,16 @@ impl RecoverData for RecoveryService {
             }
         }
 
-        let mut rxs = Vec::with_capacity(leaders.len());
-        for &region_id in &leaders {
-            if let Err(e) = raft_router.send_casual_msg(region_id, CasualMessage::Campaign) {
-                // TODO: retry may necessay
- warn!("region fails to campaign: "; - "region_id" => region_id, - "err" => ?e); - continue; - } else { - debug!("region starts to campaign"; "region_id" => region_id); - } - - let (tx, rx) = sync_channel(1); - let callback = Callback::read(Box::new(move |_| { - if tx.send(1).is_err() { - error!("response failed"; "region_id" => region_id); - } - })); - if let Err(e) = raft_router - .significant_send(region_id, SignificantMsg::LeaderCallback(callback)) - { - warn!("LeaderCallback failed"; "err" => ?e, "region_id" => region_id); - } - rxs.push(Some(rx)); - } - - info!("send assign leader request done"; "count" => %leaders.len()); - - // leader is campaign and be ensured as leader - for (rid, rx) in leaders.iter().zip(rxs) { - if let Some(rx) = rx { - CURRENT_WAIT_ELECTION_LEADER.set(*rid as _); - match rx.recv() { - Ok(id) => { - debug!("leader is assigned for region"; "region_id" => %id); - } - Err(e) => { - error!("check leader failed"; "error" => ?e); - } - } - } - } - CURRENT_WAIT_ELECTION_LEADER.set(0); - + let mut lk = LeaderKeeper::new(raft_router.clone(), leaders.clone()); + // We must use the tokio runtime here because there isn't a `block_in_place` + // like thing in the futures executor. It simply panics when block + // on the block_on context. + // It is also impossible to directly `await` here, because that will make + // borrowing to the raft router crosses the await point. + tokio::runtime::Builder::new_current_thread() + .build() + .expect("failed to build temporary tokio runtime.") + .block_on(lk.elect_and_wait_all_ready()); info!("all region leader assigned done"; "count" => %leaders.len()); let now = Instant::now();