Skip to content

Commit

Permalink
Avoid spamming blocks by range request until there's available peer o…
Browse files Browse the repository at this point in the history
…n all custody subnets (#6004)

Squashed commit of the following:

commit 129ac92
Author: Jimmy Chen <jchen.tc@gmail.com>
Date:   Mon Jul 1 12:49:03 2024 +1000

    Add metrics and update code comment.

commit 03a9dce
Author: Jimmy Chen <jchen.tc@gmail.com>
Date:   Mon Jul 1 12:29:30 2024 +1000

    Fall back to default custody requirement if peer ENR is not present.

commit 4373a28
Author: Jimmy Chen <jchen.tc@gmail.com>
Date:   Sat Jun 29 01:05:52 2024 +1000

    Revert epoch parameter refactor.

commit 876ea3b
Author: Jimmy Chen <jchen.tc@gmail.com>
Date:   Thu Jun 27 17:21:04 2024 +1000

    Add test for `get_custody_peers_for_column`

commit de05355
Merge: 4079d2e 7206909
Author: Jimmy Chen <jchen.tc@gmail.com>
Date:   Thu Jun 27 16:06:28 2024 +1000

    Merge branch 'das' into custody-sync-peers

commit 4079d2e
Author: Jimmy Chen <jchen.tc@gmail.com>
Date:   Thu Jun 27 15:35:39 2024 +1000

    Fix `good_peers_on_subnet` always returning false for `DataColumnSubnet`.

commit 9f20029
Author: Jimmy Chen <jchen.tc@gmail.com>
Date:   Thu Jun 27 14:27:04 2024 +1000

    Fix lint and logic error

commit 05608b0
Author: Jimmy Chen <jchen.tc@gmail.com>
Date:   Thu Jun 27 13:51:33 2024 +1000

    Add check to cover a case where batch is not processed while waiting for custody peers to become available.

commit 9f82497
Author: Jimmy Chen <jchen.tc@gmail.com>
Date:   Thu Jun 27 12:57:06 2024 +1000

    Add custody peer check before mutating `BatchInfo` to avoid inconsistent state.

commit edc584a
Author: Jimmy Chen <jchen.tc@gmail.com>
Date:   Thu Jun 27 10:32:19 2024 +1000

    Only start requesting batches when there are good peers across all custody columns to avoid spamming block requests.
  • Loading branch information
jimmygchen committed Jul 1, 2024
1 parent b06c23e commit 733e9b7
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 32 deletions.
32 changes: 32 additions & 0 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY;
use crate::discovery::CombinedKey;
#[cfg(test)]
use crate::EnrExt;
use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, Gossipsub, PeerId};
use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
use rand::seq::SliceRandom;
Expand Down Expand Up @@ -702,6 +704,36 @@ impl<E: EthSpec> PeerDB<E> {
)
}

/// Inserts the peer identified by the given ENR into the DB as a connected,
/// outgoing peer with an empty seen address. The peer id is derived from the
/// ENR. May return a `BanOperation` if the state transition requires one (see
/// `update_connection_state`). MUST ONLY BE USED IN TESTS.
#[cfg(test)]
pub(crate) fn __add_connected_peer_enr_testing_only(
    &mut self,
    enr: Enr,
) -> Option<BanOperation> {
    // Derive the peer id before the ENR is moved into the new state.
    let peer_id = enr.peer_id();
    let new_state = NewConnectionState::Connected {
        enr: Some(enr),
        seen_address: Multiaddr::empty(),
        direction: ConnectionDirection::Outgoing,
    };
    self.update_connection_state(&peer_id, new_state)
}

