From 1e3964ec9787b99a6da9a73189df85661417f708 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 25 Jun 2024 00:02:19 +1000 Subject: [PATCH] Reconstruct columns without blocking processing and import. --- beacon_node/beacon_chain/src/beacon_chain.rs | 103 ++++++----- .../src/block_verification_types.rs | 35 ++-- .../src/data_availability_checker.rs | 46 ++--- .../overflow_lru_cache.rs | 173 ++++++++++-------- .../src/data_column_verification.rs | 2 - .../gossip_methods.rs | 35 +--- .../src/network_beacon_processor/mod.rs | 71 ++++++- .../network_beacon_processor/sync_methods.rs | 43 +++-- .../src/sync/block_sidecar_coupling.rs | 17 +- .../network/src/sync/network_context.rs | 2 +- 10 files changed, 295 insertions(+), 232 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 5cf2d14dd3d..730c2ef647a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -22,7 +22,6 @@ pub use crate::canonical_head::CanonicalHead; use crate::chain_config::ChainConfig; use crate::data_availability_checker::{ Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker, - DataColumnsToPublish, }; use crate::data_column_verification::{ CustodyDataColumn, GossipDataColumnError, GossipVerifiedDataColumn, @@ -3057,13 +3056,7 @@ impl BeaconChain { pub async fn process_gossip_data_columns( self: &Arc, data_columns: Vec>, - ) -> Result< - ( - AvailabilityProcessingStatus, - DataColumnsToPublish, - ), - BlockError, - > { + ) -> Result> { let Ok(block_root) = data_columns .iter() .map(|c| c.block_root()) @@ -3131,13 +3124,7 @@ impl BeaconChain { self: &Arc, block_root: Hash256, custody_columns: Vec>, - ) -> Result< - ( - AvailabilityProcessingStatus, - DataColumnsToPublish, - ), - BlockError, - > { + ) -> Result> { // If this block has already been imported to forkchoice it must have been available, so // we don't need to process its columns again. if self @@ -3162,6 +3149,52 @@ impl BeaconChain { self.remove_notified_custody_columns(&block_root, r) } + pub async fn reconstruct_data_columns( + self: &Arc, + block_root: Hash256, + ) -> Result< + Option<( + AvailabilityProcessingStatus, + DataColumnSidecarVec, + )>, + BlockError, + > { + // As of now we only reconstruct data columns on supernodes, so if the block is already + // available on a supernode, there's no need to reconstruct as the node must already have + // all columns. + if self + .canonical_head + .fork_choice_read_lock() + .contains_block(&block_root) + { + return Ok(None); + } + + let Some((availability, data_column_to_publish)) = self + .data_availability_checker + .reconstruct_data_columns(block_root)? + else { + return Ok(None); + }; + + let Ok(slot) = data_column_to_publish + .iter() + .map(|c| c.slot()) + .unique() + .exactly_one() + else { + return Err(BlockError::InternalError( + "Columns for the same block should have matching slot".to_string(), + )); + }; + + let r = self.process_availability(slot, availability).await; + self.remove_notified_custody_columns(&block_root, r) + .map(|availability_processing_status| { + Some((availability_processing_status, data_column_to_publish)) + }) + } + /// Remove any block components from the *processing cache* if we no longer require them. If the /// block was imported full or erred, we no longer require them. fn remove_notified( @@ -3179,15 +3212,13 @@ impl BeaconChain { /// Remove any block components from the *processing cache* if we no longer require them. 
If the /// block was imported full or erred, we no longer require them. - fn remove_notified_custody_columns<P>
( + fn remove_notified_custody_columns( &self, block_root: &Hash256, - r: Result<(AvailabilityProcessingStatus, P), BlockError>, - ) -> Result<(AvailabilityProcessingStatus, P), BlockError> { - let has_missing_components = matches!( - r, - Ok((AvailabilityProcessingStatus::MissingComponents(_, _), _)) - ); + r: Result>, + ) -> Result> { + let has_missing_components = + matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _))); if !has_missing_components { self.reqresp_pre_import_cache.write().remove(block_root); } @@ -3436,13 +3467,7 @@ impl BeaconChain { async fn check_gossip_data_columns_availability_and_import( self: &Arc, data_columns: Vec>, - ) -> Result< - ( - AvailabilityProcessingStatus, - DataColumnsToPublish, - ), - BlockError, - > { + ) -> Result> { if let Some(slasher) = self.slasher.as_ref() { for data_colum in &data_columns { slasher.accept_block_header(data_colum.signed_block_header()); @@ -3455,13 +3480,11 @@ impl BeaconChain { )); }; - let (availability, data_columns_to_publish) = self + let availability = self .data_availability_checker .put_gossip_data_columns(data_columns)?; - self.process_availability(slot, availability) - .await - .map(|result| (result, data_columns_to_publish)) + self.process_availability(slot, availability).await } /// Checks if the provided blobs can make any cached blocks available, and imports immediately @@ -3509,13 +3532,7 @@ impl BeaconChain { slot: Slot, block_root: Hash256, custody_columns: Vec>, - ) -> Result< - ( - AvailabilityProcessingStatus, - DataColumnsToPublish, - ), - BlockError, - > { + ) -> Result> { // Need to scope this to ensure the lock is dropped before calling `process_availability` // Even an explicit drop is not enough to convince the borrow checker. { @@ -3539,13 +3556,11 @@ impl BeaconChain { } } } - let (availability, data_columns_to_publish) = self + let availability = self .data_availability_checker .put_rpc_custody_columns(block_root, custody_columns)?; - self.process_availability(slot, availability) - .await - .map(|result| (result, data_columns_to_publish)) + self.process_availability(slot, availability).await } /// Imports a fully available block. 
Otherwise, returns `AvailabilityProcessingStatus::MissingComponents` diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index 3723b22730a..855b0128b0b 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -3,7 +3,7 @@ use crate::block_verification::BlockError; use crate::data_availability_checker::AvailabilityCheckError; pub use crate::data_availability_checker::{AvailableBlock, MaybeAvailableBlock}; use crate::data_column_verification::{ - CustodyDataColumn, CustodyDataColumnList, GossipDataColumnError, GossipVerifiedDataColumnList, + CustodyDataColumn, GossipDataColumnError, GossipVerifiedDataColumnList, }; use crate::eth1_finalization_cache::Eth1FinalizationData; use crate::{get_block_root, GossipVerifiedBlock, PayloadVerificationOutcome}; @@ -15,8 +15,8 @@ use std::sync::Arc; use types::blob_sidecar::{self, BlobIdentifier, FixedBlobSidecarList}; use types::data_column_sidecar::{self}; use types::{ - BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec, - Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, + BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, Epoch, EthSpec, Hash256, + SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; /// A block that has been received over RPC. It has 2 internal variants: @@ -74,7 +74,7 @@ impl RpcBlock { } } - pub fn custody_columns(&self) -> Option<&CustodyDataColumnList> { + pub fn custody_columns(&self) -> Option<&Vec>> { match &self.block { RpcBlockInner::Block(_) => None, RpcBlockInner::BlockAndBlobs(_, _) => None, @@ -96,7 +96,7 @@ enum RpcBlockInner { BlockAndBlobs(Arc>, BlobSidecarList), /// This variant is used with parent lookups and by-range responses. It should have all /// requested data columns, all block roots matching for this block. - BlockAndCustodyColumns(Arc>, CustodyDataColumnList), + BlockAndCustodyColumns(Arc>, Vec>), } impl RpcBlock { @@ -158,7 +158,6 @@ impl RpcBlock { block_root: Option, block: Arc>, custody_columns: Vec>, - spec: &ChainSpec, ) -> Result { let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); @@ -168,10 +167,7 @@ impl RpcBlock { } // Treat empty data column lists as if they are missing. let inner = if !custody_columns.is_empty() { - RpcBlockInner::BlockAndCustodyColumns( - block, - RuntimeVariableList::new(custody_columns, spec.number_of_columns)?, - ) + RpcBlockInner::BlockAndCustodyColumns(block, custody_columns) } else { RpcBlockInner::Block(block) }; @@ -205,7 +201,7 @@ impl RpcBlock { Hash256, Arc>, Option>, - Option>, + Option>>, ) { let block_root = self.block_root(); match self.block { @@ -596,7 +592,6 @@ impl AsBlock for AvailableBlock { } fn into_rpc_block(self) -> RpcBlock { - let number_of_columns = self.spec.number_of_columns; let (block_root, block, blobs_opt, data_columns_opt) = self.deconstruct(); // Circumvent the constructor here, because an Available block will have already had // consistency checks performed. @@ -605,18 +600,10 @@ impl AsBlock for AvailableBlock { (Some(blobs), _) => RpcBlockInner::BlockAndBlobs(block, blobs), (_, Some(data_columns)) => RpcBlockInner::BlockAndCustodyColumns( block, - RuntimeVariableList::new( - data_columns - .into_iter() - // TODO(das): This is an ugly hack that should be removed. After updating - // store types to handle custody data columns this should not be required. 
- // It's okay-ish because available blocks must have all the required custody - // columns. - .map(|d| CustodyDataColumn::from_asserted_custody(d)) - .collect(), - number_of_columns, - ) - .expect("data column list is within bounds"), + data_columns + .into_iter() + .map(CustodyDataColumn::from_asserted_custody) + .collect(), ), }; RpcBlock { diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index f4316506510..e9ee34bc8db 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -23,6 +23,7 @@ mod error; mod overflow_lru_cache; mod state_lru_cache; +use crate::data_availability_checker::error::Error; use crate::data_column_verification::{ verify_kzg_for_data_column_list, CustodyDataColumn, GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, @@ -31,8 +32,6 @@ pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCh use types::data_column_sidecar::DataColumnIdentifier; use types::non_zero_usize::new_non_zero_usize; -pub use self::overflow_lru_cache::DataColumnsToPublish; - /// The LRU Cache stores `PendingComponents` which can store up to /// `MAX_BLOBS_PER_BLOCK = 6` blobs each. A `BlobSidecar` is 0.131256 MB. So /// the maximum size of a `PendingComponents` is ~ 0.787536 MB. Setting this @@ -159,6 +158,24 @@ impl DataAvailabilityChecker { self.availability_cache.peek_data_column(data_column_id) } + #[allow(clippy::type_complexity)] + pub fn reconstruct_data_columns( + &self, + block_root: Hash256, + ) -> Result< + Option<( + Availability<::EthSpec>, + DataColumnSidecarVec<::EthSpec>, + )>, + Error, + > { + let Some(kzg) = self.kzg.as_ref() else { + return Err(AvailabilityCheckError::KzgNotInitialized); + }; + self.availability_cache + .reconstruct_data_columns(kzg, block_root) + } + /// Put a list of blobs received via RPC into the availability cache. This performs KZG /// verification on the blobs in the list. pub fn put_rpc_blobs( @@ -190,8 +207,7 @@ impl DataAvailabilityChecker { &self, block_root: Hash256, custody_columns: Vec>, - ) -> Result<(Availability, DataColumnsToPublish), AvailabilityCheckError> - { + ) -> Result, AvailabilityCheckError> { let Some(kzg) = self.kzg.as_ref() else { return Err(AvailabilityCheckError::KzgNotInitialized); }; @@ -203,11 +219,8 @@ impl DataAvailabilityChecker { .map(|c| KzgVerifiedCustodyDataColumn::new(c, kzg)) .collect::, _>>()?; - self.availability_cache.put_kzg_verified_data_columns( - kzg, - block_root, - verified_custody_columns, - ) + self.availability_cache + .put_kzg_verified_data_columns(block_root, verified_custody_columns) } /// Check if we've cached other blobs for this block. If it completes a set and we also @@ -232,11 +245,7 @@ impl DataAvailabilityChecker { pub fn put_gossip_data_columns( &self, gossip_data_columns: Vec>, - ) -> Result<(Availability, DataColumnsToPublish), AvailabilityCheckError> - { - let Some(kzg) = self.kzg.as_ref() else { - return Err(AvailabilityCheckError::KzgNotInitialized); - }; + ) -> Result, AvailabilityCheckError> { let block_root = gossip_data_columns .first() .ok_or(AvailabilityCheckError::MissingCustodyColumns)? @@ -248,7 +257,7 @@ impl DataAvailabilityChecker { .collect::>(); self.availability_cache - .put_kzg_verified_data_columns(kzg, block_root, custody_columns) + .put_kzg_verified_data_columns(block_root, custody_columns) } /// Check if we have all the blobs for a block. 
Returns `Availability` which has information @@ -314,12 +323,7 @@ impl DataAvailabilityChecker { block, blobs: None, blobs_available_timestamp: None, - data_columns: Some( - data_column_list - .into_iter() - .map(|d| d.clone_arc()) - .collect(), - ), + data_columns: Some(data_column_list.iter().map(|d| d.clone_arc()).collect()), spec: self.spec.clone(), })) } else { diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 88d622bd629..ec44ebf9c6b 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -35,9 +35,8 @@ use crate::block_verification_types::{ }; use crate::data_availability_checker::{Availability, AvailabilityCheckError}; use crate::data_column_verification::{KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn}; -use crate::metrics; use crate::store::{DBColumn, KeyValueStore}; -use crate::BeaconChainTypes; +use crate::{metrics, BeaconChainTypes}; use kzg::Kzg; use lru::LruCache; use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; @@ -49,12 +48,10 @@ use std::{collections::HashSet, sync::Arc}; use types::blob_sidecar::BlobIdentifier; use types::data_column_sidecar::DataColumnIdentifier; use types::{ - BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, Epoch, EthSpec, Hash256, - RuntimeVariableList, SignedBeaconBlock, + BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarVec, Epoch, EthSpec, + Hash256, RuntimeVariableList, SignedBeaconBlock, }; -pub type DataColumnsToPublish = Option>>>; - /// This represents the components of a partially available block /// /// The blobs are all gossip and kzg verified. @@ -67,7 +64,6 @@ pub struct PendingComponents { pub verified_blobs: FixedVector>, E::MaxBlobsPerBlock>, pub verified_data_columns: RuntimeVariableList>, pub executed_block: Option>, - pub reconstruction_started: bool, } pub enum BlockImportRequirement { @@ -281,7 +277,6 @@ impl PendingComponents { verified_blobs: FixedVector::default(), verified_data_columns: RuntimeVariableList::empty(spec.number_of_columns), executed_block: None, - reconstruction_started: false, } } @@ -362,10 +357,6 @@ impl PendingComponents { ))) } - pub fn reconstruction_started(&mut self) { - self.reconstruction_started = true; - } - /// Returns the epoch of the block if it is cached, otherwise returns the epoch of the first blob. pub fn epoch(&self) -> Option { self.executed_block @@ -654,11 +645,11 @@ impl Critical { pub fn peek_data_column( &self, data_column_id: &DataColumnIdentifier, - ) -> Result>>, AvailabilityCheckError> { + ) -> Option>> { if let Some(pending_components) = self.in_memory.peek(&data_column_id.block_root) { - Ok(pending_components.get_cached_data_column(data_column_id.index)) + pending_components.get_cached_data_column(data_column_id.index) } else { - Ok(None) + None } } @@ -753,6 +744,8 @@ pub struct OverflowLRUCache { capacity: NonZeroUsize, /// The number of data columns the node is custodying. custody_column_count: usize, + /// The block root of data columns currently being reconstructed, if any. 
+ reconstructing_block_root: Mutex>, log: Logger, spec: Arc, } @@ -775,6 +768,7 @@ impl OverflowLRUCache { maintenance_lock: Mutex::new(()), capacity, custody_column_count, + reconstructing_block_root: Mutex::new(None), log, spec, }) @@ -826,7 +820,7 @@ impl OverflowLRUCache { data_column_id: &DataColumnIdentifier, ) -> Result>>, AvailabilityCheckError> { let read_lock = self.critical.read(); - if let Some(data_column) = read_lock.peek_data_column(data_column_id)? { + if let Some(data_column) = read_lock.peek_data_column(data_column_id) { Ok(Some(data_column)) } else if read_lock.store_keys.contains(&data_column_id.block_root) { drop(read_lock); @@ -854,16 +848,96 @@ impl OverflowLRUCache { } } + #[allow(clippy::type_complexity)] + pub fn reconstruct_data_columns( + &self, + kzg: &Kzg, + block_root: Hash256, + ) -> Result< + Option<(Availability, DataColumnSidecarVec)>, + AvailabilityCheckError, + > { + // Clone the pending components, so we don't hold the read lock during reconstruction + let Some(pending_components) = self + .peek_pending_components(&block_root, |pending_components_opt| { + pending_components_opt.cloned() + }) + else { + // Block may have been imported as it does not exist in availability cache. + return Ok(None); + }; + + let should_reconstruct = self + .block_import_requirement(&pending_components) + .map(|r| self.should_reconstruct(&r, &pending_components))?; + + if should_reconstruct { + *self.reconstructing_block_root.lock() = Some(block_root); + + let timer = metrics::start_timer(&metrics::DATA_AVAILABILITY_RECONSTRUCTION_TIME); + + // Will only return an error if: + // - < 50% of columns + // - There are duplicates + let all_data_columns = KzgVerifiedCustodyDataColumn::reconstruct_columns( + kzg, + pending_components.verified_data_columns.as_slice(), + &self.spec, + )?; + + // Check indices from cache again to make sure we don't publish components we've already received. + let Some(existing_column_indices) = + self.peek_pending_components(&block_root, |pending_components_opt| { + pending_components_opt.map(|pending_components| { + pending_components + .verified_data_columns + .iter() + .map(|d| d.index()) + .collect::>() + }) + }) + else { + // If block is already imported (no longer in cache), abort publishing data columns + return Ok(None); + }; + + let data_columns_to_publish = all_data_columns + .into_iter() + .filter(|d| !existing_column_indices.contains(&d.index())) + .collect::>(); + + metrics::stop_timer(timer); + metrics::inc_counter_by( + &metrics::DATA_AVAILABILITY_RECONSTRUCTED_COLUMNS, + data_columns_to_publish.len() as u64, + ); + debug!(self.log, "Reconstructed columns"; "count" => data_columns_to_publish.len()); + + // There is data columns to import and publish + self.put_kzg_verified_data_columns(block_root, data_columns_to_publish.clone()) + .map(|availability| { + ( + availability, + data_columns_to_publish + .into_iter() + .map(|d| d.clone_arc()) + .collect::>(), + ) + }) + .map(Some) + } else { + Ok(None) + } + } + #[allow(clippy::type_complexity)] pub fn put_kzg_verified_data_columns< I: IntoIterator>, >( &self, - kzg: &Kzg, block_root: Hash256, kzg_verified_data_columns: I, - ) -> Result<(Availability, DataColumnsToPublish), AvailabilityCheckError> - { + ) -> Result, AvailabilityCheckError> { let mut write_lock = self.critical.write(); // Grab existing entry or create a new entry. 
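Note on the hunk above: `reconstruct_data_columns` deliberately clones the pending components out of the cache so that no lock is held during the expensive KZG reconstruction, then re-reads the cache afterwards and publishes only the column indices that did not arrive over gossip in the meantime. A minimal sketch of that clone-compute-recheck shape, using a toy `RwLock<HashMap<..>>` cache and plain `u64` column indices in place of the crate's types (all names here are illustrative, not Lighthouse's API):

```rust
use std::collections::{HashMap, HashSet};
use std::sync::RwLock;

/// Toy stand-in for the availability cache: block root -> received column indices.
struct Cache(RwLock<HashMap<u64, HashSet<u64>>>);

impl Cache {
    /// Rebuild the full column set for `block_root` without holding the lock
    /// during the expensive step; return only the columns worth publishing.
    fn reconstruct(&self, block_root: u64, total_columns: u64) -> Vec<u64> {
        // 1. Clone the current state out of the cache (short-lived read lock).
        let Some(received) = self.0.read().unwrap().get(&block_root).cloned() else {
            return vec![]; // block no longer pending: it was already imported
        };

        // 2. Expensive work with no lock held. The real code runs KZG
        //    reconstruction here, which requires >= 50% of the columns.
        assert!(received.len() as u64 >= total_columns / 2);
        let all_columns: Vec<u64> = (0..total_columns).collect();

        // 3. Re-read the cache: more columns may have arrived meanwhile.
        let Some(now_received) = self.0.read().unwrap().get(&block_root).cloned() else {
            return vec![]; // imported while we were reconstructing: abort publish
        };

        // 4. Publish only the indices we did not already receive from peers.
        all_columns
            .into_iter()
            .filter(|i| !now_received.contains(i))
            .collect()
    }
}
```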
@@ -876,51 +950,6 @@ impl OverflowLRUCache { let block_import_requirement = self.block_import_requirement(&pending_components)?; - // Potentially trigger reconstruction if: - // - Our custody requirement is all columns - // - We >= 50% of columns - let data_columns_to_publish = - if self.should_reconstruct(&block_import_requirement, &pending_components) { - pending_components.reconstruction_started(); - - let timer = metrics::start_timer(&metrics::DATA_AVAILABILITY_RECONSTRUCTION_TIME); - - let existing_column_indices = pending_components - .verified_data_columns - .iter() - .map(|d| d.index()) - .collect::>(); - - // Will only return an error if: - // - < 50% of columns - // - There are duplicates - let all_data_columns = KzgVerifiedCustodyDataColumn::reconstruct_columns( - kzg, - pending_components.verified_data_columns.as_slice(), - &self.spec, - )?; - - let data_columns_to_publish = all_data_columns - .iter() - .filter(|d| !existing_column_indices.contains(&d.index())) - .map(|d| d.clone_arc()) - .collect::>(); - - pending_components.verified_data_columns = - RuntimeVariableList::from_vec(all_data_columns, self.spec.number_of_columns); - - metrics::stop_timer(timer); - metrics::inc_counter_by( - &metrics::DATA_AVAILABILITY_RECONSTRUCTED_COLUMNS, - data_columns_to_publish.len() as u64, - ); - debug!(self.log, "Reconstructed columns"; "count" => data_columns_to_publish.len()); - - Some(data_columns_to_publish) - } else { - None - }; - if pending_components.is_available(&block_import_requirement, &self.log) { write_lock.put_pending_components( block_root, @@ -929,21 +958,16 @@ impl OverflowLRUCache { )?; // No need to hold the write lock anymore drop(write_lock); - pending_components - .make_available(&self.spec, |diet_block| { - self.state_cache.recover_pending_executed_block(diet_block) - }) - .map(|availability| (availability, data_columns_to_publish)) + pending_components.make_available(&self.spec, |diet_block| { + self.state_cache.recover_pending_executed_block(diet_block) + }) } else { write_lock.put_pending_components( block_root, pending_components, &self.overflow_store, )?; - Ok(( - Availability::MissingComponents(block_root), - data_columns_to_publish, - )) + Ok(Availability::MissingComponents(block_root)) } } @@ -964,7 +988,8 @@ impl OverflowLRUCache { let has_missing_columns = pending_components.verified_data_columns.len() < num_of_columns; has_missing_columns - && !pending_components.reconstruction_started + // for simplicity now, we only reconstruct columns for one block at a time. 
+ && self.reconstructing_block_root.lock().is_none() && *num_expected_columns == num_of_columns && pending_components.verified_data_columns.len() >= num_of_columns / 2 } diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 5448e1a8be7..3c15ae898db 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -239,8 +239,6 @@ impl KzgVerifiedDataColumn { } } -pub type CustodyDataColumnList = RuntimeVariableList>; - /// Data column that we must custody #[derive(Debug, Derivative, Clone, Encode, Decode)] #[derivative(PartialEq, Eq, Hash(bound = "E: EthSpec"))] diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 5b2ba93e920..220f796a58e 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -4,6 +4,7 @@ use crate::{ service::NetworkMessage, sync::SyncMessage, }; +use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use beacon_chain::store::Error; @@ -18,13 +19,7 @@ use beacon_chain::{ AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, GossipVerifiedBlock, NotifyExecutionLayer, }; -use beacon_chain::{ - blob_verification::{GossipBlobError, GossipVerifiedBlob}, - data_availability_checker::DataColumnsToPublish, -}; -use lighthouse_network::{ - Client, MessageAcceptance, MessageId, PeerAction, PeerId, PubsubMessage, ReportSource, -}; +use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use operation_pool::ReceivedPreCapella; use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; @@ -170,26 +165,6 @@ impl NetworkBeaconProcessor { }) } - pub(crate) fn handle_data_columns_to_publish( - &self, - data_columns_to_publish: DataColumnsToPublish, - ) { - if let Some(data_columns_to_publish) = data_columns_to_publish { - self.send_network_message(NetworkMessage::Publish { - messages: data_columns_to_publish - .iter() - .map(|d| { - let subnet = DataColumnSubnetId::from_column_index::( - d.index as usize, - &self.chain.spec, - ); - PubsubMessage::DataColumnSidecar(Box::new((subnet, d.clone()))) - }) - .collect(), - }); - } - } - /// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on /// the gossip network. 
/// @@ -1019,9 +994,7 @@ impl NetworkBeaconProcessor { .process_gossip_data_columns(vec![verified_data_column]) .await { - Ok((availability, data_columns_to_publish)) => { - self.handle_data_columns_to_publish(data_columns_to_publish); - + Ok(availability) => { match availability { AvailabilityProcessingStatus::Imported(block_root) => { // Note: Reusing block imported metric here @@ -1044,7 +1017,7 @@ impl NetworkBeaconProcessor { "block_root" => %block_root, ); - // Potentially trigger reconstruction + self.attempt_data_column_reconstruction(block_root).await; } } } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 969c604da49..a3c69205e13 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -3,7 +3,9 @@ use crate::sync::SamplingId; use crate::{service::NetworkMessage, sync::manager::SyncMessage}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_column_verification::CustodyDataColumn; -use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain}; +use beacon_chain::{ + builder::Witness, eth1_chain::CachingEth1Backend, AvailabilityProcessingStatus, BeaconChain, +}; use beacon_chain::{BeaconChainTypes, NotifyExecutionLayer}; use beacon_processor::{ work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorChannels, BeaconProcessorSend, @@ -15,9 +17,9 @@ use lighthouse_network::rpc::methods::{ }; use lighthouse_network::{ rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage}, - Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, + Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, }; -use slog::{debug, Logger}; +use slog::{debug, error, Logger}; use slot_clock::ManualSlotClock; use std::path::PathBuf; use std::sync::Arc; @@ -775,6 +777,69 @@ impl NetworkBeaconProcessor { "error" => %e) }); } + + /// Re-seed the network with reconstructed data columns. 
+ fn handle_data_columns_to_publish( + &self, + data_columns_to_publish: DataColumnSidecarVec, + ) { + self.send_network_message(NetworkMessage::Publish { + messages: data_columns_to_publish + .iter() + .map(|d| { + let subnet = DataColumnSubnetId::from_column_index::( + d.index as usize, + &self.chain.spec, + ); + PubsubMessage::DataColumnSidecar(Box::new((subnet, d.clone()))) + }) + .collect(), + }); + } + + async fn attempt_data_column_reconstruction(&self, block_root: Hash256) { + let result = self.chain.reconstruct_data_columns(block_root).await; + match result { + Ok(Some((availability, data_columns_to_publish))) => { + match availability { + AvailabilityProcessingStatus::Imported(hash) => { + debug!( + self.log, + "Block components available via reconstruction"; + "result" => "imported block and custody columns", + "block_hash" => %hash, + ); + self.chain.recompute_head_at_current_slot().await; + } + AvailabilityProcessingStatus::MissingComponents(_, _) => { + // TODO: confirm we only perform reconstruction after block is received + debug!( + self.log, + "Block components still missing block after reconstruction"; + "result" => "imported all custody columns", + "block_hash" => %block_root, + ); + } + } + + self.handle_data_columns_to_publish(data_columns_to_publish); + } + Ok(None) => { + debug!( + self.log, + "Reconstruction not required for block"; + "block_hash" => %block_root, + ) + } + Err(e) => { + error!( + self.log, + "Error during data column reconstruction"; + "error" => ?e + ); + } + } + } } type TestBeaconChainType = diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index c93483c02f8..bef179c708b 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -334,28 +334,27 @@ impl NetworkBeaconProcessor { .await; match &result { - Ok((availability, data_columns_to_publish)) => { - self.handle_data_columns_to_publish(data_columns_to_publish.clone()); - - match availability { - AvailabilityProcessingStatus::Imported(hash) => { - debug!( - self.log, - "Block components retrieved"; - "result" => "imported block and custody columns", - "block_hash" => %hash, - ); - self.chain.recompute_head_at_current_slot().await; - } - AvailabilityProcessingStatus::MissingComponents(_, _) => { - debug!( - self.log, - "Missing components over rpc"; - "block_hash" => %block_root, - ); - } + Ok(availability) => match availability { + AvailabilityProcessingStatus::Imported(hash) => { + debug!( + self.log, + "Block components retrieved"; + "result" => "imported block and custody columns", + "block_hash" => %hash, + ); + self.chain.recompute_head_at_current_slot().await; } - } + AvailabilityProcessingStatus::MissingComponents(_, _) => { + debug!( + self.log, + "Missing components over rpc"; + "block_hash" => %block_root, + ); + // Attempt reconstruction here before notifying sync, to avoid sending out more requests + // that we may no longer need. 
+ self.attempt_data_column_reconstruction(block_root).await; + } + }, Err(BlockError::BlockIsAlreadyKnown(_)) => { debug!( self.log, @@ -375,7 +374,7 @@ impl NetworkBeaconProcessor { self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, - result: result.map(|(r, _)| r).into(), + result: result.into(), }); } diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 56e0a51522f..456226a7ec4 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -6,9 +6,7 @@ use std::{ collections::{HashMap, VecDeque}, sync::Arc, }; -use types::{ - BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock, -}; +use types::{BlobSidecar, ColumnIndex, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock}; #[derive(Debug)] pub struct RangeBlockComponentsRequest { @@ -71,9 +69,9 @@ impl RangeBlockComponentsRequest { } } - pub fn into_responses(self, spec: &ChainSpec) -> Result>, String> { + pub fn into_responses(self) -> Result>, String> { if let Some(expects_custody_columns) = self.expects_custody_columns.clone() { - self.into_responses_with_custody_columns(expects_custody_columns, spec) + self.into_responses_with_custody_columns(expects_custody_columns) } else { self.into_responses_with_blobs() } @@ -125,7 +123,6 @@ impl RangeBlockComponentsRequest { fn into_responses_with_custody_columns( self, expects_custody_columns: Vec, - spec: &ChainSpec, ) -> Result>, String> { let RangeBlockComponentsRequest { blocks, @@ -188,7 +185,7 @@ impl RangeBlockComponentsRequest { )); } - RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, spec) + RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns) .map_err(|e| format!("{e:?}"))? } else { RpcBlock::new_without_blobs(Some(block_root), block) @@ -245,7 +242,7 @@ mod tests { // Assert response is finished and RpcBlocks can be constructed assert!(info.is_finished()); - info.into_responses(&test_spec::()).unwrap(); + info.into_responses().unwrap(); } #[test] @@ -271,7 +268,7 @@ mod tests { // This makes sure we don't expect blobs here when they have expired. Checking this logic should // be hendled elsewhere. assert!(info.is_finished()); - info.into_responses(&test_spec::()).unwrap(); + info.into_responses().unwrap(); } #[test] @@ -327,6 +324,6 @@ mod tests { } // All completed construct response - info.into_responses(&spec).unwrap(); + info.into_responses().unwrap(); } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index d9b2a9444f6..a2ffb41187e 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -446,7 +446,7 @@ impl SyncNetworkContext { let (expects_blobs, expects_custody_columns) = info.get_requirements(); Some(BlocksAndBlobsByRangeResponse { sender_id, - responses: info.into_responses(&self.chain.spec), + responses: info.into_responses(), expects_blobs, expects_custody_columns, })
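Taken together, the processor-side callers added in `gossip_methods.rs` and `sync_methods.rs` only invoke `attempt_data_column_reconstruction` after a block or column has been processed and components are still missing, and the cache-side `should_reconstruct` predicate then gates the actual work. A standalone restatement of that predicate, with plain `usize` counts standing in for `BlockImportRequirement::CustodyColumns` and the cached column list (illustrative names, not the crate's API):

```rust
/// When does a node attempt data column reconstruction?
/// Restates the patched `should_reconstruct` logic over plain counts.
fn should_reconstruct(
    custody_requirement: usize,   // columns this node must custody
    total_columns: usize,         // total columns defined by the spec
    columns_received: usize,      // verified columns currently cached
    already_reconstructing: bool, // i.e. `reconstructing_block_root` is set
) -> bool {
    columns_received < total_columns              // something is actually missing
        && !already_reconstructing                // one block at a time, for simplicity
        && custody_requirement == total_columns   // only supernodes reconstruct
        && columns_received >= total_columns / 2  // >= 50% needed to recover the rest
}

fn main() {
    // A supernode with half of 128 columns and nothing in flight: reconstruct.
    assert!(should_reconstruct(128, 128, 64, false));
    // Below the 50% threshold: not enough data to recover the rest.
    assert!(!should_reconstruct(128, 128, 63, false));
    // A regular node custodying a subset of columns: never reconstructs.
    assert!(!should_reconstruct(8, 128, 64, false));
    // Another reconstruction is already running: skip.
    assert!(!should_reconstruct(128, 128, 64, true));
}
```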