Various fixes to custody range sync (#6004)
* Only start requesting batches when there are good peers across all custody columns, to avoid spamming block requests (a sketch of this gate follows the file summary below).

* Add custody peer check before mutating `BatchInfo` to avoid inconsistent state.

* Add check to cover the case where a batch is not processed while waiting for custody peers to become available.

* Fix lint and logic error

* Fix `good_peers_on_subnet` always returning false for `DataColumnSubnet`.

* Add test for `get_custody_peers_for_column`

* Revert epoch parameter refactor.

* Fall back to default custody requirement if peer ENR is not present.

* Add metrics and update code comment.

* Add more debug logs.

* Use subscribed peers on subnet before MetaDataV3 is implemented. Remove peer_id matching when injecting error because multiple peers are used for range requests. Use randomized custodial peer to avoid repeatedly sending requests to failing peers. Batch by range request where possible.

* Remove unused code and update docs.

* Add comment
jimmygchen authored Jul 16, 2024
1 parent 018f382 commit 55a3be7
Showing 8 changed files with 175 additions and 63 deletions.
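The first commit-message bullet describes the gating rule added in `chain.rs`: batches are only requested once every custody column subnet has at least one good peer. A minimal standalone sketch of that predicate, using assumed simplified types (plain `u64` subnet ids and a peer-count map) rather than Lighthouse's actual `NetworkGlobals`/`PeerDB` types:

```rust
use std::collections::HashMap;

/// Returns true only if every custody subnet has at least one good peer,
/// mirroring the "don't spam block requests" gate from the commit message.
fn ready_to_request_batches(
    custody_subnets: &[u64],
    good_peers_per_subnet: &HashMap<u64, usize>,
) -> bool {
    custody_subnets
        .iter()
        .all(|subnet| good_peers_per_subnet.get(subnet).copied().unwrap_or(0) > 0)
}

fn main() {
    let peers = HashMap::from([(0u64, 2usize), (1, 0), (2, 1)]);
    assert!(!ready_to_request_batches(&[0, 1, 2], &peers)); // subnet 1 has no peers yet
    assert!(ready_to_request_batches(&[0, 2], &peers));
}
```

In the diff below, the real check lives in `good_peers_on_custody_subnets` and is consulted both before the optimistic-start batch and before `include_next_batch`.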
@@ -94,8 +94,15 @@ impl<E: EthSpec> PeerInfo<E> {
.syncnets()
.map_or(false, |s| s.get(**id as usize).unwrap_or(false))
}
// TODO(das) Add data column nets bitfield
Subnet::DataColumn(_) => return false,
Subnet::DataColumn(_) => {
// TODO(das): Pending spec PR https://github.com/ethereum/consensus-specs/pull/3821
// We should use MetaDataV3 for peer selection rather than
// looking at subscribed peers (current behavior). Until MetaDataV3 is
// implemented, this is perhaps the only viable option on the current devnet
// as the peer count is low and it's important to identify supernodes to get a
// good distribution of peers across subnets.
return true;
}
}
}
false
32 changes: 30 additions & 2 deletions beacon_node/lighthouse_network/src/types/globals.rs
@@ -2,9 +2,9 @@
use crate::peer_manager::peerdb::PeerDB;
use crate::rpc::{MetaData, MetaDataV2};
use crate::types::{BackFillState, SyncState};
use crate::EnrExt;
use crate::{Client, Eth2Enr};
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
use crate::{EnrExt, Subnet};
use parking_lot::RwLock;
use std::collections::HashSet;
use types::data_column_sidecar::ColumnIndex;
@@ -120,6 +120,34 @@ impl<E: EthSpec> NetworkGlobals<E> {
.collect()
}

/// Compute the data column subnets this node is assigned to custody.
pub fn custody_subnets(&self, spec: &ChainSpec) -> impl Iterator<Item = DataColumnSubnetId> {
let enr = self.local_enr();
let node_id = enr.node_id().raw().into();
let custody_subnet_count = enr.custody_subnet_count::<E>(spec);
DataColumnSubnetId::compute_custody_subnets::<E>(node_id, custody_subnet_count, spec)
}

/// Returns connected peers that:
/// 1. are connected
/// 2. are assigned to custody the column based on their `custody_subnet_count` from metadata (WIP)
/// 3. have a good score
/// 4. are subscribed to the specified column - this condition can be removed later, so we can
///    identify and penalise peers that are supposed to custody the column.
pub fn custody_peers_for_column(
&self,
column_index: ColumnIndex,
spec: &ChainSpec,
) -> Vec<PeerId> {
self.peers
.read()
.good_peers_on_subnet(Subnet::DataColumn(
DataColumnSubnetId::from_column_index::<E>(column_index as usize, spec),
))
.cloned()
.collect::<Vec<_>>()
}

/// TESTING ONLY. Build a dummy NetworkGlobals instance.
pub fn new_test_globals(trusted_peers: Vec<PeerId>, log: &slog::Logger) -> NetworkGlobals<E> {
use crate::CombinedKeyExt;
@@ -142,7 +170,7 @@ impl<E: EthSpec> NetworkGlobals<E> {

#[cfg(test)]
mod test {
use crate::NetworkGlobals;
use super::*;
use types::{Epoch, EthSpec, MainnetEthSpec as E};

#[test]
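A note on the `custody_peers_for_column` helper added above: it relies on mapping a column index to its data column subnet via `DataColumnSubnetId::from_column_index`. A minimal sketch of that mapping, under the assumption that the spec computes it as a simple modulo over the subnet count (the constant below is illustrative, not the canonical value, which comes from `ChainSpec`):

```rust
// Illustrative subnet count; the real value is taken from the chain spec.
const DATA_COLUMN_SIDECAR_SUBNET_COUNT: u64 = 64;

/// Assumed spec mapping: a column's subnet is its index modulo the subnet count.
fn subnet_for_column(column_index: u64) -> u64 {
    column_index % DATA_COLUMN_SIDECAR_SUBNET_COUNT
}

fn main() {
    // Peers subscribed to subnet_for_column(i) are the candidates a lookup
    // like custody_peers_for_column(i) would return.
    assert_eq!(subnet_for_column(0), 0);
    assert_eq!(subnet_for_column(65), 1);
}
```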
5 changes: 5 additions & 0 deletions beacon_node/network/src/metrics.rs
@@ -239,6 +239,11 @@ lazy_static! {
"Number of connected peers per sync status type",
&["sync_status"]
);
pub static ref PEERS_PER_COLUMN_SUBNET: Result<IntGaugeVec> = try_create_int_gauge_vec(
"peers_per_column_subnet",
"Number of connected peers per column subnet",
&["subnet_id"]
);
pub static ref SYNCING_CHAINS_COUNT: Result<IntGaugeVec> = try_create_int_gauge_vec(
"sync_range_chains",
"Number of Syncing chains in range, per range type",
8 changes: 6 additions & 2 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
@@ -371,7 +371,9 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// A batch could be retried without the peer failing the request (disconnecting/
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer
if !batch.is_expecting_block(peer_id, &request_id) {
// TODO(das): removed peer_id matching as the node may request a different peer for data
// columns.
if !batch.is_expecting_block(&request_id) {
return Ok(());
}
debug!(self.log, "Batch failed"; "batch_epoch" => batch_id, "error" => "rpc_error");
@@ -419,7 +421,9 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
Expand Down Expand Up @@ -419,7 +421,9 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer, and that the
// request_id matches
if !batch.is_expecting_block(peer_id, &request_id) {
// TODO(das): removed peer_id matching as the node may request a different peer for data
// columns.
if !batch.is_expecting_block(&request_id) {
return Ok(ProcessResult::Successful);
}
batch
108 changes: 59 additions & 49 deletions beacon_node/network/src/sync/network_context.rs
@@ -25,9 +25,9 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineStat
use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest};
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
use lighthouse_network::{
Client, Eth2Enr, NetworkGlobals, PeerAction, PeerId, ReportSource, Request,
};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
use rand::seq::SliceRandom;
use rand::thread_rng;
pub use requests::LookupVerifyError;
use slog::{debug, error, warn};
use slot_clock::SlotClock;
@@ -38,8 +38,7 @@ use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSubnetId, Epoch, EthSpec, Hash256,
SignedBeaconBlock, Slot,
BlobSidecar, ColumnIndex, DataColumnSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot,
};

pub mod custody;
@@ -280,29 +279,18 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {

// TODO(das): epoch argument left here in case custody rotation is implemented
pub fn get_custodial_peers(&self, _epoch: Epoch, column_index: ColumnIndex) -> Vec<PeerId> {
let mut peer_ids = vec![];

for (peer_id, peer_info) in self.network_globals().peers.read().connected_peers() {
if let Some(enr) = peer_info.enr() {
let custody_subnet_count = enr.custody_subnet_count::<T::EthSpec>(&self.chain.spec);
// TODO(das): consider caching a map of subnet -> Vec<PeerId> and invalidating
// whenever a peer connected or disconnect event in received
let mut subnets = DataColumnSubnetId::compute_custody_subnets::<T::EthSpec>(
enr.node_id().raw().into(),
custody_subnet_count,
&self.chain.spec,
);
if subnets.any(|subnet| {
subnet
.columns::<T::EthSpec>(&self.chain.spec)
.any(|index| index == column_index)
}) {
peer_ids.push(*peer_id)
}
}
}
self.network_globals()
.custody_peers_for_column(column_index, &self.chain.spec)
}

peer_ids
pub fn get_random_custodial_peer(
&self,
epoch: Epoch,
column_index: ColumnIndex,
) -> Option<PeerId> {
self.get_custodial_peers(epoch, column_index)
.choose(&mut thread_rng())
.cloned()
}

pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
@@ -402,37 +390,24 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.network_globals()
.custody_columns(epoch, &self.chain.spec);

for column_index in &custody_indexes {
let custody_peer_ids = self.get_custodial_peers(epoch, *column_index);
let Some(custody_peer) = custody_peer_ids.first().cloned() else {
// TODO(das): this will be pretty bad UX. To improve we should:
// - Attempt to fetch custody requests first, before requesting blocks
// - Handle the no peers case gracefully, maybe add some timeout and give a few
// minutes / seconds to the peer manager to locate peers on this subnet before
// abandoing progress on the chain completely.
return Err(RpcRequestSendError::NoCustodyPeers);
};

requested_peers.push(custody_peer);
for (peer_id, columns_by_range_request) in
self.make_columns_by_range_requests(epoch, request, &custody_indexes)?
{
requested_peers.push(peer_id);

debug!(
self.log,
"Sending DataColumnsByRange requests";
"method" => "DataColumnsByRange",
"count" => request.count(),
"count" => columns_by_range_request.count,
"epoch" => epoch,
"index" => column_index,
"peer" => %custody_peer,
"columns" => ?columns_by_range_request.columns,
"peer" => %peer_id,
);

// Create the blob request based on the blocks request.
self.send_network_msg(NetworkMessage::SendRequest {
peer_id: custody_peer,
request: Request::DataColumnsByRange(DataColumnsByRangeRequest {
start_slot: *request.start_slot(),
count: *request.count(),
columns: vec![*column_index],
}),
peer_id,
request: Request::DataColumnsByRange(columns_by_range_request),
request_id: RequestId::Sync(SyncRequestId::RangeBlockComponents(id)),
})
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
@@ -453,6 +428,41 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Ok(id)
}

fn make_columns_by_range_requests(
&self,
epoch: Epoch,
request: BlocksByRangeRequest,
custody_indexes: &Vec<ColumnIndex>,
) -> Result<HashMap<PeerId, DataColumnsByRangeRequest>, RpcRequestSendError> {
let mut peer_id_to_request_map = HashMap::new();

for column_index in custody_indexes {
// TODO(das): The peer selection logic here needs to be improved - we should probably
// avoid retrying from failed peers, however `BatchState` currently only tracks the peer
// serving the blocks.
let Some(custody_peer) = self.get_random_custodial_peer(epoch, *column_index) else {
// TODO(das): this will be pretty bad UX. To improve we should:
// - Attempt to fetch custody requests first, before requesting blocks
// - Handle the no peers case gracefully, maybe add some timeout and give a few
// minutes / seconds to the peer manager to locate peers on this subnet before
// abandoning progress on the chain completely.
return Err(RpcRequestSendError::NoCustodyPeers);
};

let columns_by_range_request = peer_id_to_request_map
.entry(custody_peer)
.or_insert_with(|| DataColumnsByRangeRequest {
start_slot: *request.start_slot(),
count: *request.count(),
columns: vec![],
});

columns_by_range_request.columns.push(*column_index);
}

Ok(peer_id_to_request_map)
}

pub fn range_request_failed(&mut self, request_id: Id) -> Option<RangeRequestId> {
let sender_id = self
.range_block_components_requests
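The new `make_columns_by_range_requests` helper above batches columns by peer: each custody column picks a random custody peer, and columns that land on the same peer are folded into a single `DataColumnsByRangeRequest`. A standalone sketch of that grouping, with assumed simplified types (string peer ids, `u64` column indices) in place of the real request and peer types:

```rust
use rand::seq::SliceRandom;
use std::collections::HashMap;

/// Groups custody columns by a randomly chosen custody peer. Returns None if
/// any column has no custody peer (the real code surfaces this case as
/// `RpcRequestSendError::NoCustodyPeers`).
fn group_columns_by_peer(
    custody_columns: &[u64],
    custody_peers_for: impl Fn(u64) -> Vec<&'static str>,
) -> Option<HashMap<&'static str, Vec<u64>>> {
    let mut rng = rand::thread_rng();
    let mut requests: HashMap<&'static str, Vec<u64>> = HashMap::new();
    for &column in custody_columns {
        // Random selection avoids repeatedly sending requests to the same
        // (possibly failing) peer.
        let peer = *custody_peers_for(column).choose(&mut rng)?;
        requests.entry(peer).or_default().push(column);
    }
    Some(requests)
}

fn main() {
    let grouped = group_columns_by_peer(&[0, 1, 2, 3], |_| vec!["peer_a", "peer_b"]);
    println!("{grouped:?}");
}
```

Grouping by peer keeps the number of in-flight `DataColumnsByRange` requests proportional to the number of distinct custody peers rather than the number of columns.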
6 changes: 3 additions & 3 deletions beacon_node/network/src/sync/range_sync/batch.rs
@@ -199,9 +199,9 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
}

/// Verifies if an incoming block belongs to this batch.
pub fn is_expecting_block(&self, peer_id: &PeerId, request_id: &Id) -> bool {
if let BatchState::Downloading(expected_peer, expected_id) = &self.state {
return peer_id == expected_peer && expected_id == request_id;
pub fn is_expecting_block(&self, request_id: &Id) -> bool {
if let BatchState::Downloading(_, expected_id) = &self.state {
return expected_id == request_id;
}
false
}
63 changes: 60 additions & 3 deletions beacon_node/network/src/sync/range_sync/chain.rs
@@ -1,4 +1,5 @@
use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
use crate::metrics::PEERS_PER_COLUMN_SUBNET;
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::network_context::RangeRequestId;
use crate::sync::{
@@ -7,7 +8,8 @@ use crate::sync::{
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
use lighthouse_network::{PeerAction, PeerId};
use lighthouse_metrics::set_int_gauge;
use lighthouse_network::{PeerAction, PeerId, Subnet};
use rand::seq::SliceRandom;
use slog::{crit, debug, o, warn};
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
@@ -236,7 +238,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer, and that the
// request_id matches
if !batch.is_expecting_block(peer_id, &request_id) {
// TODO(das): removed peer_id matching as the node may request a different peer for data
// columns.
if !batch.is_expecting_block(&request_id) {
return Ok(KeepChain);
}
batch
@@ -414,6 +418,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.request_batches(network)?;
}
}
} else if !self.good_peers_on_custody_subnets(self.processing_target, network) {
// This is to handle the case where no batch was sent for the current processing
// target when there are no custody peers available. This is a valid state and should not
// return an error.
return Ok(KeepChain);
} else {
return Err(RemoveChain::WrongChainState(format!(
"Batch not found for current processing target {}",
@@ -819,7 +828,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// A batch could be retried without the peer failing the request (disconnecting/
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer
if !batch.is_expecting_block(peer_id, &request_id) {
// TODO(das): removed peer_id matching as the node may request a different peer for data
// columns.
if !batch.is_expecting_block(&request_id) {
debug!(
self.log,
"Batch not expecting block";
@@ -1012,6 +1023,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// check if we have the batch for our optimistic start. If not, request it first.
// We wait for this batch before requesting any other batches.
if let Some(epoch) = self.optimistic_start {
if !self.good_peers_on_custody_subnets(epoch, network) {
debug!(
self.log,
"Waiting for peers to be available on custody column subnets"
);
return Ok(KeepChain);
}

if let Entry::Vacant(entry) = self.batches.entry(epoch) {
if let Some(peer) = idle_peers.pop() {
let batch_type = network.batch_type(epoch);
@@ -1036,6 +1055,35 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
Ok(KeepChain)
}

/// Checks all custody column subnets for peers. Returns `true` if there is at least one peer in
/// every custody column subnet.
fn good_peers_on_custody_subnets(&self, epoch: Epoch, network: &SyncNetworkContext<T>) -> bool {
if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) {
// Require peers on all custody column subnets before sending batches
let peers_on_all_custody_subnets = network
.network_globals()
.custody_subnets(&network.chain.spec)
.all(|subnet_id| {
let peer_count = network
.network_globals()
.peers
.read()
.good_peers_on_subnet(Subnet::DataColumn(subnet_id))
.count();

set_int_gauge(
&PEERS_PER_COLUMN_SUBNET,
&[&subnet_id.to_string()],
peer_count as i64,
);
peer_count > 0
});
peers_on_all_custody_subnets
} else {
true
}
}

/// Creates the next required batch from the chain. If there are no more batches required,
/// `false` is returned.
fn include_next_batch(&mut self, network: &mut SyncNetworkContext<T>) -> Option<BatchId> {
@@ -1066,6 +1114,15 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return None;
}

// don't send batch requests until we have peers on custody subnets
if !self.good_peers_on_custody_subnets(self.to_be_downloaded, network) {
debug!(
self.log,
"Waiting for peers to be available on custody column subnets"
);
return None;
}

let batch_id = self.to_be_downloaded;
// this batch could have been included already being an optimistic batch
match self.batches.entry(batch_id) {