/// Inserts the given peer into the DB as a connected, outgoing peer seen at
/// `multiaddr`, without an ENR (exercises the ENR-less code paths). May return
/// a `BanOperation` if the state transition requires one (see
/// `update_connection_state`). MUST ONLY BE USED IN TESTS.
#[cfg(test)]
pub(crate) fn __add_connected_peer_multiaddr_testing_only(
    &mut self,
    peer_id: &PeerId,
    multiaddr: Multiaddr,
) -> Option<BanOperation> {
    let new_state = NewConnectionState::Connected {
        // No ENR: callers relying on custody info must fall back to defaults.
        enr: None,
        seen_address: multiaddr,
        direction: ConnectionDirection::Outgoing,
    };
    self.update_connection_state(peer_id, new_state)
}

/// The connection state of the peer has been changed. Modify the peer in the db to ensure all
/// variables are in sync with libp2p.
/// Updating the state can lead to a `BanOperation` which needs to be processed via the peer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,12 @@ impl<E: EthSpec> PeerInfo<E> {
.syncnets()
.map_or(false, |s| s.get(**id as usize).unwrap_or(false))
}
// TODO(das) Add data column nets bitfield
Subnet::DataColumn(_) => return false,
Subnet::DataColumn(_) => {
// There's no metadata field for data column subnets, we use the `csc` enr field
// along with `node_id` to determine whether peer SHOULD be subscribed to a
// given subnet.
return true;
}
}
}
false
Expand Down
123 changes: 122 additions & 1 deletion beacon_node/lighthouse_network/src/types/globals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::types::{BackFillState, SyncState};
use crate::EnrExt;
use crate::{Client, Eth2Enr};
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
use discv5::handler::NodeContact;
use itertools::Itertools;
use parking_lot::RwLock;
use std::collections::HashSet;
use types::data_column_sidecar::ColumnIndex;
Expand Down Expand Up @@ -120,6 +122,55 @@ impl<E: EthSpec> NetworkGlobals<E> {
.collect()
}

/// Returns an iterator over the data column subnets this node is assigned to
/// custody, derived from the local ENR's node id and custody subnet count.
pub fn custody_subnets(&self, spec: &ChainSpec) -> impl Iterator<Item = DataColumnSubnetId> {
    let local_enr = self.local_enr();
    DataColumnSubnetId::compute_custody_subnets::<E>(
        local_enr.node_id().raw().into(),
        local_enr.custody_subnet_count::<E>(spec),
        spec,
    )
}

/// Returns the ids of all connected peers that are expected to custody
/// `column_index`, based on each peer's node id and custody subnet count.
pub fn custody_peers_for_column(
    &self,
    column_index: ColumnIndex,
    spec: &ChainSpec,
) -> Vec<PeerId> {
    let peers_db = self.peers.read();
    let mut custody_peers = Vec::new();
    for (peer_id, peer_info) in peers_db.connected_peers() {
        // Work out the peer's node id and custody subnet count (csc). Prefer
        // the ENR; otherwise derive the node id from the last seen multiaddr.
        let maybe_node_id_and_csc = match peer_info.enr() {
            Some(enr) => Some((enr.node_id(), enr.custody_subnet_count::<E>(spec))),
            None => peer_info
                .seen_multiaddrs()
                .last()
                .cloned()
                .and_then(|multiaddr| NodeContact::try_from_multiaddr(multiaddr).ok())
                // TODO(das): Use `custody_subnet_count` from `MetaDataV3` before
                // falling back to minimum custody requirement.
                .map(|node_contact| (node_contact.node_id(), spec.custody_requirement)),
        };
        let Some((node_id, custody_subnet_count)) = maybe_node_id_and_csc else {
            // Cannot determine a node id for this peer; skip it.
            continue;
        };
        // TODO(das): consider caching a map of subnet -> Vec<PeerId> and invalidating
        // whenever a peer connected or disconnect event in received
        let custodies_column = DataColumnSubnetId::compute_custody_columns::<E>(
            node_id.raw().into(),
            custody_subnet_count,
            spec,
        )
        .contains(&column_index);
        if custodies_column {
            custody_peers.push(*peer_id);
        }
    }
    custody_peers
}

