Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor database-related structs and modules #69

Merged
merged 5 commits into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion consensus/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ use itertools::Itertools;
use thiserror::Error;

#[derive(Clone, Debug)]

/// Wrapper that renders a vector as a bracketed, comma-separated list,
/// e.g. `[1, 2, 3]`.
pub struct VecDisplay<T: Display>(pub Vec<T>);

impl<T: Display> Display for VecDisplay<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Stringify each element, then join with ", " and wrap in brackets.
        let parts: Vec<String> = self.0.iter().map(|item| item.to_string()).collect();
        write!(f, "[{}]", parts.join(", "))
    }
}

#[derive(Clone, Debug)]
pub struct TwoDimVecDisplay<T: Display + Clone>(pub Vec<Vec<T>>);
impl<T: Display + Clone> Display for TwoDimVecDisplay<T> {
Expand Down
73 changes: 54 additions & 19 deletions consensus/src/model/stores/caching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@ use parking_lot::RwLock;
use rand::Rng;
use rocksdb::WriteBatch;
use serde::{de::DeserializeOwned, Serialize};
use std::sync::Arc;
use std::{fmt::Display, str, sync::Arc};

/// Separator byte placed between the store prefix and the entity key.
const SEP: u8 = b'/';

/// A database key composed of a store prefix and an entity key,
/// laid out as `<prefix>/<key bytes>`.
#[derive(Debug, Clone)]
pub struct DbKey {
    path: Vec<u8>,
}

impl DbKey {
    /// Builds the full key `<prefix>/<key bytes>`.
    pub fn new<TKey: Copy + AsRef<[u8]>>(prefix: &[u8], key: TKey) -> Self {
        // Concatenate prefix + SEP + key into a single byte vector; this
        // guarantees every DbKey contains at least one SEP byte.
        Self { path: prefix.iter().chain(std::iter::once(&SEP)).chain(key.as_ref().iter()).copied().collect() }
    }

    /// Builds a key holding only the prefix (empty entity key), e.g. for
    /// reporting a missing store-level item in errors.
    pub fn prefix_only(prefix: &[u8]) -> Self {
        Self::new(prefix, b"")
    }
}

impl AsRef<[u8]> for DbKey {
Expand All @@ -24,6 +29,16 @@ impl AsRef<[u8]> for DbKey {
}
}

impl Display for DbKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let pos = self.path.len() - 1 - self.path.iter().rev().position(|c| *c == SEP).unwrap(); // Find the last position of `SEP`
michaelsutton marked this conversation as resolved.
Show resolved Hide resolved
let (prefix, key) = (&self.path[..pos], &self.path[pos + 1..]);
f.write_str(str::from_utf8(prefix).unwrap_or("{cannot display prefix}"))?;
f.write_str("/")?;
f.write_str(&faster_hex::hex_string(key))
}
}

#[derive(Clone)]
pub struct Cache<TKey: Clone + std::hash::Hash + Eq + Send + Sync, TData: Clone + Send + Sync> {
map: Arc<RwLock<IndexMap<TKey, TData>>>, // We use IndexMap and not HashMap, because it makes it cheaper to remove a random element when the cache is full.
Expand Down Expand Up @@ -132,12 +147,15 @@ where
{
if let Some(data) = self.cache.get(&key) {
Ok(data)
} else if let Some(slice) = self.db.get_pinned(DbKey::new(self.prefix, key))? {
let data: Arc<TData> = Arc::new(bincode::deserialize(&slice)?);
self.cache.insert(key, Arc::clone(&data));
Ok(data)
} else {
Err(StoreError::KeyNotFound(key.to_string()))
let db_key = DbKey::new(self.prefix, key);
if let Some(slice) = self.db.get_pinned(&db_key)? {
let data: Arc<TData> = Arc::new(bincode::deserialize(&slice)?);
self.cache.insert(key, Arc::clone(&data));
Ok(data)
} else {
Err(StoreError::KeyNotFound(db_key))
}
}
}

Expand Down Expand Up @@ -247,12 +265,15 @@ where
{
if let Some(data) = self.cache.get(&key) {
Ok(data)
} else if let Some(slice) = self.db.get_pinned(DbKey::new(self.prefix, key))? {
let data: TData = bincode::deserialize(&slice)?;
self.cache.insert(key, data);
Ok(data)
} else {
Err(StoreError::KeyNotFound(key.to_string()))
let db_key = DbKey::new(self.prefix, key);
if let Some(slice) = self.db.get_pinned(&db_key)? {
let data: TData = bincode::deserialize(&slice)?;
self.cache.insert(key, data);
Ok(data)
} else {
Err(StoreError::KeyNotFound(db_key))
}
}
}

Expand Down Expand Up @@ -304,9 +325,7 @@ impl<T> CachedDbItem<T> {
*self.cached_item.write() = Some(item.clone());
Ok(item)
} else {
Err(StoreError::KeyNotFound(
String::from_utf8(Vec::from(self.key)).unwrap_or_else(|k| "cannot parse key to utf8".to_string()),
))
Err(StoreError::KeyNotFound(DbKey::prefix_only(self.key)))
}
}

