Skip to content

Commit

Permalink
add config to specify the batch size in apply
Browse files Browse the repository at this point in the history
  • Loading branch information
qi.xu committed Aug 25, 2021
1 parent 339c834 commit 77badbc
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 22 deletions.
12 changes: 12 additions & 0 deletions components/engine_panic/src/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ impl WriteBatchExt for PanicEngine {
fn write_batch_with_cap(&self, cap: usize) -> Self::WriteBatch {
panic!()
}

/// Stub implementation: every method on `PanicEngine` panics (it appears
/// to be a compile-only placeholder engine — the surrounding methods in
/// this impl follow the same pattern). Mirrors `write_batch_with_cap`
/// with an additional `max_batch_key_size` limit.
fn write_batch_with_cap_and_max_keys(
&self,
cap: usize,
max_batch_key_size: usize,
) -> Self::WriteBatch {
panic!()
}
}

pub struct PanicWriteBatch;
Expand All @@ -28,6 +36,10 @@ impl WriteBatch<PanicEngine> for PanicWriteBatch {
panic!()
}

/// Stub constructor taking a capacity and a max-entry limit; always
/// panics, matching the other unimplemented methods in this impl.
fn with_capacity_and_max_entries(_: &PanicEngine, _: usize, _: usize) -> Self {
panic!()
}