/// TESTING ONLY. Build a dummy NetworkGlobals instance.
pub fn new_test_globals(trusted_peers: Vec<PeerId>, log: &slog::Logger) -> NetworkGlobals<E> {
use crate::CombinedKeyExt;
Expand All @@ -142,7 +193,8 @@ impl<E: EthSpec> NetworkGlobals<E> {

#[cfg(test)]
mod test {
use crate::NetworkGlobals;
use super::*;
use std::str::FromStr;
use types::{Epoch, EthSpec, MainnetEthSpec as E};

#[test]
Expand All @@ -160,4 +212,73 @@ mod test {
default_custody_requirement_column_count as usize
);
}

#[test]
fn custody_peers_for_column_enr_present() {
    let spec = E::default_spec();
    let log = logging::test_logger();
    let globals = NetworkGlobals::<E>::new_test_globals(vec![], &log);

    // The first two ENRs correspond to supernodes (see the destructuring into
    // `supernode_peer_*` below); the remaining two are regular nodes.
    let valid_enrs = [
        "enr:-Mm4QDJpcg5mZ8EFeYuDcUX78tOTigHLz4_zJlCY7vOTd2-XPPqlAoWM02Us69c4ov85pHgTgeo77Z3_nAhJ4yF1y30Bh2F0dG5ldHOIAAAAAAAAAACDY3NjIIRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR0iXNlY3AyNTZrMaECvF7Y-fD1MEEVQq3y5qW7C8UoTsq6J_tfwvQIJ5fo1TGIc3luY25ldHMAg3RjcIKUc4N1ZHCClHM",
        "enr:-Mm4QBw4saycbk-Up2PvppJOv0KzBqgFFHl6_OfFlh8_HxtwWkZpSFgJ0hFV3qOelh_Ai4L9HhSAEJSG48LE8YJ-7WABh2F0dG5ldHOIAAAAAAAAAACDY3NjIIRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR1iXNlY3AyNTZrMaECsRjhgRrAuRWelB9VTTzTa0tHtcWyLTLSReL4RNWhJgGIc3luY25ldHMAg3RjcIKUdIN1ZHCClHQ",
        "enr:-Mm4QMFlqbpGrmN21EM-70_hDW9c3MrulhIZElmsP3kb7XSLOEmV7-Msj2jlwGR5C_TicwOXYsZrN6eEIJlGgluM_XgBh2F0dG5ldHOIAAAAAAAAAACDY3NjAYRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR2iXNlY3AyNTZrMaECpAOonvUcYbBX8Tf0ErNPKwJeeidKzJftLTryBZUusMSIc3luY25ldHMAg3RjcIKUdYN1ZHCClHU",
        "enr:-Mm4QEHdVjmQ7mH2qIX7_6SDablQUcrZuA4Sxjprh9WGbipfHUjPrELtBaRIRJUrpI8cgJRoAF1wMwoeRS7j3d8xviRGh2F0dG5ldHOIAAAAAAAAAACDY3NjAYRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR2iXNlY3AyNTZrMaECpAOonvUcYbBX8Tf0ErNPKwJeeidKzJftLTryBZUusMSIc3luY25ldHMAg3RjcIKUdYN1ZHCClHU"
    ];

    // Register every ENR as a connected peer and remember the peer ids.
    let mut peer_ids = Vec::with_capacity(valid_enrs.len());
    {
        let mut peer_db = globals.peers.write();
        for enr_str in valid_enrs {
            let enr = Enr::from_str(enr_str).unwrap();
            peer_ids.push(enr.peer_id());
            peer_db.__add_connected_peer_enr_testing_only(enr);
        }
    }

    let [supernode_peer_1, supernode_peer_2, _, _] =
        peer_ids.try_into().expect("expected exactly 4 peer ids");

    // Supernodes custody every column, so both must be returned for each one.
    for col_index in 0..spec.number_of_columns {
        let custody_peers = globals.custody_peers_for_column(col_index as ColumnIndex, &spec);
        for supernode in [&supernode_peer_1, &supernode_peer_2] {
            assert!(
                custody_peers.contains(supernode),
                "must at least return supernode peer"
            );
        }
    }
}

// If the ENR is not present, fall back to deriving the node_id from the
// multiaddr and use `spec.custody_requirement` as the custody subnet count.
#[test]
fn custody_peers_for_column_no_enr_use_default() {
    let spec = E::default_spec();
    let log = logging::test_logger();
    let globals = NetworkGlobals::<E>::new_test_globals(vec![], &log);

    // Connect a single peer identified only by peer id + multiaddr (no ENR).
    let peer_id_str = "16Uiu2HAm86zWajwnBFD8uxkRpxhRzeUEf6Brfz2VBxGAaWx9ejyr";
    let peer_id = PeerId::from_str(peer_id_str).unwrap();
    let multiaddr =
        Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/9000/p2p/{peer_id_str}")).unwrap();
    {
        let mut peer_db = globals.peers.write();
        peer_db.__add_connected_peer_multiaddr_testing_only(&peer_id, multiaddr);
    }

    // Count the column subnets for which at least one custody peer is found.
    let mut custody_subnets = 0usize;
    for col_index in 0..spec.data_column_sidecar_subnet_count {
        if !globals.custody_peers_for_column(col_index, &spec).is_empty() {
            custody_subnets += 1;
        }
    }

    // The single peer's custody subnet should match custody_requirement.
    assert_eq!(custody_subnets, spec.custody_requirement as usize);
}
}
5 changes: 5 additions & 0 deletions beacon_node/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ lazy_static! {
"Number of connected peers per sync status type",
&["sync_status"]
);
// Per-column-subnet count of connected peers considered custodians of that
// subnet; updated by range sync's custody-peer check (`set_int_gauge` in
// `range_sync/chain.rs`).
pub static ref PEERS_PER_COLUMN_SUBNET: Result<IntGaugeVec> = try_create_int_gauge_vec(
    "peers_per_column_subnet",
    "Number of connected peers per column subnet",
    &["subnet_id"]
);
pub static ref SYNCING_CHAINS_COUNT: Result<IntGaugeVec> = try_create_int_gauge_vec(
"sync_range_chains",
"Number of Syncing chains in range, per range type",
Expand Down
32 changes: 4 additions & 28 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineStat
use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest};
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
use lighthouse_network::{
Client, Eth2Enr, NetworkGlobals, PeerAction, PeerId, ReportSource, Request,
};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
pub use requests::LookupVerifyError;
use slog::{debug, error, warn};
use slot_clock::SlotClock;
Expand All @@ -38,8 +36,7 @@ use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSubnetId, Epoch, EthSpec, Hash256,
SignedBeaconBlock, Slot,
BlobSidecar, ColumnIndex, DataColumnSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot,
};

