Skip to content

Commit

Permalink
Handle destroy peer by observer (#132)
Browse files Browse the repository at this point in the history
Signed-off-by: CalvinNeo <calvinneo1995@gmail.com>
  • Loading branch information
CalvinNeo authored Aug 3, 2022
1 parent 714b6ce commit 2eea34c
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 12 deletions.
23 changes: 19 additions & 4 deletions components/raftstore/src/engine_store_ffi/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::{mpsc, Arc, Mutex};
use collections::HashMap;
use engine_tiflash::FsStatsExt;
use kvproto::raft_cmdpb::{AdminCmdType, AdminRequest};
use raft::{eraftpb, StateRole};
use sst_importer::SstImporter;
use tikv_util::{debug, error};
use yatp::{
Expand Down Expand Up @@ -144,10 +145,10 @@ impl TiFlashObserver {
// TIFLASH_OBSERVER_PRIORITY,
// BoxApplySnapshotObserver::new(self.clone()),
// );
// coprocessor_host.registry.register_region_change_observer(
// TIFLASH_OBSERVER_PRIORITY,
// BoxRegionChangeObserver::new(self.clone()),
// );
coprocessor_host.registry.register_region_change_observer(
TIFLASH_OBSERVER_PRIORITY,
BoxRegionChangeObserver::new(self.clone()),
);
// coprocessor_host.registry.register_pd_task_observer(
// TIFLASH_OBSERVER_PRIORITY,
// BoxPdTaskObserver::new(self.clone()),
Expand Down Expand Up @@ -219,3 +220,17 @@ impl QueryObserver for TiFlashObserver {
);
}
}

impl RegionChangeObserver for TiFlashObserver {
fn on_region_changed(
&self,
ob_ctx: &mut ObserverContext<'_>,
e: RegionChangeEvent,
_: StateRole,
) {
if e == RegionChangeEvent::Destroy {
self.engine_store_server_helper
.handle_destroy(ob_ctx.region().get_id());
}
}
}
8 changes: 0 additions & 8 deletions components/raftstore/src/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3258,14 +3258,6 @@ where
.unsafe_recovery_maybe_finish_wait_apply(/*force=*/ true);
}

{
let engine_store_server_helper =
crate::engine_store_ffi::gen_engine_store_server_helper(
self.ctx.cfg.engine_store_server_helper,
);
engine_store_server_helper.handle_destroy(region_id);
}

let mut meta = self.ctx.store_meta.lock().unwrap();

if meta.atomic_snap_regions.contains_key(&self.region_id()) {
Expand Down
57 changes: 57 additions & 0 deletions tests/proxy/normal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,60 @@ fn test_empty_cmd() {

cluster.shutdown();
}

#[test]
fn test_handle_destroy() {
let (mut cluster, pd_client) = new_mock_cluster(0, 3);

// Disable raft log gc in this test case.
cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::secs(60);

// Disable default max peer count check.
pd_client.disable_default_operator();

cluster.run();
cluster.must_put(b"k1", b"v1");
let eng_ids = cluster
.engines
.iter()
.map(|e| e.0.to_owned())
.collect::<Vec<_>>();

let region = cluster.get_region(b"k1");
let region_id = region.get_id();
let peer_1 = find_peer(&region, eng_ids[0]).cloned().unwrap();
let peer_2 = find_peer(&region, eng_ids[1]).cloned().unwrap();
cluster.must_transfer_leader(region_id, peer_1);

iter_ffi_helpers(
&cluster,
Some(vec![eng_ids[1]]),
&mut |_, _, ffi: &mut FFIHelperSet| {
let server = &ffi.engine_store_server;
assert!(server.kvstore.contains_key(&region_id));
},
);

pd_client.must_remove_peer(region_id, peer_2);

check_key(
&cluster,
b"k1",
b"k2",
Some(false),
None,
Some(vec![eng_ids[1]]),
);

// Region removed in server.
iter_ffi_helpers(
&cluster,
Some(vec![eng_ids[1]]),
&mut |_, _, ffi: &mut FFIHelperSet| {
let server = &ffi.engine_store_server;
assert!(!server.kvstore.contains_key(&region_id));
},
);

cluster.shutdown();
}

0 comments on commit 2eea34c

Please sign in to comment.