snap_recovery: added keep leader (tikv#15124)
close tikv#15122

Signed-off-by: hillium <yujuncen@pingcap.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
YuJuncen and ti-chi-bot[bot] committed Oct 10, 2023
1 parent 304e329 commit 6e6cce4
Showing 5 changed files with 278 additions and 46 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions components/snap_recovery/Cargo.toml
@@ -13,6 +13,7 @@ engine_rocks = { workspace = true }
 engine_traits = { workspace = true }
 futures = { version = "0.3", features = ["executor"] }
 grpcio = { workspace = true }
+itertools = "0.10"
 keys = { workspace = true }
 kvproto = { workspace = true }
 lazy_static = "1.4"
@@ -31,6 +32,7 @@ thiserror = "1.0"
 tikv = { workspace = true }
 tikv_alloc = { workspace = true }
 tikv_util = { workspace = true }
+tokio = { version = "1.17", features = ["rt"] }
 toml = "0.5"
 txn_types = { workspace = true }

260 changes: 260 additions & 0 deletions components/snap_recovery/src/leader_keeper.rs
@@ -0,0 +1,260 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use std::{
    collections::HashSet,
    marker::PhantomData,
    sync::Mutex,
    time::{Duration, Instant},
};

use engine_traits::KvEngine;
use futures::compat::Future01CompatExt;
use itertools::Itertools;
use raftstore::{
    errors::{Error, Result},
    store::{Callback, CasualMessage, CasualRouter, SignificantMsg, SignificantRouter},
};
use tikv_util::{future::paired_future_callback, timer::GLOBAL_TIMER_HANDLE};

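/// Keeps leadership for a set of regions during snapshot recovery.
/// `not_leader` tracks the regions that have not yet reported a ready leader.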
pub struct LeaderKeeper<EK, Router> {
    router: Router,
    not_leader: HashSet<u64>,

    _ek: PhantomData<EK>,
}

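/// The outcome of one `step` pass: regions whose leader check failed, and
/// regions to which the campaign message could not be delivered.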
#[derive(Default)]
pub struct StepResult {
    pub failed_leader: Vec<(u64, Error)>,
    pub campaign_failed: Vec<(u64, Error)>,
}

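/// Formats a slice for logging, truncating it to at most `max_len` items
/// followed by a count of the omitted ones.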
fn ellipse<T: std::fmt::Debug>(ts: &[T], max_len: usize) -> String {
    if ts.len() < max_len {
        return format!("{:?}", &ts);
    }
    format!("{:?} (and {} more)", &ts[..max_len], ts.len() - max_len)
}

impl std::fmt::Debug for StepResult {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StepResult")
            .field(
                "failed_leader",
                &format_args!("{}", ellipse(&self.failed_leader, 8)),
            )
            .field(
                "campaign_failed",
                &format_args!("{}", ellipse(&self.campaign_failed, 8)),
            )
            .finish()
    }
}

impl<EK, Router> LeaderKeeper<EK, Router>
where
    EK: KvEngine,
    Router: CasualRouter<EK> + SignificantRouter<EK> + 'static,
{
    pub fn new(router: Router, to_keep: impl IntoIterator<Item = u64>) -> Self {
        Self {
            router,
            not_leader: to_keep.into_iter().collect(),
            _ek: PhantomData,
        }
    }

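    /// Repeatedly runs `step` until every tracked region reports a ready
    /// leader, waiting 10 seconds between passes.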
    pub async fn elect_and_wait_all_ready(&mut self) {
        loop {
            let now = Instant::now();
            let res = self.step().await;
            info!("finished leader keeper stepping."; "result" => ?res, "take" => ?now.elapsed());
            GLOBAL_TIMER_HANDLE
                .delay(now + Duration::from_secs(10))
                .compat()
                .await
                .expect("global timer failed, cannot schedule the next step");
            if res.failed_leader.is_empty() {
                return;
            }
        }
    }

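    /// Makes one pass over the pending regions in batches of `CONCURRENCY`:
    /// checks each region's leadership, campaigns for those without a ready
    /// leader, and drops the successful ones from the pending set.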
    pub async fn step(&mut self) -> StepResult {
        const CONCURRENCY: usize = 256;
        let r = Mutex::new(StepResult::default());
        let success = Mutex::new(HashSet::new());
        for batch in &self.not_leader.iter().chunks(CONCURRENCY) {
            let tasks = batch.map(|region_id| async {
                match self.check_leader(*region_id).await {
                    Ok(_) => {
                        success.lock().unwrap().insert(*region_id);
                        return;
                    }
                    Err(err) => r.lock().unwrap().failed_leader.push((*region_id, err)),
                };

                if let Err(err) = self.force_leader(*region_id) {
                    r.lock().unwrap().campaign_failed.push((*region_id, err));
                }
            });
            futures::future::join_all(tasks).await;
        }
        // The removal must happen outside `debug_assert!`, which is compiled
        // out (along with its side effects) in release builds.
        for i in success.lock().unwrap().iter() {
            let removed = self.not_leader.remove(i);
            debug_assert!(removed);
        }
        r.into_inner().unwrap()
    }

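    /// Probes a region's leadership by sending a `LeaderCallback`; an error
    /// in the response header means the local peer is not a ready leader.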
    async fn check_leader(&self, region_id: u64) -> Result<()> {
        let (cb, fut) = paired_future_callback();
        let msg = SignificantMsg::LeaderCallback(Callback::<EK::Snapshot>::read(cb));
        self.router.significant_send(region_id, msg)?;
        let resp = fut
            .await
            .map_err(|_err| Error::Other("canceled by store".into()))?;
        let header = resp.response.get_header();
        if header.has_error() {
            return Err(Error::Other(box_err!(
                "got error: {:?}",
                header.get_error()
            )));
        }
        Ok(())
    }

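    /// Asks the region's local peer to start campaigning. Delivery is
    /// fire-and-forget.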
    fn force_leader(&self, region_id: u64) -> Result<()> {
        let msg = CasualMessage::Campaign;
        self.router.send(region_id, msg)?;
        // Campaigning is fire-and-forget; there is nothing more to wait for.
        Ok(())
    }
}

#[cfg(test)]
mod test {
    use std::{cell::RefCell, collections::HashSet};

    use engine_rocks::RocksEngine;
    use engine_traits::KvEngine;
    use futures::executor::block_on;
    use kvproto::raft_cmdpb;
    use raftstore::store::{CasualRouter, SignificantRouter};

    use super::LeaderKeeper;

    #[derive(Default)]
    struct MockStore {
        regions: HashSet<u64>,
        leaders: RefCell<HashSet<u64>>,
    }

    impl<EK, Router> LeaderKeeper<EK, Router> {
        fn mut_router(&mut self) -> &mut Router {
            &mut self.router
        }
    }

    // `SignificantRouter` for `MockStore` handles only `LeaderCallback`:
    // it replies with success when the region is a leader, and otherwise
    // fills the error in the response header.
    impl<EK: KvEngine> SignificantRouter<EK> for MockStore {
        fn significant_send(
            &self,
            region_id: u64,
            msg: raftstore::store::SignificantMsg<EK::Snapshot>,
        ) -> raftstore::errors::Result<()> {
            match msg {
                raftstore::store::SignificantMsg::LeaderCallback(cb) => {
                    let mut resp = raft_cmdpb::RaftCmdResponse::default();
                    let mut header = raft_cmdpb::RaftResponseHeader::default();
                    if !self.leaders.borrow().contains(&region_id) {
                        let mut err = kvproto::errorpb::Error::new();
                        err.set_not_leader(kvproto::errorpb::NotLeader::new());
                        header.set_error(err);
                    }
                    resp.set_header(header);
                    cb.invoke_with_response(resp);
                    Ok(())
                }
                _ => panic!("unexpected msg"),
            }
        }
    }

    // `CasualRouter` for `MockStore` handles only `Campaign`: it adds the
    // region to the leaders list when handling the message.
    impl<EK: KvEngine> CasualRouter<EK> for MockStore {
        fn send(
            &self,
            region_id: u64,
            msg: raftstore::store::CasualMessage<EK>,
        ) -> raftstore::errors::Result<()> {
            match msg {
                raftstore::store::CasualMessage::Campaign => {
                    if !self.regions.contains(&region_id) {
                        return Err(raftstore::Error::RegionNotFound(region_id));
                    }
                    self.leaders.borrow_mut().insert(region_id);
                    Ok(())
                }
                _ => panic!("unexpected msg"),
            }
        }
    }

    #[test]
    fn test_basic() {
        let leaders = vec![1, 2, 3];
        let mut store = MockStore::default();
        store.regions = leaders.iter().copied().collect();
        let mut lk = LeaderKeeper::<RocksEngine, _>::new(store, leaders);
        let res = block_on(lk.step());
        assert_eq!(res.failed_leader.len(), 3);
        assert_eq!(res.campaign_failed.len(), 0);
    }

    #[test]
    fn test_failure() {
        let leaders = vec![1, 2, 3];
        let mut store = MockStore::default();
        store.regions = leaders.iter().copied().collect();
        let mut lk = LeaderKeeper::<RocksEngine, _>::new(store, vec![1, 2, 3, 4]);
        let res = block_on(lk.step());
        assert_eq!(res.failed_leader.len(), 4);
        assert_eq!(res.campaign_failed.len(), 1);
        let res = block_on(lk.step());
        assert_eq!(res.failed_leader.len(), 1);
        assert_eq!(res.campaign_failed.len(), 1);
        lk.mut_router().regions.insert(4);
        let res = block_on(lk.step());
        assert_eq!(res.failed_leader.len(), 1);
        assert_eq!(res.campaign_failed.len(), 0);
        let res = block_on(lk.step());
        assert_eq!(res.failed_leader.len(), 0);
        assert_eq!(res.campaign_failed.len(), 0);
    }

    #[test]
    fn test_many_regions() {
        let leaders = (1..=2049).collect::<Vec<u64>>();
        let mut store = MockStore::default();
        store.regions = leaders.iter().copied().collect();
        let mut lk = LeaderKeeper::<RocksEngine, _>::new(store, leaders);
        let res = block_on(lk.step());
        assert_eq!(res.failed_leader.len(), 2049);
        assert_eq!(res.campaign_failed.len(), 0);
        let res = block_on(lk.step());
        assert_eq!(res.failed_leader.len(), 0);
        assert_eq!(res.campaign_failed.len(), 0);
    }
}
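
The `step` implementation above bounds its fan-out by batching the leadership checks. As a standalone sketch of the same pattern, kept apart from the diff (the `check` function and the region ids here are hypothetical; only `itertools` and `futures` from the dependency list are used):

// Sketch: bounded-concurrency batching, as in `LeaderKeeper::step`.
use futures::executor::block_on;
use itertools::Itertools;

const CONCURRENCY: usize = 256;

// Hypothetical stand-in for `check_leader`: even ids report a ready leader.
async fn check(id: u64) -> Result<(), String> {
    if id % 2 == 0 {
        Ok(())
    } else {
        Err(format!("region {} has no leader", id))
    }
}

fn main() {
    let region_ids: Vec<u64> = (1..=1000).collect();
    let mut failed = Vec::new();
    // `chunks` yields at most CONCURRENCY ids at a time, so `join_all`
    // drives at most CONCURRENCY checks concurrently.
    for batch in &region_ids.iter().chunks(CONCURRENCY) {
        let tasks = batch.map(|&id| async move { (id, check(id).await) });
        for (id, res) in block_on(futures::future::join_all(tasks)) {
            if res.is_err() {
                failed.push(id);
            }
        }
    }
    assert_eq!(failed.len(), 500);
}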
1 change: 1 addition & 0 deletions components/snap_recovery/src/lib.rs
@@ -9,5 +9,6 @@ pub use init_cluster::{enter_snap_recovery_mode, start_recovery};
 pub use services::RecoveryService;
 
 mod data_resolver;
+mod leader_keeper;
 mod metrics;
 mod region_meta_collector;
59 changes: 13 additions & 46 deletions components/snap_recovery/src/services.rs
@@ -27,7 +27,7 @@ use raftstore::{
     router::RaftStoreRouter,
     store::{
         fsm::RaftRouter,
-        msg::{Callback, CasualMessage, PeerMsg, SignificantMsg},
+        msg::{PeerMsg, SignificantMsg},
         transport::SignificantRouter,
         SnapshotRecoveryWaitApplySyncer,
     },
@@ -37,7 +37,8 @@ use tikv_util::sys::thread::{StdThreadBuildWrapper, ThreadBuildWrapper};
 
 use crate::{
     data_resolver::DataResolverManager,
-    metrics::{CURRENT_WAIT_APPLY_LEADER, CURRENT_WAIT_ELECTION_LEADER, REGION_EVENT_COUNTER},
+    leader_keeper::LeaderKeeper,
+    metrics::{CURRENT_WAIT_APPLY_LEADER, REGION_EVENT_COUNTER},
     region_meta_collector::RegionMetaCollector,
 };

@@ -240,50 +241,16 @@
             }
         }
 
-        let mut rxs = Vec::with_capacity(leaders.len());
-        for &region_id in &leaders {
-            if let Err(e) = raft_router.send_casual_msg(region_id, CasualMessage::Campaign) {
-                // TODO: retry may necessay
-                warn!("region fails to campaign: ";
-                    "region_id" => region_id,
-                    "err" => ?e);
-                continue;
-            } else {
-                debug!("region starts to campaign"; "region_id" => region_id);
-            }
-
-            let (tx, rx) = sync_channel(1);
-            let callback = Callback::read(Box::new(move |_| {
-                if tx.send(1).is_err() {
-                    error!("response failed"; "region_id" => region_id);
-                }
-            }));
-            if let Err(e) = raft_router
-                .significant_send(region_id, SignificantMsg::LeaderCallback(callback))
-            {
-                warn!("LeaderCallback failed"; "err" => ?e, "region_id" => region_id);
-            }
-            rxs.push(Some(rx));
-        }
-
-        info!("send assign leader request done"; "count" => %leaders.len());
-
-        // leader is campaign and be ensured as leader
-        for (rid, rx) in leaders.iter().zip(rxs) {
-            if let Some(rx) = rx {
-                CURRENT_WAIT_ELECTION_LEADER.set(*rid as _);
-                match rx.recv() {
-                    Ok(id) => {
-                        debug!("leader is assigned for region"; "region_id" => %id);
-                    }
-                    Err(e) => {
-                        error!("check leader failed"; "error" => ?e);
-                    }
-                }
-            }
-        }
-        CURRENT_WAIT_ELECTION_LEADER.set(0);
-
+        let mut lk = LeaderKeeper::new(raft_router.clone(), leaders.clone());
+        // We must use a tokio runtime here because the futures executor has
+        // no `block_in_place`-like facility: it panics when `block_on` is
+        // called from within another `block_on` context. Directly `await`ing
+        // is also impossible, because that would make the borrow of the raft
+        // router cross the await point.
+        tokio::runtime::Builder::new_current_thread()
+            .build()
+            .expect("failed to build temporary tokio runtime.")
+            .block_on(lk.elect_and_wait_all_ready());
+        info!("all region leader assigned done"; "count" => %leaders.len());
 
         let now = Instant::now();
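
The rationale comment in the added code is worth a concrete illustration. A minimal standalone sketch of the same pattern (assuming only tokio with the "rt" feature, as added in Cargo.toml; `block_on_once` is a hypothetical helper, not part of the diff):

// Build a throwaway current-thread runtime to drive one future to
// completion from synchronous code, as the service does above.
fn block_on_once<F: std::future::Future>(fut: F) -> F::Output {
    tokio::runtime::Builder::new_current_thread()
        .build()
        .expect("failed to build temporary tokio runtime")
        .block_on(fut)
}

fn main() {
    assert_eq!(block_on_once(async { 40 + 2 }), 42);
}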