pub mod custody;
Expand Down Expand Up @@ -240,29 +237,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {

// TODO(das): epoch argument left here in case custody rotation is implemented
pub fn get_custodial_peers(&self, _epoch: Epoch, column_index: ColumnIndex) -> Vec<PeerId> {
let mut peer_ids = vec![];

for (peer_id, peer_info) in self.network_globals().peers.read().connected_peers() {
if let Some(enr) = peer_info.enr() {
let custody_subnet_count = enr.custody_subnet_count::<T::EthSpec>(&self.chain.spec);
// TODO(das): consider caching a map of subnet -> Vec<PeerId> and invalidating
// whenever a peer connected or disconnect event in received
let mut subnets = DataColumnSubnetId::compute_custody_subnets::<T::EthSpec>(
enr.node_id().raw().into(),
custody_subnet_count,
&self.chain.spec,
);
if subnets.any(|subnet| {
subnet
.columns::<T::EthSpec>(&self.chain.spec)
.any(|index| index == column_index)
}) {
peer_ids.push(*peer_id)
}
}
}

peer_ids
self.network_globals()
.custody_peers_for_column(column_index, &self.chain.spec)
}

pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
Expand Down
55 changes: 54 additions & 1 deletion beacon_node/network/src/sync/range_sync/chain.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
use crate::metrics::PEERS_PER_COLUMN_SUBNET;
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::network_context::RangeRequestId;
use crate::sync::{
Expand All @@ -7,7 +8,8 @@ use crate::sync::{
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
use lighthouse_network::{PeerAction, PeerId};
use lighthouse_metrics::set_int_gauge;
use lighthouse_network::{PeerAction, PeerId, Subnet};
use rand::seq::SliceRandom;
use slog::{crit, debug, o, warn};
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
Expand Down Expand Up @@ -396,6 +398,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.request_batches(network)?;
}
}
} else if !self.good_peers_on_custody_subnets(self.processing_target, network) {
// This is to handle the case where no batch was sent for the current processing
// target when there is no custody peers available. This is a valid state and should not
// return an error.
return Ok(KeepChain);
} else {
return Err(RemoveChain::WrongChainState(format!(
"Batch not found for current processing target {}",
Expand Down Expand Up @@ -994,6 +1001,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// check if we have the batch for our optimistic start. If not, request it first.
// We wait for this batch before requesting any other batches.
if let Some(epoch) = self.optimistic_start {
if !self.good_peers_on_custody_subnets(epoch, network) {
debug!(
self.log,
"Waiting for peers to be available on custody column subnets"
);
return Ok(KeepChain);
}

if let Entry::Vacant(entry) = self.batches.entry(epoch) {
if let Some(peer) = idle_peers.pop() {
let batch_type = network.batch_type(epoch);
Expand All @@ -1018,6 +1033,35 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
Ok(KeepChain)
}

/// Checks all custody column subnets for peers. Returns `true` if there is at least one peer in
/// every custody column subnet.
///
/// Before PeerDAS is enabled for `epoch` this always returns `true`, since no
/// per-column peer requirement applies. As a side effect, updates the
/// `PEERS_PER_COLUMN_SUBNET` gauge for each subnet inspected (note: `all`
/// short-circuits, so subnets after the first empty one are not re-gauged —
/// same behavior as before).
fn good_peers_on_custody_subnets(&self, epoch: Epoch, network: &SyncNetworkContext<T>) -> bool {
    if !network.chain.spec.is_peer_das_enabled_for_epoch(epoch) {
        return true;
    }
    let globals = network.network_globals();
    // Acquire the peer DB read lock once instead of once per subnet.
    let peer_db = globals.peers.read();
    // Require peers on all custody column subnets before sending batches
    globals
        .custody_subnets(&network.chain.spec)
        .all(|subnet_id| {
            let peer_count = peer_db
                .good_peers_on_subnet(Subnet::DataColumn(subnet_id))
                .count();
            set_int_gauge(
                &PEERS_PER_COLUMN_SUBNET,
                &[&subnet_id.to_string()],
                peer_count as i64,
            );
            peer_count > 0
        })
}

/// Creates the next required batch from the chain. If there are no more batches required,
/// `false` is returned.
fn include_next_batch(&mut self, network: &mut SyncNetworkContext<T>) -> Option<BatchId> {
Expand Down Expand Up @@ -1048,6 +1092,15 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return None;
}

// don't send batch requests until we have peers on custody subnets
if !self.good_peers_on_custody_subnets(self.to_be_downloaded, network) {
debug!(
self.log,
"Waiting for peers to be available on custody column subnets"
);
return None;
}

let batch_id = self.to_be_downloaded;
// this batch could have been included already being an optimistic batch
match self.batches.entry(batch_id) {
Expand Down

0 comments on commit 733e9b7

Please sign in to comment.