Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconstruct data columns without blocking processing and import #5986

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 59 additions & 44 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ pub use crate::canonical_head::CanonicalHead;
use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
DataColumnsToPublish,
};
use crate::data_column_verification::{
CustodyDataColumn, GossipDataColumnError, GossipVerifiedDataColumn,
Expand Down Expand Up @@ -3057,13 +3056,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn process_gossip_data_columns(
self: &Arc<Self>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError<T::EthSpec>,
> {
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let Ok(block_root) = data_columns
.iter()
.map(|c| c.block_root())
Expand Down Expand Up @@ -3131,13 +3124,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
block_root: Hash256,
custody_columns: Vec<CustodyDataColumn<T::EthSpec>>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError<T::EthSpec>,
> {
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its columns again.
if self
Expand All @@ -3162,6 +3149,52 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.remove_notified_custody_columns(&block_root, r)
}

pub async fn reconstruct_data_columns(
self: &Arc<Self>,
block_root: Hash256,
) -> Result<
Option<(
AvailabilityProcessingStatus,
DataColumnSidecarVec<T::EthSpec>,
)>,
BlockError<T::EthSpec>,
> {
// As of now we only reconstruct data columns on supernodes, so if the block is already
// available on a supernode, there's no need to reconstruct as the node must already have
// all columns.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Ok(None);
}

let Some((availability, data_column_to_publish)) = self
.data_availability_checker
.reconstruct_data_columns(block_root)?
else {
return Ok(None);
};

let Ok(slot) = data_column_to_publish
.iter()
.map(|c| c.slot())
.unique()
.exactly_one()
else {
return Err(BlockError::InternalError(
"Columns for the same block should have matching slot".to_string(),
));
};

let r = self.process_availability(slot, availability).await;
self.remove_notified_custody_columns(&block_root, r)
.map(|availability_processing_status| {
Some((availability_processing_status, data_column_to_publish))
})
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified(
Expand All @@ -3179,15 +3212,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified_custody_columns<P>(
fn remove_notified_custody_columns(
&self,
block_root: &Hash256,
r: Result<(AvailabilityProcessingStatus, P), BlockError<T::EthSpec>>,
) -> Result<(AvailabilityProcessingStatus, P), BlockError<T::EthSpec>> {
let has_missing_components = matches!(
r,
Ok((AvailabilityProcessingStatus::MissingComponents(_, _), _))
);
r: Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let has_missing_components =
matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _)));
if !has_missing_components {
self.reqresp_pre_import_cache.write().remove(block_root);
}
Expand Down Expand Up @@ -3436,13 +3467,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
async fn check_gossip_data_columns_availability_and_import(
self: &Arc<Self>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError<T::EthSpec>,
> {
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
if let Some(slasher) = self.slasher.as_ref() {
for data_colum in &data_columns {
slasher.accept_block_header(data_colum.signed_block_header());
Expand All @@ -3455,13 +3480,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
));
};

let (availability, data_columns_to_publish) = self
let availability = self
.data_availability_checker
.put_gossip_data_columns(data_columns)?;

self.process_availability(slot, availability)
.await
.map(|result| (result, data_columns_to_publish))
self.process_availability(slot, availability).await
}

/// Checks if the provided blobs can make any cached blocks available, and imports immediately
Expand Down Expand Up @@ -3509,13 +3532,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
custody_columns: Vec<CustodyDataColumn<T::EthSpec>>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError<T::EthSpec>,
> {
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
// Need to scope this to ensure the lock is dropped before calling `process_availability`
// Even an explicit drop is not enough to convince the borrow checker.
{
Expand All @@ -3539,13 +3556,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
}
let (availability, data_columns_to_publish) = self
let availability = self
.data_availability_checker
.put_rpc_custody_columns(block_root, custody_columns)?;

self.process_availability(slot, availability)
.await
.map(|result| (result, data_columns_to_publish))
self.process_availability(slot, availability).await
}

/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
Expand Down
35 changes: 11 additions & 24 deletions beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::block_verification::BlockError;
use crate::data_availability_checker::AvailabilityCheckError;
pub use crate::data_availability_checker::{AvailableBlock, MaybeAvailableBlock};
use crate::data_column_verification::{
CustodyDataColumn, CustodyDataColumnList, GossipDataColumnError, GossipVerifiedDataColumnList,
CustodyDataColumn, GossipDataColumnError, GossipVerifiedDataColumnList,
};
use crate::eth1_finalization_cache::Eth1FinalizationData;
use crate::{get_block_root, GossipVerifiedBlock, PayloadVerificationOutcome};
Expand All @@ -15,8 +15,8 @@ use std::sync::Arc;
use types::blob_sidecar::{self, BlobIdentifier, FixedBlobSidecarList};
use types::data_column_sidecar::{self};
use types::{
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec,
Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, Epoch, EthSpec, Hash256,
SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};

/// A block that has been received over RPC. It has 2 internal variants:
Expand Down Expand Up @@ -74,7 +74,7 @@ impl<E: EthSpec> RpcBlock<E> {
}
}

pub fn custody_columns(&self) -> Option<&CustodyDataColumnList<E>> {
pub fn custody_columns(&self) -> Option<&Vec<CustodyDataColumn<E>>> {
match &self.block {
RpcBlockInner::Block(_) => None,
RpcBlockInner::BlockAndBlobs(_, _) => None,
Expand All @@ -96,7 +96,7 @@ enum RpcBlockInner<E: EthSpec> {
BlockAndBlobs(Arc<SignedBeaconBlock<E>>, BlobSidecarList<E>),
/// This variant is used with parent lookups and by-range responses. It should have all
/// requested data columns, all block roots matching for this block.
BlockAndCustodyColumns(Arc<SignedBeaconBlock<E>>, CustodyDataColumnList<E>),
BlockAndCustodyColumns(Arc<SignedBeaconBlock<E>>, Vec<CustodyDataColumn<E>>),
}

impl<E: EthSpec> RpcBlock<E> {
Expand Down Expand Up @@ -158,7 +158,6 @@ impl<E: EthSpec> RpcBlock<E> {
block_root: Option<Hash256>,
block: Arc<SignedBeaconBlock<E>>,
custody_columns: Vec<CustodyDataColumn<E>>,
spec: &ChainSpec,
) -> Result<Self, AvailabilityCheckError> {
let block_root = block_root.unwrap_or_else(|| get_block_root(&block));

Expand All @@ -168,10 +167,7 @@ impl<E: EthSpec> RpcBlock<E> {
}
// Treat empty data column lists as if they are missing.
let inner = if !custody_columns.is_empty() {
RpcBlockInner::BlockAndCustodyColumns(
block,
RuntimeVariableList::new(custody_columns, spec.number_of_columns)?,
)
RpcBlockInner::BlockAndCustodyColumns(block, custody_columns)
} else {
RpcBlockInner::Block(block)
};
Expand Down Expand Up @@ -205,7 +201,7 @@ impl<E: EthSpec> RpcBlock<E> {
Hash256,
Arc<SignedBeaconBlock<E>>,
Option<BlobSidecarList<E>>,
Option<CustodyDataColumnList<E>>,
Option<Vec<CustodyDataColumn<E>>>,
) {
let block_root = self.block_root();
match self.block {
Expand Down Expand Up @@ -596,7 +592,6 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
}

fn into_rpc_block(self) -> RpcBlock<E> {
let number_of_columns = self.spec.number_of_columns;
let (block_root, block, blobs_opt, data_columns_opt) = self.deconstruct();
// Circumvent the constructor here, because an Available block will have already had
// consistency checks performed.
Expand All @@ -605,18 +600,10 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
(Some(blobs), _) => RpcBlockInner::BlockAndBlobs(block, blobs),
(_, Some(data_columns)) => RpcBlockInner::BlockAndCustodyColumns(
block,
RuntimeVariableList::new(
data_columns
.into_iter()
// TODO(das): This is an ugly hack that should be removed. After updating
// store types to handle custody data columns this should not be required.
// It's okay-ish because available blocks must have all the required custody
// columns.
.map(|d| CustodyDataColumn::from_asserted_custody(d))
.collect(),
number_of_columns,
)
.expect("data column list is within bounds"),
data_columns
.into_iter()
.map(CustodyDataColumn::from_asserted_custody)
.collect(),
),
};
RpcBlock {
Expand Down
46 changes: 25 additions & 21 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod error;
mod overflow_lru_cache;
mod state_lru_cache;

use crate::data_availability_checker::error::Error;
use crate::data_column_verification::{
verify_kzg_for_data_column_list, CustodyDataColumn, GossipVerifiedDataColumn,
KzgVerifiedCustodyDataColumn,
Expand All @@ -31,8 +32,6 @@ pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCh
use types::data_column_sidecar::DataColumnIdentifier;
use types::non_zero_usize::new_non_zero_usize;

pub use self::overflow_lru_cache::DataColumnsToPublish;

/// The LRU Cache stores `PendingComponents` which can store up to
/// `MAX_BLOBS_PER_BLOCK = 6` blobs each. A `BlobSidecar` is 0.131256 MB. So
/// the maximum size of a `PendingComponents` is ~ 0.787536 MB. Setting this
Expand Down Expand Up @@ -159,6 +158,24 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.availability_cache.peek_data_column(data_column_id)
}

#[allow(clippy::type_complexity)]
pub fn reconstruct_data_columns(
&self,
block_root: Hash256,
) -> Result<
Option<(
Availability<<T as BeaconChainTypes>::EthSpec>,
DataColumnSidecarVec<<T as BeaconChainTypes>::EthSpec>,
)>,
Error,
> {
let Some(kzg) = self.kzg.as_ref() else {
return Err(AvailabilityCheckError::KzgNotInitialized);
};
self.availability_cache
.reconstruct_data_columns(kzg, block_root)
}

/// Put a list of blobs received via RPC into the availability cache. This performs KZG
/// verification on the blobs in the list.
pub fn put_rpc_blobs(
Expand Down Expand Up @@ -190,8 +207,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
&self,
block_root: Hash256,
custody_columns: Vec<CustodyDataColumn<T::EthSpec>>,
) -> Result<(Availability<T::EthSpec>, DataColumnsToPublish<T::EthSpec>), AvailabilityCheckError>
{
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let Some(kzg) = self.kzg.as_ref() else {
return Err(AvailabilityCheckError::KzgNotInitialized);
};
Expand All @@ -203,11 +219,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map(|c| KzgVerifiedCustodyDataColumn::new(c, kzg))
.collect::<Result<Vec<_>, _>>()?;

self.availability_cache.put_kzg_verified_data_columns(
kzg,
block_root,
verified_custody_columns,
)
self.availability_cache
.put_kzg_verified_data_columns(block_root, verified_custody_columns)
}

/// Check if we've cached other blobs for this block. If it completes a set and we also
Expand All @@ -232,11 +245,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn put_gossip_data_columns(
&self,
gossip_data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<(Availability<T::EthSpec>, DataColumnsToPublish<T::EthSpec>), AvailabilityCheckError>
{
let Some(kzg) = self.kzg.as_ref() else {
return Err(AvailabilityCheckError::KzgNotInitialized);
};
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let block_root = gossip_data_columns
.first()
.ok_or(AvailabilityCheckError::MissingCustodyColumns)?
Expand All @@ -248,7 +257,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.collect::<Vec<_>>();

self.availability_cache
.put_kzg_verified_data_columns(kzg, block_root, custody_columns)
.put_kzg_verified_data_columns(block_root, custody_columns)
}

/// Check if we have all the blobs for a block. Returns `Availability` which has information
Expand Down Expand Up @@ -314,12 +323,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block,
blobs: None,
blobs_available_timestamp: None,
data_columns: Some(
data_column_list
.into_iter()
.map(|d| d.clone_arc())
.collect(),
),
data_columns: Some(data_column_list.iter().map(|d| d.clone_arc()).collect()),
spec: self.spec.clone(),
}))
} else {
Expand Down
Loading
Loading