add max-batch-key-size config for batch size in apply
qi.xu committed Aug 25, 2021
1 parent 339c834 commit 57d72b6
Showing 8 changed files with 60 additions and 19 deletions.
12 changes: 11 additions & 1 deletion components/engine_rocks/src/engine.rs
@@ -6,7 +6,7 @@ use std::path::Path;
use std::sync::Arc;

use engine_traits::{
Error, IterOptions, Iterable, KvEngine, Peekable, ReadOptions, Result, SyncMutable,
Error, IterOptions, Iterable, KvEngine, Peekable, ReadOptions, Result, SyncMutable,WriteBatchExt,
};
use rocksdb::{DBIterator, Writable, DB};

@@ -26,13 +26,15 @@ use crate::{RocksEngineIterator, RocksSnapshot};
pub struct RocksEngine {
db: Arc<DB>,
shared_block_cache: bool,
max_batch_key_size: usize,
}

impl RocksEngine {
pub fn from_db(db: Arc<DB>) -> Self {
RocksEngine {
db,
shared_block_cache: false,
max_batch_key_size: Self::WRITE_BATCH_MAX_KEYS,
}
}

@@ -63,6 +65,14 @@ impl RocksEngine {
pub fn set_shared_block_cache(&mut self, enable: bool) {
self.shared_block_cache = enable;
}

pub fn set_max_batch_key_size(&mut self, max_batch_key_size: usize) {
self.max_batch_key_size = max_batch_key_size;
}

pub fn max_batch_key_size(&self) -> usize {
return self.max_batch_key_size;
}
}

impl KvEngine for RocksEngine {
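Usage note: a minimal sketch of the new per-engine knob, assuming an already-opened Arc<DB> named db; until the setter is called the value stays at the WRITE_BATCH_MAX_KEYS default.

// Sketch only: `db` is an assumed, already-opened Arc<DB>.
let mut engine = RocksEngine::from_db(db);
engine.set_max_batch_key_size(512); // wired from the raftstore config in server.rs below
assert_eq!(engine.max_batch_key_size(), 512);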
4 changes: 2 additions & 2 deletions components/engine_rocks/src/misc.rs
@@ -89,7 +89,7 @@ impl RocksEngine {
let mut wb = self.write_batch();
for key in data.iter() {
wb.delete_cf(cf, key)?;
if wb.count() >= Self::WRITE_BATCH_MAX_KEYS {
if wb.count() >= self.max_batch_key_size() {
wb.write()?;
wb.clear();
}
@@ -115,7 +115,7 @@ impl RocksEngine {
let mut wb = self.write_batch();
while it_valid {
wb.delete_cf(cf, it.key())?;
if wb.count() >= Self::WRITE_BATCH_MAX_KEYS {
if wb.count() >= self.max_batch_key_size() {
wb.write()?;
wb.clear();
}
6 changes: 3 additions & 3 deletions components/engine_rocks/src/raft_engine.rs
@@ -104,7 +104,7 @@ impl RaftEngine for RocksEngine {
type LogBatch = RocksWriteBatch;

fn log_batch(&self, capacity: usize) -> Self::LogBatch {
RocksWriteBatch::with_capacity(self.as_inner().clone(), capacity)
RocksWriteBatch::with_capacity(self.as_inner().clone(), capacity, self.max_batch_key_size())
}

fn sync(&self) -> Result<()> {
@@ -161,7 +161,7 @@ impl RaftEngine for RocksEngine {
}

fn append(&self, raft_group_id: u64, entries: Vec<Entry>) -> Result<usize> {
let mut wb = RocksWriteBatch::new(self.as_inner().clone());
let mut wb = RocksWriteBatch::new(self.as_inner().clone(), self.max_batch_key_size());
let buf = Vec::with_capacity(1024);
wb.append_impl(raft_group_id, &entries, buf)?;
self.consume(&mut wb, false)
@@ -189,7 +189,7 @@ impl RaftEngine for RocksEngine {
for idx in from..to {
let key = keys::raft_log_key(raft_group_id, idx);
raft_wb.delete(&key)?;
if raft_wb.count() >= Self::WRITE_BATCH_MAX_KEYS {
if raft_wb.count() >= self.max_batch_key_size() {
raft_wb.write()?;
raft_wb.clear();
}
40 changes: 31 additions & 9 deletions components/engine_rocks/src/write_batch.rs
@@ -7,6 +7,7 @@ use crate::options::RocksWriteOptions;
use crate::util::get_cf_handle;
use engine_traits::{self, Error, Mutable, Result, WriteBatchExt, WriteOptions};
use rocksdb::{Writable, WriteBatch as RawWriteBatch, DB};
use tikv_util::debug;

const WRITE_BATCH_MAX_BATCH: usize = 16;
const WRITE_BATCH_LIMIT: usize = 16;
@@ -23,42 +24,56 @@ impl WriteBatchExt for RocksEngine {
}

fn write_batch(&self) -> Self::WriteBatch {
Self::WriteBatch::new(Arc::clone(&self.as_inner()))
Self::WriteBatch::new(Arc::clone(&self.as_inner()), self.max_batch_key_size())
}

fn write_batch_with_cap(&self, cap: usize) -> Self::WriteBatch {
Self::WriteBatch::with_capacity(Arc::clone(&self.as_inner()), cap)
Self::WriteBatch::with_capacity(
Arc::clone(&self.as_inner()),
cap,
self.max_batch_key_size(),
)
}
}

pub struct RocksWriteBatch {
db: Arc<DB>,
wb: RawWriteBatch,
max_batch_key_size: usize,
}

impl RocksWriteBatch {
pub fn new(db: Arc<DB>) -> RocksWriteBatch {
pub fn new(db: Arc<DB>, max_batch_key_size: usize) -> RocksWriteBatch {
RocksWriteBatch {
db,
wb: RawWriteBatch::default(),
max_batch_key_size,
}
}

pub fn as_inner(&self) -> &RawWriteBatch {
&self.wb
}

pub fn with_capacity(db: Arc<DB>, cap: usize) -> RocksWriteBatch {
pub fn with_capacity(db: Arc<DB>, cap: usize, max_batch_key_size: usize) -> RocksWriteBatch {
let wb = if cap == 0 {
RawWriteBatch::default()
} else {
RawWriteBatch::with_capacity(cap)
};
RocksWriteBatch { db, wb }
RocksWriteBatch {
db,
wb,
max_batch_key_size,
}
}

pub fn from_raw(db: Arc<DB>, wb: RawWriteBatch) -> RocksWriteBatch {
RocksWriteBatch { db, wb }
pub fn from_raw(db: Arc<DB>, wb: RawWriteBatch, max_batch_key_size: usize) -> RocksWriteBatch {
RocksWriteBatch {
db,
wb,
max_batch_key_size,
}
}

pub fn get_db(&self) -> &DB {
@@ -91,7 +106,14 @@ impl engine_traits::WriteBatch<RocksEngine> for RocksWriteBatch {
}

fn should_write_to_engine(&self) -> bool {
self.wb.count() > RocksEngine::WRITE_BATCH_MAX_KEYS
if self.wb.count() > self.max_batch_key_size {
debug!(
"should_write_to_engine return true";
"max_batch_key_size" => self.max_batch_key_size,
);
return true;
}
return false;
}

fn clear(&mut self) {
@@ -335,7 +357,7 @@ mod tests {
.unwrap();
assert!(engine.support_write_batch_vec());
let mut wb = engine.write_batch();
for _i in 0..RocksEngine::WRITE_BATCH_MAX_KEYS {
for _i in 0..engine.max_batch_key_size() {
wb.put(b"aaa", b"bbb").unwrap();
}
assert!(!wb.should_write_to_engine());
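Usage note: since the batch now carries its own limit, callers keep the usual flush-and-reuse loop; a sketch assuming `engine` is a configured RocksEngine and `kvs` yields byte-slice key/value pairs.

let mut wb = engine.write_batch();
for (key, value) in kvs {
    wb.put(key, value).unwrap();
    // true once the batch holds more than max_batch_key_size keys
    if wb.should_write_to_engine() {
        wb.write().unwrap();
        wb.clear();
    }
}
wb.write().unwrap(); // flush whatever is left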
3 changes: 3 additions & 0 deletions components/raftstore/src/store/config.rs
@@ -176,6 +176,8 @@ pub struct Config {
pub apply_yield_duration: ReadableDuration,
#[config(skip)]
pub disable_kv_wal: bool,
#[config(skip)]
pub max_batch_key_size: usize,
pub enable_propose_batch: bool,
pub skip_header: bool,

@@ -263,6 +265,7 @@ impl Default for Config {
dev_assert: false,
apply_yield_duration: ReadableDuration::millis(500),
disable_kv_wal: false,
max_batch_key_size: 512,
enable_propose_batch: true,
skip_header: false,

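Usage note: a sketch of reading and overriding the new raftstore field; the 512 default comes from the hunk above and 256 from the integration test below.

let mut cfg = Config::default();
assert_eq!(cfg.max_batch_key_size, 512); // default introduced by this commit
cfg.max_batch_key_size = 256;            // value exercised by tests/integrations/config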
8 changes: 4 additions & 4 deletions components/raftstore/src/store/fsm/peer.rs
@@ -11,7 +11,7 @@ use batch_system::{BasicMailbox, Fsm};
use collections::HashMap;
use engine_traits::CF_RAFT;
use engine_traits::{
Engines, KvEngine, RaftEngine, RaftLogBatch, SSTMetaInfo, WriteBatch, WriteBatchExt,
Engines, KvEngine, RaftEngine, RaftLogBatch, SSTMetaInfo, WriteBatch,
};
use error_code::ErrorCodeExt;
use fail::fail_point;
@@ -387,14 +387,14 @@ where
self.batch_req_size += req_size;
}

fn should_finish(&self) -> bool {
fn should_finish(&self, max_batch_keys: usize) -> bool {
if let Some(batch_req) = self.request.as_ref() {
// Limit the size of batch request so that it will not exceed raft_entry_max_size after
// adding header.
if f64::from(self.batch_req_size) > self.raft_entry_max_size * 0.4 {
return true;
}
if batch_req.get_requests().len() > <E as WriteBatchExt>::WRITE_BATCH_MAX_KEYS {
if batch_req.get_requests().len() > max_batch_keys {
return true;
}
}
@@ -554,7 +554,7 @@ where
let req_size = cmd.request.compute_size();
if self.fsm.batch_req_builder.can_batch(&cmd.request, req_size) {
self.fsm.batch_req_builder.add(cmd, req_size);
if self.fsm.batch_req_builder.should_finish() {
if self.fsm.batch_req_builder.should_finish(self.ctx.cfg.max_batch_key_size) {
self.propose_batch_raft_command();
}
} else {
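For reference, the cut-off that should_finish now applies can be restated as a standalone predicate (sketch only, not the method itself):

// Finish the batch once it nears raft_entry_max_size or exceeds the configured key count.
fn batch_is_full(
    batch_req_size: u32,
    num_requests: usize,
    raft_entry_max_size: f64,
    max_batch_keys: usize,
) -> bool {
    f64::from(batch_req_size) > raft_entry_max_size * 0.4 || num_requests > max_batch_keys
}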
3 changes: 3 additions & 0 deletions components/server/src/server.rs
@@ -1137,6 +1137,8 @@ impl TiKVServer<RocksEngine> {
let shared_block_cache = block_cache.is_some();
kv_engine.set_shared_block_cache(shared_block_cache);
raft_engine.set_shared_block_cache(shared_block_cache);
kv_engine.set_max_batch_key_size(self.config.raft_store.max_batch_key_size);
raft_engine.set_max_batch_key_size(self.config.raft_store.max_batch_key_size);
let engines = Engines::new(kv_engine, raft_engine);

check_and_dump_raft_engine(&self.config, &engines.raft, 8);
@@ -1209,6 +1211,7 @@ impl TiKVServer<RaftLogEngine> {
let mut kv_engine = RocksEngine::from_db(Arc::new(kv_engine));
let shared_block_cache = block_cache.is_some();
kv_engine.set_shared_block_cache(shared_block_cache);
kv_engine.set_max_batch_key_size(self.config.raft_store.max_batch_key_size);
let engines = Engines::new(kv_engine, raft_engine);

let cfg_controller = self.cfg_controller.as_mut().unwrap();
3 changes: 3 additions & 0 deletions tests/integrations/config/mod.rs
@@ -209,6 +209,9 @@ fn test_serde_custom_tikv_config() {
dev_assert: true,
apply_yield_duration: ReadableDuration::millis(333),
perf_level: PerfLevel::EnableTime,
enable_propose_batch: true,
max_batch_key_size: 256,
skip_header: false,
};
value.pd = PdConfig::new(vec!["example.com:443".to_owned()]);
let titan_cf_config = TitanCfConfig {
