diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs
index a56e968cba..bdd4c56507 100644
--- a/crates/networking/p2p/peer_handler.rs
+++ b/crates/networking/p2p/peer_handler.rs
@@ -1,5 +1,5 @@
 use std::{
-    collections::{HashSet, VecDeque},
+    collections::{HashMap, HashSet, VecDeque},
     io::ErrorKind,
     path::{Path, PathBuf},
     sync::atomic::Ordering,
@@ -12,6 +12,7 @@ use ethrex_common::{
     types::{AccountState, BlockBody, BlockHeader, Receipt, validate_block_body},
 };
 use ethrex_rlp::encode::RLPEncode;
+use ethrex_storage::Store;
 use ethrex_trie::Nibbles;
 use ethrex_trie::{Node, verify_range};
 use rand::seq::SliceRandom;
@@ -1279,16 +1280,18 @@ impl PeerHandler {
         account_storages_snapshots_dir: &Path,
         mut chunk_index: u64,
         pivot_header: &mut BlockHeader,
+        store: Store,
     ) -> Result<u64, PeerHandlerError> {
         *METRICS.current_step.lock().await = "Requesting Storage Ranges".to_string();
         debug!("Starting request_storage_ranges function");
         // 1) split the range in chunks of same length
-        let chunk_size = 300;
+        let chunk_size = 1600;
         let chunk_count = (account_storage_roots.accounts_with_storage_root.len() / chunk_size) + 1;

         // list of tasks to be executed
         // Types are (start_index, end_index, starting_hash)
         // NOTE: end_index is NOT inclusive
+        let mut tasks_queue_not_started = VecDeque::<StorageTask>::new();
         for i in 0..chunk_count {
             let chunk_start = chunk_size * i;
@@ -1318,7 +1321,7 @@ impl PeerHandler {
         let mut completed_tasks = 0;

         // TODO: in a refactor, delete this replace with a structure that can handle removes
-        let mut accounts_done: Vec<H256> = Vec::new();
+        let mut accounts_done: HashMap<H256, Vec<(H256, H256)>> = HashMap::new();
         let current_account_hashes = account_storage_roots
             .accounts_with_storage_root
             .iter()
@@ -1384,8 +1387,10 @@ impl PeerHandler {
                 self.peer_table.free_peer(peer_id).await;

-                for account in &current_account_hashes[start_index..remaining_start] {
-                    accounts_done.push(*account);
+                for account in current_account_hashes[start_index..remaining_start].iter() {
+                    if !accounts_done.contains_key(account) {
+                        accounts_done.insert(*account, vec![]);
+                    }
                 }

                 if remaining_start < remaining_end {
@@ -1411,10 +1416,35 @@ impl PeerHandler {
                         };
                         tasks_queue_not_started.push_back(task);
                         task_count += 1;
-                        accounts_done.push(current_account_hashes[remaining_start]);
+                        let (_, old_intervals) = account_storage_roots
+                            .accounts_with_storage_root
+                            .get_mut(&current_account_hashes[remaining_start]).ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
+                        for (old_start, end) in old_intervals {
+                            if end == &hash_end {
+                                *old_start = hash_start;
+                            }
+                        }
                         account_storage_roots
                             .healed_accounts
                             .insert(current_account_hashes[start_index]);
+                    } else {
+                        let (_, old_intervals) = account_storage_roots
+                            .accounts_with_storage_root
+                            .get_mut(&current_account_hashes[remaining_start])
+                            .ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
+                        old_intervals.remove(
+                            old_intervals
+                                .iter()
+                                .position(|(_old_start, end)| end == &hash_end)
+                                .ok_or(PeerHandlerError::UnrecoverableError(
+                                    "Could not find an old interval that we were tracking"
+                                        .to_owned(),
+                                ))?,
+                        );
+                        if old_intervals.is_empty() {
+                            accounts_done
+                                .insert(current_account_hashes[remaining_start], vec![]);
+                        }
                     }
                 } else {
                     if remaining_start + 1 < remaining_end {
@@ -1445,27 +1475,95 @@ impl PeerHandler {
                     let chunk_count = (missing_storage_range / chunk_size).as_usize().max(1);

-                    for i in 0..chunk_count {
-                        let start_hash_u256 = start_hash_u256 + chunk_size * i;
-                        let start_hash = H256::from_uint(&start_hash_u256);
-                        let end_hash = if i == chunk_count - 1 {
-                            H256::repeat_byte(0xff)
+                    let maybe_old_intervals = account_storage_roots
+                        .accounts_with_storage_root
+                        .get(&current_account_hashes[remaining_start]);
+
+                    if let Some((_, old_intervals)) = maybe_old_intervals {
+                        if !old_intervals.is_empty() {
+                            for (start_hash, end_hash) in old_intervals {
+                                let task = StorageTask {
+                                    start_index: remaining_start,
+                                    end_index: remaining_start + 1,
+                                    start_hash: *start_hash,
+                                    end_hash: Some(*end_hash),
+                                };
+
+                                tasks_queue_not_started.push_back(task);
+                                task_count += 1;
+                            }
                         } else {
-                            let end_hash_u256 =
-                                start_hash_u256.checked_add(chunk_size).unwrap_or(U256::MAX);
-                            H256::from_uint(&end_hash_u256)
-                        };
-
-                        let task = StorageTask {
-                            start_index: remaining_start,
-                            end_index: remaining_start + 1,
-                            start_hash,
-                            end_hash: Some(end_hash),
-                        };
-                        tasks_queue_not_started.push_back(task);
-                        task_count += 1;
+                            // TODO: DRY
+                            account_storage_roots.accounts_with_storage_root.insert(
+                                current_account_hashes[remaining_start],
+                                (None, vec![]),
+                            );
+                            let (_, intervals) = account_storage_roots
+                                .accounts_with_storage_root
+                                .get_mut(&current_account_hashes[remaining_start])
+                                .ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
+
+                            for i in 0..chunk_count {
+                                let start_hash_u256 = start_hash_u256 + chunk_size * i;
+                                let start_hash = H256::from_uint(&start_hash_u256);
+                                let end_hash = if i == chunk_count - 1 {
+                                    H256::repeat_byte(0xff)
+                                } else {
+                                    let end_hash_u256 = start_hash_u256
+                                        .checked_add(chunk_size)
+                                        .unwrap_or(U256::MAX);
+                                    H256::from_uint(&end_hash_u256)
+                                };
+
+                                let task = StorageTask {
+                                    start_index: remaining_start,
+                                    end_index: remaining_start + 1,
+                                    start_hash,
+                                    end_hash: Some(end_hash),
+                                };
+
+                                intervals.push((start_hash, end_hash));
+
+                                tasks_queue_not_started.push_back(task);
+                                task_count += 1;
+                            }
+                            debug!("Split big storage account into {chunk_count} chunks.");
+                        }
+                    } else {
+                        account_storage_roots
+                            .accounts_with_storage_root
+                            .insert(current_account_hashes[remaining_start], (None, vec![]));
+                        let (_, intervals) = account_storage_roots
+                            .accounts_with_storage_root
+                            .get_mut(&current_account_hashes[remaining_start])
+                            .ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
+
+                        for i in 0..chunk_count {
+                            let start_hash_u256 = start_hash_u256 + chunk_size * i;
+                            let start_hash = H256::from_uint(&start_hash_u256);
+                            let end_hash = if i == chunk_count - 1 {
+                                H256::repeat_byte(0xff)
+                            } else {
+                                let end_hash_u256 = start_hash_u256
+                                    .checked_add(chunk_size)
+                                    .unwrap_or(U256::MAX);
+                                H256::from_uint(&end_hash_u256)
+                            };
+
+                            let task = StorageTask {
+                                start_index: remaining_start,
+                                end_index: remaining_start + 1,
+                                start_hash,
+                                end_hash: Some(end_hash),
+                            };
+
+                            intervals.push((start_hash, end_hash));
+
+                            tasks_queue_not_started.push_back(task);
+                            task_count += 1;
+                        }
+                        debug!("Split big storage account into {chunk_count} chunks.");
                     }
-                    debug!("Split big storage account into {chunk_count} chunks.");
                 }
             }
@@ -1536,7 +1634,17 @@ impl PeerHandler {
                 .iter()
                 .skip(task.start_index)
                 .take(task.end_index - task.start_index)
-                .map(|(hash, root)| (*hash, *root))
+                .map(|(hash, (root, _))| match root {
+                    Some(root) => (*hash, *root),
+                    None => (
+                        *hash,
+                        store
+                            .get_account_state_by_acc_hash(pivot_header.hash(), *hash)
account in state trie") + .expect("Could not find account that should have been downloaded or healed") + .storage_root, + ), + }) .unzip(); if task_count - completed_tasks < 30 { @@ -1594,10 +1702,12 @@ impl PeerHandler { .collect::, DumpError>>() .map_err(PeerHandlerError::DumpError)?; - for account_done in accounts_done { - account_storage_roots - .accounts_with_storage_root - .remove(&account_done); + for (account_done, intervals) in accounts_done { + if intervals.is_empty() { + account_storage_roots + .accounts_with_storage_root + .remove(&account_done); + } } // Dropping the task sender so that the recv returns None @@ -1990,6 +2100,8 @@ pub enum PeerHandlerError { NoResponseFromPeer, #[error("Dumping snapshots to disk failed {0:?}")] DumpError(DumpError), + #[error("Encountered an unexpected error. This is a bug {0}")] + UnrecoverableError(String), } #[derive(Debug, Clone, std::hash::Hash)] diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 1e03858383..100f45fed8 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -905,7 +905,7 @@ impl Syncer { .zip(account_states.iter()) .filter_map(|(hash, state)| { (state.storage_root != *EMPTY_TRIE_HASH) - .then_some((*hash, state.storage_root)) + .then_some((*hash, (Some(state.storage_root), vec![]))) }), ); @@ -999,6 +999,7 @@ impl Syncer { account_storages_snapshots_dir.as_ref(), chunk_index, &mut pivot_header, + store.clone(), ) .await .map_err(SyncError::PeerHandler)?; @@ -1338,11 +1339,12 @@ pub fn calculate_staleness_timestamp(timestamp: u64) -> u64 { timestamp + (SNAP_LIMIT as u64 * 12) } #[derive(Debug, Default)] +#[allow(clippy::type_complexity)] /// We store for optimization the accounts that need to heal storage pub struct AccountStorageRoots { /// The accounts that have not been healed are guaranteed to have the original storage root /// we can read this storage root - pub accounts_with_storage_root: BTreeMap, + pub accounts_with_storage_root: BTreeMap, Vec<(H256, H256)>)>, /// If an account has been healed, it may return to a previous state, so we just store the account /// in a hashset pub healed_accounts: HashSet, diff --git a/crates/networking/p2p/sync/state_healing.rs b/crates/networking/p2p/sync/state_healing.rs index 9ea8c6caa1..08214734b9 100644 --- a/crates/networking/p2p/sync/state_healing.rs +++ b/crates/networking/p2p/sync/state_healing.rs @@ -181,9 +181,12 @@ async fn heal_state_trie( } storage_accounts.healed_accounts.insert(account_hash); - storage_accounts + let old_value = storage_accounts .accounts_with_storage_root - .remove(&account_hash); + .get_mut(&account_hash); + if let Some((old_root, _)) = old_value { + *old_root = None; + } } } leafs_healed += nodes diff --git a/crates/networking/p2p/sync/storage_healing.rs b/crates/networking/p2p/sync/storage_healing.rs index 13474c4649..a331951d77 100644 --- a/crates/networking/p2p/sync/storage_healing.rs +++ b/crates/networking/p2p/sync/storage_healing.rs @@ -543,26 +543,6 @@ fn get_initial_downloads( }) .collect::>(), ); - initial_requests.extend( - account_paths - .accounts_with_storage_root - .par_iter() - .filter_map(|(acc_path, storage_root)| { - if store - .contains_storage_node(*acc_path, *storage_root) - .expect("We should be able to open the store") - { - return None; - } - Some(NodeRequest { - acc_path: Nibbles::from_bytes(&acc_path.0), - storage_path: Nibbles::default(), // We need to be careful, the root parent is a special case - parent: Nibbles::default(), - hash: *storage_root, - }) - }) - 
-            .collect::<Vec<_>>(),
-    );

     initial_requests
 }
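
// ---------------------------------------------------------------------------
// Reviewer sketch (not part of the patch): a minimal, self-contained model of
// the bookkeeping this diff introduces. Each storage account now maps to
// `(Option<H256>, Vec<(H256, H256)>)`: the storage root (cleared to `None`
// once state healing touches the account) plus the list of hash intervals
// still pending download. An account only counts as done once that interval
// list drains, which is why `accounts_done` became a map and why completed
// chunks remove their interval instead of removing the whole account.
// All names below (`Roots`, `mark_chunk_done`, the `[u8; 32]` stand-in for
// `H256`) are illustrative assumptions, not identifiers from ethrex.

use std::collections::BTreeMap;

type Hash = [u8; 32];
type Roots = BTreeMap<Hash, (Option<Hash>, Vec<(Hash, Hash)>)>;

/// Drop the pending interval whose end matches the completed chunk.
/// Returns true when the account has no intervals left to download.
fn mark_chunk_done(roots: &mut Roots, account: Hash, chunk_end: Hash) -> bool {
    let Some((_, intervals)) = roots.get_mut(&account) else {
        return false; // unknown account: nothing to update
    };
    if let Some(pos) = intervals.iter().position(|(_, end)| *end == chunk_end) {
        intervals.remove(pos);
    }
    intervals.is_empty()
}

fn main() {
    let account = [0xaa; 32];
    let mut roots: Roots = BTreeMap::new();
    // The account's storage range was split into two pending intervals,
    // mirroring the "Split big storage account into N chunks" path.
    roots.insert(
        account,
        (
            Some([0x11; 32]), // original storage root, still trusted
            vec![([0x00; 32], [0x7f; 32]), ([0x7f; 32], [0xff; 32])],
        ),
    );
    // First chunk finishes: one interval remains, so the account is not done.
    assert!(!mark_chunk_done(&mut roots, account, [0x7f; 32]));
    // Second chunk finishes: the interval list drains and the account is done.
    assert!(mark_chunk_done(&mut roots, account, [0xff; 32]));
}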