fn write_opt(&self, _: &WriteOptions) -> Result<()> {
panic!()
}
Expand Down
8 changes: 6 additions & 2 deletions components/engine_rocks/src/raft_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ impl RaftEngine for RocksEngine {
type LogBatch = RocksWriteBatch;

fn log_batch(&self, capacity: usize) -> Self::LogBatch {
RocksWriteBatch::with_capacity(self.as_inner().clone(), capacity)
RocksWriteBatch::with_capacity(
self.as_inner().clone(),
capacity,
Self::WRITE_BATCH_MAX_KEYS,
)
}

fn sync(&self) -> Result<()> {
Expand Down Expand Up @@ -161,7 +165,7 @@ impl RaftEngine for RocksEngine {
}

fn append(&self, raft_group_id: u64, entries: Vec<Entry>) -> Result<usize> {
let mut wb = RocksWriteBatch::new(self.as_inner().clone());
let mut wb = RocksWriteBatch::new(self.as_inner().clone(), Self::WRITE_BATCH_MAX_KEYS);
let buf = Vec::with_capacity(1024);
wb.append_impl(raft_group_id, &entries, buf)?;
self.consume(&mut wb, false)
Expand Down
87 changes: 75 additions & 12 deletions components/engine_rocks/src/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::options::RocksWriteOptions;
use crate::util::get_cf_handle;
use engine_traits::{self, Error, Mutable, Result, WriteBatchExt, WriteOptions};
use rocksdb::{Writable, WriteBatch as RawWriteBatch, DB};
use tikv_util::debug;

const WRITE_BATCH_MAX_BATCH: usize = 16;
const WRITE_BATCH_LIMIT: usize = 16;
Expand All @@ -23,42 +24,64 @@ impl WriteBatchExt for RocksEngine {
}

fn write_batch(&self) -> Self::WriteBatch {
Self::WriteBatch::new(Arc::clone(&self.as_inner()))
Self::WriteBatch::new(Arc::clone(&self.as_inner()), Self::WRITE_BATCH_MAX_KEYS)
}

fn write_batch_with_cap(&self, cap: usize) -> Self::WriteBatch {
Self::WriteBatch::with_capacity(Arc::clone(&self.as_inner()), cap)
Self::WriteBatch::with_capacity(
Arc::clone(&self.as_inner()),
cap,
Self::WRITE_BATCH_MAX_KEYS,
)
}

/// Create a `RocksWriteBatch` with the given initial capacity and an
/// explicit key-count flush threshold, instead of the engine default
/// `WRITE_BATCH_MAX_KEYS` used by `write_batch_with_cap`.
fn write_batch_with_cap_and_max_keys(
    &self,
    cap: usize,
    max_batch_key_size: usize,
) -> Self::WriteBatch {
    // Clone the shared DB handle (refcount bump only) for the new batch.
    let db = self.as_inner().clone();
    Self::WriteBatch::with_capacity(db, cap, max_batch_key_size)
}
}

/// A write batch bound to a specific RocksDB instance. Callers poll
/// `should_write_to_engine` to decide when to flush: it fires once the
/// buffered key count exceeds `max_batch_key_size`.
pub struct RocksWriteBatch {
// Shared handle to the RocksDB instance this batch will be written to.
db: Arc<DB>,
// The underlying raw RocksDB write batch holding buffered mutations.
wb: RawWriteBatch,
// Threshold on `wb.count()` above which the batch should be committed.
max_batch_key_size: usize,
}

impl RocksWriteBatch {
pub fn new(db: Arc<DB>) -> RocksWriteBatch {
pub fn new(db: Arc<DB>, max_batch_key_size: usize) -> RocksWriteBatch {
RocksWriteBatch {
db,
wb: RawWriteBatch::default(),
max_batch_key_size,
}
}

pub fn as_inner(&self) -> &RawWriteBatch {
&self.wb
}

pub fn with_capacity(db: Arc<DB>, cap: usize) -> RocksWriteBatch {
pub fn with_capacity(db: Arc<DB>, cap: usize, max_batch_key_size: usize) -> RocksWriteBatch {
let wb = if cap == 0 {
RawWriteBatch::default()
} else {
RawWriteBatch::with_capacity(cap)
};
RocksWriteBatch { db, wb }
RocksWriteBatch {
db,
wb,
max_batch_key_size,
}
}

pub fn from_raw(db: Arc<DB>, wb: RawWriteBatch) -> RocksWriteBatch {
RocksWriteBatch { db, wb }
pub fn from_raw(db: Arc<DB>, wb: RawWriteBatch, max_batch_key_size: usize) -> RocksWriteBatch {
RocksWriteBatch {
db,
wb,
max_batch_key_size,
}
}

pub fn get_db(&self) -> &DB {
Expand All @@ -71,6 +94,14 @@ impl engine_traits::WriteBatch<RocksEngine> for RocksWriteBatch {
e.write_batch_with_cap(cap)
}

/// Create a write batch with the given command capacity and a
/// caller-specified maximum entry count; delegates to the engine's
/// `write_batch_with_cap_and_max_keys`, so `max_entries` becomes the
/// key-count flush threshold of the resulting batch.
fn with_capacity_and_max_entries(
e: &RocksEngine,
cap: usize,
max_entries: usize,
) -> RocksWriteBatch {
e.write_batch_with_cap_and_max_keys(cap, max_entries)
}

fn write_opt(&self, opts: &WriteOptions) -> Result<()> {
let opt: RocksWriteOptions = opts.into();
self.get_db()
Expand All @@ -91,7 +122,14 @@ impl engine_traits::WriteBatch<RocksEngine> for RocksWriteBatch {
}

fn should_write_to_engine(&self) -> bool {
self.wb.count() > RocksEngine::WRITE_BATCH_MAX_KEYS
if self.wb.count() > self.max_batch_key_size {
debug!(
"should_write_to_engine return true";
"max_batch_key_size" => self.max_batch_key_size,
);
return true;
}
return false;
}

fn clear(&mut self) {
Expand Down Expand Up @@ -156,11 +194,17 @@ pub struct RocksWriteBatchVec {
save_points: Vec<usize>,
index: usize,
cur_batch_size: usize,
batch_size_limit: usize,
batch_size_limit: usize, // the max size of the single batch
max_batch_count: usize, // the max total batch count
}

impl RocksWriteBatchVec {
pub fn new(db: Arc<DB>, batch_size_limit: usize, cap: usize) -> RocksWriteBatchVec {
pub fn new(
db: Arc<DB>,
batch_size_limit: usize,
max_batch_count: usize,
cap: usize,
) -> RocksWriteBatchVec {
let wb = RawWriteBatch::with_capacity(cap);
RocksWriteBatchVec {
db,
Expand All @@ -169,6 +213,7 @@ impl RocksWriteBatchVec {
index: 0,
cur_batch_size: 0,
batch_size_limit,
max_batch_count,
}
}

Expand Down Expand Up @@ -200,7 +245,25 @@ impl RocksWriteBatchVec {

impl engine_traits::WriteBatch<RocksEngine> for RocksWriteBatchVec {
fn with_capacity(e: &RocksEngine, cap: usize) -> RocksWriteBatchVec {
RocksWriteBatchVec::new(e.as_inner().clone(), WRITE_BATCH_LIMIT, cap)
RocksWriteBatchVec::new(
e.as_inner().clone(),
WRITE_BATCH_LIMIT,
WRITE_BATCH_MAX_BATCH,
cap,
)
}

/// Create a multi-batch writer with the given capacity. Note that for
/// `RocksWriteBatchVec` the "max entries" argument bounds the number of
/// sub-batches (`max_batch_count`, checked by `should_write_to_engine`),
/// not individual keys; the per-sub-batch size limit stays at the
/// default `WRITE_BATCH_LIMIT`.
fn with_capacity_and_max_entries(
e: &RocksEngine,
cap: usize,
max_batch_count: usize,
) -> RocksWriteBatchVec {
RocksWriteBatchVec::new(
e.as_inner().clone(),
WRITE_BATCH_LIMIT,
max_batch_count,
cap,
)
}

fn write_opt(&self, opts: &WriteOptions) -> Result<()> {
Expand Down Expand Up @@ -229,7 +292,7 @@ impl engine_traits::WriteBatch<RocksEngine> for RocksWriteBatchVec {
}

fn should_write_to_engine(&self) -> bool {
self.index >= WRITE_BATCH_MAX_BATCH
self.index >= self.max_batch_count
}

fn clear(&mut self) {
Expand Down
8 changes: 8 additions & 0 deletions components/engine_traits/src/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ pub trait WriteBatchExt: Sized {

fn write_batch(&self) -> Self::WriteBatch;
fn write_batch_with_cap(&self, cap: usize) -> Self::WriteBatch;
/// Like `write_batch_with_cap`, but additionally overrides the maximum
/// number of keys the batch may buffer before `should_write_to_engine`
/// reports that it should be flushed.
fn write_batch_with_cap_and_max_keys(
&self,
cap: usize,
max_batch_key_size: usize,
) -> Self::WriteBatch;
}

/// A trait implemented by WriteBatch
Expand Down Expand Up @@ -83,6 +88,9 @@ pub trait WriteBatch<E: WriteBatchExt + Sized>: Mutable {
/// Create a WriteBatch with a given command capacity
fn with_capacity(e: &E, cap: usize) -> Self;

/// Create a WriteBatch with a given command capacity and max entries.
/// The interpretation of `max_entries` is implementation-defined: a
/// plain write batch treats it as a cap on buffered keys, while a
/// multi-batch implementation treats it as a cap on sub-batch count
/// (both feed that implementation's `should_write_to_engine`).
fn with_capacity_and_max_entries(e: &E, cap: usize, max_entries: usize) -> Self;

/// Commit the WriteBatch to disk with the given options
fn write_opt(&self, opts: &WriteOptions) -> Result<()>;

Expand Down
6 changes: 6 additions & 0 deletions components/raftstore/src/store/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ pub struct Config {
pub apply_yield_duration: ReadableDuration,
#[config(skip)]
pub disable_kv_wal: bool,
/// Max number of keys buffered in a single apply write batch before it
/// is flushed; used when the engine does not support multi-batch writes
/// (see the `support_write_batch_vec` selection in apply.rs).
#[config(skip)]
pub max_batch_key_size: usize,
/// Max number of sub-batches in an apply write batch before it is
/// flushed; used when the engine supports multi-batch writes.
#[config(skip)]
pub max_batch_count: usize,
pub enable_propose_batch: bool,
pub skip_header: bool,

Expand Down Expand Up @@ -263,6 +267,8 @@ impl Default for Config {
dev_assert: false,
apply_yield_duration: ReadableDuration::millis(500),
disable_kv_wal: false,
max_batch_key_size: 256,
max_batch_count: 16,
enable_propose_batch: true,
skip_header: false,

Expand Down
16 changes: 14 additions & 2 deletions components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ where
// Whether to use the delete range API instead of deleting one by one.
use_delete_range: bool,
disable_kv_wal: bool,
max_batch_limit: usize,

perf_context: EK::PerfContext,

Expand Down Expand Up @@ -428,7 +429,13 @@ where
) -> ApplyContext<EK, W> {
// If `enable_multi_batch_write` was set true, we create `RocksWriteBatchVec`.
// Otherwise create `RocksWriteBatch`.
let kv_wb = W::with_capacity(&engine, DEFAULT_APPLY_WB_SIZE);
let max_batch_limit = if engine.support_write_batch_vec() {
cfg.max_batch_count
} else {
cfg.max_batch_key_size
};
let kv_wb =
W::with_capacity_and_max_entries(&engine, DEFAULT_APPLY_WB_SIZE, max_batch_limit);

ApplyContext {
tag,
Expand All @@ -447,6 +454,7 @@ where
committed_count: 0,
sync_log_hint: false,
disable_kv_wal: cfg.disable_kv_wal,
max_batch_limit,
exec_ctx: None,
use_delete_range: cfg.use_delete_range,
perf_context: engine.get_perf_context(cfg.perf_level, PerfContextKind::RaftstoreApply),
Expand Down Expand Up @@ -524,7 +532,11 @@ where
if data_size > APPLY_WB_SHRINK_SIZE {
// Control the memory usage for the WriteBatch. Whether it's `RocksWriteBatch` or
// `RocksWriteBatchVec` depends on the `enable_multi_batch_write` configuration.
self.kv_wb = W::with_capacity(&self.engine, DEFAULT_APPLY_WB_SIZE);
self.kv_wb = W::with_capacity_and_max_entries(
&self.engine,
DEFAULT_APPLY_WB_SIZE,
self.max_batch_limit,
);
} else {
// Clear data, reuse the WriteBatch, this can reduce memory allocations and deallocations.
self.kv_wb_mut().clear();
Expand Down
14 changes: 8 additions & 6 deletions components/raftstore/src/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use std::{cmp, mem, u64};
use batch_system::{BasicMailbox, Fsm};
use collections::HashMap;
use engine_traits::CF_RAFT;
use engine_traits::{
Engines, KvEngine, RaftEngine, RaftLogBatch, SSTMetaInfo, WriteBatch, WriteBatchExt,
};
use engine_traits::{Engines, KvEngine, RaftEngine, RaftLogBatch, SSTMetaInfo, WriteBatch};
use error_code::ErrorCodeExt;
use fail::fail_point;
use kvproto::errorpb;
Expand Down Expand Up @@ -387,14 +385,14 @@ where
self.batch_req_size += req_size;
}

fn should_finish(&self) -> bool {
fn should_finish(&self, max_batch_keys: usize) -> bool {
if let Some(batch_req) = self.request.as_ref() {
// Limit the size of batch request so that it will not exceed raft_entry_max_size after
// adding header.
if f64::from(self.batch_req_size) > self.raft_entry_max_size * 0.4 {
return true;
}
if batch_req.get_requests().len() > <E as WriteBatchExt>::WRITE_BATCH_MAX_KEYS {
if batch_req.get_requests().len() > max_batch_keys {
return true;
}
}
Expand Down Expand Up @@ -554,7 +552,11 @@ where
let req_size = cmd.request.compute_size();
if self.fsm.batch_req_builder.can_batch(&cmd.request, req_size) {
self.fsm.batch_req_builder.add(cmd, req_size);
if self.fsm.batch_req_builder.should_finish() {
if self
.fsm
.batch_req_builder
.should_finish(self.ctx.cfg.max_batch_key_size)
{
self.propose_batch_raft_command();
}
} else {
Expand Down
4 changes: 4 additions & 0 deletions tests/integrations/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,10 @@ fn test_serde_custom_tikv_config() {
dev_assert: true,
apply_yield_duration: ReadableDuration::millis(333),
perf_level: PerfLevel::EnableTime,
enable_propose_batch: true,
max_batch_key_size: 256,
skip_header: false,
max_batch_count: 16,
};
value.pd = PdConfig::new(vec!["example.com:443".to_owned()]);
let titan_cf_config = TitanCfConfig {
Expand Down

0 comments on commit 77badbc

Please sign in to comment.