Various fixes to custody range sync #6004

Merged 17 commits on Jul 16, 2024
Changes from all commits
@@ -94,8 +94,15 @@ impl<E: EthSpec> PeerInfo<E> {
.syncnets()
.map_or(false, |s| s.get(**id as usize).unwrap_or(false))
}
// TODO(das) Add data column nets bitfield
Subnet::DataColumn(_) => return false,
Subnet::DataColumn(_) => {
// TODO(das): Pending spec PR https://github.com/ethereum/consensus-specs/pull/3821
// We should use MetaDataV3 for peer selection rather than
// looking at subscribed peers (current behavior). Until MetaDataV3 is
// implemented, this is perhaps the only viable option on the current devnet
// as the peer count is low and it's important to identify supernodes to get a
// good distribution of peers across subnets.
return true;
}
}
}
false
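For context, a hedged sketch of the check this TODO points towards: once a per-peer custody subnet count is available (MetaDataV3), a peer should count for a column subnet iff that subnet is in the custody set derived from its node id and advertised count. Everything below is a simplified, self-contained stand-in - `derive_custody_subnets` is a placeholder for the consensus-spec `compute_custody_subnets`, not Lighthouse's implementation:

```rust
/// Placeholder for the spec's `compute_custody_subnets(node_id, custody_subnet_count)`:
/// here we just take the first `count` subnets so the example is runnable.
fn derive_custody_subnets(_node_id: u64, count: u64, subnet_count: u64) -> Vec<u64> {
    (0..count.min(subnet_count)).collect()
}

/// Metadata-based check sketched by the TODO: does this peer custody `subnet_id`?
fn peer_custodies_subnet(node_id: u64, custody_subnet_count: u64, subnet_count: u64, subnet_id: u64) -> bool {
    derive_custody_subnets(node_id, custody_subnet_count, subnet_count).contains(&subnet_id)
}

fn main() {
    // A supernode advertising custody of all 32 subnets matches every column subnet.
    assert!(peer_custodies_subnet(7, 32, 32, 19));
    // A node with a small custody requirement matches only a few subnets.
    assert!(!peer_custodies_subnet(7, 4, 32, 19));
}
```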
32 changes: 30 additions & 2 deletions beacon_node/lighthouse_network/src/types/globals.rs
@@ -2,9 +2,9 @@
use crate::peer_manager::peerdb::PeerDB;
use crate::rpc::{MetaData, MetaDataV2};
use crate::types::{BackFillState, SyncState};
use crate::EnrExt;
use crate::{Client, Eth2Enr};
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
use crate::{EnrExt, Subnet};
use parking_lot::RwLock;
use std::collections::HashSet;
use types::data_column_sidecar::ColumnIndex;
@@ -120,6 +120,34 @@ impl<E: EthSpec> NetworkGlobals<E> {
.collect()
}

/// Computes the data column subnets this node is assigned to custody.
pub fn custody_subnets(&self, spec: &ChainSpec) -> impl Iterator<Item = DataColumnSubnetId> {
let enr = self.local_enr();
let node_id = enr.node_id().raw().into();
let custody_subnet_count = enr.custody_subnet_count::<E>(spec);
DataColumnSubnetId::compute_custody_subnets::<E>(node_id, custody_subnet_count, spec)
}

/// Returns peers that:
/// 1. are connected
/// 2. are assigned to custody the column based on their `custody_subnet_count` from metadata (WIP)
/// 3. have a good score
/// 4. are subscribed to the specified column - this condition can be removed later, so we can
/// identify and penalise peers that are supposed to custody the column.
pub fn custody_peers_for_column(
&self,
column_index: ColumnIndex,
spec: &ChainSpec,
) -> Vec<PeerId> {
self.peers
.read()
.good_peers_on_subnet(Subnet::DataColumn(
DataColumnSubnetId::from_column_index::<E>(column_index as usize, spec),
))
.cloned()
.collect::<Vec<_>>()
}
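A minimal usage sketch of the new helper, mirroring how `network_context.rs` calls it later in this diff; `globals`, `column_index` and `spec` are assumed to be in scope as Lighthouse's `NetworkGlobals<E>`, `ColumnIndex` and `ChainSpec`:

```rust
// Sketch: ask the peer DB for good peers currently subscribed to the subnet
// that custodies `column_index`.
let custody_peers: Vec<PeerId> = globals.custody_peers_for_column(column_index, &spec);
if custody_peers.is_empty() {
    // Range sync treats this as "wait for peers" rather than an error - see chain.rs below.
}
```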

/// TESTING ONLY. Build a dummy NetworkGlobals instance.
pub fn new_test_globals(trusted_peers: Vec<PeerId>, log: &slog::Logger) -> NetworkGlobals<E> {
use crate::CombinedKeyExt;
@@ -142,7 +170,7 @@ impl<E: EthSpec> NetworkGlobals<E> {

#[cfg(test)]
mod test {
use crate::NetworkGlobals;
use super::*;
use types::{Epoch, EthSpec, MainnetEthSpec as E};

#[test]
5 changes: 5 additions & 0 deletions beacon_node/network/src/metrics.rs
@@ -239,6 +239,11 @@ lazy_static! {
"Number of connected peers per sync status type",
&["sync_status"]
);
pub static ref PEERS_PER_COLUMN_SUBNET: Result<IntGaugeVec> = try_create_int_gauge_vec(
"peers_per_column_subnet",
"Number of connected peers per column subnet",
&["subnet_id"]
);
pub static ref SYNCING_CHAINS_COUNT: Result<IntGaugeVec> = try_create_int_gauge_vec(
"sync_range_chains",
"Number of Syncing chains in range, per range type",
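The new gauge is written from range sync's custody-peer check (see `chain.rs` further down); a minimal sketch of the call that updates it, assuming `subnet_id` and `peer_count` are in scope:

```rust
// Sketch: record the number of good peers currently seen on one column subnet.
set_int_gauge(
    &PEERS_PER_COLUMN_SUBNET,
    &[&subnet_id.to_string()],
    peer_count as i64,
);
```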
8 changes: 6 additions & 2 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
@@ -371,7 +371,9 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// A batch could be retried without the peer failing the request (disconnecting/
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer
if !batch.is_expecting_block(peer_id, &request_id) {
// TODO(das): removed peer_id matching as the node may request a different peer for data
// columns.
if !batch.is_expecting_block(&request_id) {
return Ok(());
}
debug!(self.log, "Batch failed"; "batch_epoch" => batch_id, "error" => "rpc_error");
@@ -419,7 +421,9 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer, and that the
// request_id matches
if !batch.is_expecting_block(peer_id, &request_id) {
// TODO(das): removed peer_id matching as the node may request a different peer for data
// columns.
if !batch.is_expecting_block(&request_id) {
return Ok(ProcessResult::Successful);
}
batch
108 changes: 59 additions & 49 deletions beacon_node/network/src/sync/network_context.rs
@@ -25,9 +25,9 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineStat
use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest};
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
use lighthouse_network::{
Client, Eth2Enr, NetworkGlobals, PeerAction, PeerId, ReportSource, Request,
};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
use rand::seq::SliceRandom;
use rand::thread_rng;
pub use requests::LookupVerifyError;
use slog::{debug, error, warn};
use slot_clock::SlotClock;
@@ -38,8 +38,7 @@ use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSubnetId, Epoch, EthSpec, Hash256,
SignedBeaconBlock, Slot,
BlobSidecar, ColumnIndex, DataColumnSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot,
};

pub mod custody;
@@ -280,29 +279,18 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {

// TODO(das): epoch argument left here in case custody rotation is implemented
pub fn get_custodial_peers(&self, _epoch: Epoch, column_index: ColumnIndex) -> Vec<PeerId> {
let mut peer_ids = vec![];

for (peer_id, peer_info) in self.network_globals().peers.read().connected_peers() {
if let Some(enr) = peer_info.enr() {
let custody_subnet_count = enr.custody_subnet_count::<T::EthSpec>(&self.chain.spec);
// TODO(das): consider caching a map of subnet -> Vec<PeerId> and invalidating
// whenever a peer connected or disconnect event in received
let mut subnets = DataColumnSubnetId::compute_custody_subnets::<T::EthSpec>(
enr.node_id().raw().into(),
custody_subnet_count,
&self.chain.spec,
);
if subnets.any(|subnet| {
subnet
.columns::<T::EthSpec>(&self.chain.spec)
.any(|index| index == column_index)
}) {
peer_ids.push(*peer_id)
}
}
}
self.network_globals()
.custody_peers_for_column(column_index, &self.chain.spec)
}

peer_ids
pub fn get_random_custodial_peer(
&self,
epoch: Epoch,
column_index: ColumnIndex,
) -> Option<PeerId> {
self.get_custodial_peers(epoch, column_index)
.choose(&mut thread_rng())
.cloned()
}

pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
@@ -402,37 +390,24 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.network_globals()
.custody_columns(epoch, &self.chain.spec);

for column_index in &custody_indexes {
let custody_peer_ids = self.get_custodial_peers(epoch, *column_index);
let Some(custody_peer) = custody_peer_ids.first().cloned() else {
// TODO(das): this will be pretty bad UX. To improve we should:
// - Attempt to fetch custody requests first, before requesting blocks
// - Handle the no peers case gracefully, maybe add some timeout and give a few
// minutes / seconds to the peer manager to locate peers on this subnet before
// abandoning progress on the chain completely.
return Err(RpcRequestSendError::NoCustodyPeers);
};

requested_peers.push(custody_peer);
for (peer_id, columns_by_range_request) in
self.make_columns_by_range_requests(epoch, request, &custody_indexes)?
{
requested_peers.push(peer_id);

debug!(
self.log,
"Sending DataColumnsByRange requests";
"method" => "DataColumnsByRange",
"count" => request.count(),
"count" => columns_by_range_request.count,
"epoch" => epoch,
"index" => column_index,
"peer" => %custody_peer,
"columns" => ?columns_by_range_request.columns,
"peer" => %peer_id,
);

// Create the blob request based on the blocks request.
self.send_network_msg(NetworkMessage::SendRequest {
peer_id: custody_peer,
request: Request::DataColumnsByRange(DataColumnsByRangeRequest {
start_slot: *request.start_slot(),
count: *request.count(),
columns: vec![*column_index],
}),
peer_id,
request: Request::DataColumnsByRange(columns_by_range_request),
request_id: RequestId::Sync(SyncRequestId::RangeBlockComponents(id)),
})
.map_err(|_| RpcRequestSendError::NetworkSendError)?;
@@ -453,6 +428,41 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Ok(id)
}

fn make_columns_by_range_requests(
&self,
epoch: Epoch,
request: BlocksByRangeRequest,
custody_indexes: &Vec<ColumnIndex>,
) -> Result<HashMap<PeerId, DataColumnsByRangeRequest>, RpcRequestSendError> {
let mut peer_id_to_request_map = HashMap::new();

for column_index in custody_indexes {
// TODO(das): The peer selection logic here needs to be improved - we should probably
// avoid retrying from failed peers, however `BatchState` currently only tracks the peer
// serving the blocks.
let Some(custody_peer) = self.get_random_custodial_peer(epoch, *column_index) else {
Review comments (Member Author):

There's potentially an issue with the peer selection approach here, as peer selection was previously done one layer up. Are these selection rules important? I've run into some issues with concurrent requests to the same peer with the approach on this branch, and need to investigate further.

Checked with Age, and it should be fine to send requests to peers that aren't "idle".

The peer selection logic here isn't ideal - we should probably avoid retrying peers that failed to return a response. Perhaps we could pass the sorted peer list into this method?

Added a comment for this; it probably requires some thought and refactoring, as we don't currently track data_column_by_range peers in BatchState.

Note to self: we might also need to apply some of these changes to backfill sync.

// TODO(das): this will be pretty bad UX. To improve we should:
// - Attempt to fetch custody requests first, before requesting blocks
// - Handle the no peers case gracefully, maybe add some timeout and give a few
// minutes / seconds to the peer manager to locate peers on this subnet before
// abandoning progress on the chain completely.
return Err(RpcRequestSendError::NoCustodyPeers);
};

let columns_by_range_request = peer_id_to_request_map
.entry(custody_peer)
.or_insert_with(|| DataColumnsByRangeRequest {
start_slot: *request.start_slot(),
count: *request.count(),
columns: vec![],
});

columns_by_range_request.columns.push(*column_index);
}

Ok(peer_id_to_request_map)
}
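The helper above is the core of this fix: rather than sending one `DataColumnsByRangeRequest` per column, columns whose randomly chosen custody peer coincides are batched into a single request. A simplified, self-contained sketch of that grouping logic, using plain `String`/`u64` stand-ins for `PeerId` and `ColumnIndex` and a caller-supplied peer picker (all names here are illustrative, not Lighthouse APIs):

```rust
use std::collections::HashMap;

/// Simplified stand-in for `DataColumnsByRangeRequest`.
#[derive(Debug, Default)]
struct ColumnsRequest {
    columns: Vec<u64>,
}

/// Group custody columns by the peer selected to serve each one.
fn group_columns_by_peer(
    custody_indexes: &[u64],
    mut pick_peer: impl FnMut(u64) -> Option<String>,
) -> Result<HashMap<String, ColumnsRequest>, &'static str> {
    let mut requests: HashMap<String, ColumnsRequest> = HashMap::new();
    for &column in custody_indexes {
        // Fail the whole batch if any column has no custody peer, mirroring
        // `RpcRequestSendError::NoCustodyPeers` in the real method.
        let peer = pick_peer(column).ok_or("no custody peers for column")?;
        requests.entry(peer).or_default().columns.push(column);
    }
    Ok(requests)
}

fn main() {
    // Columns 0 and 1 are served by peer "A", column 2 by peer "B".
    let pick = |c: u64| Some(if c == 2 { "B".to_string() } else { "A".to_string() });
    let grouped = group_columns_by_peer(&[0, 1, 2], pick).unwrap();
    assert_eq!(grouped["A"].columns, vec![0, 1]);
    assert_eq!(grouped["B"].columns, vec![2]);
}
```

In the real method the picker is `get_random_custodial_peer`, and each resulting entry becomes one `DataColumnsByRangeRequest` sent to that peer.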

pub fn range_request_failed(&mut self, request_id: Id) -> Option<RangeRequestId> {
let sender_id = self
.range_block_components_requests
6 changes: 3 additions & 3 deletions beacon_node/network/src/sync/range_sync/batch.rs
@@ -199,9 +199,9 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
}

/// Verifies if an incoming block belongs to this batch.
pub fn is_expecting_block(&self, peer_id: &PeerId, request_id: &Id) -> bool {
if let BatchState::Downloading(expected_peer, expected_id) = &self.state {
return peer_id == expected_peer && expected_id == request_id;
pub fn is_expecting_block(&self, request_id: &Id) -> bool {
if let BatchState::Downloading(_, expected_id) = &self.state {
return expected_id == request_id;
}
false
}
63 changes: 60 additions & 3 deletions beacon_node/network/src/sync/range_sync/chain.rs
@@ -1,4 +1,5 @@
use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
use crate::metrics::PEERS_PER_COLUMN_SUBNET;
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::network_context::RangeRequestId;
use crate::sync::{
@@ -7,7 +8,8 @@ use crate::sync::{
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
use lighthouse_network::{PeerAction, PeerId};
use lighthouse_metrics::set_int_gauge;
use lighthouse_network::{PeerAction, PeerId, Subnet};
use rand::seq::SliceRandom;
use slog::{crit, debug, o, warn};
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
@@ -236,7 +238,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer, and that the
// request_id matches
if !batch.is_expecting_block(peer_id, &request_id) {
// TODO(das): removed peer_id matching as the node may request a different peer for data
// columns.
if !batch.is_expecting_block(&request_id) {
return Ok(KeepChain);
}
batch
@@ -414,6 +418,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.request_batches(network)?;
}
}
} else if !self.good_peers_on_custody_subnets(self.processing_target, network) {
// This is to handle the case where no batch was sent for the current processing
// target when there are no custody peers available. This is a valid state and should not
// return an error.
return Ok(KeepChain);
} else {
return Err(RemoveChain::WrongChainState(format!(
"Batch not found for current processing target {}",
@@ -819,7 +828,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// A batch could be retried without the peer failing the request (disconnecting/
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer
if !batch.is_expecting_block(peer_id, &request_id) {
// TODO(das): removed peer_id matching as the node may request a different peer for data
// columns.
if !batch.is_expecting_block(&request_id) {
debug!(
self.log,
"Batch not expecting block";
@@ -1012,6 +1023,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// check if we have the batch for our optimistic start. If not, request it first.
// We wait for this batch before requesting any other batches.
if let Some(epoch) = self.optimistic_start {
if !self.good_peers_on_custody_subnets(epoch, network) {
debug!(
self.log,
"Waiting for peers to be available on custody column subnets"
);
return Ok(KeepChain);
}

if let Entry::Vacant(entry) = self.batches.entry(epoch) {
if let Some(peer) = idle_peers.pop() {
let batch_type = network.batch_type(epoch);
@@ -1036,6 +1055,35 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
Ok(KeepChain)
}

/// Checks all custody column subnets for peers. Returns `true` if there is at least one peer in
/// every custody column subnet.
fn good_peers_on_custody_subnets(&self, epoch: Epoch, network: &SyncNetworkContext<T>) -> bool {
if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) {
// Require peers on all custody column subnets before sending batches
let peers_on_all_custody_subnets = network
.network_globals()
.custody_subnets(&network.chain.spec)
.all(|subnet_id| {
let peer_count = network
.network_globals()
.peers
.read()
.good_peers_on_subnet(Subnet::DataColumn(subnet_id))
.count();

set_int_gauge(
&PEERS_PER_COLUMN_SUBNET,
&[&subnet_id.to_string()],
peer_count as i64,
);
peer_count > 0
});
peers_on_all_custody_subnets
} else {
true
}
}
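The check above reduces to "every custody subnet has at least one good peer"; batches are only dispatched once that holds, and the per-subnet counts are exported through `PEERS_PER_COLUMN_SUBNET` along the way. A simplified, std-only illustration of the same predicate, with a closure standing in for the peer-DB lookup:

```rust
/// `true` only if every custody subnet has at least one good peer.
fn peers_on_all_custody_subnets(
    custody_subnets: &[u64],
    good_peer_count: impl Fn(u64) -> usize,
) -> bool {
    custody_subnets.iter().all(|&subnet| good_peer_count(subnet) > 0)
}

fn main() {
    let counts = |subnet: u64| if subnet == 3 { 0 } else { 2 };
    assert!(!peers_on_all_custody_subnets(&[1, 2, 3], counts)); // subnet 3 has no peers
    assert!(peers_on_all_custody_subnets(&[1, 2], counts));
}
```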

/// Creates the next required batch from the chain. If there are no more batches required,
/// `false` is returned.
fn include_next_batch(&mut self, network: &mut SyncNetworkContext<T>) -> Option<BatchId> {
@@ -1066,6 +1114,15 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return None;
}

// don't send batch requests until we have peers on custody subnets
if !self.good_peers_on_custody_subnets(self.to_be_downloaded, network) {
debug!(
self.log,
"Waiting for peers to be available on custody column subnets"
);
return None;
}

let batch_id = self.to_be_downloaded;
// this batch could have been included already being an optimistic batch
match self.batches.entry(batch_id) {