Expand Down Expand Up @@ -342,9 +361,7 @@ impl<T> CachedDbItem<T> {
let item: T = bincode::deserialize(&slice)?;
item
} else {
return Err(StoreError::KeyNotFound(
String::from_utf8(Vec::from(self.key)).unwrap_or_else(|k| "cannot parse key to utf8".to_string()),
));
return Err(StoreError::KeyNotFound(DbKey::prefix_only(self.key)));
};

item = op(item); // Apply the update op
Expand Down Expand Up @@ -414,3 +431,21 @@ impl DbWriter for BatchDbWriter<'_> {
Ok(())
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use hashes::Hash;

    /// Smoke test for `DbKey` Display: covers a readable prefix, a binary
    /// prefix, and prefix-only keys. The prefix-only readable case has a
    /// fully predictable rendering, so it is asserted exactly.
    #[test]
    fn test_db_key_display() {
        let key1 = DbKey::new(b"human-readable", Hash::from_u64_word(34567890));
        let key2 = DbKey::new(&[1, 2, 2, 89], Hash::from_u64_word(345690));
        let key3 = DbKey::prefix_only(&[1, 2, 2, 89]);
        let key4 = DbKey::prefix_only(b"direct-prefix");
        // prefix_only produces `<prefix>/` with an empty hex key part.
        assert_eq!(format!("{}", key4), "direct-prefix/");
        println!("{}", key1);
        println!("{}", key2);
        println!("{}", key3);
        println!("{}", key4);
    }
}
11 changes: 6 additions & 5 deletions consensus/src/model/stores/errors.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use super::caching::DbKey;
use thiserror::Error;

/// Errors produced by the store layer.
#[derive(Error, Debug)]
pub enum StoreError {
    /// The requested key does not exist in the store; carries the full
    /// `DbKey` so the message shows `<prefix>/<hex key>`.
    #[error("key {0} not found in store")]
    KeyNotFound(DbKey),

    /// An insert was attempted for a key that already exists.
    #[error("key {0} already exists in store")]
    KeyAlreadyExists(String),

