Skip to content

Commit

Permalink
address fix
Browse files Browse the repository at this point in the history
Signed-off-by: CalvinNeo <calvinneo1995@gmail.com>
  • Loading branch information
CalvinNeo committed Aug 26, 2022
1 parent d58cfbb commit f5da45e
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 95 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/pr-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ jobs:
cargo test --features compat_old_proxy --package tests --test failpoints cases::test_import_service
cargo test --features compat_old_proxy --package tests --test failpoints cases::test_proxy_replica_read
# tests based on new-mock-engine-store, with compat for new proxy
cargo test --package tests --test proxy normal
cargo test --package tests --test proxy normal::store
cargo test --package tests --test proxy normal::region
cargo test --package tests --test proxy normal::config
cargo test --package tests --test proxy normal::write
cargo test --package tests --test proxy normal::ingest
cargo test --package tests --test proxy normal::snapshot
cargo test --package tests --test proxy normal::restart
# tests based on new-mock-engine-store, for some tests not available for new proxy
cargo test --package tests --test proxy proxy
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion components/proxy_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ async-stream = "0.2"
backup = { path = "../backup", default-features = false }
backup-stream = { path = "../backup-stream", default-features = false }
causal_ts = { path = "../causal_ts" }
cdc = { path = "../cdc", default-features = false }
chrono = "0.4"
clap = "2.32"
collections = { path = "../collections" }
Expand Down
5 changes: 1 addition & 4 deletions components/proxy_server/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ use std::{
};

