Skip to content

Commit

Permalink
refactor tests
Browse files Browse the repository at this point in the history
Signed-off-by: CalvinNeo <calvinneo1995@gmail.com>
  • Loading branch information
CalvinNeo committed Aug 24, 2022
1 parent 111db93 commit d58cfbb
Show file tree
Hide file tree
Showing 3 changed files with 1,035 additions and 992 deletions.
6 changes: 5 additions & 1 deletion components/proxy_server/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,11 @@ impl<CER: ConfiguredRaftEngine> TiKvServer<CER> {
});
// engine_tiflash::RocksEngine has engine_rocks::RocksEngine inside
let mut kv_engine = TiFlashEngine::from_rocks(kv_engine);
kv_engine.init(engine_store_server_helper, self.proxy_config.raft_store.snap_handle_pool_size, Some(ffi_hub));
kv_engine.init(
engine_store_server_helper,
self.proxy_config.raft_store.snap_handle_pool_size,
Some(ffi_hub),
);

let engines = Engines::new(kv_engine, raft_engine);

Expand Down
20 changes: 16 additions & 4 deletions components/raftstore/src/engine_store_ffi/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ impl TiFlashObserver {
impl Coprocessor for TiFlashObserver {
fn stop(&self) {
// TODO(tiflash) remove this when pre apply merged
info!("shutdown tiflash observer"; "peer_id" => self.peer_id);
self.apply_snap_pool.as_ref().unwrap().shutdown();
}
}
Expand Down Expand Up @@ -653,7 +654,7 @@ impl ApplySnapshotObserver for TiFlashObserver {
let ssts = retrieve_sst_files(snap);
self.engine
.pending_applies_count
.fetch_add(1, Ordering::Relaxed);
.fetch_add(1, Ordering::SeqCst);
match self.apply_snap_pool.as_ref() {
Some(p) => {
p.spawn(async move {
Expand All @@ -675,7 +676,7 @@ impl ApplySnapshotObserver for TiFlashObserver {
None => {
self.engine
.pending_applies_count
.fetch_sub(1, Ordering::Relaxed);
.fetch_sub(1, Ordering::SeqCst);
error!("apply_snap_pool is not initialized, quit background pre apply"; "peer_id" => peer_id, "region_id" => ob_ctx.region().get_id());
}
}
Expand Down Expand Up @@ -706,7 +707,12 @@ impl ApplySnapshotObserver for TiFlashObserver {
Some(t) => {
let neer_retry = match t.recv.recv() {
Ok(snap_ptr) => {
info!("get prehandled snapshot success");
info!("get prehandled snapshot success";
"peer_id" => ?snap_key,
"region" => ?ob_ctx.region(),
"pending" => self.engine
.pending_applies_count.load(Ordering::SeqCst),
);
self.engine_store_server_helper
.apply_pre_handled_snapshot(snap_ptr.0);
false
Expand All @@ -721,7 +727,13 @@ impl ApplySnapshotObserver for TiFlashObserver {
};
self.engine
.pending_applies_count
.fetch_sub(1, Ordering::Relaxed);
.fetch_sub(1, Ordering::SeqCst);
info!("apply snapshot finished";
"peer_id" => ?snap_key,
"region" => ?ob_ctx.region(),
"pending" => self.engine
.pending_applies_count.load(Ordering::SeqCst),
);
neer_retry
}
None => {
Expand Down
Loading

0 comments on commit d58cfbb

Please sign in to comment.