Skip to content

Commit

Permalink
parametrize current rocksdb settings (#1479)
Browse files Browse the repository at this point in the history
  • Loading branch information
LesnyRumcajs authored Mar 17, 2022
1 parent 879cb3e commit d52d4f7
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 85 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions forest/src/cli/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct Config {
pub sync: SyncConfig,
pub encrypt_keystore: bool,
pub metrics_port: u16,
pub rocks_db: db::rocks::RocksDbConfig,
}

impl Default for Config {
Expand All @@ -43,6 +44,7 @@ impl Default for Config {
sync: SyncConfig::default(),
encrypt_keystore: true,
metrics_port: 6116,
rocks_db: db::rocks::RocksDbConfig::default(),
}
}
}
12 changes: 7 additions & 5 deletions forest/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ pub(super) async fn start(config: Config) {

let passphrase = read_password().expect("Error reading passphrase");

let mut data_dir = PathBuf::from(&config.data_dir);
data_dir.push(ENCRYPTED_KEYSTORE_NAME);

let data_dir = PathBuf::from(&config.data_dir).join(ENCRYPTED_KEYSTORE_NAME);
if !data_dir.exists() {
print!("Confirm passphrase: ");
std::io::stdout().flush().unwrap();
Expand Down Expand Up @@ -119,7 +117,7 @@ pub(super) async fn start(config: Config) {
.expect("Opening SledDB must succeed");

#[cfg(feature = "rocksdb")]
let db = db::rocks::RocksDb::open(format!("{}/{}", config.data_dir.clone(), "db"))
let db = db::rocks::RocksDb::open(PathBuf::from(&config.data_dir).join("db"), config.rocks_db)
.expect("Opening RocksDB must succeed");

let db = Arc::new(db);
Expand Down Expand Up @@ -237,7 +235,11 @@ pub(super) async fn start(config: Config) {
(format!("127.0.0.1:{}", config.metrics_port))
.parse()
.unwrap(),
format!("{}/{}", config.data_dir.clone(), "db"),
PathBuf::from(&config.data_dir)
.join("db")
.into_os_string()
.into_string()
.expect("Failed converting the path to db"),
));

// Block until ctrl-c is hit
Expand Down
4 changes: 4 additions & 0 deletions node/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ parking_lot = "0.11"
encoding = { package = "forest_encoding", version = "0.2" }
thiserror = "1.0"
num_cpus = "1.13"
serde = { version = "1.0", features = ["derive"] }

[dev-dependencies]
tempfile = "3.3"
69 changes: 45 additions & 24 deletions node/db/src/rocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,29 @@ use super::errors::Error;
use super::Store;
use num_cpus;
pub use rocksdb::{Options, WriteBatch, DB};
use serde::Deserialize;
use std::path::Path;

#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct RocksDbConfig {
create_if_missing: bool,
parallelism: i32,
write_buffer_size: usize,
max_open_files: i32,
}

impl Default for RocksDbConfig {
fn default() -> Self {
Self {
create_if_missing: true,
parallelism: num_cpus::get() as i32,
write_buffer_size: 256 * 1024 * 1024,
max_open_files: 200,
}
}
}

/// RocksDB instance this satisfies the [Store] interface.
#[derive(Debug)]
pub struct RocksDb {
Expand All @@ -17,27 +38,34 @@ pub struct RocksDb {
///
/// Usage:
/// ```no_run
/// use forest_db::rocks::RocksDb;
/// use forest_db::rocks::{RocksDb, RocksDbConfig};
///
/// let mut db = RocksDb::open("test_db").unwrap();
/// let mut db = RocksDb::open("test_db", RocksDbConfig::default()).unwrap();
/// ```
impl RocksDb {
pub fn open<P>(path: P) -> Result<Self, Error>
pub fn open<P>(path: P, config: RocksDbConfig) -> Result<Self, Error>
where
P: AsRef<Path>,
{
let mut db_opts = Options::default();
db_opts.create_if_missing(true);
db_opts.increase_parallelism(num_cpus::get() as i32);
db_opts.set_write_buffer_size(256 * 1024 * 1024); // increase from 64MB to 256MB
db_opts.set_max_open_files(200);
db_opts.create_if_missing(config.create_if_missing);
db_opts.increase_parallelism(config.parallelism);
db_opts.set_write_buffer_size(config.write_buffer_size);
db_opts.set_max_open_files(config.max_open_files);
Ok(Self {
db: DB::open(&db_opts, path)?,
})
}
}

impl Store for RocksDb {
fn read<K>(&self, key: K) -> Result<Option<Vec<u8>>, Error>
where
K: AsRef<[u8]>,
{
self.db.get(key).map_err(Error::from)
}

fn write<K, V>(&self, key: K, value: V) -> Result<(), Error>
where
K: AsRef<[u8]>,
Expand All @@ -53,6 +81,16 @@ impl Store for RocksDb {
Ok(self.db.delete(key)?)
}

fn exists<K>(&self, key: K) -> Result<bool, Error>
where
K: AsRef<[u8]>,
{
self.db
.get_pinned(key)
.map(|v| v.is_some())
.map_err(Error::from)
}

fn bulk_write<K, V>(&self, values: &[(K, V)]) -> Result<(), Error>
where
K: AsRef<[u8]>,
Expand All @@ -64,21 +102,4 @@ impl Store for RocksDb {
}
Ok(self.db.write(batch)?)
}

fn read<K>(&self, key: K) -> Result<Option<Vec<u8>>, Error>
where
K: AsRef<[u8]>,
{
self.db.get(key).map_err(Error::from)
}

fn exists<K>(&self, key: K) -> Result<bool, Error>
where
K: AsRef<[u8]>,
{
self.db
.get_pinned(key)
.map(|v| v.is_some())
.map_err(Error::from)
}
}
51 changes: 21 additions & 30 deletions node/db/tests/db_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,35 @@

#![cfg(feature = "rocksdb")]

// Taken from
// https://github.com/rust-rocksdb/rust-rocksdb/blob/master/tests/util/mod.rs
use rocksdb::{Options, DB};
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
use forest_db::rocks::{RocksDb, RocksDbConfig};
use std::ops::Deref;

/// Ensures that DB::Destroy is called for this database when DBPath is dropped.
pub struct DBPath {
pub path: PathBuf,
/// Temporary, self-cleaning RocksDB
pub struct TempRocksDB {
db: RocksDb,
_dir: tempfile::TempDir, // kept for cleaning up during Drop
}

impl DBPath {
/// Suffixes the given `prefix` with a timestamp to ensure that subsequent test runs don't reuse
/// an old database in case of panics prior to Drop being called.
pub fn new(prefix: &str) -> DBPath {
let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let path = format!(
"{}.{}.{}",
prefix,
current_time.as_secs(),
current_time.subsec_nanos()
);
impl TempRocksDB {
/// Creates a new DB in a temporary path that gets wiped out when the variable
/// gets out of scope.
pub fn new() -> TempRocksDB {
let dir = tempfile::Builder::new()
.tempdir()
.expect("Failed to create temporary path for db.");
let path = dir.path().join("db");

DBPath {
path: PathBuf::from(path),
TempRocksDB {
db: RocksDb::open(&path, RocksDbConfig::default()).unwrap(),
_dir: dir,
}
}
}

impl Drop for DBPath {
fn drop(&mut self) {
let opts = Options::default();
DB::destroy(&opts, &self.path).unwrap();
}
}
impl Deref for TempRocksDB {
type Target = RocksDb;

impl AsRef<Path> for DBPath {
fn as_ref(&self) -> &Path {
&self.path
fn deref(&self) -> &Self::Target {
&self.db
}
}
43 changes: 17 additions & 26 deletions node/db/tests/rocks_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,61 +6,52 @@
mod db_utils;
mod subtests;

use db_utils::DBPath;
use forest_db::rocks::RocksDb;
use crate::db_utils::TempRocksDB;

#[test]
fn rocks_db_write() {
let path = DBPath::new("write_rocks_test");
let db = RocksDb::open(path.as_ref()).unwrap();
subtests::write(&db);
let db = TempRocksDB::new();
subtests::write(&*db);
}

#[test]
fn rocks_db_read() {
let path = DBPath::new("read_rocks_test");
let db = RocksDb::open(path.as_ref()).unwrap();
subtests::read(&db);
let db = TempRocksDB::new();
subtests::read(&*db);
}

#[test]
fn rocks_db_exists() {
let path = DBPath::new("exists_rocks_test");
let db = RocksDb::open(path.as_ref()).unwrap();
subtests::exists(&db);
let db = TempRocksDB::new();
subtests::exists(&*db);
}

#[test]
fn rocks_db_does_not_exist() {
let path = DBPath::new("does_not_exists_rocks_test");
let db = RocksDb::open(path.as_ref()).unwrap();
subtests::does_not_exist(&db);
let db = TempRocksDB::new();
subtests::does_not_exist(&*db);
}

#[test]
fn rocks_db_delete() {
let path = DBPath::new("delete_rocks_test");
let db = RocksDb::open(path.as_ref()).unwrap();
subtests::delete(&db);
let db = TempRocksDB::new();
subtests::delete(&*db);
}

#[test]
fn rocks_db_bulk_write() {
let path = DBPath::new("bulk_write_rocks_test");
let db = RocksDb::open(path.as_ref()).unwrap();
subtests::bulk_write(&db);
let db = TempRocksDB::new();
subtests::bulk_write(&*db);
}

#[test]
fn rocks_db_bulk_read() {
let path = DBPath::new("bulk_read_rocks_test");
let db = RocksDb::open(path.as_ref()).unwrap();
subtests::bulk_read(&db);
let db = TempRocksDB::new();
subtests::bulk_read(&*db);
}

#[test]
fn rocks_db_bulk_delete() {
let path = DBPath::new("bulk_delete_rocks_test");
let db = RocksDb::open(path.as_ref()).unwrap();
subtests::bulk_delete(&db);
let db = TempRocksDB::new();
subtests::bulk_delete(&*db);
}

0 comments on commit d52d4f7

Please sign in to comment.