From 9aad9b4daa754c0ccfee2102ca382acfbc1a9386 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 27 Feb 2023 20:51:25 +0800 Subject: [PATCH 1/7] add MixModeEngine Signed-off-by: CalvinNeo --- engine_store_ffi/src/core/forwarder.rs | 2 +- engine_store_ffi/src/lib.rs | 2 +- engine_store_ffi/src/observer.rs | 2 +- engine_tiflash/src/engine.rs | 22 +++++++++++-------- engine_tiflash/src/ps_engine/ps_db_vector.rs | 2 ++ .../src/ps_engine/ps_write_batch.rs | 1 + proxy_server/src/run.rs | 2 +- 7 files changed, 20 insertions(+), 13 deletions(-) diff --git a/engine_store_ffi/src/core/forwarder.rs b/engine_store_ffi/src/core/forwarder.rs index 75c6197112a..26ac34329d1 100644 --- a/engine_store_ffi/src/core/forwarder.rs +++ b/engine_store_ffi/src/core/forwarder.rs @@ -77,7 +77,7 @@ impl ProxyForwarder { #[allow(clippy::too_many_arguments)] pub fn new( store_id: u64, - engine: engine_tiflash::RocksEngine, + engine: engine_tiflash::MixedModeEngine, raft_engine: ER, sst_importer: Arc, trans: T, diff --git a/engine_store_ffi/src/lib.rs b/engine_store_ffi/src/lib.rs index f2d73cb19b6..620a09ac962 100644 --- a/engine_store_ffi/src/lib.rs +++ b/engine_store_ffi/src/lib.rs @@ -13,6 +13,6 @@ pub mod observer; // TODO We may integrate engine_tiflash if possible. pub use engine_tiflash::EngineStoreConfig; -pub type TiFlashEngine = engine_tiflash::RocksEngine; +pub type TiFlashEngine = engine_tiflash::MixedModeEngine; pub(crate) use proxy_ffi::fatal; diff --git a/engine_store_ffi/src/observer.rs b/engine_store_ffi/src/observer.rs index 0a7f327feb2..3c0dde48b5e 100644 --- a/engine_store_ffi/src/observer.rs +++ b/engine_store_ffi/src/observer.rs @@ -35,7 +35,7 @@ impl TiFlashObserver { #[allow(clippy::too_many_arguments)] pub fn new( store_id: u64, - engine: engine_tiflash::RocksEngine, + engine: engine_tiflash::MixedModeEngine, raft_engine: ER, sst_importer: Arc, trans: T, diff --git a/engine_tiflash/src/engine.rs b/engine_tiflash/src/engine.rs index 99eb87df2fc..cbb91cb0ef7 100644 --- a/engine_tiflash/src/engine.rs +++ b/engine_tiflash/src/engine.rs @@ -4,12 +4,13 @@ #![allow(dead_code)] #![allow(unused_variables)] use std::{ - fmt::Debug, fs, path::Path, sync::{atomic::AtomicIsize, Arc}, }; +pub(crate) use details::RocksEngine; +pub use details::RocksEngine as MixedModeEngine; use engine_rocks::RocksSnapshot; use engine_traits::{Checkpointable, Checkpointer, Error, KvEngine, Result}; use rocksdb::DB; @@ -18,14 +19,17 @@ use crate::{ proxy_utils::{engine_ext::*, EngineStoreHub}, ProxyEngineExt, }; - -#[derive(Clone, Debug)] -pub struct RocksEngine { - // Must ensure rocks is the first field, for RocksEngine::from_ref. - // We must own a engine_rocks::RocksEngine, since TiKV has not decouple from engine_rocks yet. - pub rocks: engine_rocks::RocksEngine, - pub proxy_ext: ProxyEngineExt, - pub ps_ext: Option, +mod details { + use crate::{PageStorageExt, ProxyEngineExt}; + #[derive(Clone, Debug)] + pub struct RocksEngine { + // Must ensure rocks is the first field, for RocksEngine::from_ref. + // We must own a engine_rocks::RocksEngine, since TiKV has not decouple from engine_rocks + // yet. + pub rocks: engine_rocks::RocksEngine, + pub proxy_ext: ProxyEngineExt, + pub ps_ext: Option, + } } impl RocksEngine { diff --git a/engine_tiflash/src/ps_engine/ps_db_vector.rs b/engine_tiflash/src/ps_engine/ps_db_vector.rs index fe9abfd21fe..32900bc9c65 100644 --- a/engine_tiflash/src/ps_engine/ps_db_vector.rs +++ b/engine_tiflash/src/ps_engine/ps_db_vector.rs @@ -7,6 +7,8 @@ use std::{ use engine_traits::DbVector; +/// Used when impl Peekable. +/// Since we can't hold a rocksdb::DBVector. pub struct PsDbVector(Vec); impl PsDbVector { diff --git a/engine_tiflash/src/ps_engine/ps_write_batch.rs b/engine_tiflash/src/ps_engine/ps_write_batch.rs index 94c9ecc3d98..98ec8dca9b8 100644 --- a/engine_tiflash/src/ps_engine/ps_write_batch.rs +++ b/engine_tiflash/src/ps_engine/ps_write_batch.rs @@ -37,6 +37,7 @@ impl WriteBatchExt for RocksEngine { } } +/// Used when impl WriteBatchExt. /// `RocksWriteBatchVec` is for method `MultiBatchWrite` of RocksDB, which /// splits a large WriteBatch into many smaller ones and then any thread could /// help to deal with these small WriteBatch when it is calling diff --git a/proxy_server/src/run.rs b/proxy_server/src/run.rs index 6de038035d0..00a37cfa286 100644 --- a/proxy_server/src/run.rs +++ b/proxy_server/src/run.rs @@ -459,7 +459,7 @@ impl TiKvServer { let engine_store_hub = Arc::new(engine_store_ffi::engine::TiFlashEngineStoreHub { engine_store_server_helper: helper, }); - // engine_tiflash::RocksEngine has engine_rocks::RocksEngine inside + // engine_tiflash::MixedModeEngine has engine_rocks::RocksEngine inside let mut kv_engine = TiFlashEngine::from_rocks(kv_engine); let proxy_config_set = Arc::new(engine_tiflash::ProxyEngineConfigSet { engine_store: self.proxy_config.engine_store.clone(), From af581d732b26fb0a6db7a0e692b0da5f414d5d46 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Tue, 28 Feb 2023 21:08:40 +0800 Subject: [PATCH 2/7] impl Peekable and SyncMutable Signed-off-by: CalvinNeo --- engine_tiflash/src/engine.rs | 25 ++- engine_tiflash/src/lib.rs | 5 +- engine_tiflash/src/mixed_engine/elementary.rs | 23 +++ engine_tiflash/src/mixed_engine/mod.rs | 120 ++++++++++++++ engine_tiflash/src/ps_engine/engine.rs | 155 +++++++----------- engine_tiflash/src/ps_engine/mod.rs | 2 - engine_tiflash/src/ps_engine/ps_db_vector.rs | 40 ----- .../src/{ => rocks_engine}/db_vector.rs | 0 engine_tiflash/src/rocks_engine/engine.rs | 104 ++++++------ engine_tiflash/src/rocks_engine/mod.rs | 3 + 10 files changed, 282 insertions(+), 195 deletions(-) create mode 100644 engine_tiflash/src/mixed_engine/elementary.rs create mode 100644 engine_tiflash/src/mixed_engine/mod.rs delete mode 100644 engine_tiflash/src/ps_engine/ps_db_vector.rs rename engine_tiflash/src/{ => rocks_engine}/db_vector.rs (100%) diff --git a/engine_tiflash/src/engine.rs b/engine_tiflash/src/engine.rs index cbb91cb0ef7..16331e7c0ea 100644 --- a/engine_tiflash/src/engine.rs +++ b/engine_tiflash/src/engine.rs @@ -19,8 +19,11 @@ use crate::{ proxy_utils::{engine_ext::*, EngineStoreHub}, ProxyEngineExt, }; + mod details { - use crate::{PageStorageExt, ProxyEngineExt}; + use std::sync::Arc; + + use crate::{mixed_engine::elementary::ElementaryEngine, PageStorageExt, ProxyEngineExt}; #[derive(Clone, Debug)] pub struct RocksEngine { // Must ensure rocks is the first field, for RocksEngine::from_ref. @@ -29,6 +32,7 @@ mod details { pub rocks: engine_rocks::RocksEngine, pub proxy_ext: ProxyEngineExt, pub ps_ext: Option, + pub element_engine: Option>, } } @@ -39,6 +43,7 @@ impl RocksEngine { rocks: engine_rocks::RocksEngine::new(db), proxy_ext: ProxyEngineExt::default(), ps_ext: None, + element_engine: None::<_>, } } @@ -61,9 +66,22 @@ impl RocksEngine { config_set, cached_region_info_manager: Some(Arc::new(crate::CachedRegionInfoManager::new())), }; - self.ps_ext = Some(PageStorageExt { + let ps_ext = PageStorageExt { engine_store_server_helper, - }); + }; + #[cfg(feature = "enable-pagestorage")] + { + self.element_engine = Some(Arc::new(crate::ps_engine::PSElementEngine { + ps_ext: ps_ext.clone(), + })) + } + #[cfg(not(feature = "enable-pagestorage"))] + { + self.element_engine = Some(Arc::new(crate::rocks_engine::RocksElementEngine { + rocks: self.rocks.clone(), + })) + } + self.ps_ext = Some(ps_ext); } pub fn from_rocks(rocks: engine_rocks::RocksEngine) -> Self { @@ -71,6 +89,7 @@ impl RocksEngine { rocks, proxy_ext: ProxyEngineExt::default(), ps_ext: None, + element_engine: None::<_>, } } diff --git a/engine_tiflash/src/lib.rs b/engine_tiflash/src/lib.rs index aabe96a6c77..2c4fdac53b2 100644 --- a/engine_tiflash/src/lib.rs +++ b/engine_tiflash/src/lib.rs @@ -33,8 +33,6 @@ mod compact; pub use crate::compact::*; mod db_options; pub use crate::db_options::*; -mod db_vector; -pub use crate::db_vector::*; mod engine; pub use crate::engine::*; mod import; @@ -55,13 +53,14 @@ pub use crate::status::*; mod table_properties; pub use crate::table_properties::*; +mod mixed_engine; mod ps_engine; mod rocks_engine; #[cfg(feature = "enable-pagestorage")] pub use crate::ps_engine::ps_write_batch::*; -pub use crate::ps_engine::PSLogEngine; #[cfg(not(feature = "enable-pagestorage"))] pub use crate::rocks_engine::write_batch::*; +pub use crate::{ps_engine::PSLogEngine, rocks_engine::db_vector}; pub mod mvcc_properties; pub use crate::mvcc_properties::*; diff --git a/engine_tiflash/src/mixed_engine/elementary.rs b/engine_tiflash/src/mixed_engine/elementary.rs new file mode 100644 index 00000000000..9ba9014a1ca --- /dev/null +++ b/engine_tiflash/src/mixed_engine/elementary.rs @@ -0,0 +1,23 @@ +// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. + +use engine_traits::{ReadOptions, Result}; + +use super::MixedDbVector; +pub trait ElementaryEngine: std::fmt::Debug { + fn put(&self, key: &[u8], value: &[u8]) -> Result<()>; + + fn put_cf(&self, cf: &str, key: &[u8], value: &[u8]) -> Result<()>; + + fn delete(&self, key: &[u8]) -> Result<()>; + + fn delete_cf(&self, cf: &str, key: &[u8]) -> Result<()>; + + fn get_value_opt(&self, opts: &ReadOptions, key: &[u8]) -> Result>; + + fn get_value_cf_opt( + &self, + opts: &ReadOptions, + cf: &str, + key: &[u8], + ) -> Result>; +} diff --git a/engine_tiflash/src/mixed_engine/mod.rs b/engine_tiflash/src/mixed_engine/mod.rs new file mode 100644 index 00000000000..3d55cf35e63 --- /dev/null +++ b/engine_tiflash/src/mixed_engine/mod.rs @@ -0,0 +1,120 @@ +// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. + +pub mod elementary; + +use std::{ + fmt::{self, Debug, Formatter}, + ops::Deref, +}; + +use engine_rocks::RocksDbVector; +use engine_traits::{DbVector, Peekable, ReadOptions, Result, SyncMutable}; +use tikv_util::Either; + +use crate::RocksEngine; + +pub type Vector = Either>; + +pub struct MixedDbVector(Vector); + +impl MixedDbVector { + pub fn from_raw(raw: Vector) -> MixedDbVector { + MixedDbVector(raw) + } + + pub fn from_raw_rocks(raw: RocksDbVector) -> MixedDbVector { + MixedDbVector(Vector::Left(raw)) + } + + pub fn from_raw_ps(raw: Vec) -> MixedDbVector { + MixedDbVector(Vector::Right(raw)) + } +} + +impl DbVector for MixedDbVector {} + +impl Deref for MixedDbVector { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + match self.0.as_ref() { + Either::Left(r) => r.deref(), + Either::Right(p) => p.deref(), + } + } +} + +impl Debug for MixedDbVector { + fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { + write!(formatter, "{:?}", &**self) + } +} + +impl<'a> PartialEq<&'a [u8]> for MixedDbVector { + fn eq(&self, rhs: &&[u8]) -> bool { + **rhs == **self + } +} + +impl Peekable for RocksEngine { + type DbVector = MixedDbVector; + + fn get_value_opt(&self, opts: &ReadOptions, key: &[u8]) -> Result> { + self.element_engine + .as_ref() + .unwrap() + .get_value_opt(opts, key) + } + + fn get_value_cf_opt( + &self, + opts: &ReadOptions, + cf: &str, + key: &[u8], + ) -> Result> { + self.element_engine + .as_ref() + .unwrap() + .get_value_cf_opt(opts, cf, key) + } +} + +impl SyncMutable for RocksEngine { + fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + if self.do_write(engine_traits::CF_DEFAULT, key) { + return self.element_engine.as_ref().unwrap().put(key, value); + } + Ok(()) + } + + fn put_cf(&self, cf: &str, key: &[u8], value: &[u8]) -> Result<()> { + if self.do_write(cf, key) { + return self.element_engine.as_ref().unwrap().put_cf(cf, key, value); + } + Ok(()) + } + + fn delete(&self, key: &[u8]) -> Result<()> { + if self.do_write(engine_traits::CF_DEFAULT, key) { + return self.element_engine.as_ref().unwrap().delete(key); + } + Ok(()) + } + + fn delete_cf(&self, cf: &str, key: &[u8]) -> Result<()> { + if self.do_write(cf, key) { + return self.element_engine.as_ref().unwrap().delete_cf(cf, key); + } + Ok(()) + } + + fn delete_range(&self, _begin_key: &[u8], _end_key: &[u8]) -> Result<()> { + // do nothing + Ok(()) + } + + fn delete_range_cf(&self, _cf: &str, _begin_key: &[u8], _end_key: &[u8]) -> Result<()> { + // do nothing + Ok(()) + } +} diff --git a/engine_tiflash/src/ps_engine/engine.rs b/engine_tiflash/src/ps_engine/engine.rs index 855f7cd511b..b23ce1446e9 100644 --- a/engine_tiflash/src/ps_engine/engine.rs +++ b/engine_tiflash/src/ps_engine/engine.rs @@ -3,7 +3,69 @@ use engine_rocks::RocksEngineIterator; use engine_traits::{IterOptions, Iterable, Peekable, ReadOptions, Result, SyncMutable}; -use crate::RocksEngine; +use crate::{ + mixed_engine::{elementary::ElementaryEngine, MixedDbVector}, + PageStorageExt, RocksEngine, +}; + +#[derive(Clone, Debug)] +pub struct PSElementEngine { + pub ps_ext: PageStorageExt, +} + +unsafe impl Send for PSElementEngine {} +unsafe impl Sync for PSElementEngine {} + +impl ElementaryEngine for PSElementEngine { + fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + let ps_wb = self.ps_ext.create_write_batch(); + self.ps_ext + .write_batch_put_page(ps_wb.ptr, add_prefix(key).as_slice(), value); + self.ps_ext.consume_write_batch(ps_wb.ptr); + Ok(()) + } + + fn put_cf(&self, _cf: &str, key: &[u8], value: &[u8]) -> Result<()> { + let ps_wb = self.ps_ext.create_write_batch(); + self.ps_ext + .write_batch_put_page(ps_wb.ptr, add_prefix(key).as_slice(), value); + self.ps_ext.consume_write_batch(ps_wb.ptr); + Ok(()) + } + + fn delete(&self, key: &[u8]) -> Result<()> { + let ps_wb = self.ps_ext.create_write_batch(); + self.ps_ext + .write_batch_del_page(ps_wb.ptr, add_prefix(key).as_slice()); + self.ps_ext.consume_write_batch(ps_wb.ptr); + Ok(()) + } + + fn delete_cf(&self, _cf: &str, key: &[u8]) -> Result<()> { + let ps_wb = self.ps_ext.create_write_batch(); + self.ps_ext + .write_batch_del_page(ps_wb.ptr, add_prefix(key).as_slice()); + self.ps_ext.consume_write_batch(ps_wb.ptr); + Ok(()) + } + + fn get_value_opt(&self, _opts: &ReadOptions, key: &[u8]) -> Result> { + let result = self.ps_ext.read_page(add_prefix(key).as_slice()); + match result { + None => Ok(None), + Some(v) => Ok(Some(MixedDbVector::from_raw_ps(v))), + } + } + + fn get_value_cf_opt( + &self, + opts: &ReadOptions, + _cf: &str, + key: &[u8], + ) -> Result> { + self.get_value_opt(opts, key) + } +} // Some data may be migrated from kv engine to raft engine in the future, // so kv engine and raft engine may write and delete the same key in the code @@ -44,94 +106,3 @@ impl Iterable for RocksEngine { self.rocks.iterator_opt(cf, opts) } } - -impl Peekable for RocksEngine { - type DbVector = crate::ps_engine::PsDbVector; - - fn get_value_opt( - &self, - _opts: &ReadOptions, - key: &[u8], - ) -> Result> { - let result = self - .ps_ext - .as_ref() - .unwrap() - .read_page(add_prefix(key).as_slice()); - return match result { - None => Ok(None), - Some(v) => Ok(Some(crate::ps_engine::PsDbVector::from_raw(v))), - }; - } - - fn get_value_cf_opt( - &self, - opts: &ReadOptions, - _cf: &str, - key: &[u8], - ) -> Result> { - self.get_value_opt(opts, key) - } -} - -impl SyncMutable for RocksEngine { - fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { - if self.do_write(engine_traits::CF_DEFAULT, key) { - let ps_wb = self.ps_ext.as_ref().unwrap().create_write_batch(); - self.ps_ext.as_ref().unwrap().write_batch_put_page( - ps_wb.ptr, - add_prefix(key).as_slice(), - value, - ); - self.ps_ext.as_ref().unwrap().consume_write_batch(ps_wb.ptr); - } - Ok(()) - } - - fn put_cf(&self, cf: &str, key: &[u8], value: &[u8]) -> Result<()> { - if self.do_write(cf, key) { - let ps_wb = self.ps_ext.as_ref().unwrap().create_write_batch(); - self.ps_ext.as_ref().unwrap().write_batch_put_page( - ps_wb.ptr, - add_prefix(key).as_slice(), - value, - ); - self.ps_ext.as_ref().unwrap().consume_write_batch(ps_wb.ptr); - } - Ok(()) - } - - fn delete(&self, key: &[u8]) -> Result<()> { - if self.do_write(engine_traits::CF_DEFAULT, key) { - let ps_wb = self.ps_ext.as_ref().unwrap().create_write_batch(); - self.ps_ext - .as_ref() - .unwrap() - .write_batch_del_page(ps_wb.ptr, add_prefix(key).as_slice()); - self.ps_ext.as_ref().unwrap().consume_write_batch(ps_wb.ptr); - } - Ok(()) - } - - fn delete_cf(&self, cf: &str, key: &[u8]) -> Result<()> { - if self.do_write(cf, key) { - let ps_wb = self.ps_ext.as_ref().unwrap().create_write_batch(); - self.ps_ext - .as_ref() - .unwrap() - .write_batch_del_page(ps_wb.ptr, add_prefix(key).as_slice()); - self.ps_ext.as_ref().unwrap().consume_write_batch(ps_wb.ptr); - } - Ok(()) - } - - fn delete_range(&self, _begin_key: &[u8], _end_key: &[u8]) -> Result<()> { - // do nothing - Ok(()) - } - - fn delete_range_cf(&self, _cf: &str, _begin_key: &[u8], _end_key: &[u8]) -> Result<()> { - // do nothing - Ok(()) - } -} diff --git a/engine_tiflash/src/ps_engine/mod.rs b/engine_tiflash/src/ps_engine/mod.rs index 45ec2b85db7..3bb7060cc56 100644 --- a/engine_tiflash/src/ps_engine/mod.rs +++ b/engine_tiflash/src/ps_engine/mod.rs @@ -2,14 +2,12 @@ #[cfg(feature = "enable-pagestorage")] mod engine; -mod ps_db_vector; mod ps_log_engine; #[cfg(feature = "enable-pagestorage")] pub(crate) mod ps_write_batch; #[cfg(feature = "enable-pagestorage")] pub use engine::*; -pub use ps_db_vector::*; pub use ps_log_engine::*; #[cfg(feature = "enable-pagestorage")] pub use ps_write_batch::*; diff --git a/engine_tiflash/src/ps_engine/ps_db_vector.rs b/engine_tiflash/src/ps_engine/ps_db_vector.rs deleted file mode 100644 index 32900bc9c65..00000000000 --- a/engine_tiflash/src/ps_engine/ps_db_vector.rs +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. - -use std::{ - fmt::{self, Debug, Formatter}, - ops::Deref, -}; - -use engine_traits::DbVector; - -/// Used when impl Peekable. -/// Since we can't hold a rocksdb::DBVector. -pub struct PsDbVector(Vec); - -impl PsDbVector { - pub fn from_raw(raw: Vec) -> PsDbVector { - PsDbVector(raw) - } -} - -impl DbVector for PsDbVector {} - -impl Deref for PsDbVector { - type Target = [u8]; - - fn deref(&self) -> &[u8] { - &self.0 - } -} - -impl Debug for PsDbVector { - fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { - write!(formatter, "{:?}", &**self) - } -} - -impl<'a> PartialEq<&'a [u8]> for PsDbVector { - fn eq(&self, rhs: &&[u8]) -> bool { - **rhs == **self - } -} diff --git a/engine_tiflash/src/db_vector.rs b/engine_tiflash/src/rocks_engine/db_vector.rs similarity index 100% rename from engine_tiflash/src/db_vector.rs rename to engine_tiflash/src/rocks_engine/db_vector.rs diff --git a/engine_tiflash/src/rocks_engine/engine.rs b/engine_tiflash/src/rocks_engine/engine.rs index dfbfb9f9ed8..c326d6cf4b5 100644 --- a/engine_tiflash/src/rocks_engine/engine.rs +++ b/engine_tiflash/src/rocks_engine/engine.rs @@ -2,81 +2,75 @@ #![allow(unused_variables)] -use engine_rocks::{RocksDbVector, RocksEngineIterator}; -use engine_traits::{IterOptions, Iterable, Peekable, ReadOptions, Result, SyncMutable}; +use engine_rocks::RocksEngineIterator; +use engine_traits::{IterOptions, Iterable, Peekable, ReadOptions, Result}; use rocksdb::Writable; -use crate::{r2e, util::get_cf_handle, RocksEngine}; +use crate::{ + mixed_engine::{elementary::ElementaryEngine, MixedDbVector}, + r2e, + util::get_cf_handle, + RocksEngine, +}; -impl Iterable for RocksEngine { - type Iterator = RocksEngineIterator; - - fn iterator_opt(&self, cf: &str, opts: IterOptions) -> Result { - self.rocks.iterator_opt(cf, opts) - } +#[derive(Clone, Debug)] +pub struct RocksElementEngine { + pub rocks: engine_rocks::RocksEngine, } -impl Peekable for RocksEngine { - type DbVector = RocksDbVector; - - fn get_value_opt(&self, opts: &ReadOptions, key: &[u8]) -> Result> { - self.rocks.get_value_opt(opts, key) - } +unsafe impl Send for RocksElementEngine {} +unsafe impl Sync for RocksElementEngine {} - fn get_value_cf_opt( - &self, - opts: &ReadOptions, - cf: &str, - key: &[u8], - ) -> Result> { - self.rocks.get_value_cf_opt(opts, cf, key) - } -} - -impl SyncMutable for RocksEngine { +impl ElementaryEngine for RocksElementEngine { fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { - if self.do_write(engine_traits::CF_DEFAULT, key) { - return self.rocks.get_sync_db().put(key, value).map_err(r2e); - } - Ok(()) + self.rocks.get_sync_db().put(key, value).map_err(r2e) } fn put_cf(&self, cf: &str, key: &[u8], value: &[u8]) -> Result<()> { - if self.do_write(cf, key) { - let db = self.rocks.get_sync_db(); - let handle = get_cf_handle(&db, cf)?; - return self - .rocks - .get_sync_db() - .put_cf(handle, key, value) - .map_err(r2e); - } - Ok(()) + let db = self.rocks.get_sync_db(); + let handle = get_cf_handle(&db, cf)?; + self.rocks + .get_sync_db() + .put_cf(handle, key, value) + .map_err(r2e) } fn delete(&self, key: &[u8]) -> Result<()> { - if self.do_write(engine_traits::CF_DEFAULT, key) { - return self.rocks.get_sync_db().delete(key).map_err(r2e); - } - Ok(()) + self.rocks.get_sync_db().delete(key).map_err(r2e) } fn delete_cf(&self, cf: &str, key: &[u8]) -> Result<()> { - if self.do_write(cf, key) { - let db = self.rocks.get_sync_db(); - let handle = get_cf_handle(&db, cf)?; - return self.rocks.get_sync_db().delete_cf(handle, key).map_err(r2e); + let db = self.rocks.get_sync_db(); + let handle = get_cf_handle(&db, cf)?; + self.rocks.get_sync_db().delete_cf(handle, key).map_err(r2e) + } + + fn get_value_opt(&self, opts: &ReadOptions, key: &[u8]) -> Result> { + let o = self.rocks.get_value_opt(opts, key)?; + if let Some(r) = o { + return Ok(Some(MixedDbVector::from_raw_rocks(r))); } - Ok(()) + Ok(None) } - fn delete_range(&self, begin_key: &[u8], end_key: &[u8]) -> Result<()> { - // do nothing - Ok(()) + fn get_value_cf_opt( + &self, + opts: &ReadOptions, + cf: &str, + key: &[u8], + ) -> Result> { + let o = self.rocks.get_value_cf_opt(opts, cf, key)?; + if let Some(r) = o { + return Ok(Some(MixedDbVector::from_raw_rocks(r))); + } + Ok(None) } +} + +impl Iterable for RocksEngine { + type Iterator = RocksEngineIterator; - fn delete_range_cf(&self, cf: &str, begin_key: &[u8], end_key: &[u8]) -> Result<()> { - // do nothing - Ok(()) + fn iterator_opt(&self, cf: &str, opts: IterOptions) -> Result { + self.rocks.iterator_opt(cf, opts) } } diff --git a/engine_tiflash/src/rocks_engine/mod.rs b/engine_tiflash/src/rocks_engine/mod.rs index 728fb247b9b..ee8559c6964 100644 --- a/engine_tiflash/src/rocks_engine/mod.rs +++ b/engine_tiflash/src/rocks_engine/mod.rs @@ -9,3 +9,6 @@ pub use engine::*; pub mod write_batch; #[cfg(not(feature = "enable-pagestorage"))] pub use write_batch::*; + +pub mod db_vector; +pub use db_vector::*; From c2c6faf926cb2c01d4aba5a9694274bc7930f99b Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 1 Mar 2023 15:17:24 +0800 Subject: [PATCH 3/7] trait Iterable Signed-off-by: CalvinNeo --- engine_tiflash/src/engine.rs | 1 + engine_tiflash/src/mixed_engine/elementary.rs | 14 ++++- engine_tiflash/src/mixed_engine/mod.rs | 29 +++++++++- engine_tiflash/src/proxy_utils/engine_ext.rs | 7 +-- engine_tiflash/src/ps_engine/engine.rs | 55 +++++++++---------- engine_tiflash/src/ps_engine/mod.rs | 2 - engine_tiflash/src/rocks_engine/engine.rs | 17 ++++-- engine_tiflash/src/rocks_engine/mod.rs | 2 - 8 files changed, 81 insertions(+), 46 deletions(-) diff --git a/engine_tiflash/src/engine.rs b/engine_tiflash/src/engine.rs index 16331e7c0ea..8c098e3a8d3 100644 --- a/engine_tiflash/src/engine.rs +++ b/engine_tiflash/src/engine.rs @@ -73,6 +73,7 @@ impl RocksEngine { { self.element_engine = Some(Arc::new(crate::ps_engine::PSElementEngine { ps_ext: ps_ext.clone(), + rocks: self.rocks.clone(), })) } #[cfg(not(feature = "enable-pagestorage"))] diff --git a/engine_tiflash/src/mixed_engine/elementary.rs b/engine_tiflash/src/mixed_engine/elementary.rs index 9ba9014a1ca..ea5284eeb2e 100644 --- a/engine_tiflash/src/mixed_engine/elementary.rs +++ b/engine_tiflash/src/mixed_engine/elementary.rs @@ -1,6 +1,7 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use engine_traits::{ReadOptions, Result}; +use engine_rocks::RocksEngineIterator; +use engine_traits::{IterOptions, ReadOptions, Result}; use super::MixedDbVector; pub trait ElementaryEngine: std::fmt::Debug { @@ -20,4 +21,15 @@ pub trait ElementaryEngine: std::fmt::Debug { cf: &str, key: &[u8], ) -> Result>; + + fn scan( + &self, + cf: &str, + start_key: &[u8], + end_key: &[u8], + fill_cache: bool, + f: &mut dyn FnMut(&[u8], &[u8]) -> Result, + ) -> Result<()>; + + fn iterator_opt(&self, cf: &str, opts: IterOptions) -> Result; } diff --git a/engine_tiflash/src/mixed_engine/mod.rs b/engine_tiflash/src/mixed_engine/mod.rs index 3d55cf35e63..34db92819eb 100644 --- a/engine_tiflash/src/mixed_engine/mod.rs +++ b/engine_tiflash/src/mixed_engine/mod.rs @@ -7,8 +7,8 @@ use std::{ ops::Deref, }; -use engine_rocks::RocksDbVector; -use engine_traits::{DbVector, Peekable, ReadOptions, Result, SyncMutable}; +use engine_rocks::{RocksDbVector, RocksEngineIterator}; +use engine_traits::{DbVector, IterOptions, Iterable, Peekable, ReadOptions, Result, SyncMutable}; use tikv_util::Either; use crate::RocksEngine; @@ -118,3 +118,28 @@ impl SyncMutable for RocksEngine { Ok(()) } } + +impl Iterable for RocksEngine { + type Iterator = RocksEngineIterator; + fn scan( + &self, + cf: &str, + start_key: &[u8], + end_key: &[u8], + fill_cache: bool, + f: F, + ) -> Result<()> + where + F: FnMut(&[u8], &[u8]) -> Result, + { + let mut f = f; + self.element_engine + .as_ref() + .unwrap() + .scan(cf, start_key, end_key, fill_cache, &mut f) + } + + fn iterator_opt(&self, cf: &str, opts: IterOptions) -> Result { + self.element_engine.as_ref().unwrap().iterator_opt(cf, opts) + } +} diff --git a/engine_tiflash/src/proxy_utils/engine_ext.rs b/engine_tiflash/src/proxy_utils/engine_ext.rs index c6c314f42b2..86c66aa8e25 100644 --- a/engine_tiflash/src/proxy_utils/engine_ext.rs +++ b/engine_tiflash/src/proxy_utils/engine_ext.rs @@ -1,9 +1,7 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. - -#[cfg(feature = "enable-pagestorage")] -use proxy_ffi::interfaces_ffi::{PageAndCppStrWithView, RawCppPtr, RawVoidPtr}; use proxy_ffi::{ - gen_engine_store_server_helper, interfaces_ffi, interfaces_ffi::EngineStoreServerHelper, + gen_engine_store_server_helper, interfaces_ffi, + interfaces_ffi::{EngineStoreServerHelper, PageAndCppStrWithView, RawCppPtr, RawVoidPtr}, }; use crate::RocksEngine; @@ -22,7 +20,6 @@ pub struct PageStorageExt { pub engine_store_server_helper: isize, } -#[cfg(feature = "enable-pagestorage")] impl PageStorageExt { fn helper(&self) -> &'static EngineStoreServerHelper { gen_engine_store_server_helper(self.engine_store_server_helper) diff --git a/engine_tiflash/src/ps_engine/engine.rs b/engine_tiflash/src/ps_engine/engine.rs index b23ce1446e9..82b732a66b4 100644 --- a/engine_tiflash/src/ps_engine/engine.rs +++ b/engine_tiflash/src/ps_engine/engine.rs @@ -1,16 +1,17 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. use engine_rocks::RocksEngineIterator; -use engine_traits::{IterOptions, Iterable, Peekable, ReadOptions, Result, SyncMutable}; +use engine_traits::{IterOptions, Iterable, ReadOptions, Result}; use crate::{ mixed_engine::{elementary::ElementaryEngine, MixedDbVector}, - PageStorageExt, RocksEngine, + PageStorageExt, }; #[derive(Clone, Debug)] pub struct PSElementEngine { pub ps_ext: PageStorageExt, + pub rocks: engine_rocks::RocksEngine, } unsafe impl Send for PSElementEngine {} @@ -65,44 +66,40 @@ impl ElementaryEngine for PSElementEngine { ) -> Result> { self.get_value_opt(opts, key) } -} - -// Some data may be migrated from kv engine to raft engine in the future, -// so kv engine and raft engine may write and delete the same key in the code -// base. To distinguish data managed by kv engine and raft engine, we prepend an -// `0x02` to the key written by kv engine. -// So kv engine won't scan any key from raft engine, and vice versa. -pub fn add_prefix(key: &[u8]) -> Vec { - let mut v = Vec::with_capacity(key.len() + 1); - v.push(0x02); - v.extend_from_slice(key); - v -} -impl Iterable for RocksEngine { - type Iterator = RocksEngineIterator; - - fn scan( + fn scan( &self, _cf: &str, start_key: &[u8], end_key: &[u8], _fill_cache: bool, - f: F, - ) -> Result<()> - where - F: FnMut(&[u8], &[u8]) -> Result, - { - let mut f = f; - self.ps_ext.as_ref().unwrap().scan_page( + f: &mut dyn FnMut(&[u8], &[u8]) -> Result, + ) -> Result<()> { + self.ps_ext.scan_page( add_prefix(start_key).as_slice(), add_prefix(end_key).as_slice(), - &mut f, + f, ); Ok(()) } - fn iterator_opt(&self, cf: &str, opts: IterOptions) -> Result { - self.rocks.iterator_opt(cf, opts) + #[allow(unused_variables)] + #[allow(unreachable_code)] + fn iterator_opt(&self, cf: &str, opts: IterOptions) -> Result { + let r = self.rocks.iterator_opt(cf, opts); + panic!("iterator_opt should not be called in PS engine"); + r } } + +// Some data may be migrated from kv engine to raft engine in the future, +// so kv engine and raft engine may write and delete the same key in the code +// base. To distinguish data managed by kv engine and raft engine, we prepend an +// `0x02` to the key written by kv engine. +// So kv engine won't scan any key from raft engine, and vice versa. +pub fn add_prefix(key: &[u8]) -> Vec { + let mut v = Vec::with_capacity(key.len() + 1); + v.push(0x02); + v.extend_from_slice(key); + v +} diff --git a/engine_tiflash/src/ps_engine/mod.rs b/engine_tiflash/src/ps_engine/mod.rs index 3bb7060cc56..b55ef4ee727 100644 --- a/engine_tiflash/src/ps_engine/mod.rs +++ b/engine_tiflash/src/ps_engine/mod.rs @@ -1,12 +1,10 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -#[cfg(feature = "enable-pagestorage")] mod engine; mod ps_log_engine; #[cfg(feature = "enable-pagestorage")] pub(crate) mod ps_write_batch; -#[cfg(feature = "enable-pagestorage")] pub use engine::*; pub use ps_log_engine::*; #[cfg(feature = "enable-pagestorage")] diff --git a/engine_tiflash/src/rocks_engine/engine.rs b/engine_tiflash/src/rocks_engine/engine.rs index c326d6cf4b5..33a83637744 100644 --- a/engine_tiflash/src/rocks_engine/engine.rs +++ b/engine_tiflash/src/rocks_engine/engine.rs @@ -10,7 +10,6 @@ use crate::{ mixed_engine::{elementary::ElementaryEngine, MixedDbVector}, r2e, util::get_cf_handle, - RocksEngine, }; #[derive(Clone, Debug)] @@ -65,12 +64,20 @@ impl ElementaryEngine for RocksElementEngine { } Ok(None) } -} -impl Iterable for RocksEngine { - type Iterator = RocksEngineIterator; + fn scan( + &self, + cf: &str, + start_key: &[u8], + end_key: &[u8], + fill_cache: bool, + f: &mut dyn FnMut(&[u8], &[u8]) -> Result, + ) -> Result<()> { + // THe original version leaves the default impl in engine_triats. + self.rocks.scan(cf, start_key, end_key, fill_cache, f) + } - fn iterator_opt(&self, cf: &str, opts: IterOptions) -> Result { + fn iterator_opt(&self, cf: &str, opts: IterOptions) -> Result { self.rocks.iterator_opt(cf, opts) } } diff --git a/engine_tiflash/src/rocks_engine/mod.rs b/engine_tiflash/src/rocks_engine/mod.rs index ee8559c6964..3597401ae74 100644 --- a/engine_tiflash/src/rocks_engine/mod.rs +++ b/engine_tiflash/src/rocks_engine/mod.rs @@ -1,8 +1,6 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -#[cfg(not(feature = "enable-pagestorage"))] mod engine; -#[cfg(not(feature = "enable-pagestorage"))] pub use engine::*; #[cfg(not(feature = "enable-pagestorage"))] From 2f375adbd57b80ae872009296f0facacf34288aa Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 1 Mar 2023 17:49:26 +0800 Subject: [PATCH 4/7] r Signed-off-by: CalvinNeo --- engine_tiflash/src/lib.rs | 7 +- engine_tiflash/src/mixed_engine/elementary.rs | 5 + engine_tiflash/src/mixed_engine/mod.rs | 19 +-- .../src/mixed_engine/write_batch.rs | 119 ++++++++++++++++++ engine_tiflash/src/proxy_utils/mod.rs | 9 +- engine_tiflash/src/ps_engine/engine.rs | 28 ++++- .../src/ps_engine/ps_write_batch.rs | 54 +++----- engine_tiflash/src/raft_engine.rs | 3 +- .../src/rocks_engine/write_batch.rs | 24 ++-- 9 files changed, 198 insertions(+), 70 deletions(-) create mode 100644 engine_tiflash/src/mixed_engine/write_batch.rs diff --git a/engine_tiflash/src/lib.rs b/engine_tiflash/src/lib.rs index 2c4fdac53b2..cf3ceba1b0b 100644 --- a/engine_tiflash/src/lib.rs +++ b/engine_tiflash/src/lib.rs @@ -56,11 +56,8 @@ pub use crate::table_properties::*; mod mixed_engine; mod ps_engine; mod rocks_engine; -#[cfg(feature = "enable-pagestorage")] -pub use crate::ps_engine::ps_write_batch::*; -#[cfg(not(feature = "enable-pagestorage"))] -pub use crate::rocks_engine::write_batch::*; -pub use crate::{ps_engine::PSLogEngine, rocks_engine::db_vector}; + +pub use crate::{mixed_engine::write_batch::*, ps_engine::PSLogEngine, rocks_engine::db_vector}; pub mod mvcc_properties; pub use crate::mvcc_properties::*; diff --git a/engine_tiflash/src/mixed_engine/elementary.rs b/engine_tiflash/src/mixed_engine/elementary.rs index ea5284eeb2e..da25d93c37e 100644 --- a/engine_tiflash/src/mixed_engine/elementary.rs +++ b/engine_tiflash/src/mixed_engine/elementary.rs @@ -4,6 +4,7 @@ use engine_rocks::RocksEngineIterator; use engine_traits::{IterOptions, ReadOptions, Result}; use super::MixedDbVector; +use super::write_batch::MixedWriteBatch; pub trait ElementaryEngine: std::fmt::Debug { fn put(&self, key: &[u8], value: &[u8]) -> Result<()>; @@ -32,4 +33,8 @@ pub trait ElementaryEngine: std::fmt::Debug { ) -> Result<()>; fn iterator_opt(&self, cf: &str, opts: IterOptions) -> Result; + + fn write_batch(&self) -> MixedWriteBatch; + + fn write_batch_with_cap(&self, cap: usize) -> MixedWriteBatch; } diff --git a/engine_tiflash/src/mixed_engine/mod.rs b/engine_tiflash/src/mixed_engine/mod.rs index 34db92819eb..588ffc994bb 100644 --- a/engine_tiflash/src/mixed_engine/mod.rs +++ b/engine_tiflash/src/mixed_engine/mod.rs @@ -1,6 +1,7 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. pub mod elementary; +pub mod write_batch; use std::{ fmt::{self, Debug, Formatter}, @@ -60,10 +61,11 @@ impl Peekable for RocksEngine { type DbVector = MixedDbVector; fn get_value_opt(&self, opts: &ReadOptions, key: &[u8]) -> Result> { - self.element_engine - .as_ref() - .unwrap() - .get_value_opt(opts, key) + if let Some(e) = self.element_engine.as_ref() { + e.get_value_opt(opts, key) + } else { + Err(tikv_util::box_err!("mixed engine not inited")) + } } fn get_value_cf_opt( @@ -72,10 +74,11 @@ impl Peekable for RocksEngine { cf: &str, key: &[u8], ) -> Result> { - self.element_engine - .as_ref() - .unwrap() - .get_value_cf_opt(opts, cf, key) + if let Some(e) = self.element_engine.as_ref() { + e.get_value_cf_opt(opts, cf, key) + } else { + Err(tikv_util::box_err!("mixed engine not inited")) + } } } diff --git a/engine_tiflash/src/mixed_engine/write_batch.rs b/engine_tiflash/src/mixed_engine/write_batch.rs new file mode 100644 index 00000000000..40ad9aac867 --- /dev/null +++ b/engine_tiflash/src/mixed_engine/write_batch.rs @@ -0,0 +1,119 @@ +// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. + +#![allow(unused_variables)] +use std::sync::Arc; + +use engine_traits::{self, Mutable, Result, WriteBatchExt, WriteOptions}; +use proxy_ffi::interfaces_ffi::RawCppPtr; +use rocksdb::{WriteBatch as RawWriteBatch, DB}; + +use crate::{engine::RocksEngine, ps_engine::add_prefix, r2e, PageStorageExt}; + +pub struct MixedWriteBatch { + pub inner: Box, +} + +impl WriteBatchExt for RocksEngine { + type WriteBatch = MixedWriteBatch; + + fn write_batch(&self) -> MixedWriteBatch { + self.element_engine.as_ref().unwrap().write_batch() + } + + fn write_batch_with_cap(&self, cap: usize) -> MixedWriteBatch { + self.element_engine.as_ref().unwrap().write_batch_with_cap(cap) + } +} + +impl engine_traits::WriteBatch for MixedWriteBatch { + fn write_opt(&mut self, opts: &WriteOptions) -> Result { + // write into ps + self.inner.write_opt(opts) + } + + fn data_size(&self) -> usize { + self.inner.data_size() + } + + fn count(&self) -> usize { + self.inner.count() + } + + fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + fn should_write_to_engine(&self) -> bool { + // Disable TiKV's logic, and using Proxy's instead. + false + } + + fn clear(&mut self) { + self.inner.clear(); + } + + fn set_save_point(&mut self) { + self.wbs[self.index].set_save_point(); + self.save_points.push(self.index); + } + + fn pop_save_point(&mut self) -> Result<()> { + if let Some(x) = self.save_points.pop() { + return self.wbs[x].pop_save_point().map_err(r2e); + } + Err(r2e("no save point")) + } + + fn rollback_to_save_point(&mut self) -> Result<()> { + if let Some(x) = self.save_points.pop() { + for i in x + 1..=self.index { + self.wbs[i].clear(); + } + self.index = x; + return self.wbs[x].rollback_to_save_point().map_err(r2e); + } + Err(r2e("no save point")) + } + + fn merge(&mut self, other: Self) -> Result<()> { + self.inner.merge(other) + } +} + +impl Mutable for MixedWriteBatch { + fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> { + if !self.do_write(engine_traits::CF_DEFAULT, key) { + return Ok(()); + } + self.put(key, value) + } + + fn put_cf(&mut self, cf: &str, key: &[u8], value: &[u8]) -> Result<()> { + if !self.do_write(cf, key) { + return Ok(()); + } + self.put_cf(cf, key, value) + } + + fn delete(&mut self, key: &[u8]) -> Result<()> { + if !self.do_write(engine_traits::CF_DEFAULT, key) { + return Ok(()); + } + self.delete(key) + } + + fn delete_cf(&mut self, cf: &str, key: &[u8]) -> Result<()> { + if !self.do_write(cf, key) { + return Ok(()); + } + self.delete_cf(cf, key) + } + + fn delete_range(&mut self, begin_key: &[u8], end_key: &[u8]) -> Result<()> { + Ok(()) + } + + fn delete_range_cf(&mut self, cf: &str, begin_key: &[u8], end_key: &[u8]) -> Result<()> { + Ok(()) + } +} diff --git a/engine_tiflash/src/proxy_utils/mod.rs b/engine_tiflash/src/proxy_utils/mod.rs index 929dd73153c..5aeef9867c4 100644 --- a/engine_tiflash/src/proxy_utils/mod.rs +++ b/engine_tiflash/src/proxy_utils/mod.rs @@ -10,6 +10,7 @@ mod proxy_ext; pub use proxy_ext::*; use crate::util::get_cf_handle; +use crate::mixed_engine::write_batch::MixedWriteBatch as RocksWriteBatchVec; pub fn do_write(cf: &str, key: &[u8]) -> bool { fail::fail_point!("before_tiflash_do_write", |_| true); @@ -22,7 +23,7 @@ pub fn do_write(cf: &str, key: &[u8]) -> bool { } } -pub fn cf_to_name(batch: &crate::RocksWriteBatchVec, cf: u32) -> &'static str { +pub fn cf_to_name(batch: &RocksWriteBatchVec, cf: u32) -> &'static str { // d 0 w 2 l 1 let handle_default = get_cf_handle(batch.db.as_ref(), engine_traits::CF_DEFAULT).unwrap(); let d = handle_default.id(); @@ -42,7 +43,7 @@ pub fn cf_to_name(batch: &crate::RocksWriteBatchVec, cf: u32) -> &'static str { } #[cfg(any(test, feature = "testexport"))] -pub fn check_double_write(batch: &crate::RocksWriteBatchVec) { +pub fn check_double_write(batch: &RocksWriteBatchVec) { // It will fire if we write by both observer(compat_old_proxy is not enabled) // and TiKV's WriteBatch. fail::fail_point!("before_tiflash_check_double_write", |_| {}); @@ -61,9 +62,9 @@ pub fn check_double_write(batch: &crate::RocksWriteBatchVec) { } } #[cfg(not(any(test, feature = "testexport")))] -pub fn check_double_write(_: &crate::RocksWriteBatchVec) {} +pub fn check_double_write(_: &RocksWriteBatchVec) {} -pub fn log_check_double_write(batch: &crate::RocksWriteBatchVec) -> bool { +pub fn log_check_double_write(batch: &RocksWriteBatchVec) -> bool { check_double_write(batch); // TODO(tiflash) re-support this tracker. let mut e = true; diff --git a/engine_tiflash/src/ps_engine/engine.rs b/engine_tiflash/src/ps_engine/engine.rs index 82b732a66b4..0ca8c2ad4a8 100644 --- a/engine_tiflash/src/ps_engine/engine.rs +++ b/engine_tiflash/src/ps_engine/engine.rs @@ -2,11 +2,14 @@ use engine_rocks::RocksEngineIterator; use engine_traits::{IterOptions, Iterable, ReadOptions, Result}; - +use std::sync::Arc; use crate::{ mixed_engine::{elementary::ElementaryEngine, MixedDbVector}, PageStorageExt, }; +use crate::MixedWriteBatch; +use super::{PSEngineWriteBatch, PSRocksWriteBatchVec}; +use super::ps_engine::WRITE_BATCH_LIMIT; #[derive(Clone, Debug)] pub struct PSElementEngine { @@ -90,6 +93,29 @@ impl ElementaryEngine for PSElementEngine { panic!("iterator_opt should not be called in PS engine"); r } + + fn write_batch(&self) -> MixedWriteBatch { + MixedWriteBatch { + inner: PSRocksWriteBatchVec::new( + Arc::clone(self.as_inner()), + self.ps_ext.clone(), + self.ps_ext.as_ref().unwrap().create_write_batch(), + WRITE_BATCH_LIMIT, + 1, + self.support_multi_batch_write(), + ), + } + } + + fn write_batch_with_cap(&self, cap: usize) -> MixedWriteBatch { + MixedWriteBatch { + inner: PSRocksWriteBatchVec::with_unit_capacity( + self, + self.ps_ext.as_ref().unwrap().create_write_batch(), + cap, + ), + } + } } // Some data may be migrated from kv engine to raft engine in the future, diff --git a/engine_tiflash/src/ps_engine/ps_write_batch.rs b/engine_tiflash/src/ps_engine/ps_write_batch.rs index 98ec8dca9b8..181eabed356 100644 --- a/engine_tiflash/src/ps_engine/ps_write_batch.rs +++ b/engine_tiflash/src/ps_engine/ps_write_batch.rs @@ -6,39 +6,15 @@ use std::sync::Arc; use engine_traits::{self, Mutable, Result, WriteBatchExt, WriteOptions}; use proxy_ffi::interfaces_ffi::RawCppPtr; use rocksdb::{WriteBatch as RawWriteBatch, DB}; +use crate::mixed_engine::elementary::ElementaryEngine; use crate::{engine::RocksEngine, ps_engine::add_prefix, r2e, PageStorageExt}; -const WRITE_BATCH_MAX_BATCH: usize = 16; -const WRITE_BATCH_LIMIT: usize = 16; - -impl WriteBatchExt for RocksEngine { - type WriteBatch = RocksWriteBatchVec; - - const WRITE_BATCH_MAX_KEYS: usize = 256; - - fn write_batch(&self) -> RocksWriteBatchVec { - RocksWriteBatchVec::new( - Arc::clone(self.as_inner()), - self.ps_ext.clone(), - self.ps_ext.as_ref().unwrap().create_write_batch(), - WRITE_BATCH_LIMIT, - 1, - self.support_multi_batch_write(), - ) - } - - fn write_batch_with_cap(&self, cap: usize) -> RocksWriteBatchVec { - RocksWriteBatchVec::with_unit_capacity( - self, - self.ps_ext.as_ref().unwrap().create_write_batch(), - cap, - ) - } -} +pub const WRITE_BATCH_MAX_BATCH: usize = 16; +pub const WRITE_BATCH_LIMIT: usize = 16; /// Used when impl WriteBatchExt. -/// `RocksWriteBatchVec` is for method `MultiBatchWrite` of RocksDB, which +/// `PSRocksWriteBatchVec` is for method `MultiBatchWrite` of RocksDB, which /// splits a large WriteBatch into many smaller ones and then any thread could /// help to deal with these small WriteBatch when it is calling /// `MultiBatchCommit` and wait the front writer to finish writing. @@ -46,7 +22,7 @@ impl WriteBatchExt for RocksEngine { /// `pipelined_write` when TiKV writes very large data into RocksDB. /// We will remove this feature when `unordered_write` of RocksDB becomes more /// stable and becomes compatible with Titan. -pub struct RocksWriteBatchVec { +pub struct PSRocksWriteBatchVec { pub db: Arc, pub wbs: Vec, pub ps_ext: Option, @@ -57,7 +33,7 @@ pub struct RocksWriteBatchVec { support_write_batch_vec: bool, } -impl Drop for RocksWriteBatchVec { +impl Drop for PSRocksWriteBatchVec { fn drop(&mut self) { if !self.ps_wb.ptr.is_null() { self.ps_ext @@ -69,7 +45,7 @@ impl Drop for RocksWriteBatchVec { } } -impl RocksWriteBatchVec { +impl PSRocksWriteBatchVec { pub fn new( db: Arc, ps_ext: Option, @@ -77,9 +53,9 @@ impl RocksWriteBatchVec { batch_size_limit: usize, cap: usize, support_write_batch_vec: bool, - ) -> RocksWriteBatchVec { + ) -> PSRocksWriteBatchVec { let wb = RawWriteBatch::with_capacity(cap); - RocksWriteBatchVec { + PSRocksWriteBatchVec { db, wbs: vec![wb], ps_ext, @@ -95,7 +71,7 @@ impl RocksWriteBatchVec { engine: &RocksEngine, ps_wb: RawCppPtr, cap: usize, - ) -> RocksWriteBatchVec { + ) -> PSRocksWriteBatchVec { Self::new( engine.as_inner().clone(), engine.ps_ext.clone(), @@ -131,7 +107,7 @@ impl RocksWriteBatchVec { } } -impl engine_traits::WriteBatch for RocksWriteBatchVec { +impl engine_traits::WriteBatch for PSRocksWriteBatchVec { fn write_opt(&mut self, opts: &WriteOptions) -> Result { // write into ps self.ps_ext @@ -209,13 +185,13 @@ impl engine_traits::WriteBatch for RocksWriteBatchVec { } } -impl RocksWriteBatchVec { +impl PSRocksWriteBatchVec { fn do_write(&self, cf: &str, key: &[u8]) -> bool { crate::do_write(cf, key) } } -impl Mutable for RocksWriteBatchVec { +impl Mutable for PSRocksWriteBatchVec { fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> { if !self.do_write(engine_traits::CF_DEFAULT, key) { return Ok(()); @@ -318,7 +294,7 @@ mod tests { assert!(v.is_some()); assert_eq!(v.unwrap(), b"bbb"); - let mut wb = RocksWriteBatchVec::with_unit_capacity(&engine, 1024); + let mut wb = PSRocksWriteBatchVec::with_unit_capacity(&engine, 1024); for _i in 0..RocksEngine::WRITE_BATCH_MAX_KEYS { wb.put(b"aaa", b"bbb").unwrap(); } @@ -358,7 +334,7 @@ mod tests { assert!(!wb.should_write_to_engine()); wb.put(b"aaa", b"bbb").unwrap(); assert!(wb.should_write_to_engine()); - let mut wb = RocksWriteBatchVec::with_unit_capacity(&engine, 1024); + let mut wb = PSRocksWriteBatchVec::with_unit_capacity(&engine, 1024); for _i in 0..WRITE_BATCH_MAX_BATCH * WRITE_BATCH_LIMIT { wb.put(b"aaa", b"bbb").unwrap(); } diff --git a/engine_tiflash/src/raft_engine.rs b/engine_tiflash/src/raft_engine.rs index f6adddc3782..134f7671de3 100644 --- a/engine_tiflash/src/raft_engine.rs +++ b/engine_tiflash/src/raft_engine.rs @@ -16,7 +16,8 @@ use protobuf::Message; use raft::eraftpb::Entry; use tikv_util::{box_err, box_try}; -use crate::{util, RocksEngine, RocksWriteBatchVec}; +use crate::{util, RocksEngine}; +use crate::mixed_engine::write_batch::MixedWriteBatch as RocksWriteBatchVec; impl RaftEngineReadOnly for RocksEngine { fn get_raft_state(&self, raft_group_id: u64) -> Result> { diff --git a/engine_tiflash/src/rocks_engine/write_batch.rs b/engine_tiflash/src/rocks_engine/write_batch.rs index 2c6dd8e6bbb..20b6333b3d4 100644 --- a/engine_tiflash/src/rocks_engine/write_batch.rs +++ b/engine_tiflash/src/rocks_engine/write_batch.rs @@ -12,22 +12,22 @@ use crate::{options::RocksWriteOptions, r2e, util::get_cf_handle, RocksEngine}; const WRITE_BATCH_MAX_BATCH: usize = 16; const WRITE_BATCH_LIMIT: usize = 16; -impl WriteBatchExt for RocksEngine { - type WriteBatch = RocksWriteBatchVec; - - const WRITE_BATCH_MAX_KEYS: usize = 256; - +impl ElementaryEngine for PSElementEngine { fn write_batch(&self) -> RocksWriteBatchVec { - RocksWriteBatchVec::new( - Arc::clone(self.as_inner()), - WRITE_BATCH_LIMIT, - 1, - self.support_multi_batch_write(), - ) + MixedWriteBatch { + inner: RocksWriteBatchVec::new( + Arc::clone(self.as_inner()), + WRITE_BATCH_LIMIT, + 1, + self.support_multi_batch_write(), + ), + } } fn write_batch_with_cap(&self, cap: usize) -> RocksWriteBatchVec { - RocksWriteBatchVec::with_unit_capacity(self, cap) + MixedWriteBatch { + inner: RocksWriteBatchVec::with_unit_capacity(self, cap), + } } } From b8775fd972b6e3e12f7938a6738c3ff2884ded36 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 1 Mar 2023 19:05:27 +0800 Subject: [PATCH 5/7] fix init element engine Signed-off-by: CalvinNeo --- engine_tiflash/src/mixed_engine/elementary.rs | 1 + engine_tiflash/src/mixed_engine/mod.rs | 18 ++++++++++-------- engine_tiflash/src/proxy_utils/engine_ext.rs | 3 ++- .../src/mock_cluster/cluster_ext.rs | 4 ++++ proxy_server/src/run.rs | 4 ++++ 5 files changed, 21 insertions(+), 9 deletions(-) diff --git a/engine_tiflash/src/mixed_engine/elementary.rs b/engine_tiflash/src/mixed_engine/elementary.rs index ea5284eeb2e..f849f9818f7 100644 --- a/engine_tiflash/src/mixed_engine/elementary.rs +++ b/engine_tiflash/src/mixed_engine/elementary.rs @@ -22,6 +22,7 @@ pub trait ElementaryEngine: std::fmt::Debug { key: &[u8], ) -> Result>; + #[allow(clippy::type_complexity)] fn scan( &self, cf: &str, diff --git a/engine_tiflash/src/mixed_engine/mod.rs b/engine_tiflash/src/mixed_engine/mod.rs index 34db92819eb..26449d7ba80 100644 --- a/engine_tiflash/src/mixed_engine/mod.rs +++ b/engine_tiflash/src/mixed_engine/mod.rs @@ -60,10 +60,11 @@ impl Peekable for RocksEngine { type DbVector = MixedDbVector; fn get_value_opt(&self, opts: &ReadOptions, key: &[u8]) -> Result> { - self.element_engine - .as_ref() - .unwrap() - .get_value_opt(opts, key) + if let Some(e) = self.element_engine.as_ref() { + e.get_value_opt(opts, key) + } else { + Err(tikv_util::box_err!("mixed engine not inited")) + } } fn get_value_cf_opt( @@ -72,10 +73,11 @@ impl Peekable for RocksEngine { cf: &str, key: &[u8], ) -> Result> { - self.element_engine - .as_ref() - .unwrap() - .get_value_cf_opt(opts, cf, key) + if let Some(e) = self.element_engine.as_ref() { + e.get_value_cf_opt(opts, cf, key) + } else { + Err(tikv_util::box_err!("mixed engine not inited")) + } } } diff --git a/engine_tiflash/src/proxy_utils/engine_ext.rs b/engine_tiflash/src/proxy_utils/engine_ext.rs index 86c66aa8e25..76ea6d28a57 100644 --- a/engine_tiflash/src/proxy_utils/engine_ext.rs +++ b/engine_tiflash/src/proxy_utils/engine_ext.rs @@ -29,7 +29,7 @@ impl PageStorageExt { // TODO There are too many dummy write batch created in non-uni-ps impl. // Need to work out a solution for this. // See engine_tiflash/src/write_batch.rs. - self.helper().create_write_batch().into() + self.helper().create_write_batch() } pub fn destroy_write_batch(&self, wb_wrapper: &RawCppPtr) { @@ -75,6 +75,7 @@ impl PageStorageExt { }; } + #[allow(clippy::type_complexity)] pub fn scan_page( &self, start_page_id: &[u8], diff --git a/mock-engine-store/src/mock_cluster/cluster_ext.rs b/mock-engine-store/src/mock_cluster/cluster_ext.rs index c0b71783920..3aca8ae3b49 100644 --- a/mock-engine-store/src/mock_cluster/cluster_ext.rs +++ b/mock-engine-store/src/mock_cluster/cluster_ext.rs @@ -159,7 +159,11 @@ impl ClusterExt { Some(proxy_config_set), ); + ffi_helper_set.proxy.set_kv_engine( + engine_store_ffi::ffi::RaftStoreProxyEngine::from_tiflash_engine(engines.kv.clone()), + ); assert_ne!(engines.kv.proxy_ext.engine_store_server_helper, 0); + assert!(engines.kv.element_engine.is_some()); cluster.cluster_ext.ffi_helper_lst.push(ffi_helper_set); } } diff --git a/proxy_server/src/run.rs b/proxy_server/src/run.rs index 00a37cfa286..c9eacb5582d 100644 --- a/proxy_server/src/run.rs +++ b/proxy_server/src/run.rs @@ -180,10 +180,14 @@ pub fn run_impl( let fetcher = tikv.init_io_utility(); let listener = tikv.init_flow_receiver(); let engine_store_server_helper_ptr = engine_store_server_helper as *const _ as isize; + // Will call TiFlashEngine::init let (engines, engines_info) = tikv.init_tiflash_engines(listener, engine_store_server_helper_ptr); tikv.init_engines(engines.clone()); { + if engines.kv.element_engine.is_none() { + error!("TiFlashEngine has empty ElementaryEngine"); + } proxy.set_kv_engine( engine_store_ffi::ffi::RaftStoreProxyEngine::from_tiflash_engine(engines.kv.clone()), ); From 1a767e40a754409d1abc5f41c6ea3dce34e04d88 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 2 Mar 2023 13:45:11 +0800 Subject: [PATCH 6/7] f Signed-off-by: CalvinNeo --- .../src/mixed_engine/write_batch.rs | 3 ++- engine_tiflash/src/proxy_utils/engine_ext.rs | 4 ++-- engine_tiflash/src/ps_engine/engine.rs | 2 +- engine_tiflash/src/ps_engine/mod.rs | 2 -- engine_tiflash/src/rocks_engine/engine.rs | 18 +++++++++++++++ engine_tiflash/src/rocks_engine/mod.rs | 2 -- .../src/rocks_engine/write_batch.rs | 22 ++----------------- 7 files changed, 25 insertions(+), 28 deletions(-) diff --git a/engine_tiflash/src/mixed_engine/write_batch.rs b/engine_tiflash/src/mixed_engine/write_batch.rs index 40ad9aac867..d14188929ad 100644 --- a/engine_tiflash/src/mixed_engine/write_batch.rs +++ b/engine_tiflash/src/mixed_engine/write_batch.rs @@ -6,11 +6,12 @@ use std::sync::Arc; use engine_traits::{self, Mutable, Result, WriteBatchExt, WriteOptions}; use proxy_ffi::interfaces_ffi::RawCppPtr; use rocksdb::{WriteBatch as RawWriteBatch, DB}; +use tikv_util::Either; use crate::{engine::RocksEngine, ps_engine::add_prefix, r2e, PageStorageExt}; pub struct MixedWriteBatch { - pub inner: Box, + pub inner: Either, } impl WriteBatchExt for RocksEngine { diff --git a/engine_tiflash/src/proxy_utils/engine_ext.rs b/engine_tiflash/src/proxy_utils/engine_ext.rs index 76ea6d28a57..d3dabdc042a 100644 --- a/engine_tiflash/src/proxy_utils/engine_ext.rs +++ b/engine_tiflash/src/proxy_utils/engine_ext.rs @@ -68,11 +68,11 @@ impl PageStorageExt { pub fn read_page(&self, page_id: &[u8]) -> Option> { // TODO maybe we can steal memory from C++ here to reduce redundant copy? let value = self.helper().read_page(page_id.into()); - return if value.view.len == 0 { + if value.view.len == 0 { None } else { Some(value.view.to_slice().to_vec()) - }; + } } #[allow(clippy::type_complexity)] diff --git a/engine_tiflash/src/ps_engine/engine.rs b/engine_tiflash/src/ps_engine/engine.rs index 0ca8c2ad4a8..e3d5eaf039e 100644 --- a/engine_tiflash/src/ps_engine/engine.rs +++ b/engine_tiflash/src/ps_engine/engine.rs @@ -9,7 +9,7 @@ use crate::{ }; use crate::MixedWriteBatch; use super::{PSEngineWriteBatch, PSRocksWriteBatchVec}; -use super::ps_engine::WRITE_BATCH_LIMIT; +use super::ps_write_batch::WRITE_BATCH_LIMIT; #[derive(Clone, Debug)] pub struct PSElementEngine { diff --git a/engine_tiflash/src/ps_engine/mod.rs b/engine_tiflash/src/ps_engine/mod.rs index b55ef4ee727..3a053e84ce6 100644 --- a/engine_tiflash/src/ps_engine/mod.rs +++ b/engine_tiflash/src/ps_engine/mod.rs @@ -2,10 +2,8 @@ mod engine; mod ps_log_engine; -#[cfg(feature = "enable-pagestorage")] pub(crate) mod ps_write_batch; pub use engine::*; pub use ps_log_engine::*; -#[cfg(feature = "enable-pagestorage")] pub use ps_write_batch::*; diff --git a/engine_tiflash/src/rocks_engine/engine.rs b/engine_tiflash/src/rocks_engine/engine.rs index 33a83637744..b26434b959c 100644 --- a/engine_tiflash/src/rocks_engine/engine.rs +++ b/engine_tiflash/src/rocks_engine/engine.rs @@ -5,6 +5,7 @@ use engine_rocks::RocksEngineIterator; use engine_traits::{IterOptions, Iterable, Peekable, ReadOptions, Result}; use rocksdb::Writable; +use super::WRITE_BATCH_LIMIT; use crate::{ mixed_engine::{elementary::ElementaryEngine, MixedDbVector}, @@ -80,4 +81,21 @@ impl ElementaryEngine for RocksElementEngine { fn iterator_opt(&self, cf: &str, opts: IterOptions) -> Result { self.rocks.iterator_opt(cf, opts) } + + fn write_batch(&self) -> RocksWriteBatchVec { + MixedWriteBatch { + inner: RocksWriteBatchVec::new( + Arc::clone(self.as_inner()), + WRITE_BATCH_LIMIT, + 1, + self.support_multi_batch_write(), + ), + } + } + + fn write_batch_with_cap(&self, cap: usize) -> RocksWriteBatchVec { + MixedWriteBatch { + inner: RocksWriteBatchVec::with_unit_capacity(self, cap), + } + } } diff --git a/engine_tiflash/src/rocks_engine/mod.rs b/engine_tiflash/src/rocks_engine/mod.rs index 3597401ae74..e1379ac8373 100644 --- a/engine_tiflash/src/rocks_engine/mod.rs +++ b/engine_tiflash/src/rocks_engine/mod.rs @@ -3,9 +3,7 @@ mod engine; pub use engine::*; -#[cfg(not(feature = "enable-pagestorage"))] pub mod write_batch; -#[cfg(not(feature = "enable-pagestorage"))] pub use write_batch::*; pub mod db_vector; diff --git a/engine_tiflash/src/rocks_engine/write_batch.rs b/engine_tiflash/src/rocks_engine/write_batch.rs index 20b6333b3d4..4159173b044 100644 --- a/engine_tiflash/src/rocks_engine/write_batch.rs +++ b/engine_tiflash/src/rocks_engine/write_batch.rs @@ -6,31 +6,13 @@ use std::sync::Arc; use engine_traits::{self, Mutable, Result, WriteBatchExt, WriteOptions}; use rocksdb::{Writable, WriteBatch as RawWriteBatch, DB}; - +use crate::mixed_engine::elementary::ElementaryEngine; +use crate::ps_engine::PSElementEngine; use crate::{options::RocksWriteOptions, r2e, util::get_cf_handle, RocksEngine}; const WRITE_BATCH_MAX_BATCH: usize = 16; const WRITE_BATCH_LIMIT: usize = 16; -impl ElementaryEngine for PSElementEngine { - fn write_batch(&self) -> RocksWriteBatchVec { - MixedWriteBatch { - inner: RocksWriteBatchVec::new( - Arc::clone(self.as_inner()), - WRITE_BATCH_LIMIT, - 1, - self.support_multi_batch_write(), - ), - } - } - - fn write_batch_with_cap(&self, cap: usize) -> RocksWriteBatchVec { - MixedWriteBatch { - inner: RocksWriteBatchVec::with_unit_capacity(self, cap), - } - } -} - /// `RocksWriteBatchVec` is for method `MultiBatchWrite` of RocksDB, which /// splits a large WriteBatch into many smaller ones and then any thread could /// help to deal with these small WriteBatch when it is calling From 0e207b98d3390f7fd2a0eeb9e488c666973cd1a9 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 2 Mar 2023 14:10:05 +0800 Subject: [PATCH 7/7] f Signed-off-by: CalvinNeo --- .../src/mixed_engine/write_batch.rs | 55 +++++++++++++++---- engine_tiflash/src/ps_engine/engine.rs | 14 +++-- .../src/ps_engine/ps_write_batch.rs | 17 +----- engine_tiflash/src/rocks_engine/engine.rs | 18 +++--- .../src/rocks_engine/write_batch.rs | 16 +----- 5 files changed, 68 insertions(+), 52 deletions(-) diff --git a/engine_tiflash/src/mixed_engine/write_batch.rs b/engine_tiflash/src/mixed_engine/write_batch.rs index 57140131be0..1dbd39482ce 100644 --- a/engine_tiflash/src/mixed_engine/write_batch.rs +++ b/engine_tiflash/src/mixed_engine/write_batch.rs @@ -10,6 +10,9 @@ use tikv_util::Either; use crate::{engine::RocksEngine, ps_engine::add_prefix, r2e, PageStorageExt}; +pub const WRITE_BATCH_MAX_BATCH: usize = 16; +pub const WRITE_BATCH_LIMIT: usize = 16; + pub struct MixedWriteBatch { pub inner: Either, @@ -18,6 +21,8 @@ pub struct MixedWriteBatch { impl WriteBatchExt for RocksEngine { type WriteBatch = MixedWriteBatch; + const WRITE_BATCH_MAX_KEYS: usize = 256; + fn write_batch(&self) -> MixedWriteBatch { self.element_engine.as_ref().unwrap().write_batch() } @@ -33,19 +38,31 @@ impl WriteBatchExt for RocksEngine { impl engine_traits::WriteBatch for MixedWriteBatch { fn write_opt(&mut self, opts: &WriteOptions) -> Result { // write into ps - self.inner.write_opt(opts) + match self.inner.as_mut() { + Either::Left(x) => x.write_opt(opts), + Either::Right(x) => x.write_opt(opts), + } } fn data_size(&self) -> usize { - self.inner.data_size() + match self.inner.as_ref() { + Either::Left(x) => x.data_size(), + Either::Right(x) => x.data_size(), + } } fn count(&self) -> usize { - self.inner.count() + match self.inner.as_ref() { + Either::Left(x) => x.count(), + Either::Right(x) => x.count(), + } } fn is_empty(&self) -> bool { - self.inner.is_empty() + match self.inner.as_ref() { + Either::Left(x) => x.is_empty(), + Either::Right(x) => x.is_empty(), + } } fn should_write_to_engine(&self) -> bool { @@ -54,7 +71,10 @@ impl engine_traits::WriteBatch for MixedWriteBatch { } fn clear(&mut self) { - self.inner.clear(); + match self.inner.as_mut() { + Either::Left(x) => x.clear(), + Either::Right(x) => x.clear(), + } } fn set_save_point(&mut self) { @@ -81,7 +101,10 @@ impl engine_traits::WriteBatch for MixedWriteBatch { } fn merge(&mut self, other: Self) -> Result<()> { - self.inner.merge(other) + match self.inner.as_mut() { + Either::Left(x) => x.merge(other.left().unwrap()), + Either::Right(x) => x.merge(other.right().unwrap()), + } } } @@ -90,28 +113,40 @@ impl Mutable for MixedWriteBatch { if !self.do_write(engine_traits::CF_DEFAULT, key) { return Ok(()); } - self.put(key, value) + match self.inner.as_mut() { + Either::Left(x) => x.put(key, value), + Either::Right(x) => x.put(key, value), + } } fn put_cf(&mut self, cf: &str, key: &[u8], value: &[u8]) -> Result<()> { if !self.do_write(cf, key) { return Ok(()); } - self.put_cf(cf, key, value) + match self.inner.as_mut() { + Either::Left(x) => x.put_cf(cf, key, value), + Either::Right(x) => x.put_cf(cf, key, value), + } } fn delete(&mut self, key: &[u8]) -> Result<()> { if !self.do_write(engine_traits::CF_DEFAULT, key) { return Ok(()); } - self.delete(key) + match self.inner.as_mut() { + Either::Left(x) => x.delete(key), + Either::Right(x) => x.delete(key), + } } fn delete_cf(&mut self, cf: &str, key: &[u8]) -> Result<()> { if !self.do_write(cf, key) { return Ok(()); } - self.delete_cf(cf, key) + match self.inner.as_mut() { + Either::Left(x) => x.delete_cf(cf, key), + Either::Right(x) => x.delete_cf(cf, key), + } } fn delete_range(&mut self, begin_key: &[u8], end_key: &[u8]) -> Result<()> { diff --git a/engine_tiflash/src/ps_engine/engine.rs b/engine_tiflash/src/ps_engine/engine.rs index 59a78a17d86..20bddde4537 100644 --- a/engine_tiflash/src/ps_engine/engine.rs +++ b/engine_tiflash/src/ps_engine/engine.rs @@ -5,12 +5,14 @@ use std::sync::Arc; use engine_rocks::RocksEngineIterator; use engine_traits::{IterOptions, Iterable, ReadOptions, Result}; -use super::{ps_write_batch::WRITE_BATCH_LIMIT, PSEngineWriteBatch, PSRocksWriteBatchVec}; +use super::{PSEngineWriteBatch, PSRocksWriteBatchVec}; use crate::{ mixed_engine::{elementary::ElementaryEngine, MixedDbVector}, MixedWriteBatch, PageStorageExt, }; - +use crate::mixed_engine::write_batch::WRITE_BATCH_LIMIT; +use crate::mixed_engine::write_batch::WRITE_BATCH_MAX_BATCH; +use tikv_util::Either; #[derive(Clone, Debug)] pub struct PSElementEngine { pub ps_ext: PageStorageExt, @@ -96,24 +98,24 @@ impl ElementaryEngine for PSElementEngine { fn write_batch(&self) -> MixedWriteBatch { MixedWriteBatch { - inner: PSRocksWriteBatchVec::new( + inner: Either::Right(PSRocksWriteBatchVec::new( Arc::clone(self.as_inner()), self.ps_ext.clone(), self.ps_ext.as_ref().unwrap().create_write_batch(), WRITE_BATCH_LIMIT, 1, self.support_multi_batch_write(), - ), + )), } } fn write_batch_with_cap(&self, cap: usize) -> MixedWriteBatch { MixedWriteBatch { - inner: PSRocksWriteBatchVec::with_unit_capacity( + inner: Either::Right(PSRocksWriteBatchVec::with_unit_capacity( self, self.ps_ext.as_ref().unwrap().create_write_batch(), cap, - ), + )), } } } diff --git a/engine_tiflash/src/ps_engine/ps_write_batch.rs b/engine_tiflash/src/ps_engine/ps_write_batch.rs index 7987e96668b..eebf544a983 100644 --- a/engine_tiflash/src/ps_engine/ps_write_batch.rs +++ b/engine_tiflash/src/ps_engine/ps_write_batch.rs @@ -12,8 +12,9 @@ use crate::{ PageStorageExt, }; -pub const WRITE_BATCH_MAX_BATCH: usize = 16; -pub const WRITE_BATCH_LIMIT: usize = 16; +use crate::mixed_engine::write_batch::WRITE_BATCH_LIMIT; +use crate::mixed_engine::write_batch::WRITE_BATCH_MAX_BATCH; + /// Used when impl WriteBatchExt. /// `PSRocksWriteBatchVec` is for method `MultiBatchWrite` of RocksDB, which /// splits a large WriteBatch into many smaller ones and then any thread could @@ -194,9 +195,6 @@ impl PSRocksWriteBatchVec { impl Mutable for PSRocksWriteBatchVec { fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> { - if !self.do_write(engine_traits::CF_DEFAULT, key) { - return Ok(()); - } self.ps_ext.as_ref().unwrap().write_batch_put_page( self.ps_wb.ptr, add_prefix(key).as_slice(), @@ -206,9 +204,6 @@ impl Mutable for PSRocksWriteBatchVec { } fn put_cf(&mut self, cf: &str, key: &[u8], value: &[u8]) -> Result<()> { - if !self.do_write(cf, key) { - return Ok(()); - } self.ps_ext.as_ref().unwrap().write_batch_put_page( self.ps_wb.ptr, add_prefix(key).as_slice(), @@ -218,9 +213,6 @@ impl Mutable for PSRocksWriteBatchVec { } fn delete(&mut self, key: &[u8]) -> Result<()> { - if !self.do_write(engine_traits::CF_DEFAULT, key) { - return Ok(()); - } self.ps_ext .as_ref() .unwrap() @@ -229,9 +221,6 @@ impl Mutable for PSRocksWriteBatchVec { } fn delete_cf(&mut self, cf: &str, key: &[u8]) -> Result<()> { - if !self.do_write(cf, key) { - return Ok(()); - } self.ps_ext .as_ref() .unwrap() diff --git a/engine_tiflash/src/rocks_engine/engine.rs b/engine_tiflash/src/rocks_engine/engine.rs index 7f31a2edf73..0b40f795646 100644 --- a/engine_tiflash/src/rocks_engine/engine.rs +++ b/engine_tiflash/src/rocks_engine/engine.rs @@ -1,12 +1,14 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. #![allow(unused_variables)] - +use std::sync::Arc; use engine_rocks::RocksEngineIterator; use engine_traits::{IterOptions, Iterable, Peekable, ReadOptions, Result}; use rocksdb::Writable; - -use super::WRITE_BATCH_LIMIT; +use tikv_util::Either; +use crate::MixedWriteBatch; +use crate::mixed_engine::write_batch::WRITE_BATCH_LIMIT; +use crate::mixed_engine::write_batch::WRITE_BATCH_MAX_BATCH; use crate::{ mixed_engine::{elementary::ElementaryEngine, MixedDbVector}, r2e, @@ -82,20 +84,20 @@ impl ElementaryEngine for RocksElementEngine { self.rocks.iterator_opt(cf, opts) } - fn write_batch(&self) -> RocksWriteBatchVec { + fn write_batch(&self) -> MixedWriteBatch { MixedWriteBatch { - inner: RocksWriteBatchVec::new( + inner: Either::Left(super::RocksWriteBatchVec::new( Arc::clone(self.as_inner()), WRITE_BATCH_LIMIT, 1, self.support_multi_batch_write(), - ), + )), } } - fn write_batch_with_cap(&self, cap: usize) -> RocksWriteBatchVec { + fn write_batch_with_cap(&self, cap: usize) -> MixedWriteBatch { MixedWriteBatch { - inner: RocksWriteBatchVec::with_unit_capacity(self, cap), + inner: Either::Left(super::RocksWriteBatchVec::with_unit_capacity(self, cap)), } } } diff --git a/engine_tiflash/src/rocks_engine/write_batch.rs b/engine_tiflash/src/rocks_engine/write_batch.rs index fdcd88827bb..f74e4afc698 100644 --- a/engine_tiflash/src/rocks_engine/write_batch.rs +++ b/engine_tiflash/src/rocks_engine/write_batch.rs @@ -12,8 +12,8 @@ use crate::{ ps_engine::PSElementEngine, r2e, util::get_cf_handle, RocksEngine, }; -const WRITE_BATCH_MAX_BATCH: usize = 16; -const WRITE_BATCH_LIMIT: usize = 16; +use crate::mixed_engine::write_batch::WRITE_BATCH_LIMIT; +use crate::mixed_engine::write_batch::WRITE_BATCH_MAX_BATCH; /// `RocksWriteBatchVec` is for method `MultiBatchWrite` of RocksDB, which /// splits a large WriteBatch into many smaller ones and then any thread could @@ -189,34 +189,22 @@ impl RocksWriteBatchVec { impl Mutable for RocksWriteBatchVec { fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> { - if !self.do_write(engine_traits::CF_DEFAULT, key) { - return Ok(()); - } self.check_switch_batch(); self.wbs[self.index].put(key, value).map_err(r2e) } fn put_cf(&mut self, cf: &str, key: &[u8], value: &[u8]) -> Result<()> { - if !self.do_write(cf, key) { - return Ok(()); - } self.check_switch_batch(); let handle = get_cf_handle(self.db.as_ref(), cf)?; self.wbs[self.index].put_cf(handle, key, value).map_err(r2e) } fn delete(&mut self, key: &[u8]) -> Result<()> { - if !self.do_write(engine_traits::CF_DEFAULT, key) { - return Ok(()); - } self.check_switch_batch(); self.wbs[self.index].delete(key).map_err(r2e) } fn delete_cf(&mut self, cf: &str, key: &[u8]) -> Result<()> { - if !self.do_write(cf, key) { - return Ok(()); - } self.check_switch_batch(); let handle = get_cf_handle(self.db.as_ref(), cf)?; self.wbs[self.index].delete_cf(handle, key).map_err(r2e)