diff --git a/analytic_engine/src/setup.rs b/analytic_engine/src/setup.rs index be39178bbc..940ce16579 100644 --- a/analytic_engine/src/setup.rs +++ b/analytic_engine/src/setup.rs @@ -173,15 +173,63 @@ impl WalsOpener for RocksDBWalsOpener { let data_path = Path::new(&rocksdb_wal_config.data_dir); let wal_path = data_path.join(WAL_DIR_NAME); let data_wal = RocksWalBuilder::new(wal_path, write_runtime.clone()) + .max_subcompactions(rocksdb_wal_config.data_namespace.max_subcompactions) .max_background_jobs(rocksdb_wal_config.data_namespace.max_background_jobs) .enable_statistics(rocksdb_wal_config.data_namespace.enable_statistics) + .write_buffer_size(rocksdb_wal_config.data_namespace.write_buffer_size.0) + .max_write_buffer_number(rocksdb_wal_config.data_namespace.max_write_buffer_number) + .level_zero_file_num_compaction_trigger( + rocksdb_wal_config + .data_namespace + .level_zero_file_num_compaction_trigger, + ) + .level_zero_slowdown_writes_trigger( + rocksdb_wal_config + .data_namespace + .level_zero_slowdown_writes_trigger, + ) + .level_zero_stop_writes_trigger( + rocksdb_wal_config + .data_namespace + .level_zero_stop_writes_trigger, + ) + .fifo_compaction_max_table_files_size( + rocksdb_wal_config + .data_namespace + .fifo_compaction_max_table_files_size + .0, + ) .build() .context(OpenWal)?; let manifest_path = data_path.join(MANIFEST_DIR_NAME); let manifest_wal = RocksWalBuilder::new(manifest_path, write_runtime) + .max_subcompactions(rocksdb_wal_config.meta_namespace.max_subcompactions) .max_background_jobs(rocksdb_wal_config.meta_namespace.max_background_jobs) .enable_statistics(rocksdb_wal_config.meta_namespace.enable_statistics) + .write_buffer_size(rocksdb_wal_config.meta_namespace.write_buffer_size.0) + .max_write_buffer_number(rocksdb_wal_config.meta_namespace.max_write_buffer_number) + .level_zero_file_num_compaction_trigger( + rocksdb_wal_config + .meta_namespace + .level_zero_file_num_compaction_trigger, + ) + .level_zero_slowdown_writes_trigger( 
+ rocksdb_wal_config + .meta_namespace + .level_zero_slowdown_writes_trigger, + ) + .level_zero_stop_writes_trigger( + rocksdb_wal_config + .meta_namespace + .level_zero_stop_writes_trigger, + ) + .fifo_compaction_max_table_files_size( + rocksdb_wal_config + .meta_namespace + .fifo_compaction_max_table_files_size + .0, + ) .build() .context(OpenManifestWal)?; let opened_wals = OpenedWals { diff --git a/wal/src/rocks_impl/config.rs b/wal/src/rocks_impl/config.rs index 439313c144..966720f094 100644 --- a/wal/src/rocks_impl/config.rs +++ b/wal/src/rocks_impl/config.rs @@ -2,13 +2,26 @@ //! RocksDB Config +use common_util::config::ReadableSize; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct Config { + pub max_subcompactions: u32, pub max_background_jobs: i32, pub enable_statistics: bool, + pub write_buffer_size: ReadableSize, + pub max_write_buffer_number: i32, + // Number of files to trigger level-0 compaction. A value <0 means that level-0 compaction will + // not be triggered by number of files at all. + pub level_zero_file_num_compaction_trigger: i32, + // Soft limit on number of level-0 files. We start slowing down writes at this point. A value + // <0 means that no writing slow down will be triggered by number of files in level-0. + pub level_zero_slowdown_writes_trigger: i32, + // Maximum number of level-0 files. We stop writes at this point. 
+ pub level_zero_stop_writes_trigger: i32, + pub fifo_compaction_max_table_files_size: ReadableSize, } impl Default for Config { @@ -16,8 +29,16 @@ impl Default for Config { Self { // Same with rocksdb // https://github.com/facebook/rocksdb/blob/v6.4.6/include/rocksdb/options.h#L537 + max_subcompactions: 1, max_background_jobs: 2, enable_statistics: false, + write_buffer_size: ReadableSize::mb(64), + max_write_buffer_number: 2, + level_zero_file_num_compaction_trigger: 4, + level_zero_slowdown_writes_trigger: 20, + level_zero_stop_writes_trigger: 36, + // RocksDB's own default is 1G; 0 here disables FIFO compaction. + fifo_compaction_max_table_files_size: ReadableSize::gb(0), } } } diff --git a/wal/src/rocks_impl/manager.rs b/wal/src/rocks_impl/manager.rs index b8a97b8e7a..2e4e70cf00 100644 --- a/wal/src/rocks_impl/manager.rs +++ b/wal/src/rocks_impl/manager.rs @@ -19,7 +19,10 @@ use common_types::{ }; use common_util::{error::BoxError, runtime::Runtime}; use log::{debug, info, warn}; -use rocksdb::{DBIterator, DBOptions, ReadOptions, SeekKey, Statistics, Writable, WriteBatch, DB}; +use rocksdb::{ + rocksdb_options::ColumnFamilyDescriptor, ColumnFamilyOptions, DBCompactionStyle, DBIterator, + DBOptions, FifoCompactionOptions, ReadOptions, SeekKey, Statistics, Writable, WriteBatch, DB, +}; use snafu::ResultExt; use tokio::sync::Mutex; @@ -525,8 +528,15 @@ impl RocksImpl { pub struct Builder { wal_path: String, runtime: Arc, + max_subcompactions: Option, max_background_jobs: Option, enable_statistics: Option, + write_buffer_size: Option, + max_write_buffer_number: Option, + level_zero_file_num_compaction_trigger: Option, + level_zero_slowdown_writes_trigger: Option, + level_zero_stop_writes_trigger: Option, + fifo_compaction_max_table_files_size: Option, } impl Builder { @@ -535,11 +545,23 @@ impl Builder { Self { wal_path: wal_path.to_str().unwrap().to_owned(), runtime, + max_subcompactions: None, max_background_jobs: None, enable_statistics: None, + write_buffer_size: None, + 
max_write_buffer_number: None, + level_zero_file_num_compaction_trigger: None, + level_zero_slowdown_writes_trigger: None, + level_zero_stop_writes_trigger: None, + fifo_compaction_max_table_files_size: None, } } + pub fn max_subcompactions(mut self, v: u32) -> Self { + self.max_subcompactions = Some(v); + self + } + pub fn max_background_jobs(mut self, v: i32) -> Self { self.max_background_jobs = Some(v); self @@ -550,10 +572,43 @@ impl Builder { self } + pub fn write_buffer_size(mut self, v: u64) -> Self { + self.write_buffer_size = Some(v); + self + } + + pub fn max_write_buffer_number(mut self, v: i32) -> Self { + self.max_write_buffer_number = Some(v); + self + } + + pub fn level_zero_file_num_compaction_trigger(mut self, v: i32) -> Self { + self.level_zero_file_num_compaction_trigger = Some(v); + self + } + + pub fn level_zero_slowdown_writes_trigger(mut self, v: i32) -> Self { + self.level_zero_slowdown_writes_trigger = Some(v); + self + } + + pub fn level_zero_stop_writes_trigger(mut self, v: i32) -> Self { + self.level_zero_stop_writes_trigger = Some(v); + self + } + + pub fn fifo_compaction_max_table_files_size(mut self, v: u64) -> Self { + self.fifo_compaction_max_table_files_size = Some(v); + self + } + pub fn build(self) -> Result { let mut rocksdb_config = DBOptions::default(); rocksdb_config.create_if_missing(true); + if let Some(v) = self.max_subcompactions { + rocksdb_config.set_max_subcompactions(v); + } if let Some(v) = self.max_background_jobs { rocksdb_config.set_max_background_jobs(v); } @@ -566,7 +621,38 @@ impl Builder { None }; - let db = DB::open(rocksdb_config, &self.wal_path) + let mut cf_opts = ColumnFamilyOptions::new(); + if let Some(v) = self.write_buffer_size { + cf_opts.set_write_buffer_size(v); + } + if let Some(v) = self.max_write_buffer_number { + cf_opts.set_max_write_buffer_number(v); + } + if let Some(v) = self.level_zero_file_num_compaction_trigger { + cf_opts.set_level_zero_file_num_compaction_trigger(v); + } + if let 
Some(v) = self.level_zero_slowdown_writes_trigger { + cf_opts.set_level_zero_slowdown_writes_trigger(v); + } + if let Some(v) = self.level_zero_stop_writes_trigger { + cf_opts.set_level_zero_stop_writes_trigger(v); + } + + // FIFO compaction makes rocksdb behave like a message queue. + if let Some(v) = self.fifo_compaction_max_table_files_size { + if v > 0 { + let mut fifo_opts = FifoCompactionOptions::new(); + fifo_opts.set_max_table_files_size(v); + cf_opts.set_fifo_compaction_options(fifo_opts); + cf_opts.set_compaction_style(DBCompactionStyle::Fifo); + } + } + + let default_cfd = ColumnFamilyDescriptor { + options: cf_opts, + ..ColumnFamilyDescriptor::default() + }; + let db = DB::open_cf(rocksdb_config, &self.wal_path, vec![default_cfd]) .map_err(|e| e.into()) .context(Open { wal_path: self.wal_path.clone(),