From 2eea34c4d334d88d0ded40147b535ceff97e0b60 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Wed, 3 Aug 2022 16:48:37 +0800 Subject: [PATCH] Handle destroy peer by observer (#132) Signed-off-by: CalvinNeo --- .../src/engine_store_ffi/observer.rs | 23 ++++++-- components/raftstore/src/store/fsm/peer.rs | 8 --- tests/proxy/normal.rs | 57 +++++++++++++++++++ 3 files changed, 76 insertions(+), 12 deletions(-) diff --git a/components/raftstore/src/engine_store_ffi/observer.rs b/components/raftstore/src/engine_store_ffi/observer.rs index dd174612973..9c47050b601 100644 --- a/components/raftstore/src/engine_store_ffi/observer.rs +++ b/components/raftstore/src/engine_store_ffi/observer.rs @@ -5,6 +5,7 @@ use std::sync::{mpsc, Arc, Mutex}; use collections::HashMap; use engine_tiflash::FsStatsExt; use kvproto::raft_cmdpb::{AdminCmdType, AdminRequest}; +use raft::{eraftpb, StateRole}; use sst_importer::SstImporter; use tikv_util::{debug, error}; use yatp::{ @@ -144,10 +145,10 @@ impl TiFlashObserver { // TIFLASH_OBSERVER_PRIORITY, // BoxApplySnapshotObserver::new(self.clone()), // ); - // coprocessor_host.registry.register_region_change_observer( - // TIFLASH_OBSERVER_PRIORITY, - // BoxRegionChangeObserver::new(self.clone()), - // ); + coprocessor_host.registry.register_region_change_observer( + TIFLASH_OBSERVER_PRIORITY, + BoxRegionChangeObserver::new(self.clone()), + ); // coprocessor_host.registry.register_pd_task_observer( // TIFLASH_OBSERVER_PRIORITY, // BoxPdTaskObserver::new(self.clone()), @@ -219,3 +220,17 @@ impl QueryObserver for TiFlashObserver { ); } } + +impl RegionChangeObserver for TiFlashObserver { + fn on_region_changed( + &self, + ob_ctx: &mut ObserverContext<'_>, + e: RegionChangeEvent, + _: StateRole, + ) { + if e == RegionChangeEvent::Destroy { + self.engine_store_server_helper + .handle_destroy(ob_ctx.region().get_id()); + } + } +} diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index 0b51b84ad1f..ea3f610b162 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -3258,14 +3258,6 @@ where .unsafe_recovery_maybe_finish_wait_apply(/*force=*/ true); } - { - let engine_store_server_helper = - crate::engine_store_ffi::gen_engine_store_server_helper( - self.ctx.cfg.engine_store_server_helper, - ); - engine_store_server_helper.handle_destroy(region_id); - } - let mut meta = self.ctx.store_meta.lock().unwrap(); if meta.atomic_snap_regions.contains_key(&self.region_id()) { diff --git a/tests/proxy/normal.rs b/tests/proxy/normal.rs index 9cb6e85004b..cffdd9a6e82 100644 --- a/tests/proxy/normal.rs +++ b/tests/proxy/normal.rs @@ -306,3 +306,60 @@ fn test_empty_cmd() { cluster.shutdown(); } + +#[test] +fn test_handle_destroy() { + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + + // Disable raft log gc in this test case. + cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::secs(60); + + // Disable default max peer count check. + pd_client.disable_default_operator(); + + cluster.run(); + cluster.must_put(b"k1", b"v1"); + let eng_ids = cluster + .engines + .iter() + .map(|e| e.0.to_owned()) + .collect::>(); + + let region = cluster.get_region(b"k1"); + let region_id = region.get_id(); + let peer_1 = find_peer(®ion, eng_ids[0]).cloned().unwrap(); + let peer_2 = find_peer(®ion, eng_ids[1]).cloned().unwrap(); + cluster.must_transfer_leader(region_id, peer_1); + + iter_ffi_helpers( + &cluster, + Some(vec![eng_ids[1]]), + &mut |_, _, ffi: &mut FFIHelperSet| { + let server = &ffi.engine_store_server; + assert!(server.kvstore.contains_key(®ion_id)); + }, + ); + + pd_client.must_remove_peer(region_id, peer_2); + + check_key( + &cluster, + b"k1", + b"k2", + Some(false), + None, + Some(vec![eng_ids[1]]), + ); + + // Region removed in server. + iter_ffi_helpers( + &cluster, + Some(vec![eng_ids[1]]), + &mut |_, _, ffi: &mut FFIHelperSet| { + let server = &ffi.engine_store_server; + assert!(!server.kvstore.contains_key(®ion_id)); + }, + ); + + cluster.shutdown(); +}