From 94a0e2ff7d04af4e0ff9f56e1ec564b2135146b3 Mon Sep 17 00:00:00 2001 From: Andronik Date: Sun, 14 Aug 2022 14:53:48 +0200 Subject: [PATCH 1/3] rocksdb: remove and simplify a bunch of stuff --- kvdb-memorydb/src/lib.rs | 4 - kvdb-rocksdb/Cargo.toml | 2 - kvdb-rocksdb/src/iter.rs | 80 ----------- kvdb-rocksdb/src/lib.rs | 281 +++++++++++++-------------------------- kvdb/src/lib.rs | 3 - 5 files changed, 94 insertions(+), 276 deletions(-) diff --git a/kvdb-memorydb/src/lib.rs b/kvdb-memorydb/src/lib.rs index 265aa3312..509835f83 100644 --- a/kvdb-memorydb/src/lib.rs +++ b/kvdb-memorydb/src/lib.rs @@ -117,10 +117,6 @@ impl KeyValueDB for InMemory { None => Box::new(None.into_iter()), } } - - fn restore(&self, _new_db: &str) -> io::Result<()> { - Err(io::Error::new(io::ErrorKind::Other, "Attempted to restore in-memory database")) - } } #[cfg(test)] diff --git a/kvdb-rocksdb/Cargo.toml b/kvdb-rocksdb/Cargo.toml index 0bbd1c935..4741483e7 100644 --- a/kvdb-rocksdb/Cargo.toml +++ b/kvdb-rocksdb/Cargo.toml @@ -14,13 +14,11 @@ harness = false [dependencies] smallvec = "1.0.0" -fs-swap = "0.2.6" kvdb = { path = "../kvdb", version = "0.11" } log = "0.4.8" num_cpus = "1.10.1" parking_lot = "0.12.0" regex = "1.3.1" -owning_ref = "0.4.0" parity-util-mem = { path = "../parity-util-mem", version = "0.11", default-features = false, features = ["std", "smallvec"] } # OpenBSD and MSVC are unteested and shouldn't enable jemalloc: diff --git a/kvdb-rocksdb/src/iter.rs b/kvdb-rocksdb/src/iter.rs index 818099ca4..ca5ef7e59 100644 --- a/kvdb-rocksdb/src/iter.rs +++ b/kvdb-rocksdb/src/iter.rs @@ -16,59 +16,11 @@ //! See https://github.com/facebook/rocksdb/wiki/Prefix-Seek-API-Changes for details. use crate::DBAndColumns; -use owning_ref::{OwningHandle, StableAddress}; -use parking_lot::RwLockReadGuard; use rocksdb::{DBIterator, Direction, IteratorMode, ReadOptions}; -use std::ops::{Deref, DerefMut}; /// A tuple holding key and value data, used as the iterator item type. pub type KeyValuePair = (Box<[u8]>, Box<[u8]>); -/// Iterator with built-in synchronization. -pub struct ReadGuardedIterator<'a, I, T> { - inner: OwningHandle>, DerefWrapper>>, -} - -// We can't implement `StableAddress` for a `RwLockReadGuard` -// directly due to orphan rules. -#[repr(transparent)] -struct UnsafeStableAddress<'a, T>(RwLockReadGuard<'a, T>); - -impl<'a, T> Deref for UnsafeStableAddress<'a, T> { - type Target = T; - - fn deref(&self) -> &Self::Target { - self.0.deref() - } -} - -// RwLockReadGuard dereferences to a stable address; qed -unsafe impl<'a, T> StableAddress for UnsafeStableAddress<'a, T> {} - -struct DerefWrapper(T); - -impl Deref for DerefWrapper { - type Target = T; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for DerefWrapper { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl<'a, I: Iterator, T> Iterator for ReadGuardedIterator<'a, I, T> { - type Item = I::Item; - - fn next(&mut self) -> Option { - self.inner.deref_mut().as_mut().and_then(|iter| iter.next()) - } -} - /// Instantiate iterators yielding `KeyValuePair`s. pub trait IterationHandler { type Iterator: Iterator; @@ -84,38 +36,6 @@ pub trait IterationHandler { fn iter_with_prefix(&self, col: u32, prefix: &[u8], read_opts: ReadOptions) -> Self::Iterator; } -impl<'a, T> ReadGuardedIterator<'a, <&'a T as IterationHandler>::Iterator, T> -where - &'a T: IterationHandler, -{ - /// Creates a new `ReadGuardedIterator` that maps `RwLock` to `RwLock`, - /// where `DBIterator` iterates over all keys. - pub fn new(read_lock: RwLockReadGuard<'a, Option>, col: u32, read_opts: ReadOptions) -> Self { - Self { inner: Self::new_inner(read_lock, |db| db.iter(col, read_opts)) } - } - - /// Creates a new `ReadGuardedIterator` that maps `RwLock` to `RwLock`, - /// where `DBIterator` iterates over keys which start with the given prefix. - pub fn new_with_prefix( - read_lock: RwLockReadGuard<'a, Option>, - col: u32, - prefix: &[u8], - read_opts: ReadOptions, - ) -> Self { - Self { inner: Self::new_inner(read_lock, |db| db.iter_with_prefix(col, prefix, read_opts)) } - } - - fn new_inner( - rlock: RwLockReadGuard<'a, Option>, - f: impl FnOnce(&'a T) -> <&'a T as IterationHandler>::Iterator, - ) -> OwningHandle>, DerefWrapper::Iterator>>> { - OwningHandle::new_with_fn(UnsafeStableAddress(rlock), move |rlock| { - let rlock = unsafe { rlock.as_ref().expect("initialized as non-null; qed") }; - DerefWrapper(rlock.as_ref().map(f)) - }) - } -} - impl<'a> IterationHandler for &'a DBAndColumns { type Iterator = DBIterator<'a>; diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index a8029c566..de18b7805 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -12,22 +12,19 @@ mod stats; use std::{ cmp, collections::HashMap, - convert::identity, - error, fs, io, mem, + error, fs, io, path::{Path, PathBuf}, result, }; use parity_util_mem::MallocSizeOf; -use parking_lot::RwLock; use rocksdb::{ BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Error, Options, ReadOptions, WriteBatch, WriteOptions, DB, }; use crate::iter::KeyValuePair; -use fs_swap::{swap, swap_nonatomic}; use kvdb::{DBOp, DBTransaction, DBValue, KeyValueDB}; -use log::{debug, warn}; +use log::warn; #[cfg(target_os = "linux")] use regex::Regex; @@ -293,7 +290,7 @@ impl DBAndColumns { /// Key-Value database. #[derive(MallocSizeOf)] pub struct Database { - db: RwLock>, + inner: DBAndColumns, #[ignore_malloc_size_of = "insignificant"] config: DatabaseConfig, #[ignore_malloc_size_of = "insignificant"] @@ -419,7 +416,7 @@ impl Database { }; Ok(Database { - db: RwLock::new(Some(DBAndColumns { db, column_names })), + inner: DBAndColumns { db, column_names }, config: config.clone(), path: path.as_ref().to_owned(), opts, @@ -506,80 +503,72 @@ impl Database { /// Commit transaction to database. pub fn write(&self, tr: DBTransaction) -> io::Result<()> { - match *self.db.read() { - Some(ref cfs) => { - let mut batch = WriteBatch::default(); - let ops = tr.ops; - - self.stats.tally_writes(ops.len() as u64); - self.stats.tally_transactions(1); - - let mut stats_total_bytes = 0; - - for op in ops { - let cf = cfs.cf(op.col() as usize); - - match op { - DBOp::Insert { col: _, key, value } => { - stats_total_bytes += key.len() + value.len(); - batch.put_cf(cf, &key, &value); - }, - DBOp::Delete { col: _, key } => { - // We count deletes as writes. - stats_total_bytes += key.len(); - batch.delete_cf(cf, &key); - }, - DBOp::DeletePrefix { col, prefix } => { - let end_prefix = kvdb::end_prefix(&prefix[..]); - let no_end = end_prefix.is_none(); - let end_range = end_prefix.unwrap_or_else(|| vec![u8::max_value(); 16]); - batch.delete_range_cf(cf, &prefix[..], &end_range[..]); - if no_end { - use crate::iter::IterationHandler as _; - - let prefix = if prefix.len() > end_range.len() { &prefix[..] } else { &end_range[..] }; - // We call `iter_with_prefix` directly on `cfs` to avoid taking a lock twice - // See https://github.com/paritytech/parity-common/pull/396. - let read_opts = generate_read_options(); - for (key, _) in cfs.iter_with_prefix(col, prefix, read_opts) { - batch.delete_cf(cf, &key[..]); - } - } - }, - }; - } - self.stats.tally_bytes_written(stats_total_bytes as u64); - - check_for_corruption(&self.path, cfs.db.write_opt(batch, &self.write_opts)) - }, - None => Err(other_io_err("Database is closed")), + let cfs = &self.inner; + let mut batch = WriteBatch::default(); + let ops = tr.ops; + + self.stats.tally_writes(ops.len() as u64); + self.stats.tally_transactions(1); + + let mut stats_total_bytes = 0; + + for op in ops { + let cf = cfs.cf(op.col() as usize); + + match op { + DBOp::Insert { col: _, key, value } => { + stats_total_bytes += key.len() + value.len(); + batch.put_cf(cf, &key, &value); + }, + DBOp::Delete { col: _, key } => { + // We count deletes as writes. + stats_total_bytes += key.len(); + batch.delete_cf(cf, &key); + }, + DBOp::DeletePrefix { col, prefix } => { + let end_prefix = kvdb::end_prefix(&prefix[..]); + let no_end = end_prefix.is_none(); + let end_range = end_prefix.unwrap_or_else(|| vec![u8::max_value(); 16]); + batch.delete_range_cf(cf, &prefix[..], &end_range[..]); + if no_end { + use crate::iter::IterationHandler as _; + + let prefix = if prefix.len() > end_range.len() { &prefix[..] } else { &end_range[..] }; + // We call `iter_with_prefix` directly on `cfs` to avoid taking a lock twice + // See https://github.com/paritytech/parity-common/pull/396. + let read_opts = generate_read_options(); + for (key, _) in cfs.iter_with_prefix(col, prefix, read_opts) { + batch.delete_cf(cf, &key[..]); + } + } + }, + }; } + self.stats.tally_bytes_written(stats_total_bytes as u64); + + check_for_corruption(&self.path, cfs.db.write_opt(batch, &self.write_opts)) } /// Get value by key. pub fn get(&self, col: u32, key: &[u8]) -> io::Result> { - match *self.db.read() { - Some(ref cfs) => { - if cfs.column_names.get(col as usize).is_none() { - return Err(other_io_err("column index is out of bounds")) - } - self.stats.tally_reads(1); - let value = cfs - .db - .get_pinned_cf_opt(cfs.cf(col as usize), key, &self.read_opts) - .map(|r| r.map(|v| v.to_vec())) - .map_err(other_io_err); - - match value { - Ok(Some(ref v)) => self.stats.tally_bytes_read((key.len() + v.len()) as u64), - Ok(None) => self.stats.tally_bytes_read(key.len() as u64), - _ => {}, - }; - - value - }, - None => Ok(None), + let cfs = &self.inner; + if cfs.column_names.get(col as usize).is_none() { + return Err(other_io_err("column index is out of bounds")) } + self.stats.tally_reads(1); + let value = cfs + .db + .get_pinned_cf_opt(cfs.cf(col as usize), key, &self.read_opts) + .map(|r| r.map(|v| v.to_vec())) + .map_err(other_io_err); + + match value { + Ok(Some(ref v)) => self.stats.tally_bytes_read((key.len() + v.len()) as u64), + Ok(None) => self.stats.tally_bytes_read(key.len() as u64), + _ => {}, + }; + + value } /// Get value by partial key. Prefix size should match configured prefix size. @@ -591,126 +580,56 @@ impl Database { /// Will hold a lock until the iterator is dropped /// preventing the database from being closed. pub fn iter<'a>(&'a self, col: u32) -> impl Iterator + 'a { - let read_lock = self.db.read(); - let optional = if read_lock.is_some() { - let read_opts = generate_read_options(); - let guarded = iter::ReadGuardedIterator::new(read_lock, col, read_opts); - Some(guarded) - } else { - None - }; - optional.into_iter().flat_map(identity) + let read_opts = generate_read_options(); + iter::IterationHandler::iter(&&self.inner, col, read_opts) } /// Iterator over data in the `col` database column index matching the given prefix. /// Will hold a lock until the iterator is dropped /// preventing the database from being closed. fn iter_with_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> impl Iterator + 'a { - let read_lock = self.db.read(); - let optional = if read_lock.is_some() { - let mut read_opts = generate_read_options(); - // rocksdb doesn't work with an empty upper bound - if let Some(end_prefix) = kvdb::end_prefix(prefix) { - read_opts.set_iterate_upper_bound(end_prefix); - } - let guarded = iter::ReadGuardedIterator::new_with_prefix(read_lock, col, prefix, read_opts); - Some(guarded) - } else { - None - }; - optional.into_iter().flat_map(identity) - } - - /// Close the database - fn close(&self) { - *self.db.write() = None; - } - - /// Restore the database from a copy at given path. - pub fn restore>(&self, new_db: P) -> io::Result<()> { - self.close(); - - // swap is guaranteed to be atomic - match swap(new_db.as_ref(), &self.path) { - Ok(_) => { - // ignore errors - let _ = fs::remove_dir_all(new_db.as_ref()); - }, - Err(err) => { - debug!("DB atomic swap failed: {}", err); - match swap_nonatomic(new_db.as_ref(), &self.path) { - Ok(_) => { - // ignore errors - let _ = fs::remove_dir_all(new_db); - }, - Err(err) => { - warn!("Failed to swap DB directories: {:?}", err); - return Err(io::Error::new( - io::ErrorKind::Other, - "DB restoration failed: could not swap DB directories", - )) - }, - } - }, + let mut read_opts = generate_read_options(); + // rocksdb doesn't work with an empty upper bound + if let Some(end_prefix) = kvdb::end_prefix(prefix) { + read_opts.set_iterate_upper_bound(end_prefix); } - - // reopen the database and steal handles into self - let db = Self::open(&self.config, &self.path)?; - *self.db.write() = mem::replace(&mut *db.db.write(), None); - Ok(()) + iter::IterationHandler::iter_with_prefix(&&self.inner, col, prefix, read_opts) } /// The number of column families in the db. pub fn num_columns(&self) -> u32 { - self.db - .read() - .as_ref() - .and_then(|db| if db.column_names.is_empty() { None } else { Some(db.column_names.len()) }) - .map(|n| n as u32) - .unwrap_or(0) + self.inner.column_names.len() as u32 } /// The number of keys in a column (estimated). pub fn num_keys(&self, col: u32) -> io::Result { const ESTIMATE_NUM_KEYS: &str = "rocksdb.estimate-num-keys"; - match *self.db.read() { - Some(ref cfs) => { - let cf = cfs.cf(col as usize); - match cfs.db.property_int_value_cf(cf, ESTIMATE_NUM_KEYS) { - Ok(estimate) => Ok(estimate.unwrap_or_default()), - Err(err_string) => Err(other_io_err(err_string)), - } - }, - None => Ok(0), + let cfs = &self.inner; + let cf = cfs.cf(col as usize); + match cfs.db.property_int_value_cf(cf, ESTIMATE_NUM_KEYS) { + Ok(estimate) => Ok(estimate.unwrap_or_default()), + Err(err_string) => Err(other_io_err(err_string)), } } /// Remove the last column family in the database. The deletion is definitive. - pub fn remove_last_column(&self) -> io::Result<()> { - match *self.db.write() { - Some(DBAndColumns { ref mut db, ref mut column_names }) => { - if let Some(name) = column_names.pop() { - db.drop_cf(&name).map_err(other_io_err)?; - } - Ok(()) - }, - None => Ok(()), + pub fn remove_last_column(&mut self) -> io::Result<()> { + let DBAndColumns { ref mut db, ref mut column_names } = self.inner; + if let Some(name) = column_names.pop() { + db.drop_cf(&name).map_err(other_io_err)?; } + Ok(()) } /// Add a new column family to the DB. - pub fn add_column(&self) -> io::Result<()> { - match *self.db.write() { - Some(DBAndColumns { ref mut db, ref mut column_names }) => { - let col = column_names.len() as u32; - let name = format!("col{}", col); - let col_config = self.config.column_config(&self.block_opts, col as u32); - let _ = db.create_cf(&name, &col_config).map_err(other_io_err)?; - column_names.push(name); - Ok(()) - }, - None => Ok(()), - } + pub fn add_column(&mut self) -> io::Result<()> { + let DBAndColumns { ref mut db, ref mut column_names } = self.inner; + let col = column_names.len() as u32; + let name = format!("col{}", col); + let col_config = self.config.column_config(&self.block_opts, col as u32); + let _ = db.create_cf(&name, &col_config).map_err(other_io_err)?; + column_names.push(name); + Ok(()) } /// Get RocksDB statistics. @@ -743,10 +662,7 @@ impl Database { /// /// Calling this as primary will return an error. pub fn try_catch_up_with_primary(&self) -> io::Result<()> { - match self.db.read().as_ref() { - Some(DBAndColumns { db, .. }) => db.try_catch_up_with_primary().map_err(other_io_err), - None => Ok(()), - } + self.inner.db.try_catch_up_with_primary().map_err(other_io_err) } } @@ -775,10 +691,6 @@ impl KeyValueDB for Database { Box::new(unboxed.into_iter()) } - fn restore(&self, new_db: &str) -> io::Result<()> { - Database::restore(self, new_db) - } - fn io_stats(&self, kind: kvdb::IoStatsKind) -> kvdb::IoStats { let rocksdb_stats = self.get_statistics(); let cache_hit_count = rocksdb_stats.get("block.cache.hit").map(|s| s.count).unwrap_or(0u64); @@ -930,12 +842,7 @@ mod tests { } db.write(batch).unwrap(); - { - let db = db.db.read(); - db.as_ref().map(|db| { - assert!(db.static_property_or_warn(0, "rocksdb.cur-size-all-mem-tables") > 512); - }); - } + assert!(db.inner.static_property_or_warn(0, "rocksdb.cur-size-all-mem-tables") > 512); } #[test] @@ -976,7 +883,7 @@ mod tests { // open 1, add 4. { - let db = Database::open(&config_1, tempdir.path().to_str().unwrap()).unwrap(); + let mut db = Database::open(&config_1, tempdir.path().to_str().unwrap()).unwrap(); assert_eq!(db.num_columns(), 1); for i in 2..=5 { @@ -1001,7 +908,7 @@ mod tests { // open 5, remove 4. { - let db = Database::open(&config_5, tempdir.path()).expect("open with 5 columns"); + let mut db = Database::open(&config_5, tempdir.path()).expect("open with 5 columns"); assert_eq!(db.num_columns(), 5); for i in (1..5).rev() { diff --git a/kvdb/src/lib.rs b/kvdb/src/lib.rs index 7693439b9..437113082 100644 --- a/kvdb/src/lib.rs +++ b/kvdb/src/lib.rs @@ -128,9 +128,6 @@ pub trait KeyValueDB: Sync + Send + parity_util_mem::MallocSizeOf { prefix: &'a [u8], ) -> Box, Box<[u8]>)> + 'a>; - /// Attempt to replace this database with a new one located at the given path. - fn restore(&self, new_db: &str) -> io::Result<()>; - /// Query statistics. /// /// Not all kvdb implementations are able or expected to implement this, so by From 2e97d3d388773c3b62ff7f94570675cc1454beee Mon Sep 17 00:00:00 2001 From: Andronik Date: Sun, 14 Aug 2022 15:13:54 +0200 Subject: [PATCH 2/3] update changelogs --- kvdb-memorydb/CHANGELOG.md | 2 ++ kvdb-rocksdb/CHANGELOG.md | 4 ++++ kvdb/CHANGELOG.md | 2 ++ 3 files changed, 8 insertions(+) diff --git a/kvdb-memorydb/CHANGELOG.md b/kvdb-memorydb/CHANGELOG.md index e6dc4f85c..c9b3e5c86 100644 --- a/kvdb-memorydb/CHANGELOG.md +++ b/kvdb-memorydb/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog]. [Keep a Changelog]: http://keepachangelog.com/en/1.0.0/ ## [Unreleased] +### Breaking +- Updated `kvdb` to 0.12. [662](https://github.com/paritytech/parity-common/pull/662) ## [0.11.0] - 2022-02-04 ### Breaking diff --git a/kvdb-rocksdb/CHANGELOG.md b/kvdb-rocksdb/CHANGELOG.md index 08ae66937..1143ee90a 100644 --- a/kvdb-rocksdb/CHANGELOG.md +++ b/kvdb-rocksdb/CHANGELOG.md @@ -5,6 +5,10 @@ The format is based on [Keep a Changelog]. [Keep a Changelog]: http://keepachangelog.com/en/1.0.0/ ## [Unreleased] +- Removed `owning_ref` from dependencies :tada:. [662](https://github.com/paritytech/parity-common/pull/662) +### Breaking +- Update `kvdb` to 0.12. [662](https://github.com/paritytech/parity-common/pull/662) + - `add_column` and `remove_last_column` now require `&mut self` ## [0.15.2] - 2022-03-20 - Disable `jemalloc` feature for `rocksdb` where it is not working. [#633](https://github.com/paritytech/parity-common/pull/633) diff --git a/kvdb/CHANGELOG.md b/kvdb/CHANGELOG.md index 5c5093b14..02d3a6a46 100644 --- a/kvdb/CHANGELOG.md +++ b/kvdb/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog]. [Keep a Changelog]: http://keepachangelog.com/en/1.0.0/ ## [Unreleased] +### Breaking +- Removed `fn remove` from `KeyValueDB` trait. [662](https://github.com/paritytech/parity-common/pull/662) ## [0.11.0] - 2022-02-04 ### Breaking From f5863a15d2c28f819d0602f627c12d34db63eab6 Mon Sep 17 00:00:00 2001 From: Andronik Date: Sun, 14 Aug 2022 20:32:28 +0200 Subject: [PATCH 3/3] Update kvdb/CHANGELOG.md Co-authored-by: cheme --- kvdb/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kvdb/CHANGELOG.md b/kvdb/CHANGELOG.md index 02d3a6a46..3dd994f4d 100644 --- a/kvdb/CHANGELOG.md +++ b/kvdb/CHANGELOG.md @@ -6,7 +6,7 @@ The format is based on [Keep a Changelog]. ## [Unreleased] ### Breaking -- Removed `fn remove` from `KeyValueDB` trait. [662](https://github.com/paritytech/parity-common/pull/662) +- Removed `fn restore` from `KeyValueDB` trait. [662](https://github.com/paritytech/parity-common/pull/662) ## [0.11.0] - 2022-02-04 ### Breaking