Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add peers to parent lookups #5858

Merged
merged 5 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 66 additions & 12 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ pub struct BlockLookups<T: BeaconChainTypes> {
log: Logger,
}

#[cfg(test)]
/// Tuple of `SingleLookupId`, requested block root, awaiting parent block root (if any),
/// and list of peers that claim to have imported this set of block components.
pub(crate) type BlockLookupSummary = (Id, Hash256, Option<Hash256>, Vec<PeerId>);
dapplion marked this conversation as resolved.
Show resolved Hide resolved

impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn new(log: Logger) -> Self {
Self {
Expand All @@ -107,10 +112,17 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}

#[cfg(test)]
pub(crate) fn active_single_lookups(&self) -> Vec<(Id, Hash256, Option<Hash256>)> {
pub(crate) fn active_single_lookups(&self) -> Vec<BlockLookupSummary> {
self.single_block_lookups
.iter()
.map(|(id, e)| (*id, e.block_root(), e.awaiting_parent()))
.map(|(id, l)| {
(
*id,
l.block_root(),
l.awaiting_parent(),
l.all_peers().copied().collect(),
)
})
.collect()
}

Expand Down Expand Up @@ -244,24 +256,23 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}

// Do not re-request a block that is already being requested
if let Some((_, lookup)) = self
if let Some((&lookup_id, lookup)) = self
.single_block_lookups
.iter_mut()
.find(|(_id, lookup)| lookup.is_for_block(block_root))
{
for peer in peers {
if lookup.add_peer(*peer) {
debug!(self.log, "Adding peer to existing single block lookup"; "block_root" => ?block_root, "peer" => ?peer);
}
}

if let Some(block_component) = block_component {
let component_type = block_component.get_type();
let imported = lookup.add_child_components(block_component);
if !imported {
debug!(self.log, "Lookup child component ignored"; "block_root" => ?block_root, "type" => component_type);
}
}

if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers) {
warn!(self.log, "Error adding peers to ancestor lookup"; "error" => ?e);
}

return true;
}

Expand Down Expand Up @@ -797,9 +808,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// Recursively find the oldest ancestor lookup of another lookup
fn find_oldest_ancestor_lookup<'a>(
&'a self,
stuck_lookup: &'a SingleBlockLookup<T>,
lookup: &'a SingleBlockLookup<T>,
) -> Result<&'a SingleBlockLookup<T>, String> {
if let Some(awaiting_parent) = stuck_lookup.awaiting_parent() {
if let Some(awaiting_parent) = lookup.awaiting_parent() {
if let Some(lookup) = self
.single_block_lookups
.values()
Expand All @@ -812,7 +823,50 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
))
}
} else {
Ok(stuck_lookup)
Ok(lookup)
}
}

/// Adds peers to a lookup and its ancestors recursively.
/// Note: Takes a `lookup_id` as argument to allow recursion on mutable lookups, without having
/// to duplicate the code to add peers to a lookup
fn add_peers_to_lookup_and_ancestors(
&mut self,
lookup_id: SingleLookupId,
peers: &[PeerId],
) -> Result<(), String> {
let lookup = self
.single_block_lookups
.get_mut(&lookup_id)
.ok_or(format!("Unknown lookup for id {lookup_id}"))?;

for peer in peers {
if lookup.add_peer(*peer) {
debug!(self.log, "Adding peer to existing single block lookup";
"block_root" => ?lookup.block_root(),
"peer" => ?peer
);
}
}

// We may choose to attempt to continue a lookup here. It is possible that a lookup had zero
// peers and after adding this set of peers it can make progress again. Note that this
// recursive function iterates from child to parent, so continuing the child first is weird.
// However, we choose to not attempt to continue the lookup for simplicity. It's not
// strictly required and just and optimization for a rare corner case.

if let Some(parent_root) = lookup.awaiting_parent() {
if let Some((&child_id, _)) = self
.single_block_lookups
.iter()
.find(|(_, l)| l.block_root() == parent_root)
{
self.add_peers_to_lookup_and_ancestors(child_id, peers)
} else {
Err(format!("Lookup references unknown parent {parent_root:?}"))
}
} else {
Ok(())
}
}
}
39 changes: 37 additions & 2 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl TestRig {
self.sync_manager.handle_message(sync_message);
}

fn active_single_lookups(&self) -> Vec<(Id, Hash256, Option<Hash256>)> {
fn active_single_lookups(&self) -> Vec<BlockLookupSummary> {
self.sync_manager.active_single_lookups()
}

Expand Down Expand Up @@ -252,6 +252,21 @@ impl TestRig {
}
}

fn assert_lookup_peers(&self, block_root: Hash256, mut expected_peers: Vec<PeerId>) {
let mut lookup = self
.sync_manager
.active_single_lookups()
.into_iter()
.find(|l| l.1 == block_root)
.unwrap_or_else(|| panic!("no lookup for {block_root}"));
lookup.3.sort();
expected_peers.sort();
assert_eq!(
lookup.3, expected_peers,
"unexpected peers on lookup {block_root}"
);
}

fn insert_failed_chain(&mut self, block_root: Hash256) {
self.sync_manager.insert_failed_chain(block_root);
}
Expand All @@ -270,7 +285,7 @@ impl TestRig {
fn find_single_lookup_for(&self, block_root: Hash256) -> Id {
self.active_single_lookups()
.iter()
.find(|(_, b, _)| b == &block_root)
.find(|l| l.1 == block_root)
.unwrap_or_else(|| panic!("no single block lookup found for {block_root}"))
.0
}
Expand Down Expand Up @@ -1305,6 +1320,26 @@ fn test_lookup_disconnection_peer_left() {
rig.assert_single_lookups_count(1);
}

#[test]
fn test_lookup_add_peers_to_parent() {
let mut r = TestRig::test_setup();
let peer_id_1 = r.new_connected_peer();
let peer_id_2 = r.new_connected_peer();
let blocks = r.rand_blockchain(5);
let last_block_root = blocks.last().unwrap().canonical_root();
// Create a chain of lookups
for block in &blocks {
r.trigger_unknown_parent_block(peer_id_1, block.clone());
}
r.trigger_unknown_block_from_attestation(last_block_root, peer_id_2);
for block in blocks.iter().take(blocks.len() - 1) {
// Parent has the original unknown parent event peer + new peer
r.assert_lookup_peers(block.canonical_root(), vec![peer_id_1, peer_id_2]);
}
// Child lookup only has the unknown attestation peer
r.assert_lookup_peers(last_block_root, vec![peer_id_2]);
}

#[test]
fn test_skip_creating_failed_parent_lookup() {
let mut rig = TestRig::test_setup();
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}

#[cfg(test)]
pub(crate) fn active_single_lookups(&self) -> Vec<(Id, Hash256, Option<Hash256>)> {
pub(crate) fn active_single_lookups(&self) -> Vec<super::block_lookups::BlockLookupSummary> {
self.block_lookups.active_single_lookups()
}

Expand Down
Loading