From 051328a50106cbd01c745ce3dca95a5b9ef9291a Mon Sep 17 00:00:00 2001 From: alindima Date: Wed, 6 Sep 2023 17:58:49 +0300 Subject: [PATCH 01/12] Draft of RecoveryStrategy based on linked enums --- .../network/availability-recovery/src/lib.rs | 809 ++------------- .../network/availability-recovery/src/task.rs | 938 ++++++++++++++++++ .../availability-recovery/src/tests.rs | 49 +- 3 files changed, 1045 insertions(+), 751 deletions(-) create mode 100644 polkadot/node/network/availability-recovery/src/task.rs diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs index 99f42f4bf9fe..63bfc82e3d7b 100644 --- a/polkadot/node/network/availability-recovery/src/lib.rs +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -18,16 +18,10 @@ #![warn(missing_docs)] -use std::{ - collections::{HashMap, VecDeque}, - iter::Iterator, - num::NonZeroUsize, - pin::Pin, - time::Duration, -}; +use std::{collections::HashMap, iter::Iterator, num::NonZeroUsize, pin::Pin}; use futures::{ - channel::oneshot::{self, channel}, + channel::oneshot, future::{Future, FutureExt, RemoteHandle}, pin_mut, prelude::*, @@ -35,77 +29,55 @@ use futures::{ stream::{FuturesUnordered, StreamExt}, task::{Context, Poll}, }; -use rand::seq::SliceRandom; use schnellru::{ByLength, LruMap}; +use task::{FetchChunksParams, FetchFullParams}; use fatality::Nested; use polkadot_erasure_coding::{ branch_hash, branches, obtain_chunks_v1, recovery_threshold, Error as ErasureEncodingError, }; -#[cfg(not(test))] -use polkadot_node_network_protocol::request_response::CHUNK_REQUEST_TIMEOUT; +use task::{RecoveryParams, RecoveryStrategy, RecoveryTask}; + use polkadot_node_network_protocol::{ - request_response::{ - self as req_res, outgoing::RequestError, v1 as request_v1, IncomingRequestReceiver, - OutgoingRequest, Recipient, Requests, - }, - IfDisconnected, UnifiedReputationChange as Rep, + request_response::{v1 as request_v1, 
IncomingRequestReceiver}, + UnifiedReputationChange as Rep, }; use polkadot_node_primitives::{AvailableData, ErasureChunk}; use polkadot_node_subsystem::{ errors::RecoveryError, jaeger, - messages::{AvailabilityRecoveryMessage, AvailabilityStoreMessage, NetworkBridgeTxMessage}, + messages::{AvailabilityRecoveryMessage, AvailabilityStoreMessage}, overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult, }; use polkadot_node_subsystem_util::request_session_info; use polkadot_primitives::{ - AuthorityDiscoveryId, BlakeTwo256, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex, - Hash, HashT, IndexedVec, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex, + BlakeTwo256, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex, Hash, HashT, + SessionIndex, SessionInfo, ValidatorIndex, }; mod error; mod futures_undead; mod metrics; +mod task; use metrics::Metrics; -use futures_undead::FuturesUndead; -use sc_network::{OutboundFailure, RequestFailure}; - #[cfg(test)] mod tests; const LOG_TARGET: &str = "parachain::availability-recovery"; -// How many parallel recovery tasks should be running at once. -const N_PARALLEL: usize = 50; - // Size of the LRU cache where we keep recovered data. const LRU_SIZE: u32 = 16; const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request"); -/// Time after which we consider a request to have failed -/// -/// and we should try more peers. Note in theory the request times out at the network level, -/// measurements have shown, that in practice requests might actually take longer to fail in -/// certain occasions. (The very least, authority discovery is not part of the timeout.) -/// -/// For the time being this value is the same as the timeout on the networking layer, but as this -/// timeout is more soft than the networking one, it might make sense to pick different values as -/// well. 
-#[cfg(not(test))] -const TIMEOUT_START_NEW_REQUESTS: Duration = CHUNK_REQUEST_TIMEOUT; -#[cfg(test)] -const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100); - /// PoV size limit in bytes for which prefer fetching from backers. const SMALL_POV_LIMIT: usize = 128 * 1024; #[derive(Clone, PartialEq)] /// The strategy we use to recover the PoV. -pub enum RecoveryStrategy { +pub enum RecoveryStrategyKind { /// We always try the backing group first, then fallback to validator chunks. BackersFirstAlways, /// We try the backing group first if PoV size is lower than specified, then fallback to @@ -120,92 +92,16 @@ pub enum RecoveryStrategy { BypassAvailabilityStore, } -impl RecoveryStrategy { - /// Returns true if the strategy needs backing group index. - pub fn needs_backing_group(&self) -> bool { - match self { - RecoveryStrategy::BackersFirstAlways | RecoveryStrategy::BackersFirstIfSizeLower(_) => - true, - _ => false, - } - } - - /// Returns the PoV size limit in bytes for `BackersFirstIfSizeLower` strategy, otherwise - /// `None`. - pub fn pov_size_limit(&self) -> Option { - match *self { - RecoveryStrategy::BackersFirstIfSizeLower(limit) => Some(limit), - _ => None, - } - } -} /// The Availability Recovery Subsystem. pub struct AvailabilityRecoverySubsystem { /// PoV recovery strategy to use. - recovery_strategy: RecoveryStrategy, + recovery_strategy_kind: RecoveryStrategyKind, /// Receiver for available data requests. req_receiver: IncomingRequestReceiver, /// Metrics for this subsystem. metrics: Metrics, } -struct RequestFromBackers { - // a random shuffling of the validators from the backing group which indicates the order - // in which we connect to them and request the chunk. - shuffled_backers: Vec, - // channel to the erasure task handler. - erasure_task_tx: futures::channel::mpsc::Sender, -} - -struct RequestChunksFromValidators { - /// How many request have been unsuccessful so far. 
- error_count: usize, - /// Total number of responses that have been received. - /// - /// including failed ones. - total_received_responses: usize, - /// a random shuffling of the validators which indicates the order in which we connect to the - /// validators and request the chunk from them. - shuffling: VecDeque, - /// Chunks received so far. - received_chunks: HashMap, - /// Pending chunk requests with soft timeout. - requesting_chunks: FuturesUndead, (ValidatorIndex, RequestError)>>, - // channel to the erasure task handler. - erasure_task_tx: futures::channel::mpsc::Sender, -} - -struct RecoveryParams { - /// Discovery ids of `validators`. - validator_authority_keys: Vec, - - /// Validators relevant to this `RecoveryTask`. - validators: IndexedVec, - - /// The number of pieces needed. - threshold: usize, - - /// A hash of the relevant candidate. - candidate_hash: CandidateHash, - - /// The root of the erasure encoding of the para block. - erasure_root: Hash, - - /// Metrics to report - metrics: Metrics, - - /// Do not request data from availability-store - bypass_availability_store: bool, -} - -/// Source the availability data either by means -/// of direct request response protocol to -/// backers (a.k.a. fast-path), or recover from chunks. -enum Source { - RequestFromBackers(RequestFromBackers), - RequestChunks(RequestChunksFromValidators), -} - /// Expensive erasure coding computations that we want to run on a blocking thread. enum ErasureTask { /// Reconstructs `AvailableData` from chunks given `n_validators`. @@ -219,486 +115,6 @@ enum ErasureTask { Reencode(usize, Hash, AvailableData, oneshot::Sender>), } -/// A stateful reconstruction of availability data in reference to -/// a candidate hash. -struct RecoveryTask { - sender: Sender, - - /// The parameters of the recovery process. - params: RecoveryParams, - - /// The source to obtain the availability data from. - source: Source, - - // channel to the erasure task handler. 
- erasure_task_tx: futures::channel::mpsc::Sender, -} - -impl RequestFromBackers { - fn new( - mut backers: Vec, - erasure_task_tx: futures::channel::mpsc::Sender, - ) -> Self { - backers.shuffle(&mut rand::thread_rng()); - - RequestFromBackers { shuffled_backers: backers, erasure_task_tx } - } - - // Run this phase to completion. - async fn run( - &mut self, - params: &RecoveryParams, - sender: &mut impl overseer::AvailabilityRecoverySenderTrait, - ) -> Result { - gum::trace!( - target: LOG_TARGET, - candidate_hash = ?params.candidate_hash, - erasure_root = ?params.erasure_root, - "Requesting from backers", - ); - loop { - // Pop the next backer, and proceed to next phase if we're out. - let validator_index = - self.shuffled_backers.pop().ok_or_else(|| RecoveryError::Unavailable)?; - - // Request data. - let (req, response) = OutgoingRequest::new( - Recipient::Authority( - params.validator_authority_keys[validator_index.0 as usize].clone(), - ), - req_res::v1::AvailableDataFetchingRequest { candidate_hash: params.candidate_hash }, - ); - - sender - .send_message(NetworkBridgeTxMessage::SendRequests( - vec![Requests::AvailableDataFetchingV1(req)], - IfDisconnected::ImmediateError, - )) - .await; - - match response.await { - Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => { - let (reencode_tx, reencode_rx) = channel(); - self.erasure_task_tx - .send(ErasureTask::Reencode( - params.validators.len(), - params.erasure_root, - data, - reencode_tx, - )) - .await - .map_err(|_| RecoveryError::ChannelClosed)?; - - let reencode_response = - reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?; - - if let Some(data) = reencode_response { - gum::trace!( - target: LOG_TARGET, - candidate_hash = ?params.candidate_hash, - "Received full data", - ); - - return Ok(data) - } else { - gum::debug!( - target: LOG_TARGET, - candidate_hash = ?params.candidate_hash, - ?validator_index, - "Invalid data response", - ); - - // it doesn't help to report the 
peer with req/res. - } - }, - Ok(req_res::v1::AvailableDataFetchingResponse::NoSuchData) => {}, - Err(e) => gum::debug!( - target: LOG_TARGET, - candidate_hash = ?params.candidate_hash, - ?validator_index, - err = ?e, - "Error fetching full available data." - ), - } - } - } -} - -impl RequestChunksFromValidators { - fn new( - n_validators: u32, - erasure_task_tx: futures::channel::mpsc::Sender, - ) -> Self { - let mut shuffling: Vec<_> = (0..n_validators).map(ValidatorIndex).collect(); - shuffling.shuffle(&mut rand::thread_rng()); - - RequestChunksFromValidators { - error_count: 0, - total_received_responses: 0, - shuffling: shuffling.into(), - received_chunks: HashMap::new(), - requesting_chunks: FuturesUndead::new(), - erasure_task_tx, - } - } - - fn is_unavailable(&self, params: &RecoveryParams) -> bool { - is_unavailable( - self.chunk_count(), - self.requesting_chunks.total_len(), - self.shuffling.len(), - params.threshold, - ) - } - - fn chunk_count(&self) -> usize { - self.received_chunks.len() - } - - fn insert_chunk(&mut self, validator_index: ValidatorIndex, chunk: ErasureChunk) { - self.received_chunks.insert(validator_index, chunk); - } - - fn can_conclude(&self, params: &RecoveryParams) -> bool { - self.chunk_count() >= params.threshold || self.is_unavailable(params) - } - - /// Desired number of parallel requests. - /// - /// For the given threshold (total required number of chunks) get the desired number of - /// requests we want to have running in parallel at this time. - fn get_desired_request_count(&self, threshold: usize) -> usize { - // Upper bound for parallel requests. - // We want to limit this, so requests can be processed within the timeout and we limit the - // following feedback loop: - // 1. Requests fail due to timeout - // 2. We request more chunks to make up for it - // 3. Bandwidth is spread out even more, so we get even more timeouts - // 4. We request more chunks to make up for it ... 
- let max_requests_boundary = std::cmp::min(N_PARALLEL, threshold); - // How many chunks are still needed? - let remaining_chunks = threshold.saturating_sub(self.chunk_count()); - // What is the current error rate, so we can make up for it? - let inv_error_rate = - self.total_received_responses.checked_div(self.error_count).unwrap_or(0); - // Actual number of requests we want to have in flight in parallel: - std::cmp::min( - max_requests_boundary, - remaining_chunks + remaining_chunks.checked_div(inv_error_rate).unwrap_or(0), - ) - } - - async fn launch_parallel_requests( - &mut self, - params: &RecoveryParams, - sender: &mut Sender, - ) where - Sender: overseer::AvailabilityRecoverySenderTrait, - { - let num_requests = self.get_desired_request_count(params.threshold); - let candidate_hash = ¶ms.candidate_hash; - let already_requesting_count = self.requesting_chunks.len(); - - gum::debug!( - target: LOG_TARGET, - ?candidate_hash, - ?num_requests, - error_count= ?self.error_count, - total_received = ?self.total_received_responses, - threshold = ?params.threshold, - ?already_requesting_count, - "Requesting availability chunks for a candidate", - ); - let mut requests = Vec::with_capacity(num_requests - already_requesting_count); - - while self.requesting_chunks.len() < num_requests { - if let Some(validator_index) = self.shuffling.pop_back() { - let validator = params.validator_authority_keys[validator_index.0 as usize].clone(); - gum::trace!( - target: LOG_TARGET, - ?validator, - ?validator_index, - ?candidate_hash, - "Requesting chunk", - ); - - // Request data. 
- let raw_request = req_res::v1::ChunkFetchingRequest { - candidate_hash: params.candidate_hash, - index: validator_index, - }; - - let (req, res) = OutgoingRequest::new(Recipient::Authority(validator), raw_request); - requests.push(Requests::ChunkFetchingV1(req)); - - params.metrics.on_chunk_request_issued(); - let timer = params.metrics.time_chunk_request(); - - self.requesting_chunks.push(Box::pin(async move { - let _timer = timer; - match res.await { - Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk)) => - Ok(Some(chunk.recombine_into_chunk(&raw_request))), - Ok(req_res::v1::ChunkFetchingResponse::NoSuchChunk) => Ok(None), - Err(e) => Err((validator_index, e)), - } - })); - } else { - break - } - } - - sender - .send_message(NetworkBridgeTxMessage::SendRequests( - requests, - IfDisconnected::TryConnect, - )) - .await; - } - - /// Wait for a sufficient amount of chunks to reconstruct according to the provided `params`. - async fn wait_for_chunks(&mut self, params: &RecoveryParams) { - let metrics = ¶ms.metrics; - - // Wait for all current requests to conclude or time-out, or until we reach enough chunks. - // We also declare requests undead, once `TIMEOUT_START_NEW_REQUESTS` is reached and will - // return in that case for `launch_parallel_requests` to fill up slots again. 
- while let Some(request_result) = - self.requesting_chunks.next_with_timeout(TIMEOUT_START_NEW_REQUESTS).await - { - self.total_received_responses += 1; - - match request_result { - Ok(Some(chunk)) => - if is_chunk_valid(params, &chunk) { - metrics.on_chunk_request_succeeded(); - gum::trace!( - target: LOG_TARGET, - candidate_hash = ?params.candidate_hash, - validator_index = ?chunk.index, - "Received valid chunk", - ); - self.insert_chunk(chunk.index, chunk); - } else { - metrics.on_chunk_request_invalid(); - self.error_count += 1; - }, - Ok(None) => { - metrics.on_chunk_request_no_such_chunk(); - self.error_count += 1; - }, - Err((validator_index, e)) => { - self.error_count += 1; - - gum::trace!( - target: LOG_TARGET, - candidate_hash= ?params.candidate_hash, - err = ?e, - ?validator_index, - "Failure requesting chunk", - ); - - match e { - RequestError::InvalidResponse(_) => { - metrics.on_chunk_request_invalid(); - - gum::debug!( - target: LOG_TARGET, - candidate_hash = ?params.candidate_hash, - err = ?e, - ?validator_index, - "Chunk fetching response was invalid", - ); - }, - RequestError::NetworkError(err) => { - // No debug logs on general network errors - that became very spammy - // occasionally. - if let RequestFailure::Network(OutboundFailure::Timeout) = err { - metrics.on_chunk_request_timeout(); - } else { - metrics.on_chunk_request_error(); - } - - self.shuffling.push_front(validator_index); - }, - RequestError::Canceled(_) => { - metrics.on_chunk_request_error(); - - self.shuffling.push_front(validator_index); - }, - } - }, - } - - // Stop waiting for requests when we either can already recover the data - // or have gotten firm 'No' responses from enough validators. 
- if self.can_conclude(params) { - gum::debug!( - target: LOG_TARGET, - candidate_hash = ?params.candidate_hash, - received_chunks_count = ?self.chunk_count(), - requested_chunks_count = ?self.requesting_chunks.len(), - threshold = ?params.threshold, - "Can conclude availability for a candidate", - ); - break - } - } - } - - async fn run( - &mut self, - params: &RecoveryParams, - sender: &mut Sender, - ) -> Result - where - Sender: overseer::AvailabilityRecoverySenderTrait, - { - let metrics = ¶ms.metrics; - - // First query the store for any chunks we've got. - if !params.bypass_availability_store { - let (tx, rx) = oneshot::channel(); - sender - .send_message(AvailabilityStoreMessage::QueryAllChunks(params.candidate_hash, tx)) - .await; - - match rx.await { - Ok(chunks) => { - // This should either be length 1 or 0. If we had the whole data, - // we wouldn't have reached this stage. - let chunk_indices: Vec<_> = chunks.iter().map(|c| c.index).collect(); - self.shuffling.retain(|i| !chunk_indices.contains(i)); - - for chunk in chunks { - if is_chunk_valid(params, &chunk) { - gum::trace!( - target: LOG_TARGET, - candidate_hash = ?params.candidate_hash, - validator_index = ?chunk.index, - "Found valid chunk on disk" - ); - self.insert_chunk(chunk.index, chunk); - } else { - gum::error!( - target: LOG_TARGET, - "Loaded invalid chunk from disk! Disk/Db corruption _very_ likely - please fix ASAP!" 
- ); - }; - } - }, - Err(oneshot::Canceled) => { - gum::warn!( - target: LOG_TARGET, - candidate_hash = ?params.candidate_hash, - "Failed to reach the availability store" - ); - }, - } - } - - let _recovery_timer = metrics.time_full_recovery(); - - loop { - if self.is_unavailable(¶ms) { - gum::debug!( - target: LOG_TARGET, - candidate_hash = ?params.candidate_hash, - erasure_root = ?params.erasure_root, - received = %self.chunk_count(), - requesting = %self.requesting_chunks.len(), - total_requesting = %self.requesting_chunks.total_len(), - n_validators = %params.validators.len(), - "Data recovery is not possible", - ); - - metrics.on_recovery_failed(); - - return Err(RecoveryError::Unavailable) - } - - self.launch_parallel_requests(params, sender).await; - self.wait_for_chunks(params).await; - - // If received_chunks has more than threshold entries, attempt to recover the data. - // If that fails, or a re-encoding of it doesn't match the expected erasure root, - // return Err(RecoveryError::Invalid) - if self.chunk_count() >= params.threshold { - let recovery_duration = metrics.time_erasure_recovery(); - - // Send request to reconstruct available data from chunks. - let (avilable_data_tx, available_data_rx) = channel(); - self.erasure_task_tx - .send(ErasureTask::Reconstruct( - params.validators.len(), - std::mem::take(&mut self.received_chunks), - avilable_data_tx, - )) - .await - .map_err(|_| RecoveryError::ChannelClosed)?; - - let available_data_response = - available_data_rx.await.map_err(|_| RecoveryError::ChannelClosed)?; - - return match available_data_response { - Ok(data) => { - // Send request to re-encode the chunks and check merkle root. 
- let (reencode_tx, reencode_rx) = channel(); - self.erasure_task_tx - .send(ErasureTask::Reencode( - params.validators.len(), - params.erasure_root, - data, - reencode_tx, - )) - .await - .map_err(|_| RecoveryError::ChannelClosed)?; - - let reencode_response = - reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?; - - if let Some(data) = reencode_response { - gum::trace!( - target: LOG_TARGET, - candidate_hash = ?params.candidate_hash, - erasure_root = ?params.erasure_root, - "Data recovery complete", - ); - metrics.on_recovery_succeeded(); - - Ok(data) - } else { - recovery_duration.map(|rd| rd.stop_and_discard()); - gum::trace!( - target: LOG_TARGET, - candidate_hash = ?params.candidate_hash, - erasure_root = ?params.erasure_root, - "Data recovery - root mismatch", - ); - metrics.on_recovery_invalid(); - - Err(RecoveryError::Invalid) - } - }, - Err(err) => { - recovery_duration.map(|rd| rd.stop_and_discard()); - gum::trace!( - target: LOG_TARGET, - candidate_hash = ?params.candidate_hash, - erasure_root = ?params.erasure_root, - ?err, - "Data recovery error ", - ); - metrics.on_recovery_invalid(); - - Err(RecoveryError::Invalid) - }, - } - } - } - } -} - const fn is_unavailable( received_chunks: usize, requesting_chunks: usize, @@ -777,60 +193,6 @@ fn reconstructed_data_matches_root( branches.root() == *expected_root } -impl RecoveryTask -where - Sender: overseer::AvailabilityRecoverySenderTrait, -{ - async fn run(mut self) -> Result { - // First just see if we have the data available locally. 
- if !self.params.bypass_availability_store { - let (tx, rx) = oneshot::channel(); - self.sender - .send_message(AvailabilityStoreMessage::QueryAvailableData( - self.params.candidate_hash, - tx, - )) - .await; - - match rx.await { - Ok(Some(data)) => return Ok(data), - Ok(None) => {}, - Err(oneshot::Canceled) => { - gum::warn!( - target: LOG_TARGET, - candidate_hash = ?self.params.candidate_hash, - "Failed to reach the availability store", - ) - }, - } - } - - self.params.metrics.on_recovery_started(); - - loop { - // These only fail if we cannot reach the underlying subsystem, which case there is - // nothing meaningful we can do. - match self.source { - Source::RequestFromBackers(ref mut from_backers) => { - match from_backers.run(&self.params, &mut self.sender).await { - Ok(data) => break Ok(data), - Err(RecoveryError::Invalid) => break Err(RecoveryError::Invalid), - Err(RecoveryError::ChannelClosed) => - break Err(RecoveryError::ChannelClosed), - Err(RecoveryError::Unavailable) => - self.source = Source::RequestChunks(RequestChunksFromValidators::new( - self.params.validators.len() as _, - self.erasure_task_tx.clone(), - )), - } - }, - Source::RequestChunks(ref mut from_all) => - break from_all.run(&self.params, &mut self.sender).await, - } - } - } -} - /// Accumulate all awaiting sides for some particular `AvailableData`. struct RecoveryHandle { candidate_hash: CandidateHash, @@ -925,6 +287,7 @@ struct State { /// An LRU cache of recently recovered data. availability_lru: LruMap, + // TODO: an LRU cache of erasure indices shuffling of all validators (per relay-parent). 
} impl Default for State { @@ -973,65 +336,23 @@ async fn launch_recovery_task( ctx: &mut Context, session_info: SessionInfo, receipt: CandidateReceipt, - mut backing_group: Option, response_sender: oneshot::Sender>, metrics: &Metrics, - recovery_strategy: &RecoveryStrategy, - erasure_task_tx: futures::channel::mpsc::Sender, + recovery_strategy: RecoveryStrategy, + bypass_availability_store: bool, ) -> error::Result<()> { let candidate_hash = receipt.hash(); let params = RecoveryParams { validator_authority_keys: session_info.discovery_keys.clone(), - validators: session_info.validators.clone(), + n_validators: session_info.validators.len(), threshold: recovery_threshold(session_info.validators.len())?, candidate_hash, erasure_root: receipt.descriptor.erasure_root, metrics: metrics.clone(), - bypass_availability_store: recovery_strategy == &RecoveryStrategy::BypassAvailabilityStore, + bypass_availability_store, }; - if let Some(small_pov_limit) = recovery_strategy.pov_size_limit() { - // Get our own chunk size to get an estimate of the PoV size. - let chunk_size: Result, error::Error> = - query_chunk_size(ctx, candidate_hash).await; - if let Ok(Some(chunk_size)) = chunk_size { - let pov_size_estimate = chunk_size.saturating_mul(session_info.validators.len()) / 3; - let prefer_backing_group = pov_size_estimate < small_pov_limit; - - gum::trace!( - target: LOG_TARGET, - ?candidate_hash, - pov_size_estimate, - small_pov_limit, - enabled = prefer_backing_group, - "Prefer fetch from backing group", - ); - - backing_group = backing_group.filter(|_| { - // We keep the backing group only if `1/3` of chunks sum up to less than - // `small_pov_limit`. 
- prefer_backing_group - }); - } - } - - let phase = backing_group - .and_then(|g| session_info.validator_groups.get(g)) - .map(|group| { - Source::RequestFromBackers(RequestFromBackers::new( - group.clone(), - erasure_task_tx.clone(), - )) - }) - .unwrap_or_else(|| { - Source::RequestChunks(RequestChunksFromValidators::new( - params.validators.len() as _, - erasure_task_tx.clone(), - )) - }); - - let recovery_task = - RecoveryTask { sender: ctx.sender().clone(), params, source: phase, erasure_task_tx }; + let recovery_task = RecoveryTask::new(ctx.sender().clone(), params, recovery_strategy); let (remote, remote_handle) = recovery_task.run().remote_handle(); @@ -1062,8 +383,8 @@ async fn handle_recover( backing_group: Option, response_sender: oneshot::Sender>, metrics: &Metrics, - recovery_strategy: &RecoveryStrategy, erasure_task_tx: futures::channel::mpsc::Sender, + recovery_strategy_kind: RecoveryStrategyKind, ) -> error::Result<()> { let candidate_hash = receipt.hash(); @@ -1098,19 +419,75 @@ async fn handle_recover( let _span = span.child("session-info-ctx-received"); match session_info { - Some(session_info) => + Some(session_info) => { + let mut skip_backing_group_if: Box bool + Send> = Box::new(|| false); + + if let RecoveryStrategyKind::BackersFirstIfSizeLower(small_pov_limit) = + recovery_strategy_kind + { + // Get our own chunk size to get an estimate of the PoV size. 
+ let chunk_size: Result, error::Error> = + query_chunk_size(ctx, candidate_hash).await; + if let Ok(Some(chunk_size)) = chunk_size { + let pov_size_estimate = + chunk_size.saturating_mul(session_info.validators.len()) / 3; + let prefer_backing_group = pov_size_estimate < small_pov_limit; + + gum::trace!( + target: LOG_TARGET, + ?candidate_hash, + pov_size_estimate, + small_pov_limit, + enabled = prefer_backing_group, + "Prefer fetch from backing group", + ); + + skip_backing_group_if = Box::new(move || !prefer_backing_group); + } + }; + + let backing_validators = if let Some(backing_group) = backing_group { + session_info.validator_groups.get(backing_group) + } else { + None + }; + + let fetch_chunks_params = FetchChunksParams { + n_validators: session_info.validators.len(), + erasure_task_tx: erasure_task_tx.clone(), + }; + + let recovery_strategy = if let Some(backing_validators) = backing_validators { + match recovery_strategy_kind { + RecoveryStrategyKind::BackersFirstAlways | + RecoveryStrategyKind::BackersFirstIfSizeLower(_) | + RecoveryStrategyKind::BypassAvailabilityStore => RecoveryStrategy::new() + .then_fetch_full_from_backers(FetchFullParams { + group_name: "backers", + validators: backing_validators.to_vec(), + skip_if: skip_backing_group_if, + erasure_task_tx, + }) + .then_fetch_chunks_from_validators(fetch_chunks_params), + RecoveryStrategyKind::ChunksAlways => RecoveryStrategy::new() + .then_fetch_chunks_from_validators(fetch_chunks_params), + } + } else { + RecoveryStrategy::new().then_fetch_chunks_from_validators(fetch_chunks_params) + }; + launch_recovery_task( state, ctx, session_info, receipt, - backing_group, response_sender, metrics, - recovery_strategy, - erasure_task_tx, + *recovery_strategy, + recovery_strategy_kind == RecoveryStrategyKind::BypassAvailabilityStore, ) - .await, + .await + }, None => { gum::warn!(target: LOG_TARGET, "SessionInfo is `None` at {:?}", state.live_block); response_sender @@ -1155,7 +532,11 @@ impl 
AvailabilityRecoverySubsystem { req_receiver: IncomingRequestReceiver, metrics: Metrics, ) -> Self { - Self { recovery_strategy: RecoveryStrategy::BypassAvailabilityStore, req_receiver, metrics } + Self { + recovery_strategy_kind: RecoveryStrategyKind::BypassAvailabilityStore, + req_receiver, + metrics, + } } /// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to @@ -1164,7 +545,11 @@ impl AvailabilityRecoverySubsystem { req_receiver: IncomingRequestReceiver, metrics: Metrics, ) -> Self { - Self { recovery_strategy: RecoveryStrategy::BackersFirstAlways, req_receiver, metrics } + Self { + recovery_strategy_kind: RecoveryStrategyKind::BackersFirstAlways, + req_receiver, + metrics, + } } /// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks @@ -1172,7 +557,7 @@ impl AvailabilityRecoverySubsystem { req_receiver: IncomingRequestReceiver, metrics: Metrics, ) -> Self { - Self { recovery_strategy: RecoveryStrategy::ChunksAlways, req_receiver, metrics } + Self { recovery_strategy_kind: RecoveryStrategyKind::ChunksAlways, req_receiver, metrics } } /// Create a new instance of `AvailabilityRecoverySubsystem` which requests chunks if PoV is @@ -1182,7 +567,7 @@ impl AvailabilityRecoverySubsystem { metrics: Metrics, ) -> Self { Self { - recovery_strategy: RecoveryStrategy::BackersFirstIfSizeLower(SMALL_POV_LIMIT), + recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT), req_receiver, metrics, } @@ -1190,7 +575,7 @@ impl AvailabilityRecoverySubsystem { async fn run(self, mut ctx: Context) -> SubsystemResult<()> { let mut state = State::default(); - let Self { recovery_strategy, mut req_receiver, metrics } = self; + let Self { mut req_receiver, metrics, recovery_strategy_kind } = self; let (erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16); let mut erasure_task_rx = erasure_task_rx.fuse(); @@ -1275,11 +660,11 @@ impl AvailabilityRecoverySubsystem { &mut 
ctx, receipt, session_index, - maybe_backing_group.filter(|_| recovery_strategy.needs_backing_group()), + maybe_backing_group, response_sender, &metrics, - &recovery_strategy, erasure_task_tx.clone(), + recovery_strategy_kind.clone(), ).await { gum::warn!( target: LOG_TARGET, @@ -1295,7 +680,7 @@ impl AvailabilityRecoverySubsystem { in_req = recv_req => { match in_req.into_nested().map_err(|fatal| SubsystemError::with_origin("availability-recovery", fatal))? { Ok(req) => { - if recovery_strategy == RecoveryStrategy::BypassAvailabilityStore { + if recovery_strategy_kind == RecoveryStrategyKind::BypassAvailabilityStore { gum::debug!( target: LOG_TARGET, "Skipping request to availability-store.", diff --git a/polkadot/node/network/availability-recovery/src/task.rs b/polkadot/node/network/availability-recovery/src/task.rs new file mode 100644 index 000000000000..854dc26b4dd5 --- /dev/null +++ b/polkadot/node/network/availability-recovery/src/task.rs @@ -0,0 +1,938 @@ +use crate::{ + futures_undead::FuturesUndead, is_chunk_valid, is_unavailable, metrics::Metrics, ErasureTask, + LOG_TARGET, +}; +use futures::{channel::oneshot, SinkExt}; +#[cfg(not(test))] +use polkadot_node_network_protocol::request_response::CHUNK_REQUEST_TIMEOUT; +use polkadot_node_network_protocol::request_response::{ + self as req_res, outgoing::RequestError, OutgoingRequest, Recipient, Requests, +}; +use polkadot_node_primitives::{AvailableData, ErasureChunk}; +use polkadot_node_subsystem::{ + messages::{AvailabilityStoreMessage, NetworkBridgeTxMessage}, + overseer, RecoveryError, +}; +use polkadot_primitives::{AuthorityDiscoveryId, CandidateHash, Hash, ValidatorIndex}; +use rand::seq::SliceRandom; +use sc_network::{IfDisconnected, OutboundFailure, RequestFailure}; +use std::{ + collections::{HashMap, VecDeque}, + time::Duration, +}; + +// How many parallel recovery tasks should be running at once. 
+const N_PARALLEL: usize = 50; + +/// Time after which we consider a request to have failed +/// +/// and we should try more peers. Note in theory the request times out at the network level, +/// measurements have shown, that in practice requests might actually take longer to fail in +/// certain occasions. (The very least, authority discovery is not part of the timeout.) +/// +/// For the time being this value is the same as the timeout on the networking layer, but as this +/// timeout is more soft than the networking one, it might make sense to pick different values as +/// well. +#[cfg(not(test))] +const TIMEOUT_START_NEW_REQUESTS: Duration = CHUNK_REQUEST_TIMEOUT; +#[cfg(test)] +const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100); + +pub struct RecoveryParams { + /// Discovery ids of `validators`. + pub(crate) validator_authority_keys: Vec, + + /// Number of validators relevant to this `RecoveryTask`. + pub(crate) n_validators: usize, + + /// The number of pieces needed. + pub(crate) threshold: usize, + + /// A hash of the relevant candidate. + pub(crate) candidate_hash: CandidateHash, + + /// The root of the erasure encoding of the para block. + pub(crate) erasure_root: Hash, + + /// Metrics to report + pub(crate) metrics: Metrics, + + /// Do not request data from availability-store + pub(crate) bypass_availability_store: bool, +} +/// Represents intermediate data that must be passed between `RecoveryStrategy`s belonging to the +/// same `RecoveryTask` or data that is used by state methods common to multiple RecoveryStrategies. +pub struct State { + /// Chunks received so far. 
+ received_chunks: HashMap, + requesting_chunks: FuturesUndead, (ValidatorIndex, RequestError)>>, +} + +impl State { + fn new() -> Self { + Self { received_chunks: HashMap::new(), requesting_chunks: FuturesUndead::new() } + } + + fn insert_chunk(&mut self, validator: ValidatorIndex, chunk: ErasureChunk) { + self.received_chunks.insert(validator, chunk); + } + + fn chunk_count(&self) -> usize { + self.received_chunks.len() + } + + async fn populate_from_av_store( + &mut self, + params: &RecoveryParams, + sender: &mut Sender, + ) -> Vec { + let (tx, rx) = oneshot::channel(); + sender + .send_message(AvailabilityStoreMessage::QueryAllChunks(params.candidate_hash, tx)) + .await; + + match rx.await { + Ok(chunks) => { + // This should either be length 1 or 0. If we had the whole data, + // we wouldn't have reached this stage. + let chunk_indices: Vec<_> = chunks.iter().map(|c| c.index).collect(); + + for chunk in chunks { + if is_chunk_valid(params, &chunk) { + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?params.candidate_hash, + validator_index = ?chunk.index, + "Found valid chunk on disk" + ); + self.insert_chunk(chunk.index, chunk); + } else { + gum::error!( + target: LOG_TARGET, + "Loaded invalid chunk from disk! Disk/Db corruption _very_ likely - please fix ASAP!" 
+ ); + }; + } + + chunk_indices + }, + Err(oneshot::Canceled) => { + gum::warn!( + target: LOG_TARGET, + candidate_hash = ?params.candidate_hash, + "Failed to reach the availability store" + ); + + vec![] + }, + } + } + + async fn launch_parallel_chunk_requests( + &mut self, + params: &RecoveryParams, + sender: &mut Sender, + desired_requests_count: usize, + validators: &mut VecDeque, + ) where + Sender: overseer::AvailabilityRecoverySenderTrait, + { + let candidate_hash = ¶ms.candidate_hash; + let already_requesting_count = self.requesting_chunks.len(); + + let mut requests = Vec::with_capacity(desired_requests_count - already_requesting_count); + + while self.requesting_chunks.len() < desired_requests_count { + if let Some(validator_index) = validators.pop_back() { + let validator = params.validator_authority_keys[validator_index.0 as usize].clone(); + gum::trace!( + target: LOG_TARGET, + ?validator, + ?validator_index, + ?candidate_hash, + "Requesting chunk", + ); + + // Request data. + let raw_request = req_res::v1::ChunkFetchingRequest { + candidate_hash: params.candidate_hash, + index: validator_index, + }; + + let (req, res) = OutgoingRequest::new(Recipient::Authority(validator), raw_request); + requests.push(Requests::ChunkFetchingV1(req)); + + params.metrics.on_chunk_request_issued(); + let timer = params.metrics.time_chunk_request(); + + self.requesting_chunks.push(Box::pin(async move { + let _timer = timer; + match res.await { + Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk)) => + Ok(Some(chunk.recombine_into_chunk(&raw_request))), + Ok(req_res::v1::ChunkFetchingResponse::NoSuchChunk) => Ok(None), + Err(e) => Err((validator_index, e)), + } + })); + } else { + break + } + } + + sender + .send_message(NetworkBridgeTxMessage::SendRequests( + requests, + IfDisconnected::TryConnect, + )) + .await; + } + + /// Wait for a sufficient amount of chunks to reconstruct according to the provided `params`. 
+ async fn wait_for_chunks( + &mut self, + params: &RecoveryParams, + validators: &mut VecDeque, + can_conclude: impl Fn(usize, usize, usize, &RecoveryParams, usize) -> bool, + ) -> (usize, usize) { + let metrics = ¶ms.metrics; + + let mut total_received_responses = 0; + let mut error_count = 0; + + // Wait for all current requests to conclude or time-out, or until we reach enough chunks. + // We also declare requests undead, once `TIMEOUT_START_NEW_REQUESTS` is reached and will + // return in that case for `launch_parallel_requests` to fill up slots again. + while let Some(request_result) = + self.requesting_chunks.next_with_timeout(TIMEOUT_START_NEW_REQUESTS).await + { + total_received_responses += 1; + + match request_result { + Ok(Some(chunk)) => + if is_chunk_valid(params, &chunk) { + metrics.on_chunk_request_succeeded(); + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?params.candidate_hash, + validator_index = ?chunk.index, + "Received valid chunk", + ); + self.insert_chunk(chunk.index, chunk); + } else { + metrics.on_chunk_request_invalid(); + error_count += 1; + }, + Ok(None) => { + metrics.on_chunk_request_no_such_chunk(); + error_count += 1; + }, + Err((validator_index, e)) => { + error_count += 1; + + gum::trace!( + target: LOG_TARGET, + candidate_hash= ?params.candidate_hash, + err = ?e, + ?validator_index, + "Failure requesting chunk", + ); + + match e { + RequestError::InvalidResponse(_) => { + metrics.on_chunk_request_invalid(); + + gum::debug!( + target: LOG_TARGET, + candidate_hash = ?params.candidate_hash, + err = ?e, + ?validator_index, + "Chunk fetching response was invalid", + ); + }, + RequestError::NetworkError(err) => { + // No debug logs on general network errors - that became very spammy + // occasionally. 
+ if let RequestFailure::Network(OutboundFailure::Timeout) = err { + metrics.on_chunk_request_timeout(); + } else { + metrics.on_chunk_request_error(); + } + + validators.push_front(validator_index); + }, + RequestError::Canceled(_) => { + metrics.on_chunk_request_error(); + + validators.push_front(validator_index); + }, + } + }, + } + + // Stop waiting for requests when we either can already recover the data + // or have gotten firm 'No' responses from enough validators. + if can_conclude( + validators.len(), + self.requesting_chunks.total_len(), + self.chunk_count(), + params, + error_count, + ) { + gum::debug!( + target: LOG_TARGET, + candidate_hash = ?params.candidate_hash, + received_chunks_count = ?self.chunk_count(), + requested_chunks_count = ?self.requesting_chunks.len(), + threshold = ?params.threshold, + "Can conclude availability for a candidate", + ); + break + } + } + + (total_received_responses, error_count) + } +} + +/// A stateful reconstruction of availability data in reference to +/// a candidate hash. +pub struct RecoveryTask { + sender: Sender, + /// The common parameters of the recovery process, regardless of the strategy. 
+ params: RecoveryParams, + strategy: RecoveryStrategy, + state: State, +} + +impl RecoveryTask { + pub fn new(sender: Sender, params: RecoveryParams, strategy: RecoveryStrategy) -> Self { + Self { sender, params, strategy, state: State::new() } + } +} + +impl RecoveryTask +where + Sender: overseer::AvailabilityRecoverySenderTrait, +{ + async fn in_availability_store(&mut self) -> Option { + if !self.params.bypass_availability_store { + let (tx, rx) = oneshot::channel(); + self.sender + .send_message(AvailabilityStoreMessage::QueryAvailableData( + self.params.candidate_hash, + tx, + )) + .await; + + match rx.await { + Ok(Some(data)) => return Some(data), + Ok(None) => {}, + Err(oneshot::Canceled) => { + gum::warn!( + target: LOG_TARGET, + candidate_hash = ?self.params.candidate_hash, + "Failed to reach the availability store", + ) + }, + } + } + + None + } + + pub async fn run(mut self) -> Result { + if let Some(data) = self.in_availability_store().await { + return Ok(data) + } + + self.params.metrics.on_recovery_started(); + + let _timer = self.params.metrics.time_full_recovery(); + + let res = loop { + let (current_strategy, next_strategy) = self.strategy.pop_first(); + self.strategy = next_strategy; + + // Make sure we are not referencing futures from past RecoveryStrategy runs. 
+ if self.state.requesting_chunks.total_len() != 0 { + self.state.requesting_chunks = FuturesUndead::new(); + } + + let recovery_strategy_name = current_strategy.display_name(); + + if let Some(name) = recovery_strategy_name { + gum::info!( + target: LOG_TARGET, + candidate_hash = ?self.params.candidate_hash, + "Starting `{}` strategy", + &name, + ); + } + + let res = match current_strategy { + RecoveryStrategy::Nil => Err(RecoveryError::Unavailable), + RecoveryStrategy::FullFromBackers(inner, _) => + inner.run(&mut self.state, &mut self.sender, &self.params).await, + RecoveryStrategy::ChunksFromValidators(inner, _) => + inner.run(&mut self.state, &mut self.sender, &self.params).await, + }; + + match res { + Err(RecoveryError::Unavailable) => { + if !matches!(&self.strategy, RecoveryStrategy::Nil) { + if let Some(recovery_strategy_name) = recovery_strategy_name { + gum::warn!( + target: LOG_TARGET, + candidate_hash = ?self.params.candidate_hash, + "Recovery strategy `{}` did not conclude. Trying the next one.", + recovery_strategy_name, + ); + } + continue + } else { + // We have no other strategies to try. 
+ gum::error!( + target: LOG_TARGET, + candidate_hash = ?self.params.candidate_hash, + "Recovery of available data failed.", + ); + break Err(RecoveryError::Unavailable) + } + }, + Err(err) => break Err(err), + Ok(data) => break Ok(data), + } + }; + + match &res { + Ok(_) => self.params.metrics.on_recovery_succeeded(), + Err(RecoveryError::Invalid) => self.params.metrics.on_recovery_invalid(), + Err(_) => self.params.metrics.on_recovery_failed(), + } + + res + } +} + +pub enum RecoveryStrategy { + Nil, + FullFromBackers(FetchFull, Box), + ChunksFromValidators(FetchChunks, Box), +} + +impl RecoveryStrategy { + pub fn new() -> Box { + Box::new(RecoveryStrategy::Nil) + } + + fn display_name(&self) -> Option<&'static str> { + match self { + Self::Nil => None, + Self::FullFromBackers(_, _) => Some("Full recovery from backers"), + Self::ChunksFromValidators(_, _) => Some("Chunks recovery"), + } + } + + pub fn then_fetch_full_from_backers(self: Box, params: FetchFullParams) -> Box { + match *self { + Self::Nil => Box::new(Self::FullFromBackers(FetchFull::new(params), self)), + Self::ChunksFromValidators(task, next) => { + let next = next.then_fetch_full_from_backers(params); + Box::new(Self::ChunksFromValidators(task, next)) + }, + Self::FullFromBackers(task, next) => { + let next = next.then_fetch_full_from_backers(params); + Box::new(Self::FullFromBackers(task, next)) + }, + } + } + + pub fn then_fetch_chunks_from_validators( + self: Box, + params: FetchChunksParams, + ) -> Box { + match *self { + Self::Nil => Box::new(Self::ChunksFromValidators(FetchChunks::new(params), self)), + Self::ChunksFromValidators(task, next) => { + let next = next.then_fetch_chunks_from_validators(params); + Box::new(Self::ChunksFromValidators(task, next)) + }, + Self::FullFromBackers(task, next) => { + let next = next.then_fetch_chunks_from_validators(params); + Box::new(Self::FullFromBackers(task, next)) + }, + } + } + + fn pop_first(self: Self) -> (Self, Self) { + match self { + Self::Nil 
=> (Self::Nil, Self::Nil), + Self::FullFromBackers(inner, next) => + (Self::FullFromBackers(inner, Box::new(Self::Nil)), *next), + Self::ChunksFromValidators(inner, next) => + (Self::ChunksFromValidators(inner, Box::new(Self::Nil)), *next), + } + } +} + +pub struct FetchFull { + params: FetchFullParams, +} + +pub struct FetchFullParams { + pub(crate) group_name: &'static str, + pub(crate) validators: Vec, + pub(crate) skip_if: Box bool + Send>, + // channel to the erasure task handler. + pub(crate) erasure_task_tx: futures::channel::mpsc::Sender, +} + +impl FetchFull { + fn new(params: FetchFullParams) -> Self { + Self { params } + } + + async fn run( + mut self, + _: &mut State, + sender: &mut Sender, + common_params: &RecoveryParams, + ) -> Result { + if (self.params.skip_if)() { + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + erasure_root = ?common_params.erasure_root, + "Skipping requesting availability data from {}", + self.params.group_name + ); + + return Err(RecoveryError::Unavailable) + } + + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + erasure_root = ?common_params.erasure_root, + "Requesting full availability data from {}", + self.params.group_name + ); + loop { + // Pop the next validator, and proceed to next fetch_chunks_task if we're out. + let validator_index = + self.params.validators.pop().ok_or_else(|| RecoveryError::Unavailable)?; + + // Request data. 
+ let (req, response) = OutgoingRequest::new( + Recipient::Authority( + common_params.validator_authority_keys[validator_index.0 as usize].clone(), + ), + req_res::v1::AvailableDataFetchingRequest { + candidate_hash: common_params.candidate_hash, + }, + ); + + sender + .send_message(NetworkBridgeTxMessage::SendRequests( + vec![Requests::AvailableDataFetchingV1(req)], + IfDisconnected::ImmediateError, + )) + .await; + + match response.await { + Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => { + let (reencode_tx, reencode_rx) = oneshot::channel(); + self.params + .erasure_task_tx + .send(ErasureTask::Reencode( + common_params.n_validators, + common_params.erasure_root, + data, + reencode_tx, + )) + .await + .map_err(|_| RecoveryError::ChannelClosed)?; + + let reencode_response = + reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?; + + if let Some(data) = reencode_response { + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + "Received full data", + ); + + return Ok(data) + } else { + gum::debug!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + ?validator_index, + "Invalid data response", + ); + + // it doesn't help to report the peer with req/res. + } + }, + Ok(req_res::v1::AvailableDataFetchingResponse::NoSuchData) => {}, + Err(e) => gum::debug!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + ?validator_index, + err = ?e, + "Error fetching full available data." + ), + } + } + } +} + +pub struct FetchChunks { + /// How many request have been unsuccessful so far. + error_count: usize, + /// Total number of responses that have been received. + /// + /// including failed ones. + total_received_responses: usize, + + /// a random shuffling of the validators which indicates the order in which we connect to the + /// validators and request the chunk from them. + validators: VecDeque, + + // channel to the erasure task handler. 
+ erasure_task_tx: futures::channel::mpsc::Sender, +} + +pub struct FetchChunksParams { + pub(crate) n_validators: usize, + // channel to the erasure task handler. + pub(crate) erasure_task_tx: futures::channel::mpsc::Sender, +} + +impl FetchChunks { + fn new(params: FetchChunksParams) -> Self { + let mut shuffling: Vec<_> = (0..params.n_validators) + .map(|i| ValidatorIndex(i.try_into().expect("number of validators must fit in a u32"))) + .collect(); + shuffling.shuffle(&mut rand::thread_rng()); + + Self { + error_count: 0, + total_received_responses: 0, + validators: shuffling.into(), + erasure_task_tx: params.erasure_task_tx, + } + } + + fn is_unavailable( + unrequested_validators: usize, + in_flight_requests: usize, + chunk_count: usize, + threshold: usize, + ) -> bool { + is_unavailable(chunk_count, in_flight_requests, unrequested_validators, threshold) + } + + /// Desired number of parallel requests. + /// + /// For the given threshold (total required number of chunks) get the desired number of + /// requests we want to have running in parallel at this time. + fn get_desired_request_count(&self, chunk_count: usize, threshold: usize) -> usize { + // Upper bound for parallel requests. + // We want to limit this, so requests can be processed within the timeout and we limit the + // following feedback loop: + // 1. Requests fail due to timeout + // 2. We request more chunks to make up for it + // 3. Bandwidth is spread out even more, so we get even more timeouts + // 4. We request more chunks to make up for it ... + let max_requests_boundary = std::cmp::min(N_PARALLEL, threshold); + // How many chunks are still needed? + let remaining_chunks = threshold.saturating_sub(chunk_count); + // What is the current error rate, so we can make up for it? 
+ let inv_error_rate = + self.total_received_responses.checked_div(self.error_count).unwrap_or(0); + // Actual number of requests we want to have in flight in parallel: + std::cmp::min( + max_requests_boundary, + remaining_chunks + remaining_chunks.checked_div(inv_error_rate).unwrap_or(0), + ) + } + + async fn run( + mut self, + state: &mut State, + sender: &mut Sender, + common_params: &RecoveryParams, + ) -> Result { + // First query the store for any chunks we've got. + if !common_params.bypass_availability_store { + let local_chunk_indices = state.populate_from_av_store(common_params, sender).await; + self.validators.retain(|i| !local_chunk_indices.contains(i)); + } + + // No need to query the validators that have the chunks we already received. + self.validators.retain(|i| !state.received_chunks.contains_key(i)); + + loop { + // If received_chunks has more than threshold entries, attempt to recover the data. + // If that fails, or a re-encoding of it doesn't match the expected erasure root, + // return Err(RecoveryError::Invalid). + // Do this before requesting any chunks because we may have enough of them coming from + // past RecoveryStrategies. 
+ if state.chunk_count() >= common_params.threshold { + return self.attempt_recovery(state, common_params).await + } + + if Self::is_unavailable( + self.validators.len(), + state.requesting_chunks.total_len(), + state.chunk_count(), + common_params.threshold, + ) { + gum::debug!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + erasure_root = ?common_params.erasure_root, + received = %state.chunk_count(), + requesting = %state.requesting_chunks.len(), + total_requesting = %state.requesting_chunks.total_len(), + n_validators = %common_params.n_validators, + "Data recovery from chunks is not possible", + ); + + return Err(RecoveryError::Unavailable) + } + + let desired_requests_count = + self.get_desired_request_count(state.chunk_count(), common_params.threshold); + let already_requesting_count = state.requesting_chunks.len(); + gum::debug!( + target: LOG_TARGET, + ?common_params.candidate_hash, + ?desired_requests_count, + error_count= ?self.error_count, + total_received = ?self.total_received_responses, + threshold = ?common_params.threshold, + ?already_requesting_count, + "Requesting availability chunks for a candidate", + ); + state + .launch_parallel_chunk_requests( + common_params, + sender, + desired_requests_count, + &mut self.validators, + ) + .await; + + let (total_responses, error_count) = state + .wait_for_chunks( + common_params, + &mut self.validators, + |unrequested_validators, reqs, chunk_count, params, _error_count| { + chunk_count >= params.threshold || + Self::is_unavailable( + unrequested_validators, + reqs, + chunk_count, + params.threshold, + ) + }, + ) + .await; + + self.total_received_responses += total_responses; + self.error_count += error_count; + } + } + + async fn attempt_recovery( + &mut self, + state: &mut State, + common_params: &RecoveryParams, + ) -> Result { + let recovery_duration = common_params.metrics.time_erasure_recovery(); + + // Send request to reconstruct available data from chunks. 
+ let (avilable_data_tx, available_data_rx) = oneshot::channel(); + self.erasure_task_tx + .send(ErasureTask::Reconstruct( + common_params.n_validators, + // Safe to leave an empty vec in place, as we're stopping the recovery process if + // this reconstruct fails. + std::mem::take(&mut state.received_chunks), + avilable_data_tx, + )) + .await + .map_err(|_| RecoveryError::ChannelClosed)?; + + let available_data_response = + available_data_rx.await.map_err(|_| RecoveryError::ChannelClosed)?; + + match available_data_response { + Ok(data) => { + // Send request to re-encode the chunks and check merkle root. + let (reencode_tx, reencode_rx) = oneshot::channel(); + self.erasure_task_tx + .send(ErasureTask::Reencode( + common_params.n_validators, + common_params.erasure_root, + data, + reencode_tx, + )) + .await + .map_err(|_| RecoveryError::ChannelClosed)?; + + let reencode_response = + reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?; + + if let Some(data) = reencode_response { + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + erasure_root = ?common_params.erasure_root, + "Data recovery from chunks complete", + ); + + Ok(data) + } else { + recovery_duration.map(|rd| rd.stop_and_discard()); + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + erasure_root = ?common_params.erasure_root, + "Data recovery error - root mismatch", + ); + + Err(RecoveryError::Invalid) + } + }, + Err(err) => { + recovery_duration.map(|rd| rd.stop_and_discard()); + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + erasure_root = ?common_params.erasure_root, + ?err, + "Data recovery error ", + ); + + Err(RecoveryError::Invalid) + }, + } + } +} + +#[cfg(test)] +mod tests { + use std::ops::Deref; + + use super::*; + use assert_matches::assert_matches; + use polkadot_erasure_coding::recovery_threshold; + use RecoveryStrategy::*; + + impl std::fmt::Debug for RecoveryStrategy 
{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Nil => write!(f, "Nil"), + ChunksFromValidators(_, next) => + write!(f, "{:?} -> {}", self.display_name(), format!("{next:?}")), + FullFromBackers(_, next) => + write!(f, "{:?} -> {}", self.display_name(), format!("{next:?}")), + } + } + } + + #[test] + fn test_recovery_strategy_linked_list_ops() { + let fetch_full_params = FetchFullParams { + group_name: "backers", + validators: vec![], + skip_if: Box::new(|| true), + erasure_task_tx: futures::channel::mpsc::channel(0).0, + }; + let fetch_full_params_2 = FetchFullParams { + group_name: "approval-checkers", + validators: vec![], + skip_if: Box::new(|| true), + erasure_task_tx: futures::channel::mpsc::channel(0).0, + }; + + let fetch_chunks_params = FetchChunksParams { + n_validators: 2, + erasure_task_tx: futures::channel::mpsc::channel(0).0, + }; + let fetch_chunks_params_2 = FetchChunksParams { + n_validators: 3, + erasure_task_tx: futures::channel::mpsc::channel(0).0, + }; + let recovery_strategy = RecoveryStrategy::new() + .then_fetch_full_from_backers(fetch_full_params) + .then_fetch_full_from_backers(fetch_full_params_2) + .then_fetch_chunks_from_validators(fetch_chunks_params) + .then_fetch_chunks_from_validators(fetch_chunks_params_2); + + // Check that the builder correctly chains strategies. + assert_matches!( + recovery_strategy.deref(), + FullFromBackers(_, next) + if matches!(next.deref(), FullFromBackers(_, next) + if matches!(next.deref(), ChunksFromValidators(_, next) + if matches!(next.deref(), ChunksFromValidators(_, next) + if matches!(next.deref(), Nil) + ) + ) + ) + ); + + // Check the order for the `pop_first` operation. 
+ let (current, next) = recovery_strategy.pop_first(); + assert_matches!(current, FullFromBackers(task, next) if task.params.group_name == "backers" && matches!(*next, Nil)); + assert_matches!(&next, FullFromBackers(task, _) if task.params.group_name == "approval-checkers"); + + let (current, next) = next.pop_first(); + assert_matches!(current, FullFromBackers(task, next) if task.params.group_name == "approval-checkers" && matches!(*next, Nil)); + assert_matches!(&next, ChunksFromValidators(task, _) if task.validators.len() == 2); + + let (current, next) = next.pop_first(); + assert_matches!(current, ChunksFromValidators(task, next) if task.validators.len() == 2 && matches!(*next, Nil)); + assert_matches!(&next, ChunksFromValidators(task, _) if task.validators.len() == 3); + + let (current, next) = next.pop_first(); + assert_matches!(current, ChunksFromValidators(task, next) if task.validators.len() == 3 && matches!(*next, Nil)); + assert_matches!(&next, Nil); + + let (current, next) = next.pop_first(); + assert_matches!(current, Nil); + assert_matches!(next, Nil); + } + + #[test] + fn parallel_request_calculation_works_as_expected() { + let num_validators = 100; + let threshold = recovery_threshold(num_validators).unwrap(); + let (erasure_task_tx, _erasure_task_rx) = futures::channel::mpsc::channel(16); + + let mut fetch_chunks_task = + FetchChunks::new(FetchChunksParams { n_validators: 100, erasure_task_tx }); + assert_eq!(fetch_chunks_task.get_desired_request_count(0, threshold), threshold); + fetch_chunks_task.error_count = 1; + fetch_chunks_task.total_received_responses = 1; + // We saturate at threshold (34): + assert_eq!(fetch_chunks_task.get_desired_request_count(0, threshold), threshold); + + fetch_chunks_task.total_received_responses = 2; + // With given error rate - still saturating: + assert_eq!(fetch_chunks_task.get_desired_request_count(1, threshold), threshold); + fetch_chunks_task.total_received_responses += 8; + // error rate: 1/10 + // remaining 
chunks needed: threshold (34) - 9 + // expected: 24 * (1+ 1/10) = (next greater integer) = 27 + assert_eq!(fetch_chunks_task.get_desired_request_count(9, threshold), 27); + fetch_chunks_task.error_count = 0; + // With error count zero - we should fetch exactly as needed: + assert_eq!(fetch_chunks_task.get_desired_request_count(10, threshold), threshold - 10); + } +} diff --git a/polkadot/node/network/availability-recovery/src/tests.rs b/polkadot/node/network/availability-recovery/src/tests.rs index 60c2d38ab31b..17b02080c008 100644 --- a/polkadot/node/network/availability-recovery/src/tests.rs +++ b/polkadot/node/network/availability-recovery/src/tests.rs @@ -21,15 +21,19 @@ use futures::{executor, future}; use futures_timer::Delay; use parity_scale_codec::Encode; -use polkadot_node_network_protocol::request_response::{IncomingRequest, ReqProtocolNames}; +use polkadot_node_network_protocol::request_response::{ + self as req_res, IncomingRequest, Recipient, ReqProtocolNames, Requests, +}; use super::*; -use sc_network::config::RequestResponseConfig; +use sc_network::{config::RequestResponseConfig, IfDisconnected, OutboundFailure, RequestFailure}; use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks}; use polkadot_node_primitives::{BlockData, PoV, Proof}; -use polkadot_node_subsystem::messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest}; +use polkadot_node_subsystem::messages::{ + AllMessages, NetworkBridgeTxMessage, RuntimeApiMessage, RuntimeApiRequest, +}; use polkadot_node_subsystem_test_helpers::{ make_subsystem_context, mock::new_leaf, TestSubsystemContextHandle, }; @@ -204,7 +208,7 @@ use sp_keyring::Sr25519Keyring; enum Has { No, Yes, - NetworkError(sc_network::RequestFailure), + NetworkError(RequestFailure), /// Make request not return at all, instead the sender is returned from the function. 
/// /// Note, if you use `DoesNotReturn` you have to keep the returned senders alive, otherwise the @@ -214,7 +218,7 @@ enum Has { impl Has { fn timeout() -> Self { - Has::NetworkError(sc_network::RequestFailure::Network(sc_network::OutboundFailure::Timeout)) + Has::NetworkError(RequestFailure::Network(OutboundFailure::Timeout)) } } @@ -393,7 +397,7 @@ impl TestState { candidate_hash: CandidateHash, virtual_overseer: &mut VirtualOverseer, who_has: impl Fn(usize) -> Has, - ) -> Vec, sc_network::RequestFailure>>> { + ) -> Vec, RequestFailure>>> { let mut senders = Vec::new(); for _ in 0..self.validators.len() { // Receive a request for a chunk. @@ -1546,36 +1550,3 @@ fn invalid_local_chunk_is_ignored() { (virtual_overseer, req_cfg) }); } - -#[test] -fn parallel_request_calculation_works_as_expected() { - let num_validators = 100; - let threshold = recovery_threshold(num_validators).unwrap(); - let (erasure_task_tx, _erasure_task_rx) = futures::channel::mpsc::channel(16); - - let mut phase = RequestChunksFromValidators::new(100, erasure_task_tx); - assert_eq!(phase.get_desired_request_count(threshold), threshold); - phase.error_count = 1; - phase.total_received_responses = 1; - // We saturate at threshold (34): - assert_eq!(phase.get_desired_request_count(threshold), threshold); - - let dummy_chunk = - ErasureChunk { chunk: Vec::new(), index: ValidatorIndex(0), proof: Proof::dummy_proof() }; - phase.insert_chunk(ValidatorIndex(0), dummy_chunk.clone()); - phase.total_received_responses = 2; - // With given error rate - still saturating: - assert_eq!(phase.get_desired_request_count(threshold), threshold); - for i in 1..9 { - phase.insert_chunk(ValidatorIndex(i), dummy_chunk.clone()); - } - phase.total_received_responses += 8; - // error rate: 1/10 - // remaining chunks needed: threshold (34) - 9 - // expected: 24 * (1+ 1/10) = (next greater integer) = 27 - assert_eq!(phase.get_desired_request_count(threshold), 27); - phase.insert_chunk(ValidatorIndex(9), 
dummy_chunk.clone()); - phase.error_count = 0; - // With error count zero - we should fetch exactly as needed: - assert_eq!(phase.get_desired_request_count(threshold), threshold - phase.chunk_count()); -} From 166aae5e9c8d8b377d5227c444d7e4d5aeb50cb0 Mon Sep 17 00:00:00 2001 From: alindima Date: Fri, 8 Sep 2023 11:32:58 +0300 Subject: [PATCH 02/12] add copyright header --- .../network/availability-recovery/src/task.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/polkadot/node/network/availability-recovery/src/task.rs b/polkadot/node/network/availability-recovery/src/task.rs index 854dc26b4dd5..8046c1a8ff96 100644 --- a/polkadot/node/network/availability-recovery/src/task.rs +++ b/polkadot/node/network/availability-recovery/src/task.rs @@ -1,3 +1,21 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! Recovery task and associated strategies.
+ use crate::{ futures_undead::FuturesUndead, is_chunk_valid, is_unavailable, metrics::Metrics, ErasureTask, LOG_TARGET, From 518d8fd7941682ea866a0e3edd55bfe392c48dc6 Mon Sep 17 00:00:00 2001 From: alindima Date: Fri, 8 Sep 2023 13:35:08 +0300 Subject: [PATCH 03/12] fix clippy --- polkadot/node/network/availability-recovery/src/task.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/network/availability-recovery/src/task.rs b/polkadot/node/network/availability-recovery/src/task.rs index 8046c1a8ff96..927c96092222 100644 --- a/polkadot/node/network/availability-recovery/src/task.rs +++ b/polkadot/node/network/availability-recovery/src/task.rs @@ -482,7 +482,7 @@ impl RecoveryStrategy { } } - fn pop_first(self: Self) -> (Self, Self) { + fn pop_first(self) -> (Self, Self) { match self { Self::Nil => (Self::Nil, Self::Nil), Self::FullFromBackers(inner, next) => From dae34a535eebfb53637ec2d9476f677611931527 Mon Sep 17 00:00:00 2001 From: alindima Date: Fri, 8 Sep 2023 17:11:13 +0300 Subject: [PATCH 04/12] Refactor RecoveryStrategy using dynamic dispatch --- Cargo.lock | 1 + .../network/availability-recovery/Cargo.toml | 1 + .../network/availability-recovery/src/lib.rs | 46 +- .../network/availability-recovery/src/task.rs | 491 +++++++----------- 4 files changed, 221 insertions(+), 318 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a95a7e2561d6..70c334446ba4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11570,6 +11570,7 @@ name = "polkadot-availability-recovery" version = "1.0.0" dependencies = [ "assert_matches", + "async-trait", "env_logger 0.9.3", "fatality", "futures", diff --git a/polkadot/node/network/availability-recovery/Cargo.toml b/polkadot/node/network/availability-recovery/Cargo.toml index 07ff09c7e70e..42c3abef547b 100644 --- a/polkadot/node/network/availability-recovery/Cargo.toml +++ b/polkadot/node/network/availability-recovery/Cargo.toml @@ -11,6 +11,7 @@ schnellru = "0.2.1" rand = "0.8.5" fatality = "0.0.6" 
thiserror = "1.0.48" +async-trait = "0.1.73" gum = { package = "tracing-gum", path = "../../gum" } polkadot-erasure-coding = { path = "../../../erasure-coding" } diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs index 63bfc82e3d7b..aaf149cdb6a1 100644 --- a/polkadot/node/network/availability-recovery/src/lib.rs +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -18,7 +18,12 @@ #![warn(missing_docs)] -use std::{collections::HashMap, iter::Iterator, num::NonZeroUsize, pin::Pin}; +use std::{ + collections::{HashMap, VecDeque}, + iter::Iterator, + num::NonZeroUsize, + pin::Pin, +}; use futures::{ channel::oneshot, @@ -30,7 +35,7 @@ use futures::{ task::{Context, Poll}, }; use schnellru::{ByLength, LruMap}; -use task::{FetchChunksParams, FetchFullParams}; +use task::{FetchChunks, FetchChunksParams, FetchFull, FetchFullParams}; use fatality::Nested; use polkadot_erasure_coding::{ @@ -47,8 +52,8 @@ use polkadot_node_subsystem::{ errors::RecoveryError, jaeger, messages::{AvailabilityRecoveryMessage, AvailabilityStoreMessage}, - overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, - SubsystemResult, + overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, + SubsystemContext, SubsystemError, SubsystemResult, }; use polkadot_node_subsystem_util::request_session_info; use polkadot_primitives::{ @@ -103,7 +108,7 @@ pub struct AvailabilityRecoverySubsystem { } /// Expensive erasure coding computations that we want to run on a blocking thread. -enum ErasureTask { +pub enum ErasureTask { /// Reconstructs `AvailableData` from chunks given `n_validators`. Reconstruct( usize, @@ -287,7 +292,6 @@ struct State { /// An LRU cache of recently recovered data. availability_lru: LruMap, - // TODO: an LRU cache of erasure indices shuffling of all validators (per relay-parent). 
} impl Default for State { @@ -338,7 +342,7 @@ async fn launch_recovery_task( receipt: CandidateReceipt, response_sender: oneshot::Sender>, metrics: &Metrics, - recovery_strategy: RecoveryStrategy, + recovery_strategies: VecDeque::Sender>>>, bypass_availability_store: bool, ) -> error::Result<()> { let candidate_hash = receipt.hash(); @@ -352,7 +356,7 @@ async fn launch_recovery_task( bypass_availability_store, }; - let recovery_task = RecoveryTask::new(ctx.sender().clone(), params, recovery_strategy); + let recovery_task = RecoveryTask::new(ctx.sender().clone(), params, recovery_strategies); let (remote, remote_handle) = recovery_task.run().remote_handle(); @@ -457,24 +461,26 @@ async fn handle_recover( erasure_task_tx: erasure_task_tx.clone(), }; - let recovery_strategy = if let Some(backing_validators) = backing_validators { + let mut recovery_strategies: VecDeque< + Box::Sender>>, + > = VecDeque::with_capacity(2); + + if let Some(backing_validators) = backing_validators { match recovery_strategy_kind { RecoveryStrategyKind::BackersFirstAlways | RecoveryStrategyKind::BackersFirstIfSizeLower(_) | - RecoveryStrategyKind::BypassAvailabilityStore => RecoveryStrategy::new() - .then_fetch_full_from_backers(FetchFullParams { + RecoveryStrategyKind::BypassAvailabilityStore => + recovery_strategies.push_back(Box::new(FetchFull::new(FetchFullParams { group_name: "backers", validators: backing_validators.to_vec(), skip_if: skip_backing_group_if, erasure_task_tx, - }) - .then_fetch_chunks_from_validators(fetch_chunks_params), - RecoveryStrategyKind::ChunksAlways => RecoveryStrategy::new() - .then_fetch_chunks_from_validators(fetch_chunks_params), - } - } else { - RecoveryStrategy::new().then_fetch_chunks_from_validators(fetch_chunks_params) - }; + }))), + RecoveryStrategyKind::ChunksAlways => {}, + }; + } + + recovery_strategies.push_back(Box::new(FetchChunks::new(fetch_chunks_params))); launch_recovery_task( state, @@ -483,7 +489,7 @@ async fn handle_recover( receipt, 
response_sender, metrics, - *recovery_strategy, + recovery_strategies, recovery_strategy_kind == RecoveryStrategyKind::BypassAvailabilityStore, ) .await diff --git a/polkadot/node/network/availability-recovery/src/task.rs b/polkadot/node/network/availability-recovery/src/task.rs index 927c96092222..339cd0c18490 100644 --- a/polkadot/node/network/availability-recovery/src/task.rs +++ b/polkadot/node/network/availability-recovery/src/task.rs @@ -16,6 +16,8 @@ //! Recovery task and associated strategies. +#![warn(missing_docs)] + use crate::{ futures_undead::FuturesUndead, is_chunk_valid, is_unavailable, metrics::Metrics, ErasureTask, LOG_TARGET, @@ -56,33 +58,50 @@ const TIMEOUT_START_NEW_REQUESTS: Duration = CHUNK_REQUEST_TIMEOUT; #[cfg(test)] const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100); +#[async_trait::async_trait] +/// Common trait for runnable recovery strategies. +pub trait RecoveryStrategy: Send { + /// Main entry point of the strategy. + async fn run( + &mut self, + state: &mut State, + sender: &mut Sender, + common_params: &RecoveryParams, + ) -> Result; + + /// Return the name of the strategy for logging purposes. + fn display_name(&self) -> &'static str; +} + +/// Recovery parameters common to all strategies in a `RecoveryTask`. pub struct RecoveryParams { /// Discovery ids of `validators`. - pub(crate) validator_authority_keys: Vec, + pub validator_authority_keys: Vec, /// Number of validators relevant to this `RecoveryTask`. - pub(crate) n_validators: usize, + pub n_validators: usize, - /// The number of pieces needed. - pub(crate) threshold: usize, + /// The number of chunks needed. + pub threshold: usize, /// A hash of the relevant candidate. - pub(crate) candidate_hash: CandidateHash, + pub candidate_hash: CandidateHash, - /// The root of the erasure encoding of the para block. - pub(crate) erasure_root: Hash, + /// The root of the erasure encoding of the candidate. 
+ pub erasure_root: Hash, - /// Metrics to report - pub(crate) metrics: Metrics, + /// Metrics to report. + pub metrics: Metrics, - /// Do not request data from availability-store - pub(crate) bypass_availability_store: bool, + /// Do not request data from availability-store. Useful for collators. + pub bypass_availability_store: bool, } -/// Represents intermediate data that must be passed between `RecoveryStrategy`s belonging to the -/// same `RecoveryTask` or data that is used by state methods common to multiple RecoveryStrategies. +/// Intermediate/common data that must be passed between `RecoveryStrategy`s belonging to the +/// same `RecoveryTask`. pub struct State { /// Chunks received so far. received_chunks: HashMap, + /// Collection of in-flight requests. requesting_chunks: FuturesUndead, (ValidatorIndex, RequestError)>>, } @@ -99,6 +118,7 @@ impl State { self.received_chunks.len() } + /// Retrieve the local chunks held in the av-store (either 0 or 1). async fn populate_from_av_store( &mut self, params: &RecoveryParams, @@ -146,6 +166,7 @@ impl State { } } + /// Launch chunk requests in parallel, according to the parameters. async fn launch_parallel_chunk_requests( &mut self, params: &RecoveryParams, @@ -314,24 +335,26 @@ impl State { /// A stateful reconstruction of availability data in reference to /// a candidate hash. -pub struct RecoveryTask { +pub struct RecoveryTask { sender: Sender, - /// The common parameters of the recovery process, regardless of the strategy. params: RecoveryParams, - strategy: RecoveryStrategy, + strategies: VecDeque>>, state: State, } -impl RecoveryTask { - pub fn new(sender: Sender, params: RecoveryParams, strategy: RecoveryStrategy) -> Self { - Self { sender, params, strategy, state: State::new() } - } -} - impl RecoveryTask where Sender: overseer::AvailabilityRecoverySenderTrait, { + /// Instantiate a new recovery task. 
+ pub fn new( + sender: Sender, + params: RecoveryParams, + strategies: VecDeque>>, + ) -> Self { + Self { sender, params, strategies, state: State::new() } + } + async fn in_availability_store(&mut self) -> Option { if !self.params.bypass_availability_store { let (tx, rx) = oneshot::channel(); @@ -358,6 +381,8 @@ where None } + /// Run this recovery task to completion. It will loop through the configured strategies + /// in-order and return whenever the first one recovers the full `AvailableData`. pub async fn run(mut self) -> Result { if let Some(data) = self.in_availability_store().await { return Ok(data) @@ -368,57 +393,44 @@ where let _timer = self.params.metrics.time_full_recovery(); let res = loop { - let (current_strategy, next_strategy) = self.strategy.pop_first(); - self.strategy = next_strategy; - - // Make sure we are not referencing futures from past RecoveryStrategy runs. - if self.state.requesting_chunks.total_len() != 0 { - self.state.requesting_chunks = FuturesUndead::new(); - } - - let recovery_strategy_name = current_strategy.display_name(); + if let Some(mut current_strategy) = self.strategies.pop_front() { + // Make sure we are not referencing futures from past RecoveryStrategy runs. 
+ if self.state.requesting_chunks.total_len() != 0 { + self.state.requesting_chunks = FuturesUndead::new(); + } - if let Some(name) = recovery_strategy_name { gum::info!( target: LOG_TARGET, candidate_hash = ?self.params.candidate_hash, "Starting `{}` strategy", - &name, + current_strategy.display_name(), ); - } - let res = match current_strategy { - RecoveryStrategy::Nil => Err(RecoveryError::Unavailable), - RecoveryStrategy::FullFromBackers(inner, _) => - inner.run(&mut self.state, &mut self.sender, &self.params).await, - RecoveryStrategy::ChunksFromValidators(inner, _) => - inner.run(&mut self.state, &mut self.sender, &self.params).await, - }; - - match res { - Err(RecoveryError::Unavailable) => { - if !matches!(&self.strategy, RecoveryStrategy::Nil) { - if let Some(recovery_strategy_name) = recovery_strategy_name { + let res = + current_strategy.run(&mut self.state, &mut self.sender, &self.params).await; + + match res { + Err(RecoveryError::Unavailable) => + if self.strategies.front().is_some() { gum::warn!( target: LOG_TARGET, candidate_hash = ?self.params.candidate_hash, "Recovery strategy `{}` did not conclude. Trying the next one.", - recovery_strategy_name, + current_strategy.display_name(), ); - } - continue - } else { - // We have no other strategies to try. - gum::error!( - target: LOG_TARGET, - candidate_hash = ?self.params.candidate_hash, - "Recovery of available data failed.", - ); - break Err(RecoveryError::Unavailable) - } - }, - Err(err) => break Err(err), - Ok(data) => break Ok(data), + continue + }, + Err(err) => break Err(err), + Ok(data) => break Ok(data), + } + } else { + // We have no other strategies to try. 
+ gum::error!( + target: LOG_TARGET, + candidate_hash = ?self.params.candidate_hash, + "Recovery of available data failed.", + ); + break Err(RecoveryError::Unavailable) } }; @@ -432,86 +444,40 @@ where } } -pub enum RecoveryStrategy { - Nil, - FullFromBackers(FetchFull, Box), - ChunksFromValidators(FetchChunks, Box), -} - -impl RecoveryStrategy { - pub fn new() -> Box { - Box::new(RecoveryStrategy::Nil) - } - - fn display_name(&self) -> Option<&'static str> { - match self { - Self::Nil => None, - Self::FullFromBackers(_, _) => Some("Full recovery from backers"), - Self::ChunksFromValidators(_, _) => Some("Chunks recovery"), - } - } - - pub fn then_fetch_full_from_backers(self: Box, params: FetchFullParams) -> Box { - match *self { - Self::Nil => Box::new(Self::FullFromBackers(FetchFull::new(params), self)), - Self::ChunksFromValidators(task, next) => { - let next = next.then_fetch_full_from_backers(params); - Box::new(Self::ChunksFromValidators(task, next)) - }, - Self::FullFromBackers(task, next) => { - let next = next.then_fetch_full_from_backers(params); - Box::new(Self::FullFromBackers(task, next)) - }, - } - } - - pub fn then_fetch_chunks_from_validators( - self: Box, - params: FetchChunksParams, - ) -> Box { - match *self { - Self::Nil => Box::new(Self::ChunksFromValidators(FetchChunks::new(params), self)), - Self::ChunksFromValidators(task, next) => { - let next = next.then_fetch_chunks_from_validators(params); - Box::new(Self::ChunksFromValidators(task, next)) - }, - Self::FullFromBackers(task, next) => { - let next = next.then_fetch_chunks_from_validators(params); - Box::new(Self::FullFromBackers(task, next)) - }, - } - } - - fn pop_first(self) -> (Self, Self) { - match self { - Self::Nil => (Self::Nil, Self::Nil), - Self::FullFromBackers(inner, next) => - (Self::FullFromBackers(inner, Box::new(Self::Nil)), *next), - Self::ChunksFromValidators(inner, next) => - (Self::ChunksFromValidators(inner, Box::new(Self::Nil)), *next), - } - } -} - +/// 
`RecoveryStrategy` that sequentially tries to fetch the full `AvailableData` from +/// already-connected validators in the configured validator set. pub struct FetchFull { params: FetchFullParams, } pub struct FetchFullParams { - pub(crate) group_name: &'static str, - pub(crate) validators: Vec, - pub(crate) skip_if: Box bool + Send>, - // channel to the erasure task handler. - pub(crate) erasure_task_tx: futures::channel::mpsc::Sender, + /// Name of the validator group used for recovery. For logging purposes. + /// (e.g."backers"/"approval-checkers") + pub group_name: &'static str, + /// Validators that will be used for fetching the data. + pub validators: Vec, + /// Predicate that if is true, will result in skipping this strategy. + pub skip_if: Box bool + Send>, + /// Channel to the erasure task handler. + pub erasure_task_tx: futures::channel::mpsc::Sender, } impl FetchFull { - fn new(params: FetchFullParams) -> Self { + /// Create a new `FetchFull` recovery strategy. + pub fn new(mut params: FetchFullParams) -> Self { + params.validators.shuffle(&mut rand::thread_rng()); Self { params } } +} + +#[async_trait::async_trait] +impl RecoveryStrategy for FetchFull { + fn display_name(&self) -> &'static str { + "Full recovery from backers" + } - async fn run( - mut self, + async fn run( + &mut self, _: &mut State, sender: &mut Sender, common_params: &RecoveryParams, @@ -606,30 +572,32 @@ impl FetchFull { } } +/// `RecoveryStrategy` that requests chunks from validators, in parallel. pub struct FetchChunks { - /// How many request have been unsuccessful so far. + /// How many requests have been unsuccessful so far. error_count: usize, - /// Total number of responses that have been received. - /// - /// including failed ones. + /// Total number of responses that have been received, including failed ones. 
total_received_responses: usize, - /// a random shuffling of the validators which indicates the order in which we connect to the + /// A random shuffling of the validators which indicates the order in which we connect to the /// validators and request the chunk from them. validators: VecDeque, - // channel to the erasure task handler. + /// Channel to the erasure task handler. erasure_task_tx: futures::channel::mpsc::Sender, } +/// Parameters specific to the `FetchChunks` strategy. pub struct FetchChunksParams { - pub(crate) n_validators: usize, - // channel to the erasure task handler. - pub(crate) erasure_task_tx: futures::channel::mpsc::Sender, + /// Total number of validators. + pub n_validators: usize, + /// Channel to the erasure task handler. + pub erasure_task_tx: futures::channel::mpsc::Sender, } impl FetchChunks { - fn new(params: FetchChunksParams) -> Self { + /// Instantiate a new strategy. + pub fn new(params: FetchChunksParams) -> Self { let mut shuffling: Vec<_> = (0..params.n_validators) .map(|i| ValidatorIndex(i.try_into().expect("number of validators must fit in a u32"))) .collect(); @@ -677,8 +645,91 @@ impl FetchChunks { ) } - async fn run( - mut self, + async fn attempt_recovery( + &mut self, + state: &mut State, + common_params: &RecoveryParams, + ) -> Result { + let recovery_duration = common_params.metrics.time_erasure_recovery(); + + // Send request to reconstruct available data from chunks. + let (avilable_data_tx, available_data_rx) = oneshot::channel(); + self.erasure_task_tx + .send(ErasureTask::Reconstruct( + common_params.n_validators, + // Safe to leave an empty vec in place, as we're stopping the recovery process if + // this reconstruct fails. 
+ std::mem::take(&mut state.received_chunks), + avilable_data_tx, + )) + .await + .map_err(|_| RecoveryError::ChannelClosed)?; + + let available_data_response = + available_data_rx.await.map_err(|_| RecoveryError::ChannelClosed)?; + + match available_data_response { + Ok(data) => { + // Send request to re-encode the chunks and check merkle root. + let (reencode_tx, reencode_rx) = oneshot::channel(); + self.erasure_task_tx + .send(ErasureTask::Reencode( + common_params.n_validators, + common_params.erasure_root, + data, + reencode_tx, + )) + .await + .map_err(|_| RecoveryError::ChannelClosed)?; + + let reencode_response = + reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?; + + if let Some(data) = reencode_response { + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + erasure_root = ?common_params.erasure_root, + "Data recovery from chunks complete", + ); + + Ok(data) + } else { + recovery_duration.map(|rd| rd.stop_and_discard()); + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + erasure_root = ?common_params.erasure_root, + "Data recovery error - root mismatch", + ); + + Err(RecoveryError::Invalid) + } + }, + Err(err) => { + recovery_duration.map(|rd| rd.stop_and_discard()); + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + erasure_root = ?common_params.erasure_root, + ?err, + "Data recovery error ", + ); + + Err(RecoveryError::Invalid) + }, + } + } +} + +#[async_trait::async_trait] +impl RecoveryStrategy for FetchChunks { + fn display_name(&self) -> &'static str { + "Fetch chunks" + } + + async fn run( + &mut self, state: &mut State, sender: &mut Sender, common_params: &RecoveryParams, @@ -764,168 +815,12 @@ impl FetchChunks { self.error_count += error_count; } } - - async fn attempt_recovery( - &mut self, - state: &mut State, - common_params: &RecoveryParams, - ) -> Result { - let recovery_duration = 
common_params.metrics.time_erasure_recovery(); - - // Send request to reconstruct available data from chunks. - let (avilable_data_tx, available_data_rx) = oneshot::channel(); - self.erasure_task_tx - .send(ErasureTask::Reconstruct( - common_params.n_validators, - // Safe to leave an empty vec in place, as we're stopping the recovery process if - // this reconstruct fails. - std::mem::take(&mut state.received_chunks), - avilable_data_tx, - )) - .await - .map_err(|_| RecoveryError::ChannelClosed)?; - - let available_data_response = - available_data_rx.await.map_err(|_| RecoveryError::ChannelClosed)?; - - match available_data_response { - Ok(data) => { - // Send request to re-encode the chunks and check merkle root. - let (reencode_tx, reencode_rx) = oneshot::channel(); - self.erasure_task_tx - .send(ErasureTask::Reencode( - common_params.n_validators, - common_params.erasure_root, - data, - reencode_tx, - )) - .await - .map_err(|_| RecoveryError::ChannelClosed)?; - - let reencode_response = - reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?; - - if let Some(data) = reencode_response { - gum::trace!( - target: LOG_TARGET, - candidate_hash = ?common_params.candidate_hash, - erasure_root = ?common_params.erasure_root, - "Data recovery from chunks complete", - ); - - Ok(data) - } else { - recovery_duration.map(|rd| rd.stop_and_discard()); - gum::trace!( - target: LOG_TARGET, - candidate_hash = ?common_params.candidate_hash, - erasure_root = ?common_params.erasure_root, - "Data recovery error - root mismatch", - ); - - Err(RecoveryError::Invalid) - } - }, - Err(err) => { - recovery_duration.map(|rd| rd.stop_and_discard()); - gum::trace!( - target: LOG_TARGET, - candidate_hash = ?common_params.candidate_hash, - erasure_root = ?common_params.erasure_root, - ?err, - "Data recovery error ", - ); - - Err(RecoveryError::Invalid) - }, - } - } } #[cfg(test)] mod tests { - use std::ops::Deref; - use super::*; - use assert_matches::assert_matches; use 
polkadot_erasure_coding::recovery_threshold; - use RecoveryStrategy::*; - - impl std::fmt::Debug for RecoveryStrategy { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Nil => write!(f, "Nil"), - ChunksFromValidators(_, next) => - write!(f, "{:?} -> {}", self.display_name(), format!("{next:?}")), - FullFromBackers(_, next) => - write!(f, "{:?} -> {}", self.display_name(), format!("{next:?}")), - } - } - } - - #[test] - fn test_recovery_strategy_linked_list_ops() { - let fetch_full_params = FetchFullParams { - group_name: "backers", - validators: vec![], - skip_if: Box::new(|| true), - erasure_task_tx: futures::channel::mpsc::channel(0).0, - }; - let fetch_full_params_2 = FetchFullParams { - group_name: "approval-checkers", - validators: vec![], - skip_if: Box::new(|| true), - erasure_task_tx: futures::channel::mpsc::channel(0).0, - }; - - let fetch_chunks_params = FetchChunksParams { - n_validators: 2, - erasure_task_tx: futures::channel::mpsc::channel(0).0, - }; - let fetch_chunks_params_2 = FetchChunksParams { - n_validators: 3, - erasure_task_tx: futures::channel::mpsc::channel(0).0, - }; - let recovery_strategy = RecoveryStrategy::new() - .then_fetch_full_from_backers(fetch_full_params) - .then_fetch_full_from_backers(fetch_full_params_2) - .then_fetch_chunks_from_validators(fetch_chunks_params) - .then_fetch_chunks_from_validators(fetch_chunks_params_2); - - // Check that the builder correctly chains strategies. - assert_matches!( - recovery_strategy.deref(), - FullFromBackers(_, next) - if matches!(next.deref(), FullFromBackers(_, next) - if matches!(next.deref(), ChunksFromValidators(_, next) - if matches!(next.deref(), ChunksFromValidators(_, next) - if matches!(next.deref(), Nil) - ) - ) - ) - ); - - // Check the order for the `pop_first` operation. 
- let (current, next) = recovery_strategy.pop_first(); - assert_matches!(current, FullFromBackers(task, next) if task.params.group_name == "backers" && matches!(*next, Nil)); - assert_matches!(&next, FullFromBackers(task, _) if task.params.group_name == "approval-checkers"); - - let (current, next) = next.pop_first(); - assert_matches!(current, FullFromBackers(task, next) if task.params.group_name == "approval-checkers" && matches!(*next, Nil)); - assert_matches!(&next, ChunksFromValidators(task, _) if task.validators.len() == 2); - - let (current, next) = next.pop_first(); - assert_matches!(current, ChunksFromValidators(task, next) if task.validators.len() == 2 && matches!(*next, Nil)); - assert_matches!(&next, ChunksFromValidators(task, _) if task.validators.len() == 3); - - let (current, next) = next.pop_first(); - assert_matches!(current, ChunksFromValidators(task, next) if task.validators.len() == 3 && matches!(*next, Nil)); - assert_matches!(&next, Nil); - - let (current, next) = next.pop_first(); - assert_matches!(current, Nil); - assert_matches!(next, Nil); - } #[test] fn parallel_request_calculation_works_as_expected() { From 640c2d59778bb3103183ae8977b9222d28acc27e Mon Sep 17 00:00:00 2001 From: alindima Date: Tue, 12 Sep 2023 14:47:39 +0300 Subject: [PATCH 05/12] address comments --- .../network/availability-recovery/src/lib.rs | 18 ++- .../network/availability-recovery/src/task.rs | 113 +++++++----------- 2 files changed, 52 insertions(+), 79 deletions(-) diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs index aaf149cdb6a1..04b71147c9f0 100644 --- a/polkadot/node/network/availability-recovery/src/lib.rs +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -424,7 +424,7 @@ async fn handle_recover( let _span = span.child("session-info-ctx-received"); match session_info { Some(session_info) => { - let mut skip_backing_group_if: Box bool + Send> = Box::new(|| false); + let 
mut prefer_backing_group = true; if let RecoveryStrategyKind::BackersFirstIfSizeLower(small_pov_limit) = recovery_strategy_kind @@ -435,7 +435,7 @@ async fn handle_recover( if let Ok(Some(chunk_size)) = chunk_size { let pov_size_estimate = chunk_size.saturating_mul(session_info.validators.len()) / 3; - let prefer_backing_group = pov_size_estimate < small_pov_limit; + prefer_backing_group = pov_size_estimate < small_pov_limit; gum::trace!( target: LOG_TARGET, @@ -445,8 +445,6 @@ async fn handle_recover( enabled = prefer_backing_group, "Prefer fetch from backing group", ); - - skip_backing_group_if = Box::new(move || !prefer_backing_group); } }; @@ -466,17 +464,15 @@ async fn handle_recover( > = VecDeque::with_capacity(2); if let Some(backing_validators) = backing_validators { - match recovery_strategy_kind { - RecoveryStrategyKind::BackersFirstAlways | - RecoveryStrategyKind::BackersFirstIfSizeLower(_) | - RecoveryStrategyKind::BypassAvailabilityStore => + match (&recovery_strategy_kind, prefer_backing_group) { + (RecoveryStrategyKind::BackersFirstAlways, true) | + (RecoveryStrategyKind::BackersFirstIfSizeLower(_), true) | + (RecoveryStrategyKind::BypassAvailabilityStore, true) => recovery_strategies.push_back(Box::new(FetchFull::new(FetchFullParams { - group_name: "backers", validators: backing_validators.to_vec(), - skip_if: skip_backing_group_if, erasure_task_tx, }))), - RecoveryStrategyKind::ChunksAlways => {}, + _ => {}, }; } diff --git a/polkadot/node/network/availability-recovery/src/task.rs b/polkadot/node/network/availability-recovery/src/task.rs index 339cd0c18490..30eaf7734a52 100644 --- a/polkadot/node/network/availability-recovery/src/task.rs +++ b/polkadot/node/network/availability-recovery/src/task.rs @@ -78,7 +78,7 @@ pub struct RecoveryParams { /// Discovery ids of `validators`. pub validator_authority_keys: Vec, - /// Number of validators relevant to this `RecoveryTask`. + /// Number of validators. 
pub n_validators: usize, /// The number of chunks needed. @@ -96,6 +96,7 @@ pub struct RecoveryParams { /// Do not request data from availability-store. Useful for collators. pub bypass_availability_store: bool, } + /// Intermediate/common data that must be passed between `RecoveryStrategy`s belonging to the /// same `RecoveryTask`. pub struct State { @@ -392,55 +393,55 @@ where let _timer = self.params.metrics.time_full_recovery(); - let res = loop { - if let Some(mut current_strategy) = self.strategies.pop_front() { - // Make sure we are not referencing futures from past RecoveryStrategy runs. - if self.state.requesting_chunks.total_len() != 0 { - self.state.requesting_chunks = FuturesUndead::new(); - } + while let Some(mut current_strategy) = self.strategies.pop_front() { + // Make sure we are not referencing futures from past RecoveryStrategy runs. + if self.state.requesting_chunks.total_len() != 0 { + self.state.requesting_chunks = FuturesUndead::new(); + } - gum::info!( - target: LOG_TARGET, - candidate_hash = ?self.params.candidate_hash, - "Starting `{}` strategy", - current_strategy.display_name(), - ); + gum::debug!( + target: LOG_TARGET, + candidate_hash = ?self.params.candidate_hash, + "Starting `{}` strategy", + current_strategy.display_name(), + ); - let res = - current_strategy.run(&mut self.state, &mut self.sender, &self.params).await; + let res = current_strategy.run(&mut self.state, &mut self.sender, &self.params).await; - match res { - Err(RecoveryError::Unavailable) => - if self.strategies.front().is_some() { - gum::warn!( - target: LOG_TARGET, - candidate_hash = ?self.params.candidate_hash, - "Recovery strategy `{}` did not conclude. Trying the next one.", - current_strategy.display_name(), - ); - continue - }, - Err(err) => break Err(err), - Ok(data) => break Ok(data), - } - } else { - // We have no other strategies to try. 
- gum::error!( - target: LOG_TARGET, - candidate_hash = ?self.params.candidate_hash, - "Recovery of available data failed.", - ); - break Err(RecoveryError::Unavailable) + match res { + Err(RecoveryError::Unavailable) => + if self.strategies.front().is_some() { + gum::debug!( + target: LOG_TARGET, + candidate_hash = ?self.params.candidate_hash, + "Recovery strategy `{}` did not conclude. Trying the next one.", + current_strategy.display_name(), + ); + continue + }, + Err(err) => { + match &err { + RecoveryError::Invalid => self.params.metrics.on_recovery_invalid(), + _ => self.params.metrics.on_recovery_failed(), + } + return Err(err) + }, + Ok(data) => { + self.params.metrics.on_recovery_succeeded(); + return Ok(data) + }, } - }; - - match &res { - Ok(_) => self.params.metrics.on_recovery_succeeded(), - Err(RecoveryError::Invalid) => self.params.metrics.on_recovery_invalid(), - Err(_) => self.params.metrics.on_recovery_failed(), } - res + // We have no other strategies to try. + gum::warn!( + target: LOG_TARGET, + candidate_hash = ?self.params.candidate_hash, + "Recovery of available data failed.", + ); + self.params.metrics.on_recovery_failed(); + + Err(RecoveryError::Unavailable) } } @@ -451,13 +452,8 @@ pub struct FetchFull { } pub struct FetchFullParams { - /// Name of the validator group used for recovery. For logging purposes. - /// (e.g."backers"/"approval-checkers") - pub group_name: &'static str, /// Validators that will be used for fetching the data. pub validators: Vec, - /// Predicate that if is true, will result in skipping this strategy. - pub skip_if: Box bool + Send>, /// Channel to the erasure task handler. 
pub erasure_task_tx: futures::channel::mpsc::Sender, } @@ -482,25 +478,6 @@ impl RecoveryStrategy sender: &mut Sender, common_params: &RecoveryParams, ) -> Result { - if (self.params.skip_if)() { - gum::trace!( - target: LOG_TARGET, - candidate_hash = ?common_params.candidate_hash, - erasure_root = ?common_params.erasure_root, - "Skipping requesting availability data from {}", - self.params.group_name - ); - - return Err(RecoveryError::Unavailable) - } - - gum::trace!( - target: LOG_TARGET, - candidate_hash = ?common_params.candidate_hash, - erasure_root = ?common_params.erasure_root, - "Requesting full availability data from {}", - self.params.group_name - ); loop { // Pop the next validator, and proceed to next fetch_chunks_task if we're out. let validator_index = From bf39ba0cddec7c1438bf83106cd0f2833852ca99 Mon Sep 17 00:00:00 2001 From: alindima Date: Fri, 15 Sep 2023 12:24:15 +0300 Subject: [PATCH 06/12] don't use the backing group if chunk size query failed --- polkadot/node/network/availability-recovery/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs index 04b71147c9f0..4a6a805ae451 100644 --- a/polkadot/node/network/availability-recovery/src/lib.rs +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -445,6 +445,10 @@ async fn handle_recover( enabled = prefer_backing_group, "Prefer fetch from backing group", ); + } else { + // we have a POV limit but were not able to query the chunk size, so don't use + // the backing group. 
+ prefer_backing_group = false; } }; From 34408b54abbdee49e187baee399bc85393c5522a Mon Sep 17 00:00:00 2001 From: alindima Date: Fri, 15 Sep 2023 12:29:47 +0300 Subject: [PATCH 07/12] add ImmediateError to chunks recovery strategy --- polkadot/node/network/availability-recovery/src/task.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/network/availability-recovery/src/task.rs b/polkadot/node/network/availability-recovery/src/task.rs index 30eaf7734a52..8acd4f2040cb 100644 --- a/polkadot/node/network/availability-recovery/src/task.rs +++ b/polkadot/node/network/availability-recovery/src/task.rs @@ -222,7 +222,7 @@ impl State { sender .send_message(NetworkBridgeTxMessage::SendRequests( requests, - IfDisconnected::TryConnect, + IfDisconnected::ImmediateError, )) .await; } From 1569e49e3dfb547df3cf195c1e209715522b2011 Mon Sep 17 00:00:00 2001 From: alindima Date: Mon, 18 Sep 2023 10:41:45 +0300 Subject: [PATCH 08/12] more review comments --- .../network/availability-recovery/src/lib.rs | 97 +++++++++---------- 1 file changed, 46 insertions(+), 51 deletions(-) diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs index 4a6a805ae451..98087bca6f0b 100644 --- a/polkadot/node/network/availability-recovery/src/lib.rs +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -424,63 +424,58 @@ async fn handle_recover( let _span = span.child("session-info-ctx-received"); match session_info { Some(session_info) => { - let mut prefer_backing_group = true; - - if let RecoveryStrategyKind::BackersFirstIfSizeLower(small_pov_limit) = - recovery_strategy_kind - { - // Get our own chunk size to get an estimate of the PoV size. 
- let chunk_size: Result, error::Error> = - query_chunk_size(ctx, candidate_hash).await; - if let Ok(Some(chunk_size)) = chunk_size { - let pov_size_estimate = - chunk_size.saturating_mul(session_info.validators.len()) / 3; - prefer_backing_group = pov_size_estimate < small_pov_limit; - - gum::trace!( - target: LOG_TARGET, - ?candidate_hash, - pov_size_estimate, - small_pov_limit, - enabled = prefer_backing_group, - "Prefer fetch from backing group", - ); - } else { - // we have a POV limit but were not able to query the chunk size, so don't use - // the backing group. - prefer_backing_group = false; - } - }; - - let backing_validators = if let Some(backing_group) = backing_group { - session_info.validator_groups.get(backing_group) - } else { - None - }; - - let fetch_chunks_params = FetchChunksParams { - n_validators: session_info.validators.len(), - erasure_task_tx: erasure_task_tx.clone(), - }; - let mut recovery_strategies: VecDeque< Box::Sender>>, > = VecDeque::with_capacity(2); - if let Some(backing_validators) = backing_validators { - match (&recovery_strategy_kind, prefer_backing_group) { - (RecoveryStrategyKind::BackersFirstAlways, true) | - (RecoveryStrategyKind::BackersFirstIfSizeLower(_), true) | - (RecoveryStrategyKind::BypassAvailabilityStore, true) => - recovery_strategies.push_back(Box::new(FetchFull::new(FetchFullParams { - validators: backing_validators.to_vec(), - erasure_task_tx, - }))), - _ => {}, - }; + if let Some(backing_group) = backing_group { + if let Some(backing_validators) = session_info.validator_groups.get(backing_group) { + let mut small_pov_size = true; + + if let RecoveryStrategyKind::BackersFirstIfSizeLower(small_pov_limit) = + recovery_strategy_kind + { + // Get our own chunk size to get an estimate of the PoV size. 
+ let chunk_size: Result, error::Error> = + query_chunk_size(ctx, candidate_hash).await; + if let Ok(Some(chunk_size)) = chunk_size { + let pov_size_estimate = + chunk_size.saturating_mul(session_info.validators.len()) / 3; + small_pov_size = pov_size_estimate < small_pov_limit; + + gum::trace!( + target: LOG_TARGET, + ?candidate_hash, + pov_size_estimate, + small_pov_limit, + enabled = small_pov_size, + "Prefer fetch from backing group", + ); + } else { + // we have a POV limit but were not able to query the chunk size, so + // don't use the backing group. + small_pov_size = false; + } + }; + + match (&recovery_strategy_kind, small_pov_size) { + (RecoveryStrategyKind::BackersFirstAlways, _) | + (RecoveryStrategyKind::BypassAvailabilityStore, _) | + (RecoveryStrategyKind::BackersFirstIfSizeLower(_), true) => recovery_strategies.push_back( + Box::new(FetchFull::new(FetchFullParams { + validators: backing_validators.to_vec(), + erasure_task_tx: erasure_task_tx.clone(), + })), + ), + _ => {}, + }; + } } - recovery_strategies.push_back(Box::new(FetchChunks::new(fetch_chunks_params))); + recovery_strategies.push_back(Box::new(FetchChunks::new(FetchChunksParams { + n_validators: session_info.validators.len(), + erasure_task_tx, + }))); launch_recovery_task( state, From d5c32d13c58e52cfbd7c0879670281b79e84c379 Mon Sep 17 00:00:00 2001 From: alindima Date: Mon, 18 Sep 2023 11:55:46 +0300 Subject: [PATCH 09/12] fix test --- polkadot/node/network/availability-recovery/src/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/network/availability-recovery/src/tests.rs b/polkadot/node/network/availability-recovery/src/tests.rs index 17b02080c008..63ccf0e94f91 100644 --- a/polkadot/node/network/availability-recovery/src/tests.rs +++ b/polkadot/node/network/availability-recovery/src/tests.rs @@ -1014,7 +1014,7 @@ fn recovers_from_only_chunks_if_pov_large() { AvailabilityRecoveryMessage::RecoverAvailableData( new_candidate.clone(), 
test_state.session_index, - None, + Some(GroupIndex(0)), tx, ), ) From cb42bef4ae047bbe3bf1681874daebf72fc7acb9 Mon Sep 17 00:00:00 2001 From: alindima Date: Mon, 18 Sep 2023 12:12:07 +0300 Subject: [PATCH 10/12] rollback to using TryConnect for fetch chunks --- polkadot/node/network/availability-recovery/src/task.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/network/availability-recovery/src/task.rs b/polkadot/node/network/availability-recovery/src/task.rs index 8acd4f2040cb..30eaf7734a52 100644 --- a/polkadot/node/network/availability-recovery/src/task.rs +++ b/polkadot/node/network/availability-recovery/src/task.rs @@ -222,7 +222,7 @@ impl State { sender .send_message(NetworkBridgeTxMessage::SendRequests( requests, - IfDisconnected::ImmediateError, + IfDisconnected::TryConnect, )) .await; } From b82b2a00b56483a71186cc0835c51fed042e69e3 Mon Sep 17 00:00:00 2001 From: alindima Date: Tue, 19 Sep 2023 09:52:21 +0300 Subject: [PATCH 11/12] replace BypassAvStore variant with a separate flag --- .../network/availability-recovery/src/lib.rs | 32 ++++++++++++------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs index 98087bca6f0b..e2146981da92 100644 --- a/polkadot/node/network/availability-recovery/src/lib.rs +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -90,17 +90,17 @@ pub enum RecoveryStrategyKind { BackersFirstIfSizeLower(usize), /// We always recover using validator chunks. ChunksAlways, - /// Do not request data from the availability store. - /// This is the useful for nodes where the - /// availability-store subsystem is not expected to run, - /// such as collators. - BypassAvailabilityStore, } /// The Availability Recovery Subsystem. pub struct AvailabilityRecoverySubsystem { /// PoV recovery strategy to use. 
recovery_strategy_kind: RecoveryStrategyKind, + /// If this is true, do not request data from the availability store. + /// This is useful for nodes where the + /// availability-store subsystem is not expected to run, + /// such as collators. + bypass_availability_store: bool, /// Receiver for available data requests. req_receiver: IncomingRequestReceiver, /// Metrics for this subsystem. @@ -389,6 +389,7 @@ async fn handle_recover( metrics: &Metrics, erasure_task_tx: futures::channel::mpsc::Sender, recovery_strategy_kind: RecoveryStrategyKind, + bypass_availability_store: bool, ) -> error::Result<()> { let candidate_hash = receipt.hash(); @@ -460,7 +461,6 @@ async fn handle_recover( match (&recovery_strategy_kind, small_pov_size) { (RecoveryStrategyKind::BackersFirstAlways, _) | - (RecoveryStrategyKind::BypassAvailabilityStore, _) | (RecoveryStrategyKind::BackersFirstIfSizeLower(_), true) => recovery_strategies.push_back( Box::new(FetchFull::new(FetchFullParams { validators: backing_validators.to_vec(), @@ -485,7 +485,7 @@ async fn handle_recover( response_sender, metrics, recovery_strategies, - recovery_strategy_kind == RecoveryStrategyKind::BypassAvailabilityStore, + bypass_availability_store, ) .await }, @@ -534,7 +534,8 @@ impl AvailabilityRecoverySubsystem { metrics: Metrics, ) -> Self { Self { - recovery_strategy_kind: RecoveryStrategyKind::BypassAvailabilityStore, + recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT), + bypass_availability_store: true, req_receiver, metrics, } @@ -548,6 +549,7 @@ impl AvailabilityRecoverySubsystem { ) -> Self { Self { recovery_strategy_kind: RecoveryStrategyKind::BackersFirstAlways, + bypass_availability_store: false, req_receiver, metrics, } @@ -558,7 +560,12 @@ impl AvailabilityRecoverySubsystem { req_receiver: IncomingRequestReceiver, metrics: Metrics, ) -> Self { - Self { recovery_strategy_kind: RecoveryStrategyKind::ChunksAlways, req_receiver, metrics } + Self { +
recovery_strategy_kind: RecoveryStrategyKind::ChunksAlways, + bypass_availability_store: false, + req_receiver, + metrics, + } } /// Create a new instance of `AvailabilityRecoverySubsystem` which requests chunks if PoV is @@ -569,6 +576,7 @@ impl AvailabilityRecoverySubsystem { ) -> Self { Self { recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT), + bypass_availability_store: false, req_receiver, metrics, } @@ -576,7 +584,8 @@ impl AvailabilityRecoverySubsystem { async fn run(self, mut ctx: Context) -> SubsystemResult<()> { let mut state = State::default(); - let Self { mut req_receiver, metrics, recovery_strategy_kind } = self; + let Self { mut req_receiver, metrics, recovery_strategy_kind, bypass_availability_store } = + self; let (erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16); let mut erasure_task_rx = erasure_task_rx.fuse(); @@ -666,6 +675,7 @@ impl AvailabilityRecoverySubsystem { &metrics, erasure_task_tx.clone(), recovery_strategy_kind.clone(), + bypass_availability_store ).await { gum::warn!( target: LOG_TARGET, @@ -681,7 +691,7 @@ impl AvailabilityRecoverySubsystem { in_req = recv_req => { match in_req.into_nested().map_err(|fatal| SubsystemError::with_origin("availability-recovery", fatal))? 
{ Ok(req) => { - if recovery_strategy_kind == RecoveryStrategyKind::BypassAvailabilityStore { + if bypass_availability_store { gum::debug!( target: LOG_TARGET, "Skipping request to availability-store.", From 8585628341aa1b40b74b6a432014f74d72229aef Mon Sep 17 00:00:00 2001 From: alindima Date: Tue, 19 Sep 2023 10:04:23 +0300 Subject: [PATCH 12/12] move requesting_chunks to the strategy --- .../network/availability-recovery/src/task.rs | 42 ++++++++++--------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/polkadot/node/network/availability-recovery/src/task.rs b/polkadot/node/network/availability-recovery/src/task.rs index 30eaf7734a52..d5bc2da84944 100644 --- a/polkadot/node/network/availability-recovery/src/task.rs +++ b/polkadot/node/network/availability-recovery/src/task.rs @@ -102,13 +102,11 @@ pub struct RecoveryParams { pub struct State { /// Chunks received so far. received_chunks: HashMap, - /// Collection of in-flight requests. - requesting_chunks: FuturesUndead, (ValidatorIndex, RequestError)>>, } impl State { fn new() -> Self { - Self { received_chunks: HashMap::new(), requesting_chunks: FuturesUndead::new() } + Self { received_chunks: HashMap::new() } } fn insert_chunk(&mut self, validator: ValidatorIndex, chunk: ErasureChunk) { @@ -174,15 +172,18 @@ impl State { sender: &mut Sender, desired_requests_count: usize, validators: &mut VecDeque, + requesting_chunks: &mut FuturesUndead< + Result, (ValidatorIndex, RequestError)>, + >, ) where Sender: overseer::AvailabilityRecoverySenderTrait, { let candidate_hash = &params.candidate_hash; - let already_requesting_count = self.requesting_chunks.len(); + let already_requesting_count = requesting_chunks.len(); let mut requests = Vec::with_capacity(desired_requests_count - already_requesting_count); - while self.requesting_chunks.len() < desired_requests_count { + while requesting_chunks.len() < desired_requests_count { if let Some(validator_index) = validators.pop_back() { let validator =
params.validator_authority_keys[validator_index.0 as usize].clone(); gum::trace!( @@ -205,7 +206,7 @@ impl State { params.metrics.on_chunk_request_issued(); let timer = params.metrics.time_chunk_request(); - self.requesting_chunks.push(Box::pin(async move { + requesting_chunks.push(Box::pin(async move { let _timer = timer; match res.await { Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk)) => @@ -232,6 +233,9 @@ impl State { &mut self, params: &RecoveryParams, validators: &mut VecDeque, + requesting_chunks: &mut FuturesUndead< + Result, (ValidatorIndex, RequestError)>, + >, can_conclude: impl Fn(usize, usize, usize, &RecoveryParams, usize) -> bool, ) -> (usize, usize) { let metrics = &params.metrics; @@ -243,7 +247,7 @@ impl State { // We also declare requests undead, once `TIMEOUT_START_NEW_REQUESTS` is reached and will // return in that case for `launch_parallel_requests` to fill up slots again. while let Some(request_result) = - self.requesting_chunks.next_with_timeout(TIMEOUT_START_NEW_REQUESTS).await + requesting_chunks.next_with_timeout(TIMEOUT_START_NEW_REQUESTS).await { total_received_responses += 1; @@ -313,7 +317,7 @@ impl State { // or have gotten firm 'No' responses from enough validators. if can_conclude( validators.len(), - self.requesting_chunks.total_len(), + requesting_chunks.total_len(), self.chunk_count(), params, error_count, @@ -322,7 +326,7 @@ impl State { target: LOG_TARGET, candidate_hash = ?params.candidate_hash, received_chunks_count = ?self.chunk_count(), - requested_chunks_count = ?self.requesting_chunks.len(), + requested_chunks_count = ?requesting_chunks.len(), threshold = ?params.threshold, "Can conclude availability for a candidate", ); @@ -394,11 +398,6 @@ where let _timer = self.params.metrics.time_full_recovery(); while let Some(mut current_strategy) = self.strategies.pop_front() { - // Make sure we are not referencing futures from past RecoveryStrategy runs.
- if self.state.requesting_chunks.total_len() != 0 { - self.state.requesting_chunks = FuturesUndead::new(); - } - gum::debug!( target: LOG_TARGET, candidate_hash = ?self.params.candidate_hash, @@ -555,11 +554,11 @@ pub struct FetchChunks { error_count: usize, /// Total number of responses that have been received, including failed ones. total_received_responses: usize, - + /// Collection of in-flight requests. + requesting_chunks: FuturesUndead, (ValidatorIndex, RequestError)>>, /// A random shuffling of the validators which indicates the order in which we connect to the /// validators and request the chunk from them. validators: VecDeque, - /// Channel to the erasure task handler. erasure_task_tx: futures::channel::mpsc::Sender, } @@ -583,6 +582,7 @@ impl FetchChunks { Self { error_count: 0, total_received_responses: 0, + requesting_chunks: FuturesUndead::new(), validators: shuffling.into(), erasure_task_tx: params.erasure_task_tx, } @@ -732,7 +732,7 @@ impl RecoveryStrategy if Self::is_unavailable( self.validators.len(), - state.requesting_chunks.total_len(), + self.requesting_chunks.total_len(), state.chunk_count(), common_params.threshold, ) { @@ -741,8 +741,8 @@ impl RecoveryStrategy candidate_hash = ?common_params.candidate_hash, erasure_root = ?common_params.erasure_root, received = %state.chunk_count(), - requesting = %state.requesting_chunks.len(), - total_requesting = %state.requesting_chunks.total_len(), + requesting = %self.requesting_chunks.len(), + total_requesting = %self.requesting_chunks.total_len(), n_validators = %common_params.n_validators, "Data recovery from chunks is not possible", ); @@ -752,7 +752,7 @@ impl RecoveryStrategy let desired_requests_count = self.get_desired_request_count(state.chunk_count(), common_params.threshold); - let already_requesting_count = state.requesting_chunks.len(); + let already_requesting_count = self.requesting_chunks.len(); gum::debug!( target: LOG_TARGET, ?common_params.candidate_hash, @@ -769,6 +769,7 @@ impl 
RecoveryStrategy sender, desired_requests_count, &mut self.validators, + &mut self.requesting_chunks, ) .await; @@ -776,6 +777,7 @@ impl RecoveryStrategy .wait_for_chunks( common_params, &mut self.validators, + &mut self.requesting_chunks, |unrequested_validators, reqs, chunk_count, params, _error_count| { chunk_count >= params.threshold || Self::is_unavailable(