diff --git a/consensus/src/processes/pruning_proof/apply.rs b/consensus/src/processes/pruning_proof/apply.rs
new file mode 100644
index 000000000..8463d6464
--- /dev/null
+++ b/consensus/src/processes/pruning_proof/apply.rs
@@ -0,0 +1,236 @@
+use std::{
+    cmp::Reverse,
+    collections::{hash_map::Entry::Vacant, BinaryHeap, HashSet},
+    sync::Arc,
+};
+
+use itertools::Itertools;
+use kaspa_consensus_core::{
+    blockhash::{BlockHashes, ORIGIN},
+    errors::pruning::{PruningImportError, PruningImportResult},
+    header::Header,
+    pruning::PruningPointProof,
+    trusted::TrustedBlock,
+    BlockHashMap, BlockHashSet, BlockLevel, HashMapCustomHasher,
+};
+use kaspa_core::{debug, trace};
+use kaspa_hashes::Hash;
+use kaspa_pow::calc_block_level;
+use kaspa_utils::{binary_heap::BinaryHeapExtensions, vec::VecExtensions};
+use rocksdb::WriteBatch;
+
+use crate::{
+    model::{
+        services::reachability::ReachabilityService,
+        stores::{
+            ghostdag::{GhostdagData, GhostdagStore},
+            headers::HeaderStore,
+            reachability::StagingReachabilityStore,
+            relations::StagingRelationsStore,
+            selected_chain::SelectedChainStore,
+            virtual_state::{VirtualState, VirtualStateStore},
+        },
+    },
+    processes::{
+        ghostdag::{mergeset::unordered_mergeset_without_selected_parent, ordering::SortableBlock},
+        reachability::inquirer as reachability,
+        relations::RelationsStoreExtensions,
+    },
+};
+
+use super::PruningProofManager;
+
+impl PruningProofManager {
+    pub fn apply_proof(&self, mut proof: PruningPointProof, trusted_set: &[TrustedBlock]) -> PruningImportResult<()> {
+        let pruning_point_header = proof[0].last().unwrap().clone();
+        let pruning_point = pruning_point_header.hash;
+
+        // Create a copy of the proof, since we're going to be mutating the proof passed to us
+        let proof_sets = (0..=self.max_block_level)
+            .map(|level| BlockHashSet::from_iter(proof[level as usize].iter().map(|header| header.hash)))
+            .collect_vec();
+
+        let mut trusted_gd_map: BlockHashMap<GhostdagData> = BlockHashMap::new();
+        for tb in trusted_set.iter() {
+            trusted_gd_map.insert(tb.block.hash(), tb.ghostdag.clone().into());
+            let tb_block_level = calc_block_level(&tb.block.header, self.max_block_level);
+
+            (0..=tb_block_level).for_each(|current_proof_level| {
+                // If this block was in the original proof, ignore it
+                if proof_sets[current_proof_level as usize].contains(&tb.block.hash()) {
+                    return;
+                }
+
+                proof[current_proof_level as usize].push(tb.block.header.clone());
+            });
+        }
+
+        proof.iter_mut().for_each(|level_proof| {
+            level_proof.sort_by(|a, b| a.blue_work.cmp(&b.blue_work));
+        });
+
+        self.populate_reachability_and_headers(&proof);
+
+        {
+            let reachability_read = self.reachability_store.read();
+            for tb in trusted_set.iter() {
+                // Header-only trusted blocks are expected to be in pruning point past
+                if tb.block.is_header_only() && !reachability_read.is_dag_ancestor_of(tb.block.hash(), pruning_point) {
+                    return Err(PruningImportError::PruningPointPastMissingReachability(tb.block.hash()));
+                }
+            }
+        }
+
+        for (level, headers) in proof.iter().enumerate() {
+            trace!("Applying level {} from the pruning point proof", level);
+            let mut level_ancestors: HashSet<Hash> = HashSet::new();
+            level_ancestors.insert(ORIGIN);
+
+            for header in headers.iter() {
+                let parents = Arc::new(
+                    self.parents_manager
+                        .parents_at_level(header, level as BlockLevel)
+                        .iter()
+                        .copied()
+                        .filter(|parent| level_ancestors.contains(parent))
+                        .collect_vec()
+                        .push_if_empty(ORIGIN),
+                );
+
+                self.relations_stores.write()[level].insert(header.hash, parents.clone()).unwrap();
+
+                if level == 0 {
+                    let gd = if let Some(gd) = trusted_gd_map.get(&header.hash) {
+                        gd.clone()
+                    } else {
+                        let calculated_gd = self.ghostdag_manager.ghostdag(&parents);
+                        // Override the ghostdag data with the real blue score and blue work
+                        GhostdagData {
+                            blue_score: header.blue_score,
+                            blue_work: header.blue_work,
+                            selected_parent: calculated_gd.selected_parent,
+                            mergeset_blues: calculated_gd.mergeset_blues,
+                            mergeset_reds: calculated_gd.mergeset_reds,
+                            blues_anticone_sizes: calculated_gd.blues_anticone_sizes,
+                        }
+                    };
+                    self.ghostdag_store.insert(header.hash, Arc::new(gd)).unwrap();
+                }
+
+                level_ancestors.insert(header.hash);
+            }
+        }
+
+        let virtual_parents = vec![pruning_point];
+        let virtual_state = Arc::new(VirtualState {
+            parents: virtual_parents.clone(),
+            ghostdag_data: self.ghostdag_manager.ghostdag(&virtual_parents),
+            ..VirtualState::default()
+        });
+        self.virtual_stores.write().state.set(virtual_state).unwrap();
+
+        let mut batch = WriteBatch::default();
+        self.body_tips_store.write().init_batch(&mut batch, &virtual_parents).unwrap();
+        self.headers_selected_tip_store
+            .write()
+            .set_batch(&mut batch, SortableBlock { hash: pruning_point, blue_work: pruning_point_header.blue_work })
+            .unwrap();
+        self.selected_chain_store.write().init_with_pruning_point(&mut batch, pruning_point).unwrap();
+        self.depth_store.insert_batch(&mut batch, pruning_point, ORIGIN, ORIGIN).unwrap();
+        self.db.write(batch).unwrap();
+
+        Ok(())
+    }
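A note on the trusted-set merge above: a trusted block whose header proves block level `L` participates in the proof at every level `0..=L`, unless that level already contained it. A minimal, self-contained sketch of that distribution rule (toy `u64` hashes, not the kaspa types):

```rust
use std::collections::HashSet;

/// Toy sketch: push `hash` (of block level `block_level`) into every level
/// vector up to and including its own level, skipping levels that already
/// contained it in the original proof.
fn merge_trusted_block(proof: &mut [Vec<u64>], original_sets: &[HashSet<u64>], hash: u64, block_level: usize) {
    for level in 0..=block_level {
        if !original_sets[level].contains(&hash) {
            proof[level].push(hash);
        }
    }
}

fn main() {
    let mut proof = vec![vec![1, 2], vec![2]];
    let sets: Vec<HashSet<u64>> = proof.iter().map(|l| l.iter().copied().collect()).collect();
    merge_trusted_block(&mut proof, &sets, 7, 1); // trusted block 7 proves level 1
    assert_eq!(proof, vec![vec![1, 2, 7], vec![2, 7]]);
    // apply_proof re-sorts each level by blue work afterwards, so push order is irrelevant
}
```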
+
+    pub fn populate_reachability_and_headers(&self, proof: &PruningPointProof) {
+        let capacity_estimate = self.estimate_proof_unique_size(proof);
+        let mut dag = BlockHashMap::with_capacity(capacity_estimate);
+        let mut up_heap = BinaryHeap::with_capacity(capacity_estimate);
+        for header in proof.iter().flatten().cloned() {
+            if let Vacant(e) = dag.entry(header.hash) {
+                // TODO: Check if pow passes
+                let block_level = calc_block_level(&header, self.max_block_level);
+                self.headers_store.insert(header.hash, header.clone(), block_level).unwrap();
+
+                let mut parents = BlockHashSet::with_capacity(header.direct_parents().len() * 2);
+                // We collect all available parent relations in order to maximize reachability information.
+                // By taking into account parents from all levels we ensure that the induced DAG has valid
+                // reachability information for each level-specific sub-DAG -- hence a single reachability
+                // oracle can serve them all
+                for level in 0..=self.max_block_level {
+                    for parent in self.parents_manager.parents_at_level(&header, level) {
+                        parents.insert(*parent);
+                    }
+                }
+
+                struct DagEntry {
+                    header: Arc<Header>,
+                    parents: Arc<BlockHashSet>,
+                }
+
+                up_heap.push(Reverse(SortableBlock { hash: header.hash, blue_work: header.blue_work }));
+                e.insert(DagEntry { header, parents: Arc::new(parents) });
+            }
+        }
+
+        debug!("Estimated proof size: {}, actual size: {}", capacity_estimate, dag.len());
+
+        for reverse_sortable_block in up_heap.into_sorted_iter() {
+            // TODO: Convert to into_iter_sorted once it gets stable
+            let hash = reverse_sortable_block.0.hash;
+            let dag_entry = dag.get(&hash).unwrap();
+
+            // Filter only existing parents
+            let parents_in_dag = BinaryHeap::from_iter(
+                dag_entry
+                    .parents
+                    .iter()
+                    .cloned()
+                    .filter(|parent| dag.contains_key(parent))
+                    .map(|parent| SortableBlock { hash: parent, blue_work: dag.get(&parent).unwrap().header.blue_work }),
+            );
+
+            let reachability_read = self.reachability_store.upgradable_read();
+
+            // Find the maximal parent antichain from the possibly redundant set of existing parents
+            let mut reachability_parents: Vec<SortableBlock> = Vec::new();
+            for parent in parents_in_dag.into_sorted_iter() {
+                if reachability_read.is_dag_ancestor_of_any(parent.hash, &mut reachability_parents.iter().map(|parent| parent.hash)) {
+                    continue;
+                }
+
+                reachability_parents.push(parent);
+            }
+            let reachability_parents_hashes =
+                BlockHashes::new(reachability_parents.iter().map(|parent| parent.hash).collect_vec().push_if_empty(ORIGIN));
+            let selected_parent = reachability_parents.iter().max().map(|parent| parent.hash).unwrap_or(ORIGIN);
+
+            // Prepare batch
+            let mut batch = WriteBatch::default();
+            let mut reachability_relations_write = self.reachability_relations_store.write();
+            let mut staging_reachability = StagingReachabilityStore::new(reachability_read);
+            let mut staging_reachability_relations = StagingRelationsStore::new(&mut reachability_relations_write);
+
+            // Stage
+            staging_reachability_relations.insert(hash, reachability_parents_hashes.clone()).unwrap();
+            let mergeset = unordered_mergeset_without_selected_parent(
+                &staging_reachability_relations,
+                &staging_reachability,
+                selected_parent,
+                &reachability_parents_hashes,
+            );
+            reachability::add_block(&mut staging_reachability, hash, selected_parent, &mut mergeset.iter().copied()).unwrap();
+
+            // Commit
+            let reachability_write = staging_reachability.commit(&mut batch).unwrap();
+            staging_reachability_relations.commit(&mut batch).unwrap();
+
+            // Write
+            self.db.write(batch).unwrap();
+
+            // Drop
+            drop(reachability_write);
+            drop(reachability_relations_write);
+        }
+    }
+}
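The antichain filter inside `populate_reachability_and_headers` is easy to miss: parents are scanned in descending blue-work order, and a candidate is dropped when it is an ancestor of a parent already kept. A hedged sketch of just that filter, with a hypothetical `is_ancestor` oracle standing in for the reachability service:

```rust
/// Keep only the maximal antichain of `parents_desc` (sorted by descending
/// blue work): a candidate is redundant if it is an ancestor of any kept parent.
fn maximal_parent_antichain(parents_desc: &[u64], is_ancestor: impl Fn(u64, u64) -> bool) -> Vec<u64> {
    let mut kept: Vec<u64> = Vec::new();
    for &candidate in parents_desc {
        if !kept.iter().any(|&k| is_ancestor(candidate, k)) {
            kept.push(candidate);
        }
    }
    kept
}

fn main() {
    // Toy DAG where 1 is an ancestor of 3; descending blue-work order is [3, 2, 1]
    let is_ancestor = |a: u64, b: u64| a == 1 && b == 3;
    assert_eq!(maximal_parent_antichain(&[3, 2, 1], is_ancestor), vec![3, 2]);
}
```

Descending order is what makes the single pass sufficient: ancestors carry strictly less blue work, so they always appear after the descendants that make them redundant.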
diff --git a/consensus/src/processes/pruning_proof/build.rs b/consensus/src/processes/pruning_proof/build.rs
new file mode 100644
index 000000000..8ae6fb34c
--- /dev/null
+++ b/consensus/src/processes/pruning_proof/build.rs
@@ -0,0 +1,532 @@
+use std::{cmp::Reverse, collections::BinaryHeap, sync::Arc};
+
+use itertools::Itertools;
+use kaspa_consensus_core::{
+    blockhash::{BlockHashExtensions, BlockHashes, ORIGIN},
+    header::Header,
+    pruning::PruningPointProof,
+    BlockHashSet, BlockLevel, HashMapCustomHasher,
+};
+use kaspa_core::debug;
+use kaspa_database::prelude::{CachePolicy, ConnBuilder, StoreError, StoreResult, StoreResultEmptyTuple, StoreResultExtensions, DB};
+use kaspa_hashes::Hash;
+
+use crate::{
+    model::{
+        services::reachability::ReachabilityService,
+        stores::{
+            ghostdag::{DbGhostdagStore, GhostdagStore, GhostdagStoreReader},
+            headers::{HeaderStoreReader, HeaderWithBlockLevel},
+            relations::RelationsStoreReader,
+        },
+    },
+    processes::{
+        ghostdag::{ordering::SortableBlock, protocol::GhostdagManager},
+        pruning_proof::PruningProofManagerInternalError,
+    },
+};
+
+use super::{PruningProofManager, PruningProofManagerInternalResult};
+
+#[derive(Clone)]
+struct RelationsStoreInFutureOfRoot<T: RelationsStoreReader, U: ReachabilityService> {
+    relations_store: T,
+    reachability_service: U,
+    root: Hash,
+}
+
+impl<T: RelationsStoreReader, U: ReachabilityService> RelationsStoreReader for RelationsStoreInFutureOfRoot<T, U> {
+    fn get_parents(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
+        self.relations_store.get_parents(hash).map(|hashes| {
+            Arc::new(hashes.iter().copied().filter(|h| self.reachability_service.is_dag_ancestor_of(self.root, *h)).collect_vec())
+        })
+    }
+
+    fn get_children(&self, hash: Hash) -> StoreResult<kaspa_database::prelude::ReadLock<BlockHashSet>> {
+        // We assume hash is in future of root
+        assert!(self.reachability_service.is_dag_ancestor_of(self.root, hash));
+        self.relations_store.get_children(hash)
+    }
+
+    fn has(&self, hash: Hash) -> Result<bool, StoreError> {
+        if self.reachability_service.is_dag_ancestor_of(self.root, hash) {
+            Ok(false)
+        } else {
+            self.relations_store.has(hash)
+        }
+    }
+
+    fn counts(&self) -> Result<(usize, usize), kaspa_database::prelude::StoreError> {
+        unimplemented!()
+    }
+}
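`RelationsStoreInFutureOfRoot` is what lets GHOSTDAG treat `root` as a synthetic genesis: any parent outside `future(root)` simply disappears from `get_parents`. A sketch of that contract with plain closures (hypothetical oracle, not the kaspa reachability service):

```rust
/// Filter a parent list down to the sub-DAG in the future of `root`,
/// mirroring what the wrapper's `get_parents` does.
fn parents_within_future_of_root(parents: &[u64], root: u64, is_dag_ancestor_of: impl Fn(u64, u64) -> bool) -> Vec<u64> {
    parents.iter().copied().filter(|&p| is_dag_ancestor_of(root, p)).collect()
}

fn main() {
    let root = 10;
    // In this toy oracle only 11 and 12 lie in future(10)
    let oracle = |a: u64, b: u64| a == 10 && (b == 11 || b == 12);
    assert_eq!(parents_within_future_of_root(&[5, 11, 12], root, oracle), vec![11, 12]);
}
```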
+
+impl PruningProofManager {
+    pub(crate) fn build_pruning_point_proof(&self, pp: Hash) -> PruningPointProof {
+        if pp == self.genesis_hash {
+            return vec![];
+        }
+
+        let (_db_lifetime, temp_db) = kaspa_database::create_temp_db!(ConnBuilder::default().with_files_limit(10));
+        let pp_header = self.headers_store.get_header_with_block_level(pp).unwrap();
+        let (ghostdag_stores, selected_tip_by_level, roots_by_level) = self.calc_gd_for_all_levels(&pp_header, temp_db);
+
+        (0..=self.max_block_level)
+            .map(|level| {
+                let level = level as usize;
+                let selected_tip = selected_tip_by_level[level];
+                let block_at_depth_2m = self
+                    .block_at_depth(&*ghostdag_stores[level], selected_tip, 2 * self.pruning_proof_m)
+                    .map_err(|err| format!("level: {}, err: {}", level, err))
+                    .unwrap();
+
+                // TODO (relaxed): remove the assertion below
+                // (New Logic) This is the root we calculated by going through block relations
+                let root = roots_by_level[level];
+                // (Old Logic) This is the root we can calculate given that the GD records are already filled
+                // The root calc logic below is the original logic before the on-demand higher level GD calculation
+                // We only need old_root to sanity check the new logic
+                let old_root = if level != self.max_block_level as usize {
+                    let block_at_depth_m_at_next_level = self
+                        .block_at_depth(&*ghostdag_stores[level + 1], selected_tip_by_level[level + 1], self.pruning_proof_m)
+                        .map_err(|err| format!("level + 1: {}, err: {}", level + 1, err))
+                        .unwrap();
+                    if self.reachability_service.is_dag_ancestor_of(block_at_depth_m_at_next_level, block_at_depth_2m) {
+                        block_at_depth_m_at_next_level
+                    } else if self.reachability_service.is_dag_ancestor_of(block_at_depth_2m, block_at_depth_m_at_next_level) {
+                        block_at_depth_2m
+                    } else {
+                        self.find_common_ancestor_in_chain_of_a(
+                            &*ghostdag_stores[level],
+                            block_at_depth_m_at_next_level,
+                            block_at_depth_2m,
+                        )
+                        .map_err(|err| format!("level: {}, err: {}", level, err))
+                        .unwrap()
+                    }
+                } else {
+                    block_at_depth_2m
+                };
+
+                // new root is expected to be always an ancestor of old_root because new root takes a safety margin
+                assert!(self.reachability_service.is_dag_ancestor_of(root, old_root));
+
+                let mut headers = Vec::with_capacity(2 * self.pruning_proof_m as usize);
+                let mut queue = BinaryHeap::<Reverse<SortableBlock>>::new();
+                let mut visited = BlockHashSet::new();
+                queue.push(Reverse(SortableBlock::new(root, self.headers_store.get_header(root).unwrap().blue_work)));
+                while let Some(current) = queue.pop() {
+                    let current = current.0.hash;
+                    if !visited.insert(current) {
+                        continue;
+                    }
+
+                    // The second condition is always expected to be true (ghostdag store will have the entry)
+                    // because we are traversing the exact diamond (future(root) ⋂ past(tip)) for which we calculated
+                    // GD for (see fill_level_proof_ghostdag_data). TODO (relaxed): remove the condition or turn into assertion
+                    if !self.reachability_service.is_dag_ancestor_of(current, selected_tip)
+                        || !ghostdag_stores[level].has(current).is_ok_and(|found| found)
+                    {
+                        continue;
+                    }
+
+                    headers.push(self.headers_store.get_header(current).unwrap());
+                    for child in self.relations_stores.read()[level].get_children(current).unwrap().read().iter().copied() {
+                        queue.push(Reverse(SortableBlock::new(child, self.headers_store.get_header(child).unwrap().blue_work)));
+                    }
+                }
+
+                // TODO (relaxed): remove the assertion below
+                // Temp assertion for verifying a bug fix: assert that the full 2M chain is actually contained in the composed level proof
+                let set = BlockHashSet::from_iter(headers.iter().map(|h| h.hash));
+                let chain_2m = self
+                    .chain_up_to_depth(&*ghostdag_stores[level], selected_tip, 2 * self.pruning_proof_m)
+                    .map_err(|err| {
+                        dbg!(level, selected_tip, block_at_depth_2m, root);
+                        format!("Assert 2M chain -- level: {}, err: {}", level, err)
+                    })
+                    .unwrap();
+                let chain_2m_len = chain_2m.len();
+                for (i, chain_hash) in chain_2m.into_iter().enumerate() {
+                    if !set.contains(&chain_hash) {
+                        let next_level_tip = selected_tip_by_level[level + 1];
+                        let next_level_chain_m =
+                            self.chain_up_to_depth(&*ghostdag_stores[level + 1], next_level_tip, self.pruning_proof_m).unwrap();
+                        let next_level_block_m = next_level_chain_m.last().copied().unwrap();
+                        dbg!(next_level_chain_m.len());
+                        dbg!(ghostdag_stores[level + 1].get_compact_data(next_level_tip).unwrap().blue_score);
+                        dbg!(ghostdag_stores[level + 1].get_compact_data(next_level_block_m).unwrap().blue_score);
+                        dbg!(ghostdag_stores[level].get_compact_data(selected_tip).unwrap().blue_score);
+                        dbg!(ghostdag_stores[level].get_compact_data(block_at_depth_2m).unwrap().blue_score);
+                        dbg!(level, selected_tip, block_at_depth_2m, root);
+                        panic!("Assert 2M chain -- missing block {} at index {} out of {} chain blocks", chain_hash, i, chain_2m_len);
+                    }
+                }
+
+                headers
+            })
+            .collect_vec()
+    }
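The traversal above relies on `Reverse` turning Rust's max-heap into a min-heap over blue work; since blue work strictly increases from parent to child, popping in ascending order visits parents before their children. A self-contained illustration with a toy block type mimicking `SortableBlock`'s ordering:

```rust
use std::{cmp::Reverse, collections::BinaryHeap};

/// Ordered by blue work first (field order drives the derived `Ord`), like SortableBlock.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct ToyBlock {
    blue_work: u64,
    hash: u64,
}

fn main() {
    let mut heap = BinaryHeap::new();
    for (hash, blue_work) in [(7, 30), (3, 10), (5, 20)] {
        heap.push(Reverse(ToyBlock { blue_work, hash }));
    }
    // Popping yields ascending blue work, i.e. a topology-respecting visit order
    let order: Vec<u64> = std::iter::from_fn(|| heap.pop()).map(|r| r.0.hash).collect();
    assert_eq!(order, vec![3, 5, 7]);
}
```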
+
+    fn calc_gd_for_all_levels(
+        &self,
+        pp_header: &HeaderWithBlockLevel,
+        temp_db: Arc<DB>,
+    ) -> (Vec<Arc<DbGhostdagStore>>, Vec<Hash>, Vec<Hash>) {
+        let current_dag_level = self.find_current_dag_level(&pp_header.header);
+        let mut ghostdag_stores: Vec<Option<Arc<DbGhostdagStore>>> = vec![None; self.max_block_level as usize + 1];
+        let mut selected_tip_by_level = vec![None; self.max_block_level as usize + 1];
+        let mut root_by_level = vec![None; self.max_block_level as usize + 1];
+        for level in (0..=self.max_block_level).rev() {
+            let level_usize = level as usize;
+            let required_block = if level != self.max_block_level {
+                let next_level_store = ghostdag_stores[level_usize + 1].as_ref().unwrap().clone();
+                let block_at_depth_m_at_next_level = self
+                    .block_at_depth(&*next_level_store, selected_tip_by_level[level_usize + 1].unwrap(), self.pruning_proof_m)
+                    .map_err(|err| format!("level + 1: {}, err: {}", level + 1, err))
+                    .unwrap();
+                Some(block_at_depth_m_at_next_level)
+            } else {
+                None
+            };
+            let (store, selected_tip, root) = self
+                .find_sufficient_root(pp_header, level, current_dag_level, required_block, temp_db.clone())
+                .unwrap_or_else(|_| panic!("find_sufficient_root failed for level {level}"));
+            ghostdag_stores[level_usize] = Some(store);
+            selected_tip_by_level[level_usize] = Some(selected_tip);
+            root_by_level[level_usize] = Some(root);
+        }
+
+        (
+            ghostdag_stores.into_iter().map(Option::unwrap).collect_vec(),
+            selected_tip_by_level.into_iter().map(Option::unwrap).collect_vec(),
+            root_by_level.into_iter().map(Option::unwrap).collect_vec(),
+        )
+    }
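One detail worth calling out in `calc_gd_for_all_levels`: levels are processed top-down, and each level's search is seeded with the block at depth `m` on the level above, which is what guarantees that consecutive level proofs overlap. A schematic sketch of that dependency (hypothetical `find_root_for_level` closure, not the real signature):

```rust
/// Process levels from the highest down, threading each level's
/// block-at-depth-m into the search for the level below it.
fn plan_levels(
    max_level: usize,
    find_root_for_level: impl Fn(usize, Option<u64>) -> (u64, u64), // -> (root, block_at_depth_m)
) -> Vec<u64> {
    let mut roots = vec![None; max_level + 1];
    let mut required: Option<u64> = None;
    for level in (0..=max_level).rev() {
        let (root, block_at_depth_m) = find_root_for_level(level, required);
        required = Some(block_at_depth_m); // becomes the required block one level down
        roots[level] = Some(root);
    }
    roots.into_iter().map(Option::unwrap).collect()
}

fn main() {
    // Toy rule: the root at a level is its seed (or 0 at the top); depth-m block is level*100
    let roots = plan_levels(2, |level, seed| (seed.unwrap_or(0), (level * 100) as u64));
    assert_eq!(roots, vec![100, 200, 0]); // level 1 seeded by level 2, level 0 by level 1
}
```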
+
+    /// Find a sufficient root at a given level by going through the headers store and looking
+    /// for a deep enough level block
+    /// For each root candidate, fill in the ghostdag data to see if it actually is deep enough.
+    /// If the root is deep enough, it will satisfy these conditions
+    /// 1. block at depth 2m at this level ∈ Future(root)
+    /// 2. block at depth m at the next level ∈ Future(root)
+    ///
+    /// Returns: the filled ghostdag store from root to tip, the selected tip and the root
+    fn find_sufficient_root(
+        &self,
+        pp_header: &HeaderWithBlockLevel,
+        level: BlockLevel,
+        current_dag_level: BlockLevel,
+        required_block: Option<Hash>,
+        temp_db: Arc<DB>,
+    ) -> PruningProofManagerInternalResult<(Arc<DbGhostdagStore>, Hash, Hash)> {
+        // Step 1: Determine which selected tip to use
+        let selected_tip = if pp_header.block_level >= level {
+            pp_header.header.hash
+        } else {
+            self.find_selected_parent_header_at_level(&pp_header.header, level)?.hash
+        };
+
+        let cache_policy = CachePolicy::Count(2 * self.pruning_proof_m as usize);
+        let required_level_depth = 2 * self.pruning_proof_m;
+
+        // We only have the headers store (which has level 0 blue_scores) to assemble the proof data from.
+        // We need to look deeper at higher levels (2x deeper every level) to find 2M (plus margin) blocks at that level
+        let mut required_base_level_depth = self.estimated_blue_depth_at_level_0(
+            level,
+            required_level_depth + 100, // We take a safety margin
+            current_dag_level,
+        );
+
+        let mut is_last_level_header;
+        let mut tries = 0;
+
+        let block_at_depth_m_at_next_level = required_block.unwrap_or(selected_tip);
+
+        loop {
+            // Step 2 - Find a deep enough root candidate
+            let block_at_depth_2m = match self.level_block_at_base_depth(level, selected_tip, required_base_level_depth) {
+                Ok((header, is_last_header)) => {
+                    is_last_level_header = is_last_header;
+                    header
+                }
+                Err(e) => return Err(e),
+            };
+
+            let root = if self.reachability_service.is_dag_ancestor_of(block_at_depth_2m, block_at_depth_m_at_next_level) {
+                block_at_depth_2m
+            } else if self.reachability_service.is_dag_ancestor_of(block_at_depth_m_at_next_level, block_at_depth_2m) {
+                block_at_depth_m_at_next_level
+            } else {
+                // find common ancestor of block_at_depth_m_at_next_level and block_at_depth_2m in chain of block_at_depth_m_at_next_level
+                let mut common_ancestor = self.headers_store.get_header(block_at_depth_m_at_next_level).unwrap();
+
+                while !self.reachability_service.is_dag_ancestor_of(common_ancestor.hash, block_at_depth_2m) {
+                    common_ancestor = match self.find_selected_parent_header_at_level(&common_ancestor, level) {
+                        Ok(header) => header,
+                        // Try to give this last header a chance at being root
+                        Err(PruningProofManagerInternalError::NotEnoughHeadersToBuildProof(_)) => break,
+                        Err(e) => return Err(e),
+                    };
+                }
+
+                common_ancestor.hash
+            };
+
+            if level == 0 {
+                return Ok((self.ghostdag_store.clone(), selected_tip, root));
+            }
+
+            // Step 3 - Fill the ghostdag data from root to tip
+            let ghostdag_store = Arc::new(DbGhostdagStore::new_temp(temp_db.clone(), level, cache_policy, cache_policy, tries));
+            let has_required_block = self.fill_level_proof_ghostdag_data(
+                root,
+                pp_header.header.hash,
+                &ghostdag_store,
+                Some(block_at_depth_m_at_next_level),
+                level,
+            );
+
+            // Step 4 - Check if we actually have enough depth.
+            // Need to ensure this does the same 2M+1 depth that block_at_depth does
+            if has_required_block
+                && (root == self.genesis_hash || ghostdag_store.get_blue_score(selected_tip).unwrap() >= required_level_depth)
+            {
+                break Ok((ghostdag_store, selected_tip, root));
+            }
+
+            tries += 1;
+            if is_last_level_header {
+                if has_required_block {
+                    // Normally this scenario doesn't occur when syncing with nodes that already have the safety margin change in place.
+                    // However, when syncing with an older node version that doesn't have a safety margin for the proof, it's possible to
+                    // try to find 2500 depth worth of headers at a level while the proof only contains about 2000 headers. To still be
+                    // able to sync with such an older node, we proceed as long as we found the required block.
+                    debug!("Failed to find sufficient root for level {level} after {tries} tries. Headers below the current depth of {required_base_level_depth} are already pruned. Required block found so trying anyway.");
+                    break Ok((ghostdag_store, selected_tip, root));
+                } else {
+                    panic!("Failed to find sufficient root for level {level} after {tries} tries. Headers below the current depth of {required_base_level_depth} are already pruned");
+                }
+            }
+
+            // If we don't have enough depth now, we need to look deeper
+            required_base_level_depth = (required_base_level_depth as f64 * 1.1) as u64;
+            debug!("Failed to find sufficient root for level {level} after {tries} tries. Retrying again to find with depth {required_base_level_depth}");
+        }
+    }
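The control flow of the loop above is essentially a geometric widening search: try a window, and if the filled GHOSTDAG data is still too shallow, grow the window by 10% and retry. Stripped of the kaspa stores, it looks like this (hypothetical `try_depth` closure returning the achieved blue-score depth for a candidate window):

```rust
/// Widen `window` by 10% per attempt until `try_depth` reports at least `required` depth.
fn find_depth(required: u64, mut window: u64, try_depth: impl Fn(u64) -> u64) -> (u64, u32) {
    let mut tries = 0;
    loop {
        if try_depth(window) >= required {
            return (window, tries);
        }
        tries += 1;
        window = (window as f64 * 1.1) as u64; // same 10% widening as find_sufficient_root
    }
}

fn main() {
    // Toy model: a window of w base-level blocks yields w/4 blocks at the target level
    let (window, tries) = find_depth(1000, 2600, |w| w / 4);
    assert!(window / 4 >= 1000);
    assert_eq!(tries, 5);
}
```

The real loop additionally bails out (or panics) once `level_block_at_base_depth` reports it has hit the last unpruned header at that level, since widening further cannot surface more history.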
+
+    /// BFS forward iterates from root until selected tip, ignoring blocks in the antipast of selected_tip.
+    /// For each block along the way, insert that hash into the ghostdag_store
+    /// If we have a required_block to find, this will return true if that block was found along the way
+    fn fill_level_proof_ghostdag_data(
+        &self,
+        root: Hash,
+        selected_tip: Hash,
+        ghostdag_store: &Arc<DbGhostdagStore>,
+        required_block: Option<Hash>,
+        level: BlockLevel,
+    ) -> bool {
+        let relations_service = RelationsStoreInFutureOfRoot {
+            relations_store: self.level_relations_services[level as usize].clone(),
+            reachability_service: self.reachability_service.clone(),
+            root,
+        };
+        let gd_manager = GhostdagManager::new(
+            root,
+            self.ghostdag_k,
+            ghostdag_store.clone(),
+            relations_service.clone(),
+            self.headers_store.clone(),
+            self.reachability_service.clone(),
+            level != 0,
+        );
+
+        ghostdag_store.insert(root, Arc::new(gd_manager.genesis_ghostdag_data())).unwrap();
+        ghostdag_store.insert(ORIGIN, gd_manager.origin_ghostdag_data()).unwrap();
+
+        let mut topological_heap: BinaryHeap<_> = Default::default();
+        let mut visited = BlockHashSet::new();
+        for child in relations_service.get_children(root).unwrap().read().iter().copied() {
+            topological_heap.push(Reverse(SortableBlock {
+                hash: child,
+                // It's important to use here blue work and not score so we can iterate the heap in a way that respects the topology
+                blue_work: self.headers_store.get_header(child).unwrap().blue_work,
+            }));
+        }
+
+        let mut has_required_block = required_block.is_some_and(|required_block| root == required_block);
+        loop {
+            let Some(current) = topological_heap.pop() else {
+                break;
+            };
+            let current_hash = current.0.hash;
+            if !visited.insert(current_hash) {
+                continue;
+            }
+
+            if !self.reachability_service.is_dag_ancestor_of(current_hash, selected_tip) {
+                // We don't care about blocks in the antipast of the selected tip
+                continue;
+            }
+
+            if !has_required_block && required_block.is_some_and(|required_block| current_hash == required_block) {
+                has_required_block = true;
+            }
+
+            let current_gd = gd_manager.ghostdag(&relations_service.get_parents(current_hash).unwrap());
+
+            ghostdag_store.insert(current_hash, Arc::new(current_gd)).unwrap_or_exists();
+
+            for child in relations_service.get_children(current_hash).unwrap().read().iter().copied() {
+                topological_heap.push(Reverse(SortableBlock {
+                    hash: child,
+                    // It's important to use here blue work and not score so we can iterate the heap in a way that respects the topology
+                    blue_work: self.headers_store.get_header(child).unwrap().blue_work,
+                }));
+            }
+        }
+
+        has_required_block
+    }
+
+    // The "current dag level" is the level right before the level whose parents are
+    // not the same as our header's direct parents
+    //
+    // Find the current DAG level by going through all the parents at each level,
+    // starting from the bottom level and see which is the first level that has
+    // parents that are NOT our current pp_header's direct parents.
+    fn find_current_dag_level(&self, pp_header: &Header) -> BlockLevel {
+        let direct_parents = BlockHashSet::from_iter(pp_header.direct_parents().iter().copied());
+        pp_header
+            .parents_by_level
+            .iter()
+            .enumerate()
+            .skip(1)
+            .find_map(|(level, parents)| {
+                if BlockHashSet::from_iter(parents.iter().copied()) == direct_parents {
+                    None
+                } else {
+                    Some((level - 1) as BlockLevel)
+                }
+            })
+            .unwrap_or(self.max_block_level)
+    }
+
+    fn estimated_blue_depth_at_level_0(&self, level: BlockLevel, level_depth: u64, current_dag_level: BlockLevel) -> u64 {
+        level_depth.checked_shl(level.saturating_sub(current_dag_level) as u32).unwrap_or(level_depth)
+    }
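`estimated_blue_depth_at_level_0` encodes the "look 2x deeper per level" rule as a left shift, falling back to the unscaled depth only when the shift amount reaches the bit width. A standalone re-implementation, just to make the numbers concrete:

```rust
fn estimated_blue_depth_at_level_0(level: u32, level_depth: u64, current_dag_level: u32) -> u64 {
    level_depth.checked_shl(level.saturating_sub(current_dag_level)).unwrap_or(level_depth)
}

fn main() {
    // With m = 1000 the search wants 2m + 100 = 2100 blocks at the target level
    assert_eq!(estimated_blue_depth_at_level_0(3, 2100, 3), 2100); // at the current DAG level: no scaling
    assert_eq!(estimated_blue_depth_at_level_0(5, 2100, 3), 8400); // two levels up: 4x deeper at the base
    assert_eq!(estimated_blue_depth_at_level_0(70, 2100, 3), 2100); // shift >= 64 bits: fall back to input
}
```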
+
+    /// selected parent at level = the parent of the header at the level
+    /// with the highest blue_work
+    fn find_selected_parent_header_at_level(
+        &self,
+        header: &Header,
+        level: BlockLevel,
+    ) -> PruningProofManagerInternalResult<Arc<Header>> {
+        // Parents manager parents_at_level may return parents that aren't in relations_service, so it's important
+        // to filter to include only parents that are in relations_service.
+        let sp = self
+            .parents_manager
+            .parents_at_level(header, level)
+            .iter()
+            .copied()
+            .filter(|p| self.level_relations_services[level as usize].has(*p).unwrap())
+            .filter_map(|p| self.headers_store.get_header(p).unwrap_option().map(|h| SortableBlock::new(p, h.blue_work)))
+            .max()
+            .ok_or(PruningProofManagerInternalError::NotEnoughHeadersToBuildProof("no parents with header".to_string()))?;
+        Ok(self.headers_store.get_header(sp.hash).expect("unwrapped above"))
+    }
+
+    /// Finds the block on a given level that is at base_depth deep from it.
+    /// Also returns if the block was the last one in the level
+    /// base_depth = the blue score depth at level 0
+    fn level_block_at_base_depth(
+        &self,
+        level: BlockLevel,
+        high: Hash,
+        base_depth: u64,
+    ) -> PruningProofManagerInternalResult<(Hash, bool)> {
+        let high_header = self
+            .headers_store
+            .get_header(high)
+            .map_err(|err| PruningProofManagerInternalError::BlockAtDepth(format!("high: {high}, depth: {base_depth}, {err}")))?;
+        let high_header_score = high_header.blue_score;
+        let mut current_header = high_header;
+
+        let mut is_last_header = false;
+
+        while current_header.blue_score + base_depth >= high_header_score {
+            if current_header.direct_parents().is_empty() {
+                break;
+            }
+
+            current_header = match self.find_selected_parent_header_at_level(&current_header, level) {
+                Ok(header) => header,
+                Err(PruningProofManagerInternalError::NotEnoughHeadersToBuildProof(_)) => {
+                    // We want to give this root a shot if all its past is pruned
+                    is_last_header = true;
+                    break;
+                }
+                Err(e) => return Err(e),
+            };
+        }
+        Ok((current_header.hash, is_last_header))
+    }
+
+    /// Copy of `block_at_depth` which returns the full chain up to depth. Temporarily used for assertion purposes.
+    fn chain_up_to_depth(
+        &self,
+        ghostdag_store: &impl GhostdagStoreReader,
+        high: Hash,
+        depth: u64,
+    ) -> Result<Vec<Hash>, PruningProofManagerInternalError> {
+        let high_gd = ghostdag_store
+            .get_compact_data(high)
+            .map_err(|err| PruningProofManagerInternalError::BlockAtDepth(format!("high: {high}, depth: {depth}, {err}")))?;
+        let mut current_gd = high_gd;
+        let mut current = high;
+        let mut res = vec![current];
+        while current_gd.blue_score + depth >= high_gd.blue_score {
+            if current_gd.selected_parent.is_origin() {
+                break;
+            }
+            let prev = current;
+            current = current_gd.selected_parent;
+            res.push(current);
+            current_gd = ghostdag_store.get_compact_data(current).map_err(|err| {
+                PruningProofManagerInternalError::BlockAtDepth(format!(
+                    "high: {}, depth: {}, current: {}, high blue score: {}, current blue score: {}, {}",
+                    high, depth, prev, high_gd.blue_score, current_gd.blue_score, err
+                ))
+            })?;
+        }
+        Ok(res)
+    }
+
+    fn find_common_ancestor_in_chain_of_a(
+        &self,
+        ghostdag_store: &impl GhostdagStoreReader,
+        a: Hash,
+        b: Hash,
+    ) -> Result<Hash, PruningProofManagerInternalError> {
+        let a_gd = ghostdag_store
+            .get_compact_data(a)
+            .map_err(|err| PruningProofManagerInternalError::FindCommonAncestor(format!("a: {a}, b: {b}, {err}")))?;
+        let mut current_gd = a_gd;
+        let mut current;
+        let mut loop_counter = 0;
+        loop {
+            current = current_gd.selected_parent;
+            loop_counter += 1;
+            if current.is_origin() {
+                break Err(PruningProofManagerInternalError::NoCommonAncestor(format!("a: {a}, b: {b} ({loop_counter} loop steps)")));
+            }
+            if self.reachability_service.is_dag_ancestor_of(current, b) {
+                break Ok(current);
+            }
+            current_gd = ghostdag_store
+                .get_compact_data(current)
+                .map_err(|err| PruningProofManagerInternalError::FindCommonAncestor(format!("a: {a}, b: {b}, {err}")))?;
+        }
+    }
+}
diff --git a/consensus/src/processes/pruning_proof/mod.rs b/consensus/src/processes/pruning_proof/mod.rs
index e9690ec38..2b3ba5f9d 100644
--- a/consensus/src/processes/pruning_proof/mod.rs
+++ b/consensus/src/processes/pruning_proof/mod.rs
@@ -1,40 +1,32 @@
+mod apply;
+mod build;
+mod validate;
+
 use std::{
-    cmp::{max, Reverse},
     collections::{
-        hash_map::Entry::{self, Vacant},
-        BinaryHeap, HashSet, VecDeque,
-    },
-    ops::{Deref, DerefMut},
-    sync::{
-        atomic::{AtomicBool, Ordering},
-        Arc,
+        hash_map::Entry::{self},
+        VecDeque,
     },
+    ops::Deref,
+
sync::{atomic::AtomicBool, Arc}, }; use itertools::Itertools; -use kaspa_math::int::SignedInteger; use parking_lot::{Mutex, RwLock}; use rocksdb::WriteBatch; use kaspa_consensus_core::{ - blockhash::{self, BlockHashExtensions, BlockHashes, ORIGIN}, - errors::{ - consensus::{ConsensusError, ConsensusResult}, - pruning::{PruningImportError, PruningImportResult}, - }, + blockhash::{self, BlockHashExtensions}, + errors::consensus::{ConsensusError, ConsensusResult}, header::Header, pruning::{PruningPointProof, PruningPointTrustedData}, - trusted::{TrustedBlock, TrustedGhostdagData, TrustedHeader}, + trusted::{TrustedGhostdagData, TrustedHeader}, BlockHashMap, BlockHashSet, BlockLevel, HashMapCustomHasher, KType, }; -use kaspa_core::{debug, info, trace}; -use kaspa_database::{ - prelude::{CachePolicy, ConnBuilder, StoreError, StoreResult, StoreResultEmptyTuple, StoreResultExtensions}, - utils::DbLifetime, -}; +use kaspa_core::info; +use kaspa_database::{prelude::StoreResultExtensions, utils::DbLifetime}; use kaspa_hashes::Hash; use kaspa_pow::calc_block_level; -use kaspa_utils::{binary_heap::BinaryHeapExtensions, vec::VecExtensions}; use thiserror::Error; use crate::{ @@ -43,35 +35,26 @@ use crate::{ storage::ConsensusStorage, }, model::{ - services::{ - reachability::{MTReachabilityService, ReachabilityService}, - relations::MTRelationsService, - }, + services::{reachability::MTReachabilityService, relations::MTRelationsService}, stores::{ depth::DbDepthStore, - ghostdag::{CompactGhostdagData, DbGhostdagStore, GhostdagData, GhostdagStore, GhostdagStoreReader}, - headers::{DbHeadersStore, HeaderStore, HeaderStoreReader, HeaderWithBlockLevel}, + ghostdag::{DbGhostdagStore, GhostdagStoreReader}, + headers::{DbHeadersStore, HeaderStore, HeaderStoreReader}, headers_selected_tip::DbHeadersSelectedTipStore, past_pruning_points::{DbPastPruningPointsStore, PastPruningPointsStore}, pruning::{DbPruningStore, PruningStoreReader}, - reachability::{DbReachabilityStore, ReachabilityStoreReader, StagingReachabilityStore}, - relations::{DbRelationsStore, RelationsStoreReader, StagingRelationsStore}, - selected_chain::{DbSelectedChainStore, SelectedChainStore}, + reachability::DbReachabilityStore, + relations::{DbRelationsStore, RelationsStoreReader}, + selected_chain::DbSelectedChainStore, tips::DbTipsStore, - virtual_state::{VirtualState, VirtualStateStore, VirtualStateStoreReader, VirtualStores}, + virtual_state::{VirtualStateStoreReader, VirtualStores}, DB, }, }, - processes::{ - ghostdag::ordering::SortableBlock, reachability::inquirer as reachability, relations::RelationsStoreExtensions, - window::WindowType, - }, + processes::window::WindowType, }; -use super::{ - ghostdag::{mergeset::unordered_mergeset_without_selected_parent, protocol::GhostdagManager}, - window::WindowManager, -}; +use super::{ghostdag::protocol::GhostdagManager, window::WindowManager}; #[derive(Error, Debug)] enum PruningProofManagerInternalError { @@ -110,39 +93,6 @@ struct TempProofContext { db_lifetime: DbLifetime, } -#[derive(Clone)] -struct RelationsStoreInFutureOfRoot { - relations_store: T, - reachability_service: U, - root: Hash, -} - -impl RelationsStoreReader for RelationsStoreInFutureOfRoot { - fn get_parents(&self, hash: Hash) -> Result { - self.relations_store.get_parents(hash).map(|hashes| { - Arc::new(hashes.iter().copied().filter(|h| self.reachability_service.is_dag_ancestor_of(self.root, *h)).collect_vec()) - }) - } - - fn get_children(&self, hash: Hash) -> StoreResult> { - // We assume hash is in future of root - 
assert!(self.reachability_service.is_dag_ancestor_of(self.root, hash)); - self.relations_store.get_children(hash) - } - - fn has(&self, hash: Hash) -> Result { - if self.reachability_service.is_dag_ancestor_of(self.root, hash) { - Ok(false) - } else { - self.relations_store.has(hash) - } - } - - fn counts(&self) -> Result<(usize, usize), kaspa_database::prelude::StoreError> { - unimplemented!() - } -} - pub struct PruningProofManager { db: Arc, @@ -241,10 +191,7 @@ impl PruningProofManager { continue; } - let state = kaspa_pow::State::new(header); - let (_, pow) = state.check_pow(header.nonce); - let signed_block_level = self.max_block_level as i64 - pow.bits() as i64; - let block_level = max(signed_block_level, 0) as BlockLevel; + let block_level = calc_block_level(header, self.max_block_level); self.headers_store.insert(header.hash, header.clone(), block_level).unwrap(); } @@ -259,949 +206,14 @@ impl PruningProofManager { drop(pruning_point_write); } - pub fn apply_proof(&self, mut proof: PruningPointProof, trusted_set: &[TrustedBlock]) -> PruningImportResult<()> { - let pruning_point_header = proof[0].last().unwrap().clone(); - let pruning_point = pruning_point_header.hash; - - // Create a copy of the proof, since we're going to be mutating the proof passed to us - let proof_sets = (0..=self.max_block_level) - .map(|level| BlockHashSet::from_iter(proof[level as usize].iter().map(|header| header.hash))) - .collect_vec(); - - let mut trusted_gd_map: BlockHashMap = BlockHashMap::new(); - for tb in trusted_set.iter() { - trusted_gd_map.insert(tb.block.hash(), tb.ghostdag.clone().into()); - let tb_block_level = calc_block_level(&tb.block.header, self.max_block_level); - - (0..=tb_block_level).for_each(|current_proof_level| { - // If this block was in the original proof, ignore it - if proof_sets[current_proof_level as usize].contains(&tb.block.hash()) { - return; - } - - proof[current_proof_level as usize].push(tb.block.header.clone()); - }); - } - - proof.iter_mut().for_each(|level_proof| { - level_proof.sort_by(|a, b| a.blue_work.cmp(&b.blue_work)); - }); - - self.populate_reachability_and_headers(&proof); - - { - let reachability_read = self.reachability_store.read(); - for tb in trusted_set.iter() { - // Header-only trusted blocks are expected to be in pruning point past - if tb.block.is_header_only() && !reachability_read.is_dag_ancestor_of(tb.block.hash(), pruning_point) { - return Err(PruningImportError::PruningPointPastMissingReachability(tb.block.hash())); - } - } - } - - for (level, headers) in proof.iter().enumerate() { - trace!("Applying level {} from the pruning point proof", level); - let mut level_ancestors: HashSet = HashSet::new(); - level_ancestors.insert(ORIGIN); - - for header in headers.iter() { - let parents = Arc::new( - self.parents_manager - .parents_at_level(header, level as BlockLevel) - .iter() - .copied() - .filter(|parent| level_ancestors.contains(parent)) - .collect_vec() - .push_if_empty(ORIGIN), - ); - - self.relations_stores.write()[level].insert(header.hash, parents.clone()).unwrap(); - - if level == 0 { - let gd = if let Some(gd) = trusted_gd_map.get(&header.hash) { - gd.clone() - } else { - let calculated_gd = self.ghostdag_manager.ghostdag(&parents); - // Override the ghostdag data with the real blue score and blue work - GhostdagData { - blue_score: header.blue_score, - blue_work: header.blue_work, - selected_parent: calculated_gd.selected_parent, - mergeset_blues: calculated_gd.mergeset_blues, - mergeset_reds: calculated_gd.mergeset_reds, - 
blues_anticone_sizes: calculated_gd.blues_anticone_sizes, - } - }; - self.ghostdag_store.insert(header.hash, Arc::new(gd)).unwrap(); - } - - level_ancestors.insert(header.hash); - } - } - - let virtual_parents = vec![pruning_point]; - let virtual_state = Arc::new(VirtualState { - parents: virtual_parents.clone(), - ghostdag_data: self.ghostdag_manager.ghostdag(&virtual_parents), - ..VirtualState::default() - }); - self.virtual_stores.write().state.set(virtual_state).unwrap(); - - let mut batch = WriteBatch::default(); - self.body_tips_store.write().init_batch(&mut batch, &virtual_parents).unwrap(); - self.headers_selected_tip_store - .write() - .set_batch(&mut batch, SortableBlock { hash: pruning_point, blue_work: pruning_point_header.blue_work }) - .unwrap(); - self.selected_chain_store.write().init_with_pruning_point(&mut batch, pruning_point).unwrap(); - self.depth_store.insert_batch(&mut batch, pruning_point, ORIGIN, ORIGIN).unwrap(); - self.db.write(batch).unwrap(); - - Ok(()) - } - + // Used in apply and validate fn estimate_proof_unique_size(&self, proof: &PruningPointProof) -> usize { let approx_history_size = proof[0][0].daa_score; let approx_unique_full_levels = f64::log2(approx_history_size as f64 / self.pruning_proof_m as f64).max(0f64) as usize; proof.iter().map(|l| l.len()).sum::().min((approx_unique_full_levels + 1) * self.pruning_proof_m as usize) } - pub fn populate_reachability_and_headers(&self, proof: &PruningPointProof) { - let capacity_estimate = self.estimate_proof_unique_size(proof); - let mut dag = BlockHashMap::with_capacity(capacity_estimate); - let mut up_heap = BinaryHeap::with_capacity(capacity_estimate); - for header in proof.iter().flatten().cloned() { - if let Vacant(e) = dag.entry(header.hash) { - let state = kaspa_pow::State::new(&header); - let (_, pow) = state.check_pow(header.nonce); // TODO: Check if pow passes - let signed_block_level = self.max_block_level as i64 - pow.bits() as i64; - let block_level = max(signed_block_level, 0) as BlockLevel; - self.headers_store.insert(header.hash, header.clone(), block_level).unwrap(); - - let mut parents = BlockHashSet::with_capacity(header.direct_parents().len() * 2); - // We collect all available parent relations in order to maximize reachability information. - // By taking into account parents from all levels we ensure that the induced DAG has valid - // reachability information for each level-specific sub-DAG -- hence a single reachability - // oracle can serve them all - for level in 0..=self.max_block_level { - for parent in self.parents_manager.parents_at_level(&header, level) { - parents.insert(*parent); - } - } - - struct DagEntry { - header: Arc
, - parents: Arc, - } - - up_heap.push(Reverse(SortableBlock { hash: header.hash, blue_work: header.blue_work })); - e.insert(DagEntry { header, parents: Arc::new(parents) }); - } - } - - debug!("Estimated proof size: {}, actual size: {}", capacity_estimate, dag.len()); - - for reverse_sortable_block in up_heap.into_sorted_iter() { - // TODO: Convert to into_iter_sorted once it gets stable - let hash = reverse_sortable_block.0.hash; - let dag_entry = dag.get(&hash).unwrap(); - - // Filter only existing parents - let parents_in_dag = BinaryHeap::from_iter( - dag_entry - .parents - .iter() - .cloned() - .filter(|parent| dag.contains_key(parent)) - .map(|parent| SortableBlock { hash: parent, blue_work: dag.get(&parent).unwrap().header.blue_work }), - ); - - let reachability_read = self.reachability_store.upgradable_read(); - - // Find the maximal parent antichain from the possibly redundant set of existing parents - let mut reachability_parents: Vec = Vec::new(); - for parent in parents_in_dag.into_sorted_iter() { - if reachability_read.is_dag_ancestor_of_any(parent.hash, &mut reachability_parents.iter().map(|parent| parent.hash)) { - continue; - } - - reachability_parents.push(parent); - } - let reachability_parents_hashes = - BlockHashes::new(reachability_parents.iter().map(|parent| parent.hash).collect_vec().push_if_empty(ORIGIN)); - let selected_parent = reachability_parents.iter().max().map(|parent| parent.hash).unwrap_or(ORIGIN); - - // Prepare batch - let mut batch = WriteBatch::default(); - let mut reachability_relations_write = self.reachability_relations_store.write(); - let mut staging_reachability = StagingReachabilityStore::new(reachability_read); - let mut staging_reachability_relations = StagingRelationsStore::new(&mut reachability_relations_write); - - // Stage - staging_reachability_relations.insert(hash, reachability_parents_hashes.clone()).unwrap(); - let mergeset = unordered_mergeset_without_selected_parent( - &staging_reachability_relations, - &staging_reachability, - selected_parent, - &reachability_parents_hashes, - ); - reachability::add_block(&mut staging_reachability, hash, selected_parent, &mut mergeset.iter().copied()).unwrap(); - - // Commit - let reachability_write = staging_reachability.commit(&mut batch).unwrap(); - staging_reachability_relations.commit(&mut batch).unwrap(); - - // Write - self.db.write(batch).unwrap(); - - // Drop - drop(reachability_write); - drop(reachability_relations_write); - } - } - - fn init_validate_pruning_point_proof_stores_and_processes( - &self, - proof: &PruningPointProof, - ) -> PruningImportResult { - if proof[0].is_empty() { - return Err(PruningImportError::PruningProofNotEnoughHeaders); - } - - let headers_estimate = self.estimate_proof_unique_size(proof); - - let (db_lifetime, db) = kaspa_database::create_temp_db!(ConnBuilder::default().with_files_limit(10)); - let cache_policy = CachePolicy::Count(2 * self.pruning_proof_m as usize); - let headers_store = - Arc::new(DbHeadersStore::new(db.clone(), CachePolicy::Count(headers_estimate), CachePolicy::Count(headers_estimate))); - let ghostdag_stores = (0..=self.max_block_level) - .map(|level| Arc::new(DbGhostdagStore::new(db.clone(), level, cache_policy, cache_policy))) - .collect_vec(); - let mut relations_stores = - (0..=self.max_block_level).map(|level| DbRelationsStore::new(db.clone(), level, cache_policy, cache_policy)).collect_vec(); - let reachability_stores = (0..=self.max_block_level) - .map(|level| Arc::new(RwLock::new(DbReachabilityStore::with_block_level(db.clone(), 
cache_policy, cache_policy, level)))) - .collect_vec(); - - let reachability_services = (0..=self.max_block_level) - .map(|level| MTReachabilityService::new(reachability_stores[level as usize].clone())) - .collect_vec(); - - let ghostdag_managers = ghostdag_stores - .iter() - .cloned() - .enumerate() - .map(|(level, ghostdag_store)| { - GhostdagManager::new( - self.genesis_hash, - self.ghostdag_k, - ghostdag_store, - relations_stores[level].clone(), - headers_store.clone(), - reachability_services[level].clone(), - level != 0, - ) - }) - .collect_vec(); - - { - let mut batch = WriteBatch::default(); - for level in 0..=self.max_block_level { - let level = level as usize; - reachability::init(reachability_stores[level].write().deref_mut()).unwrap(); - relations_stores[level].insert_batch(&mut batch, ORIGIN, BlockHashes::new(vec![])).unwrap(); - ghostdag_stores[level].insert(ORIGIN, ghostdag_managers[level].origin_ghostdag_data()).unwrap(); - } - - db.write(batch).unwrap(); - } - - Ok(TempProofContext { db_lifetime, headers_store, ghostdag_stores, relations_stores, reachability_stores, ghostdag_managers }) - } - - fn populate_stores_for_validate_pruning_point_proof( - &self, - proof: &PruningPointProof, - ctx: &mut TempProofContext, - log_validating: bool, - ) -> PruningImportResult> { - let headers_store = &ctx.headers_store; - let ghostdag_stores = &ctx.ghostdag_stores; - let mut relations_stores = ctx.relations_stores.clone(); - let reachability_stores = &ctx.reachability_stores; - let ghostdag_managers = &ctx.ghostdag_managers; - - let proof_pp_header = proof[0].last().expect("checked if empty"); - let proof_pp = proof_pp_header.hash; - - let mut selected_tip_by_level = vec![None; self.max_block_level as usize + 1]; - for level in (0..=self.max_block_level).rev() { - // Before processing this level, check if the process is exiting so we can end early - if self.is_consensus_exiting.load(Ordering::Relaxed) { - return Err(PruningImportError::PruningValidationInterrupted); - } - - if log_validating { - info!("Validating level {level} from the pruning point proof ({} headers)", proof[level as usize].len()); - } - let level_idx = level as usize; - let mut selected_tip = None; - for (i, header) in proof[level as usize].iter().enumerate() { - let header_level = calc_block_level(header, self.max_block_level); - if header_level < level { - return Err(PruningImportError::PruningProofWrongBlockLevel(header.hash, header_level, level)); - } - - headers_store.insert(header.hash, header.clone(), header_level).unwrap_or_exists(); - - let parents = self - .parents_manager - .parents_at_level(header, level) - .iter() - .copied() - .filter(|parent| ghostdag_stores[level_idx].has(*parent).unwrap()) - .collect_vec(); - - // Only the first block at each level is allowed to have no known parents - if parents.is_empty() && i != 0 { - return Err(PruningImportError::PruningProofHeaderWithNoKnownParents(header.hash, level)); - } - - let parents: BlockHashes = parents.push_if_empty(ORIGIN).into(); - - if relations_stores[level_idx].has(header.hash).unwrap() { - return Err(PruningImportError::PruningProofDuplicateHeaderAtLevel(header.hash, level)); - } - - relations_stores[level_idx].insert(header.hash, parents.clone()).unwrap(); - let ghostdag_data = Arc::new(ghostdag_managers[level_idx].ghostdag(&parents)); - ghostdag_stores[level_idx].insert(header.hash, ghostdag_data.clone()).unwrap(); - selected_tip = Some(match selected_tip { - Some(tip) => ghostdag_managers[level_idx].find_selected_parent([tip, header.hash]), - 
None => header.hash, - }); - - let mut reachability_mergeset = { - let reachability_read = reachability_stores[level_idx].read(); - ghostdag_data - .unordered_mergeset_without_selected_parent() - .filter(|hash| reachability_read.has(*hash).unwrap()) - .collect_vec() // We collect to vector so reachability_read can be released and let `reachability::add_block` use a write lock. - .into_iter() - }; - reachability::add_block( - reachability_stores[level_idx].write().deref_mut(), - header.hash, - ghostdag_data.selected_parent, - &mut reachability_mergeset, - ) - .unwrap(); - - if selected_tip.unwrap() == header.hash { - reachability::hint_virtual_selected_parent(reachability_stores[level_idx].write().deref_mut(), header.hash) - .unwrap(); - } - } - - if level < self.max_block_level { - let block_at_depth_m_at_next_level = self - .block_at_depth( - &*ghostdag_stores[level_idx + 1], - selected_tip_by_level[level_idx + 1].unwrap(), - self.pruning_proof_m, - ) - .unwrap(); - if !relations_stores[level_idx].has(block_at_depth_m_at_next_level).unwrap() { - return Err(PruningImportError::PruningProofMissingBlockAtDepthMFromNextLevel(level, level + 1)); - } - } - - if selected_tip.unwrap() != proof_pp - && !self.parents_manager.parents_at_level(proof_pp_header, level).contains(&selected_tip.unwrap()) - { - return Err(PruningImportError::PruningProofMissesBlocksBelowPruningPoint(selected_tip.unwrap(), level)); - } - - selected_tip_by_level[level_idx] = selected_tip; - } - - Ok(selected_tip_by_level.into_iter().map(|selected_tip| selected_tip.unwrap()).collect()) - } - - fn validate_proof_selected_tip( - &self, - proof_selected_tip: Hash, - level: BlockLevel, - proof_pp_level: BlockLevel, - proof_pp: Hash, - proof_pp_header: &Header, - ) -> PruningImportResult<()> { - // A proof selected tip of some level has to be the proof suggested prunint point itself if its level - // is lower or equal to the pruning point level, or a parent of the pruning point on the relevant level - // otherwise. - if level <= proof_pp_level { - if proof_selected_tip != proof_pp { - return Err(PruningImportError::PruningProofSelectedTipIsNotThePruningPoint(proof_selected_tip, level)); - } - } else if !self.parents_manager.parents_at_level(proof_pp_header, level).contains(&proof_selected_tip) { - return Err(PruningImportError::PruningProofSelectedTipNotParentOfPruningPoint(proof_selected_tip, level)); - } - - Ok(()) - } - - // find_proof_and_consensus_common_chain_ancestor_ghostdag_data returns an option of a tuple - // that contains the ghostdag data of the proof and current consensus common ancestor. If no - // such ancestor exists, it returns None. 
- fn find_proof_and_consensus_common_ancestor_ghostdag_data( - &self, - proof_ghostdag_stores: &[Arc], - current_consensus_ghostdag_stores: &[Arc], - proof_selected_tip: Hash, - level: BlockLevel, - proof_selected_tip_gd: CompactGhostdagData, - ) -> Option<(CompactGhostdagData, CompactGhostdagData)> { - let mut proof_current = proof_selected_tip; - let mut proof_current_gd = proof_selected_tip_gd; - loop { - match current_consensus_ghostdag_stores[level as usize].get_compact_data(proof_current).unwrap_option() { - Some(current_gd) => { - break Some((proof_current_gd, current_gd)); - } - None => { - proof_current = proof_current_gd.selected_parent; - if proof_current.is_origin() { - break None; - } - proof_current_gd = proof_ghostdag_stores[level as usize].get_compact_data(proof_current).unwrap(); - } - }; - } - } - - pub fn validate_pruning_point_proof(&self, proof: &PruningPointProof) -> PruningImportResult<()> { - if proof.len() != self.max_block_level as usize + 1 { - return Err(PruningImportError::ProofNotEnoughLevels(self.max_block_level as usize + 1)); - } - - // Initialize the stores for the proof - let mut proof_stores_and_processes = self.init_validate_pruning_point_proof_stores_and_processes(proof)?; - let proof_pp_header = proof[0].last().expect("checked if empty"); - let proof_pp = proof_pp_header.hash; - let proof_pp_level = calc_block_level(proof_pp_header, self.max_block_level); - let proof_selected_tip_by_level = - self.populate_stores_for_validate_pruning_point_proof(proof, &mut proof_stores_and_processes, true)?; - let proof_ghostdag_stores = proof_stores_and_processes.ghostdag_stores; - - // Get the proof for the current consensus and recreate the stores for it - // This is expected to be fast because if a proof exists, it will be cached. - // If no proof exists, this is empty - let mut current_consensus_proof = self.get_pruning_point_proof(); - if current_consensus_proof.is_empty() { - // An empty proof can only happen if we're at genesis. 
We're going to create a proof for this case that contains the genesis header only
-            let genesis_header = self.headers_store.get_header(self.genesis_hash).unwrap();
-            current_consensus_proof = Arc::new((0..=self.max_block_level).map(|_| vec![genesis_header.clone()]).collect_vec());
-        }
-        let mut current_consensus_stores_and_processes =
-            self.init_validate_pruning_point_proof_stores_and_processes(&current_consensus_proof)?;
-        let _ = self.populate_stores_for_validate_pruning_point_proof(
-            &current_consensus_proof,
-            &mut current_consensus_stores_and_processes,
-            false,
-        )?;
-        let current_consensus_ghostdag_stores = current_consensus_stores_and_processes.ghostdag_stores;
-
-        let pruning_read = self.pruning_point_store.read();
-        let relations_read = self.relations_stores.read();
-        let current_pp = pruning_read.get().unwrap().pruning_point;
-        let current_pp_header = self.headers_store.get_header(current_pp).unwrap();
-
-        for (level_idx, selected_tip) in proof_selected_tip_by_level.iter().copied().enumerate() {
-            let level = level_idx as BlockLevel;
-            self.validate_proof_selected_tip(selected_tip, level, proof_pp_level, proof_pp, proof_pp_header)?;
-
-            let proof_selected_tip_gd = proof_ghostdag_stores[level_idx].get_compact_data(selected_tip).unwrap();
-
-            // Next check is to see if this proof is "better" than what's in the current consensus
-            // Step 1 - look at only levels that have a full proof (least 2m blocks in the proof)
-            if proof_selected_tip_gd.blue_score < 2 * self.pruning_proof_m {
-                continue;
-            }
-
-            // Step 2 - if we can find a common ancestor between the proof and current consensus
-            // we can determine if the proof is better. The proof is better if the blue work* difference between the
-            // old current consensus's tips and the common ancestor is less than the blue work difference between the
-            // proof's tip and the common ancestor.
-            // *Note: blue work is the same as blue score on levels higher than 0
-            if let Some((proof_common_ancestor_gd, common_ancestor_gd)) = self.find_proof_and_consensus_common_ancestor_ghostdag_data(
-                &proof_ghostdag_stores,
-                &current_consensus_ghostdag_stores,
-                selected_tip,
-                level,
-                proof_selected_tip_gd,
-            ) {
-                let selected_tip_blue_work_diff =
-                    SignedInteger::from(proof_selected_tip_gd.blue_work) - SignedInteger::from(proof_common_ancestor_gd.blue_work);
-                for parent in self.parents_manager.parents_at_level(&current_pp_header, level).iter().copied() {
-                    let parent_blue_work = current_consensus_ghostdag_stores[level_idx].get_blue_work(parent).unwrap();
-                    let parent_blue_work_diff =
-                        SignedInteger::from(parent_blue_work) - SignedInteger::from(common_ancestor_gd.blue_work);
-                    if parent_blue_work_diff >= selected_tip_blue_work_diff {
-                        return Err(PruningImportError::PruningProofInsufficientBlueWork);
-                    }
-                }
-
-                return Ok(());
-            }
-        }
-
-        if current_pp == self.genesis_hash {
-            // If the proof has better tips and the current pruning point is still
-            // genesis, we consider the proof state to be better.
-            return Ok(());
-        }
-
-        // If we got here it means there's no level with shared blocks
-        // between the proof and the current consensus. In this case we
-        // consider the proof to be better if it has at least one level
-        // with 2*self.pruning_proof_m blue blocks where consensus doesn't.
- for level in (0..=self.max_block_level).rev() { - let level_idx = level as usize; - - let proof_selected_tip = proof_selected_tip_by_level[level_idx]; - let proof_selected_tip_gd = proof_ghostdag_stores[level_idx].get_compact_data(proof_selected_tip).unwrap(); - if proof_selected_tip_gd.blue_score < 2 * self.pruning_proof_m { - continue; - } - - match relations_read[level_idx].get_parents(current_pp).unwrap_option() { - Some(parents) => { - if parents.iter().copied().any(|parent| { - current_consensus_ghostdag_stores[level_idx].get_blue_score(parent).unwrap() < 2 * self.pruning_proof_m - }) { - return Ok(()); - } - } - None => { - // If the current pruning point doesn't have a parent at this level, we consider the proof state to be better. - return Ok(()); - } - } - } - - drop(pruning_read); - drop(relations_read); - drop(proof_stores_and_processes.db_lifetime); - drop(current_consensus_stores_and_processes.db_lifetime); - - Err(PruningImportError::PruningProofNotEnoughHeaders) - } - - // The "current dag level" is the level right before the level whose parents are - // not the same as our header's direct parents - // - // Find the current DAG level by going through all the parents at each level, - // starting from the bottom level and see which is the first level that has - // parents that are NOT our current pp_header's direct parents. - fn find_current_dag_level(&self, pp_header: &Header) -> BlockLevel { - let direct_parents = BlockHashSet::from_iter(pp_header.direct_parents().iter().copied()); - pp_header - .parents_by_level - .iter() - .enumerate() - .skip(1) - .find_map(|(level, parents)| { - if BlockHashSet::from_iter(parents.iter().copied()) == direct_parents { - None - } else { - Some((level - 1) as BlockLevel) - } - }) - .unwrap_or(self.max_block_level) - } - - fn estimated_blue_depth_at_level_0(&self, level: BlockLevel, level_depth: u64, current_dag_level: BlockLevel) -> u64 { - level_depth.checked_shl(level.saturating_sub(current_dag_level) as u32).unwrap_or(level_depth) - } - - /// selected parent at level = the parent of the header at the level - /// with the highest blue_work - fn find_selected_parent_header_at_level( - &self, - header: &Header, - level: BlockLevel, - ) -> PruningProofManagerInternalResult> { - // Parents manager parents_at_level may return parents that aren't in relations_service, so it's important - // to filter to include only parents that are in relations_service. - let sp = self - .parents_manager - .parents_at_level(header, level) - .iter() - .copied() - .filter(|p| self.level_relations_services[level as usize].has(*p).unwrap()) - .filter_map(|p| self.headers_store.get_header(p).unwrap_option().map(|h| SortableBlock::new(p, h.blue_work))) - .max() - .ok_or(PruningProofManagerInternalError::NotEnoughHeadersToBuildProof("no parents with header".to_string()))?; - Ok(self.headers_store.get_header(sp.hash).expect("unwrapped above")) - } - - /// Find a sufficient root at a given level by going through the headers store and looking - /// for a deep enough level block - /// For each root candidate, fill in the ghostdag data to see if it actually is deep enough. - /// If the root is deep enough, it will satisfy these conditions - /// 1. block at depth 2m at this level ∈ Future(root) - /// 2. 
block at depth m at the next level ∈ Future(root) - /// - /// Returns: the filled ghostdag store from root to tip, the selected tip and the root - fn find_sufficient_root( - &self, - pp_header: &HeaderWithBlockLevel, - level: BlockLevel, - current_dag_level: BlockLevel, - required_block: Option<Hash>, - temp_db: Arc<DB>, - ) -> PruningProofManagerInternalResult<(Arc<DbGhostdagStore>, Hash, Hash)> { - // Step 1: Determine which selected tip to use - let selected_tip = if pp_header.block_level >= level { - pp_header.header.hash - } else { - self.find_selected_parent_header_at_level(&pp_header.header, level)?.hash - }; - - let cache_policy = CachePolicy::Count(2 * self.pruning_proof_m as usize); - let required_level_depth = 2 * self.pruning_proof_m; - - // We only have the headers store (which has level 0 blue_scores) to assemble the proof data from. - // We need to look deeper at higher levels (2x deeper every level) to find 2M (plus margin) blocks at that level - let mut required_base_level_depth = self.estimated_blue_depth_at_level_0( - level, - required_level_depth + 100, // We take a safety margin - current_dag_level, - ); - - let mut is_last_level_header; - let mut tries = 0; - - let block_at_depth_m_at_next_level = required_block.unwrap_or(selected_tip); - - loop { - // Step 2 - Find a deep enough root candidate - let block_at_depth_2m = match self.level_block_at_base_depth(level, selected_tip, required_base_level_depth) { - Ok((header, is_last_header)) => { - is_last_level_header = is_last_header; - header - } - Err(e) => return Err(e), - }; - - let root = if self.reachability_service.is_dag_ancestor_of(block_at_depth_2m, block_at_depth_m_at_next_level) { - block_at_depth_2m - } else if self.reachability_service.is_dag_ancestor_of(block_at_depth_m_at_next_level, block_at_depth_2m) { - block_at_depth_m_at_next_level - } else { - // find common ancestor of block_at_depth_m_at_next_level and block_at_depth_2m in chain of block_at_depth_m_at_next_level - let mut common_ancestor = self.headers_store.get_header(block_at_depth_m_at_next_level).unwrap(); - - while !self.reachability_service.is_dag_ancestor_of(common_ancestor.hash, block_at_depth_2m) { - common_ancestor = match self.find_selected_parent_header_at_level(&common_ancestor, level) { - Ok(header) => header, - // Try to give this last header a chance at being root - Err(PruningProofManagerInternalError::NotEnoughHeadersToBuildProof(_)) => break, - Err(e) => return Err(e), - }; - } - - common_ancestor.hash - }; - - if level == 0 { - return Ok((self.ghostdag_store.clone(), selected_tip, root)); - } - - // Step 3 - Fill the ghostdag data from root to tip - let ghostdag_store = Arc::new(DbGhostdagStore::new_temp(temp_db.clone(), level, cache_policy, cache_policy, tries)); - let has_required_block = self.fill_level_proof_ghostdag_data( - root, - pp_header.header.hash, - &ghostdag_store, - Some(block_at_depth_m_at_next_level), - level, - ); - - // Step 4 - Check if we actually have enough depth. - // Need to ensure this does the same 2M+1 depth that block_at_depth does - if has_required_block - && (root == self.genesis_hash || ghostdag_store.get_blue_score(selected_tip).unwrap() >= required_level_depth) - { - break Ok((ghostdag_store, selected_tip, root)); - } - - tries += 1; - if is_last_level_header { - if has_required_block { - // Normally this scenario doesn't occur when syncing with nodes that already have the safety margin change in place.
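// [Editorial sketch] Step 2 above picks the root as the "earlier" of the two requirement
// blocks under DAG ancestry, falling back to a selected-parent walk when neither contains the
// other. A standalone rendering of that decision, with hypothetical closures standing in for
// the reachability service and the selected-parent lookup:
fn choose_root(
    block_at_depth_2m: u64,
    block_at_depth_m_next: u64,
    is_dag_ancestor_of: impl Fn(u64, u64) -> bool,
    selected_parent_at_level: impl Fn(u64) -> u64,
) -> u64 {
    if is_dag_ancestor_of(block_at_depth_2m, block_at_depth_m_next) {
        block_at_depth_2m
    } else if is_dag_ancestor_of(block_at_depth_m_next, block_at_depth_2m) {
        block_at_depth_m_next
    } else {
        // Walk the selected-parent chain of the next-level block until reaching an
        // ancestor of block_at_depth_2m (the real code may also stop early when the
        // headers below are already pruned).
        let mut current = block_at_depth_m_next;
        while !is_dag_ancestor_of(current, block_at_depth_2m) {
            current = selected_parent_at_level(current);
        }
        current
    }
}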
- // However, when syncing with an older node version that doesn't have a safety margin for the proof, it's possible to - // try to find 2500 depth worth of headers at a level while the proof only contains about 2000 headers. To be able to sync - // with such an older node, we can still proceed as long as we found the required block. - debug!("Failed to find sufficient root for level {level} after {tries} tries. Headers below the current depth of {required_base_level_depth} are already pruned. Required block found so trying anyway."); - break Ok((ghostdag_store, selected_tip, root)); - } else { - panic!("Failed to find sufficient root for level {level} after {tries} tries. Headers below the current depth of {required_base_level_depth} are already pruned"); - } - } - - // If we don't have enough depth now, we need to look deeper - required_base_level_depth = (required_base_level_depth as f64 * 1.1) as u64; - debug!("Failed to find sufficient root for level {level} after {tries} tries. Retrying with depth {required_base_level_depth}"); - } - } - - fn calc_gd_for_all_levels( - &self, - pp_header: &HeaderWithBlockLevel, - temp_db: Arc<DB>, - ) -> (Vec<Arc<DbGhostdagStore>>, Vec<Hash>, Vec<Hash>) { - let current_dag_level = self.find_current_dag_level(&pp_header.header); - let mut ghostdag_stores: Vec<Option<Arc<DbGhostdagStore>>> = vec![None; self.max_block_level as usize + 1]; - let mut selected_tip_by_level = vec![None; self.max_block_level as usize + 1]; - let mut root_by_level = vec![None; self.max_block_level as usize + 1]; - for level in (0..=self.max_block_level).rev() { - let level_usize = level as usize; - let required_block = if level != self.max_block_level { - let next_level_store = ghostdag_stores[level_usize + 1].as_ref().unwrap().clone(); - let block_at_depth_m_at_next_level = self - .block_at_depth(&*next_level_store, selected_tip_by_level[level_usize + 1].unwrap(), self.pruning_proof_m) - .map_err(|err| format!("level + 1: {}, err: {}", level + 1, err)) - .unwrap(); - Some(block_at_depth_m_at_next_level) - } else { - None - }; - let (store, selected_tip, root) = self - .find_sufficient_root(pp_header, level, current_dag_level, required_block, temp_db.clone()) - .unwrap_or_else(|_| panic!("find_sufficient_root failed for level {level}")); - ghostdag_stores[level_usize] = Some(store); - selected_tip_by_level[level_usize] = Some(selected_tip); - root_by_level[level_usize] = Some(root); - } - - ( - ghostdag_stores.into_iter().map(Option::unwrap).collect_vec(), - selected_tip_by_level.into_iter().map(Option::unwrap).collect_vec(), - root_by_level.into_iter().map(Option::unwrap).collect_vec(), - ) - } - - pub(crate) fn build_pruning_point_proof(&self, pp: Hash) -> PruningPointProof { - if pp == self.genesis_hash { - return vec![]; - } - - let (_db_lifetime, temp_db) = kaspa_database::create_temp_db!(ConnBuilder::default().with_files_limit(10)); - let pp_header = self.headers_store.get_header_with_block_level(pp).unwrap(); - let (ghostdag_stores, selected_tip_by_level, roots_by_level) = self.calc_gd_for_all_levels(&pp_header, temp_db); - - (0..=self.max_block_level) - .map(|level| { - let level = level as usize; - let selected_tip = selected_tip_by_level[level]; - let block_at_depth_2m = self - .block_at_depth(&*ghostdag_stores[level], selected_tip, 2 * self.pruning_proof_m) - .map_err(|err| format!("level: {}, err: {}", level, err)) - .unwrap(); - - // TODO (relaxed): remove the assertion below - // (New Logic) This is the root we calculated by going through block relations - let root = roots_by_level[level]; - // (Old Logic) This
is the root we can calculate given that the GD records are already filled - // The root calc logic below is the original logic before the on-demand higher level GD calculation - // We only need old_root to sanity check the new logic - let old_root = if level != self.max_block_level as usize { - let block_at_depth_m_at_next_level = self - .block_at_depth(&*ghostdag_stores[level + 1], selected_tip_by_level[level + 1], self.pruning_proof_m) - .map_err(|err| format!("level + 1: {}, err: {}", level + 1, err)) - .unwrap(); - if self.reachability_service.is_dag_ancestor_of(block_at_depth_m_at_next_level, block_at_depth_2m) { - block_at_depth_m_at_next_level - } else if self.reachability_service.is_dag_ancestor_of(block_at_depth_2m, block_at_depth_m_at_next_level) { - block_at_depth_2m - } else { - self.find_common_ancestor_in_chain_of_a( - &*ghostdag_stores[level], - block_at_depth_m_at_next_level, - block_at_depth_2m, - ) - .map_err(|err| format!("level: {}, err: {}", level, err)) - .unwrap() - } - } else { - block_at_depth_2m - }; - - // new root is expected to always be an ancestor of old_root because new root takes a safety margin - assert!(self.reachability_service.is_dag_ancestor_of(root, old_root)); - - let mut headers = Vec::with_capacity(2 * self.pruning_proof_m as usize); - let mut queue = BinaryHeap::<Reverse<SortableBlock>>::new(); - let mut visited = BlockHashSet::new(); - queue.push(Reverse(SortableBlock::new(root, self.headers_store.get_header(root).unwrap().blue_work))); - while let Some(current) = queue.pop() { - let current = current.0.hash; - if !visited.insert(current) { - continue; - } - - // The second condition is always expected to be true (ghostdag store will have the entry) - // because we are traversing the exact diamond (future(root) ⋂ past(tip)) for which we calculated - // GD (see fill_level_proof_ghostdag_data).
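// [Editorial sketch] The loop above and fill_level_proof_ghostdag_data below share one
// pattern: a min-heap keyed by blue work pops blocks in an order that respects topology,
// since a block's blue work strictly exceeds that of every block in its past. A toy,
// self-contained version over an in-memory DAG (u64 stands in for Hash; the real code
// additionally skips blocks outside past(selected_tip) using the reachability service):
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap, HashSet};

fn traverse_by_blue_work(root: u64, children: &HashMap<u64, Vec<u64>>, blue_work: &HashMap<u64, u128>) -> Vec<u64> {
    let mut heap = BinaryHeap::new();
    let mut visited = HashSet::new();
    let mut order = Vec::new();
    heap.push(Reverse((blue_work[&root], root)));
    while let Some(Reverse((_, hash))) = heap.pop() {
        if !visited.insert(hash) {
            continue; // already reached via another parent
        }
        order.push(hash);
        for &child in children.get(&hash).into_iter().flatten() {
            heap.push(Reverse((blue_work[&child], child)));
        }
    }
    order // parents always appear before their children
}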
TODO (relaxed): remove the condition or turn into assertion - if !self.reachability_service.is_dag_ancestor_of(current, selected_tip) - || !ghostdag_stores[level].has(current).is_ok_and(|found| found) - { - continue; - } - - headers.push(self.headers_store.get_header(current).unwrap()); - for child in self.relations_stores.read()[level].get_children(current).unwrap().read().iter().copied() { - queue.push(Reverse(SortableBlock::new(child, self.headers_store.get_header(child).unwrap().blue_work))); - } - } - - // TODO (relaxed): remove the assertion below - // Temp assertion for verifying a bug fix: assert that the full 2M chain is actually contained in the composed level proof - let set = BlockHashSet::from_iter(headers.iter().map(|h| h.hash)); - let chain_2m = self - .chain_up_to_depth(&*ghostdag_stores[level], selected_tip, 2 * self.pruning_proof_m) - .map_err(|err| { - dbg!(level, selected_tip, block_at_depth_2m, root); - format!("Assert 2M chain -- level: {}, err: {}", level, err) - }) - .unwrap(); - let chain_2m_len = chain_2m.len(); - for (i, chain_hash) in chain_2m.into_iter().enumerate() { - if !set.contains(&chain_hash) { - let next_level_tip = selected_tip_by_level[level + 1]; - let next_level_chain_m = - self.chain_up_to_depth(&*ghostdag_stores[level + 1], next_level_tip, self.pruning_proof_m).unwrap(); - let next_level_block_m = next_level_chain_m.last().copied().unwrap(); - dbg!(next_level_chain_m.len()); - dbg!(ghostdag_stores[level + 1].get_compact_data(next_level_tip).unwrap().blue_score); - dbg!(ghostdag_stores[level + 1].get_compact_data(next_level_block_m).unwrap().blue_score); - dbg!(ghostdag_stores[level].get_compact_data(selected_tip).unwrap().blue_score); - dbg!(ghostdag_stores[level].get_compact_data(block_at_depth_2m).unwrap().blue_score); - dbg!(level, selected_tip, block_at_depth_2m, root); - panic!("Assert 2M chain -- missing block {} at index {} out of {} chain blocks", chain_hash, i, chain_2m_len); - } - } - - headers - }) - .collect_vec() - } - - /// BFS forward iterates from root until selected tip, ignoring blocks in the antipast of selected_tip. 
- /// For each block along the way, insert that hash into the ghostdag_store - /// If we have a required_block to find, this will return true if that block was found along the way - fn fill_level_proof_ghostdag_data( - &self, - root: Hash, - selected_tip: Hash, - ghostdag_store: &Arc<DbGhostdagStore>, - required_block: Option<Hash>, - level: BlockLevel, - ) -> bool { - let relations_service = RelationsStoreInFutureOfRoot { - relations_store: self.level_relations_services[level as usize].clone(), - reachability_service: self.reachability_service.clone(), - root, - }; - let gd_manager = GhostdagManager::new( - root, - self.ghostdag_k, - ghostdag_store.clone(), - relations_service.clone(), - self.headers_store.clone(), - self.reachability_service.clone(), - level != 0, - ); - - ghostdag_store.insert(root, Arc::new(gd_manager.genesis_ghostdag_data())).unwrap(); - ghostdag_store.insert(ORIGIN, gd_manager.origin_ghostdag_data()).unwrap(); - - let mut topological_heap: BinaryHeap<_> = Default::default(); - let mut visited = BlockHashSet::new(); - for child in relations_service.get_children(root).unwrap().read().iter().copied() { - topological_heap.push(Reverse(SortableBlock { - hash: child, - // It's important to use blue work here and not blue score so we can iterate the heap in a way that respects the topology - blue_work: self.headers_store.get_header(child).unwrap().blue_work, - })); - } - - let mut has_required_block = required_block.is_some_and(|required_block| root == required_block); - loop { - let Some(current) = topological_heap.pop() else { - break; - }; - let current_hash = current.0.hash; - if !visited.insert(current_hash) { - continue; - } - - if !self.reachability_service.is_dag_ancestor_of(current_hash, selected_tip) { - // We don't care about blocks in the antipast of the selected tip - continue; - } - - if !has_required_block && required_block.is_some_and(|required_block| current_hash == required_block) { - has_required_block = true; - } - - let current_gd = gd_manager.ghostdag(&relations_service.get_parents(current_hash).unwrap()); - - ghostdag_store.insert(current_hash, Arc::new(current_gd)).unwrap_or_exists(); - - for child in relations_service.get_children(current_hash).unwrap().read().iter().copied() { - topological_heap.push(Reverse(SortableBlock { - hash: child, - // It's important to use blue work here and not blue score so we can iterate the heap in a way that respects the topology - blue_work: self.headers_store.get_header(child).unwrap().blue_work, - })); - } - } - - has_required_block - } - - /// Copy of `block_at_depth` which returns the full chain up to depth. Temporarily used for assertion purposes.
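// [Editorial note] On the "blue work and not blue score" comments above: the heap order is
// only topological if the key strictly increases from a block to its descendants, which holds
// for blue work but is not guaranteed for blue score across branches. A toy check of the
// invariant the heap relies on (hypothetical numbers):
fn demo_topological_invariant() {
    let parent_blue_work: u128 = 1_000;
    let child_blue_work: u128 = parent_blue_work + 137; // a child accumulates extra blue work
    assert!(parent_blue_work < child_blue_work); // so a min-heap keyed by blue work pops parents first
}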
- fn chain_up_to_depth( - &self, - ghostdag_store: &impl GhostdagStoreReader, - high: Hash, - depth: u64, - ) -> Result<Vec<Hash>, PruningProofManagerInternalError> { - let high_gd = ghostdag_store - .get_compact_data(high) - .map_err(|err| PruningProofManagerInternalError::BlockAtDepth(format!("high: {high}, depth: {depth}, {err}")))?; - let mut current_gd = high_gd; - let mut current = high; - let mut res = vec![current]; - while current_gd.blue_score + depth >= high_gd.blue_score { - if current_gd.selected_parent.is_origin() { - break; - } - let prev = current; - current = current_gd.selected_parent; - res.push(current); - current_gd = ghostdag_store.get_compact_data(current).map_err(|err| { - PruningProofManagerInternalError::BlockAtDepth(format!( - "high: {}, depth: {}, current: {}, high blue score: {}, current blue score: {}, {}", - high, depth, prev, high_gd.blue_score, current_gd.blue_score, err - )) - })?; - } - Ok(res) - } - + // Used in build and validate fn block_at_depth( &self, ghostdag_store: &impl GhostdagStoreReader, high: Hash, depth: u64, @@ -1229,69 +241,6 @@ impl PruningProofManager { Ok(current) } - /// Finds the block on a given level that is base_depth deep from it. - /// Also returns whether the block was the last one in the level - /// base_depth = the blue score depth at level 0 - fn level_block_at_base_depth( - &self, - level: BlockLevel, - high: Hash, - base_depth: u64, - ) -> PruningProofManagerInternalResult<(Hash, bool)> { - let high_header = self - .headers_store - .get_header(high) - .map_err(|err| PruningProofManagerInternalError::BlockAtDepth(format!("high: {high}, depth: {base_depth}, {err}")))?; - let high_header_score = high_header.blue_score; - let mut current_header = high_header; - - let mut is_last_header = false; - - while current_header.blue_score + base_depth >= high_header_score { - if current_header.direct_parents().is_empty() { - break; - } - - current_header = match self.find_selected_parent_header_at_level(&current_header, level) { - Ok(header) => header, - Err(PruningProofManagerInternalError::NotEnoughHeadersToBuildProof(_)) => { - // We want to give this root a shot if all its past is pruned - is_last_header = true; - break; - } - Err(e) => return Err(e), - }; - } - Ok((current_header.hash, is_last_header)) - } - - fn find_common_ancestor_in_chain_of_a( - &self, - ghostdag_store: &impl GhostdagStoreReader, - a: Hash, - b: Hash, - ) -> Result<Hash, PruningProofManagerInternalError> { - let a_gd = ghostdag_store - .get_compact_data(a) - .map_err(|err| PruningProofManagerInternalError::FindCommonAncestor(format!("a: {a}, b: {b}, {err}")))?; - let mut current_gd = a_gd; - let mut current; - let mut loop_counter = 0; - loop { - current = current_gd.selected_parent; - loop_counter += 1; - if current.is_origin() { - break Err(PruningProofManagerInternalError::NoCommonAncestor(format!("a: {a}, b: {b} ({loop_counter} loop steps)"))); - } - if self.reachability_service.is_dag_ancestor_of(current, b) { - break Ok(current); - } - current_gd = ghostdag_store - .get_compact_data(current) - .map_err(|err| PruningProofManagerInternalError::FindCommonAncestor(format!("a: {a}, b: {b}, {err}")))?; - } - } - /// Returns the k + 1 chain blocks below this hash (inclusive). If data is missing /// the search is halted and a partial chain is returned.
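// [Editorial sketch] The helper documented above returns the starting hash plus up to k
// selected-parent links below it, halting early when data is missing. A standalone version
// of that contract (hypothetical selected_parent lookup returning None once data is pruned):
fn chain_below(start: u64, k: usize, selected_parent: impl Fn(u64) -> Option<u64>) -> Vec<u64> {
    let mut chain = vec![start];
    let mut current = start;
    for _ in 0..k {
        match selected_parent(current) {
            Some(parent) => {
                current = parent;
                chain.push(parent);
            }
            None => break, // data missing: return the partial chain
        }
    }
    chain // at most k + 1 blocks, inclusive of `start`
}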
/// diff --git a/consensus/src/processes/pruning_proof/validate.rs b/consensus/src/processes/pruning_proof/validate.rs new file mode 100644 index 000000000..63650cdc5 --- /dev/null +++ b/consensus/src/processes/pruning_proof/validate.rs @@ -0,0 +1,376 @@ +use std::{ + ops::DerefMut, + sync::{atomic::Ordering, Arc}, +}; + +use itertools::Itertools; +use kaspa_consensus_core::{ + blockhash::{BlockHashExtensions, BlockHashes, ORIGIN}, + errors::pruning::{PruningImportError, PruningImportResult}, + header::Header, + pruning::PruningPointProof, + BlockLevel, +}; +use kaspa_core::info; +use kaspa_database::prelude::{CachePolicy, ConnBuilder, StoreResultEmptyTuple, StoreResultExtensions}; +use kaspa_hashes::Hash; +use kaspa_math::int::SignedInteger; +use kaspa_pow::calc_block_level; +use kaspa_utils::vec::VecExtensions; +use parking_lot::lock_api::RwLock; +use rocksdb::WriteBatch; + +use crate::{ + model::{ + services::reachability::MTReachabilityService, + stores::{ + ghostdag::{CompactGhostdagData, DbGhostdagStore, GhostdagStore, GhostdagStoreReader}, + headers::{DbHeadersStore, HeaderStore, HeaderStoreReader}, + pruning::PruningStoreReader, + reachability::{DbReachabilityStore, ReachabilityStoreReader}, + relations::{DbRelationsStore, RelationsStoreReader}, + }, + }, + processes::{ghostdag::protocol::GhostdagManager, reachability::inquirer as reachability, relations::RelationsStoreExtensions}, +}; + +use super::{PruningProofManager, TempProofContext}; + +impl PruningProofManager { + pub fn validate_pruning_point_proof(&self, proof: &PruningPointProof) -> PruningImportResult<()> { + if proof.len() != self.max_block_level as usize + 1 { + return Err(PruningImportError::ProofNotEnoughLevels(self.max_block_level as usize + 1)); + } + + // Initialize the stores for the proof + let mut proof_stores_and_processes = self.init_validate_pruning_point_proof_stores_and_processes(proof)?; + let proof_pp_header = proof[0].last().expect("checked if empty"); + let proof_pp = proof_pp_header.hash; + let proof_pp_level = calc_block_level(proof_pp_header, self.max_block_level); + let proof_selected_tip_by_level = + self.populate_stores_for_validate_pruning_point_proof(proof, &mut proof_stores_and_processes, true)?; + let proof_ghostdag_stores = proof_stores_and_processes.ghostdag_stores; + + // Get the proof for the current consensus and recreate the stores for it + // This is expected to be fast because if a proof exists, it will be cached. + // If no proof exists, this is empty + let mut current_consensus_proof = self.get_pruning_point_proof(); + if current_consensus_proof.is_empty() { + // An empty proof can only happen if we're at genesis. 
We're going to create a proof for this case that contains the genesis header only + let genesis_header = self.headers_store.get_header(self.genesis_hash).unwrap(); + current_consensus_proof = Arc::new((0..=self.max_block_level).map(|_| vec![genesis_header.clone()]).collect_vec()); + } + let mut current_consensus_stores_and_processes = + self.init_validate_pruning_point_proof_stores_and_processes(&current_consensus_proof)?; + let _ = self.populate_stores_for_validate_pruning_point_proof( + &current_consensus_proof, + &mut current_consensus_stores_and_processes, + false, + )?; + let current_consensus_ghostdag_stores = current_consensus_stores_and_processes.ghostdag_stores; + + let pruning_read = self.pruning_point_store.read(); + let relations_read = self.relations_stores.read(); + let current_pp = pruning_read.get().unwrap().pruning_point; + let current_pp_header = self.headers_store.get_header(current_pp).unwrap(); + + for (level_idx, selected_tip) in proof_selected_tip_by_level.iter().copied().enumerate() { + let level = level_idx as BlockLevel; + self.validate_proof_selected_tip(selected_tip, level, proof_pp_level, proof_pp, proof_pp_header)?; + + let proof_selected_tip_gd = proof_ghostdag_stores[level_idx].get_compact_data(selected_tip).unwrap(); + + // The next check is to see whether this proof is "better" than what's in the current consensus + // Step 1 - look only at levels that have a full proof (at least 2m blocks in the proof) + if proof_selected_tip_gd.blue_score < 2 * self.pruning_proof_m { + continue; + } + + // Step 2 - if we can find a common ancestor between the proof and current consensus + // we can determine if the proof is better. The proof is better if the blue work* difference between the + // old current consensus's tips and the common ancestor is less than the blue work difference between the + // proof's tip and the common ancestor. + // *Note: blue work is the same as blue score on levels higher than 0 + if let Some((proof_common_ancestor_gd, common_ancestor_gd)) = self.find_proof_and_consensus_common_ancestor_ghostdag_data( + &proof_ghostdag_stores, + &current_consensus_ghostdag_stores, + selected_tip, + level, + proof_selected_tip_gd, + ) { + let selected_tip_blue_work_diff = + SignedInteger::from(proof_selected_tip_gd.blue_work) - SignedInteger::from(proof_common_ancestor_gd.blue_work); + for parent in self.parents_manager.parents_at_level(&current_pp_header, level).iter().copied() { + let parent_blue_work = current_consensus_ghostdag_stores[level_idx].get_blue_work(parent).unwrap(); + let parent_blue_work_diff = + SignedInteger::from(parent_blue_work) - SignedInteger::from(common_ancestor_gd.blue_work); + if parent_blue_work_diff >= selected_tip_blue_work_diff { + return Err(PruningImportError::PruningProofInsufficientBlueWork); + } + } + + return Ok(()); + } + } + + if current_pp == self.genesis_hash { + // If the proof has better tips and the current pruning point is still + // genesis, we consider the proof state to be better. + return Ok(()); + } + + // If we got here it means there's no level with shared blocks + // between the proof and the current consensus. In this case we + // consider the proof to be better if it has at least one level + // with 2*self.pruning_proof_m blue blocks where consensus doesn't.
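// [Editorial sketch] The fallback below (no shared blocks between proof and consensus)
// reduces to a per-level predicate: the proof wins at a level if its selected tip has at
// least 2m blue score while some parent of the current pruning point at that level has less,
// or the pruning point has no parents at that level at all. A standalone rendering with
// hypothetical inputs:
fn proof_better_at_level(proof_tip_blue_score: u64, pruning_point_parent_scores: Option<&[u64]>, two_m: u64) -> bool {
    if proof_tip_blue_score < two_m {
        return false; // the proof itself isn't full at this level
    }
    match pruning_point_parent_scores {
        // Some parent of the current pruning point is shallower than 2m at this level
        Some(scores) => scores.iter().any(|&score| score < two_m),
        // The current pruning point has no parents at this level
        None => true,
    }
}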
+ for level in (0..=self.max_block_level).rev() { + let level_idx = level as usize; + + let proof_selected_tip = proof_selected_tip_by_level[level_idx]; + let proof_selected_tip_gd = proof_ghostdag_stores[level_idx].get_compact_data(proof_selected_tip).unwrap(); + if proof_selected_tip_gd.blue_score < 2 * self.pruning_proof_m { + continue; + } + + match relations_read[level_idx].get_parents(current_pp).unwrap_option() { + Some(parents) => { + if parents.iter().copied().any(|parent| { + current_consensus_ghostdag_stores[level_idx].get_blue_score(parent).unwrap() < 2 * self.pruning_proof_m + }) { + return Ok(()); + } + } + None => { + // If the current pruning point doesn't have a parent at this level, we consider the proof state to be better. + return Ok(()); + } + } + } + + drop(pruning_read); + drop(relations_read); + drop(proof_stores_and_processes.db_lifetime); + drop(current_consensus_stores_and_processes.db_lifetime); + + Err(PruningImportError::PruningProofNotEnoughHeaders) + } + + fn init_validate_pruning_point_proof_stores_and_processes( + &self, + proof: &PruningPointProof, + ) -> PruningImportResult<TempProofContext> { + if proof[0].is_empty() { + return Err(PruningImportError::PruningProofNotEnoughHeaders); + } + + let headers_estimate = self.estimate_proof_unique_size(proof); + + let (db_lifetime, db) = kaspa_database::create_temp_db!(ConnBuilder::default().with_files_limit(10)); + let cache_policy = CachePolicy::Count(2 * self.pruning_proof_m as usize); + let headers_store = + Arc::new(DbHeadersStore::new(db.clone(), CachePolicy::Count(headers_estimate), CachePolicy::Count(headers_estimate))); + let ghostdag_stores = (0..=self.max_block_level) + .map(|level| Arc::new(DbGhostdagStore::new(db.clone(), level, cache_policy, cache_policy))) + .collect_vec(); + let mut relations_stores = + (0..=self.max_block_level).map(|level| DbRelationsStore::new(db.clone(), level, cache_policy, cache_policy)).collect_vec(); + let reachability_stores = (0..=self.max_block_level) + .map(|level| Arc::new(RwLock::new(DbReachabilityStore::with_block_level(db.clone(), cache_policy, cache_policy, level)))) + .collect_vec(); + + let reachability_services = (0..=self.max_block_level) + .map(|level| MTReachabilityService::new(reachability_stores[level as usize].clone())) + .collect_vec(); + + let ghostdag_managers = ghostdag_stores + .iter() + .cloned() + .enumerate() + .map(|(level, ghostdag_store)| { + GhostdagManager::new( + self.genesis_hash, + self.ghostdag_k, + ghostdag_store, + relations_stores[level].clone(), + headers_store.clone(), + reachability_services[level].clone(), + level != 0, + ) + }) + .collect_vec(); + + { + let mut batch = WriteBatch::default(); + for level in 0..=self.max_block_level { + let level = level as usize; + reachability::init(reachability_stores[level].write().deref_mut()).unwrap(); + relations_stores[level].insert_batch(&mut batch, ORIGIN, BlockHashes::new(vec![])).unwrap(); + ghostdag_stores[level].insert(ORIGIN, ghostdag_managers[level].origin_ghostdag_data()).unwrap(); + } + + db.write(batch).unwrap(); + } + + Ok(TempProofContext { db_lifetime, headers_store, ghostdag_stores, relations_stores, reachability_stores, ghostdag_managers }) + } + + fn populate_stores_for_validate_pruning_point_proof( + &self, + proof: &PruningPointProof, + ctx: &mut TempProofContext, + log_validating: bool, + ) -> PruningImportResult<Vec<Hash>> { + let headers_store = &ctx.headers_store; + let ghostdag_stores = &ctx.ghostdag_stores; + let mut relations_stores = ctx.relations_stores.clone(); + let reachability_stores =
&ctx.reachability_stores; + let ghostdag_managers = &ctx.ghostdag_managers; + + let proof_pp_header = proof[0].last().expect("checked if empty"); + let proof_pp = proof_pp_header.hash; + + let mut selected_tip_by_level = vec![None; self.max_block_level as usize + 1]; + for level in (0..=self.max_block_level).rev() { + // Before processing this level, check if the process is exiting so we can end early + if self.is_consensus_exiting.load(Ordering::Relaxed) { + return Err(PruningImportError::PruningValidationInterrupted); + } + + if log_validating { + info!("Validating level {level} from the pruning point proof ({} headers)", proof[level as usize].len()); + } + let level_idx = level as usize; + let mut selected_tip = None; + for (i, header) in proof[level as usize].iter().enumerate() { + let header_level = calc_block_level(header, self.max_block_level); + if header_level < level { + return Err(PruningImportError::PruningProofWrongBlockLevel(header.hash, header_level, level)); + } + + headers_store.insert(header.hash, header.clone(), header_level).unwrap_or_exists(); + + let parents = self + .parents_manager + .parents_at_level(header, level) + .iter() + .copied() + .filter(|parent| ghostdag_stores[level_idx].has(*parent).unwrap()) + .collect_vec(); + + // Only the first block at each level is allowed to have no known parents + if parents.is_empty() && i != 0 { + return Err(PruningImportError::PruningProofHeaderWithNoKnownParents(header.hash, level)); + } + + let parents: BlockHashes = parents.push_if_empty(ORIGIN).into(); + + if relations_stores[level_idx].has(header.hash).unwrap() { + return Err(PruningImportError::PruningProofDuplicateHeaderAtLevel(header.hash, level)); + } + + relations_stores[level_idx].insert(header.hash, parents.clone()).unwrap(); + let ghostdag_data = Arc::new(ghostdag_managers[level_idx].ghostdag(&parents)); + ghostdag_stores[level_idx].insert(header.hash, ghostdag_data.clone()).unwrap(); + selected_tip = Some(match selected_tip { + Some(tip) => ghostdag_managers[level_idx].find_selected_parent([tip, header.hash]), + None => header.hash, + }); + + let mut reachability_mergeset = { + let reachability_read = reachability_stores[level_idx].read(); + ghostdag_data + .unordered_mergeset_without_selected_parent() + .filter(|hash| reachability_read.has(*hash).unwrap()) + .collect_vec() // We collect to vector so reachability_read can be released and let `reachability::add_block` use a write lock. 
+ .into_iter() + }; + reachability::add_block( + reachability_stores[level_idx].write().deref_mut(), + header.hash, + ghostdag_data.selected_parent, + &mut reachability_mergeset, + ) + .unwrap(); + + if selected_tip.unwrap() == header.hash { + reachability::hint_virtual_selected_parent(reachability_stores[level_idx].write().deref_mut(), header.hash) + .unwrap(); + } + } + + if level < self.max_block_level { + let block_at_depth_m_at_next_level = self + .block_at_depth( + &*ghostdag_stores[level_idx + 1], + selected_tip_by_level[level_idx + 1].unwrap(), + self.pruning_proof_m, + ) + .unwrap(); + if !relations_stores[level_idx].has(block_at_depth_m_at_next_level).unwrap() { + return Err(PruningImportError::PruningProofMissingBlockAtDepthMFromNextLevel(level, level + 1)); + } + } + + if selected_tip.unwrap() != proof_pp + && !self.parents_manager.parents_at_level(proof_pp_header, level).contains(&selected_tip.unwrap()) + { + return Err(PruningImportError::PruningProofMissesBlocksBelowPruningPoint(selected_tip.unwrap(), level)); + } + + selected_tip_by_level[level_idx] = selected_tip; + } + + Ok(selected_tip_by_level.into_iter().map(|selected_tip| selected_tip.unwrap()).collect()) + } + + fn validate_proof_selected_tip( + &self, + proof_selected_tip: Hash, + level: BlockLevel, + proof_pp_level: BlockLevel, + proof_pp: Hash, + proof_pp_header: &Header, + ) -> PruningImportResult<()> { + // A proof selected tip of some level has to be the proof's suggested pruning point itself if its level + // is lower than or equal to the pruning point level, or a parent of the pruning point on the relevant level + // otherwise. + if level <= proof_pp_level { + if proof_selected_tip != proof_pp { + return Err(PruningImportError::PruningProofSelectedTipIsNotThePruningPoint(proof_selected_tip, level)); + } + } else if !self.parents_manager.parents_at_level(proof_pp_header, level).contains(&proof_selected_tip) { + return Err(PruningImportError::PruningProofSelectedTipNotParentOfPruningPoint(proof_selected_tip, level)); + } + + Ok(()) + } + + // find_proof_and_consensus_common_ancestor_ghostdag_data returns an option of a tuple + // that contains the ghostdag data of the proof and current consensus common ancestor. If no + // such ancestor exists, it returns None. + fn find_proof_and_consensus_common_ancestor_ghostdag_data( + &self, + proof_ghostdag_stores: &[Arc<DbGhostdagStore>], + current_consensus_ghostdag_stores: &[Arc<DbGhostdagStore>], + proof_selected_tip: Hash, + level: BlockLevel, + proof_selected_tip_gd: CompactGhostdagData, + ) -> Option<(CompactGhostdagData, CompactGhostdagData)> { + let mut proof_current = proof_selected_tip; + let mut proof_current_gd = proof_selected_tip_gd; + loop { + match current_consensus_ghostdag_stores[level as usize].get_compact_data(proof_current).unwrap_option() { + Some(current_gd) => { + break Some((proof_current_gd, current_gd)); + } + None => { + proof_current = proof_current_gd.selected_parent; + if proof_current.is_origin() { + break None; + } + proof_current_gd = proof_ghostdag_stores[level as usize].get_compact_data(proof_current).unwrap(); + } + }; + } + } +}
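// [Editorial sketch] The descent in find_proof_and_consensus_common_ancestor_ghostdag_data,
// shown standalone: walk the proof's selected-parent chain downward until reaching a block
// the current consensus also knows, or give up at origin. Hypothetical lookup closures stand
// in for the two ghostdag stores:
fn find_common_chain_block(
    proof_tip: u64,
    consensus_knows: impl Fn(u64) -> bool,
    proof_selected_parent: impl Fn(u64) -> u64,
    origin: u64,
) -> Option<u64> {
    let mut current = proof_tip;
    loop {
        if consensus_knows(current) {
            break Some(current); // first chain block shared with consensus
        }
        current = proof_selected_parent(current);
        if current == origin {
            break None; // exhausted the proof's chain without finding a shared block
        }
    }
}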