Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose more rocksdb options #1033

Merged
merged 7 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions analytic_engine/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,63 @@ impl WalsOpener for RocksDBWalsOpener {
let data_path = Path::new(&rocksdb_wal_config.data_dir);
let wal_path = data_path.join(WAL_DIR_NAME);
let data_wal = RocksWalBuilder::new(wal_path, write_runtime.clone())
.max_subcompactions(rocksdb_wal_config.data_namespace.max_subcompactions)
.max_background_jobs(rocksdb_wal_config.data_namespace.max_background_jobs)
.enable_statistics(rocksdb_wal_config.data_namespace.enable_statistics)
.write_buffer_size(rocksdb_wal_config.data_namespace.write_buffer_size.0)
.max_write_buffer_number(rocksdb_wal_config.data_namespace.max_write_buffer_number)
.level_zero_file_num_compaction_trigger(
rocksdb_wal_config
.data_namespace
.level_zero_file_num_compaction_trigger,
)
.level_zero_slowdown_writes_trigger(
rocksdb_wal_config
.data_namespace
.level_zero_slowdown_writes_trigger,
)
.level_zero_stop_writes_trigger(
rocksdb_wal_config
.data_namespace
.level_zero_stop_writes_trigger,
)
.fifo_compaction_max_table_files_size(
rocksdb_wal_config
.data_namespace
.fifo_compaction_max_table_files_size
.0,
)
.build()
.context(OpenWal)?;

let manifest_path = data_path.join(MANIFEST_DIR_NAME);
let manifest_wal = RocksWalBuilder::new(manifest_path, write_runtime)
.max_subcompactions(rocksdb_wal_config.meta_namespace.max_subcompactions)
.max_background_jobs(rocksdb_wal_config.meta_namespace.max_background_jobs)
.enable_statistics(rocksdb_wal_config.meta_namespace.enable_statistics)
.write_buffer_size(rocksdb_wal_config.meta_namespace.write_buffer_size.0)
.max_write_buffer_number(rocksdb_wal_config.meta_namespace.max_write_buffer_number)
.level_zero_file_num_compaction_trigger(
rocksdb_wal_config
.meta_namespace
.level_zero_file_num_compaction_trigger,
)
.level_zero_slowdown_writes_trigger(
rocksdb_wal_config
.meta_namespace
.level_zero_slowdown_writes_trigger,
)
.level_zero_stop_writes_trigger(
rocksdb_wal_config
.meta_namespace
.level_zero_stop_writes_trigger,
)
.fifo_compaction_max_table_files_size(
rocksdb_wal_config
.meta_namespace
.fifo_compaction_max_table_files_size
.0,
)
.build()
.context(OpenManifestWal)?;
let opened_wals = OpenedWals {
Expand Down
21 changes: 21 additions & 0 deletions wal/src/rocks_impl/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,43 @@

//! RocksDB Config

use common_util::config::ReadableSize;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
pub max_subcompactions: u32,
pub max_background_jobs: i32,
pub enable_statistics: bool,
pub write_buffer_size: ReadableSize,
pub max_write_buffer_number: i32,
// Number of files to trigger level-0 compaction. A value <0 means that level-0 compaction will
// not be triggered by number of files at all.
pub level_zero_file_num_compaction_trigger: i32,
// Soft limit on number of level-0 files. We start slowing down writes at this point. A value
// <0 means that no writing slow down will be triggered by number of files in level-0.
pub level_zero_slowdown_writes_trigger: i32,
// Maximum number of level-0 files. We stop writes at this point.
pub level_zero_stop_writes_trigger: i32,
pub fifo_compaction_max_table_files_size: ReadableSize,
}

impl Default for Config {
fn default() -> Self {
Self {
// Same with rocksdb
// https://github.com/facebook/rocksdb/blob/v6.4.6/include/rocksdb/options.h#L537
max_subcompactions: 1,
max_background_jobs: 2,
enable_statistics: false,
write_buffer_size: ReadableSize::mb(64),
max_write_buffer_number: 2,
level_zero_file_num_compaction_trigger: 4,
level_zero_slowdown_writes_trigger: 20,
level_zero_stop_writes_trigger: 36,
// default is 1G, use 0 to disable fifo
fifo_compaction_max_table_files_size: ReadableSize::gb(0),
}
}
}
90 changes: 88 additions & 2 deletions wal/src/rocks_impl/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use common_types::{
};
use common_util::{error::BoxError, runtime::Runtime};
use log::{debug, info, warn};
use rocksdb::{DBIterator, DBOptions, ReadOptions, SeekKey, Statistics, Writable, WriteBatch, DB};
use rocksdb::{
rocksdb_options::ColumnFamilyDescriptor, ColumnFamilyOptions, DBCompactionStyle, DBIterator,
DBOptions, FifoCompactionOptions, ReadOptions, SeekKey, Statistics, Writable, WriteBatch, DB,
};
use snafu::ResultExt;
use tokio::sync::Mutex;

Expand Down Expand Up @@ -525,8 +528,15 @@ impl RocksImpl {
pub struct Builder {
wal_path: String,
runtime: Arc<Runtime>,
max_subcompactions: Option<u32>,
max_background_jobs: Option<i32>,
enable_statistics: Option<bool>,
write_buffer_size: Option<u64>,
max_write_buffer_number: Option<i32>,
level_zero_file_num_compaction_trigger: Option<i32>,
level_zero_slowdown_writes_trigger: Option<i32>,
level_zero_stop_writes_trigger: Option<i32>,
fifo_compaction_max_table_files_size: Option<u64>,
}

impl Builder {
Expand All @@ -535,11 +545,23 @@ impl Builder {
Self {
wal_path: wal_path.to_str().unwrap().to_owned(),
runtime,
max_subcompactions: None,
max_background_jobs: None,
enable_statistics: None,
write_buffer_size: None,
max_write_buffer_number: None,
level_zero_file_num_compaction_trigger: None,
level_zero_slowdown_writes_trigger: None,
level_zero_stop_writes_trigger: None,
fifo_compaction_max_table_files_size: None,
}
}

pub fn max_subcompactions(mut self, v: u32) -> Self {
self.max_subcompactions = Some(v);
self
}

pub fn max_background_jobs(mut self, v: i32) -> Self {
self.max_background_jobs = Some(v);
self
Expand All @@ -550,10 +572,43 @@ impl Builder {
self
}

pub fn write_buffer_size(mut self, v: u64) -> Self {
self.write_buffer_size = Some(v);
self
}

pub fn max_write_buffer_number(mut self, v: i32) -> Self {
self.max_write_buffer_number = Some(v);
self
}

pub fn level_zero_file_num_compaction_trigger(mut self, v: i32) -> Self {
self.level_zero_file_num_compaction_trigger = Some(v);
self
}

pub fn level_zero_slowdown_writes_trigger(mut self, v: i32) -> Self {
self.level_zero_slowdown_writes_trigger = Some(v);
self
}

pub fn level_zero_stop_writes_trigger(mut self, v: i32) -> Self {
self.level_zero_stop_writes_trigger = Some(v);
self
}

pub fn fifo_compaction_max_table_files_size(mut self, v: u64) -> Self {
self.fifo_compaction_max_table_files_size = Some(v);
self
}

pub fn build(self) -> Result<RocksImpl> {
let mut rocksdb_config = DBOptions::default();
rocksdb_config.create_if_missing(true);

if let Some(v) = self.max_subcompactions {
rocksdb_config.set_max_subcompactions(v);
}
if let Some(v) = self.max_background_jobs {
rocksdb_config.set_max_background_jobs(v);
}
Expand All @@ -566,7 +621,38 @@ impl Builder {
None
};

let db = DB::open(rocksdb_config, &self.wal_path)
let mut cf_opts = ColumnFamilyOptions::new();
if let Some(v) = self.write_buffer_size {
cf_opts.set_write_buffer_size(v);
}
if let Some(v) = self.max_write_buffer_number {
cf_opts.set_max_write_buffer_number(v);
}
if let Some(v) = self.level_zero_file_num_compaction_trigger {
cf_opts.set_level_zero_file_num_compaction_trigger(v);
}
if let Some(v) = self.level_zero_slowdown_writes_trigger {
cf_opts.set_level_zero_slowdown_writes_trigger(v);
}
if let Some(v) = self.level_zero_stop_writes_trigger {
cf_opts.set_level_zero_stop_writes_trigger(v);
}

// FIFO compaction strategy let rocksdb looks like a message queue.
if let Some(v) = self.fifo_compaction_max_table_files_size {
if v > 0 {
let mut fifo_opts = FifoCompactionOptions::new();
fifo_opts.set_max_table_files_size(v);
cf_opts.set_fifo_compaction_options(fifo_opts);
cf_opts.set_compaction_style(DBCompactionStyle::Fifo);
}
}

let default_cfd = ColumnFamilyDescriptor {
options: cf_opts,
..ColumnFamilyDescriptor::default()
};
let db = DB::open_cf(rocksdb_config, &self.wal_path, vec![default_cfd])
.map_err(|e| e.into())
.context(Open {
wal_path: self.wal_path.clone(),
Expand Down