From b493962fbe6ea76bfd9bb921e87a16bbc33b3379 Mon Sep 17 00:00:00 2001
From: coderofstuff <114628839+coderofstuff@users.noreply.github.com>
Date: Sat, 2 Nov 2024 21:46:27 -0600
Subject: [PATCH 1/5] Cleanup manual block level calc

There were two areas in pruning proof mod that
manually calculated block level.

This replaces those with a call to calc_block_level
---
 consensus/src/processes/pruning_proof/mod.rs | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)

diff --git a/consensus/src/processes/pruning_proof/mod.rs b/consensus/src/processes/pruning_proof/mod.rs
index e9690ec38d..63ad3b6f5e 100644
--- a/consensus/src/processes/pruning_proof/mod.rs
+++ b/consensus/src/processes/pruning_proof/mod.rs
@@ -241,10 +241,7 @@ impl PruningProofManager {
                 continue;
             }
 
-            let state = kaspa_pow::State::new(header);
-            let (_, pow) = state.check_pow(header.nonce);
-            let signed_block_level = self.max_block_level as i64 - pow.bits() as i64;
-            let block_level = max(signed_block_level, 0) as BlockLevel;
+            let block_level = calc_block_level(header, self.max_block_level);
             self.headers_store.insert(header.hash, header.clone(), block_level).unwrap();
         }
 
