Skip to content

Commit

Permalink
refactor: assorted cleanups of store
Browse files Browse the repository at this point in the history
* use consistent spelling for `io::Reslut`
* repplace `e.into()` with more explicit `io::Error::from`
* implement From rather than Into
* reduce minore code duplication by delegating to existing functions
* use Arc::clone to make it explicit flow around store's identity
* use buffered writer when dumping store to a file
* fix latent bug around comparing wide pointers rust-lang/rust#69757.
  I thing we should not be hit by that due to full LTO, but that was a scary one.
* remove some extra clones when working with cache
  • Loading branch information
matklad committed Apr 11, 2022
1 parent dede047 commit a89da7f
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 79 deletions.
6 changes: 3 additions & 3 deletions core/store/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ impl From<rocksdb::Error> for DBError {
}
}

impl Into<io::Error> for DBError {
fn into(self) -> io::Error {
io::Error::new(io::ErrorKind::Other, self)
impl From<DBError> for io::Error {
fn from(err: DBError) -> io::Error {
io::Error::new(io::ErrorKind::Other, err)
}
}

Expand Down
138 changes: 62 additions & 76 deletions core/store/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::fs::File;
use std::io::{BufReader, Read, Write};
use std::ops::Deref;
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::Path;
use std::sync::Arc;
use std::{fmt, io};
Expand Down Expand Up @@ -53,31 +52,26 @@ impl Store {
Store { storage }
}

pub fn get(&self, column: DBCol, key: &[u8]) -> Result<Option<Vec<u8>>, io::Error> {
self.storage.get(column, key).map_err(|e| e.into())
pub fn get(&self, column: DBCol, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
self.storage.get(column, key).map_err(io::Error::from)
}

pub fn get_ser<T: BorshDeserialize>(
&self,
column: DBCol,
key: &[u8],
) -> Result<Option<T>, io::Error> {
match self.storage.get(column, key) {
Ok(Some(bytes)) => match T::try_from_slice(bytes.as_ref()) {
pub fn get_ser<T: BorshDeserialize>(&self, column: DBCol, key: &[u8]) -> io::Result<Option<T>> {
match self.get(column, key)? {
Some(bytes) => match T::try_from_slice(&bytes) {
Ok(result) => Ok(Some(result)),
Err(e) => Err(e),
},
Ok(None) => Ok(None),
Err(e) => Err(e.into()),
None => Ok(None),
}
}

pub fn exists(&self, column: DBCol, key: &[u8]) -> Result<bool, io::Error> {
self.storage.get(column, key).map(|value| value.is_some()).map_err(|e| e.into())
pub fn exists(&self, column: DBCol, key: &[u8]) -> io::Result<bool> {
self.get(column, key).map(|value| value.is_some())
}

pub fn store_update(&self) -> StoreUpdate {
StoreUpdate::new(self.storage.clone())
StoreUpdate::new(Arc::clone(&self.storage))
}

pub fn iter<'a>(
Expand Down Expand Up @@ -106,16 +100,15 @@ impl Store {
&'a self,
column: DBCol,
key_prefix: &'a [u8],
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, T), io::Error>> + 'a> {
Box::new(
self.storage
.iter_prefix(column, key_prefix)
.map(|(key, value)| Ok((key.to_vec(), T::try_from_slice(value.as_ref())?))),
)
) -> impl Iterator<Item = io::Result<(Box<[u8]>, T)>> + 'a {
self.storage
.iter_prefix(column, key_prefix)
.map(|(key, value)| Ok((key, T::try_from_slice(value.as_ref())?)))
}

pub fn save_to_file(&self, column: DBCol, filename: &Path) -> Result<(), std::io::Error> {
let mut file = File::create(filename)?;
pub fn save_to_file(&self, column: DBCol, filename: &Path) -> io::Result<()> {
let file = File::create(filename)?;
let mut file = BufWriter::new(file);
for (key, value) in self.storage.iter_without_rc_logic(column) {
file.write_u32::<LittleEndian>(key.len() as u32)?;
file.write_all(&key)?;
Expand All @@ -125,7 +118,7 @@ impl Store {
Ok(())
}

pub fn load_from_file(&self, column: DBCol, filename: &Path) -> Result<(), std::io::Error> {
pub fn load_from_file(&self, column: DBCol, filename: &Path) -> io::Result<()> {
let file = File::open(filename)?;
let mut file = BufReader::new(file);
let mut transaction = self.storage.transaction();
Expand All @@ -134,7 +127,7 @@ impl Store {
loop {
let key_len = match file.read_u32::<LittleEndian>() {
Ok(key_len) => key_len as usize,
Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => break,
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
Err(err) => return Err(err),
};
key.resize(key_len, 0);
Expand All @@ -146,7 +139,7 @@ impl Store {

transaction.put(column, &key, &value);
}
self.storage.write(transaction).map_err(|e| e.into())
self.storage.write(transaction).map_err(io::Error::from)
}

pub fn get_rocksdb(&self) -> Option<&RocksDB> {
Expand All @@ -173,7 +166,7 @@ impl StoreUpdate {
}

pub fn new_with_tries(tries: ShardTries) -> Self {
let storage = tries.get_store().storage.clone();
let storage = Arc::clone(&tries.get_store().storage);
let transaction = storage.transaction();
StoreUpdate { storage, transaction, tries: Some(tries) }
}
Expand All @@ -193,7 +186,7 @@ impl StoreUpdate {
column: DBCol,
key: &[u8],
value: &T,
) -> Result<(), io::Error> {
) -> io::Result<()> {
debug_assert!(!column.is_rc());
let data = value.try_to_vec()?;
self.set(column, key, &data);
Expand All @@ -210,19 +203,17 @@ impl StoreUpdate {

/// Merge another store update into this one.
pub fn merge(&mut self, other: StoreUpdate) {
if let Some(tries) = other.tries {
if self.tries.is_none() {
self.tries = Some(tries);
} else {
debug_assert!(self.tries.as_ref().unwrap().is_same(&tries));
}
match (&self.tries, other.tries) {
(None | Some(_), None) => (),
(None, Some(tries)) => self.tries = Some(tries),
(Some(t1), Some(t2)) => debug_assert!(t1.is_same(&t2)),
}

self.merge_transaction(other.transaction);
}

/// Merge DB Transaction.
pub fn merge_transaction(&mut self, transaction: DBTransaction) {
fn merge_transaction(&mut self, transaction: DBTransaction) {
for op in transaction.ops {
match op {
DBOp::Insert { col, key, value } => self.transaction.put(col, &key, &value),
Expand All @@ -235,18 +226,18 @@ impl StoreUpdate {
}
}

pub fn commit(self) -> Result<(), io::Error> {
pub fn commit(self) -> io::Result<()> {
debug_assert!(
{
let non_refcount_keys = self
.transaction
.ops
.iter()
.filter_map(|op| match op {
DBOp::Insert { col, key, .. } => Some((*col as u8, key)),
DBOp::Delete { col, key } => Some((*col as u8, key)),
DBOp::UpdateRefcount { .. } => None,
DBOp::DeleteAll { .. } => None,
DBOp::Insert { col, key, .. } | DBOp::Delete { col, key } => {
Some((*col as u8, key))
}
DBOp::UpdateRefcount { .. } | DBOp::DeleteAll { .. } => None,
})
.collect::<Vec<_>>();
non_refcount_keys.len()
Expand All @@ -256,13 +247,13 @@ impl StoreUpdate {
self
);
if let Some(tries) = self.tries {
assert_eq!(
tries.get_store().storage.deref() as *const _,
self.storage.deref() as *const _
);
// Note: avoid comparing wide pointers here to work-around
// https://github.com/rust-lang/rust/issues/69757
let addr = |arc| Arc::as_ptr(arc) as *const u8;
assert_eq!(addr(&tries.get_store().storage), addr(&self.storage),);
tries.update_cache(&self.transaction)?;
}
self.storage.write(self.transaction).map_err(|e| e.into())
self.storage.write(self.transaction).map_err(io::Error::from)
}
}

Expand All @@ -289,22 +280,18 @@ pub fn read_with_cache<'a, T: BorshDeserialize + 'a>(
cache: &'a mut LruCache<Vec<u8>, T>,
key: &[u8],
) -> io::Result<Option<&'a T>> {
let key_vec = key.to_vec();
if cache.get(&key_vec).is_some() {
return Ok(Some(cache.get(&key_vec).unwrap()));
// Note: Due to `&mut -> &` conversions, it's not possible to avoid double
// hash map lookups here.
if cache.contains(key) {
return Ok(cache.get(key));
}
if let Some(result) = storage.get_ser(col, key)? {
cache.put(key.to_vec(), result);
return Ok(cache.get(&key_vec));
return Ok(cache.get(key));
}
Ok(None)
}

pub fn create_store(path: &Path) -> Store {
let db = Arc::new(RocksDB::new(path).expect("Failed to open the database"));
Store::new(db)
}

#[derive(Default, Debug)]
pub struct StoreConfig {
/// Attempted writes to the DB will fail. Doesn't require a `LOCK` file.
Expand All @@ -314,17 +301,19 @@ pub struct StoreConfig {
pub enable_statistics: bool,
}

pub fn create_store(path: &Path) -> Store {
create_store_with_config(path, StoreConfig::default())
}

pub fn create_store_with_config(path: &Path, store_config: StoreConfig) -> Store {
let mut opts = RocksDBOptions::default();
if store_config.enable_statistics {
opts = opts.enable_statistics();
}

let db = Arc::new(
(if store_config.read_only { opts.read_only(path) } else { opts.read_write(path) })
.expect("Failed to open the database"),
);
Store::new(db)
let db = if store_config.read_only { opts.read_only(path) } else { opts.read_write(path) }
.expect("Failed to open the database");
Store::new(Arc::new(db))
}

/// Reads an object from Trie.
Expand All @@ -334,18 +323,15 @@ pub fn get<T: BorshDeserialize>(
state_update: &TrieUpdate,
key: &TrieKey,
) -> Result<Option<T>, StorageError> {
state_update.get(key).and_then(|opt| {
opt.map_or_else(
|| Ok(None),
|data| {
T::try_from_slice(&data)
.map_err(|_| {
StorageError::StorageInconsistentState("Failed to deserialize".to_string())
})
.map(Some)
},
)
})
match state_update.get(key)? {
None => Ok(None),
Some(data) => match T::try_from_slice(&data) {
Err(_err) => {
Err(StorageError::StorageInconsistentState("Failed to deserialize".to_string()))
}
Ok(value) => Ok(Some(value)),
},
}
}

/// Writes an object into Trie.
Expand Down Expand Up @@ -509,11 +495,11 @@ pub fn remove_account(
Ok(())
}

pub fn get_genesis_state_roots(store: &Store) -> Result<Option<Vec<StateRoot>>, std::io::Error> {
pub fn get_genesis_state_roots(store: &Store) -> io::Result<Option<Vec<StateRoot>>> {
store.get_ser::<Vec<StateRoot>>(DBCol::ColBlockMisc, GENESIS_STATE_ROOTS_KEY)
}

pub fn get_genesis_hash(store: &Store) -> Result<Option<CryptoHash>, std::io::Error> {
pub fn get_genesis_hash(store: &Store) -> io::Result<Option<CryptoHash>> {
store.get_ser::<CryptoHash>(DBCol::ColBlockMisc, GENESIS_JSON_HASH_KEY)
}

Expand All @@ -538,13 +524,13 @@ pub struct StoreCompiledContractCache {
/// Key must take into account VM being used and its configuration, so that
/// we don't cache non-gas metered binaries, for example.
impl CompiledContractCache for StoreCompiledContractCache {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), std::io::Error> {
fn put(&self, key: &[u8], value: &[u8]) -> io::Result<()> {
let mut store_update = self.store.store_update();
store_update.set(DBCol::ColCachedContractCode, key, value);
store_update.commit()
}

fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, std::io::Error> {
fn get(&self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
self.store.get(DBCol::ColCachedContractCode, key)
}
}
Expand Down

0 comments on commit a89da7f

Please sign in to comment.