    /// An error bubbled up from the underlying RocksDB instance.
    #[error("rocksdb error {0}")]
    DbError(#[from] rocksdb::Error),

    /// A (de)serialization failure reported by bincode.
    #[error("bincode error {0}")]
    DeserializationError(#[from] Box<bincode::ErrorKind>),
}

Expand Down
16 changes: 8 additions & 8 deletions consensus/src/model/stores/ghostdag.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::processes::ghostdag::ordering::SortableBlock;

use super::caching::CachedDbAccessForCopy;
use super::caching::{CachedDbAccessForCopy, DbKey};
use super::{caching::CachedDbAccess, errors::StoreError, DB};
use consensus_core::BlockHashMap;
use consensus_core::{blockhash::BlockHashes, BlueWorkType};
Expand Down Expand Up @@ -353,48 +353,48 @@ impl GhostdagStoreReader for MemoryGhostdagStore {
fn get_blue_score(&self, hash: Hash) -> Result<u64, StoreError> {
match self.blue_score_map.borrow().get(&hash) {
Some(blue_score) => Ok(*blue_score),
None => Err(StoreError::KeyNotFound(hash.to_string())),
None => Err(StoreError::KeyNotFound(DbKey::new(STORE_PREFIX, hash))),
}
}

fn get_blue_work(&self, hash: Hash) -> Result<BlueWorkType, StoreError> {
match self.blue_work_map.borrow().get(&hash) {
Some(blue_work) => Ok(*blue_work),
None => Err(StoreError::KeyNotFound(hash.to_string())),
None => Err(StoreError::KeyNotFound(DbKey::new(STORE_PREFIX, hash))),
}
}

fn get_selected_parent(&self, hash: Hash) -> Result<Hash, StoreError> {
match self.selected_parent_map.borrow().get(&hash) {
Some(selected_parent) => Ok(*selected_parent),
None => Err(StoreError::KeyNotFound(hash.to_string())),
None => Err(StoreError::KeyNotFound(DbKey::new(STORE_PREFIX, hash))),
}
}

fn get_mergeset_blues(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
match self.mergeset_blues_map.borrow().get(&hash) {
Some(mergeset_blues) => Ok(BlockHashes::clone(mergeset_blues)),
None => Err(StoreError::KeyNotFound(hash.to_string())),
None => Err(StoreError::KeyNotFound(DbKey::new(STORE_PREFIX, hash))),
}
}

fn get_mergeset_reds(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
match self.mergeset_reds_map.borrow().get(&hash) {
Some(mergeset_reds) => Ok(BlockHashes::clone(mergeset_reds)),
None => Err(StoreError::KeyNotFound(hash.to_string())),
None => Err(StoreError::KeyNotFound(DbKey::new(STORE_PREFIX, hash))),
}
}

fn get_blues_anticone_sizes(&self, hash: Hash) -> Result<HashKTypeMap, StoreError> {
match self.blues_anticone_sizes_map.borrow().get(&hash) {
Some(sizes) => Ok(HashKTypeMap::clone(sizes)),
None => Err(StoreError::KeyNotFound(hash.to_string())),
None => Err(StoreError::KeyNotFound(DbKey::new(STORE_PREFIX, hash))),
}
}

fn get_data(&self, hash: Hash) -> Result<Arc<GhostdagData>, StoreError> {
if !self.has(hash)? {
return Err(StoreError::KeyNotFound(hash.to_string()));
return Err(StoreError::KeyNotFound(DbKey::new(STORE_PREFIX, hash)));
}
Ok(Arc::new(GhostdagData::new(
self.blue_score_map.borrow()[&hash],
Expand Down
13 changes: 9 additions & 4 deletions consensus/src/model/stores/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ use rocksdb::WriteBatch;
use serde::{Deserialize, Serialize};
use std::{collections::hash_map::Entry::Vacant, sync::Arc};

use super::{caching::CachedDbAccess, caching::CachedDbItem, errors::StoreError, DB};
use super::{
caching::CachedDbAccess,
caching::{CachedDbItem, DbKey},
errors::StoreError,
DB,
};
use crate::processes::reachability::interval::Interval;
use hashes::Hash;

Expand Down Expand Up @@ -319,14 +324,14 @@ impl MemoryReachabilityStore {
fn get_data_mut(&mut self, hash: Hash) -> Result<&mut ReachabilityData, StoreError> {
match self.map.get_mut(&hash) {
Some(data) => Ok(data),
None => Err(StoreError::KeyNotFound(hash.to_string())),
None => Err(StoreError::KeyNotFound(DbKey::new(STORE_PREFIX, hash))),
}
}

fn get_data(&self, hash: Hash) -> Result<&ReachabilityData, StoreError> {
match self.map.get(&hash) {
Some(data) => Ok(data),
None => Err(StoreError::KeyNotFound(hash.to_string())),
None => Err(StoreError::KeyNotFound(DbKey::new(STORE_PREFIX, hash))),
}
}
}
Expand Down Expand Up @@ -377,7 +382,7 @@ impl ReachabilityStore for MemoryReachabilityStore {
fn get_reindex_root(&self) -> Result<Hash, StoreError> {
match self.reindex_root {
Some(root) => Ok(root),
None => Err(StoreError::KeyNotFound("reindex root".to_string())),
None => Err(StoreError::KeyNotFound(DbKey::prefix_only(REINDEX_ROOT_KEY))),
}
}
}
Expand Down
10 changes: 7 additions & 3 deletions consensus/src/model/stores/relations.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use super::{caching::CachedDbAccess, errors::StoreError, DB};
use super::{
caching::{CachedDbAccess, DbKey},
errors::StoreError,
DB,
};
use consensus_core::{blockhash::BlockHashes, BlockHashMap};
use hashes::Hash;
use parking_lot::{RwLock, RwLockWriteGuard};
Expand Down Expand Up @@ -154,14 +158,14 @@ impl RelationsStoreReader for MemoryRelationsStore {
fn get_parents(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
match self.parents_map.get(&hash) {
Some(parents) => Ok(BlockHashes::clone(parents)),
None => Err(StoreError::KeyNotFound(hash.to_string())),
None => Err(StoreError::KeyNotFound(DbKey::new(PARENTS_PREFIX, hash))),
}
}

fn get_children(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
match self.children_map.get(&hash) {
Some(children) => Ok(BlockHashes::clone(children)),
None => Err(StoreError::KeyNotFound(hash.to_string())),
None => Err(StoreError::KeyNotFound(DbKey::new(CHILDREN_PREFIX, hash))),
}
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/src/processes/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl<S: GhostdagStoreReader, T: ReachabilityStoreReader, U: HeaderStoreReader, V
self.reachability_service.is_chain_ancestor_of(current_pruning_point, ghostdag_data.selected_parent);

// Note: the pruning point from the POV of the current block is the first block in its chain that is in depth of self.pruning_depth and
// its finality score is greater than the previous pruning point. This is why the diff between finality_score(selected_parent.blue_score + 1) * finality_interval
// its finality score is greater than the previous pruning point. This is why if the diff between finality_score(selected_parent.blue_score + 1) * finality_interval
// and the current block blue score is less than self.pruning_depth we can know for sure that this block didn't trigger a pruning point change.
let min_required_blue_score_for_next_pruning_point = (self.finality_score(sp_header_pp_blue_score) + 1) * self.finality_depth;
let next_or_current_pp = if has_pruning_point_in_its_selected_chain
Expand Down