@@ -372,10 +369,7 @@ impl PruningProofManager {
         let mut up_heap = BinaryHeap::with_capacity(capacity_estimate);
         for header in proof.iter().flatten().cloned() {
             if let Vacant(e) = dag.entry(header.hash) {
-                let state = kaspa_pow::State::new(&header);
-                let (_, pow) = state.check_pow(header.nonce); // TODO: Check if pow passes
-                let signed_block_level = self.max_block_level as i64 - pow.bits() as i64;
-                let block_level = max(signed_block_level, 0) as BlockLevel;
+                let block_level = calc_block_level(&header, self.max_block_level);
                 self.headers_store.insert(header.hash, header.clone(), block_level).unwrap();
 
                 let mut parents = BlockHashSet::with_capacity(header.direct_parents().len() * 2);

From 2bbfde1e39903fcc7e98e1f2ffb452083b49fdde Mon Sep 17 00:00:00 2001
From: coderofstuff <114628839+coderofstuff@users.noreply.github.com>
Date: Sat, 2 Nov 2024 21:46:56 -0600
Subject: [PATCH 2/5] Refactor pruning proof build functions

---
 .../src/processes/pruning_proof/build.rs      | 532 ++++++++++++++++++
 consensus/src/processes/pruning_proof/mod.rs  | 509 +----------------
 2 files changed, 537 insertions(+), 504 deletions(-)
 create mode 100644 consensus/src/processes/pruning_proof/build.rs

diff --git a/consensus/src/processes/pruning_proof/build.rs b/consensus/src/processes/pruning_proof/build.rs
new file mode 100644
index 0000000000..8ae6fb34ca
--- /dev/null
+++ b/consensus/src/processes/pruning_proof/build.rs
@@ -0,0 +1,532 @@
+use std::{cmp::Reverse, collections::BinaryHeap, sync::Arc};
+
+use itertools::Itertools;
+use kaspa_consensus_core::{
+    blockhash::{BlockHashExtensions, BlockHashes, ORIGIN},
+    header::Header,
+    pruning::PruningPointProof,
+    BlockHashSet, BlockLevel, HashMapCustomHasher,
+};
+use kaspa_core::debug;
+use kaspa_database::prelude::{CachePolicy, ConnBuilder, StoreError, StoreResult, StoreResultEmptyTuple, StoreResultExtensions, DB};
+use kaspa_hashes::Hash;
+
+use crate::{
+    model::{
+        services::reachability::ReachabilityService,
+        stores::{
+            ghostdag::{DbGhostdagStore, GhostdagStore, GhostdagStoreReader},
+            headers::{HeaderStoreReader, HeaderWithBlockLevel},
+            relations::RelationsStoreReader,
+        },
+    },
+    processes::{
+        ghostdag::{ordering::SortableBlock, protocol::GhostdagManager},
+        pruning_proof::PruningProofManagerInternalError,
+    },
+};
+
+use super::{PruningProofManager, PruningProofManagerInternalResult};
+
+#[derive(Clone)]
+struct RelationsStoreInFutureOfRoot<T: RelationsStoreReader, U: ReachabilityService> {
+    relations_store: T,
+    reachability_service: U,
+    root: Hash,
+}
+
+impl<T: RelationsStoreReader, U: ReachabilityService> RelationsStoreReader for RelationsStoreInFutureOfRoot<T, U> {
+    fn get_parents(&self, hash: Hash) -> Result<BlockHashes, kaspa_database::prelude::StoreError> {
+        self.relations_store.get_parents(hash).map(|hashes| {
+            Arc::new(hashes.iter().copied().filter(|h| self.reachability_service.is_dag_ancestor_of(self.root, *h)).collect_vec())
+        })
+    }
+
+    fn get_children(&self, hash: Hash) -> StoreResult<kaspa_database::prelude::ReadLock<BlockHashSet>> {
+        // We assume hash is in future of root
+        assert!(self.reachability_service.is_dag_ancestor_of(self.root, hash));
+        self.relations_store.get_children(hash)
+    }
+
+    fn has(&self, hash: Hash) -> Result<bool, StoreError> {
+        if self.reachability_service.is_dag_ancestor_of(self.root, hash) {
+            Ok(false)
+        } else {
+            self.relations_store.has(hash)
+        }
+    }
+
+    fn counts(&self) -> Result<(usize, usize), kaspa_database::prelude::StoreError> {
+        unimplemented!()
+    }
+}
+
+impl PruningProofManager {
+    pub(crate) fn build_pruning_point_proof(&self, pp: Hash) -> PruningPointProof {
+        if pp == self.genesis_hash {
+            return vec![];
+        }
+
+        let (_db_lifetime, temp_db) = kaspa_database::create_temp_db!(ConnBuilder::default().with_files_limit(10));
+        let pp_header = self.headers_store.get_header_with_block_level(pp).unwrap();
+        let (ghostdag_stores, selected_tip_by_level, roots_by_level) = self.calc_gd_for_all_levels(&pp_header, temp_db);
+
+        (0..=self.max_block_level)
+            .map(|level| {
+                let level = level as usize;
+                let selected_tip = selected_tip_by_level[level];
+                let block_at_depth_2m = self
+                    .block_at_depth(&*ghostdag_stores[level], selected_tip, 2 * self.pruning_proof_m)
+                    .map_err(|err| format!("level: {}, err: {}", level, err))
+                    .unwrap();
+
+                // TODO (relaxed): remove the assertion below
+                // (New Logic) This is the root we calculated by going through block relations
+                let root = roots_by_level[level];
+                // (Old Logic) This is the root we can calculate given that the GD records are already filled
+                // The root calc logic below is the original logic before the on-demand higher level GD calculation
+                // We only need old_root to sanity check the new logic
+                let old_root = if level != self.max_block_level as usize {
+                    let block_at_depth_m_at_next_level = self
+                        .block_at_depth(&*ghostdag_stores[level + 1], selected_tip_by_level[level + 1], self.pruning_proof_m)
+                        .map_err(|err| format!("level + 1: {}, err: {}", level + 1, err))
+                        .unwrap();
+                    if self.reachability_service.is_dag_ancestor_of(block_at_depth_m_at_next_level, block_at_depth_2m) {
+                        block_at_depth_m_at_next_level
+                    } else if self.reachability_service.is_dag_ancestor_of(block_at_depth_2m, block_at_depth_m_at_next_level) {
+                        block_at_depth_2m
+                    } else {
+                        self.find_common_ancestor_in_chain_of_a(
+                            &*ghostdag_stores[level],
+                            block_at_depth_m_at_next_level,
+                            block_at_depth_2m,
+                        )
+                        .map_err(|err| format!("level: {}, err: {}", level, err))
+                        .unwrap()
+                    }
+                } else {
+                    block_at_depth_2m
+                };
+
+                // new root is expected to be always an ancestor of old_root because new root takes a safety margin
+                assert!(self.reachability_service.is_dag_ancestor_of(root, old_root));
+
+                let mut headers = Vec::with_capacity(2 * self.pruning_proof_m as usize);
+                let mut queue = BinaryHeap::<Reverse<SortableBlock>>::new();
+                let mut visited = BlockHashSet::new();
+                queue.push(Reverse(SortableBlock::new(root, self.headers_store.get_header(root).unwrap().blue_work)));
+                while let Some(current) = queue.pop() {
+                    let current = current.0.hash;
+                    if !visited.insert(current) {
+                        continue;
+                    }
+
+                    // The second condition is always expected to be true (ghostdag store will have the entry)
+                    // because we are traversing the exact diamond (future(root) ⋂ past(tip)) for which we calculated
+                    // GD for (see fill_level_proof_ghostdag_data). TODO (relaxed): remove the condition or turn into assertion
+                    if !self.reachability_service.is_dag_ancestor_of(current, selected_tip)
+                        || !ghostdag_stores[level].has(current).is_ok_and(|found| found)
+                    {
+                        continue;
+                    }
+
+                    headers.push(self.headers_store.get_header(current).unwrap());
+                    for child in self.relations_stores.read()[level].get_children(current).unwrap().read().iter().copied() {
+                        queue.push(Reverse(SortableBlock::new(child, self.headers_store.get_header(child).unwrap().blue_work)));
+                    }
+                }
+
+                // TODO (relaxed): remove the assertion below
+                // Temp assertion for verifying a bug fix: assert that the full 2M chain is actually contained in the composed level proof
+                let set = BlockHashSet::from_iter(headers.iter().map(|h| h.hash));
+                let chain_2m = self
+                    .chain_up_to_depth(&*ghostdag_stores[level], selected_tip, 2 * self.pruning_proof_m)
+                    .map_err(|err| {
+                        dbg!(level, selected_tip, block_at_depth_2m, root);
+                        format!("Assert 2M chain -- level: {}, err: {}", level, err)
+                    })
+                    .unwrap();
+                let chain_2m_len = chain_2m.len();
+                for (i, chain_hash) in chain_2m.into_iter().enumerate() {
+                    if !set.contains(&chain_hash) {
+                        let next_level_tip = selected_tip_by_level[level + 1];
+                        let next_level_chain_m =
+                            self.chain_up_to_depth(&*ghostdag_stores[level + 1], next_level_tip, self.pruning_proof_m).unwrap();
+                        let next_level_block_m = next_level_chain_m.last().copied().unwrap();
+                        dbg!(next_level_chain_m.len());
+                        dbg!(ghostdag_stores[level + 1].get_compact_data(next_level_tip).unwrap().blue_score);
+                        dbg!(ghostdag_stores[level + 1].get_compact_data(next_level_block_m).unwrap().blue_score);
+                        dbg!(ghostdag_stores[level].get_compact_data(selected_tip).unwrap().blue_score);
+                        dbg!(ghostdag_stores[level].get_compact_data(block_at_depth_2m).unwrap().blue_score);
+                        dbg!(level, selected_tip, block_at_depth_2m, root);
+                        panic!("Assert 2M chain -- missing block {} at index {} out of {} chain blocks", chain_hash, i, chain_2m_len);
+                    }
+                }
+
+                headers
+            })
+            .collect_vec()
+    }
+
+    fn calc_gd_for_all_levels(
+        &self,
+        pp_header: &HeaderWithBlockLevel,
+        temp_db: Arc<DB>,
+    ) -> (Vec<Arc<DbGhostdagStore>>, Vec<Hash>, Vec<Hash>) {
+        let current_dag_level = self.find_current_dag_level(&pp_header.header);
+        let mut ghostdag_stores: Vec<Option<Arc<DbGhostdagStore>>> = vec![None; self.max_block_level as usize + 1];
+        let mut selected_tip_by_level = vec![None; self.max_block_level as usize + 1];
+        let mut root_by_level = vec![None; self.max_block_level as usize + 1];
+        for level in (0..=self.max_block_level).rev() {
+            let level_usize = level as usize;
+            let required_block = if level != self.max_block_level {
+                let next_level_store = ghostdag_stores[level_usize + 1].as_ref().unwrap().clone();
+                let block_at_depth_m_at_next_level = self
+                    .block_at_depth(&*next_level_store, selected_tip_by_level[level_usize + 1].unwrap(), self.pruning_proof_m)
+                    .map_err(|err| format!("level + 1: {}, err: {}", level + 1, err))
+                    .unwrap();
+                Some(block_at_depth_m_at_next_level)
+            } else {
+                None
+            };
+            let (store, selected_tip, root) = self
+                .find_sufficient_root(pp_header, level, current_dag_level, required_block, temp_db.clone())
+                .unwrap_or_else(|_| panic!("find_sufficient_root failed for level {level}"));
+            ghostdag_stores[level_usize] = Some(store);
+            selected_tip_by_level[level_usize] = Some(selected_tip);
+            root_by_level[level_usize] = Some(root);
+        }
+
+        (
+            ghostdag_stores.into_iter().map(Option::unwrap).collect_vec(),
+            selected_tip_by_level.into_iter().map(Option::unwrap).collect_vec(),
+            root_by_level.into_iter().map(Option::unwrap).collect_vec(),
+        )
+    }
+
+    /// Find a sufficient root at a given level by going through the headers store and looking
+    /// for a deep enough level block
+    /// For each root candidate, fill in the ghostdag data to see if it actually is deep enough.
+    /// If the root is deep enough, it will satisfy these conditions
+    /// 1. block at depth 2m at this level ∈ Future(root)
+    /// 2. block at depth m at the next level ∈ Future(root)
+    ///
+    /// Returns: the filled ghostdag store from root to tip, the selected tip and the root
+    fn find_sufficient_root(
+        &self,
+        pp_header: &HeaderWithBlockLevel,
+        level: BlockLevel,
+        current_dag_level: BlockLevel,
+        required_block: Option<Hash>,
+        temp_db: Arc<DB>,
+    ) -> PruningProofManagerInternalResult<(Arc<DbGhostdagStore>, Hash, Hash)> {
+        // Step 1: Determine which selected tip to use
+        let selected_tip = if pp_header.block_level >= level {
+            pp_header.header.hash
+        } else {
+            self.find_selected_parent_header_at_level(&pp_header.header, level)?.hash
+        };
+
+        let cache_policy = CachePolicy::Count(2 * self.pruning_proof_m as usize);
+        let required_level_depth = 2 * self.pruning_proof_m;
+
+        // We only have the headers store (which has level 0 blue_scores) to assemble the proof data from.
+        // We need to look deeper at higher levels (2x deeper every level) to find 2M (plus margin) blocks at that level
+        let mut required_base_level_depth = self.estimated_blue_depth_at_level_0(
+            level,
+            required_level_depth + 100, // We take a safety margin
+            current_dag_level,
+        );
+
+        let mut is_last_level_header;
+        let mut tries = 0;
+
+        let block_at_depth_m_at_next_level = required_block.unwrap_or(selected_tip);
+
+        loop {
+            // Step 2 - Find a deep enough root candidate
+            let block_at_depth_2m = match self.level_block_at_base_depth(level, selected_tip, required_base_level_depth) {
+                Ok((header, is_last_header)) => {
+                    is_last_level_header = is_last_header;
+                    header
+                }
+                Err(e) => return Err(e),
+            };
+
+            let root = if self.reachability_service.is_dag_ancestor_of(block_at_depth_2m, block_at_depth_m_at_next_level) {
+                block_at_depth_2m
+            } else if self.reachability_service.is_dag_ancestor_of(block_at_depth_m_at_next_level, block_at_depth_2m) {
+                block_at_depth_m_at_next_level
+            } else {
+                // find common ancestor of block_at_depth_m_at_next_level and block_at_depth_2m in chain of block_at_depth_m_at_next_level
+                let mut common_ancestor = self.headers_store.get_header(block_at_depth_m_at_next_level).unwrap();
+
+                while !self.reachability_service.is_dag_ancestor_of(common_ancestor.hash, block_at_depth_2m) {
+                    common_ancestor = match self.find_selected_parent_header_at_level(&common_ancestor, level) {
+                        Ok(header) => header,
+                        // Try to give this last header a chance at being root
+                        Err(PruningProofManagerInternalError::NotEnoughHeadersToBuildProof(_)) => break,
+                        Err(e) => return Err(e),
+                    };
+                }
+
+                common_ancestor.hash
+            };
+
+            if level == 0 {
+                return Ok((self.ghostdag_store.clone(), selected_tip, root));
+            }
+
+            // Step 3 - Fill the ghostdag data from root to tip
+            let ghostdag_store = Arc::new(DbGhostdagStore::new_temp(temp_db.clone(), level, cache_policy, cache_policy, tries));
+            let has_required_block = self.fill_level_proof_ghostdag_data(
+                root,
+                pp_header.header.hash,
+                &ghostdag_store,
+                Some(block_at_depth_m_at_next_level),
+                level,
+            );
+
+            // Step 4 - Check if we actually have enough depth.
+            // Need to ensure this does the same 2M+1 depth that block_at_depth does
+            if has_required_block
+                && (root == self.genesis_hash || ghostdag_store.get_blue_score(selected_tip).unwrap() >= required_level_depth)
+            {
+                break Ok((ghostdag_store, selected_tip, root));
+            }
+
+            tries += 1;
+            if is_last_level_header {
+                if has_required_block {
+                    // Normally this scenario doesn't occur when syncing with nodes that already have the safety margin change in place.
+                    // However, when syncing with an older node version that doesn't have a safety margin for the proof, it's possible to
+                    // try to find 2500 depth worth of headers at a level, but the proof only contains about 2000 headers. To be able to sync
+                    // with such an older node. As long as we found the required block, we can still proceed.
+                    debug!("Failed to find sufficient root for level {level} after {tries} tries. Headers below the current depth of {required_base_level_depth} are already pruned. Required block found so trying anyway.");
+                    break Ok((ghostdag_store, selected_tip, root));
+                } else {
+                    panic!("Failed to find sufficient root for level {level} after {tries} tries. Headers below the current depth of {required_base_level_depth} are already pruned");
+                }
+            }
+
+            // If we don't have enough depth now, we need to look deeper
+            required_base_level_depth = (required_base_level_depth as f64 * 1.1) as u64;
+            debug!("Failed to find sufficient root for level {level} after {tries} tries. Retrying again to find with depth {required_base_level_depth}");
+        }
+    }
+
+    /// BFS forward iterates from root until selected tip, ignoring blocks in the antipast of selected_tip.
+    /// For each block along the way, insert that hash into the ghostdag_store
+    /// If we have a required_block to find, this will return true if that block was found along the way
+    fn fill_level_proof_ghostdag_data(
+        &self,
+        root: Hash,
+        selected_tip: Hash,
+        ghostdag_store: &Arc<DbGhostdagStore>,
+        required_block: Option<Hash>,
+        level: BlockLevel,
+    ) -> bool {
+        let relations_service = RelationsStoreInFutureOfRoot {
+            relations_store: self.level_relations_services[level as usize].clone(),
+            reachability_service: self.reachability_service.clone(),
+            root,
+        };
+        let gd_manager = GhostdagManager::new(
+            root,
+            self.ghostdag_k,
+            ghostdag_store.clone(),
+            relations_service.clone(),
+            self.headers_store.clone(),
+            self.reachability_service.clone(),
+            level != 0,
+        );
+
+        ghostdag_store.insert(root, Arc::new(gd_manager.genesis_ghostdag_data())).unwrap();
+        ghostdag_store.insert(ORIGIN, gd_manager.origin_ghostdag_data()).unwrap();
+
+        let mut topological_heap: BinaryHeap<_> = Default::default();
+        let mut visited = BlockHashSet::new();
+        for child in relations_service.get_children(root).unwrap().read().iter().copied() {
+            topological_heap.push(Reverse(SortableBlock {
+                hash: child,
+                // It's important to use here blue work and not score so we can iterate the heap in a way that respects the topology
+                blue_work: self.headers_store.get_header(child).unwrap().blue_work,
+            }));
+        }
+
+        let mut has_required_block = required_block.is_some_and(|required_block| root == required_block);
+        loop {
+            let Some(current) = topological_heap.pop() else {
+                break;
+            };
+            let current_hash = current.0.hash;
+            if !visited.insert(current_hash) {
+                continue;
+            }
+
+            if !self.reachability_service.is_dag_ancestor_of(current_hash, selected_tip) {
+                // We don't care about blocks in the antipast of the selected tip
+                continue;
+            }
+
+            if !has_required_block && required_block.is_some_and(|required_block| current_hash == required_block) {
+                has_required_block = true;
+            }
+
+            let current_gd = gd_manager.ghostdag(&relations_service.get_parents(current_hash).unwrap());
+
+            ghostdag_store.insert(current_hash, Arc::new(current_gd)).unwrap_or_exists();
+
+            for child in relations_service.get_children(current_hash).unwrap().read().iter().copied() {
+                topological_heap.push(Reverse(SortableBlock {
+                    hash: child,
+                    // It's important to use here blue work and not score so we can iterate the heap in a way that respects the topology
+                    blue_work: self.headers_store.get_header(child).unwrap().blue_work,
+                }));
+            }
+        }
+
+        has_required_block
+    }
+
+    // The "current dag level" is the level right before the level whose parents are
+    // not the same as our header's direct parents
+    //
+    // Find the current DAG level by going through all the parents at each level,
+    // starting from the bottom level and see which is the first level that has
+    // parents that are NOT our current pp_header's direct parents.
+    fn find_current_dag_level(&self, pp_header: &Header) -> BlockLevel {
+        let direct_parents = BlockHashSet::from_iter(pp_header.direct_parents().iter().copied());
+        pp_header
+            .parents_by_level
+            .iter()
+            .enumerate()
+            .skip(1)
+            .find_map(|(level, parents)| {
+                if BlockHashSet::from_iter(parents.iter().copied()) == direct_parents {
+                    None
+                } else {
+                    Some((level - 1) as BlockLevel)
+                }
+            })
+            .unwrap_or(self.max_block_level)
+    }
+
+    fn estimated_blue_depth_at_level_0(&self, level: BlockLevel, level_depth: u64, current_dag_level: BlockLevel) -> u64 {
+        level_depth.checked_shl(level.saturating_sub(current_dag_level) as u32).unwrap_or(level_depth)
+    }
+
+    /// selected parent at level = the parent of the header at the level
+    /// with the highest blue_work
+    fn find_selected_parent_header_at_level(
+        &self,
+        header: &Header,
+        level: BlockLevel,
+    ) -> PruningProofManagerInternalResult<Arc<Header>> {
+        // Parents manager parents_at_level may return parents that aren't in relations_service, so it's important
+        // to filter to include only parents that are in relations_service.
+        let sp = self
+            .parents_manager
+            .parents_at_level(header, level)
+            .iter()
+            .copied()
+            .filter(|p| self.level_relations_services[level as usize].has(*p).unwrap())
+            .filter_map(|p| self.headers_store.get_header(p).unwrap_option().map(|h| SortableBlock::new(p, h.blue_work)))
+            .max()
+            .ok_or(PruningProofManagerInternalError::NotEnoughHeadersToBuildProof("no parents with header".to_string()))?;
+        Ok(self.headers_store.get_header(sp.hash).expect("unwrapped above"))
+    }
+
+    /// Finds the block on a given level that is at base_depth deep from it.
+    /// Also returns if the block was the last one in the level
+    /// base_depth = the blue score depth at level 0
+    fn level_block_at_base_depth(
+        &self,
+        level: BlockLevel,
+        high: Hash,
+        base_depth: u64,
+    ) -> PruningProofManagerInternalResult<(Hash, bool)> {
+        let high_header = self
+            .headers_store
+            .get_header(high)
+            .map_err(|err| PruningProofManagerInternalError::BlockAtDepth(format!("high: {high}, depth: {base_depth}, {err}")))?;
+        let high_header_score = high_header.blue_score;
+        let mut current_header = high_header;
+
+        let mut is_last_header = false;
+
+        while current_header.blue_score + base_depth >= high_header_score {
+            if current_header.direct_parents().is_empty() {
+                break;
+            }
+
+            current_header = match self.find_selected_parent_header_at_level(&current_header, level) {
+                Ok(header) => header,
+                Err(PruningProofManagerInternalError::NotEnoughHeadersToBuildProof(_)) => {
+                    // We want to give this root a shot if all its past is pruned
+                    is_last_header = true;
+                    break;
+                }
+                Err(e) => return Err(e),
+            };
+        }
+        Ok((current_header.hash, is_last_header))
+    }
+
+    /// Copy of `block_at_depth` which returns the full chain up to depth. Temporarily used for assertion purposes.
+    fn chain_up_to_depth(
+        &self,
+        ghostdag_store: &impl GhostdagStoreReader,
+        high: Hash,
+        depth: u64,
+    ) -> Result<Vec<Hash>, PruningProofManagerInternalError> {
+        let high_gd = ghostdag_store
+            .get_compact_data(high)
+            .map_err(|err| PruningProofManagerInternalError::BlockAtDepth(format!("high: {high}, depth: {depth}, {err}")))?;
+        let mut current_gd = high_gd;
+        let mut current = high;
+        let mut res = vec![current];
+        while current_gd.blue_score + depth >= high_gd.blue_score {
+            if current_gd.selected_parent.is_origin() {
+                break;
+            }
+            let prev = current;
+            current = current_gd.selected_parent;
+            res.push(current);
+            current_gd = ghostdag_store.get_compact_data(current).map_err(|err| {
+                PruningProofManagerInternalError::BlockAtDepth(format!(
+                    "high: {}, depth: {}, current: {}, high blue score: {}, current blue score: {}, {}",
+                    high, depth, prev, high_gd.blue_score, current_gd.blue_score, err
+                ))
+            })?;
+        }
+        Ok(res)
+    }
+
+    fn find_common_ancestor_in_chain_of_a(
+        &self,
+        ghostdag_store: &impl GhostdagStoreReader,
+        a: Hash,
+        b: Hash,
+    ) -> Result<Hash, PruningProofManagerInternalError> {
+        let a_gd = ghostdag_store
+            .get_compact_data(a)
+            .map_err(|err| PruningProofManagerInternalError::FindCommonAncestor(format!("a: {a}, b: {b}, {err}")))?;
+        let mut current_gd = a_gd;
+        let mut current;
+        let mut loop_counter = 0;
+        loop {
+            current = current_gd.selected_parent;
+            loop_counter += 1;
+            if current.is_origin() {
+                break Err(PruningProofManagerInternalError::NoCommonAncestor(format!("a: {a}, b: {b} ({loop_counter} loop steps)")));
+            }
+            if self.reachability_service.is_dag_ancestor_of(current, b) {
+                break Ok(current);
+            }
+            current_gd = ghostdag_store
+                .get_compact_data(current)
+                .map_err(|err| PruningProofManagerInternalError::FindCommonAncestor(format!("a: {a}, b: {b}, {err}")))?;
+        }
+    }
+}
diff --git a/consensus/src/processes/pruning_proof/mod.rs b/consensus/src/processes/pruning_proof/mod.rs
index 63ad3b6f5e..f151b28508 100644
--- a/consensus/src/processes/pruning_proof/mod.rs
+++ b/consensus/src/processes/pruning_proof/mod.rs
@@ -1,5 +1,7 @@
+mod build;
+
 use std::{
-    cmp::{max, Reverse},
+    cmp::Reverse,
     collections::{
         hash_map::Entry::{self, Vacant},
         BinaryHeap, HashSet, VecDeque,
@@ -29,7 +31,7 @@ use kaspa_consensus_core::{
 };
 use kaspa_core::{debug, info, trace};
 use kaspa_database::{
-    prelude::{CachePolicy, ConnBuilder, StoreError, StoreResult, StoreResultEmptyTuple, StoreResultExtensions},
+    prelude::{CachePolicy, ConnBuilder, StoreResultEmptyTuple, StoreResultExtensions},
     utils::DbLifetime,
 };
 use kaspa_hashes::Hash;
@@ -50,7 +52,7 @@ use crate::{
         stores::{
             depth::DbDepthStore,
             ghostdag::{CompactGhostdagData, DbGhostdagStore, GhostdagData, GhostdagStore, GhostdagStoreReader},
-            headers::{DbHeadersStore, HeaderStore, HeaderStoreReader, HeaderWithBlockLevel},
+            headers::{DbHeadersStore, HeaderStore, HeaderStoreReader},
             headers_selected_tip::DbHeadersSelectedTipStore,
             past_pruning_points::{DbPastPruningPointsStore, PastPruningPointsStore},
             pruning::{DbPruningStore, PruningStoreReader},
@@ -110,39 +112,6 @@ struct TempProofContext {
     db_lifetime: DbLifetime,
 }
 
-#[derive(Clone)]
-struct RelationsStoreInFutureOfRoot<T: RelationsStoreReader, U: ReachabilityService> {
-    relations_store: T,
-    reachability_service: U,
-    root: Hash,
-}
-
-impl<T: RelationsStoreReader, U: ReachabilityService> RelationsStoreReader for RelationsStoreInFutureOfRoot<T, U> {
-    fn get_parents(&self, hash: Hash) -> Result<BlockHashes, kaspa_database::prelude::StoreError> {
-        self.relations_store.get_parents(hash).map(|hashes| {
-            Arc::new(hashes.iter().copied().filter(|h| self.reachability_service.is_dag_ancestor_of(self.root, *h)).collect_vec())
-        })
-    }
-
-    fn get_children(&self, hash: Hash) -> StoreResult<kaspa_database::prelude::ReadLock<BlockHashSet>> {
-        // We assume hash is in future of root
-        assert!(self.reachability_service.is_dag_ancestor_of(self.root, hash));
-        self.relations_store.get_children(hash)
-    }
-
-    fn has(&self, hash: Hash) -> Result<bool, StoreError> {
-        if self.reachability_service.is_dag_ancestor_of(self.root, hash) {
-            Ok(false)
-        } else {
-            self.relations_store.has(hash)
-        }
-    }
-
-    fn counts(&self) -> Result<(usize, usize), kaspa_database::prelude::StoreError> {
-        unimplemented!()
-    }
-}
-
 pub struct PruningProofManager {
     db: Arc<DB>,
 
@@ -791,411 +760,6 @@ impl PruningProofManager {
         Err(PruningImportError::PruningProofNotEnoughHeaders)
     }
 
-    // The "current dag level" is the level right before the level whose parents are
-    // not the same as our header's direct parents
-    //
-    // Find the current DAG level by going through all the parents at each level,
-    // starting from the bottom level and see which is the first level that has
-    // parents that are NOT our current pp_header's direct parents.
-    fn find_current_dag_level(&self, pp_header: &Header) -> BlockLevel {
-        let direct_parents = BlockHashSet::from_iter(pp_header.direct_parents().iter().copied());
-        pp_header
-            .parents_by_level
-            .iter()
-            .enumerate()
-            .skip(1)
-            .find_map(|(level, parents)| {
-                if BlockHashSet::from_iter(parents.iter().copied()) == direct_parents {
-                    None
-                } else {
-                    Some((level - 1) as BlockLevel)
-                }
-            })
-            .unwrap_or(self.max_block_level)
-    }
-
-    fn estimated_blue_depth_at_level_0(&self, level: BlockLevel, level_depth: u64, current_dag_level: BlockLevel) -> u64 {
-        level_depth.checked_shl(level.saturating_sub(current_dag_level) as u32).unwrap_or(level_depth)
-    }
-
-    /// selected parent at level = the parent of the header at the level
-    /// with the highest blue_work
-    fn find_selected_parent_header_at_level(
-        &self,
-        header: &Header,
-        level: BlockLevel,
-    ) -> PruningProofManagerInternalResult<Arc<Header>> {
-        // Parents manager parents_at_level may return parents that aren't in relations_service, so it's important
-        // to filter to include only parents that are in relations_service.
-        let sp = self
-            .parents_manager
-            .parents_at_level(header, level)
-            .iter()
-            .copied()
-            .filter(|p| self.level_relations_services[level as usize].has(*p).unwrap())
-            .filter_map(|p| self.headers_store.get_header(p).unwrap_option().map(|h| SortableBlock::new(p, h.blue_work)))
-            .max()
-            .ok_or(PruningProofManagerInternalError::NotEnoughHeadersToBuildProof("no parents with header".to_string()))?;
-        Ok(self.headers_store.get_header(sp.hash).expect("unwrapped above"))
-    }
-
-    /// Find a sufficient root at a given level by going through the headers store and looking
-    /// for a deep enough level block
-    /// For each root candidate, fill in the ghostdag data to see if it actually is deep enough.
-    /// If the root is deep enough, it will satisfy these conditions
-    /// 1. block at depth 2m at this level ∈ Future(root)
-    /// 2. block at depth m at the next level ∈ Future(root)
-    ///
-    /// Returns: the filled ghostdag store from root to tip, the selected tip and the root
-    fn find_sufficient_root(
-        &self,
-        pp_header: &HeaderWithBlockLevel,
-        level: BlockLevel,
-        current_dag_level: BlockLevel,
-        required_block: Option<Hash>,
-        temp_db: Arc<DB>,
-    ) -> PruningProofManagerInternalResult<(Arc<DbGhostdagStore>, Hash, Hash)> {
-        // Step 1: Determine which selected tip to use
-        let selected_tip = if pp_header.block_level >= level {
-            pp_header.header.hash
-        } else {
-            self.find_selected_parent_header_at_level(&pp_header.header, level)?.hash
-        };
-
-        let cache_policy = CachePolicy::Count(2 * self.pruning_proof_m as usize);
-        let required_level_depth = 2 * self.pruning_proof_m;
-
-        // We only have the headers store (which has level 0 blue_scores) to assemble the proof data from.
-        // We need to look deeper at higher levels (2x deeper every level) to find 2M (plus margin) blocks at that level
-        let mut required_base_level_depth = self.estimated_blue_depth_at_level_0(
-            level,
-            required_level_depth + 100, // We take a safety margin
-            current_dag_level,
-        );
-
-        let mut is_last_level_header;
-        let mut tries = 0;
-
-        let block_at_depth_m_at_next_level = required_block.unwrap_or(selected_tip);
-
-        loop {
-            // Step 2 - Find a deep enough root candidate
-            let block_at_depth_2m = match self.level_block_at_base_depth(level, selected_tip, required_base_level_depth) {
-                Ok((header, is_last_header)) => {
-                    is_last_level_header = is_last_header;
-                    header
-                }
-                Err(e) => return Err(e),
-            };
-
-            let root = if self.reachability_service.is_dag_ancestor_of(block_at_depth_2m, block_at_depth_m_at_next_level) {
-                block_at_depth_2m
-            } else if self.reachability_service.is_dag_ancestor_of(block_at_depth_m_at_next_level, block_at_depth_2m) {
-                block_at_depth_m_at_next_level
-            } else {
-                // find common ancestor of block_at_depth_m_at_next_level and block_at_depth_2m in chain of block_at_depth_m_at_next_level
-                let mut common_ancestor = self.headers_store.get_header(block_at_depth_m_at_next_level).unwrap();
-
-                while !self.reachability_service.is_dag_ancestor_of(common_ancestor.hash, block_at_depth_2m) {
-                    common_ancestor = match self.find_selected_parent_header_at_level(&common_ancestor, level) {
-                        Ok(header) => header,
-                        // Try to give this last header a chance at being root
-                        Err(PruningProofManagerInternalError::NotEnoughHeadersToBuildProof(_)) => break,
-                        Err(e) => return Err(e),
-                    };
-                }
-
-                common_ancestor.hash
-            };
-
-            if level == 0 {
-                return Ok((self.ghostdag_store.clone(), selected_tip, root));
-            }
-
-            // Step 3 - Fill the ghostdag data from root to tip
-            let ghostdag_store = Arc::new(DbGhostdagStore::new_temp(temp_db.clone(), level, cache_policy, cache_policy, tries));
-            let has_required_block = self.fill_level_proof_ghostdag_data(
-                root,
-                pp_header.header.hash,
-                &ghostdag_store,
-                Some(block_at_depth_m_at_next_level),
-                level,
-            );
-
-            // Step 4 - Check if we actually have enough depth.
-            // Need to ensure this does the same 2M+1 depth that block_at_depth does
-            if has_required_block
-                && (root == self.genesis_hash || ghostdag_store.get_blue_score(selected_tip).unwrap() >= required_level_depth)
-            {
-                break Ok((ghostdag_store, selected_tip, root));
-            }
-
-            tries += 1;
-            if is_last_level_header {
-                if has_required_block {
-                    // Normally this scenario doesn't occur when syncing with nodes that already have the safety margin change in place.
-                    // However, when syncing with an older node version that doesn't have a safety margin for the proof, it's possible to
-                    // try to find 2500 depth worth of headers at a level, but the proof only contains about 2000 headers. To be able to sync
-                    // with such an older node. As long as we found the required block, we can still proceed.
-                    debug!("Failed to find sufficient root for level {level} after {tries} tries. Headers below the current depth of {required_base_level_depth} are already pruned. Required block found so trying anyway.");
-                    break Ok((ghostdag_store, selected_tip, root));
-                } else {
-                    panic!("Failed to find sufficient root for level {level} after {tries} tries. Headers below the current depth of {required_base_level_depth} are already pruned");
-                }
-            }
-
-            // If we don't have enough depth now, we need to look deeper
-            required_base_level_depth = (required_base_level_depth as f64 * 1.1) as u64;
-            debug!("Failed to find sufficient root for level {level} after {tries} tries. Retrying again to find with depth {required_base_level_depth}");
-        }
-    }
-
-    fn calc_gd_for_all_levels(
-        &self,
-        pp_header: &HeaderWithBlockLevel,
-        temp_db: Arc<DB>,
-    ) -> (Vec<Arc<DbGhostdagStore>>, Vec<Hash>, Vec<Hash>) {
-        let current_dag_level = self.find_current_dag_level(&pp_header.header);
-        let mut ghostdag_stores: Vec<Option<Arc<DbGhostdagStore>>> = vec![None; self.max_block_level as usize + 1];
-        let mut selected_tip_by_level = vec![None; self.max_block_level as usize + 1];
-        let mut root_by_level = vec![None; self.max_block_level as usize + 1];
-        for level in (0..=self.max_block_level).rev() {
-            let level_usize = level as usize;
-            let required_block = if level != self.max_block_level {
-                let next_level_store = ghostdag_stores[level_usize + 1].as_ref().unwrap().clone();
-                let block_at_depth_m_at_next_level = self
-                    .block_at_depth(&*next_level_store, selected_tip_by_level[level_usize + 1].unwrap(), self.pruning_proof_m)
-                    .map_err(|err| format!("level + 1: {}, err: {}", level + 1, err))
-                    .unwrap();
-                Some(block_at_depth_m_at_next_level)
-            } else {
-                None
-            };
-            let (store, selected_tip, root) = self
-                .find_sufficient_root(pp_header, level, current_dag_level, required_block, temp_db.clone())
-                .unwrap_or_else(|_| panic!("find_sufficient_root failed for level {level}"));
-            ghostdag_stores[level_usize] = Some(store);
-            selected_tip_by_level[level_usize] = Some(selected_tip);
-            root_by_level[level_usize] = Some(root);
-        }
-
-        (
-            ghostdag_stores.into_iter().map(Option::unwrap).collect_vec(),
-            selected_tip_by_level.into_iter().map(Option::unwrap).collect_vec(),
-            root_by_level.into_iter().map(Option::unwrap).collect_vec(),
-        )
-    }
-
-    pub(crate) fn build_pruning_point_proof(&self, pp: Hash) -> PruningPointProof {
-        if pp == self.genesis_hash {
-            return vec![];
-        }
-
-        let (_db_lifetime, temp_db) = kaspa_database::create_temp_db!(ConnBuilder::default().with_files_limit(10));
-        let pp_header = self.headers_store.get_header_with_block_level(pp).unwrap();
-        let (ghostdag_stores, selected_tip_by_level, roots_by_level) = self.calc_gd_for_all_levels(&pp_header, temp_db);
-
-        (0..=self.max_block_level)
-            .map(|level| {
-                let level = level as usize;
-                let selected_tip = selected_tip_by_level[level];
-                let block_at_depth_2m = self
-                    .block_at_depth(&*ghostdag_stores[level], selected_tip, 2 * self.pruning_proof_m)
-                    .map_err(|err| format!("level: {}, err: {}", level, err))
-                    .unwrap();
-
-                // TODO (relaxed): remove the assertion below
-                // (New Logic) This is the root we calculated by going through block relations
-                let root = roots_by_level[level];
-                // (Old Logic) This is the root we can calculate given that the GD records are already filled
-                // The root calc logic below is the original logic before the on-demand higher level GD calculation
-                // We only need old_root to sanity check the new logic
-                let old_root = if level != self.max_block_level as usize {
-                    let block_at_depth_m_at_next_level = self
-                        .block_at_depth(&*ghostdag_stores[level + 1], selected_tip_by_level[level + 1], self.pruning_proof_m)
-                        .map_err(|err| format!("level + 1: {}, err: {}", level + 1, err))
-                        .unwrap();
-                    if self.reachability_service.is_dag_ancestor_of(block_at_depth_m_at_next_level, block_at_depth_2m) {
-                        block_at_depth_m_at_next_level
-                    } else if self.reachability_service.is_dag_ancestor_of(block_at_depth_2m, block_at_depth_m_at_next_level) {
-                        block_at_depth_2m
-                    } else {
-                        self.find_common_ancestor_in_chain_of_a(
-                            &*ghostdag_stores[level],
-                            block_at_depth_m_at_next_level,
-                            block_at_depth_2m,
-                        )
-                        .map_err(|err| format!("level: {}, err: {}", level, err))
-                        .unwrap()
-                    }
-                } else {
-                    block_at_depth_2m
-                };
-
-                // new root is expected to be always an ancestor of old_root because new root takes a safety margin
-                assert!(self.reachability_service.is_dag_ancestor_of(root, old_root));
-
-                let mut headers = Vec::with_capacity(2 * self.pruning_proof_m as usize);
-                let mut queue = BinaryHeap::<Reverse<SortableBlock>>::new();
-                let mut visited = BlockHashSet::new();
-                queue.push(Reverse(SortableBlock::new(root, self.headers_store.get_header(root).unwrap().blue_work)));
-                while let Some(current) = queue.pop() {
-                    let current = current.0.hash;
-                    if !visited.insert(current) {
-                        continue;
-                    }
-
-                    // The second condition is always expected to be true (ghostdag store will have the entry)
-                    // because we are traversing the exact diamond (future(root) ⋂ past(tip)) for which we calculated
-                    // GD for (see fill_level_proof_ghostdag_data). TODO (relaxed): remove the condition or turn into assertion
-                    if !self.reachability_service.is_dag_ancestor_of(current, selected_tip)
-                        || !ghostdag_stores[level].has(current).is_ok_and(|found| found)
-                    {
-                        continue;
-                    }
-
-                    headers.push(self.headers_store.get_header(current).unwrap());
-                    for child in self.relations_stores.read()[level].get_children(current).unwrap().read().iter().copied() {
-                        queue.push(Reverse(SortableBlock::new(child, self.headers_store.get_header(child).unwrap().blue_work)));
-                    }
-                }
-
-                // TODO (relaxed): remove the assertion below
-                // Temp assertion for verifying a bug fix: assert that the full 2M chain is actually contained in the composed level proof
-                let set = BlockHashSet::from_iter(headers.iter().map(|h| h.hash));
-                let chain_2m = self
-                    .chain_up_to_depth(&*ghostdag_stores[level], selected_tip, 2 * self.pruning_proof_m)
-                    .map_err(|err| {
-                        dbg!(level, selected_tip, block_at_depth_2m, root);
-                        format!("Assert 2M chain -- level: {}, err: {}", level, err)
-                    })
-                    .unwrap();
-                let chain_2m_len = chain_2m.len();
-                for (i, chain_hash) in chain_2m.into_iter().enumerate() {
-                    if !set.contains(&chain_hash) {
-                        let next_level_tip = selected_tip_by_level[level + 1];
-                        let next_level_chain_m =
-                            self.chain_up_to_depth(&*ghostdag_stores[level + 1], next_level_tip, self.pruning_proof_m).unwrap();
-                        let next_level_block_m = next_level_chain_m.last().copied().unwrap();
-                        dbg!(next_level_chain_m.len());
-                        dbg!(ghostdag_stores[level + 1].get_compact_data(next_level_tip).unwrap().blue_score);
-                        dbg!(ghostdag_stores[level + 1].get_compact_data(next_level_block_m).unwrap().blue_score);
-                        dbg!(ghostdag_stores[level].get_compact_data(selected_tip).unwrap().blue_score);
-                        dbg!(ghostdag_stores[level].get_compact_data(block_at_depth_2m).unwrap().blue_score);
-                        dbg!(level, selected_tip, block_at_depth_2m, root);
-                        panic!("Assert 2M chain -- missing block {} at index {} out of {} chain blocks", chain_hash, i, chain_2m_len);
-                    }
-                }
-
-                headers
-            })
-            .collect_vec()
-    }
-
-    /// BFS forward iterates from root until selected tip, ignoring blocks in the antipast of selected_tip.
-    /// For each block along the way, insert that hash into the ghostdag_store
-    /// If we have a required_block to find, this will return true if that block was found along the way
-    fn fill_level_proof_ghostdag_data(
-        &self,
-        root: Hash,
-        selected_tip: Hash,
-        ghostdag_store: &Arc<DbGhostdagStore>,
-        required_block: Option<Hash>,
-        level: BlockLevel,
-    ) -> bool {
-        let relations_service = RelationsStoreInFutureOfRoot {
-            relations_store: self.level_relations_services[level as usize].clone(),
-            reachability_service: self.reachability_service.clone(),
-            root,
-        };
-        let gd_manager = GhostdagManager::new(
-            root,
-            self.ghostdag_k,
-            ghostdag_store.clone(),
-            relations_service.clone(),
-            self.headers_store.clone(),
-            self.reachability_service.clone(),
-            level != 0,
-        );
-
-        ghostdag_store.insert(root, Arc::new(gd_manager.genesis_ghostdag_data())).unwrap();
-        ghostdag_store.insert(ORIGIN, gd_manager.origin_ghostdag_data()).unwrap();
-
-        let mut topological_heap: BinaryHeap<_> = Default::default();
-        let mut visited = BlockHashSet::new();
-        for child in relations_service.get_children(root).unwrap().read().iter().copied() {
-            topological_heap.push(Reverse(SortableBlock {
-                hash: child,
-                // It's important to use here blue work and not score so we can iterate the heap in a way that respects the topology
-                blue_work: self.headers_store.get_header(child).unwrap().blue_work,
-            }));
-        }
-
-        let mut has_required_block = required_block.is_some_and(|required_block| root == required_block);
-        loop {
-            let Some(current) = topological_heap.pop() else {
-                break;
-            };
-            let current_hash = current.0.hash;
-            if !visited.insert(current_hash) {
-                continue;
-            }
-
-            if !self.reachability_service.is_dag_ancestor_of(current_hash, selected_tip) {
-                // We don't care about blocks in the antipast of the selected tip
-                continue;
-            }
-
-            if !has_required_block && required_block.is_some_and(|required_block| current_hash == required_block) {
-                has_required_block = true;
-            }
-
-            let current_gd = gd_manager.ghostdag(&relations_service.get_parents(current_hash).unwrap());
-
-            ghostdag_store.insert(current_hash, Arc::new(current_gd)).unwrap_or_exists();
-
-            for child in relations_service.get_children(current_hash).unwrap().read().iter().copied() {
-                topological_heap.push(Reverse(SortableBlock {
-                    hash: child,
-                    // It's important to use here blue work and not score so we can iterate the heap in a way that respects the topology
-                    blue_work: self.headers_store.get_header(child).unwrap().blue_work,
-                }));
-            }
-        }
-
-        has_required_block
-    }
-
-    /// Copy of `block_at_depth` which returns the full chain up to depth. Temporarily used for assertion purposes.
-    fn chain_up_to_depth(
-        &self,
-        ghostdag_store: &impl GhostdagStoreReader,
-        high: Hash,
-        depth: u64,
-    ) -> Result<Vec<Hash>, PruningProofManagerInternalError> {
-        let high_gd = ghostdag_store
-            .get_compact_data(high)
-            .map_err(|err| PruningProofManagerInternalError::BlockAtDepth(format!("high: {high}, depth: {depth}, {err}")))?;
-        let mut current_gd = high_gd;
-        let mut current = high;
-        let mut res = vec![current];
-        while current_gd.blue_score + depth >= high_gd.blue_score {
-            if current_gd.selected_parent.is_origin() {
-                break;
-            }
-            let prev = current;
-            current = current_gd.selected_parent;
-            res.push(current);
-            current_gd = ghostdag_store.get_compact_data(current).map_err(|err| {
-                PruningProofManagerInternalError::BlockAtDepth(format!(
-                    "high: {}, depth: {}, current: {}, high blue score: {}, current blue score: {}, {}",
-                    high, depth, prev, high_gd.blue_score, current_gd.blue_score, err
-                ))
-            })?;
-        }
-        Ok(res)
-    }
-
     fn block_at_depth(
         &self,
         ghostdag_store: &impl GhostdagStoreReader,
@@ -1223,69 +787,6 @@ impl PruningProofManager {
         Ok(current)
     }
 
-    /// Finds the block on a given level that is at base_depth deep from it.
-    /// Also returns if the block was the last one in the level
-    /// base_depth = the blue score depth at level 0
-    fn level_block_at_base_depth(
-        &self,
-        level: BlockLevel,
-        high: Hash,
-        base_depth: u64,
-    ) -> PruningProofManagerInternalResult<(Hash, bool)> {
-        let high_header = self
-            .headers_store
-            .get_header(high)
-            .map_err(|err| PruningProofManagerInternalError::BlockAtDepth(format!("high: {high}, depth: {base_depth}, {err}")))?;
-        let high_header_score = high_header.blue_score;
-        let mut current_header = high_header;
-
-        let mut is_last_header = false;
-
-        while current_header.blue_score + base_depth >= high_header_score {
-            if current_header.direct_parents().is_empty() {
-                break;
-            }
-
-            current_header = match self.find_selected_parent_header_at_level(&current_header, level) {
-                Ok(header) => header,
-                Err(PruningProofManagerInternalError::NotEnoughHeadersToBuildProof(_)) => {
-                    // We want to give this root a shot if all its past is pruned
-                    is_last_header = true;
-                    break;
-                }
-                Err(e) => return Err(e),
-            };
-        }
-        Ok((current_header.hash, is_last_header))
-    }
-
-    fn find_common_ancestor_in_chain_of_a(
-        &self,
-        ghostdag_store: &impl GhostdagStoreReader,
-        a: Hash,
-        b: Hash,
-    ) -> Result<Hash, PruningProofManagerInternalError> {
-        let a_gd = ghostdag_store
-            .get_compact_data(a)
-            .map_err(|err| PruningProofManagerInternalError::FindCommonAncestor(format!("a: {a}, b: {b}, {err}")))?;
-        let mut current_gd = a_gd;
-        let mut current;
-        let mut loop_counter = 0;
-        loop {
-            current = current_gd.selected_parent;
-            loop_counter += 1;
-            if current.is_origin() {
-                break Err(PruningProofManagerInternalError::NoCommonAncestor(format!("a: {a}, b: {b} ({loop_counter} loop steps)")));
-            }
-            if self.reachability_service.is_dag_ancestor_of(current, b) {
-                break Ok(current);
-            }
-            current_gd = ghostdag_store
-                .get_compact_data(current)
-                .map_err(|err| PruningProofManagerInternalError::FindCommonAncestor(format!("a: {a}, b: {b}, {err}")))?;
-        }
-    }
-
     /// Returns the k + 1 chain blocks below this hash (inclusive). If data is missing
     /// the search is halted and a partial chain is returned.
     ///

From be1190b33c0a8bfe4077393ac248a69acaf64405 Mon Sep 17 00:00:00 2001
From: coderofstuff <114628839+coderofstuff@users.noreply.github.com>
Date: Tue, 5 Nov 2024 18:21:46 -0700
Subject: [PATCH 3/5] Refactor apply pruning proof functions

---
 .../src/processes/pruning_proof/apply.rs      | 235 ++++++++++++++++++
 consensus/src/processes/pruning_proof/mod.rs  | 229 ++---------------
 2 files changed, 249 insertions(+), 215 deletions(-)
 create mode 100644 consensus/src/processes/pruning_proof/apply.rs

diff --git a/consensus/src/processes/pruning_proof/apply.rs b/consensus/src/processes/pruning_proof/apply.rs
new file mode 100644
index 0000000000..f3d56d797e
--- /dev/null
+++ b/consensus/src/processes/pruning_proof/apply.rs
@@ -0,0 +1,235 @@
+use std::{
+    cmp::Reverse,
+    collections::{hash_map::Entry::Vacant, BinaryHeap, HashSet},
+    sync::Arc,
+};
+
+use itertools::Itertools;
+use kaspa_consensus_core::{
+    blockhash::{BlockHashes, ORIGIN},
+    errors::pruning::{PruningImportError, PruningImportResult},
+    header::Header,
+    pruning::PruningPointProof,
+    trusted::TrustedBlock,
+    BlockHashMap, BlockHashSet, BlockLevel, HashMapCustomHasher,
+};
+use kaspa_core::{debug, trace};
+use kaspa_hashes::Hash;
+use kaspa_pow::calc_block_level;
+use kaspa_utils::{binary_heap::BinaryHeapExtensions, vec::VecExtensions};
+use rocksdb::WriteBatch;
+
+use crate::{
+    model::{
+        services::reachability::ReachabilityService,
+        stores::{
+            ghostdag::{GhostdagData, GhostdagStore},
+            headers::HeaderStore,
+            reachability::StagingReachabilityStore,
+            relations::StagingRelationsStore,
+            selected_chain::SelectedChainStore,
+            virtual_state::{VirtualState, VirtualStateStore},
+        },
+    },
+    processes::{
+        ghostdag::{mergeset::unordered_mergeset_without_selected_parent, ordering::SortableBlock},
+        reachability::inquirer as reachability,
+        relations::RelationsStoreExtensions,
+    },
+};
+
+use super::PruningProofManager;
+
+impl PruningProofManager {
+    pub fn apply_proof(&self, mut proof: PruningPointProof, trusted_set: &[TrustedBlock]) -> PruningImportResult<()> {
+        let pruning_point_header = proof[0].last().unwrap().clone();
+        let pruning_point = pruning_point_header.hash;
+
+        // Create a copy of the proof, since we're going to be mutating the proof passed to us
+        let proof_sets = (0..=self.max_block_level)
+            .map(|level| BlockHashSet::from_iter(proof[level as usize].iter().map(|header| header.hash)))
+            .collect_vec();
+
+        let mut trusted_gd_map: BlockHashMap<GhostdagData> = BlockHashMap::new();
+        for tb in trusted_set.iter() {
+            trusted_gd_map.insert(tb.block.hash(), tb.ghostdag.clone().into());
+            let tb_block_level = calc_block_level(&tb.block.header, self.max_block_level);
+
+            (0..=tb_block_level).for_each(|current_proof_level| {
+                // If this block was in the original proof, ignore it
+                if proof_sets[current_proof_level as usize].contains(&tb.block.hash()) {
+                    return;
+                }
+
+                proof[current_proof_level as usize].push(tb.block.header.clone());
+            });
+        }
+
+        proof.iter_mut().for_each(|level_proof| {
+            level_proof.sort_by(|a, b| a.blue_work.cmp(&b.blue_work));
+        });
+
+        self.populate_reachability_and_headers(&proof);
+
+        {
+            let reachability_read = self.reachability_store.read();
+            for tb in trusted_set.iter() {
+                // Header-only trusted blocks are expected to be in pruning point past
+                if tb.block.is_header_only() && !reachability_read.is_dag_ancestor_of(tb.block.hash(), pruning_point) {
+                    return Err(PruningImportError::PruningPointPastMissingReachability(tb.block.hash()));
+                }
+            }
+        }
+
+        for (level, headers) in proof.iter().enumerate() {
+            trace!("Applying level {} from the pruning point proof", level);
+            let mut level_ancestors: HashSet<Hash> = HashSet::new();
+            level_ancestors.insert(ORIGIN);
+
+            for header in headers.iter() {
+                let parents = Arc::new(
+                    self.parents_manager
+                        .parents_at_level(header, level as BlockLevel)
+                        .iter()
+                        .copied()
+                        .filter(|parent| level_ancestors.contains(parent))
+                        .collect_vec()
+                        .push_if_empty(ORIGIN),
+                );
+
+                self.relations_stores.write()[level].insert(header.hash, parents.clone()).unwrap();
+
+                if level == 0 {
+                    let gd = if let Some(gd) = trusted_gd_map.get(&header.hash) {
+                        gd.clone()
+                    } else {
+                        let calculated_gd = self.ghostdag_manager.ghostdag(&parents);
+                        // Override the ghostdag data with the real blue score and blue work
+                        GhostdagData {
+                            blue_score: header.blue_score,
+                            blue_work: header.blue_work,
+                            selected_parent: calculated_gd.selected_parent,
+                            mergeset_blues: calculated_gd.mergeset_blues,
+                            mergeset_reds: calculated_gd.mergeset_reds,
+                            blues_anticone_sizes: calculated_gd.blues_anticone_sizes,
+                        }
+                    };
+                    self.ghostdag_store.insert(header.hash, Arc::new(gd)).unwrap();
+                }
+
+                level_ancestors.insert(header.hash);
+            }
+        }
+
+        let virtual_parents = vec![pruning_point];
+        let virtual_state = Arc::new(VirtualState {
+            parents: virtual_parents.clone(),
+            ghostdag_data: self.ghostdag_manager.ghostdag(&virtual_parents),
+            ..VirtualState::default()
+        });
+        self.virtual_stores.write().state.set(virtual_state).unwrap();
+
+        let mut batch = WriteBatch::default();
+        self.body_tips_store.write().init_batch(&mut batch, &virtual_parents).unwrap();
+        self.headers_selected_tip_store
+            .write()
+            .set_batch(&mut batch, SortableBlock { hash: pruning_point, blue_work: pruning_point_header.blue_work })
+            .unwrap();
+        self.selected_chain_store.write().init_with_pruning_point(&mut batch, pruning_point).unwrap();
+        self.depth_store.insert_batch(&mut batch, pruning_point, ORIGIN, ORIGIN).unwrap();
+        self.db.write(batch).unwrap();
+
+        Ok(())
+    }
+
+    pub fn populate_reachability_and_headers(&self, proof: &PruningPointProof) {
+        let capacity_estimate = self.estimate_proof_unique_size(proof);
+        let mut dag = BlockHashMap::with_capacity(capacity_estimate);
+        let mut up_heap = BinaryHeap::with_capacity(capacity_estimate);
+        for header in proof.iter().flatten().cloned() {
+            if let Vacant(e) = dag.entry(header.hash) {
+                let block_level = calc_block_level(&header, self.max_block_level);
+                self.headers_store.insert(header.hash, header.clone(), block_level).unwrap();
+
+                let mut parents = BlockHashSet::with_capacity(header.direct_parents().len() * 2);
+                // We collect all available parent relations in order to maximize reachability information.
+                // By taking into account parents from all levels we ensure that the induced DAG has valid
+                // reachability information for each level-specific sub-DAG -- hence a single reachability
+                // oracle can serve them all
+                for level in 0..=self.max_block_level {
+                    for parent in self.parents_manager.parents_at_level(&header, level) {
+                        parents.insert(*parent);
+                    }
+                }
+
+                struct DagEntry {
+                    header: Arc<Header>,
+                    parents: Arc<BlockHashSet>,
+                }
+
+                up_heap.push(Reverse(SortableBlock { hash: header.hash, blue_work: header.blue_work }));
+                e.insert(DagEntry { header, parents: Arc::new(parents) });
+            }
+        }
+
+        debug!("Estimated proof size: {}, actual size: {}", capacity_estimate, dag.len());
+
+        for reverse_sortable_block in up_heap.into_sorted_iter() {
+            // TODO: Convert to into_iter_sorted once it gets stable
+            let hash = reverse_sortable_block.0.hash;
+            let dag_entry = dag.get(&hash).unwrap();
+
+            // Filter only existing parents
+            let parents_in_dag = BinaryHeap::from_iter(
+                dag_entry
+                    .parents
+                    .iter()
+                    .cloned()
+                    .filter(|parent| dag.contains_key(parent))
+                    .map(|parent| SortableBlock { hash: parent, blue_work: dag.get(&parent).unwrap().header.blue_work }),
+            );
+
+            let reachability_read = self.reachability_store.upgradable_read();
+
+            // Find the maximal parent antichain from the possibly redundant set of existing parents
+            let mut reachability_parents: Vec<SortableBlock> = Vec::new();
+            for parent in parents_in_dag.into_sorted_iter() {
+                if reachability_read.is_dag_ancestor_of_any(parent.hash, &mut reachability_parents.iter().map(|parent| parent.hash)) {
+                    continue;
+                }
+
+                reachability_parents.push(parent);
+            }
+            let reachability_parents_hashes =
+                BlockHashes::new(reachability_parents.iter().map(|parent| parent.hash).collect_vec().push_if_empty(ORIGIN));
+            let selected_parent = reachability_parents.iter().max().map(|parent| parent.hash).unwrap_or(ORIGIN);
+
+            // Prepare batch
+            let mut batch = WriteBatch::default();
+            let mut reachability_relations_write = self.reachability_relations_store.write();
+            let mut staging_reachability = StagingReachabilityStore::new(reachability_read);
+            let mut staging_reachability_relations = StagingRelationsStore::new(&mut reachability_relations_write);
+
+            // Stage
+            staging_reachability_relations.insert(hash, reachability_parents_hashes.clone()).unwrap();
+            let mergeset = unordered_mergeset_without_selected_parent(
+                &staging_reachability_relations,
+                &staging_reachability,
+                selected_parent,
+                &reachability_parents_hashes,
+            );
+            reachability::add_block(&mut staging_reachability, hash, selected_parent, &mut mergeset.iter().copied()).unwrap();
+
+            // Commit
+            let reachability_write = staging_reachability.commit(&mut batch).unwrap();
+            staging_reachability_relations.commit(&mut batch).unwrap();
+
+            // Write
+            self.db.write(batch).unwrap();
+
+            // Drop
+            drop(reachability_write);
+            drop(reachability_relations_write);
+        }
+    }
+}
diff --git a/consensus/src/processes/pruning_proof/mod.rs b/consensus/src/processes/pruning_proof/mod.rs
index f151b28508..de28ae4991 100644
--- a/consensus/src/processes/pruning_proof/mod.rs
+++ b/consensus/src/processes/pruning_proof/mod.rs
@@ -1,10 +1,10 @@
+mod apply;
 mod build;
 
 use std::{
-    cmp::Reverse,
     collections::{
-        hash_map::Entry::{self, Vacant},
-        BinaryHeap, HashSet, VecDeque,
+        hash_map::Entry::{self},
+        VecDeque,
     },
     ops::{Deref, DerefMut},
     sync::{
@@ -26,17 +26,17 @@ use kaspa_consensus_core::{
     },
     header::Header,
     pruning::{PruningPointProof, PruningPointTrustedData},
-    trusted::{TrustedBlock, TrustedGhostdagData, TrustedHeader},
+    trusted::{TrustedGhostdagData, TrustedHeader},
     BlockHashMap, BlockHashSet, BlockLevel, HashMapCustomHasher, KType,
 };
-use kaspa_core::{debug, info, trace};
+use kaspa_core::info;
 use kaspa_database::{
     prelude::{CachePolicy, ConnBuilder, StoreResultEmptyTuple, StoreResultExtensions},
     utils::DbLifetime,
 };
 use kaspa_hashes::Hash;
 use kaspa_pow::calc_block_level;
-use kaspa_utils::{binary_heap::BinaryHeapExtensions, vec::VecExtensions};
+use kaspa_utils::vec::VecExtensions;
 use thiserror::Error;
 
 use crate::{
@@ -45,35 +45,26 @@ use crate::{
         storage::ConsensusStorage,
     },
     model::{
-        services::{
-            reachability::{MTReachabilityService, ReachabilityService},
-            relations::MTRelationsService,
-        },
+        services::{reachability::MTReachabilityService, relations::MTRelationsService},
         stores::{
             depth::DbDepthStore,
-            ghostdag::{CompactGhostdagData, DbGhostdagStore, GhostdagData, GhostdagStore, GhostdagStoreReader},
+            ghostdag::{CompactGhostdagData, DbGhostdagStore, GhostdagStore, GhostdagStoreReader},
             headers::{DbHeadersStore, HeaderStore, HeaderStoreReader},
             headers_selected_tip::DbHeadersSelectedTipStore,
             past_pruning_points::{DbPastPruningPointsStore, PastPruningPointsStore},
             pruning::{DbPruningStore, PruningStoreReader},
-            reachability::{DbReachabilityStore, ReachabilityStoreReader, StagingReachabilityStore},
-            relations::{DbRelationsStore, RelationsStoreReader, StagingRelationsStore},
-            selected_chain::{DbSelectedChainStore, SelectedChainStore},
+            reachability::{DbReachabilityStore, ReachabilityStoreReader},
+            relations::{DbRelationsStore, RelationsStoreReader},
+            selected_chain::DbSelectedChainStore,
             tips::DbTipsStore,
-            virtual_state::{VirtualState, VirtualStateStore, VirtualStateStoreReader, VirtualStores},
+            virtual_state::{VirtualStateStoreReader, VirtualStores},
             DB,
         },
     },
-    processes::{
-        ghostdag::ordering::SortableBlock, reachability::inquirer as reachability, relations::RelationsStoreExtensions,
-        window::WindowType,
-    },
+    processes::{reachability::inquirer as reachability, relations::RelationsStoreExtensions, window::WindowType},
 };
 
-use super::{
-    ghostdag::{mergeset::unordered_mergeset_without_selected_parent, protocol::GhostdagManager},
-    window::WindowManager,
-};
+use super::{ghostdag::protocol::GhostdagManager, window::WindowManager};
 
 #[derive(Error, Debug)]
 enum PruningProofManagerInternalError {
@@ -225,204 +216,12 @@ impl PruningProofManager {
         drop(pruning_point_write);
     }
 
-    pub fn apply_proof(&self, mut proof: PruningPointProof, trusted_set: &[TrustedBlock]) -> PruningImportResult<()> {
-        let pruning_point_header = proof[0].last().unwrap().clone();
-        let pruning_point = pruning_point_header.hash;
-
-        // Create a copy of the proof, since we're going to be mutating the proof passed to us
-        let proof_sets = (0..=self.max_block_level)
-            .map(|level| BlockHashSet::from_iter(proof[level as usize].iter().map(|header| header.hash)))
-            .collect_vec();
-
-        let mut trusted_gd_map: BlockHashMap<GhostdagData> = BlockHashMap::new();
-        for tb in trusted_set.iter() {
-            trusted_gd_map.insert(tb.block.hash(), tb.ghostdag.clone().into());
-            let tb_block_level = calc_block_level(&tb.block.header, self.max_block_level);
-
-            (0..=tb_block_level).for_each(|current_proof_level| {
-                // If this block was in the original proof, ignore it
-                if proof_sets[current_proof_level as usize].contains(&tb.block.hash()) {
-                    return;
-                }
-
-                proof[current_proof_level as usize].push(tb.block.header.clone());
-            });
-        }
-
-        proof.iter_mut().for_each(|level_proof| {
-            level_proof.sort_by(|a, b| a.blue_work.cmp(&b.blue_work));
-        });
-
-        self.populate_reachability_and_headers(&proof);
-
-        {
-            let reachability_read = self.reachability_store.read();
-            for tb in trusted_set.iter() {
-                // Header-only trusted blocks are expected to be in pruning point past
-                if tb.block.is_header_only() && !reachability_read.is_dag_ancestor_of(tb.block.hash(), pruning_point) {
-                    return Err(PruningImportError::PruningPointPastMissingReachability(tb.block.hash()));
-                }
-            }
-        }
-
-        for (level, headers) in proof.iter().enumerate() {
-            trace!("Applying level {} from the pruning point proof", level);
-            let mut level_ancestors: HashSet<Hash> = HashSet::new();
-            level_ancestors.insert(ORIGIN);
-
-            for header in headers.iter() {
-                let parents = Arc::new(
-                    self.parents_manager
-                        .parents_at_level(header, level as BlockLevel)
-                        .iter()
-                        .copied()
-                        .filter(|parent| level_ancestors.contains(parent))
-                        .collect_vec()
-                        .push_if_empty(ORIGIN),
-                );
-
-                self.relations_stores.write()[level].insert(header.hash, parents.clone()).unwrap();
-
-                if level == 0 {
-                    let gd = if let Some(gd) = trusted_gd_map.get(&header.hash) {
-                        gd.clone()
-                    } else {
-                        let calculated_gd = self.ghostdag_manager.ghostdag(&parents);
-                        // Override the ghostdag data with the real blue score and blue work
-                        GhostdagData {
-                            blue_score: header.blue_score,
-                            blue_work: header.blue_work,
-                            selected_parent: calculated_gd.selected_parent,
-                            mergeset_blues: calculated_gd.mergeset_blues,
-                            mergeset_reds: calculated_gd.mergeset_reds,
-                            blues_anticone_sizes: calculated_gd.blues_anticone_sizes,
-                        }
-                    };
-                    self.ghostdag_store.insert(header.hash, Arc::new(gd)).unwrap();
-                }
-
-                level_ancestors.insert(header.hash);
-            }
-        }
-
-        let virtual_parents = vec![pruning_point];
-        let virtual_state = Arc::new(VirtualState {
-            parents: virtual_parents.clone(),
-            ghostdag_data: self.ghostdag_manager.ghostdag(&virtual_parents),
-            ..VirtualState::default()
-        });
-        self.virtual_stores.write().state.set(virtual_state).unwrap();
-
-        let mut batch = WriteBatch::default();
-        self.body_tips_store.write().init_batch(&mut batch, &virtual_parents).unwrap();
-        self.headers_selected_tip_store
-            .write()
-            .set_batch(&mut batch, SortableBlock { hash: pruning_point, blue_work: pruning_point_header.blue_work })
-            .unwrap();
-        self.selected_chain_store.write().init_with_pruning_point(&mut batch, pruning_point).unwrap();
-        self.depth_store.insert_batch(&mut batch, pruning_point, ORIGIN, ORIGIN).unwrap();
-        self.db.write(batch).unwrap();
-
-        Ok(())
-    }
-
     fn estimate_proof_unique_size(&self, proof: &PruningPointProof) -> usize {
         let approx_history_size = proof[0][0].daa_score;
         let approx_unique_full_levels = f64::log2(approx_history_size as f64 / self.pruning_proof_m as f64).max(0f64) as usize;
         proof.iter().map(|l| l.len()).sum::<usize>().min((approx_unique_full_levels + 1) * self.pruning_proof_m as usize)
     }
 
-    pub fn populate_reachability_and_headers(&self, proof: &PruningPointProof) {
-        let capacity_estimate = self.estimate_proof_unique_size(proof);
-        let mut dag = BlockHashMap::with_capacity(capacity_estimate);
-        let mut up_heap = BinaryHeap::with_capacity(capacity_estimate);
-        for header in proof.iter().flatten().cloned() {
-            if let Vacant(e) = dag.entry(header.hash) {
-                let block_level = calc_block_level(&header, self.max_block_level);
-                self.headers_store.insert(header.hash, header.clone(), block_level).unwrap();
-
-                let mut parents = BlockHashSet::with_capacity(header.direct_parents().len() * 2);
-                // We collect all available parent relations in order to maximize reachability information.
-                // By taking into account parents from all levels we ensure that the induced DAG has valid
-                // reachability information for each level-specific sub-DAG -- hence a single reachability
-                // oracle can serve them all
-                for level in 0..=self.max_block_level {
-                    for parent in self.parents_manager.parents_at_level(&header, level) {
-                        parents.insert(*parent);
-                    }
-                }
-
-                struct DagEntry {
-                    header: Arc<Header>,
-                    parents: Arc<BlockHashSet>,
-                }
-
-                up_heap.push(Reverse(SortableBlock { hash: header.hash, blue_work: header.blue_work }));
-                e.insert(DagEntry { header, parents: Arc::new(parents) });
-            }
-        }
-
-        debug!("Estimated proof size: {}, actual size: {}", capacity_estimate, dag.len());
-
-        for reverse_sortable_block in up_heap.into_sorted_iter() {
-            // TODO: Convert to into_iter_sorted once it gets stable
-            let hash = reverse_sortable_block.0.hash;
-            let dag_entry = dag.get(&hash).unwrap();
-
-            // Filter only existing parents
-            let parents_in_dag = BinaryHeap::from_iter(
-                dag_entry
-                    .parents
-                    .iter()
-                    .cloned()
-                    .filter(|parent| dag.contains_key(parent))
-                    .map(|parent| SortableBlock { hash: parent, blue_work: dag.get(&parent).unwrap().header.blue_work }),
-            );
-
-            let reachability_read = self.reachability_store.upgradable_read();
-
-            // Find the maximal parent antichain from the possibly redundant set of existing parents
-            let mut reachability_parents: Vec<SortableBlock> = Vec::new();
-            for parent in parents_in_dag.into_sorted_iter() {
-                if reachability_read.is_dag_ancestor_of_any(parent.hash, &mut reachability_parents.iter().map(|parent| parent.hash)) {
-                    continue;
-                }
-
-                reachability_parents.push(parent);
-            }
-            let reachability_parents_hashes =
-                BlockHashes::new(reachability_parents.iter().map(|parent| parent.hash).collect_vec().push_if_empty(ORIGIN));
-            let selected_parent = reachability_parents.iter().max().map(|parent| parent.hash).unwrap_or(ORIGIN);
-
-            // Prepare batch
-            let mut batch = WriteBatch::default();
-            let mut reachability_relations_write = self.reachability_relations_store.write();
-            let mut staging_reachability = StagingReachabilityStore::new(reachability_read);
-            let mut staging_reachability_relations = StagingRelationsStore::new(&mut reachability_relations_write);
-
-            // Stage
-            staging_reachability_relations.insert(hash, reachability_parents_hashes.clone()).unwrap();
-            let mergeset = unordered_mergeset_without_selected_parent(
-                &staging_reachability_relations,
-                &staging_reachability,
-                selected_parent,
-                &reachability_parents_hashes,
-            );
-            reachability::add_block(&mut staging_reachability, hash, selected_parent, &mut mergeset.iter().copied()).unwrap();
-
-            // Commit
-            let reachability_write = staging_reachability.commit(&mut batch).unwrap();
-            staging_reachability_relations.commit(&mut batch).unwrap();
-
-            // Write
-            self.db.write(batch).unwrap();
-
-            // Drop
-            drop(reachability_write);
-            drop(reachability_relations_write);
-        }
-    }
-
     fn init_validate_pruning_point_proof_stores_and_processes(
         &self,
         proof: &PruningPointProof,

From c3f4f89525e0a74a752079803d99640f4ed0ffeb Mon Sep 17 00:00:00 2001
From: coderofstuff <114628839+coderofstuff@users.noreply.github.com>
Date: Tue, 5 Nov 2024 18:39:01 -0700
Subject: [PATCH 4/5] Refactor validate pruning functions

---
 consensus/src/processes/pruning_proof/mod.rs  | 365 +----------------
 .../src/processes/pruning_proof/validate.rs   | 376 ++++++++++++++++++
 2 files changed, 385 insertions(+), 356 deletions(-)
 create mode 100644 consensus/src/processes/pruning_proof/validate.rs

diff --git a/consensus/src/processes/pruning_proof/mod.rs b/consensus/src/processes/pruning_proof/mod.rs
index de28ae4991..458b1ba7eb 100644
--- a/consensus/src/processes/pruning_proof/mod.rs
+++ b/consensus/src/processes/pruning_proof/mod.rs
@@ -1,42 +1,32 @@
 mod apply;
 mod build;
+mod validate;
 
 use std::{
     collections::{
         hash_map::Entry::{self},
         VecDeque,
     },
-    ops::{Deref, DerefMut},
-    sync::{
-        atomic::{AtomicBool, Ordering},
-        Arc,
-    },
+    ops::Deref,
+    sync::{atomic::AtomicBool, Arc},
 };
 
 use itertools::Itertools;
-use kaspa_math::int::SignedInteger;
 use parking_lot::{Mutex, RwLock};
 use rocksdb::WriteBatch;
 
 use kaspa_consensus_core::{
-    blockhash::{self, BlockHashExtensions, BlockHashes, ORIGIN},
-    errors::{
-        consensus::{ConsensusError, ConsensusResult},
-        pruning::{PruningImportError, PruningImportResult},
-    },
+    blockhash::{self, BlockHashExtensions},
+    errors::consensus::{ConsensusError, ConsensusResult},
     header::Header,
     pruning::{PruningPointProof, PruningPointTrustedData},
     trusted::{TrustedGhostdagData, TrustedHeader},
     BlockHashMap, BlockHashSet, BlockLevel, HashMapCustomHasher, KType,
 };
 use kaspa_core::info;
-use kaspa_database::{
-    prelude::{CachePolicy, ConnBuilder, StoreResultEmptyTuple, StoreResultExtensions},
-    utils::DbLifetime,
-};
+use kaspa_database::{prelude::StoreResultExtensions, utils::DbLifetime};
 use kaspa_hashes::Hash;
 use kaspa_pow::calc_block_level;
-use kaspa_utils::vec::VecExtensions;
 use thiserror::Error;
 
 use crate::{
@@ -48,12 +38,12 @@ use crate::{
         services::{reachability::MTReachabilityService, relations::MTRelationsService},
         stores::{
             depth::DbDepthStore,
-            ghostdag::{CompactGhostdagData, DbGhostdagStore, GhostdagStore, GhostdagStoreReader},
+            ghostdag::{DbGhostdagStore, GhostdagStoreReader},
             headers::{DbHeadersStore, HeaderStore, HeaderStoreReader},
             headers_selected_tip::DbHeadersSelectedTipStore,
             past_pruning_points::{DbPastPruningPointsStore, PastPruningPointsStore},
             pruning::{DbPruningStore, PruningStoreReader},
-            reachability::{DbReachabilityStore, ReachabilityStoreReader},
+            reachability::DbReachabilityStore,
             relations::{DbRelationsStore, RelationsStoreReader},
             selected_chain::DbSelectedChainStore,
             tips::DbTipsStore,
@@ -61,7 +51,7 @@ use crate::{
             DB,
         },
     },
-    processes::{reachability::inquirer as reachability, relations::RelationsStoreExtensions, window::WindowType},
+    processes::window::WindowType,
 };
 
 use super::{ghostdag::protocol::GhostdagManager, window::WindowManager};
@@ -222,343 +212,6 @@ impl PruningProofManager {
         proof.iter().map(|l| l.len()).sum::<usize>().min((approx_unique_full_levels + 1) * self.pruning_proof_m as usize)
     }
 
-    fn init_validate_pruning_point_proof_stores_and_processes(
-        &self,
-        proof: &PruningPointProof,
-    ) -> PruningImportResult<TempProofContext> {
-        if proof[0].is_empty() {
-            return Err(PruningImportError::PruningProofNotEnoughHeaders);
-        }
-
-        let headers_estimate = self.estimate_proof_unique_size(proof);
-
-        let (db_lifetime, db) = kaspa_database::create_temp_db!(ConnBuilder::default().with_files_limit(10));
-        let cache_policy = CachePolicy::Count(2 * self.pruning_proof_m as usize);
-        let headers_store =
-            Arc::new(DbHeadersStore::new(db.clone(), CachePolicy::Count(headers_estimate), CachePolicy::Count(headers_estimate)));
-        let ghostdag_stores = (0..=self.max_block_level)
-            .map(|level| Arc::new(DbGhostdagStore::new(db.clone(), level, cache_policy, cache_policy)))
-            .collect_vec();
-        let mut relations_stores =
-            (0..=self.max_block_level).map(|level| DbRelationsStore::new(db.clone(), level, cache_policy, cache_policy)).collect_vec();
-        let reachability_stores = (0..=self.max_block_level)
-            .map(|level| Arc::new(RwLock::new(DbReachabilityStore::with_block_level(db.clone(), cache_policy, cache_policy, level))))
-            .collect_vec();
-
-        let reachability_services = (0..=self.max_block_level)
-            .map(|level| MTReachabilityService::new(reachability_stores[level as usize].clone()))
-            .collect_vec();
-
-        let ghostdag_managers = ghostdag_stores
-            .iter()
-            .cloned()
-            .enumerate()
-            .map(|(level, ghostdag_store)| {
-                GhostdagManager::new(
-                    self.genesis_hash,
-                    self.ghostdag_k,
-                    ghostdag_store,
-                    relations_stores[level].clone(),
-                    headers_store.clone(),
-                    reachability_services[level].clone(),
-                    level != 0,
-                )
-            })
-            .collect_vec();
-
-        {
-            let mut batch = WriteBatch::default();
-            for level in 0..=self.max_block_level {
-                let level = level as usize;
-                reachability::init(reachability_stores[level].write().deref_mut()).unwrap();
-                relations_stores[level].insert_batch(&mut batch, ORIGIN, BlockHashes::new(vec![])).unwrap();
-                ghostdag_stores[level].insert(ORIGIN, ghostdag_managers[level].origin_ghostdag_data()).unwrap();
-            }
-
-            db.write(batch).unwrap();
-        }
-
-        Ok(TempProofContext { db_lifetime, headers_store, ghostdag_stores, relations_stores, reachability_stores, ghostdag_managers })
-    }
-
-    fn populate_stores_for_validate_pruning_point_proof(
-        &self,
-        proof: &PruningPointProof,
-        ctx: &mut TempProofContext,
-        log_validating: bool,
-    ) -> PruningImportResult<Vec<Hash>> {
-        let headers_store = &ctx.headers_store;
-        let ghostdag_stores = &ctx.ghostdag_stores;
-        let mut relations_stores = ctx.relations_stores.clone();
-        let reachability_stores = &ctx.reachability_stores;
-        let ghostdag_managers = &ctx.ghostdag_managers;
-
-        let proof_pp_header = proof[0].last().expect("checked if empty");
-        let proof_pp = proof_pp_header.hash;
-
-        let mut selected_tip_by_level = vec![None; self.max_block_level as usize + 1];
-        for level in (0..=self.max_block_level).rev() {
-            // Before processing this level, check if the process is exiting so we can end early
-            if self.is_consensus_exiting.load(Ordering::Relaxed) {
-                return Err(PruningImportError::PruningValidationInterrupted);
-            }
-
-            if log_validating {
-                info!("Validating level {level} from the pruning point proof ({} headers)", proof[level as usize].len());
-            }
-            let level_idx = level as usize;
-            let mut selected_tip = None;
-            for (i, header) in proof[level as usize].iter().enumerate() {
-                let header_level = calc_block_level(header, self.max_block_level);
-                if header_level < level {
-                    return Err(PruningImportError::PruningProofWrongBlockLevel(header.hash, header_level, level));
-                }
-
-                headers_store.insert(header.hash, header.clone(), header_level).unwrap_or_exists();
-
-                let parents = self
-                    .parents_manager
-                    .parents_at_level(header, level)
-                    .iter()
-                    .copied()
-                    .filter(|parent| ghostdag_stores[level_idx].has(*parent).unwrap())
-                    .collect_vec();
-
-                // Only the first block at each level is allowed to have no known parents
-                if parents.is_empty() && i != 0 {
-                    return Err(PruningImportError::PruningProofHeaderWithNoKnownParents(header.hash, level));
-                }
-
-                let parents: BlockHashes = parents.push_if_empty(ORIGIN).into();
-
-                if relations_stores[level_idx].has(header.hash).unwrap() {
-                    return Err(PruningImportError::PruningProofDuplicateHeaderAtLevel(header.hash, level));
-                }
-
-                relations_stores[level_idx].insert(header.hash, parents.clone()).unwrap();
-                let ghostdag_data = Arc::new(ghostdag_managers[level_idx].ghostdag(&parents));
-                ghostdag_stores[level_idx].insert(header.hash, ghostdag_data.clone()).unwrap();
-                selected_tip = Some(match selected_tip {
-                    Some(tip) => ghostdag_managers[level_idx].find_selected_parent([tip, header.hash]),
-                    None => header.hash,
-                });
-
-                let mut reachability_mergeset = {
-                    let reachability_read = reachability_stores[level_idx].read();
-                    ghostdag_data
-                        .unordered_mergeset_without_selected_parent()
-                        .filter(|hash| reachability_read.has(*hash).unwrap())
-                        .collect_vec() // We collect to vector so reachability_read can be released and let `reachability::add_block` use a write lock.
-                        .into_iter()
-                };
-                reachability::add_block(
-                    reachability_stores[level_idx].write().deref_mut(),
-                    header.hash,
-                    ghostdag_data.selected_parent,
-                    &mut reachability_mergeset,
-                )
-                .unwrap();
-
-                if selected_tip.unwrap() == header.hash {
-                    reachability::hint_virtual_selected_parent(reachability_stores[level_idx].write().deref_mut(), header.hash)
-                        .unwrap();
-                }
-            }
-
-            if level < self.max_block_level {
-                let block_at_depth_m_at_next_level = self
-                    .block_at_depth(
-                        &*ghostdag_stores[level_idx + 1],
-                        selected_tip_by_level[level_idx + 1].unwrap(),
-                        self.pruning_proof_m,
-                    )
-                    .unwrap();
-                if !relations_stores[level_idx].has(block_at_depth_m_at_next_level).unwrap() {
-                    return Err(PruningImportError::PruningProofMissingBlockAtDepthMFromNextLevel(level, level + 1));
-                }
-            }
-
-            if selected_tip.unwrap() != proof_pp
-                && !self.parents_manager.parents_at_level(proof_pp_header, level).contains(&selected_tip.unwrap())
-            {
-                return Err(PruningImportError::PruningProofMissesBlocksBelowPruningPoint(selected_tip.unwrap(), level));
-            }
-
-            selected_tip_by_level[level_idx] = selected_tip;
-        }
-
-        Ok(selected_tip_by_level.into_iter().map(|selected_tip| selected_tip.unwrap()).collect())
-    }
-
-    fn validate_proof_selected_tip(
-        &self,
-        proof_selected_tip: Hash,
-        level: BlockLevel,
-        proof_pp_level: BlockLevel,
-        proof_pp: Hash,
-        proof_pp_header: &Header,
-    ) -> PruningImportResult<()> {
-        // A proof selected tip of some level has to be the proof suggested prunint point itself if its level
-        // is lower or equal to the pruning point level, or a parent of the pruning point on the relevant level
-        // otherwise.
-        if level <= proof_pp_level {
-            if proof_selected_tip != proof_pp {
-                return Err(PruningImportError::PruningProofSelectedTipIsNotThePruningPoint(proof_selected_tip, level));
-            }
-        } else if !self.parents_manager.parents_at_level(proof_pp_header, level).contains(&proof_selected_tip) {
-            return Err(PruningImportError::PruningProofSelectedTipNotParentOfPruningPoint(proof_selected_tip, level));
-        }
-
-        Ok(())
-    }
-
-    // find_proof_and_consensus_common_chain_ancestor_ghostdag_data returns an option of a tuple
-    // that contains the ghostdag data of the proof and current consensus common ancestor. If no
-    // such ancestor exists, it returns None.
-    fn find_proof_and_consensus_common_ancestor_ghostdag_data(
-        &self,
-        proof_ghostdag_stores: &[Arc<DbGhostdagStore>],
-        current_consensus_ghostdag_stores: &[Arc<DbGhostdagStore>],
-        proof_selected_tip: Hash,
-        level: BlockLevel,
-        proof_selected_tip_gd: CompactGhostdagData,
-    ) -> Option<(CompactGhostdagData, CompactGhostdagData)> {
-        let mut proof_current = proof_selected_tip;
-        let mut proof_current_gd = proof_selected_tip_gd;
-        loop {
-            match current_consensus_ghostdag_stores[level as usize].get_compact_data(proof_current).unwrap_option() {
-                Some(current_gd) => {
-                    break Some((proof_current_gd, current_gd));
-                }
-                None => {
-                    proof_current = proof_current_gd.selected_parent;
-                    if proof_current.is_origin() {
-                        break None;
-                    }
-                    proof_current_gd = proof_ghostdag_stores[level as usize].get_compact_data(proof_current).unwrap();
-                }
-            };
-        }
-    }
-
-    pub fn validate_pruning_point_proof(&self, proof: &PruningPointProof) -> PruningImportResult<()> {
-        if proof.len() != self.max_block_level as usize + 1 {
-            return Err(PruningImportError::ProofNotEnoughLevels(self.max_block_level as usize + 1));
-        }
-
-        // Initialize the stores for the proof
-        let mut proof_stores_and_processes = self.init_validate_pruning_point_proof_stores_and_processes(proof)?;
-        let proof_pp_header = proof[0].last().expect("checked if empty");
-        let proof_pp = proof_pp_header.hash;
-        let proof_pp_level = calc_block_level(proof_pp_header, self.max_block_level);
-        let proof_selected_tip_by_level =
-            self.populate_stores_for_validate_pruning_point_proof(proof, &mut proof_stores_and_processes, true)?;
-        let proof_ghostdag_stores = proof_stores_and_processes.ghostdag_stores;
-
-        // Get the proof for the current consensus and recreate the stores for it
-        // This is expected to be fast because if a proof exists, it will be cached.
-        // If no proof exists, this is empty
-        let mut current_consensus_proof = self.get_pruning_point_proof();
-        if current_consensus_proof.is_empty() {
-            // An empty proof can only happen if we're at genesis. We're going to create a proof for this case that contains the genesis header only
-            let genesis_header = self.headers_store.get_header(self.genesis_hash).unwrap();
-            current_consensus_proof = Arc::new((0..=self.max_block_level).map(|_| vec![genesis_header.clone()]).collect_vec());
-        }
-        let mut current_consensus_stores_and_processes =
-            self.init_validate_pruning_point_proof_stores_and_processes(&current_consensus_proof)?;
-        let _ = self.populate_stores_for_validate_pruning_point_proof(
-            &current_consensus_proof,
-            &mut current_consensus_stores_and_processes,
-            false,
-        )?;
-        let current_consensus_ghostdag_stores = current_consensus_stores_and_processes.ghostdag_stores;
-
-        let pruning_read = self.pruning_point_store.read();
-        let relations_read = self.relations_stores.read();
-        let current_pp = pruning_read.get().unwrap().pruning_point;
-        let current_pp_header = self.headers_store.get_header(current_pp).unwrap();
-
-        for (level_idx, selected_tip) in proof_selected_tip_by_level.iter().copied().enumerate() {
-            let level = level_idx as BlockLevel;
-            self.validate_proof_selected_tip(selected_tip, level, proof_pp_level, proof_pp, proof_pp_header)?;
-
-            let proof_selected_tip_gd = proof_ghostdag_stores[level_idx].get_compact_data(selected_tip).unwrap();
-
-            // Next check is to see if this proof is "better" than what's in the current consensus
-            // Step 1 - look at only levels that have a full proof (least 2m blocks in the proof)
-            if proof_selected_tip_gd.blue_score < 2 * self.pruning_proof_m {
-                continue;
-            }
-
-            // Step 2 - if we can find a common ancestor between the proof and current consensus
-            // we can determine if the proof is better. The proof is better if the blue work* difference between the
-            // old current consensus's tips and the common ancestor is less than the blue work difference between the
-            // proof's tip and the common ancestor.
-            // *Note: blue work is the same as blue score on levels higher than 0
-            if let Some((proof_common_ancestor_gd, common_ancestor_gd)) = self.find_proof_and_consensus_common_ancestor_ghostdag_data(
-                &proof_ghostdag_stores,
-                &current_consensus_ghostdag_stores,
-                selected_tip,
-                level,
-                proof_selected_tip_gd,
-            ) {
-                let selected_tip_blue_work_diff =
-                    SignedInteger::from(proof_selected_tip_gd.blue_work) - SignedInteger::from(proof_common_ancestor_gd.blue_work);
-                for parent in self.parents_manager.parents_at_level(&current_pp_header, level).iter().copied() {
-                    let parent_blue_work = current_consensus_ghostdag_stores[level_idx].get_blue_work(parent).unwrap();
-                    let parent_blue_work_diff =
-                        SignedInteger::from(parent_blue_work) - SignedInteger::from(common_ancestor_gd.blue_work);
-                    if parent_blue_work_diff >= selected_tip_blue_work_diff {
-                        return Err(PruningImportError::PruningProofInsufficientBlueWork);
-                    }
-                }
-
-                return Ok(());
-            }
-        }
-
-        if current_pp == self.genesis_hash {
-            // If the proof has better tips and the current pruning point is still
-            // genesis, we consider the proof state to be better.
-            return Ok(());
-        }
-
-        // If we got here it means there's no level with shared blocks
-        // between the proof and the current consensus. In this case we
-        // consider the proof to be better if it has at least one level
-        // with 2*self.pruning_proof_m blue blocks where consensus doesn't.
-        for level in (0..=self.max_block_level).rev() {
-            let level_idx = level as usize;
-
-            let proof_selected_tip = proof_selected_tip_by_level[level_idx];
-            let proof_selected_tip_gd = proof_ghostdag_stores[level_idx].get_compact_data(proof_selected_tip).unwrap();
-            if proof_selected_tip_gd.blue_score < 2 * self.pruning_proof_m {
-                continue;
-            }
-
-            match relations_read[level_idx].get_parents(current_pp).unwrap_option() {
-                Some(parents) => {
-                    if parents.iter().copied().any(|parent| {
-                        current_consensus_ghostdag_stores[level_idx].get_blue_score(parent).unwrap() < 2 * self.pruning_proof_m
-                    }) {
-                        return Ok(());
-                    }
-                }
-                None => {
-                    // If the current pruning point doesn't have a parent at this level, we consider the proof state to be better.
-                    return Ok(());
-                }
-            }
-        }
-
-        drop(pruning_read);
-        drop(relations_read);
-        drop(proof_stores_and_processes.db_lifetime);
-        drop(current_consensus_stores_and_processes.db_lifetime);
-
-        Err(PruningImportError::PruningProofNotEnoughHeaders)
-    }
-
     fn block_at_depth(
         &self,
         ghostdag_store: &impl GhostdagStoreReader,
diff --git a/consensus/src/processes/pruning_proof/validate.rs b/consensus/src/processes/pruning_proof/validate.rs
new file mode 100644
index 0000000000..63650cdc53
--- /dev/null
+++ b/consensus/src/processes/pruning_proof/validate.rs
@@ -0,0 +1,376 @@
+use std::{
+    ops::DerefMut,
+    sync::{atomic::Ordering, Arc},
+};
+
+use itertools::Itertools;
+use kaspa_consensus_core::{
+    blockhash::{BlockHashExtensions, BlockHashes, ORIGIN},
+    errors::pruning::{PruningImportError, PruningImportResult},
+    header::Header,
+    pruning::PruningPointProof,
+    BlockLevel,
+};
+use kaspa_core::info;
+use kaspa_database::prelude::{CachePolicy, ConnBuilder, StoreResultEmptyTuple, StoreResultExtensions};
+use kaspa_hashes::Hash;
+use kaspa_math::int::SignedInteger;
+use kaspa_pow::calc_block_level;
+use kaspa_utils::vec::VecExtensions;
+use parking_lot::lock_api::RwLock;
+use rocksdb::WriteBatch;
+
+use crate::{
+    model::{
+        services::reachability::MTReachabilityService,
+        stores::{
+            ghostdag::{CompactGhostdagData, DbGhostdagStore, GhostdagStore, GhostdagStoreReader},
+            headers::{DbHeadersStore, HeaderStore, HeaderStoreReader},
+            pruning::PruningStoreReader,
+            reachability::{DbReachabilityStore, ReachabilityStoreReader},
+            relations::{DbRelationsStore, RelationsStoreReader},
+        },
+    },
+    processes::{ghostdag::protocol::GhostdagManager, reachability::inquirer as reachability, relations::RelationsStoreExtensions},
+};
+
+use super::{PruningProofManager, TempProofContext};
+
+impl PruningProofManager {
+    pub fn validate_pruning_point_proof(&self, proof: &PruningPointProof) -> PruningImportResult<()> {
+        if proof.len() != self.max_block_level as usize + 1 {
+            return Err(PruningImportError::ProofNotEnoughLevels(self.max_block_level as usize + 1));
+        }
+
+        // Initialize the stores for the proof
+        let mut proof_stores_and_processes = self.init_validate_pruning_point_proof_stores_and_processes(proof)?;
+        let proof_pp_header = proof[0].last().expect("checked if empty");
+        let proof_pp = proof_pp_header.hash;
+        let proof_pp_level = calc_block_level(proof_pp_header, self.max_block_level);
+        let proof_selected_tip_by_level =
+            self.populate_stores_for_validate_pruning_point_proof(proof, &mut proof_stores_and_processes, true)?;
+        let proof_ghostdag_stores = proof_stores_and_processes.ghostdag_stores;
+
+        // Get the proof for the current consensus and recreate the stores for it
+        // This is expected to be fast because if a proof exists, it will be cached.
+        // If no proof exists, this is empty
+        let mut current_consensus_proof = self.get_pruning_point_proof();
+        if current_consensus_proof.is_empty() {
+            // An empty proof can only happen if we're at genesis. We're going to create a proof for this case that contains the genesis header only
+            let genesis_header = self.headers_store.get_header(self.genesis_hash).unwrap();
+            current_consensus_proof = Arc::new((0..=self.max_block_level).map(|_| vec![genesis_header.clone()]).collect_vec());
+        }
+        let mut current_consensus_stores_and_processes =
+            self.init_validate_pruning_point_proof_stores_and_processes(&current_consensus_proof)?;
+        let _ = self.populate_stores_for_validate_pruning_point_proof(
+            &current_consensus_proof,
+            &mut current_consensus_stores_and_processes,
+            false,
+        )?;
+        let current_consensus_ghostdag_stores = current_consensus_stores_and_processes.ghostdag_stores;
+
+        let pruning_read = self.pruning_point_store.read();
+        let relations_read = self.relations_stores.read();
+        let current_pp = pruning_read.get().unwrap().pruning_point;
+        let current_pp_header = self.headers_store.get_header(current_pp).unwrap();
+
+        for (level_idx, selected_tip) in proof_selected_tip_by_level.iter().copied().enumerate() {
+            let level = level_idx as BlockLevel;
+            self.validate_proof_selected_tip(selected_tip, level, proof_pp_level, proof_pp, proof_pp_header)?;
+
+            let proof_selected_tip_gd = proof_ghostdag_stores[level_idx].get_compact_data(selected_tip).unwrap();
+
+            // Next check is to see if this proof is "better" than what's in the current consensus
+            // Step 1 - look at only levels that have a full proof (least 2m blocks in the proof)
+            if proof_selected_tip_gd.blue_score < 2 * self.pruning_proof_m {
+                continue;
+            }
+
+            // Step 2 - if we can find a common ancestor between the proof and current consensus
+            // we can determine if the proof is better. The proof is better if the blue work* difference between the
+            // old current consensus's tips and the common ancestor is less than the blue work difference between the
+            // proof's tip and the common ancestor.
+            // *Note: blue work is the same as blue score on levels higher than 0
+            if let Some((proof_common_ancestor_gd, common_ancestor_gd)) = self.find_proof_and_consensus_common_ancestor_ghostdag_data(
+                &proof_ghostdag_stores,
+                &current_consensus_ghostdag_stores,
+                selected_tip,
+                level,
+                proof_selected_tip_gd,
+            ) {
+                let selected_tip_blue_work_diff =
+                    SignedInteger::from(proof_selected_tip_gd.blue_work) - SignedInteger::from(proof_common_ancestor_gd.blue_work);
+                for parent in self.parents_manager.parents_at_level(&current_pp_header, level).iter().copied() {
+                    let parent_blue_work = current_consensus_ghostdag_stores[level_idx].get_blue_work(parent).unwrap();
+                    let parent_blue_work_diff =
+                        SignedInteger::from(parent_blue_work) - SignedInteger::from(common_ancestor_gd.blue_work);
+                    if parent_blue_work_diff >= selected_tip_blue_work_diff {
+                        return Err(PruningImportError::PruningProofInsufficientBlueWork);
+                    }
+                }
+
+                return Ok(());
+            }
+        }
+
+        if current_pp == self.genesis_hash {
+            // If the proof has better tips and the current pruning point is still
+            // genesis, we consider the proof state to be better.
+            return Ok(());
+        }
+
+        // If we got here it means there's no level with shared blocks
+        // between the proof and the current consensus. In this case we
+        // consider the proof to be better if it has at least one level
+        // with 2*self.pruning_proof_m blue blocks where consensus doesn't.
+        for level in (0..=self.max_block_level).rev() {
+            let level_idx = level as usize;
+
+            let proof_selected_tip = proof_selected_tip_by_level[level_idx];
+            let proof_selected_tip_gd = proof_ghostdag_stores[level_idx].get_compact_data(proof_selected_tip).unwrap();
+            if proof_selected_tip_gd.blue_score < 2 * self.pruning_proof_m {
+                continue;
+            }
+
+            match relations_read[level_idx].get_parents(current_pp).unwrap_option() {
+                Some(parents) => {
+                    if parents.iter().copied().any(|parent| {
+                        current_consensus_ghostdag_stores[level_idx].get_blue_score(parent).unwrap() < 2 * self.pruning_proof_m
+                    }) {
+                        return Ok(());
+                    }
+                }
+                None => {
+                    // If the current pruning point doesn't have a parent at this level, we consider the proof state to be better.
+                    return Ok(());
+                }
+            }
+        }
+
+        drop(pruning_read);
+        drop(relations_read);
+        drop(proof_stores_and_processes.db_lifetime);
+        drop(current_consensus_stores_and_processes.db_lifetime);
+
+        Err(PruningImportError::PruningProofNotEnoughHeaders)
+    }
+
+    fn init_validate_pruning_point_proof_stores_and_processes(
+        &self,
+        proof: &PruningPointProof,
+    ) -> PruningImportResult<TempProofContext> {
+        if proof[0].is_empty() {
+            return Err(PruningImportError::PruningProofNotEnoughHeaders);
+        }
+
+        let headers_estimate = self.estimate_proof_unique_size(proof);
+
+        let (db_lifetime, db) = kaspa_database::create_temp_db!(ConnBuilder::default().with_files_limit(10));
+        let cache_policy = CachePolicy::Count(2 * self.pruning_proof_m as usize);
+        let headers_store =
+            Arc::new(DbHeadersStore::new(db.clone(), CachePolicy::Count(headers_estimate), CachePolicy::Count(headers_estimate)));
+        let ghostdag_stores = (0..=self.max_block_level)
+            .map(|level| Arc::new(DbGhostdagStore::new(db.clone(), level, cache_policy, cache_policy)))
+            .collect_vec();
+        let mut relations_stores =
+            (0..=self.max_block_level).map(|level| DbRelationsStore::new(db.clone(), level, cache_policy, cache_policy)).collect_vec();
+        let reachability_stores = (0..=self.max_block_level)
+            .map(|level| Arc::new(RwLock::new(DbReachabilityStore::with_block_level(db.clone(), cache_policy, cache_policy, level))))
+            .collect_vec();
+
+        let reachability_services = (0..=self.max_block_level)
+            .map(|level| MTReachabilityService::new(reachability_stores[level as usize].clone()))
+            .collect_vec();
+
+        let ghostdag_managers = ghostdag_stores
+            .iter()
+            .cloned()
+            .enumerate()
+            .map(|(level, ghostdag_store)| {
+                GhostdagManager::new(
+                    self.genesis_hash,
+                    self.ghostdag_k,
+                    ghostdag_store,
+                    relations_stores[level].clone(),
+                    headers_store.clone(),
+                    reachability_services[level].clone(),
+                    level != 0,
+                )
+            })
+            .collect_vec();
+
+        {
+            let mut batch = WriteBatch::default();
+            for level in 0..=self.max_block_level {
+                let level = level as usize;
+                reachability::init(reachability_stores[level].write().deref_mut()).unwrap();
+                relations_stores[level].insert_batch(&mut batch, ORIGIN, BlockHashes::new(vec![])).unwrap();
+                ghostdag_stores[level].insert(ORIGIN, ghostdag_managers[level].origin_ghostdag_data()).unwrap();
+            }
+
+            db.write(batch).unwrap();
+        }
+
+        Ok(TempProofContext { db_lifetime, headers_store, ghostdag_stores, relations_stores, reachability_stores, ghostdag_managers })
+    }
+
+    fn populate_stores_for_validate_pruning_point_proof(
+        &self,
+        proof: &PruningPointProof,
+        ctx: &mut TempProofContext,
+        log_validating: bool,
+    ) -> PruningImportResult<Vec<Hash>> {
+        let headers_store = &ctx.headers_store;
+        let ghostdag_stores = &ctx.ghostdag_stores;
+        let mut relations_stores = ctx.relations_stores.clone();
+        let reachability_stores = &ctx.reachability_stores;
+        let ghostdag_managers = &ctx.ghostdag_managers;
+
+        let proof_pp_header = proof[0].last().expect("checked if empty");
+        let proof_pp = proof_pp_header.hash;
+
+        let mut selected_tip_by_level = vec![None; self.max_block_level as usize + 1];
+        for level in (0..=self.max_block_level).rev() {
+            // Before processing this level, check if the process is exiting so we can end early
+            if self.is_consensus_exiting.load(Ordering::Relaxed) {
+                return Err(PruningImportError::PruningValidationInterrupted);
+            }
+
+            if log_validating {
+                info!("Validating level {level} from the pruning point proof ({} headers)", proof[level as usize].len());
+            }
+            let level_idx = level as usize;
+            let mut selected_tip = None;
+            for (i, header) in proof[level as usize].iter().enumerate() {
+                let header_level = calc_block_level(header, self.max_block_level);
+                if header_level < level {
+                    return Err(PruningImportError::PruningProofWrongBlockLevel(header.hash, header_level, level));
+                }
+
+                headers_store.insert(header.hash, header.clone(), header_level).unwrap_or_exists();
+
+                let parents = self
+                    .parents_manager
+                    .parents_at_level(header, level)
+                    .iter()
+                    .copied()
+                    .filter(|parent| ghostdag_stores[level_idx].has(*parent).unwrap())
+                    .collect_vec();
+
+                // Only the first block at each level is allowed to have no known parents
+                if parents.is_empty() && i != 0 {
+                    return Err(PruningImportError::PruningProofHeaderWithNoKnownParents(header.hash, level));
+                }
+
+                let parents: BlockHashes = parents.push_if_empty(ORIGIN).into();
+
+                if relations_stores[level_idx].has(header.hash).unwrap() {
+                    return Err(PruningImportError::PruningProofDuplicateHeaderAtLevel(header.hash, level));
+                }
+
+                relations_stores[level_idx].insert(header.hash, parents.clone()).unwrap();
+                let ghostdag_data = Arc::new(ghostdag_managers[level_idx].ghostdag(&parents));
+                ghostdag_stores[level_idx].insert(header.hash, ghostdag_data.clone()).unwrap();
+                selected_tip = Some(match selected_tip {
+                    Some(tip) => ghostdag_managers[level_idx].find_selected_parent([tip, header.hash]),
+                    None => header.hash,
+                });
+
+                let mut reachability_mergeset = {
+                    let reachability_read = reachability_stores[level_idx].read();
+                    ghostdag_data
+                        .unordered_mergeset_without_selected_parent()
+                        .filter(|hash| reachability_read.has(*hash).unwrap())
+                        .collect_vec() // We collect to vector so reachability_read can be released and let `reachability::add_block` use a write lock.
+                        .into_iter()
+                };
+                reachability::add_block(
+                    reachability_stores[level_idx].write().deref_mut(),
+                    header.hash,
+                    ghostdag_data.selected_parent,
+                    &mut reachability_mergeset,
+                )
+                .unwrap();
+
+                if selected_tip.unwrap() == header.hash {
+                    reachability::hint_virtual_selected_parent(reachability_stores[level_idx].write().deref_mut(), header.hash)
+                        .unwrap();
+                }
+            }
+
+            if level < self.max_block_level {
+                let block_at_depth_m_at_next_level = self
+                    .block_at_depth(
+                        &*ghostdag_stores[level_idx + 1],
+                        selected_tip_by_level[level_idx + 1].unwrap(),
+                        self.pruning_proof_m,
+                    )
+                    .unwrap();
+                if !relations_stores[level_idx].has(block_at_depth_m_at_next_level).unwrap() {
+                    return Err(PruningImportError::PruningProofMissingBlockAtDepthMFromNextLevel(level, level + 1));
+                }
+            }
+
+            if selected_tip.unwrap() != proof_pp
+                && !self.parents_manager.parents_at_level(proof_pp_header, level).contains(&selected_tip.unwrap())
+            {
+                return Err(PruningImportError::PruningProofMissesBlocksBelowPruningPoint(selected_tip.unwrap(), level));
+            }
+
+            selected_tip_by_level[level_idx] = selected_tip;
+        }
+
+        Ok(selected_tip_by_level.into_iter().map(|selected_tip| selected_tip.unwrap()).collect())
+    }
+
+    fn validate_proof_selected_tip(
+        &self,
+        proof_selected_tip: Hash,
+        level: BlockLevel,
+        proof_pp_level: BlockLevel,
+        proof_pp: Hash,
+        proof_pp_header: &Header,
+    ) -> PruningImportResult<()> {
+        // A proof selected tip of some level has to be the proof suggested prunint point itself if its level
+        // is lower or equal to the pruning point level, or a parent of the pruning point on the relevant level
+        // otherwise.
+        if level <= proof_pp_level {
+            if proof_selected_tip != proof_pp {
+                return Err(PruningImportError::PruningProofSelectedTipIsNotThePruningPoint(proof_selected_tip, level));
+            }
+        } else if !self.parents_manager.parents_at_level(proof_pp_header, level).contains(&proof_selected_tip) {
+            return Err(PruningImportError::PruningProofSelectedTipNotParentOfPruningPoint(proof_selected_tip, level));
+        }
+
+        Ok(())
+    }
+
+    // find_proof_and_consensus_common_chain_ancestor_ghostdag_data returns an option of a tuple
+    // that contains the ghostdag data of the proof and current consensus common ancestor. If no
+    // such ancestor exists, it returns None.
+    fn find_proof_and_consensus_common_ancestor_ghostdag_data(
+        &self,
+        proof_ghostdag_stores: &[Arc<DbGhostdagStore>],
+        current_consensus_ghostdag_stores: &[Arc<DbGhostdagStore>],
+        proof_selected_tip: Hash,
+        level: BlockLevel,
+        proof_selected_tip_gd: CompactGhostdagData,
+    ) -> Option<(CompactGhostdagData, CompactGhostdagData)> {
+        let mut proof_current = proof_selected_tip;
+        let mut proof_current_gd = proof_selected_tip_gd;
+        loop {
+            match current_consensus_ghostdag_stores[level as usize].get_compact_data(proof_current).unwrap_option() {
+                Some(current_gd) => {
+                    break Some((proof_current_gd, current_gd));
+                }
+                None => {
+                    proof_current = proof_current_gd.selected_parent;
+                    if proof_current.is_origin() {
+                        break None;
+                    }
+                    proof_current_gd = proof_ghostdag_stores[level as usize].get_compact_data(proof_current).unwrap();
+                }
+            };
+        }
+    }
+}

From 0e4111622adba50a8161462eeb9013e032827125 Mon Sep 17 00:00:00 2001
From: coderofstuff <114628839+coderofstuff@users.noreply.github.com>
Date: Tue, 5 Nov 2024 18:45:40 -0700
Subject: [PATCH 5/5] Add comments for clarity

---
 consensus/src/processes/pruning_proof/apply.rs | 1 +
 consensus/src/processes/pruning_proof/mod.rs   | 2 ++
 2 files changed, 3 insertions(+)

diff --git a/consensus/src/processes/pruning_proof/apply.rs b/consensus/src/processes/pruning_proof/apply.rs
index f3d56d797e..8463d64647 100644
--- a/consensus/src/processes/pruning_proof/apply.rs
+++ b/consensus/src/processes/pruning_proof/apply.rs
@@ -148,6 +148,7 @@ impl PruningProofManager {
         let mut up_heap = BinaryHeap::with_capacity(capacity_estimate);
         for header in proof.iter().flatten().cloned() {
             if let Vacant(e) = dag.entry(header.hash) {
+                // TODO: Check if pow passes
                 let block_level = calc_block_level(&header, self.max_block_level);
                 self.headers_store.insert(header.hash, header.clone(), block_level).unwrap();
 
diff --git a/consensus/src/processes/pruning_proof/mod.rs b/consensus/src/processes/pruning_proof/mod.rs
index 458b1ba7eb..2b3ba5f9d8 100644
--- a/consensus/src/processes/pruning_proof/mod.rs
+++ b/consensus/src/processes/pruning_proof/mod.rs
@@ -206,12 +206,14 @@ impl PruningProofManager {
         drop(pruning_point_write);
     }
 
+    // Used in apply and validate
     fn estimate_proof_unique_size(&self, proof: &PruningPointProof) -> usize {
         let approx_history_size = proof[0][0].daa_score;
         let approx_unique_full_levels = f64::log2(approx_history_size as f64 / self.pruning_proof_m as f64).max(0f64) as usize;
         proof.iter().map(|l| l.len()).sum::<usize>().min((approx_unique_full_levels + 1) * self.pruning_proof_m as usize)
     }
 
+    // Used in build and validate
     fn block_at_depth(
         &self,
         ghostdag_store: &impl GhostdagStoreReader,