diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 3822b36b3d9..ac55d71dae4 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -273,7 +273,7 @@ pub async fn publish_block { /// The current local ENR. @@ -112,7 +112,7 @@ impl NetworkGlobals { } /// Compute custody data columns the node is assigned to custody. - pub fn custody_columns(&self, _epoch: Epoch, spec: &ChainSpec) -> Vec { + pub fn custody_columns(&self, spec: &ChainSpec) -> Vec { let enr = self.local_enr(); let node_id = enr.node_id().raw().into(); let custody_subnet_count = enr.custody_subnet_count::(spec); @@ -120,6 +120,14 @@ impl NetworkGlobals { .collect() } + /// Compute custody data column subnets the node is assigned to custody. + pub fn custody_subnets(&self, spec: &ChainSpec) -> impl Iterator { + let enr = self.local_enr(); + let node_id = enr.node_id().raw().into(); + let custody_subnet_count = enr.custody_subnet_count::(spec); + DataColumnSubnetId::compute_custody_subnets::(node_id, custody_subnet_count, spec) + } + /// TESTING ONLY. Build a dummy NetworkGlobals instance. pub fn new_test_globals(trusted_peers: Vec, log: &slog::Logger) -> NetworkGlobals { use crate::CombinedKeyExt; @@ -143,7 +151,7 @@ impl NetworkGlobals { #[cfg(test)] mod test { use crate::NetworkGlobals; - use types::{Epoch, EthSpec, MainnetEthSpec as E}; + use types::{EthSpec, MainnetEthSpec as E}; #[test] fn test_custody_count_default() { @@ -153,8 +161,7 @@ mod test { / spec.data_column_sidecar_subnet_count * spec.custody_requirement; let globals = NetworkGlobals::::new_test_globals(vec![], &log); - let any_epoch = Epoch::new(0); - let columns = globals.custody_columns(any_epoch, &spec); + let columns = globals.custody_columns(&spec); assert_eq!( columns.len(), default_custody_requirement_column_count as usize diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 573e465cb32..e601c897d98 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -728,10 +728,10 @@ impl SyncManager { } } SyncMessage::SampleBlock(block_root, block_slot) => { - debug!(self.log, "Received SampleBlock message"; "block_root" => %block_root); - if let Some((requester, result)) = - self.sampling - .on_new_sample_request(block_root, block_slot, &mut self.network) + debug!(self.log, "Received SampleBlock message"; "block_root" => %block_root, "block_slot" => block_slot); + if let Some((requester, result)) = self + .sampling + .on_new_sample_request(block_root, &mut self.network) { self.on_sampling_result(requester, result) } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index d9b2a9444f6..312d4a3f0ac 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -38,7 +38,7 @@ use std::time::Duration; use tokio::sync::mpsc; use types::blob_sidecar::FixedBlobSidecarList; use types::{ - BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSubnetId, Epoch, EthSpec, Hash256, + BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256, SignedBeaconBlock, Slot, }; @@ -239,7 +239,7 @@ impl SyncNetworkContext { } // TODO(das): epoch argument left here in case custody rotation is implemented - pub fn get_custodial_peers(&self, _epoch: Epoch, column_index: ColumnIndex) -> Vec { + pub fn get_custodial_peers(&self, column_index: ColumnIndex) -> Vec { let mut peer_ids = vec![]; for (peer_id, peer_info) in self.network_globals().peers.read().connected_peers() { @@ -357,12 +357,10 @@ impl SyncNetworkContext { let expects_custody_columns = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let custody_indexes = self - .network_globals() - .custody_columns(epoch, &self.chain.spec); + let custody_indexes = self.network_globals().custody_columns(&self.chain.spec); for column_index in &custody_indexes { - let custody_peer_ids = self.get_custodial_peers(epoch, *column_index); + let custody_peer_ids = self.get_custodial_peers(*column_index); let Some(custody_peer) = custody_peer_ids.first().cloned() else { // TODO(das): this will be pretty bad UX. To improve we should: // - Attempt to fetch custody requests first, before requesting blocks @@ -682,11 +680,7 @@ impl SyncNetworkContext { .imported_custody_column_indexes(&block_root) .unwrap_or_default(); - // TODO(das): figure out how to pass block.slot if we end up doing rotation - let block_epoch = Epoch::new(0); - let custody_indexes_duty = self - .network_globals() - .custody_columns(block_epoch, &self.chain.spec); + let custody_indexes_duty = self.network_globals().custody_columns(&self.chain.spec); // Include only the blob indexes not yet imported (received through gossip) let custody_indexes_to_fetch = custody_indexes_duty diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index cc0fcec532a..75f9ac38839 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -13,7 +13,7 @@ use slog::{debug, warn}; use std::time::Duration; use std::{collections::HashMap, marker::PhantomData, sync::Arc}; use types::EthSpec; -use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Epoch, Hash256}; +use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Hash256}; use super::{LookupRequestResult, PeerGroup, ReqId, RpcResponseResult, SyncNetworkContext}; @@ -34,7 +34,6 @@ type DataColumnSidecarVec = Vec>>; pub struct ActiveCustodyRequest { block_root: Hash256, - block_epoch: Epoch, custody_id: CustodyId, /// List of column indices this request needs to download to complete successfully column_requests: FnvHashMap>, @@ -80,8 +79,6 @@ impl ActiveCustodyRequest { ) -> Self { Self { block_root, - // TODO(das): use actual epoch if there's rotation - block_epoch: Epoch::new(0), custody_id, column_requests: HashMap::from_iter( column_indices @@ -248,7 +245,7 @@ impl ActiveCustodyRequest { // TODO: When is a fork and only a subset of your peers know about a block, we should only // query the peers on that fork. Should this case be handled? How to handle it? - let custodial_peers = cx.get_custodial_peers(self.block_epoch, *column_index); + let custodial_peers = cx.get_custodial_peers(*column_index); // TODO(das): cache this computation in a OneCell or similar to prevent having to // run it every loop diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index b6390f1a07f..8ae5b3fed6d 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -7,7 +7,7 @@ use crate::sync::{ use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::BeaconChainTypes; use fnv::FnvHashMap; -use lighthouse_network::{PeerAction, PeerId}; +use lighthouse_network::{PeerAction, PeerId, Subnet}; use rand::seq::SliceRandom; use slog::{crit, debug, o, warn}; use std::collections::{btree_map::Entry, BTreeMap, HashSet}; @@ -884,6 +884,29 @@ impl SyncingChain { ) -> ProcessingResult { if let Some(batch) = self.batches.get_mut(&batch_id) { let (request, batch_type) = batch.to_blocks_by_range_request(); + + let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); + if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) { + // Require peers on all custody column subnets before sending batches + let has_peers_for_all_custody_subnets = network + .network_globals() + .custody_subnets(&network.chain.spec) + .all(|subnet_id| { + network + .network_globals() + .peers + .read() + .good_peers_on_subnet(Subnet::DataColumn(subnet_id)) + .next() + .is_some() + }); + + if !has_peers_for_all_custody_subnets { + debug!(self.log, "Waiting for peers to appear on all custody subnets before requesting batches"); + return Ok(KeepChain); + } + } + match network.block_components_by_range_request( peer, batch_type, diff --git a/beacon_node/network/src/sync/sampling.rs b/beacon_node/network/src/sync/sampling.rs index 3fb21489d91..851e42d2bb2 100644 --- a/beacon_node/network/src/sync/sampling.rs +++ b/beacon_node/network/src/sync/sampling.rs @@ -10,7 +10,7 @@ use std::{ collections::hash_map::Entry, collections::HashMap, marker::PhantomData, sync::Arc, time::Duration, }; -use types::{data_column_sidecar::ColumnIndex, ChainSpec, DataColumnSidecar, Hash256, Slot}; +use types::{data_column_sidecar::ColumnIndex, ChainSpec, DataColumnSidecar, Hash256}; pub type SamplingResult = Result<(), SamplingError>; @@ -57,7 +57,6 @@ impl Sampling { pub fn on_new_sample_request( &mut self, block_root: Hash256, - block_slot: Slot, cx: &mut SyncNetworkContext, ) -> Option<(SamplingRequester, SamplingResult)> { let id = SamplingRequester::ImportedBlock(block_root); @@ -65,7 +64,6 @@ impl Sampling { let request = match self.requests.entry(id) { Entry::Vacant(e) => e.insert(ActiveSamplingRequest::new( block_root, - block_slot, id, &self.sampling_config, self.log.clone(), @@ -163,7 +161,6 @@ impl Sampling { pub struct ActiveSamplingRequest { block_root: Hash256, - block_slot: Slot, requester_id: SamplingRequester, column_requests: FnvHashMap, column_shuffle: Vec, @@ -198,7 +195,6 @@ pub enum SamplingConfig { impl ActiveSamplingRequest { fn new( block_root: Hash256, - block_slot: Slot, requester_id: SamplingRequester, sampling_config: &SamplingConfig, log: slog::Logger, @@ -212,7 +208,6 @@ impl ActiveSamplingRequest { Self { block_root, - block_slot, requester_id, column_requests: <_>::default(), column_shuffle, @@ -401,7 +396,7 @@ impl ActiveSamplingRequest { .entry(column_index) .or_insert(ActiveColumnSampleRequest::new(column_index)); - if request.request(self.block_root, self.block_slot, self.requester_id, cx)? { + if request.request(self.block_root, self.requester_id, cx)? { sent_requests += 1 } } @@ -427,7 +422,7 @@ mod request { use beacon_chain::BeaconChainTypes; use lighthouse_network::PeerId; use std::collections::HashSet; - use types::{data_column_sidecar::ColumnIndex, EthSpec, Hash256, Slot}; + use types::{data_column_sidecar::ColumnIndex, Hash256}; pub(crate) struct ActiveColumnSampleRequest { column_index: ColumnIndex, @@ -478,7 +473,6 @@ mod request { pub(crate) fn request( &mut self, block_root: Hash256, - block_slot: Slot, requester: SamplingRequester, cx: &mut SyncNetworkContext, ) -> Result { @@ -490,10 +484,7 @@ mod request { // TODO: When is a fork and only a subset of your peers know about a block, sampling should only // be queried on the peers on that fork. Should this case be handled? How to handle it? - let peer_ids = cx.get_custodial_peers( - block_slot.epoch(::slots_per_epoch()), - self.column_index, - ); + let peer_ids = cx.get_custodial_peers(self.column_index); // TODO(das) randomize custodial peer and avoid failing peers if let Some(peer_id) = peer_ids.first().cloned() { diff --git a/consensus/types/src/data_column_subnet_id.rs b/consensus/types/src/data_column_subnet_id.rs index 403216977df..cf5b1dd549d 100644 --- a/consensus/types/src/data_column_subnet_id.rs +++ b/consensus/types/src/data_column_subnet_id.rs @@ -36,7 +36,6 @@ impl DataColumnSubnetId { } /// Compute required subnets to subscribe to given the node id. - /// TODO(das): Add epoch param #[allow(clippy::arithmetic_side_effects)] pub fn compute_custody_subnets( node_id: U256,