cumulus-pov-recovery: check pov_hash instead of reencoding data #2287

Merged · 1 commit · Nov 14, 2023
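For context on the title: before this PR, a collator recovering a PoV from availability chunks re-encoded the recovered data and compared the resulting erasure root; afterwards, collators only compare the hash of the recovered PoV with the `pov_hash` from the candidate descriptor. A rough, hedged sketch of that cheaper check, using a stand-in `PoV` type and `sp_core`'s Blake2-256 hashing rather than the subsystem's actual types:

```rust
use parity_scale_codec::Encode;
use sp_core::{blake2_256, H256};

/// Simplified stand-in for `polkadot_node_primitives::PoV`.
#[derive(Encode)]
struct PoV {
    block_data: Vec<u8>,
}

/// Collator-side post-recovery check: hash the recovered PoV and compare it
/// with the `pov_hash` committed to in the candidate descriptor. This avoids
/// the much more expensive re-encoding of all erasure chunks.
fn pov_matches_descriptor(recovered: &PoV, expected_pov_hash: H256) -> bool {
    H256::from(blake2_256(&recovered.encode())) == expected_pov_hash
}
```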
15 changes: 7 additions & 8 deletions cumulus/client/pov-recovery/src/active_candidate_recovery.rs
@@ -16,12 +16,12 @@

use sp_runtime::traits::Block as BlockT;

use polkadot_node_primitives::AvailableData;
use polkadot_node_primitives::PoV;
use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;

use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt};

use std::{collections::HashSet, pin::Pin};
use std::{collections::HashSet, pin::Pin, sync::Arc};

use crate::RecoveryHandle;

@@ -30,9 +30,8 @@ use crate::RecoveryHandle;
/// This handles the candidate recovery and tracks the active recoveries.
pub(crate) struct ActiveCandidateRecovery<Block: BlockT> {
/// The recoveries that are currently being executed.
recoveries: FuturesUnordered<
Pin<Box<dyn Future<Output = (Block::Hash, Option<AvailableData>)> + Send>>,
>,
recoveries:
FuturesUnordered<Pin<Box<dyn Future<Output = (Block::Hash, Option<Arc<PoV>>)> + Send>>>,
/// The block hashes of the candidates currently being recovered.
candidates: HashSet<Block::Hash>,
recovery_handle: Box<dyn RecoveryHandle>,
@@ -68,7 +67,7 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
self.recoveries.push(
async move {
match rx.await {
Ok(Ok(res)) => (block_hash, Some(res)),
Ok(Ok(res)) => (block_hash, Some(res.pov)),
Ok(Err(error)) => {
tracing::debug!(
target: crate::LOG_TARGET,
@@ -93,8 +92,8 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {

/// Waits for the next recovery.
///
/// If the returned [`AvailableData`] is `None`, it means that the recovery failed.
pub async fn wait_for_recovery(&mut self) -> (Block::Hash, Option<AvailableData>) {
/// If the returned [`PoV`] is `None`, it means that the recovery failed.
pub async fn wait_for_recovery(&mut self) -> (Block::Hash, Option<Arc<PoV>>) {
loop {
if let Some(res) = self.recoveries.next().await {
self.candidates.remove(&res.0);
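The hunks above narrow what each recovery future yields: `Option<Arc<PoV>>` instead of the full `Option<AvailableData>`, since the PoV is all the pov-recovery client needs. A minimal, self-contained sketch of the same `FuturesUnordered` pattern, with illustrative stand-in types rather than the crate's real `Block::Hash` and `PoV`:

```rust
use futures::{executor::block_on, stream::FuturesUnordered, FutureExt, StreamExt};
use std::{future::Future, pin::Pin, sync::Arc};

// Illustrative stand-ins for the crate's `Block::Hash` and `PoV` types.
type BlockHash = u64;
struct PoV;

// Same shape as the `recoveries` field after this PR: each future resolves to
// the candidate's block hash plus the recovered PoV, or `None` on failure.
type RecoveryFuture = Pin<Box<dyn Future<Output = (BlockHash, Option<Arc<PoV>>)> + Send>>;

fn main() {
    let recoveries: FuturesUnordered<RecoveryFuture> = FuturesUnordered::new();

    // One recovery that "succeeds" with a PoV and one that fails.
    recoveries.push(async { (1, Some(Arc::new(PoV))) }.boxed());
    recoveries.push(async { (2, None) }.boxed());

    // Drain results in completion order, as `wait_for_recovery` does.
    block_on(async move {
        let mut recoveries = recoveries;
        while let Some((block_hash, pov)) = recoveries.next().await {
            println!("block {block_hash}: recovered = {}", pov.is_some());
        }
    });
}
```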
38 changes: 16 additions & 22 deletions cumulus/client/pov-recovery/src/lib.rs
@@ -51,7 +51,7 @@ use sc_consensus::import_queue::{ImportQueueService, IncomingBlock};
use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};

use polkadot_node_primitives::{AvailableData, POV_BOMB_LIMIT};
use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT};
use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::{
@@ -346,15 +346,11 @@
}

/// Handle a recovered candidate.
async fn handle_candidate_recovered(
&mut self,
block_hash: Block::Hash,
available_data: Option<AvailableData>,
) {
let available_data = match available_data {
Some(data) => {
async fn handle_candidate_recovered(&mut self, block_hash: Block::Hash, pov: Option<&PoV>) {
let pov = match pov {
Some(pov) => {
self.candidates_in_retry.remove(&block_hash);
data
pov
},
None =>
if self.candidates_in_retry.insert(block_hash) {
@@ -373,18 +369,16 @@
},
};

let raw_block_data = match sp_maybe_compressed_blob::decompress(
&available_data.pov.block_data.0,
POV_BOMB_LIMIT,
) {
Ok(r) => r,
Err(error) => {
tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV");
let raw_block_data =
match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) {
Ok(r) => r,
Err(error) => {
tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV");

self.reset_candidate(block_hash);
return
},
};
self.reset_candidate(block_hash);
return
},
};

let block_data = match ParachainBlockData::<Block>::decode(&mut &raw_block_data[..]) {
Ok(d) => d,
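As the hunk above shows, the recovered PoV's block data is still maybe-compressed and is decompressed under the `POV_BOMB_LIMIT` cap before decoding the `ParachainBlockData`. A small illustration of that `sp-maybe-compressed-blob` pattern in isolation, assuming its `compress`/`decompress` helpers and an illustrative limit value in place of `POV_BOMB_LIMIT`:

```rust
use sp_maybe_compressed_blob::{compress, decompress};

fn main() {
    // Illustrative cap; the real code passes `POV_BOMB_LIMIT` from
    // `polkadot_node_primitives`.
    const BOMB_LIMIT: usize = 16 * 1024 * 1024;

    let raw = vec![7u8; 1024];
    // `compress` returns `None` if the input is larger than the limit.
    let blob = compress(&raw, BOMB_LIMIT).unwrap_or_else(|| raw.clone());

    // `decompress` refuses blobs whose decompressed size would exceed the
    // limit, protecting against decompression bombs.
    match decompress(&blob, BOMB_LIMIT) {
        Ok(decompressed) => assert_eq!(&decompressed[..], &raw[..]),
        Err(error) => eprintln!("Failed to decompress PoV: {error:?}"),
    }
}
```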
@@ -595,10 +589,10 @@ where
next_to_recover = self.candidate_recovery_queue.next_recovery().fuse() => {
self.recover_candidate(next_to_recover).await;
},
(block_hash, available_data) =
(block_hash, pov) =
self.active_candidate_recovery.wait_for_recovery().fuse() =>
{
self.handle_candidate_recovered(block_hash, available_data).await;
self.handle_candidate_recovered(block_hash, pov.as_deref()).await;
},
}
}
@@ -102,7 +102,7 @@ fn build_overseer(
let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
let builder = Overseer::builder()
.availability_distribution(DummySubsystem)
.availability_recovery(AvailabilityRecoverySubsystem::with_availability_store_skip(
.availability_recovery(AvailabilityRecoverySubsystem::for_collator(
available_data_req_receiver,
Metrics::register(registry)?,
))
39 changes: 33 additions & 6 deletions polkadot/node/network/availability-recovery/src/lib.rs
@@ -105,6 +105,17 @@ pub struct AvailabilityRecoverySubsystem {
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
/// Metrics for this subsystem.
metrics: Metrics,
/// The type of check to perform after available data was recovered.
post_recovery_check: PostRecoveryCheck,
}

#[derive(Clone, PartialEq, Debug)]
/// The type of check to perform after available data was recovered.
pub enum PostRecoveryCheck {
/// Reencode the data and check erasure root. For validators.
Reencode,
/// Only check the pov hash. For collators only.
PovHash,
}

/// Expensive erasure coding computations that we want to run on a blocking thread.
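The new `PostRecoveryCheck` enum above selects which integrity check runs once the available data has been reassembled: validators re-encode the chunks and compare the erasure root, while collators only compare the PoV hash. A hedged sketch of such a dispatch, with stand-in types and precomputed values rather than the recovery task's actual fields:

```rust
// Illustrative copy of the enum introduced by this PR.
#[derive(Clone, PartialEq, Debug)]
enum PostRecoveryCheck {
    /// Reencode the data and check the erasure root. For validators.
    Reencode,
    /// Only check the PoV hash. For collators.
    PovHash,
}

/// Stand-in for what a recovery task would know about the recovered data.
struct Recovered {
    pov_hash: [u8; 32],
    /// Erasure root recomputed from re-encoded chunks (the expensive part).
    reencoded_erasure_root: [u8; 32],
}

/// Stand-in for the commitments taken from the candidate descriptor.
struct Descriptor {
    pov_hash: [u8; 32],
    erasure_root: [u8; 32],
}

/// Returns `true` when the recovered data passes the configured check.
fn post_recovery_check_passes(
    check: &PostRecoveryCheck,
    recovered: &Recovered,
    descriptor: &Descriptor,
) -> bool {
    match check {
        PostRecoveryCheck::Reencode =>
            recovered.reencoded_erasure_root == descriptor.erasure_root,
        PostRecoveryCheck::PovHash => recovered.pov_hash == descriptor.pov_hash,
    }
}
```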
@@ -344,6 +355,7 @@ async fn launch_recovery_task<Context>(
metrics: &Metrics,
recovery_strategies: VecDeque<Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>>,
bypass_availability_store: bool,
post_recovery_check: PostRecoveryCheck,
) -> error::Result<()> {
let candidate_hash = receipt.hash();
let params = RecoveryParams {
@@ -354,6 +366,8 @@
erasure_root: receipt.descriptor.erasure_root,
metrics: metrics.clone(),
bypass_availability_store,
post_recovery_check,
pov_hash: receipt.descriptor.pov_hash,
};

let recovery_task = RecoveryTask::new(ctx.sender().clone(), params, recovery_strategies);
@@ -390,6 +404,7 @@ async fn handle_recover<Context>(
erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
recovery_strategy_kind: RecoveryStrategyKind,
bypass_availability_store: bool,
post_recovery_check: PostRecoveryCheck,
) -> error::Result<()> {
let candidate_hash = receipt.hash();

@@ -486,6 +501,7 @@
metrics,
recovery_strategies,
bypass_availability_store,
post_recovery_check,
)
.await
},
@@ -527,15 +543,17 @@

#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
impl AvailabilityRecoverySubsystem {
/// Create a new instance of `AvailabilityRecoverySubsystem` which never requests the
/// `AvailabilityStoreSubsystem` subsystem.
pub fn with_availability_store_skip(
/// Create a new instance of `AvailabilityRecoverySubsystem` suitable for collator nodes,
/// which never requests the `AvailabilityStoreSubsystem` subsystem and only checks the POV hash
/// instead of reencoding the available data.
pub fn for_collator(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self {
recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
bypass_availability_store: true,
post_recovery_check: PostRecoveryCheck::PovHash,
req_receiver,
metrics,
}
@@ -550,6 +568,7 @@
Self {
recovery_strategy_kind: RecoveryStrategyKind::BackersFirstAlways,
bypass_availability_store: false,
post_recovery_check: PostRecoveryCheck::Reencode,
req_receiver,
metrics,
}
@@ -563,6 +582,7 @@
Self {
recovery_strategy_kind: RecoveryStrategyKind::ChunksAlways,
bypass_availability_store: false,
post_recovery_check: PostRecoveryCheck::Reencode,
req_receiver,
metrics,
}
@@ -577,15 +597,21 @@
Self {
recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
bypass_availability_store: false,
post_recovery_check: PostRecoveryCheck::Reencode,
req_receiver,
metrics,
}
}

async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()> {
let mut state = State::default();
let Self { mut req_receiver, metrics, recovery_strategy_kind, bypass_availability_store } =
self;
let Self {
mut req_receiver,
metrics,
recovery_strategy_kind,
bypass_availability_store,
post_recovery_check,
} = self;

let (erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16);
let mut erasure_task_rx = erasure_task_rx.fuse();
@@ -675,7 +701,8 @@ impl AvailabilityRecoverySubsystem {
&metrics,
erasure_task_tx.clone(),
recovery_strategy_kind.clone(),
bypass_availability_store
bypass_availability_store,
post_recovery_check.clone()
).await {
gum::warn!(
target: LOG_TARGET,