Skip to content

Commit

Permalink
Add custody peer check before mutating BatchInfo to avoid inconsist…
Browse files Browse the repository at this point in the history
…ent state.
  • Loading branch information
jimmygchen committed Jun 27, 2024
1 parent edc584a commit 9f82497
Showing 1 changed file with 41 additions and 22 deletions.
63 changes: 41 additions & 22 deletions beacon_node/network/src/sync/range_sync/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,28 +885,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
if let Some(batch) = self.batches.get_mut(&batch_id) {
let (request, batch_type) = batch.to_blocks_by_range_request();

let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch());
if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) {
// Require peers on all custody column subnets before sending batches
let has_peers_for_all_custody_subnets = network
.network_globals()
.custody_subnets(&network.chain.spec)
.all(|subnet_id| {
network
.network_globals()
.peers
.read()
.good_peers_on_subnet(Subnet::DataColumn(subnet_id))
.next()
.is_some()
});

if !has_peers_for_all_custody_subnets {
debug!(self.log, "Waiting for peers to appear on all custody subnets before requesting batches");
return Ok(KeepChain);
}
}

match network.block_components_by_range_request(
peer,
batch_type,
Expand Down Expand Up @@ -1017,6 +995,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// check if we have the batch for our optimistic start. If not, request it first.
// We wait for this batch before requesting any other batches.
if let Some(epoch) = self.optimistic_start {
if !self.good_peers_on_custody_subnets(epoch, network) {
debug!(
self.log,
"Waiting for peers to be available on custody column subnets"
);
return Ok(KeepChain);
}

if let Entry::Vacant(entry) = self.batches.entry(epoch) {
if let Some(peer) = idle_peers.pop() {
let batch_type = network.batch_type(epoch);
Expand All @@ -1041,6 +1027,30 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
Ok(KeepChain)
}

/// Checks all custody column subnets for peers. Returns `true` if there is at least one peer in
/// every custody column subnet.
fn good_peers_on_custody_subnets(&self, epoch: Epoch, network: &SyncNetworkContext<T>) -> bool {
if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) {
// Require peers on all custody column subnets before sending batches
let peers_on_all_custody_subnets = network
.network_globals()
.custody_subnets(&network.chain.spec)
.all(|subnet_id| {
let peer_count = network
.network_globals()
.peers
.read()
.good_peers_on_subnet(Subnet::DataColumn(subnet_id))
.count();
debug!(self.log, "Peers found on custody subnet"; "subnet_id" => ?subnet_id, "peer_count" => peer_count);
peer_count > 0
});
peers_on_all_custody_subnets
} else {
true
}
}

/// Creates the next required batch from the chain. If there are no more batches required,
/// `false` is returned.
fn include_next_batch(&mut self, network: &mut SyncNetworkContext<T>) -> Option<BatchId> {
Expand Down Expand Up @@ -1071,6 +1081,15 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return None;
}

// don't send batch requests until we have peers on custody subnets
if !self.good_peers_on_custody_subnets(self.to_be_downloaded, network) {
debug!(
self.log,
"Waiting for peers to be available on custody column subnets"
);
return None;
}

let batch_id = self.to_be_downloaded;
// this batch could have been included already being an optimistic batch
match self.batches.entry(batch_id) {
Expand Down

0 comments on commit 9f82497

Please sign in to comment.