Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add max-batch-key-size config for batch size instead of the hardcoded MAX_BATCH_KEY_SIZE #6

Open
wants to merge 6 commits into
base: patch-5.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,6 @@ test_util = { path = "components/test_util", default-features = false }
tokio = { version = "0.2", features = ["macros", "rt-threaded", "time"] }
zipf = "6.1.0"

[build-dependencies]
example_plugin = { path = "components/test_coprocessor_plugin/example_plugin" }

[patch.crates-io]
# TODO: remove this when new raft-rs is published.
raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false }
Expand Down
54 changes: 0 additions & 54 deletions build.rs

This file was deleted.

1 change: 1 addition & 0 deletions components/cdc/test.log
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2021/08/19 16:41:41.446 lib.rs:512: [INFO] environment variable is present, GRPC_POLL_STRATEGY: epollex
12 changes: 12 additions & 0 deletions components/engine_panic/src/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ impl WriteBatchExt for PanicEngine {
fn write_batch_with_cap(&self, cap: usize) -> Self::WriteBatch {
panic!()
}

fn write_batch_with_cap_and_max_keys(
&self,
cap: usize,
max_batch_key_size: usize,
) -> Self::WriteBatch {
panic!()
}
}

pub struct PanicWriteBatch;
Expand All @@ -28,6 +36,10 @@ impl WriteBatch<PanicEngine> for PanicWriteBatch {
panic!()
}

fn with_capacity_and_max_entries(_: &PanicEngine, _: usize, _: usize) -> Self {
panic!()
}

fn write_opt(&self, _: &WriteOptions) -> Result<()> {
panic!()
}
Expand Down
2 changes: 1 addition & 1 deletion components/engine_rocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ protobuf = "2"
fail = "0.4"

