Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exposed more rocksdb options, increased max files #1481

Merged
merged 2 commits into from
Mar 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions forest/src/cli/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct Config {
pub sync: SyncConfig,
pub encrypt_keystore: bool,
pub metrics_port: u16,
pub rocks_db: db::rocks::RocksDbConfig,
pub rocks_db: db::rocks_config::RocksDbConfig,
}

impl Default for Config {
Expand All @@ -44,7 +44,7 @@ impl Default for Config {
sync: SyncConfig::default(),
encrypt_keystore: true,
metrics_port: 6116,
rocks_db: db::rocks::RocksDbConfig::default(),
rocks_db: db::rocks_config::RocksDbConfig::default(),
}
}
}
23 changes: 15 additions & 8 deletions forest/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ use libp2p::identity::{ed25519, Keypair};
use log::{debug, info, trace, warn};
use rpassword::read_password;

use db::rocks::RocksDb;
use std::io::prelude::*;
use std::path::PathBuf;
use std::sync::Arc;
use std::time;

/// Starts daemon process
pub(super) async fn start(config: Config) {
Expand Down Expand Up @@ -117,7 +119,7 @@ pub(super) async fn start(config: Config) {
.expect("Opening SledDB must succeed");

#[cfg(feature = "rocksdb")]
let db = db::rocks::RocksDb::open(PathBuf::from(&config.data_dir).join("db"), config.rocks_db)
let db = db::rocks::RocksDb::open(PathBuf::from(&config.data_dir).join("db"), &config.rocks_db)
.expect("Opening RocksDB must succeed");

let db = Arc::new(db);
Expand All @@ -144,13 +146,7 @@ pub(super) async fn start(config: Config) {

info!("Using network :: {}", network_name);

let validate_height = if config.snapshot { None } else { Some(0) };
// Sync from snapshot
if let Some(path) = &config.snapshot_path {
import_chain::<FullVerifier, _>(&state_manager, path, validate_height, config.skip_load)
.await
.unwrap();
}
sync_from_snapshot(&config, &state_manager).await;

// Fetch and ensure verification keys are downloaded
get_params_default(SectorSizeOpt::Keys, false)
Expand Down Expand Up @@ -261,6 +257,17 @@ pub(super) async fn start(config: Config) {
info!("Forest finish shutdown");
}

async fn sync_from_snapshot(config: &Config, state_manager: &Arc<StateManager<RocksDb>>) {
if let Some(path) = &config.snapshot_path {
let stopwatch = time::Instant::now();
let validate_height = if config.snapshot { None } else { Some(0) };
import_chain::<FullVerifier, _>(state_manager, path, validate_height, config.skip_load)
.await
.expect("Failed miserably while importing chain from snapshot");
debug!("Imported snapshot in: {}s", stopwatch.elapsed().as_secs());
}
}

#[cfg(test)]
#[cfg(not(any(feature = "interopnet", feature = "devnet")))]
mod test {
Expand Down
1 change: 1 addition & 0 deletions node/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ encoding = { package = "forest_encoding", version = "0.2" }
thiserror = "1.0"
num_cpus = "1.13"
serde = { version = "1.0", features = ["derive"] }
anyhow = "1"

[dev-dependencies]
tempfile = "3.3"
3 changes: 3 additions & 0 deletions node/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ mod memory;
#[cfg(feature = "rocksdb")]
pub mod rocks;

#[cfg(feature = "rocksdb")]
pub mod rocks_config;

#[cfg(feature = "sled")]
pub mod sled;

Expand Down
43 changes: 18 additions & 25 deletions node/db/src/rocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,10 @@

use super::errors::Error;
use super::Store;
use num_cpus;
use crate::rocks_config::{compaction_style_from_str, compression_type_from_str, RocksDbConfig};
pub use rocksdb::{Options, WriteBatch, DB};
use serde::Deserialize;
use std::path::Path;

#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct RocksDbConfig {
create_if_missing: bool,
parallelism: i32,
write_buffer_size: usize,
max_open_files: i32,
}

impl Default for RocksDbConfig {
fn default() -> Self {
Self {
create_if_missing: true,
parallelism: num_cpus::get() as i32,
write_buffer_size: 256 * 1024 * 1024,
max_open_files: 200,
}
}
}

/// RocksDB instance this satisfies the [Store] interface.
#[derive(Debug)]
pub struct RocksDb {
Expand All @@ -38,12 +17,13 @@ pub struct RocksDb {
///
/// Usage:
/// ```no_run
/// use forest_db::rocks::{RocksDb, RocksDbConfig};
/// use forest_db::rocks::RocksDb;
/// use forest_db::rocks_config::RocksDbConfig;
///
/// let mut db = RocksDb::open("test_db", RocksDbConfig::default()).unwrap();
/// let mut db = RocksDb::open("test_db", &RocksDbConfig::default()).unwrap();
/// ```
impl RocksDb {
pub fn open<P>(path: P, config: RocksDbConfig) -> Result<Self, Error>
pub fn open<P>(path: P, config: &RocksDbConfig) -> Result<Self, Error>
where
P: AsRef<Path>,
{
Expand All @@ -52,6 +32,19 @@ impl RocksDb {
db_opts.increase_parallelism(config.parallelism);
db_opts.set_write_buffer_size(config.write_buffer_size);
db_opts.set_max_open_files(config.max_open_files);

if let Some(max_background_jobs) = config.max_background_jobs {
db_opts.set_max_background_jobs(max_background_jobs);
}
if let Some(compaction_style) = &config.compaction_style {
db_opts.set_compaction_style(compaction_style_from_str(compaction_style).unwrap());
}
if let Some(compression_type) = &config.compression_type {
db_opts.set_compression_type(compression_type_from_str(compression_type).unwrap());
}
if config.enable_statistics {
db_opts.enable_statistics();
};
Ok(Self {
db: DB::open(&db_opts, path)?,
})
Expand Down
108 changes: 108 additions & 0 deletions node/db/src/rocks_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2019-2022 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT
use anyhow::anyhow;
use num_cpus;
use rocksdb::{DBCompactionStyle, DBCompressionType};
use serde::Deserialize;

/// RocksDB configuration exposed in Forest.
/// Only subset of possible options is implemented, add missing ones when needed.
/// For description of different options please refer to the `rocksdb` crate documentation.
/// <https://docs.rs/rocksdb/latest/rocksdb/>
#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct RocksDbConfig {
pub create_if_missing: bool,
pub parallelism: i32,
pub write_buffer_size: usize,
pub max_open_files: i32,
pub max_background_jobs: Option<i32>,
pub compression_type: Option<String>,
pub compaction_style: Option<String>,
pub enable_statistics: bool,
}

impl Default for RocksDbConfig {
fn default() -> Self {
Self {
create_if_missing: true,
parallelism: num_cpus::get() as i32,
write_buffer_size: 256 * 1024 * 1024,
max_open_files: 1024,
max_background_jobs: None,
compaction_style: None,
compression_type: Some("lz4".into()),
enable_statistics: false,
}
}
}

/// Converts string to a compaction style RocksDB variant.
pub(crate) fn compaction_style_from_str(s: &str) -> anyhow::Result<DBCompactionStyle> {
match s.to_lowercase().as_str() {
"level" => Ok(DBCompactionStyle::Level),
"universal" => Ok(DBCompactionStyle::Universal),
"fifo" => Ok(DBCompactionStyle::Fifo),
_ => Err(anyhow!("invalid compaction option")),
}
}

/// Converts string to a compression type RocksDB variant.
pub(crate) fn compression_type_from_str(s: &str) -> anyhow::Result<DBCompressionType> {
match s.to_lowercase().as_str() {
"bz2" => Ok(DBCompressionType::Bz2),
"lz4" => Ok(DBCompressionType::Lz4),
"lz4hc" => Ok(DBCompressionType::Lz4hc),
"snappy" => Ok(DBCompressionType::Snappy),
"zlib" => Ok(DBCompressionType::Zlib),
"zstd" => Ok(DBCompressionType::Zstd),
"none" => Ok(DBCompressionType::None),
_ => Err(anyhow!("invalid compression option")),
}
}

#[cfg(test)]
mod test {
use super::*;
use rocksdb::DBCompactionStyle;

#[test]
fn compaction_style_from_str_test() {
let test_cases = vec![
("Level", Ok(DBCompactionStyle::Level)),
("UNIVERSAL", Ok(DBCompactionStyle::Universal)),
("fifo", Ok(DBCompactionStyle::Fifo)),
("cthulhu", Err(anyhow!("some error message"))),
];
for (input, expected) in test_cases {
let actual = compaction_style_from_str(input);
if let Ok(compaction_style) = actual {
assert_eq!(expected.unwrap(), compaction_style);
} else {
assert!(expected.is_err());
}
}
}

#[test]
fn compression_style_from_str_test() {
let test_cases = vec![
("bz2", Ok(DBCompressionType::Bz2)),
("lz4", Ok(DBCompressionType::Lz4)),
("lz4HC", Ok(DBCompressionType::Lz4hc)),
("SNAPPY", Ok(DBCompressionType::Snappy)),
("zlib", Ok(DBCompressionType::Zlib)),
("ZSTD", Ok(DBCompressionType::Zstd)),
("none", Ok(DBCompressionType::None)),
("cthulhu", Err(anyhow!("some error message"))),
];
for (input, expected) in test_cases {
let actual = compression_type_from_str(input);
if let Ok(compression_type) = actual {
assert_eq!(expected.unwrap(), compression_type);
} else {
assert!(expected.is_err());
}
}
}
}
5 changes: 3 additions & 2 deletions node/db/tests/db_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

#![cfg(feature = "rocksdb")]

use forest_db::rocks::{RocksDb, RocksDbConfig};
use forest_db::rocks::RocksDb;
use forest_db::rocks_config::RocksDbConfig;
use std::ops::Deref;

/// Temporary, self-cleaning RocksDB
Expand All @@ -22,7 +23,7 @@ impl TempRocksDB {
let path = dir.path().join("db");

TempRocksDB {
db: RocksDb::open(&path, RocksDbConfig::default()).unwrap(),
db: RocksDb::open(&path, &RocksDbConfig::default()).unwrap(),
_dir: dir,
}
}
Expand Down