use api_version::{dispatch_api_version, KvFormat};
use backup_stream::{config::BackupStreamConfigManager, observer::BackupStreamObserver};
use cdc::{CdcConfigManager, MemoryQuota};
use concurrency_manager::ConcurrencyManager;
use encryption_export::{data_key_manager_from_config, DataKeyManager};
use engine_rocks::{
Expand All @@ -40,9 +38,8 @@ use futures::executor::block_on;
use grpcio::{EnvBuilder, Environment};
use grpcio_health::HealthService;
use kvproto::{
brpb::create_backup, cdcpb::create_change_data, deadlock::create_deadlock,
debugpb::create_debug, diagnosticspb::create_diagnostics, import_sstpb::create_import_sst,
kvrpcpb::ApiVersion, resource_usage_agent::create_resource_metering_pub_sub,
kvrpcpb::ApiVersion,
};
use pd_client::{PdClient, RpcClient};
use raft_log_engine::RaftLogEngine;
Expand Down
29 changes: 19 additions & 10 deletions components/raftstore/src/engine_store_ffi/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,11 @@ impl RegionChangeObserver for TiFlashObserver {
_: StateRole,
) {
if e == RegionChangeEvent::Destroy {
info!(
"observe destroy";
"region_id" => ob_ctx.region().get_id(),
"peer_id" => self.peer_id,
);
self.engine_store_server_helper
.handle_destroy(ob_ctx.region().get_id());
}
Expand Down Expand Up @@ -632,7 +637,12 @@ impl ApplySnapshotObserver for TiFlashObserver {
snap_key: &crate::store::SnapKey,
snap: Option<&crate::store::Snapshot>,
) {
info!("pre apply snapshot"; "peer_id" => peer_id, "region_id" => ob_ctx.region().get_id());
info!("pre apply snapshot";
"peer_id" => peer_id,
"region_id" => ob_ctx.region().get_id(),
"snap_key" => ?snap_key,
"pending" => self.engine.pending_applies_count.load(Ordering::SeqCst),
);
fail::fail_point!("on_ob_pre_handle_snapshot", |_| {});

let snap = match snap {
Expand Down Expand Up @@ -691,7 +701,8 @@ impl ApplySnapshotObserver for TiFlashObserver {
) {
fail::fail_point!("on_ob_post_apply_snapshot", |_| {});
info!("post apply snapshot";
"peer_id" => ?snap_key,
"peer_id" => ?peer_id,
"snap_key" => ?snap_key,
"region" => ?ob_ctx.region(),
);
let snap = match snap {
Expand All @@ -708,11 +719,10 @@ impl ApplySnapshotObserver for TiFlashObserver {
let neer_retry = match t.recv.recv() {
Ok(snap_ptr) => {
info!("get prehandled snapshot success";
"peer_id" => ?snap_key,
"region" => ?ob_ctx.region(),
"pending" => self.engine
.pending_applies_count.load(Ordering::SeqCst),
);
"peer_id" => ?peer_id,
"region" => ?ob_ctx.region(),
"pending" => self.engine.pending_applies_count.load(Ordering::SeqCst),
);
self.engine_store_server_helper
.apply_pre_handled_snapshot(snap_ptr.0);
false
Expand All @@ -729,10 +739,9 @@ impl ApplySnapshotObserver for TiFlashObserver {
.pending_applies_count
.fetch_sub(1, Ordering::SeqCst);
info!("apply snapshot finished";
"peer_id" => ?snap_key,
"peer_id" => ?peer_id,
"region" => ?ob_ctx.region(),
"pending" => self.engine
.pending_applies_count.load(Ordering::SeqCst),
"pending" => self.engine.pending_applies_count.load(Ordering::SeqCst),
);
neer_retry
}
Expand Down
11 changes: 10 additions & 1 deletion new-mock-engine-store/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use std::ops::{Deref, DerefMut};
use std::{
ops::{Deref, DerefMut},
sync::{atomic::AtomicBool, Arc},
};

use tikv::config::TiKvConfig;

use crate::ProxyConfig;

#[derive(Clone, Default)]
pub struct MockConfig {
pub panic_when_flush_no_found: Arc<AtomicBool>,
}

#[derive(Clone)]
pub struct Config {
pub tikv: TiKvConfig,
pub prefer_mem: bool,
pub proxy_cfg: ProxyConfig,
/// Whether our mock server should compat new proxy.
pub proxy_compat: bool,
pub mock_cfg: MockConfig,
}

impl Deref for Config {
Expand Down
52 changes: 39 additions & 13 deletions new-mock-engine-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use std::{
collections::{BTreeMap, HashMap, HashSet},
pin::Pin,
sync::Mutex,
sync::{atomic::Ordering, Mutex},
time::Duration,
};

Expand All @@ -26,7 +26,7 @@ use protobuf::Message;
use raftstore::{engine_store_ffi, engine_store_ffi::RawCppPtr};
use tikv_util::{debug, error, info, warn};

use crate::mock_cluster::TiFlashEngine;
use crate::{config::MockConfig, mock_cluster::TiFlashEngine};

pub mod config;
pub mod mock_cluster;
Expand Down Expand Up @@ -73,6 +73,7 @@ pub struct EngineStoreServer {
pub engines: Option<Engines<TiFlashEngine, engine_rocks::RocksEngine>>,
pub kvstore: HashMap<RegionId, Box<Region>>,
pub proxy_compat: bool,
pub mock_cfg: MockConfig,
}

impl EngineStoreServer {
Expand All @@ -85,6 +86,7 @@ impl EngineStoreServer {
engines,
kvstore: Default::default(),
proxy_compat: false,
mock_cfg: MockConfig::default(),
}
}

Expand Down Expand Up @@ -253,7 +255,7 @@ impl EngineStoreServerWrap {
"node_id"=>node_id,
);
panic!("observe obsolete admin index");
return ffi_interfaces::EngineStoreApplyRes::None;
// return ffi_interfaces::EngineStoreApplyRes::None;
}
match req.get_cmd_type() {
AdminCmdType::ChangePeer | AdminCmdType::ChangePeerV2 => {
Expand Down Expand Up @@ -328,7 +330,7 @@ impl EngineStoreServerWrap {
AdminCmdType::PrepareMerge => {
let tikv_region = resp.get_split().get_left();

let target = req.prepare_merge.as_ref().unwrap().target.as_ref();
let _target = req.prepare_merge.as_ref().unwrap().target.as_ref();
let region_meta = &mut (engine_store_server
.kvstore
.get_mut(&region_id)
Expand Down Expand Up @@ -495,7 +497,7 @@ impl EngineStoreServerWrap {
let region_id = header.region_id;
let server = &mut (*self.engine_store_server);
let node_id = (*self.engine_store_server).id;
let kv = &mut (*self.engine_store_server).engines.as_mut().unwrap().kv;
let _kv = &mut (*self.engine_store_server).engines.as_mut().unwrap().kv;
let proxy_compat = server.proxy_compat;
let mut do_handle_write_raft_cmd = move |region: &mut Box<Region>| {
if region.apply_state.get_applied_index() >= header.index {
Expand All @@ -505,7 +507,7 @@ impl EngineStoreServerWrap {
"node_id"=>node_id,
);
panic!("observe obsolete write index");
return ffi_interfaces::EngineStoreApplyRes::None;
// return ffi_interfaces::EngineStoreApplyRes::None;
}
for i in 0..cmds.len {
let key = &*cmds.keys.add(i as _);
Expand All @@ -523,7 +525,7 @@ impl EngineStoreServerWrap {
"node_id" => server.id,
"header" => ?header,
);
let data = &mut region.data[cf_index as usize];
let _data = &mut region.data[cf_index as usize];
match tp {
engine_store_ffi::WriteCmdType::Put => {
write_kv_in_mem(region, cf_index as usize, k, v);
Expand Down Expand Up @@ -674,12 +676,35 @@ unsafe extern "C" fn ffi_try_flush_data(
arg1: *mut ffi_interfaces::EngineStoreServerWrap,
region_id: u64,
_try_until_succeed: u8,
_index: u64,
_term: u64,
index: u64,
term: u64,
) -> u8 {
let store = into_engine_store_server_wrap(arg1);
let kvstore = &mut (*store.engine_store_server).kvstore;
let region = kvstore.get_mut(&region_id).unwrap();
// If we can't find region here, we return true so proxy can trigger a CompactLog.
// The triggered CompactLog will be handled by `handleUselessAdminRaftCmd`,
// and result in a `EngineStoreApplyRes::NotFound`.
// Proxy will print this message and continue: `region not found in engine-store, maybe have exec `RemoveNode` first`.
let region = match kvstore.get_mut(&region_id) {
Some(r) => r,
None => {
if (*store.engine_store_server)
.mock_cfg
.panic_when_flush_no_found
.load(Ordering::SeqCst)
{
panic!(
"ffi_try_flush_data no found region {} [index {} term {}], store {}",
region_id,
index,
term,
(*store.engine_store_server).id
);
} else {
return 1;
}
}
};
fail::fail_point!("try_flush_data", |e| {
let b = e.unwrap().parse::<u8>().unwrap();
if b == 1 {
Expand Down Expand Up @@ -819,6 +844,7 @@ unsafe extern "C" fn ffi_handle_destroy(
arg2: u64,
) {
let store = into_engine_store_server_wrap(arg1);
debug!("ffi_handle_destroy {}", arg2);
(*store.engine_store_server).kvstore.remove(&arg2);
}

Expand Down Expand Up @@ -902,7 +928,7 @@ unsafe extern "C" fn ffi_pre_handle_snapshot(
) -> ffi_interfaces::RawCppPtr {
let store = into_engine_store_server_wrap(arg1);
let proxy_helper = &mut *(store.maybe_proxy_helper.unwrap());
let kvstore = &mut (*store.engine_store_server).kvstore;
let _kvstore = &mut (*store.engine_store_server).kvstore;
let node_id = (*store.engine_store_server).id;

let mut region_meta = kvproto::metapb::Region::default();
Expand Down Expand Up @@ -965,7 +991,7 @@ pub fn cf_to_name(cf: ffi_interfaces::ColumnFamilyType) -> &'static str {
unsafe extern "C" fn ffi_apply_pre_handled_snapshot(
arg1: *mut ffi_interfaces::EngineStoreServerWrap,
arg2: ffi_interfaces::RawVoidPtr,
arg3: ffi_interfaces::RawCppPtrType,
_arg3: ffi_interfaces::RawCppPtrType,
) {
let store = into_engine_store_server_wrap(arg1);
let region_meta = &mut *(arg2 as *mut PrehandledSnapshot);
Expand Down Expand Up @@ -1052,7 +1078,7 @@ unsafe extern "C" fn ffi_handle_ingest_sst(
region.apply_state.mut_truncated_state().set_term(term);
}

fail::fail_point!("on_handle_ingest_sst_return", |e| {
fail::fail_point!("on_handle_ingest_sst_return", |_e| {
ffi_interfaces::EngineStoreApplyRes::None
});
write_to_db_data(
Expand Down
15 changes: 9 additions & 6 deletions new-mock-engine-store/src/mock_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use kvproto::{
};
use lazy_static::lazy_static;
use pd_client::PdClient;
use protobuf::Message;
pub use proxy_server::config::ProxyConfig;
use proxy_server::fatal;
// mock cluster
Expand Down Expand Up @@ -76,7 +75,7 @@ use tikv_util::{
pub use crate::config::Config;
use crate::{
gen_engine_store_server_helper, transport_simulate::Filter, EngineStoreServer,
EngineStoreServerWrap,
EngineStoreServerWrap, MockConfig,
};

pub struct FFIHelperSet {
Expand Down Expand Up @@ -136,6 +135,7 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
prefer_mem: true,
proxy_cfg,
proxy_compat: false,
mock_cfg: Default::default(),
},
leaders: HashMap::default(),
count,
Expand All @@ -161,6 +161,7 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
node_cfg: TiKvConfig,
cluster_id: isize,
proxy_compat: bool,
mock_cfg: MockConfig,
) -> (FFIHelperSet, TiKvConfig) {
// We must allocate on heap to avoid move.
let proxy = Box::new(engine_store_ffi::RaftStoreProxy {
Expand All @@ -179,6 +180,7 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
let mut proxy_helper = Box::new(engine_store_ffi::RaftStoreProxyFFIHelper::new(&proxy));
let mut engine_store_server = Box::new(EngineStoreServer::new(id, Some(engines)));
engine_store_server.proxy_compat = proxy_compat;
engine_store_server.mock_cfg = mock_cfg;
let engine_store_server_wrap = Box::new(EngineStoreServerWrap::new(
&mut *engine_store_server,
Some(&mut *proxy_helper),
Expand Down Expand Up @@ -225,6 +227,7 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
self.cfg.tikv.clone(),
self as *const Cluster<T> as isize,
self.cfg.proxy_compat,
self.cfg.mock_cfg.clone(),
)
}

Expand Down Expand Up @@ -260,7 +263,7 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
key_manager: &Option<Arc<DataKeyManager>>,
router: &Option<RaftRouter<TiFlashEngine, engine_rocks::RocksEngine>>,
) {
let (mut ffi_helper_set, mut node_cfg) =
let (mut ffi_helper_set, node_cfg) =
self.make_ffi_helper_set(0, engines, key_manager, router);

// We can not use moved or cloned engines any more.
Expand Down Expand Up @@ -329,8 +332,8 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
let node_ids: Vec<u64> = self.engines.iter().map(|(&id, _)| id).collect();
for node_id in node_ids {
debug!("recover node"; "node_id" => node_id);
let engines = self.engines.get_mut(&node_id).unwrap().clone();
let key_mgr = self.key_managers_map[&node_id].clone();
let _engines = self.engines.get_mut(&node_id).unwrap().clone();
let _key_mgr = self.key_managers_map[&node_id].clone();
// Always at the front of the vector.
self.associate_ffi_helper_set(Some(0), node_id);
// Like TiKVServer::init
Expand Down Expand Up @@ -431,7 +434,7 @@ pub fn init_global_ffi_helper_set() {
pub fn create_tiflash_test_engine(
// ref init_tiflash_engines and create_test_engine
// TODO: pass it in for all cases.
router: Option<RaftRouter<TiFlashEngine, engine_rocks::RocksEngine>>,
_router: Option<RaftRouter<TiFlashEngine, engine_rocks::RocksEngine>>,
limiter: Option<Arc<IORateLimiter>>,
cfg: &Config,
) -> (
Expand Down
Loading

0 comments on commit f5da45e

Please sign in to comment.