diff --git a/consensus/src/pipeline/pruning_processor/processor.rs b/consensus/src/pipeline/pruning_processor/processor.rs index a537b47..e5ff89e 100644 --- a/consensus/src/pipeline/pruning_processor/processor.rs +++ b/consensus/src/pipeline/pruning_processor/processor.rs @@ -2,7 +2,7 @@ use crate::{ consensus::{ - services::{ConsensusServices, DbGhostdagManager, DbPruningPointManager}, + services::{ConsensusServices, DbGhostdagManager, DbParentsManager, DbPruningPointManager}, storage::ConsensusStorage, }, model::{ @@ -33,7 +33,7 @@ use spectre_consensus_core::{ muhash::MuHashExtensions, pruning::{PruningPointProof, PruningPointTrustedData}, trusted::ExternalGhostdagData, - BlockHashSet, + BlockHashMap, BlockHashSet, BlockLevel, }; use spectre_consensusmanager::SessionLock; use spectre_core::{debug, info, warn}; @@ -42,7 +42,7 @@ use spectre_hashes::Hash; use spectre_muhash::MuHash; use spectre_utils::iter::IterExtensions; use std::{ - collections::VecDeque, + collections::{hash_map::Entry::Vacant, VecDeque}, ops::Deref, sync::{ atomic::{AtomicBool, Ordering}, @@ -72,6 +72,7 @@ pub struct PruningProcessor { ghostdag_managers: Arc>, pruning_point_manager: DbPruningPointManager, pruning_proof_manager: Arc, + parents_manager: DbParentsManager, // Pruning lock pruning_lock: SessionLock, @@ -109,6 +110,7 @@ impl PruningProcessor { ghostdag_managers: services.ghostdag_managers.clone(), pruning_point_manager: services.pruning_point_manager.clone(), pruning_proof_manager: services.pruning_proof_manager.clone(), + parents_manager: services.parents_manager.clone(), pruning_lock, config, is_consensus_exiting, @@ -262,41 +264,35 @@ impl PruningProcessor { // We keep full data for pruning point and its anticone, relations for DAA/GD // windows and pruning proof, and only headers for past pruning points let keep_blocks: BlockHashSet = data.anticone.iter().copied().collect(); - let keep_relations: BlockHashSet = std::iter::empty() - .chain(data.anticone.iter().copied()) - .chain(data.daa_window_blocks.iter().map(|th| th.header.hash)) - .chain(data.ghostdag_blocks.iter().map(|gd| gd.hash)) - .chain(proof.iter().flatten().map(|h| h.hash)) - .collect(); - let keep_level_zero_relations: BlockHashSet = std::iter::empty() + let mut keep_relations: BlockHashMap = std::iter::empty() .chain(data.anticone.iter().copied()) .chain(data.daa_window_blocks.iter().map(|th| th.header.hash)) .chain(data.ghostdag_blocks.iter().map(|gd| gd.hash)) .chain(proof[0].iter().map(|h| h.hash)) + .map(|h| (h, 0)) // Mark block level 0 for all the above. Note that below we add the remaining levels .collect(); let keep_headers: BlockHashSet = self.past_pruning_points(); info!("Header and Block pruning: waiting for consensus write permissions..."); let mut prune_guard = self.pruning_lock.blocking_write(); - let mut lock_acquire_time = Instant::now(); - let mut reachability_read = self.reachability_store.upgradable_read(); info!("Starting Header and Block pruning..."); { let mut counter = 0; let mut batch = WriteBatch::default(); - for kept in keep_level_zero_relations.iter().copied() { + // At this point keep_relations only holds level-0 relations which is the correct filtering criteria for primary GHOSTDAG + for kept in keep_relations.keys().copied() { let Some(ghostdag) = self.ghostdag_primary_store.get_data(kept).unwrap_option() else { continue; }; - if ghostdag.unordered_mergeset().any(|h| !keep_level_zero_relations.contains(&h)) { + if ghostdag.unordered_mergeset().any(|h| !keep_relations.contains_key(&h)) { let mut mutable_ghostdag: ExternalGhostdagData = ghostdag.as_ref().into(); - mutable_ghostdag.mergeset_blues.retain(|h| keep_level_zero_relations.contains(h)); - mutable_ghostdag.mergeset_reds.retain(|h| keep_level_zero_relations.contains(h)); - mutable_ghostdag.blues_anticone_sizes.retain(|k, _| keep_level_zero_relations.contains(k)); - if !keep_level_zero_relations.contains(&mutable_ghostdag.selected_parent) { + mutable_ghostdag.mergeset_blues.retain(|h| keep_relations.contains_key(h)); + mutable_ghostdag.mergeset_reds.retain(|h| keep_relations.contains_key(h)); + mutable_ghostdag.blues_anticone_sizes.retain(|k, _| keep_relations.contains_key(k)); + if !keep_relations.contains_key(&mutable_ghostdag.selected_parent) { mutable_ghostdag.selected_parent = ORIGIN; } counter += 1; @@ -307,6 +303,45 @@ impl PruningProcessor { info!("Header and Block pruning: updated ghostdag data for {} blocks", counter); } + // No need to hold the prune guard while we continue populating keep_relations + drop(prune_guard); + + // Add additional levels only after filtering GHOSTDAG data via level 0 + for (level, level_proof) in proof.iter().enumerate().skip(1) { + let level = level as BlockLevel; + // We obtain the headers of the pruning point anticone (including the pruning point) + // in order to mark all parents of anticone roots at level as not-to-be-deleted. + // This optimizes multi-level parent validation (see ParentsManager) + // by avoiding the deletion of high-level parents which might still be needed for future + // header validation (avoiding the need for reference blocks; see therein). + // + // Notes: + // + // 1. Normally, such blocks would be part of the proof for this level, but here we address the rare case + // where there are a few such parallel blocks (since the proof only contains the past of the pruning point's + // selected-tip-at-level) + // 2. We refer to the pp anticone as roots even though technically it might contain blocks which are not a pure + // antichain (i.e., some of them are in the past of others). These blocks only add redundant info which would + // be included anyway. + let roots_parents_at_level = data + .anticone + .iter() + .copied() + .map(|hash| self.headers_store.get_header_with_block_level(hash).expect("pruning point anticone is not pruned")) + .filter(|root| level > root.block_level) // If the root itself is at level, there's no need for its level-parents + .flat_map(|root| self.parents_manager.parents_at_level(&root.header, level).iter().copied().collect_vec()); + for hash in level_proof.iter().map(|header| header.hash).chain(roots_parents_at_level) { + if let Vacant(e) = keep_relations.entry(hash) { + // This hash was not added by any lower level -- mark it as affiliated with proof level `level` + e.insert(level); + } + } + } + + prune_guard = self.pruning_lock.blocking_write(); + let mut lock_acquire_time = Instant::now(); + let mut reachability_read = self.reachability_store.upgradable_read(); + { // Start with a batch for pruning body tips and selected chain stores let mut batch = WriteBatch::default(); @@ -394,7 +429,7 @@ impl PruningProcessor { self.acceptance_data_store.delete_batch(&mut batch, current).unwrap(); self.block_transactions_store.delete_batch(&mut batch, current).unwrap(); - if keep_relations.contains(¤t) { + if let Some(&affiliated_proof_level) = keep_relations.get(¤t) { if statuses_write.get(current).unwrap_option().is_some_and(|s| s.is_valid()) { // We set the status to header-only only if it was previously set to a valid // status. This is important since some proof headers might not have their status set @@ -403,17 +438,13 @@ impl PruningProcessor { statuses_write.set_batch(&mut batch, current, StatusHeaderOnly).unwrap(); } - // Delete level-0 relations for blocks which only belong to higher proof levels. - // Note: it is also possible to delete level relations for level x > 0 for any block that only belongs - // to proof levels higher than x, but this requires maintaining such per level usage mapping. - // Since the main motivation of this deletion step is to reduce the - // number of origin's children in level 0, and this is not a bottleneck in any other - // level, we currently chose to only delete level-0 redundant relations. - if !keep_level_zero_relations.contains(¤t) { - let mut staging_level_relations = StagingRelationsStore::new(&mut level_relations_write[0]); + // Delete level-x relations for blocks which only belong to higher-than-x proof levels. + // This preserves the semantic that for each level, relations represent a contiguous DAG area in that level + for lower_level in 0..affiliated_proof_level as usize { + let mut staging_level_relations = StagingRelationsStore::new(&mut level_relations_write[lower_level]); relations::delete_level_relations(MemoryWriter, &mut staging_level_relations, current).unwrap_option(); staging_level_relations.commit(&mut batch).unwrap(); - self.ghostdag_stores[0].delete_batch(&mut batch, current).unwrap_option(); + self.ghostdag_stores[lower_level].delete_batch(&mut batch, current).unwrap_option(); } } else { // Count only blocks which get fully pruned including DAG relations diff --git a/consensus/src/processes/parents_builder.rs b/consensus/src/processes/parents_builder.rs index 89028eb..7a65dcd 100644 --- a/consensus/src/processes/parents_builder.rs +++ b/consensus/src/processes/parents_builder.rs @@ -53,7 +53,7 @@ impl let mut origin_children_headers = None; let mut parents = Vec::with_capacity(self.max_block_level as usize); - for block_level in 0..self.max_block_level { + for block_level in 0..=self.max_block_level { // Direct parents are guaranteed to be in one another's anticones so add them all to // all the block levels they occupy. let mut level_candidates_to_reference_blocks = direct_parent_headers @@ -91,78 +91,89 @@ impl .collect::>() }; - for (i, parent) in grandparents.into_iter().enumerate() { - let has_reachability_data = self.reachability_service.has_reachability_data(parent); - - // Reference blocks are the blocks that are used in reachability queries to check if - // a candidate is in the future of another candidate. In most cases this is just the - // block itself, but in the case where a block doesn't have reachability data we need - // to use some blocks in its future as reference instead. - // If we make sure to add a parent in the future of the pruning point first, we can - // know that any pruned candidate that is in the past of some blocks in the pruning - // point anticone should be a parent (in the relevant level) of one of - // the origin children in the pruning point anticone. So we can check which - // origin children have this block as parent and use those block as - // reference blocks. - let reference_blocks = if has_reachability_data { - smallvec![parent] - } else { - // Here we explicitly declare the type because otherwise Rust would make it mutable. - let origin_children_headers: &Vec<_> = origin_children_headers.get_or_insert_with(|| { - self.relations_service - .get_children(ORIGIN) - .unwrap() - .read() - .iter() - .copied() - .map(|parent| self.headers_store.get_header(parent).unwrap()) - .collect_vec() - }); - let mut reference_blocks = SmallVec::with_capacity(origin_children_headers.len()); - for child_header in origin_children_headers.iter() { - if self.parents_at_level(child_header, block_level).contains(&parent) { - reference_blocks.push(child_header.hash); + let parents_at_level = if level_candidates_to_reference_blocks.is_empty() && first_parent_marker == grandparents.len() { + // Optimization: this is a common case for high levels where none of the direct parents is on the level + // and all direct parents have the same level parents. The condition captures this case because all grandparents + // will be below the first parent marker and there will be no additional grandparents. Bcs all grandparents come + // from a single, already validated parent, there's no need to run any additional antichain checks and we can return + // this set. + grandparents.into_iter().collect() + } else { + // + // Iterate through grandparents in order to find an antichain + for (i, parent) in grandparents.into_iter().enumerate() { + let has_reachability_data = self.reachability_service.has_reachability_data(parent); + + // Reference blocks are the blocks that are used in reachability queries to check if + // a candidate is in the future of another candidate. In most cases this is just the + // block itself, but in the case where a block doesn't have reachability data we need + // to use some blocks in its future as reference instead. + // If we make sure to add a parent in the future of the pruning point first, we can + // know that any pruned candidate that is in the past of some blocks in the pruning + // point anticone should be a parent (in the relevant level) of one of + // the origin children in the pruning point anticone. So we can check which + // origin children have this block as parent and use those block as + // reference blocks. + let reference_blocks = if has_reachability_data { + smallvec![parent] + } else { + // Here we explicitly declare the type because otherwise Rust would make it mutable. + let origin_children_headers: &Vec<_> = origin_children_headers.get_or_insert_with(|| { + self.relations_service + .get_children(ORIGIN) + .unwrap() + .read() + .iter() + .copied() + .map(|parent| self.headers_store.get_header(parent).unwrap()) + .collect_vec() + }); + let mut reference_blocks = SmallVec::with_capacity(origin_children_headers.len()); + for child_header in origin_children_headers.iter() { + if self.parents_at_level(child_header, block_level).contains(&parent) { + reference_blocks.push(child_header.hash); + } } + reference_blocks + }; + + // Make sure we process and insert all first parent's parents. See comments above. + // Note that as parents of an already validated block, they all form an antichain, + // hence no need for reachability queries yet. + if i < first_parent_marker { + level_candidates_to_reference_blocks.insert(parent, reference_blocks); + continue; } - reference_blocks - }; - - // Make sure we process and insert all first parent's parents. See comments above. - // Note that as parents of an already validated block, they all form an antichain, - // hence no need for reachability queries yet. - if i < first_parent_marker { - level_candidates_to_reference_blocks.insert(parent, reference_blocks); - continue; - } - if !has_reachability_data { - continue; - } + if !has_reachability_data { + continue; + } - let len_before_retain = level_candidates_to_reference_blocks.len(); - level_candidates_to_reference_blocks - .retain(|_, refs| !self.reachability_service.is_any_dag_ancestor(&mut refs.iter().copied(), parent)); - let is_any_candidate_ancestor_of = level_candidates_to_reference_blocks.len() < len_before_retain; - - // We should add the block as a candidate if it's in the future of another candidate - // or in the anticone of all candidates. - if is_any_candidate_ancestor_of - || !level_candidates_to_reference_blocks.iter().any(|(_, candidate_references)| { - self.reachability_service.is_dag_ancestor_of_any(parent, &mut candidate_references.iter().copied()) - }) - { - level_candidates_to_reference_blocks.insert(parent, reference_blocks); + let len_before_retain = level_candidates_to_reference_blocks.len(); + level_candidates_to_reference_blocks + .retain(|_, refs| !self.reachability_service.is_any_dag_ancestor(&mut refs.iter().copied(), parent)); + let is_any_candidate_ancestor_of = level_candidates_to_reference_blocks.len() < len_before_retain; + + // We should add the block as a candidate if it's in the future of another candidate + // or in the anticone of all candidates. + if is_any_candidate_ancestor_of + || !level_candidates_to_reference_blocks.iter().any(|(_, candidate_references)| { + self.reachability_service.is_dag_ancestor_of_any(parent, &mut candidate_references.iter().copied()) + }) + { + level_candidates_to_reference_blocks.insert(parent, reference_blocks); + } } - } - if block_level > 0 - && level_candidates_to_reference_blocks.len() == 1 - && level_candidates_to_reference_blocks.contains_key(&self.genesis_hash) - { + // After processing all grandparents, collect the successful level candidates + level_candidates_to_reference_blocks.keys().copied().collect_vec() + }; + + if block_level > 0 && parents_at_level.as_slice() == std::slice::from_ref(&self.genesis_hash) { break; } - parents.push(level_candidates_to_reference_blocks.keys().copied().collect_vec()); + parents.push(parents_at_level); } parents diff --git a/consensus/src/processes/reachability/tests/mod.rs b/consensus/src/processes/reachability/tests/mod.rs index 946a08b..aeb0eb0 100644 --- a/consensus/src/processes/reachability/tests/mod.rs +++ b/consensus/src/processes/reachability/tests/mod.rs @@ -105,6 +105,12 @@ impl DagBlock { } } +impl From<(u64, &[u64])> for DagBlock { + fn from(value: (u64, &[u64])) -> Self { + Self::new(value.0.into(), value.1.iter().map(|&i| i.into()).collect()) + } +} + /// A struct with fluent API to streamline DAG building pub struct DagBuilder<'a, T: ReachabilityStore + ?Sized, S: RelationsStore + ChildrenStore + ?Sized> { reachability: &'a mut T, diff --git a/utils/src/sync/semaphore.rs b/utils/src/sync/semaphore.rs index 2ea6dcc..c0ffec8 100644 --- a/utils/src/sync/semaphore.rs +++ b/utils/src/sync/semaphore.rs @@ -41,7 +41,7 @@ mod trace { if log_time + (Duration::from_secs(10).as_micros() as u64) < now { let log_value = self.log_value.load(Ordering::Relaxed); debug!( - "Semaphore: log interval: {:?}, readers time: {:?}, fraction: {:.2}", + "Semaphore: log interval: {:?}, readers time: {:?}, fraction: {:.4}", Duration::from_micros(now - log_time), Duration::from_micros(readers_time - log_value), (readers_time - log_value) as f64 / (now - log_time) as f64