diff --git a/components/engine_rocks/src/engine.rs b/components/engine_rocks/src/engine.rs index 2ec323eef172..d676bbd5f688 100644 --- a/components/engine_rocks/src/engine.rs +++ b/components/engine_rocks/src/engine.rs @@ -6,7 +6,7 @@ use std::path::Path; use std::sync::Arc; use engine_traits::{ - Error, IterOptions, Iterable, KvEngine, Peekable, ReadOptions, Result, SyncMutable, + Error, IterOptions, Iterable, KvEngine, Peekable, ReadOptions, Result, SyncMutable,WriteBatchExt, }; use rocksdb::{DBIterator, Writable, DB}; @@ -26,6 +26,7 @@ use crate::{RocksEngineIterator, RocksSnapshot}; pub struct RocksEngine { db: Arc, shared_block_cache: bool, + max_batch_key_size: usize, } impl RocksEngine { @@ -33,6 +34,7 @@ impl RocksEngine { RocksEngine { db, shared_block_cache: false, + max_batch_key_size: Self::WRITE_BATCH_MAX_KEYS, } } @@ -63,6 +65,14 @@ impl RocksEngine { pub fn set_shared_block_cache(&mut self, enable: bool) { self.shared_block_cache = enable; } + + pub fn set_max_batch_key_size(&mut self, max_batch_key_size: usize) { + self.max_batch_key_size = max_batch_key_size; + } + + pub fn max_batch_key_size(&self) -> usize { + return self.max_batch_key_size; + } } impl KvEngine for RocksEngine { diff --git a/components/engine_rocks/src/misc.rs b/components/engine_rocks/src/misc.rs index 2dae53fd36a9..dd904dab8318 100644 --- a/components/engine_rocks/src/misc.rs +++ b/components/engine_rocks/src/misc.rs @@ -89,7 +89,7 @@ impl RocksEngine { let mut wb = self.write_batch(); for key in data.iter() { wb.delete_cf(cf, key)?; - if wb.count() >= Self::WRITE_BATCH_MAX_KEYS { + if wb.count() >= self.max_batch_key_size() { wb.write()?; wb.clear(); } @@ -115,7 +115,7 @@ impl RocksEngine { let mut wb = self.write_batch(); while it_valid { wb.delete_cf(cf, it.key())?; - if wb.count() >= Self::WRITE_BATCH_MAX_KEYS { + if wb.count() >= self.max_batch_key_size() { wb.write()?; wb.clear(); } diff --git a/components/engine_rocks/src/raft_engine.rs b/components/engine_rocks/src/raft_engine.rs index 231e50e86900..d518e1ac9236 100644 --- a/components/engine_rocks/src/raft_engine.rs +++ b/components/engine_rocks/src/raft_engine.rs @@ -104,7 +104,7 @@ impl RaftEngine for RocksEngine { type LogBatch = RocksWriteBatch; fn log_batch(&self, capacity: usize) -> Self::LogBatch { - RocksWriteBatch::with_capacity(self.as_inner().clone(), capacity) + RocksWriteBatch::with_capacity(self.as_inner().clone(), capacity, self.max_batch_key_size()) } fn sync(&self) -> Result<()> { @@ -161,7 +161,7 @@ impl RaftEngine for RocksEngine { } fn append(&self, raft_group_id: u64, entries: Vec) -> Result { - let mut wb = RocksWriteBatch::new(self.as_inner().clone()); + let mut wb = RocksWriteBatch::new(self.as_inner().clone(), self.max_batch_key_size()); let buf = Vec::with_capacity(1024); wb.append_impl(raft_group_id, &entries, buf)?; self.consume(&mut wb, false) @@ -189,7 +189,7 @@ impl RaftEngine for RocksEngine { for idx in from..to { let key = keys::raft_log_key(raft_group_id, idx); raft_wb.delete(&key)?; - if raft_wb.count() >= Self::WRITE_BATCH_MAX_KEYS { + if raft_wb.count() >= self.max_batch_key_size() { raft_wb.write()?; raft_wb.clear(); } diff --git a/components/engine_rocks/src/write_batch.rs b/components/engine_rocks/src/write_batch.rs index d04e03d42df5..117a54ae8679 100644 --- a/components/engine_rocks/src/write_batch.rs +++ b/components/engine_rocks/src/write_batch.rs @@ -7,6 +7,7 @@ use crate::options::RocksWriteOptions; use crate::util::get_cf_handle; use engine_traits::{self, Error, Mutable, Result, WriteBatchExt, WriteOptions}; use rocksdb::{Writable, WriteBatch as RawWriteBatch, DB}; +use tikv_util::debug; const WRITE_BATCH_MAX_BATCH: usize = 16; const WRITE_BATCH_LIMIT: usize = 16; @@ -23,24 +24,30 @@ impl WriteBatchExt for RocksEngine { } fn write_batch(&self) -> Self::WriteBatch { - Self::WriteBatch::new(Arc::clone(&self.as_inner())) + Self::WriteBatch::new(Arc::clone(&self.as_inner()), self.max_batch_key_size()) } fn write_batch_with_cap(&self, cap: usize) -> Self::WriteBatch { - Self::WriteBatch::with_capacity(Arc::clone(&self.as_inner()), cap) + Self::WriteBatch::with_capacity( + Arc::clone(&self.as_inner()), + cap, + self.max_batch_key_size(), + ) } } pub struct RocksWriteBatch { db: Arc, wb: RawWriteBatch, + max_batch_key_size: usize, } impl RocksWriteBatch { - pub fn new(db: Arc) -> RocksWriteBatch { + pub fn new(db: Arc, max_batch_key_size: usize) -> RocksWriteBatch { RocksWriteBatch { db, wb: RawWriteBatch::default(), + max_batch_key_size, } } @@ -48,17 +55,25 @@ impl RocksWriteBatch { &self.wb } - pub fn with_capacity(db: Arc, cap: usize) -> RocksWriteBatch { + pub fn with_capacity(db: Arc, cap: usize, max_batch_key_size: usize) -> RocksWriteBatch { let wb = if cap == 0 { RawWriteBatch::default() } else { RawWriteBatch::with_capacity(cap) }; - RocksWriteBatch { db, wb } + RocksWriteBatch { + db, + wb, + max_batch_key_size, + } } - pub fn from_raw(db: Arc, wb: RawWriteBatch) -> RocksWriteBatch { - RocksWriteBatch { db, wb } + pub fn from_raw(db: Arc, wb: RawWriteBatch, max_batch_key_size: usize) -> RocksWriteBatch { + RocksWriteBatch { + db, + wb, + max_batch_key_size, + } } pub fn get_db(&self) -> &DB { @@ -91,7 +106,14 @@ impl engine_traits::WriteBatch for RocksWriteBatch { } fn should_write_to_engine(&self) -> bool { - self.wb.count() > RocksEngine::WRITE_BATCH_MAX_KEYS + if self.wb.count() > self.max_batch_key_size { + debug!( + "should_write_to_engine return true"; + "max_batch_key_size" => self.max_batch_key_size, + ); + return true; + } + return false; } fn clear(&mut self) { @@ -335,7 +357,7 @@ mod tests { .unwrap(); assert!(engine.support_write_batch_vec()); let mut wb = engine.write_batch(); - for _i in 0..RocksEngine::WRITE_BATCH_MAX_KEYS { + for _i in 0..engine.max_batch_key_size() { wb.put(b"aaa", b"bbb").unwrap(); } assert!(!wb.should_write_to_engine()); diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index 6469fe707548..fd3452e542c4 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -176,6 +176,8 @@ pub struct Config { pub apply_yield_duration: ReadableDuration, #[config(skip)] pub disable_kv_wal: bool, + #[config(skip)] + pub max_batch_key_size: usize, pub enable_propose_batch: bool, pub skip_header: bool, @@ -263,6 +265,7 @@ impl Default for Config { dev_assert: false, apply_yield_duration: ReadableDuration::millis(500), disable_kv_wal: false, + max_batch_key_size: 512, enable_propose_batch: true, skip_header: false, diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index f3e7bea18226..b045c4fa3ccd 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -11,7 +11,7 @@ use batch_system::{BasicMailbox, Fsm}; use collections::HashMap; use engine_traits::CF_RAFT; use engine_traits::{ - Engines, KvEngine, RaftEngine, RaftLogBatch, SSTMetaInfo, WriteBatch, WriteBatchExt, + Engines, KvEngine, RaftEngine, RaftLogBatch, SSTMetaInfo, WriteBatch, }; use error_code::ErrorCodeExt; use fail::fail_point; @@ -387,14 +387,14 @@ where self.batch_req_size += req_size; } - fn should_finish(&self) -> bool { + fn should_finish(&self, max_batch_keys: usize) -> bool { if let Some(batch_req) = self.request.as_ref() { // Limit the size of batch request so that it will not exceed raft_entry_max_size after // adding header. if f64::from(self.batch_req_size) > self.raft_entry_max_size * 0.4 { return true; } - if batch_req.get_requests().len() > ::WRITE_BATCH_MAX_KEYS { + if batch_req.get_requests().len() > max_batch_keys { return true; } } @@ -554,7 +554,7 @@ where let req_size = cmd.request.compute_size(); if self.fsm.batch_req_builder.can_batch(&cmd.request, req_size) { self.fsm.batch_req_builder.add(cmd, req_size); - if self.fsm.batch_req_builder.should_finish() { + if self.fsm.batch_req_builder.should_finish(self.ctx.cfg.max_batch_key_size) { self.propose_batch_raft_command(); } } else { diff --git a/components/server/src/server.rs b/components/server/src/server.rs index 4ad590969e8f..2df3400447db 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -1137,6 +1137,8 @@ impl TiKVServer { let shared_block_cache = block_cache.is_some(); kv_engine.set_shared_block_cache(shared_block_cache); raft_engine.set_shared_block_cache(shared_block_cache); + kv_engine.set_max_batch_key_size(self.config.raft_store.max_batch_key_size); + raft_engine.set_max_batch_key_size(self.config.raft_store.max_batch_key_size); let engines = Engines::new(kv_engine, raft_engine); check_and_dump_raft_engine(&self.config, &engines.raft, 8); @@ -1209,6 +1211,7 @@ impl TiKVServer { let mut kv_engine = RocksEngine::from_db(Arc::new(kv_engine)); let shared_block_cache = block_cache.is_some(); kv_engine.set_shared_block_cache(shared_block_cache); + kv_engine.set_max_batch_key_size(self.config.raft_store.max_batch_key_size); let engines = Engines::new(kv_engine, raft_engine); let cfg_controller = self.cfg_controller.as_mut().unwrap(); diff --git a/tests/integrations/config/mod.rs b/tests/integrations/config/mod.rs index 439f2506c69e..ad177d6148c7 100644 --- a/tests/integrations/config/mod.rs +++ b/tests/integrations/config/mod.rs @@ -209,6 +209,9 @@ fn test_serde_custom_tikv_config() { dev_assert: true, apply_yield_duration: ReadableDuration::millis(333), perf_level: PerfLevel::EnableTime, + enable_propose_batch: true, + max_batch_key_size: 256, + skip_header: false, }; value.pd = PdConfig::new(vec!["example.com:443".to_owned()]); let titan_cf_config = TitanCfConfig {