Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid reconstructing data columns if we already have all the columns #5806

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3030,19 +3030,28 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.remove_notified(&block_root, r)
}

/// Cache the data column in the processing cache, process it, then evict it from the cache if it was
/// Cache the data columns in the processing cache, process them, then evict them from the cache if they were
/// imported or errors.
pub async fn process_gossip_data_column(
pub async fn process_gossip_data_columns(
self: &Arc<Self>,
data_column: GossipVerifiedDataColumn<T>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError<T::EthSpec>,
> {
let block_root = data_column.block_root();
let Ok(block_root) = data_columns
.iter()
.map(|c| c.block_root())
.unique()
.exactly_one()
else {
return Err(BlockError::InternalError(
"Columns should be from the same block".to_string(),
));
};

// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its samples again.
Expand All @@ -3055,7 +3064,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

let r = self
.check_gossip_data_column_availability_and_import(data_column)
.check_gossip_data_columns_availability_and_import(data_columns)
.await;
self.remove_notified_custody_columns(&block_root, r)
}
Expand Down Expand Up @@ -3402,23 +3411,31 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Checks if the provided data columns can make any cached blocks available, and imports immediately
/// if so, otherwise caches the data columns in the data availability checker.
async fn check_gossip_data_column_availability_and_import(
async fn check_gossip_data_columns_availability_and_import(
self: &Arc<Self>,
data_column: GossipVerifiedDataColumn<T>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError<T::EthSpec>,
> {
let slot = data_column.slot();
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(data_column.signed_block_header());
for data_colum in &data_columns {
slasher.accept_block_header(data_colum.signed_block_header());
}
}

let Ok(slot) = data_columns.iter().map(|c| c.slot()).unique().exactly_one() else {
return Err(BlockError::InternalError(
"Columns for the same block should have matching slot".to_string(),
));
};

let (availability, data_columns_to_publish) = self
.data_availability_checker
.put_gossip_data_column(data_column)?;
.put_gossip_data_columns(data_columns)?;

self.process_availability(slot, availability)
.await
Expand Down
7 changes: 7 additions & 0 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,13 @@ pub enum BlockError<E: EthSpec> {
///
/// This indicates the peer is sending an unexpected gossip blob and should be penalised.
BlobNotRequired(Slot),
/// An internal error has occurred when processing the block or sidecars.
///
/// ## Peer scoring
///
/// We were unable to process this block due to an internal error. It's unclear if the block is
/// valid.
InternalError(String),
}

impl<E: EthSpec> From<AvailabilityCheckError> for BlockError<E> {
Expand Down
18 changes: 11 additions & 7 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,22 +233,26 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
///
/// This should only accept gossip verified data columns, so we should not have to worry about dupes.
#[allow(clippy::type_complexity)]
pub fn put_gossip_data_column(
pub fn put_gossip_data_columns(
&self,
gossip_data_column: GossipVerifiedDataColumn<T>,
gossip_data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<(Availability<T::EthSpec>, DataColumnsToPublish<T::EthSpec>), AvailabilityCheckError>
{
let Some(kzg) = self.kzg.as_ref() else {
return Err(AvailabilityCheckError::KzgNotInitialized);
};
let block_root = gossip_data_column.block_root();
let block_root = gossip_data_columns
.first()
.ok_or(AvailabilityCheckError::MissingCustodyColumns)?
.block_root();

// TODO(das): ensure that our custody requirements include this column
let custody_column =
KzgVerifiedCustodyDataColumn::from_asserted_custody(gossip_data_column.into_inner());
let custody_columns = gossip_data_columns
.into_iter()
.map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner()))
.collect::<Vec<_>>();

self.availability_cache
.put_kzg_verified_data_columns(kzg, block_root, vec![custody_column])
.put_kzg_verified_data_columns(kzg, block_root, custody_columns)
}

/// Check if we have all the blobs for a block. Returns `Availability` which has information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {

/// Potentially trigger reconstruction if:
/// - Our custody requirement is all columns
/// - We have >= 50% of columns
/// - We have >= 50% of columns, but not all columns
fn should_reconstruct(
&self,
block_import_requirement: &BlockImportRequirement,
Expand All @@ -917,9 +917,13 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
return false;
};

!pending_components.reconstruction_started
&& *num_expected_columns == T::EthSpec::number_of_columns()
&& pending_components.verified_data_columns.len() >= T::EthSpec::number_of_columns() / 2
let num_of_columns = T::EthSpec::number_of_columns();
let has_missing_columns = pending_components.verified_data_columns.len() < num_of_columns;

has_missing_columns
&& !pending_components.reconstruction_started
&& *num_expected_columns == num_of_columns
&& pending_components.verified_data_columns.len() >= num_of_columns / 2
}

pub fn put_kzg_verified_blobs<I: IntoIterator<Item = KzgVerifiedBlob<T::EthSpec>>>(
Expand Down
52 changes: 27 additions & 25 deletions beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,31 +273,33 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
}

if let Some(gossip_verified_data_columns) = gossip_verified_data_columns {
let custody_columns = network_globals
.custody_columns(block.epoch())
.map_err(|e| {
warp_utils::reject::broadcast_without_import(format!(
"Failed to compute custody column indices: {:?}",
e
))
})?;

for data_column in gossip_verified_data_columns {
if custody_columns.contains(&data_column.index()) {
if let Err(e) = Box::pin(chain.process_gossip_data_column(data_column)).await {
let msg = format!("Invalid data column: {e}");
return if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(msg))
} else {
error!(
log,
"Invalid blob provided to HTTP API";
"reason" => &msg
);
Err(warp_utils::reject::custom_bad_request(msg))
};
}
}
let custody_columns_indices =
network_globals
.custody_columns(block.epoch())
.map_err(|e| {
warp_utils::reject::broadcast_without_import(format!(
"Failed to compute custody column indices: {:?}",
e
))
})?;

let custody_columns = gossip_verified_data_columns
.into_iter()
.filter(|data_column| custody_columns_indices.contains(&data_column.index()))
.collect();

if let Err(e) = Box::pin(chain.process_gossip_data_columns(custody_columns)).await {
let msg = format!("Invalid data column: {e}");
return if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(msg))
} else {
error!(
log,
"Invalid blob provided to HTTP API";
"reason" => &msg
);
Err(warp_utils::reject::custom_bad_request(msg))
};
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {

match self
.chain
.process_gossip_data_column(verified_data_column)
.process_gossip_data_columns(vec![verified_data_column])
.await
{
Ok((availability, data_columns_to_publish)) => {
Expand Down Expand Up @@ -1266,6 +1266,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
return None;
}
Err(e @ BlockError::InternalError(_)) => {
error!(self.log, "Internal block gossip validation error";
"error" => %e
);
return None;
}
Err(e @ BlockError::BlobNotRequired(_)) => {
// TODO(das): penalty not implemented yet as other clients may still send us blobs
// during early stage of implementation.
Expand Down
Loading