Fixes a bug with deleting level relations during pruning (#210)
* compile-time safety for detecting batch writer

* reproduce zero cache delete bug with a simple test

* refactor and use staging relations store for delete_level_relations algo

* comments

* remove trait duplication
michaelsutton authored Jun 23, 2023
1 parent 3b5fc26 commit e4770c7
Showing 10 changed files with 106 additions and 48 deletions.
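
For context, the bug being fixed stems from read-modify-write cycles performed through a batch writer: with a zero-sized cache, every read falls through to the database, which cannot see writes still queued in the pending `WriteBatch`, so a later write silently discards an earlier staged change to the same entry. The following is a minimal, hypothetical sketch of that failure mode (the `Store`/`Batch` types and keys below are illustrative stand-ins, not the crate's API):

```rust
use std::collections::HashMap;

// Hypothetical, simplified model of the failure mode: a "batch" buffers writes
// that the backing store cannot see until commit, so read-modify-write sequences
// lose earlier (still pending) updates when the cache holds nothing.
struct Store {
    db: HashMap<&'static str, Vec<u64>>, // persisted state
}

struct Batch {
    pending: Vec<(&'static str, Vec<u64>)>, // queued writes, not yet visible to reads
}

impl Store {
    fn read(&self, key: &str) -> Vec<u64> {
        // With a zero-sized cache every read falls through to the DB,
        // which knows nothing about writes still sitting in the batch.
        self.db.get(key).cloned().unwrap_or_default()
    }
}

fn remove_child_via_batch(store: &Store, batch: &mut Batch, key: &'static str, child: u64) {
    let mut children = store.read(key); // stale: pending batch writes are invisible here
    children.retain(|&c| c != child);
    batch.pending.push((key, children)); // silently shadows any earlier pending write
}

fn main() {
    let store = Store { db: HashMap::from([("children/origin", vec![1, 2])]) };
    let mut batch = Batch { pending: Vec::new() };

    remove_child_via_batch(&store, &mut batch, "children/origin", 1);
    remove_child_via_batch(&store, &mut batch, "children/origin", 2);

    // The last pending write is [1]: the earlier removal of child 1 was lost.
    assert_eq!(batch.pending.last().unwrap().1, vec![1]);
}
```

The commit addresses this on two fronts: relations algorithms that mutate entries in place now require a `DirectWriter` bound (so a `BatchDbWriter` is rejected at compile time), and the pruning path stages level-relation deletions in a `StagingRelationsStore` before flushing them to the batch once.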
33 changes: 15 additions & 18 deletions consensus/src/model/stores/relations.rs
@@ -1,14 +1,13 @@
use itertools::Itertools;
use kaspa_consensus_core::BlockHashSet;
use kaspa_consensus_core::{blockhash::BlockHashes, BlockHashMap, BlockHasher, BlockLevel, HashMapCustomHasher};
use kaspa_database::prelude::MemoryWriter;
use kaspa_database::prelude::StoreError;
use kaspa_database::prelude::DB;
use kaspa_database::prelude::{BatchDbWriter, DbWriter};
use kaspa_database::prelude::{CachedDbAccess, DbKey, DirectDbWriter};
use kaspa_database::prelude::{DirectWriter, MemoryWriter};
use kaspa_database::registry::{DatabaseStorePrefixes, SEPARATOR};
use kaspa_hashes::Hash;
use parking_lot::{RwLockUpgradableReadGuard, RwLockWriteGuard};
use rocksdb::WriteBatch;
use std::sync::Arc;

@@ -24,7 +23,7 @@ pub trait RelationsStoreReader {

/// Low-level write API for `RelationsStore`
pub trait RelationsStore: RelationsStoreReader {
type DefaultWriter: DbWriter;
type DefaultWriter: DirectWriter;
fn default_writer(&self) -> Self::DefaultWriter;

fn set_parents(&mut self, writer: impl DbWriter, hash: Hash, parents: BlockHashes) -> Result<(), StoreError>;
@@ -109,34 +108,32 @@ impl RelationsStore for DbRelationsStore {
}

pub struct StagingRelationsStore<'a> {
store_read: RwLockUpgradableReadGuard<'a, DbRelationsStore>,
store: &'a mut DbRelationsStore,
staging_parents_writes: BlockHashMap<BlockHashes>,
staging_children_writes: BlockHashMap<BlockHashes>,
staging_deletions: BlockHashSet,
}

impl<'a> StagingRelationsStore<'a> {
pub fn new(store_read: RwLockUpgradableReadGuard<'a, DbRelationsStore>) -> Self {
pub fn new(store: &'a mut DbRelationsStore) -> Self {
Self {
store_read,
store,
staging_parents_writes: Default::default(),
staging_children_writes: Default::default(),
staging_deletions: Default::default(),
}
}

pub fn commit(self, batch: &mut WriteBatch) -> Result<RwLockWriteGuard<'a, DbRelationsStore>, StoreError> {
let store_write = RwLockUpgradableReadGuard::upgrade(self.store_read);
pub fn commit(self, batch: &mut WriteBatch) -> Result<(), StoreError> {
for (k, v) in self.staging_parents_writes {
store_write.parents_access.write(BatchDbWriter::new(batch), k, v)?
self.store.parents_access.write(BatchDbWriter::new(batch), k, v)?
}
for (k, v) in self.staging_children_writes {
store_write.children_access.write(BatchDbWriter::new(batch), k, v)?
self.store.children_access.write(BatchDbWriter::new(batch), k, v)?
}
// Deletions always come after mutations
store_write.parents_access.delete_many(BatchDbWriter::new(batch), &mut self.staging_deletions.iter().copied())?;
store_write.children_access.delete_many(BatchDbWriter::new(batch), &mut self.staging_deletions.iter().copied())?;
Ok(store_write)
self.store.parents_access.delete_many(BatchDbWriter::new(batch), &mut self.staging_deletions.iter().copied())?;
self.store.children_access.delete_many(BatchDbWriter::new(batch), &mut self.staging_deletions.iter().copied())
}

fn check_not_in_deletions(&self, hash: Hash) -> Result<(), StoreError> {
@@ -179,7 +176,7 @@ impl RelationsStoreReader for StagingRelationsStore<'_> {
if let Some(data) = self.staging_parents_writes.get(&hash) {
Ok(BlockHashes::clone(data))
} else {
self.store_read.get_parents(hash)
self.store.get_parents(hash)
}
}

@@ -188,20 +185,20 @@ impl RelationsStoreReader for StagingRelationsStore<'_> {
if let Some(data) = self.staging_children_writes.get(&hash) {
Ok(BlockHashes::clone(data))
} else {
self.store_read.get_children(hash)
self.store.get_children(hash)
}
}

fn has(&self, hash: Hash) -> Result<bool, StoreError> {
if self.staging_deletions.contains(&hash) {
return Ok(false);
}
Ok(self.staging_parents_writes.contains_key(&hash) || self.store_read.has(hash)?)
Ok(self.staging_parents_writes.contains_key(&hash) || self.store.has(hash)?)
}

fn counts(&self) -> Result<(usize, usize), StoreError> {
Ok((
self.store_read
self.store
.parents_access
.iterator()
.map(|r| r.unwrap().0)
@@ -211,7 +208,7 @@ impl RelationsStoreReader for StagingRelationsStore<'_> {
.collect::<BlockHashSet>()
.difference(&self.staging_deletions)
.count(),
self.store_read
self.store
.children_access
.iterator()
.map(|r| r.unwrap().0)
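
The staging store above keeps all intermediate parent/children writes and deletions in plain in-memory maps and only touches the underlying store once, inside `commit`, so correctness no longer depends on cache size. A stripped-down sketch of the same pattern (simplified keys, no error handling, not the actual struct):

```rust
use std::collections::{HashMap, HashSet};

// A minimal sketch of the staging pattern: reads consult the in-memory overlays
// first, and the underlying store is touched exactly once, at commit time,
// regardless of how many intermediate mutations the algorithm performed.
struct StagingRelations<'a> {
    store: &'a mut HashMap<u64, Vec<u64>>, // stand-in for the DB-backed store
    staging_writes: HashMap<u64, Vec<u64>>,
    staging_deletions: HashSet<u64>,
}

impl<'a> StagingRelations<'a> {
    fn new(store: &'a mut HashMap<u64, Vec<u64>>) -> Self {
        Self { store, staging_writes: HashMap::new(), staging_deletions: HashSet::new() }
    }

    fn get(&self, hash: u64) -> Option<Vec<u64>> {
        if self.staging_deletions.contains(&hash) {
            return None; // a staged deletion shadows both the overlay and the store
        }
        self.staging_writes.get(&hash).or_else(|| self.store.get(&hash)).cloned()
    }

    fn set(&mut self, hash: u64, parents: Vec<u64>) {
        self.staging_deletions.remove(&hash);
        self.staging_writes.insert(hash, parents);
    }

    fn delete(&mut self, hash: u64) {
        self.staging_writes.remove(&hash);
        self.staging_deletions.insert(hash);
    }

    fn commit(self) {
        // Mutations first, deletions last, matching the ordering in the diff above.
        for (k, v) in self.staging_writes {
            self.store.insert(k, v);
        }
        for k in self.staging_deletions {
            self.store.remove(&k);
        }
    }
}

fn main() {
    let mut store = HashMap::from([(1u64, vec![0u64]), (2, vec![1])]);
    let mut staging = StagingRelations::new(&mut store);
    staging.set(2, vec![0]); // repeated mutations accumulate in memory...
    staging.delete(1);
    assert_eq!(staging.get(1), None);
    staging.commit(); // ...and reach the store in a single pass
    assert_eq!(store.get(&2), Some(&vec![0]));
    assert!(store.get(&1).is_none());
}
```

In the real store, `commit` writes through `BatchDbWriter::new(batch)`, so the flush stays atomic with the rest of the pruning batch.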
9 changes: 6 additions & 3 deletions consensus/src/pipeline/pruning_processor/processor.rs
@@ -344,7 +344,8 @@ impl PruningProcessor {
if !keep_blocks.contains(&current) {
let mut batch = WriteBatch::default();
let mut level_relations_write = self.relations_stores.write();
let mut staging_relations = StagingRelationsStore::new(self.reachability_relations_store.upgradable_read());
let mut reachability_relations_write = self.reachability_relations_store.write();
let mut staging_relations = StagingRelationsStore::new(&mut reachability_relations_write);
let mut staging_reachability = StagingReachabilityStore::new(reachability_read);
let mut statuses_write = self.statuses_store.write();

@@ -370,8 +371,10 @@
// TODO: consider adding block level to compact header data
let block_level = self.headers_store.get_header_with_block_level(current).unwrap().block_level;
(0..=block_level as usize).for_each(|level| {
relations::delete_level_relations(BatchDbWriter::new(&mut batch), &mut level_relations_write[level], current)
let mut staging_level_relations = StagingRelationsStore::new(&mut level_relations_write[level]);
relations::delete_level_relations(MemoryWriter::default(), &mut staging_level_relations, current)
.unwrap_option();
staging_level_relations.commit(&mut batch).unwrap();
self.ghostdag_stores[level].delete_batch(&mut batch, current).unwrap_option();
});

@@ -388,7 +391,7 @@
}

let reachability_write = staging_reachability.commit(&mut batch).unwrap();
let reachability_relations_write = staging_relations.commit(&mut batch).unwrap();
staging_relations.commit(&mut batch).unwrap();

// Flush the batch to the DB
self.db.write(batch).unwrap();
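
A side effect of passing `&mut DbRelationsStore` instead of an upgradable read guard is visible above: the caller now takes the write lock explicitly (`reachability_relations_store.write()`), and `commit` can return `Result<(), StoreError>` rather than handing back an upgraded write guard. A small sketch of that lock-handling shape, using plain `parking_lot` types with a `Vec` standing in for the store:

```rust
use parking_lot::RwLock;

// A sketch of the lock-handling change (a Vec stands in for DbRelationsStore):
// take the write lock once, lend `&mut` to the staging store for the whole
// stage-and-commit scope, and let `commit` return `()` instead of a guard.
fn main() {
    let relations = RwLock::new(Vec::<u64>::new());

    let mut relations_write = relations.write(); // explicit write lock up front
    let staging: &mut Vec<u64> = &mut relations_write; // stand-in for StagingRelationsStore::new(&mut ...)
    staging.push(42); // stand-in for staged writes followed by commit(&mut batch)
    drop(relations_write); // release only after everything is staged into the batch

    assert_eq!(*relations.read(), vec![42]);
}
```

The same pattern repeats in `pruning_proof/mod.rs` and in the reachability inquirer tests below.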
2 changes: 1 addition & 1 deletion consensus/src/pipeline/virtual_processor/processor.rs
@@ -508,7 +508,7 @@ impl VirtualStateProcessor {
// All blocks with lower blue work than filtering_root are:
// 1. not in its future (bcs blue work is monotonic),
// 2. will be removed eventually by the bounded merge check.
// So we prefer doing it in advance to allow better tips to be considered.
// Hence as an optimization we prefer removing such blocks in advance to allow valid tips to be considered.
let filtering_root = self.depth_store.merge_depth_root(candidate).unwrap();
let filtering_blue_work = self.ghostdag_primary_store.get_blue_work(filtering_root).unwrap_or_default();
return (
3 changes: 2 additions & 1 deletion consensus/src/processes/block_depth.rs
@@ -64,7 +64,8 @@ impl<S: DepthStoreReader, U: ReachabilityStoreReader, V: GhostdagStoreReader> Bl
};

// In this case we expect the pruning point or a block above it to be the block at depth.
// Note that above we already verified the chain and distance conditions for this
// Note that above we already verified the chain and distance conditions for this.
// Additionally observe that if `current` is a valid hash it must not be pruned for the same reason.
if current == ORIGIN {
current = pruning_point;
}
5 changes: 3 additions & 2 deletions consensus/src/processes/pruning_proof/mod.rs
@@ -325,8 +325,9 @@ impl PruningProofManager {

// Prepare batch
let mut batch = WriteBatch::default();
let mut reachability_relations_write = self.reachability_relations_store.write();
let mut staging_reachability = StagingReachabilityStore::new(reachability_read);
let mut staging_reachability_relations = StagingRelationsStore::new(self.reachability_relations_store.upgradable_read());
let mut staging_reachability_relations = StagingRelationsStore::new(&mut reachability_relations_write);

// Stage
staging_reachability_relations.insert(hash, reachability_parents_hashes.clone()).unwrap();
@@ -340,7 +341,7 @@

// Commit
let reachability_write = staging_reachability.commit(&mut batch).unwrap();
let reachability_relations_write = staging_reachability_relations.commit(&mut batch).unwrap();
staging_reachability_relations.commit(&mut batch).unwrap();

// Write
self.db.write(batch).unwrap();
13 changes: 8 additions & 5 deletions consensus/src/processes/reachability/inquirer.rs
@@ -391,8 +391,9 @@

// Add blocks via a staging store
{
let mut relations_write = relations.write();
let mut staging_reachability = StagingReachabilityStore::new(reachability.upgradable_read());
let mut staging_relations = StagingRelationsStore::new(relations.upgradable_read());
let mut staging_relations = StagingRelationsStore::new(&mut relations_write);
let mut builder = DagBuilder::new(&mut staging_reachability, &mut staging_relations);
builder.init();
builder.add_block(DagBlock::new(test.genesis.into(), vec![ORIGIN]));
@@ -404,7 +405,7 @@
{
let mut batch = WriteBatch::default();
let reachability_write = staging_reachability.commit(&mut batch).unwrap();
let relations_write = staging_relations.commit(&mut batch).unwrap();
staging_relations.commit(&mut batch).unwrap();
db.write(batch).unwrap();
drop(reachability_write);
drop(relations_write);
@@ -443,8 +444,9 @@
drop(relations_read);

let mut batch = WriteBatch::default();
let mut relations_write = relations.write();
let mut staging_reachability = StagingReachabilityStore::new(reachability.upgradable_read());
let mut staging_relations = StagingRelationsStore::new(relations.upgradable_read());
let mut staging_relations = StagingRelationsStore::new(&mut relations_write);

for (i, block) in
test.ids().choose_multiple(&mut rand::thread_rng(), test.blocks.len()).into_iter().chain(once(test.genesis)).enumerate()
@@ -460,7 +462,7 @@
// Commit the staging changes
{
let reachability_write = staging_reachability.commit(&mut batch).unwrap();
let relations_write = staging_relations.commit(&mut batch).unwrap();
staging_relations.commit(&mut batch).unwrap();
db.write(batch).unwrap();
drop(reachability_write);
drop(relations_write);
@@ -483,8 +485,9 @@

// Recapture staging stores
batch = WriteBatch::default();
relations_write = relations.write();
staging_reachability = StagingReachabilityStore::new(reachability.upgradable_read());
staging_relations = StagingRelationsStore::new(relations.upgradable_read());
staging_relations = StagingRelationsStore::new(&mut relations_write);
}
}
}
4 changes: 2 additions & 2 deletions consensus/src/processes/reachability/tests/mod.rs
@@ -18,7 +18,7 @@ use kaspa_consensus_core::{
blockhash::{BlockHashExtensions, BlockHashes, ORIGIN},
BlockHashMap, BlockHashSet,
};
use kaspa_database::prelude::{DbWriter, StoreError};
use kaspa_database::prelude::{DirectWriter, StoreError};
use kaspa_hashes::Hash;
use std::collections::{
hash_map::Entry::{Occupied, Vacant},
@@ -120,7 +120,7 @@ impl<'a, T: ReachabilityStore + ?Sized, S: RelationsStore + ?Sized> DagBuilder<'
self.delete_block_with_writer(self.relations.default_writer(), hash)
}

pub fn delete_block_with_writer(&mut self, writer: impl DbWriter, hash: Hash) -> &mut Self {
pub fn delete_block_with_writer(&mut self, writer: impl DirectWriter, hash: Hash) -> &mut Self {
let mergeset = delete_reachability_relations(writer, self.relations, self.reachability, hash);
delete_block(self.reachability, hash, &mut mergeset.iter().cloned()).unwrap();
self
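
The `DbWriter` to `DirectWriter` bound changes above (and in `processes/relations.rs` below) implement the "compile-time safety for detecting batch writer" item from the commit message: instead of the removed runtime `assert!(!W::IS_BATCH, ...)`, algorithms that rely on immediately visible writes now demand a marker trait that batch writers simply do not implement. A minimal sketch of the idea follows; the trait and writer types here are simplified stand-ins for the ones in `kaspa_database::prelude`, not their actual definitions:

```rust
// Simplified stand-ins for the writer traits: a marker sub-trait is implemented
// only for writers whose puts are immediately visible, so passing a batch writer
// becomes a type error instead of a runtime panic.
trait DbWriter {
    fn put(&mut self, key: u64, value: u64);
}

/// Marker: writes are applied directly and visible to subsequent reads.
trait DirectWriter: DbWriter {}

struct DirectDbWriter; // applies writes straight to the store
struct BatchDbWriter;  // only queues writes for a later atomic flush

impl DbWriter for DirectDbWriter {
    fn put(&mut self, key: u64, value: u64) {
        println!("put {key} -> {value}");
    }
}

impl DbWriter for BatchDbWriter {
    fn put(&mut self, key: u64, value: u64) {
        println!("queue {key} -> {value}");
    }
}

impl DirectWriter for DirectDbWriter {}
// Deliberately no `impl DirectWriter for BatchDbWriter`.

// Read-modify-write algorithms demand a DirectWriter, mirroring the new bounds
// on `delete_level_relations` / `delete_reachability_relations`.
fn delete_relations<W: DirectWriter>(writer: &mut W, key: u64) {
    writer.put(key, 0);
}

fn main() {
    delete_relations(&mut DirectDbWriter, 7);
    // delete_relations(&mut BatchDbWriter, 7); // <- rejected at compile time
}
```

Misuse now fails at the call site during compilation rather than panicking mid-prune.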
63 changes: 58 additions & 5 deletions consensus/src/processes/relations.rs
@@ -5,7 +5,7 @@ use kaspa_consensus_core::{
blockhash::{BlockHashIteratorExtensions, BlockHashes, ORIGIN},
BlockHashSet,
};
use kaspa_database::prelude::{BatchDbWriter, DbWriter, StoreError};
use kaspa_database::prelude::{BatchDbWriter, DbWriter, DirectWriter, StoreError};
use kaspa_hashes::Hash;
use rocksdb::WriteBatch;

@@ -21,9 +21,12 @@ pub fn init<S: RelationsStore + ?Sized>(relations: &mut S) {
/// kept topologically continuous. If any child of this `hash` will remain with no parent, we make
/// sure to connect it to `origin`. Note that apart from the special case of `origin`, these relations
/// are always a subset of the original header relations for this level.
///
/// NOTE: this algorithm does not support a batch writer bcs it might write to the same entry multiple times
/// (and writes will not accumulate if the entry gets out of the cache in between the calls)
pub fn delete_level_relations<W, S>(mut writer: W, relations: &mut S, hash: Hash) -> Result<(), StoreError>
where
W: DbWriter,
W: DirectWriter,
S: RelationsStore + ?Sized,
{
let children = relations.get_children(hash)?; // if the first entry was found, we expect all others as well, hence we unwrap below
@@ -46,12 +49,10 @@
/// (and writes will not accumulate if the entry gets out of the cache in between the calls)
pub fn delete_reachability_relations<W, S, U>(mut writer: W, relations: &mut S, reachability: &U, hash: Hash) -> BlockHashSet
where
W: DbWriter,
W: DirectWriter,
S: RelationsStore + ?Sized,
U: ReachabilityService + ?Sized,
{
assert!(!W::IS_BATCH, "batch writes are not supported for this algo, see doc.");

let selected_parent = reachability.get_chain_parent(hash);
let parents = relations.get_parents(hash).unwrap();
let children = relations.get_children(hash).unwrap();
@@ -157,3 +158,55 @@ pub trait RelationsStoreExtensions: RelationsStore {
}

impl<S: RelationsStore + ?Sized> RelationsStoreExtensions for S {}

#[cfg(test)]
mod tests {
use super::*;
use crate::model::stores::relations::{DbRelationsStore, RelationsStoreReader, StagingRelationsStore};
use kaspa_core::assert_match;
use kaspa_database::{prelude::MemoryWriter, utils::create_temp_db};
use std::sync::Arc;

#[test]
fn test_delete_level_relations_zero_cache() {
let (_lifetime, db) = create_temp_db();
let cache_size = 0;
let mut relations = DbRelationsStore::new(db.clone(), 0, cache_size);
relations.insert(ORIGIN, Default::default()).unwrap();
relations.insert(1.into(), Arc::new(vec![ORIGIN])).unwrap();
relations.insert(2.into(), Arc::new(vec![1.into()])).unwrap();

assert_eq!(relations.get_parents(ORIGIN).unwrap().as_slice(), []);
assert_eq!(relations.get_children(ORIGIN).unwrap().as_slice(), [1.into()]);
assert_eq!(relations.get_parents(1.into()).unwrap().as_slice(), [ORIGIN]);
assert_eq!(relations.get_children(1.into()).unwrap().as_slice(), [2.into()]);
assert_eq!(relations.get_parents(2.into()).unwrap().as_slice(), [1.into()]);
assert_eq!(relations.get_children(2.into()).unwrap().as_slice(), []);

let mut batch = WriteBatch::default();
let mut staging_relations = StagingRelationsStore::new(&mut relations);
delete_level_relations(MemoryWriter::default(), &mut staging_relations, 1.into()).unwrap();
staging_relations.commit(&mut batch).unwrap();
db.write(batch).unwrap();

assert_match!(relations.get_parents(1.into()), Err(StoreError::KeyNotFound(_)));
assert_match!(relations.get_children(1.into()), Err(StoreError::KeyNotFound(_)));

assert_eq!(relations.get_parents(ORIGIN).unwrap().as_slice(), []);
assert_eq!(relations.get_children(ORIGIN).unwrap().as_slice(), [2.into()]);
assert_eq!(relations.get_parents(2.into()).unwrap().as_slice(), [ORIGIN]);
assert_eq!(relations.get_children(2.into()).unwrap().as_slice(), []);

let mut batch = WriteBatch::default();
let mut staging_relations = StagingRelationsStore::new(&mut relations);
delete_level_relations(MemoryWriter::default(), &mut staging_relations, 2.into()).unwrap();
staging_relations.commit(&mut batch).unwrap();
db.write(batch).unwrap();

assert_match!(relations.get_parents(2.into()), Err(StoreError::KeyNotFound(_)));
assert_match!(relations.get_children(2.into()), Err(StoreError::KeyNotFound(_)));

assert_eq!(relations.get_parents(ORIGIN).unwrap().as_slice(), []);
assert_eq!(relations.get_children(ORIGIN).unwrap().as_slice(), []);
}
}
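
The doc comment on `delete_level_relations` describes the reconnection rule the new test exercises: when a block is pruned from a level's relations, each child loses that parent, and any child left parentless is attached to `origin` so the stored DAG stays topologically continuous. Below is a self-contained, in-memory sketch of that rule, using plain `HashMap`s and `u64` hashes instead of the DB-backed stores (a simplified illustration, not the crate's implementation):

```rust
use std::collections::HashMap;

const ORIGIN: u64 = 0;

// In-memory sketch of the reconnection rule: detach `hash` from its parents'
// children lists, drop it from each child's parents, and re-parent any child
// that would otherwise be left parentless to ORIGIN.
fn delete_level_relations(
    parents: &mut HashMap<u64, Vec<u64>>,
    children: &mut HashMap<u64, Vec<u64>>,
    hash: u64,
) {
    // Detach `hash` from its parents' children lists.
    for p in parents.remove(&hash).unwrap_or_default() {
        if let Some(cs) = children.get_mut(&p) {
            cs.retain(|&c| c != hash);
        }
    }
    // Re-parent children: drop `hash`, and fall back to ORIGIN if nothing remains,
    // keeping every surviving block reachable from origin.
    for c in children.remove(&hash).unwrap_or_default() {
        let ps = parents.entry(c).or_default();
        ps.retain(|&p| p != hash);
        if ps.is_empty() {
            ps.push(ORIGIN);
            children.entry(ORIGIN).or_default().push(c);
        }
    }
}

fn main() {
    let mut parents = HashMap::from([(ORIGIN, vec![]), (1, vec![ORIGIN]), (2, vec![1])]);
    let mut children = HashMap::from([(ORIGIN, vec![1]), (1, vec![2]), (2, vec![])]);

    delete_level_relations(&mut parents, &mut children, 1);

    assert_eq!(parents[&2], vec![ORIGIN]);   // 2 was re-parented to origin
    assert_eq!(children[&ORIGIN], vec![2]);  // origin adopted the orphaned child
}
```

The `main` above mirrors the first step of `test_delete_level_relations_zero_cache`: deleting block 1 re-parents block 2 to `ORIGIN`.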
2 changes: 1 addition & 1 deletion database/src/lib.rs
@@ -16,7 +16,7 @@ pub mod prelude {
pub use super::cache::Cache;
pub use super::item::CachedDbItem;
pub use super::key::DbKey;
pub use super::writer::{BatchDbWriter, DbWriter, DirectDbWriter, MemoryWriter};
pub use super::writer::{BatchDbWriter, DbWriter, DirectDbWriter, DirectWriter, MemoryWriter};
pub use db::{delete_db, open_db, DB};
pub use errors::{StoreError, StoreResult, StoreResultEmptyTuple, StoreResultExtensions};
}