diff --git a/node/Cargo.toml b/node/Cargo.toml index 075d9f83f44c..fa981a25c31a 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" network = { path = "network" } ferret-libp2p = { path = "ferret-libp2p"} utils = { path = "utils" } +db = { path = "db" } libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "776d13ef046358964c7d64cda3295a3a3cb24743" } tokio = "0.1.22" diff --git a/node/db/Cargo.toml b/node/db/Cargo.toml new file mode 100644 index 000000000000..b88bbed6cc11 --- /dev/null +++ b/node/db/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "db" +version = "0.1.0" +authors = ["ChainSafe Systems "] +edition = "2018" + +[dependencies] +rocksdb = "0.13.0" \ No newline at end of file diff --git a/node/db/src/errors.rs b/node/db/src/errors.rs new file mode 100644 index 000000000000..d8d856f62a2f --- /dev/null +++ b/node/db/src/errors.rs @@ -0,0 +1,21 @@ +use rocksdb; +use std::fmt; + +#[derive(Debug, PartialEq)] +pub struct Error { + msg: String, +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Database error: {}", self.msg) + } +} + +impl From for Error { + fn from(e: rocksdb::Error) -> Error { + Error { + msg: String::from(e), + } + } +} diff --git a/node/db/src/lib.rs b/node/db/src/lib.rs new file mode 100644 index 000000000000..1754f0c24263 --- /dev/null +++ b/node/db/src/lib.rs @@ -0,0 +1,33 @@ +pub mod errors; +pub mod rocks; + +use errors::Error; + +pub trait Write { + fn write(&self, key: K, value: V) -> Result<(), Error> + where + K: AsRef<[u8]>, + V: AsRef<[u8]>; + fn delete(&self, key: K) -> Result<(), Error> + where + K: AsRef<[u8]>; + fn bulk_write(&self, keys: &[K], values: &[V]) -> Result<(), Error> + where + K: AsRef<[u8]>, + V: AsRef<[u8]>; + fn bulk_delete(&self, keys: &[K]) -> Result<(), Error> + where + K: AsRef<[u8]>; +} + +pub trait Read { + fn read(&self, key: K) -> Result>, Error> + where + K: AsRef<[u8]>; + fn exists(&self, key: K) -> Result + where + K: AsRef<[u8]>; + fn bulk_read(&self, keys: &[K]) -> Result>>, Error> + where + K: AsRef<[u8]>; +} diff --git a/node/db/src/rocks.rs b/node/db/src/rocks.rs new file mode 100644 index 000000000000..a5695e2bdbbb --- /dev/null +++ b/node/db/src/rocks.rs @@ -0,0 +1,90 @@ +use super::errors::Error; +use super::{Read, Write}; +use rocksdb::{Options, WriteBatch, DB}; +use std::path::Path; + +#[derive(Debug)] +pub struct RocksDb { + db: DB, +} + +impl RocksDb { + pub fn open(path: &Path) -> Result { + let mut db_opts = Options::default(); + db_opts.create_if_missing(true); + let db = DB::open(&db_opts, path)?; + Ok(Self { db }) + } +} + +impl Write for RocksDb { + fn write(&self, key: K, value: V) -> Result<(), Error> + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + { + Ok(self.db.put(key, value)?) + } + + fn delete(&self, key: K) -> Result<(), Error> + where + K: AsRef<[u8]>, + { + Ok(self.db.delete(key)?) + } + + fn bulk_write(&self, keys: &[K], values: &[V]) -> Result<(), Error> + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + { + let mut batch = WriteBatch::default(); + for (k, v) in keys.iter().zip(values.iter()) { + batch.put(k, v)?; + } + Ok(self.db.write(batch)?) + } + + fn bulk_delete(&self, keys: &[K]) -> Result<(), Error> + where + K: AsRef<[u8]>, + { + for k in keys.iter() { + self.db.delete(k)?; + } + Ok(()) + } +} + +impl Read for RocksDb { + fn read(&self, key: K) -> Result>, Error> + where + K: AsRef<[u8]>, + { + self.db.get(key).map_err(Error::from) + } + + fn exists(&self, key: K) -> Result + where + K: AsRef<[u8]>, + { + self.db + .get_pinned(key) + .map(|v| v.is_some()) + .map_err(Error::from) + } + + fn bulk_read(&self, keys: &[K]) -> Result>>, Error> + where + K: AsRef<[u8]>, + { + let mut v = Vec::with_capacity(keys.len()); + for k in keys.iter() { + match self.db.get(k) { + Ok(val) => v.push(val), + Err(e) => return Err(Error::from(e)), + } + } + Ok(v) + } +} diff --git a/node/db/tests/db_utils/mod.rs b/node/db/tests/db_utils/mod.rs new file mode 100644 index 000000000000..d43416abd6e9 --- /dev/null +++ b/node/db/tests/db_utils/mod.rs @@ -0,0 +1,41 @@ +// Taken from +// https://github.com/rust-rocksdb/rust-rocksdb/blob/master/tests/util/mod.rs +use rocksdb::{Options, DB}; +use std::path::{Path, PathBuf}; +use std::time::{SystemTime, UNIX_EPOCH}; + +/// Ensures that DB::Destroy is called for this database when DBPath is dropped. +pub struct DBPath { + pub path: PathBuf, +} + +impl DBPath { + /// Suffixes the given `prefix` with a timestamp to ensure that subsequent test runs don't reuse + /// an old database in case of panics prior to Drop being called. + pub fn new(prefix: &str) -> DBPath { + let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + let path = format!( + "{}.{}.{}", + prefix, + current_time.as_secs(), + current_time.subsec_nanos() + ); + + DBPath { + path: PathBuf::from(path), + } + } +} + +impl Drop for DBPath { + fn drop(&mut self) { + let opts = Options::default(); + DB::destroy(&opts, &self.path).unwrap(); + } +} + +impl AsRef for DBPath { + fn as_ref(&self) -> &Path { + &self.path + } +} diff --git a/node/db/tests/rocks_test.rs b/node/db/tests/rocks_test.rs new file mode 100644 index 000000000000..ffc4c23764b6 --- /dev/null +++ b/node/db/tests/rocks_test.rs @@ -0,0 +1,108 @@ +mod db_utils; + +use db::{rocks::RocksDb, Read, Write}; +use db_utils::DBPath; + +#[test] +fn start() { + let path = DBPath::new("start_rocks_test"); + RocksDb::open(path.as_ref()).unwrap(); +} + +#[test] +fn write() { + let path = DBPath::new("write_rocks_test"); + let key = [1]; + let value = [1]; + + let db: RocksDb = RocksDb::open(path.as_ref()).unwrap(); + db.write(key, value).unwrap(); +} + +#[test] +fn read() { + let path = DBPath::new("read_rocks_test"); + let key = [0]; + let value = [1]; + let db = RocksDb::open(path.as_ref()).unwrap(); + db.write(key.clone(), value.clone()).unwrap(); + let res = db.read(key).unwrap().unwrap(); + assert_eq!(value.to_vec(), res); +} + +#[test] +fn exists() { + let path = DBPath::new("exists_rocks_test"); + let key = [0]; + let value = [1]; + let db = RocksDb::open(path.as_ref()).unwrap(); + db.write(key.clone(), value.clone()).unwrap(); + let res = db.exists(key).unwrap(); + assert_eq!(res, true); +} + +#[test] +fn does_not_exist() { + let path = DBPath::new("does_not_exists_rocks_test"); + let key = [0]; + let db = RocksDb::open(path.as_ref()).unwrap(); + let res = db.exists(key).unwrap(); + assert_eq!(res, false); +} + +#[test] +fn delete() { + let path = DBPath::new("delete_rocks_test"); + let key = [0]; + let value = [1]; + let db = RocksDb::open(path.as_ref()).unwrap(); + db.write(key.clone(), value.clone()).unwrap(); + let res = db.exists(key.clone()).unwrap(); + assert_eq!(res, true); + db.delete(key.clone()).unwrap(); + let res = db.exists(key.clone()).unwrap(); + assert_eq!(res, false); +} + +#[test] +fn bulk_write() { + let path = DBPath::new("bulk_write_rocks_test"); + let keys = [[0], [1], [2]]; + let values = [[0], [1], [2]]; + let db = RocksDb::open(path.as_ref()).unwrap(); + db.bulk_write(&keys, &values).unwrap(); + for k in keys.iter() { + let res = db.exists(k.clone()).unwrap(); + assert_eq!(res, true); + } +} + +#[test] +fn bulk_read() { + let path = DBPath::new("bulk_read_rocks_test"); + let keys = [[0], [1], [2]]; + let values = [[0], [1], [2]]; + let db = RocksDb::open(path.as_ref()).unwrap(); + db.bulk_write(&keys, &values).unwrap(); + let results = db.bulk_read(&keys).unwrap(); + for (result, value) in results.iter().zip(values.iter()) { + match result { + Some(v) => assert_eq!(v, value), + None => panic!("No values found!"), + } + } +} + +#[test] +fn bulk_delete() { + let path = DBPath::new("bulk_delete_rocks_test"); + let keys = [[0], [1], [2]]; + let values = [[0], [1], [2]]; + let db = RocksDb::open(path.as_ref()).unwrap(); + db.bulk_write(&keys, &values).unwrap(); + db.bulk_delete(&keys).unwrap(); + for k in keys.iter() { + let res = db.exists(k.clone()).unwrap(); + assert_eq!(res, false); + } +}