172 changes: 142 additions & 30 deletions crates/networking/p2p/peer_handler.rs
@@ -1,5 +1,5 @@
use std::{
collections::{HashSet, VecDeque},
collections::{HashMap, HashSet, VecDeque},
io::ErrorKind,
path::{Path, PathBuf},
sync::atomic::Ordering,
@@ -12,6 +12,7 @@ use ethrex_common::{
types::{AccountState, BlockBody, BlockHeader, Receipt, validate_block_body},
};
use ethrex_rlp::encode::RLPEncode;
use ethrex_storage::Store;
use ethrex_trie::Nibbles;
use ethrex_trie::{Node, verify_range};
use rand::seq::SliceRandom;
@@ -1279,16 +1280,18 @@ impl PeerHandler {
account_storages_snapshots_dir: &Path,
mut chunk_index: u64,
pivot_header: &mut BlockHeader,
store: Store,
) -> Result<u64, PeerHandlerError> {
*METRICS.current_step.lock().await = "Requesting Storage Ranges".to_string();
debug!("Starting request_storage_ranges function");
// 1) split the range in chunks of same length
let chunk_size = 300;
let chunk_size = 1600;
let chunk_count = (account_storage_roots.accounts_with_storage_root.len() / chunk_size) + 1;

// list of tasks to be executed
// Types are (start_index, end_index, starting_hash)
// NOTE: end_index is NOT inclusive

let mut tasks_queue_not_started = VecDeque::<StorageTask>::new();
for i in 0..chunk_count {
let chunk_start = chunk_size * i;
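Aside, for readers of this hunk: chunk_count is (len / chunk_size) + 1, so the final chunk is partial (possibly empty), and end_index is exclusive as the NOTE above says. A minimal stand-alone sketch of that partitioning, with illustrative names (not code from this PR):

fn partition(len: usize, chunk_size: usize) -> Vec<(usize, usize)> {
    let chunk_count = (len / chunk_size) + 1;
    (0..chunk_count)
        .map(|i| {
            let start = chunk_size * i;
            let end = (chunk_size * (i + 1)).min(len); // end is NOT inclusive
            (start, end)
        })
        .filter(|(start, end)| start < end) // drop an empty trailing chunk
        .collect()
}

For example, partition(4000, 1600) yields [(0, 1600), (1600, 3200), (3200, 4000)].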
@@ -1318,7 +1321,7 @@ impl PeerHandler {
let mut completed_tasks = 0;

// TODO: in a refactor, delete this and replace it with a structure that can handle removals
let mut accounts_done: Vec<H256> = Vec::new();
let mut accounts_done: HashMap<H256, Vec<(H256, H256)>> = HashMap::new();
let current_account_hashes = account_storage_roots
.accounts_with_storage_root
.iter()
@@ -1384,8 +1387,10 @@ impl PeerHandler {

self.peer_table.free_peer(peer_id).await;

for account in &current_account_hashes[start_index..remaining_start] {
accounts_done.push(*account);
for account in current_account_hashes[start_index..remaining_start].iter() {
if !accounts_done.contains_key(account) {
accounts_done.insert(*account, vec![]);
}
}

if remaining_start < remaining_end {
Expand All @@ -1411,10 +1416,35 @@ impl PeerHandler {
};
tasks_queue_not_started.push_back(task);
task_count += 1;
accounts_done.push(current_account_hashes[remaining_start]);
let (_, old_intervals) = account_storage_roots
.accounts_with_storage_root
.get_mut(&current_account_hashes[remaining_start]).ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
for (old_start, end) in old_intervals {
if end == &hash_end {
*old_start = hash_start;
}
}
account_storage_roots
.healed_accounts
.insert(current_account_hashes[start_index]);
} else {
let (_, old_intervals) = account_storage_roots
.accounts_with_storage_root
.get_mut(&current_account_hashes[remaining_start])
.ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
old_intervals.remove(
old_intervals
.iter()
.position(|(_old_start, end)| end == &hash_end)
.ok_or(PeerHandlerError::UnrecoverableError(
"Could not find an old interval that we were tracking"
.to_owned(),
))?,
);
if old_intervals.is_empty() {
accounts_done
.insert(current_account_hashes[remaining_start], vec![]);
}
}
} else {
if remaining_start + 1 < remaining_end {
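A stand-alone sketch of the interval bookkeeping this hunk implements (illustrative helper names, using ethereum_types::H256 for concreteness): a partially served task advances the start of the tracked interval whose end matches, while a fully served task removes that interval, and the account counts as done once no intervals remain:

use ethereum_types::H256;

fn advance_interval(intervals: &mut Vec<(H256, H256)>, new_start: H256, end: H256) {
    for (old_start, old_end) in intervals.iter_mut() {
        if *old_end == end {
            *old_start = new_start; // [new_start, end] is still pending
        }
    }
}

fn complete_interval(intervals: &mut Vec<(H256, H256)>, end: H256) -> bool {
    if let Some(pos) = intervals.iter().position(|(_, e)| *e == end) {
        intervals.remove(pos);
    }
    intervals.is_empty() // true => every tracked range for this account is done
}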
@@ -1445,27 +1475,95 @@ impl PeerHandler {

let chunk_count = (missing_storage_range / chunk_size).as_usize().max(1);

for i in 0..chunk_count {
let start_hash_u256 = start_hash_u256 + chunk_size * i;
let start_hash = H256::from_uint(&start_hash_u256);
let end_hash = if i == chunk_count - 1 {
H256::repeat_byte(0xff)
} else {
let end_hash_u256 =
start_hash_u256.checked_add(chunk_size).unwrap_or(U256::MAX);
H256::from_uint(&end_hash_u256)
};

let task = StorageTask {
start_index: remaining_start,
end_index: remaining_start + 1,
start_hash,
end_hash: Some(end_hash),
};
tasks_queue_not_started.push_back(task);
task_count += 1;
}
debug!("Split big storage account into {chunk_count} chunks.");
let maybe_old_intervals = account_storage_roots
.accounts_with_storage_root
.get(&current_account_hashes[remaining_start]);

if let Some((_, old_intervals)) = maybe_old_intervals {
if !old_intervals.is_empty() {
for (start_hash, end_hash) in old_intervals {
let task = StorageTask {
start_index: remaining_start,
end_index: remaining_start + 1,
start_hash: *start_hash,
end_hash: Some(*end_hash),
};

tasks_queue_not_started.push_back(task);
task_count += 1;
}
} else {
// TODO: DRY
[Collaborator review comment on the line above: "Please dry this"]
account_storage_roots.accounts_with_storage_root.insert(
current_account_hashes[remaining_start],
(None, vec![]),
);
let (_, intervals) = account_storage_roots
.accounts_with_storage_root
.get_mut(&current_account_hashes[remaining_start])
.ok_or(PeerHandlerError::UnrecoverableError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;

for i in 0..chunk_count {
let start_hash_u256 = start_hash_u256 + chunk_size * i;
let start_hash = H256::from_uint(&start_hash_u256);
let end_hash = if i == chunk_count - 1 {
H256::repeat_byte(0xff)
} else {
let end_hash_u256 = start_hash_u256
.checked_add(chunk_size)
.unwrap_or(U256::MAX);
H256::from_uint(&end_hash_u256)
};

let task = StorageTask {
start_index: remaining_start,
end_index: remaining_start + 1,
start_hash,
end_hash: Some(end_hash),
};

intervals.push((start_hash, end_hash));

tasks_queue_not_started.push_back(task);
task_count += 1;
}
debug!("Split big storage account into {chunk_count} chunks.");
}
} else {
account_storage_roots
.accounts_with_storage_root
.insert(current_account_hashes[remaining_start], (None, vec![]));
let (_, intervals) = account_storage_roots
.accounts_with_storage_root
.get_mut(&current_account_hashes[remaining_start])
.ok_or(PeerHandlerError::UnrecoverableError("Trie to get the old download intervals for an account but did not find them".to_owned()))?;

for i in 0..chunk_count {
let start_hash_u256 = start_hash_u256 + chunk_size * i;
let start_hash = H256::from_uint(&start_hash_u256);
let end_hash = if i == chunk_count - 1 {
H256::repeat_byte(0xff)
} else {
let end_hash_u256 = start_hash_u256
.checked_add(chunk_size)
.unwrap_or(U256::MAX);
H256::from_uint(&end_hash_u256)
};

let task = StorageTask {
start_index: remaining_start,
end_index: remaining_start + 1,
start_hash,
end_hash: Some(end_hash),
};

intervals.push((start_hash, end_hash));

tasks_queue_not_started.push_back(task);
task_count += 1;
}
debug!("Split big storage account into {chunk_count} chunks.");
}
debug!("Split big storage account into {chunk_count} chunks.");
}
}

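Both branches above split one big account's remaining hash space [start_hash, 0xff..ff] into chunk_count sub-ranges and record them as pending intervals. A condensed sketch of just that splitting, assuming ethereum_types' H256/U256 and its BigEndianHash trait for from_uint, matching the calls the file itself makes:

use ethereum_types::{BigEndianHash, H256, U256};

fn split_hash_range(start: U256, chunk_size: U256, chunk_count: usize) -> Vec<(H256, H256)> {
    (0..chunk_count)
        .map(|i| {
            let chunk_start = start + chunk_size * U256::from(i); // sketch: ignores overflow
            let chunk_end = if i == chunk_count - 1 {
                H256::repeat_byte(0xff) // last chunk runs to the end of the hash space
            } else {
                H256::from_uint(&chunk_start.checked_add(chunk_size).unwrap_or(U256::MAX))
            };
            (H256::from_uint(&chunk_start), chunk_end)
        })
        .collect()
}

The same loop appears twice above (hence the "Please dry this" review comment); the two call sites differ only in how the intervals vector is obtained.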
@@ -1536,7 +1634,17 @@ impl PeerHandler {
.iter()
.skip(task.start_index)
.take(task.end_index - task.start_index)
.map(|(hash, root)| (*hash, *root))
.map(|(hash, (root, _))| match root {
Some(root) => (*hash, *root),
None => (
*hash,
store
.get_account_state_by_acc_hash(pivot_header.hash(), *hash)
.expect("Failed to get account in state trie")
.expect("Could not find account that should have been downloaded or healed")
.storage_root,
),
})
.unzip();

if task_count - completed_tasks < 30 {
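The map above resolves each account's storage root one of two ways: the cached root when the account was never healed, or a fresh read from the state trie at the pivot block when the cached entry is None. A sketch of that rule in isolation, reusing the same Store call the diff makes (paths assumed from the crate's usual re-exports; error handling simplified to expect, as in the original):

use ethrex_common::H256;
use ethrex_storage::Store;

fn resolve_storage_root(
    store: &Store,
    pivot_hash: H256,
    account_hash: H256,
    cached_root: Option<H256>,
) -> H256 {
    match cached_root {
        // Never healed: the root recorded when the account was downloaded is still valid.
        Some(root) => root,
        // Healed: the cached root may be stale, so re-read it from the state trie.
        None => store
            .get_account_state_by_acc_hash(pivot_hash, account_hash)
            .expect("Failed to get account in state trie")
            .expect("Could not find account that should have been downloaded or healed")
            .storage_root,
    }
}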
@@ -1594,10 +1702,12 @@ impl PeerHandler {
.collect::<Result<Vec<()>, DumpError>>()
.map_err(PeerHandlerError::DumpError)?;

for account_done in accounts_done {
account_storage_roots
.accounts_with_storage_root
.remove(&account_done);
for (account_done, intervals) in accounts_done {
if intervals.is_empty() {
account_storage_roots
.accounts_with_storage_root
.remove(&account_done);
}
}

// Dropping the task sender so that the recv returns None
@@ -1990,6 +2100,8 @@ pub enum PeerHandlerError {
NoResponseFromPeer,
#[error("Dumping snapshots to disk failed {0:?}")]
DumpError(DumpError),
#[error("Encountered an unexpected error. This is a bug {0}")]
UnrecoverableError(String),
}

#[derive(Debug, Clone, std::hash::Hash)]
6 changes: 4 additions & 2 deletions crates/networking/p2p/sync.rs
@@ -905,7 +905,7 @@ impl Syncer {
.zip(account_states.iter())
.filter_map(|(hash, state)| {
(state.storage_root != *EMPTY_TRIE_HASH)
.then_some((*hash, state.storage_root))
.then_some((*hash, (Some(state.storage_root), vec![])))
}),
);

@@ -999,6 +999,7 @@ impl Syncer {
account_storages_snapshots_dir.as_ref(),
chunk_index,
&mut pivot_header,
store.clone(),
)
.await
.map_err(SyncError::PeerHandler)?;
@@ -1338,11 +1339,12 @@ pub fn calculate_staleness_timestamp(timestamp: u64) -> u64 {
timestamp + (SNAP_LIMIT as u64 * 12)
}
#[derive(Debug, Default)]
#[allow(clippy::type_complexity)]
/// As an optimization, we store the accounts that still need storage healing
pub struct AccountStorageRoots {
/// Accounts that have not been healed are guaranteed to still have their original storage root,
/// so we can read that root directly
pub accounts_with_storage_root: BTreeMap<H256, H256>,
pub accounts_with_storage_root: BTreeMap<H256, (Option<H256>, Vec<(H256, H256)>)>,
/// If an account has been healed, it may return to a previous state, so we just store the account
/// in a hashset
pub healed_accounts: HashSet<H256>,
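A small usage sketch of the new value type (an illustrative reading drawn from the field docs above, not code from this PR): Some(root) means the account was never healed and root is still valid; None means the root must be re-read from the state trie; the Vec tracks (start_hash, end_hash) storage slices still to be downloaded:

use std::collections::BTreeMap;
use ethereum_types::H256;

fn example() {
    let mut accounts: BTreeMap<H256, (Option<H256>, Vec<(H256, H256)>)> = BTreeMap::new();
    // Freshly downloaded account: original root known, nothing pending.
    accounts.insert(H256::zero(), (Some(H256::repeat_byte(0xaa)), vec![]));
    // Healed account with one storage slice still queued: root unknown until
    // re-read from the state trie.
    accounts.insert(
        H256::repeat_byte(0x01),
        (None, vec![(H256::zero(), H256::repeat_byte(0xff))]),
    );
}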
7 changes: 5 additions & 2 deletions crates/networking/p2p/sync/state_healing.rs
@@ -181,9 +181,12 @@ async fn heal_state_trie(
}

storage_accounts.healed_accounts.insert(account_hash);
storage_accounts
let old_value = storage_accounts
.accounts_with_storage_root
.remove(&account_hash);
.get_mut(&account_hash);
if let Some((old_root, _)) = old_value {
*old_root = None;
}
}
}
leafs_healed += nodes
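The change above means healing no longer drops the whole entry; it only invalidates the cached root, so any pending download intervals survive. In isolation (illustrative helper over the same tuple type as accounts_with_storage_root):

use ethereum_types::H256;

fn mark_healed(entry: &mut (Option<H256>, Vec<(H256, H256)>)) {
    entry.0 = None; // the cached storage root may no longer match the healed trie
    // entry.1 is deliberately untouched: those ranges still need downloading
}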
20 changes: 0 additions & 20 deletions crates/networking/p2p/sync/storage_healing.rs
@@ -543,26 +543,6 @@ fn get_initial_downloads(
})
.collect::<VecDeque<_>>(),
);
initial_requests.extend(
account_paths
.accounts_with_storage_root
.par_iter()
.filter_map(|(acc_path, storage_root)| {
if store
.contains_storage_node(*acc_path, *storage_root)
.expect("We should be able to open the store")
{
return None;
}
Some(NodeRequest {
acc_path: Nibbles::from_bytes(&acc_path.0),
storage_path: Nibbles::default(), // We need to be careful, the root parent is a special case
parent: Nibbles::default(),
hash: *storage_root,
})
})
.collect::<VecDeque<_>>(),
);
initial_requests
}
