Only start requesting batches when there are good peers across all custody columns to avoid spamming block requests.
jimmygchen committed Jun 27, 2024
1 parent 733b1df commit edc584a
Showing 8 changed files with 52 additions and 41 deletions.
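
For context, here is a minimal, self-contained sketch of the gating idea this commit introduces. The types and names below are hypothetical stand-ins, not Lighthouse's actual API; the real check is added in beacon_node/network/src/sync/range_sync/chain.rs in the diff below.

// Sketch only (hypothetical types): defer a batch request until every
// custody subnet has at least one good peer, otherwise keep waiting.

use std::collections::{HashMap, HashSet};

type SubnetId = u64;
type PeerId = u64;

struct PeerTable {
    // Good peers known on each data-column subnet (assumed structure).
    good_peers_per_subnet: HashMap<SubnetId, HashSet<PeerId>>,
}

impl PeerTable {
    fn has_good_peer_on(&self, subnet: SubnetId) -> bool {
        self.good_peers_per_subnet
            .get(&subnet)
            .map_or(false, |peers| !peers.is_empty())
    }
}

/// True only if every custody subnet is covered by at least one good peer,
/// i.e. a batch request will not immediately fail on missing custody columns.
fn ready_to_request_batch(custody_subnets: &[SubnetId], peers: &PeerTable) -> bool {
    custody_subnets.iter().all(|s| peers.has_good_peer_on(*s))
}

fn main() {
    let mut peers = PeerTable { good_peers_per_subnet: HashMap::new() };
    peers.good_peers_per_subnet.insert(0, HashSet::from([42]));

    // Subnet 1 has no good peers yet, so the batch request is deferred.
    assert!(!ready_to_request_batch(&[0, 1], &peers));

    peers.good_peers_per_subnet.insert(1, HashSet::from([7]));
    assert!(ready_to_request_batch(&[0, 1], &peers));
}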
2 changes: 1 addition & 1 deletion beacon_node/http_api/src/publish_blocks.rs
@@ -273,7 +273,7 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
}

if let Some(gossip_verified_data_columns) = gossip_verified_data_columns {
let custody_columns_indices = network_globals.custody_columns(block.epoch(), &chain.spec);
let custody_columns_indices = network_globals.custody_columns(&chain.spec);

let custody_columns = gossip_verified_data_columns
.into_iter()
17 changes: 12 additions & 5 deletions beacon_node/lighthouse_network/src/types/globals.rs
@@ -8,7 +8,7 @@ use crate::{Enr, GossipTopic, Multiaddr, PeerId};
use parking_lot::RwLock;
use std::collections::HashSet;
use types::data_column_sidecar::ColumnIndex;
use types::{ChainSpec, DataColumnSubnetId, Epoch, EthSpec};
use types::{ChainSpec, DataColumnSubnetId, EthSpec};

pub struct NetworkGlobals<E: EthSpec> {
/// The current local ENR.
@@ -112,14 +112,22 @@ impl<E: EthSpec> NetworkGlobals<E> {
}

/// Compute custody data columns the node is assigned to custody.
pub fn custody_columns(&self, _epoch: Epoch, spec: &ChainSpec) -> Vec<ColumnIndex> {
pub fn custody_columns(&self, spec: &ChainSpec) -> Vec<ColumnIndex> {
let enr = self.local_enr();
let node_id = enr.node_id().raw().into();
let custody_subnet_count = enr.custody_subnet_count::<E>(spec);
DataColumnSubnetId::compute_custody_columns::<E>(node_id, custody_subnet_count, spec)
.collect()
}

/// Compute custody data column subnets the node is assigned to custody.
pub fn custody_subnets(&self, spec: &ChainSpec) -> impl Iterator<Item = DataColumnSubnetId> {
let enr = self.local_enr();
let node_id = enr.node_id().raw().into();
let custody_subnet_count = enr.custody_subnet_count::<E>(spec);
DataColumnSubnetId::compute_custody_subnets::<E>(node_id, custody_subnet_count, spec)
}

/// TESTING ONLY. Build a dummy NetworkGlobals instance.
pub fn new_test_globals(trusted_peers: Vec<PeerId>, log: &slog::Logger) -> NetworkGlobals<E> {
use crate::CombinedKeyExt;
@@ -143,7 +151,7 @@ impl<E: EthSpec> NetworkGlobals<E> {
#[cfg(test)]
mod test {
use crate::NetworkGlobals;
use types::{Epoch, EthSpec, MainnetEthSpec as E};
use types::{EthSpec, MainnetEthSpec as E};

#[test]
fn test_custody_count_default() {
@@ -153,8 +161,7 @@ mod test {
/ spec.data_column_sidecar_subnet_count
* spec.custody_requirement;
let globals = NetworkGlobals::<E>::new_test_globals(vec![], &log);
let any_epoch = Epoch::new(0);
let columns = globals.custody_columns(any_epoch, &spec);
let columns = globals.custody_columns(&spec);
assert_eq!(
columns.len(),
default_custody_requirement_column_count as usize
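
Why checking subnets is enough: custody column indices are derived from the same custody subnets (via DataColumnSubnetId::compute_custody_subnets / compute_custody_columns), so having a good peer on every custody subnet implies coverage of every custody column. The sketch below only illustrates that assumed subnet-to-column partition with placeholder constants; the real mapping and values come from the spec via ChainSpec, not from this snippet.

// Illustrative only: assumes columns are partitioned across subnets by
// `column_index % subnet_count`, with placeholder constants.

const NUMBER_OF_COLUMNS: u64 = 128; // assumed value
const DATA_COLUMN_SIDECAR_SUBNET_COUNT: u64 = 32; // assumed value

fn columns_for_subnet(subnet_id: u64) -> Vec<u64> {
    (0..NUMBER_OF_COLUMNS)
        .filter(|column_index| column_index % DATA_COLUMN_SIDECAR_SUBNET_COUNT == subnet_id)
        .collect()
}

fn main() {
    // Under these assumptions each subnet custodies 128 / 32 = 4 columns,
    // so covering every custody subnet also covers every custody column.
    assert_eq!(columns_for_subnet(0), vec![0, 32, 64, 96]);
    assert_eq!(columns_for_subnet(31), vec![31, 63, 95, 127]);
}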
8 changes: 4 additions & 4 deletions beacon_node/network/src/sync/manager.rs
@@ -728,10 +728,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
SyncMessage::SampleBlock(block_root, block_slot) => {
debug!(self.log, "Received SampleBlock message"; "block_root" => %block_root);
if let Some((requester, result)) =
self.sampling
.on_new_sample_request(block_root, block_slot, &mut self.network)
debug!(self.log, "Received SampleBlock message"; "block_root" => %block_root, "block_slot" => block_slot);
if let Some((requester, result)) = self
.sampling
.on_new_sample_request(block_root, &mut self.network)
{
self.on_sampling_result(requester, result)
}
16 changes: 5 additions & 11 deletions beacon_node/network/src/sync/network_context.rs
@@ -38,7 +38,7 @@ use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSubnetId, Epoch, EthSpec, Hash256,
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256,
SignedBeaconBlock, Slot,
};

@@ -239,7 +239,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}

// TODO(das): epoch argument left here in case custody rotation is implemented
pub fn get_custodial_peers(&self, _epoch: Epoch, column_index: ColumnIndex) -> Vec<PeerId> {
pub fn get_custodial_peers(&self, column_index: ColumnIndex) -> Vec<PeerId> {
let mut peer_ids = vec![];

for (peer_id, peer_info) in self.network_globals().peers.read().connected_peers() {
@@ -357,12 +357,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {

let expects_custody_columns = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns)
{
let custody_indexes = self
.network_globals()
.custody_columns(epoch, &self.chain.spec);
let custody_indexes = self.network_globals().custody_columns(&self.chain.spec);

for column_index in &custody_indexes {
let custody_peer_ids = self.get_custodial_peers(epoch, *column_index);
let custody_peer_ids = self.get_custodial_peers(*column_index);
let Some(custody_peer) = custody_peer_ids.first().cloned() else {
// TODO(das): this will be pretty bad UX. To improve we should:
// - Attempt to fetch custody requests first, before requesting blocks
@@ -682,11 +680,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.imported_custody_column_indexes(&block_root)
.unwrap_or_default();

// TODO(das): figure out how to pass block.slot if we end up doing rotation
let block_epoch = Epoch::new(0);
let custody_indexes_duty = self
.network_globals()
.custody_columns(block_epoch, &self.chain.spec);
let custody_indexes_duty = self.network_globals().custody_columns(&self.chain.spec);

// Include only the blob indexes not yet imported (received through gossip)
let custody_indexes_to_fetch = custody_indexes_duty
7 changes: 2 additions & 5 deletions beacon_node/network/src/sync/network_context/custody.rs
@@ -13,7 +13,7 @@ use slog::{debug, warn};
use std::time::Duration;
use std::{collections::HashMap, marker::PhantomData, sync::Arc};
use types::EthSpec;
use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Epoch, Hash256};
use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Hash256};

use super::{LookupRequestResult, PeerGroup, ReqId, RpcResponseResult, SyncNetworkContext};

@@ -34,7 +34,6 @@ type DataColumnSidecarVec<E> = Vec<Arc<DataColumnSidecar<E>>>;

pub struct ActiveCustodyRequest<T: BeaconChainTypes> {
block_root: Hash256,
block_epoch: Epoch,
custody_id: CustodyId,
/// List of column indices this request needs to download to complete successfully
column_requests: FnvHashMap<ColumnIndex, ColumnRequest<T::EthSpec>>,
@@ -80,8 +79,6 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
) -> Self {
Self {
block_root,
// TODO(das): use actual epoch if there's rotation
block_epoch: Epoch::new(0),
custody_id,
column_requests: HashMap::from_iter(
column_indices
@@ -248,7 +245,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {

// TODO: When there is a fork and only a subset of your peers know about a block, we should only
// query the peers on that fork. Should this case be handled? How to handle it?
let custodial_peers = cx.get_custodial_peers(self.block_epoch, *column_index);
let custodial_peers = cx.get_custodial_peers(*column_index);

// TODO(das): cache this computation in a OneCell or similar to prevent having to
// run it every loop
25 changes: 24 additions & 1 deletion beacon_node/network/src/sync/range_sync/chain.rs
@@ -7,7 +7,7 @@ use crate::sync::{
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
use lighthouse_network::{PeerAction, PeerId};
use lighthouse_network::{PeerAction, PeerId, Subnet};
use rand::seq::SliceRandom;
use slog::{crit, debug, o, warn};
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
@@ -884,6 +884,29 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
) -> ProcessingResult {
if let Some(batch) = self.batches.get_mut(&batch_id) {
let (request, batch_type) = batch.to_blocks_by_range_request();

let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch());
if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) {
// Require peers on all custody column subnets before sending batches
let has_peers_for_all_custody_subnets = network
.network_globals()
.custody_subnets(&network.chain.spec)
.all(|subnet_id| {
network
.network_globals()
.peers
.read()
.good_peers_on_subnet(Subnet::DataColumn(subnet_id))
.next()
.is_some()
});

if !has_peers_for_all_custody_subnets {
debug!(self.log, "Waiting for peers to appear on all custody subnets before requesting batches");
return Ok(KeepChain);
}
}

match network.block_components_by_range_request(
peer,
batch_type,
17 changes: 4 additions & 13 deletions beacon_node/network/src/sync/sampling.rs
@@ -10,7 +10,7 @@ use std::{
collections::hash_map::Entry, collections::HashMap, marker::PhantomData, sync::Arc,
time::Duration,
};
use types::{data_column_sidecar::ColumnIndex, ChainSpec, DataColumnSidecar, Hash256, Slot};
use types::{data_column_sidecar::ColumnIndex, ChainSpec, DataColumnSidecar, Hash256};

pub type SamplingResult = Result<(), SamplingError>;

@@ -57,15 +57,13 @@ impl<T: BeaconChainTypes> Sampling<T> {
pub fn on_new_sample_request(
&mut self,
block_root: Hash256,
block_slot: Slot,
cx: &mut SyncNetworkContext<T>,
) -> Option<(SamplingRequester, SamplingResult)> {
let id = SamplingRequester::ImportedBlock(block_root);

let request = match self.requests.entry(id) {
Entry::Vacant(e) => e.insert(ActiveSamplingRequest::new(
block_root,
block_slot,
id,
&self.sampling_config,
self.log.clone(),
@@ -163,7 +161,6 @@ impl<T: BeaconChainTypes> Sampling<T> {

pub struct ActiveSamplingRequest<T: BeaconChainTypes> {
block_root: Hash256,
block_slot: Slot,
requester_id: SamplingRequester,
column_requests: FnvHashMap<ColumnIndex, ActiveColumnSampleRequest>,
column_shuffle: Vec<ColumnIndex>,
@@ -198,7 +195,6 @@ pub enum SamplingConfig {
impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
fn new(
block_root: Hash256,
block_slot: Slot,
requester_id: SamplingRequester,
sampling_config: &SamplingConfig,
log: slog::Logger,
@@ -212,7 +208,6 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {

Self {
block_root,
block_slot,
requester_id,
column_requests: <_>::default(),
column_shuffle,
@@ -401,7 +396,7 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
.entry(column_index)
.or_insert(ActiveColumnSampleRequest::new(column_index));

if request.request(self.block_root, self.block_slot, self.requester_id, cx)? {
if request.request(self.block_root, self.requester_id, cx)? {
sent_requests += 1
}
}
@@ -427,7 +422,7 @@ mod request {
use beacon_chain::BeaconChainTypes;
use lighthouse_network::PeerId;
use std::collections::HashSet;
use types::{data_column_sidecar::ColumnIndex, EthSpec, Hash256, Slot};
use types::{data_column_sidecar::ColumnIndex, Hash256};

pub(crate) struct ActiveColumnSampleRequest {
column_index: ColumnIndex,
@@ -478,7 +473,6 @@ mod request {
pub(crate) fn request<T: BeaconChainTypes>(
&mut self,
block_root: Hash256,
block_slot: Slot,
requester: SamplingRequester,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, SamplingError> {
@@ -490,10 +484,7 @@ mod request {

// TODO: When there is a fork and only a subset of your peers know about a block, sampling should only
// be queried on the peers on that fork. Should this case be handled? How to handle it?
let peer_ids = cx.get_custodial_peers(
block_slot.epoch(<T::EthSpec as EthSpec>::slots_per_epoch()),
self.column_index,
);
let peer_ids = cx.get_custodial_peers(self.column_index);

// TODO(das) randomize custodial peer and avoid failing peers
if let Some(peer_id) = peer_ids.first().cloned() {
1 change: 0 additions & 1 deletion consensus/types/src/data_column_subnet_id.rs
@@ -36,7 +36,6 @@ impl DataColumnSubnetId {
}

/// Compute required subnets to subscribe to given the node id.
/// TODO(das): Add epoch param
#[allow(clippy::arithmetic_side_effects)]
pub fn compute_custody_subnets<E: EthSpec>(
node_id: U256,