[dependencies.rocksdb]
git = "https://github.com/tikv/rust-rocksdb.git"
git = "https://github.com/tonyxuqqi/rust-rocksdb.git"
package = "rocksdb"
branch = "tikv-5.0"
features = ["encryption", "static_libcpp"]
Expand Down
1 change: 1 addition & 0 deletions components/engine_rocks/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ impl From<engine_traits::ReadOptions> for RocksReadOptions {
fn from(opts: engine_traits::ReadOptions) -> Self {
let mut r = RawReadOptions::default();
r.fill_cache(opts.fill_cache());
r.set_read_tier(opts.read_tier() as i32);
RocksReadOptions(r)
}
}
Expand Down
8 changes: 6 additions & 2 deletions components/engine_rocks/src/raft_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ impl RaftEngine for RocksEngine {
type LogBatch = RocksWriteBatch;

fn log_batch(&self, capacity: usize) -> Self::LogBatch {
RocksWriteBatch::with_capacity(self.as_inner().clone(), capacity)
RocksWriteBatch::with_capacity(
self.as_inner().clone(),
capacity,
Self::WRITE_BATCH_MAX_KEYS,
)
}

fn sync(&self) -> Result<()> {
Expand Down Expand Up @@ -161,7 +165,7 @@ impl RaftEngine for RocksEngine {
}

fn append(&self, raft_group_id: u64, entries: Vec<Entry>) -> Result<usize> {
let mut wb = RocksWriteBatch::new(self.as_inner().clone());
let mut wb = RocksWriteBatch::new(self.as_inner().clone(), Self::WRITE_BATCH_MAX_KEYS);
let buf = Vec::with_capacity(1024);
wb.append_impl(raft_group_id, &entries, buf)?;
self.consume(&mut wb, false)
Expand Down
87 changes: 75 additions & 12 deletions components/engine_rocks/src/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::options::RocksWriteOptions;
use crate::util::get_cf_handle;
use engine_traits::{self, Error, Mutable, Result, WriteBatchExt, WriteOptions};
use rocksdb::{Writable, WriteBatch as RawWriteBatch, DB};
use tikv_util::debug;

const WRITE_BATCH_MAX_BATCH: usize = 16;
const WRITE_BATCH_LIMIT: usize = 16;
Expand All @@ -23,42 +24,64 @@ impl WriteBatchExt for RocksEngine {
}

fn write_batch(&self) -> Self::WriteBatch {
Self::WriteBatch::new(Arc::clone(&self.as_inner()))
Self::WriteBatch::new(Arc::clone(&self.as_inner()), Self::WRITE_BATCH_MAX_KEYS)
}

fn write_batch_with_cap(&self, cap: usize) -> Self::WriteBatch {
Self::WriteBatch::with_capacity(Arc::clone(&self.as_inner()), cap)
Self::WriteBatch::with_capacity(
Arc::clone(&self.as_inner()),
cap,
Self::WRITE_BATCH_MAX_KEYS,
)
}

fn write_batch_with_cap_and_max_keys(
&self,
cap: usize,
max_batch_key_size: usize,
) -> Self::WriteBatch {
Self::WriteBatch::with_capacity(Arc::clone(&self.as_inner()), cap, max_batch_key_size)
}
}

pub struct RocksWriteBatch {
db: Arc<DB>,
wb: RawWriteBatch,
max_batch_key_size: usize,
}

impl RocksWriteBatch {
pub fn new(db: Arc<DB>) -> RocksWriteBatch {
pub fn new(db: Arc<DB>, max_batch_key_size: usize) -> RocksWriteBatch {
RocksWriteBatch {
db,
wb: RawWriteBatch::default(),
max_batch_key_size,
}
}

pub fn as_inner(&self) -> &RawWriteBatch {
&self.wb
}

pub fn with_capacity(db: Arc<DB>, cap: usize) -> RocksWriteBatch {
pub fn with_capacity(db: Arc<DB>, cap: usize, max_batch_key_size: usize) -> RocksWriteBatch {
let wb = if cap == 0 {
RawWriteBatch::default()
} else {
RawWriteBatch::with_capacity(cap)
};
RocksWriteBatch { db, wb }
RocksWriteBatch {
db,
wb,
max_batch_key_size,
}
}

pub fn from_raw(db: Arc<DB>, wb: RawWriteBatch) -> RocksWriteBatch {
RocksWriteBatch { db, wb }
pub fn from_raw(db: Arc<DB>, wb: RawWriteBatch, max_batch_key_size: usize) -> RocksWriteBatch {
RocksWriteBatch {
db,
wb,
max_batch_key_size,
}
}

pub fn get_db(&self) -> &DB {
Expand All @@ -71,6 +94,14 @@ impl engine_traits::WriteBatch<RocksEngine> for RocksWriteBatch {
e.write_batch_with_cap(cap)
}

fn with_capacity_and_max_entries(
e: &RocksEngine,
cap: usize,
max_entries: usize,
) -> RocksWriteBatch {
e.write_batch_with_cap_and_max_keys(cap, max_entries)
}

fn write_opt(&self, opts: &WriteOptions) -> Result<()> {
let opt: RocksWriteOptions = opts.into();
self.get_db()
Expand All @@ -91,7 +122,14 @@ impl engine_traits::WriteBatch<RocksEngine> for RocksWriteBatch {
}

fn should_write_to_engine(&self) -> bool {
self.wb.count() > RocksEngine::WRITE_BATCH_MAX_KEYS
if self.wb.count() > self.max_batch_key_size {
debug!(
"should_write_to_engine return true";
"max_batch_key_size" => self.max_batch_key_size,
);
return true;
}
return false;
}

fn clear(&mut self) {
Expand Down Expand Up @@ -156,11 +194,17 @@ pub struct RocksWriteBatchVec {
save_points: Vec<usize>,
index: usize,
cur_batch_size: usize,
batch_size_limit: usize,
batch_size_limit: usize, // the max size of the single batch
max_batch_count: usize, // the max total batch count
}

impl RocksWriteBatchVec {
pub fn new(db: Arc<DB>, batch_size_limit: usize, cap: usize) -> RocksWriteBatchVec {
pub fn new(
db: Arc<DB>,
batch_size_limit: usize,
max_batch_count: usize,
cap: usize,
) -> RocksWriteBatchVec {
let wb = RawWriteBatch::with_capacity(cap);
RocksWriteBatchVec {
db,
Expand All @@ -169,6 +213,7 @@ impl RocksWriteBatchVec {
index: 0,
cur_batch_size: 0,
batch_size_limit,
max_batch_count,
}
}

Expand Down Expand Up @@ -200,7 +245,25 @@ impl RocksWriteBatchVec {

impl engine_traits::WriteBatch<RocksEngine> for RocksWriteBatchVec {
fn with_capacity(e: &RocksEngine, cap: usize) -> RocksWriteBatchVec {
RocksWriteBatchVec::new(e.as_inner().clone(), WRITE_BATCH_LIMIT, cap)
RocksWriteBatchVec::new(
e.as_inner().clone(),
WRITE_BATCH_LIMIT,
WRITE_BATCH_MAX_BATCH,
cap,
)
}

fn with_capacity_and_max_entries(
e: &RocksEngine,
cap: usize,
max_batch_count: usize,
) -> RocksWriteBatchVec {
RocksWriteBatchVec::new(
e.as_inner().clone(),
WRITE_BATCH_LIMIT,
max_batch_count,
cap,
)
}

fn write_opt(&self, opts: &WriteOptions) -> Result<()> {
Expand Down Expand Up @@ -229,7 +292,7 @@ impl engine_traits::WriteBatch<RocksEngine> for RocksWriteBatchVec {
}

fn should_write_to_engine(&self) -> bool {
self.index >= WRITE_BATCH_MAX_BATCH
self.index >= self.max_batch_count
}

fn clear(&mut self) {
Expand Down
24 changes: 23 additions & 1 deletion components/engine_traits/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,18 @@
use std::ops::Bound;
use tikv_util::keybuilder::KeyBuilder;

#[repr(i32)]
#[derive(Clone, Copy)]
pub enum ReadTier {
ReadAllTier = 0,
BlockCacheTier = 1,
PersistedTier = 2,
}

#[derive(Clone)]
pub struct ReadOptions {
fill_cache: bool,
read_tier: ReadTier,
}

impl ReadOptions {
Expand All @@ -21,11 +30,24 @@ impl ReadOptions {
pub fn set_fill_cache(&mut self, v: bool) {
self.fill_cache = v;
}

#[inline]
pub fn read_tier(&self) -> ReadTier {
self.read_tier
}

#[inline]
pub fn set_read_tier(&mut self, v: ReadTier) {
self.read_tier = v;
}
}

impl Default for ReadOptions {
fn default() -> ReadOptions {
ReadOptions { fill_cache: true }
ReadOptions {
fill_cache: true,
read_tier: ReadTier::ReadAllTier, // all tier
}
}
}

Expand Down
Loading