From 73eadfa7cc0d39ebeee0541864767cc400ad9d6a Mon Sep 17 00:00:00 2001 From: alindima Date: Mon, 13 Nov 2023 11:35:10 +0200 Subject: [PATCH] cumulus-pov-recovery: check pov_hash instead of reencoding data Signed-off-by: alindima --- .../src/active_candidate_recovery.rs | 15 +- cumulus/client/pov-recovery/src/lib.rs | 38 ++--- .../src/collator_overseer.rs | 2 +- .../network/availability-recovery/src/lib.rs | 39 ++++- .../network/availability-recovery/src/task.rs | 138 +++++++++++------- 5 files changed, 141 insertions(+), 91 deletions(-) diff --git a/cumulus/client/pov-recovery/src/active_candidate_recovery.rs b/cumulus/client/pov-recovery/src/active_candidate_recovery.rs index 322b19c796a8..2c635320ff4a 100644 --- a/cumulus/client/pov-recovery/src/active_candidate_recovery.rs +++ b/cumulus/client/pov-recovery/src/active_candidate_recovery.rs @@ -16,12 +16,12 @@ use sp_runtime::traits::Block as BlockT; -use polkadot_node_primitives::AvailableData; +use polkadot_node_primitives::PoV; use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage; use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt}; -use std::{collections::HashSet, pin::Pin}; +use std::{collections::HashSet, pin::Pin, sync::Arc}; use crate::RecoveryHandle; @@ -30,9 +30,8 @@ use crate::RecoveryHandle; /// This handles the candidate recovery and tracks the activate recoveries. pub(crate) struct ActiveCandidateRecovery { /// The recoveries that are currently being executed. - recoveries: FuturesUnordered< - Pin)> + Send>>, - >, + recoveries: + FuturesUnordered>)> + Send>>>, /// The block hashes of the candidates currently being recovered. candidates: HashSet, recovery_handle: Box, @@ -68,7 +67,7 @@ impl ActiveCandidateRecovery { self.recoveries.push( async move { match rx.await { - Ok(Ok(res)) => (block_hash, Some(res)), + Ok(Ok(res)) => (block_hash, Some(res.pov)), Ok(Err(error)) => { tracing::debug!( target: crate::LOG_TARGET, @@ -93,8 +92,8 @@ impl ActiveCandidateRecovery { /// Waits for the next recovery. /// - /// If the returned [`AvailableData`] is `None`, it means that the recovery failed. - pub async fn wait_for_recovery(&mut self) -> (Block::Hash, Option) { + /// If the returned [`PoV`] is `None`, it means that the recovery failed. + pub async fn wait_for_recovery(&mut self) -> (Block::Hash, Option>) { loop { if let Some(res) = self.recoveries.next().await { self.candidates.remove(&res.0); diff --git a/cumulus/client/pov-recovery/src/lib.rs b/cumulus/client/pov-recovery/src/lib.rs index b050bc66799c..b9a140f55c60 100644 --- a/cumulus/client/pov-recovery/src/lib.rs +++ b/cumulus/client/pov-recovery/src/lib.rs @@ -51,7 +51,7 @@ use sc_consensus::import_queue::{ImportQueueService, IncomingBlock}; use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; -use polkadot_node_primitives::{AvailableData, POV_BOMB_LIMIT}; +use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT}; use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage; use polkadot_overseer::Handle as OverseerHandle; use polkadot_primitives::{ @@ -346,15 +346,11 @@ where } /// Handle a recovered candidate. - async fn handle_candidate_recovered( - &mut self, - block_hash: Block::Hash, - available_data: Option, - ) { - let available_data = match available_data { - Some(data) => { + async fn handle_candidate_recovered(&mut self, block_hash: Block::Hash, pov: Option<&PoV>) { + let pov = match pov { + Some(pov) => { self.candidates_in_retry.remove(&block_hash); - data + pov }, None => if self.candidates_in_retry.insert(block_hash) { @@ -373,18 +369,16 @@ where }, }; - let raw_block_data = match sp_maybe_compressed_blob::decompress( - &available_data.pov.block_data.0, - POV_BOMB_LIMIT, - ) { - Ok(r) => r, - Err(error) => { - tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV"); + let raw_block_data = + match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) { + Ok(r) => r, + Err(error) => { + tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV"); - self.reset_candidate(block_hash); - return - }, - }; + self.reset_candidate(block_hash); + return + }, + }; let block_data = match ParachainBlockData::::decode(&mut &raw_block_data[..]) { Ok(d) => d, @@ -595,10 +589,10 @@ where next_to_recover = self.candidate_recovery_queue.next_recovery().fuse() => { self.recover_candidate(next_to_recover).await; }, - (block_hash, available_data) = + (block_hash, pov) = self.active_candidate_recovery.wait_for_recovery().fuse() => { - self.handle_candidate_recovered(block_hash, available_data).await; + self.handle_candidate_recovered(block_hash, pov.as_deref()).await; }, } } diff --git a/cumulus/client/relay-chain-minimal-node/src/collator_overseer.rs b/cumulus/client/relay-chain-minimal-node/src/collator_overseer.rs index 379217e4a638..945344f85e97 100644 --- a/cumulus/client/relay-chain-minimal-node/src/collator_overseer.rs +++ b/cumulus/client/relay-chain-minimal-node/src/collator_overseer.rs @@ -102,7 +102,7 @@ fn build_overseer( let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?; let builder = Overseer::builder() .availability_distribution(DummySubsystem) - .availability_recovery(AvailabilityRecoverySubsystem::with_availability_store_skip( + .availability_recovery(AvailabilityRecoverySubsystem::for_collator( available_data_req_receiver, Metrics::register(registry)?, )) diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs index 9acc48ea92e0..4a658449f09c 100644 --- a/polkadot/node/network/availability-recovery/src/lib.rs +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -105,6 +105,17 @@ pub struct AvailabilityRecoverySubsystem { req_receiver: IncomingRequestReceiver, /// Metrics for this subsystem. metrics: Metrics, + /// The type of check to perform after available data was recovered. + post_recovery_check: PostRecoveryCheck, +} + +#[derive(Clone, PartialEq, Debug)] +/// The type of check to perform after available data was recovered. +pub enum PostRecoveryCheck { + /// Reencode the data and check erasure root. For validators. + Reencode, + /// Only check the pov hash. For collators only. + PovHash, } /// Expensive erasure coding computations that we want to run on a blocking thread. @@ -344,6 +355,7 @@ async fn launch_recovery_task( metrics: &Metrics, recovery_strategies: VecDeque::Sender>>>, bypass_availability_store: bool, + post_recovery_check: PostRecoveryCheck, ) -> error::Result<()> { let candidate_hash = receipt.hash(); let params = RecoveryParams { @@ -354,6 +366,8 @@ async fn launch_recovery_task( erasure_root: receipt.descriptor.erasure_root, metrics: metrics.clone(), bypass_availability_store, + post_recovery_check, + pov_hash: receipt.descriptor.pov_hash, }; let recovery_task = RecoveryTask::new(ctx.sender().clone(), params, recovery_strategies); @@ -390,6 +404,7 @@ async fn handle_recover( erasure_task_tx: futures::channel::mpsc::Sender, recovery_strategy_kind: RecoveryStrategyKind, bypass_availability_store: bool, + post_recovery_check: PostRecoveryCheck, ) -> error::Result<()> { let candidate_hash = receipt.hash(); @@ -486,6 +501,7 @@ async fn handle_recover( metrics, recovery_strategies, bypass_availability_store, + post_recovery_check, ) .await }, @@ -527,15 +543,17 @@ async fn query_chunk_size( #[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)] impl AvailabilityRecoverySubsystem { - /// Create a new instance of `AvailabilityRecoverySubsystem` which never requests the - /// `AvailabilityStoreSubsystem` subsystem. - pub fn with_availability_store_skip( + /// Create a new instance of `AvailabilityRecoverySubsystem` suitable for collator nodes, + /// which never requests the `AvailabilityStoreSubsystem` subsystem and only checks the POV hash + /// instead of reencoding the available data. + pub fn for_collator( req_receiver: IncomingRequestReceiver, metrics: Metrics, ) -> Self { Self { recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT), bypass_availability_store: true, + post_recovery_check: PostRecoveryCheck::PovHash, req_receiver, metrics, } @@ -550,6 +568,7 @@ impl AvailabilityRecoverySubsystem { Self { recovery_strategy_kind: RecoveryStrategyKind::BackersFirstAlways, bypass_availability_store: false, + post_recovery_check: PostRecoveryCheck::Reencode, req_receiver, metrics, } @@ -563,6 +582,7 @@ impl AvailabilityRecoverySubsystem { Self { recovery_strategy_kind: RecoveryStrategyKind::ChunksAlways, bypass_availability_store: false, + post_recovery_check: PostRecoveryCheck::Reencode, req_receiver, metrics, } @@ -577,6 +597,7 @@ impl AvailabilityRecoverySubsystem { Self { recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT), bypass_availability_store: false, + post_recovery_check: PostRecoveryCheck::Reencode, req_receiver, metrics, } @@ -584,8 +605,13 @@ impl AvailabilityRecoverySubsystem { async fn run(self, mut ctx: Context) -> SubsystemResult<()> { let mut state = State::default(); - let Self { mut req_receiver, metrics, recovery_strategy_kind, bypass_availability_store } = - self; + let Self { + mut req_receiver, + metrics, + recovery_strategy_kind, + bypass_availability_store, + post_recovery_check, + } = self; let (erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16); let mut erasure_task_rx = erasure_task_rx.fuse(); @@ -675,7 +701,8 @@ impl AvailabilityRecoverySubsystem { &metrics, erasure_task_tx.clone(), recovery_strategy_kind.clone(), - bypass_availability_store + bypass_availability_store, + post_recovery_check.clone() ).await { gum::warn!( target: LOG_TARGET, diff --git a/polkadot/node/network/availability-recovery/src/task.rs b/polkadot/node/network/availability-recovery/src/task.rs index d5bc2da84944..f705d5c0e4cf 100644 --- a/polkadot/node/network/availability-recovery/src/task.rs +++ b/polkadot/node/network/availability-recovery/src/task.rs @@ -20,7 +20,7 @@ use crate::{ futures_undead::FuturesUndead, is_chunk_valid, is_unavailable, metrics::Metrics, ErasureTask, - LOG_TARGET, + PostRecoveryCheck, LOG_TARGET, }; use futures::{channel::oneshot, SinkExt}; #[cfg(not(test))] @@ -95,6 +95,12 @@ pub struct RecoveryParams { /// Do not request data from availability-store. Useful for collators. pub bypass_availability_store: bool, + + /// The type of check to perform after available data was recovered. + pub post_recovery_check: PostRecoveryCheck, + + /// The blake2-256 hash of the PoV. + pub pov_hash: Hash, } /// Intermediate/common data that must be passed between `RecoveryStrategy`s belonging to the @@ -501,39 +507,48 @@ impl RecoveryStrategy match response.await { Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => { - let (reencode_tx, reencode_rx) = oneshot::channel(); - self.params - .erasure_task_tx - .send(ErasureTask::Reencode( - common_params.n_validators, - common_params.erasure_root, - data, - reencode_tx, - )) - .await - .map_err(|_| RecoveryError::ChannelClosed)?; - - let reencode_response = - reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?; - - if let Some(data) = reencode_response { - gum::trace!( - target: LOG_TARGET, - candidate_hash = ?common_params.candidate_hash, - "Received full data", - ); + let maybe_data = match common_params.post_recovery_check { + PostRecoveryCheck::Reencode => { + let (reencode_tx, reencode_rx) = oneshot::channel(); + self.params + .erasure_task_tx + .send(ErasureTask::Reencode( + common_params.n_validators, + common_params.erasure_root, + data, + reencode_tx, + )) + .await + .map_err(|_| RecoveryError::ChannelClosed)?; + + reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)? + }, + PostRecoveryCheck::PovHash => + (data.pov.hash() == common_params.pov_hash).then_some(data), + }; - return Ok(data) - } else { - gum::debug!( - target: LOG_TARGET, - candidate_hash = ?common_params.candidate_hash, - ?validator_index, - "Invalid data response", - ); + match maybe_data { + Some(data) => { + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + "Received full data", + ); - // it doesn't help to report the peer with req/res. - } + return Ok(data) + }, + None => { + gum::debug!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + ?validator_index, + "Invalid data response", + ); + + // it doesn't help to report the peer with req/res. + // we'll try the next backer. + }, + }; }, Ok(req_res::v1::AvailableDataFetchingResponse::NoSuchData) => {}, Err(e) => gum::debug!( @@ -647,22 +662,43 @@ impl FetchChunks { match available_data_response { Ok(data) => { - // Send request to re-encode the chunks and check merkle root. - let (reencode_tx, reencode_rx) = oneshot::channel(); - self.erasure_task_tx - .send(ErasureTask::Reencode( - common_params.n_validators, - common_params.erasure_root, - data, - reencode_tx, - )) - .await - .map_err(|_| RecoveryError::ChannelClosed)?; - - let reencode_response = - reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?; - - if let Some(data) = reencode_response { + let maybe_data = match common_params.post_recovery_check { + PostRecoveryCheck::Reencode => { + // Send request to re-encode the chunks and check merkle root. + let (reencode_tx, reencode_rx) = oneshot::channel(); + self.erasure_task_tx + .send(ErasureTask::Reencode( + common_params.n_validators, + common_params.erasure_root, + data, + reencode_tx, + )) + .await + .map_err(|_| RecoveryError::ChannelClosed)?; + + reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?.or_else(|| { + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + erasure_root = ?common_params.erasure_root, + "Data recovery error - root mismatch", + ); + None + }) + }, + PostRecoveryCheck::PovHash => + (data.pov.hash() == common_params.pov_hash).then_some(data).or_else(|| { + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + pov_hash = ?common_params.pov_hash, + "Data recovery error - PoV hash mismatch", + ); + None + }), + }; + + if let Some(data) = maybe_data { gum::trace!( target: LOG_TARGET, candidate_hash = ?common_params.candidate_hash, @@ -673,12 +709,6 @@ impl FetchChunks { Ok(data) } else { recovery_duration.map(|rd| rd.stop_and_discard()); - gum::trace!( - target: LOG_TARGET, - candidate_hash = ?common_params.candidate_hash, - erasure_root = ?common_params.erasure_root, - "Data recovery error - root mismatch", - ); Err(RecoveryError::Invalid) }