diff --git a/polkadot/erasure-coding/src/lib.rs b/polkadot/erasure-coding/src/lib.rs index 36847b463715..1bb190211e4b 100644 --- a/polkadot/erasure-coding/src/lib.rs +++ b/polkadot/erasure-coding/src/lib.rs @@ -139,6 +139,21 @@ pub fn obtain_chunks(n_validators: usize, data: &T) -> Result( + _n_validators: usize, + _chunks: I, +) -> Result +where + I: IntoIterator, +{ + todo!() +} + /// Reconstruct the v1 available data from a set of chunks. /// /// Provide an iterator containing chunk data and the corresponding index. diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs index 191ee2acd973..4e4a537de776 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -168,6 +168,7 @@ impl FetchTaskConfig { request: ChunkFetchingRequest { candidate_hash: core.candidate_hash, index: session_info.our_index, + // TODO: this will no longer be our index. We need to take into account the per-height shuffling of systematic chunks. }, erasure_root: core.candidate_descriptor.erasure_root, relay_parent: core.candidate_descriptor.relay_parent, diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs index 446988f7cc0d..c8c3f6b4b8ce 100644 --- a/polkadot/node/network/availability-distribution/src/requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs @@ -77,8 +77,16 @@ pub struct Requester { /// Prometheus Metrics metrics: Metrics, + // TODO: use helper function to compute the chunk index we need to request and pass it to + // FetchTaskConfig::new() and save it in a HashMap. I think it's useful to + // cache it because we may hold availability chunks for multiple parablocks in the same relay + // parent. 
use an LruCache with the capacity of the number of relay blocks in a session and + // fall back to the helper function if the entry is not present in the cache. } +// TODO: helper function for computing the chunk index for the validator, starting from the block +// height and array of all the validators (not just in group) + #[overseer::contextbounds(AvailabilityDistribution, prefix = self::overseer)] impl Requester { /// How many ancestors of the leaf should we consider along with it. @@ -231,6 +239,8 @@ impl Requester { // guaranteed to be fetchable by the state trie. leaf, leaf_session_index, + // TODO: query the hashmap and pass in the chunk index to + // FetchTaskConfig. |info| FetchTaskConfig::new(leaf, &core, tx, metrics, info, span), ) .await diff --git a/polkadot/node/network/availability-distribution/src/requester/session_cache.rs b/polkadot/node/network/availability-distribution/src/requester/session_cache.rs index 8a48e19c2827..aeba2a0e2524 100644 --- a/polkadot/node/network/availability-distribution/src/requester/session_cache.rs +++ b/polkadot/node/network/availability-distribution/src/requester/session_cache.rs @@ -188,6 +188,9 @@ impl SessionCache { // Get our group index: let our_group = info.validator_info.our_group; + // TODO: This shuffling is fine, as it is only needed for requesting the chunk + // for blocks pending availability. 
+ // Shuffle validators in groups: let mut rng = thread_rng(); for g in validator_groups.iter_mut() { diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs index aaf149cdb6a1..9b7b83e6f48a 100644 --- a/polkadot/node/network/availability-recovery/src/lib.rs +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -35,7 +35,10 @@ use futures::{ task::{Context, Poll}, }; use schnellru::{ByLength, LruMap}; -use task::{FetchChunks, FetchChunksParams, FetchFull, FetchFullParams}; +use task::{ + FetchChunks, FetchChunksParams, FetchFull, FetchFullParams, FetchSystematicChunks, + FetchSystematicChunksParams, +}; use fatality::Nested; use polkadot_erasure_coding::{ @@ -90,6 +93,8 @@ pub enum RecoveryStrategyKind { BackersFirstIfSizeLower(usize), /// We always recover using validator chunks. ChunksAlways, + /// First try the backing group. Then systematic chunks. + BackersThenSystematicChunks, /// Do not request data from the availability store. 
/// This is the useful for nodes where the /// availability-store subsystem is not expected to run, @@ -474,12 +479,23 @@ async fn handle_recover( group_name: "backers", validators: backing_validators.to_vec(), skip_if: skip_backing_group_if, - erasure_task_tx, + erasure_task_tx: erasure_task_tx.clone(), }))), - RecoveryStrategyKind::ChunksAlways => {}, + _ => {}, }; } + if recovery_strategy_kind == RecoveryStrategyKind::BackersThenSystematicChunks { + recovery_strategies.push_back(Box::new(FetchSystematicChunks::new( + FetchSystematicChunksParams { + validators: (0..recovery_threshold(session_info.validators.len()).unwrap()) + .map(|i| ValidatorIndex(u32::try_from(i).unwrap())) + .collect(), + erasure_task_tx, + }, + ))); + } + recovery_strategies.push_back(Box::new(FetchChunks::new(fetch_chunks_params))); launch_recovery_task( diff --git a/polkadot/node/network/availability-recovery/src/task.rs b/polkadot/node/network/availability-recovery/src/task.rs index 339cd0c18490..da4d3912361a 100644 --- a/polkadot/node/network/availability-recovery/src/task.rs +++ b/polkadot/node/network/availability-recovery/src/task.rs @@ -37,7 +37,7 @@ use polkadot_primitives::{AuthorityDiscoveryId, CandidateHash, Hash, ValidatorIn use rand::seq::SliceRandom; use sc_network::{IfDisconnected, OutboundFailure, RequestFailure}; use std::{ - collections::{HashMap, VecDeque}, + collections::{BTreeSet, HashMap, VecDeque}, time::Duration, }; @@ -572,6 +572,278 @@ impl RecoveryStrategy } } +/// `RecoveryStrategy` that attempts to recover the systematic chunks from the validators that +/// hold them, in order to bypass the erasure code reconstruction step, which is costly. +pub struct FetchSystematicChunks { + params: FetchSystematicChunksParams, + validators: VecDeque, +} + +/// Parameters needed for fetching systematic chunks. +pub struct FetchSystematicChunksParams { + /// Validators that hold the systematic chunks. NOTE(review): a `BTreeSet` is always sorted, so pre-shuffling is not preserved. 
+ pub validators: BTreeSet, + /// Channel to the erasure task handler. + pub erasure_task_tx: futures::channel::mpsc::Sender, +} + +impl FetchSystematicChunks { + /// Instantiate a new systematic chunks strategy. + pub fn new(params: FetchSystematicChunksParams) -> Self { + Self { + // We maintain the copy of the systematic validators in the params field, so that we + // can use them when doing the recovery. + validators: params.validators.clone().into_iter().collect(), + params, + } + } + + fn is_unavailable( + unrequested_validators: usize, + in_flight_requests: usize, + systematic_chunk_count: usize, + threshold: usize, + ) -> bool { + is_unavailable( + systematic_chunk_count, + in_flight_requests, + unrequested_validators, + threshold, + ) + } + + /// Desired number of parallel requests. + /// + /// For the given threshold (total required number of chunks) get the desired number of + /// requests we want to have running in parallel at this time. + fn get_desired_request_count(&self, chunk_count: usize, threshold: usize) -> usize { + // Upper bound for parallel requests. + let max_requests_boundary = std::cmp::min(N_PARALLEL, threshold); + // How many chunks are still needed? + let remaining_chunks = threshold.saturating_sub(chunk_count); + // Actual number of requests we want to have in flight in parallel: + // We don't have to make up for any error rate, as an error fetching a systematic chunk + // results in failure of the entire strategy. 
+ std::cmp::min(max_requests_boundary, remaining_chunks) + } + + async fn attempt_systematic_recovery( + &mut self, + state: &mut State, + common_params: &RecoveryParams, + ) -> Result { + let recovery_duration = common_params.metrics.time_erasure_recovery(); + + let available_data = polkadot_erasure_coding::reconstruct_from_systematic_v1( + common_params.n_validators, + state.received_chunks.iter().map(|(index, chunk)| { + ( + &chunk.chunk[..], + usize::try_from(index.0).expect( + "polkadot does not run on platforms where u32 cannot be cast into a usize", + ), + ) + }), + ); + + match available_data { + Ok(data) => { + // Send request to re-encode the chunks and check merkle root. + let (reencode_tx, reencode_rx) = oneshot::channel(); + self.params + .erasure_task_tx + .send(ErasureTask::Reencode( + common_params.n_validators, + common_params.erasure_root, + data, + reencode_tx, + )) + .await + .map_err(|_| RecoveryError::ChannelClosed)?; + + let reencode_response = + reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?; + + if let Some(data) = reencode_response { + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + erasure_root = ?common_params.erasure_root, + "Data recovery from systematic chunks complete", + ); + + Ok(data) + } else { + recovery_duration.map(|rd| rd.stop_and_discard()); + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + erasure_root = ?common_params.erasure_root, + "Systematic data recovery error - root mismatch", + ); + + // Return a non-fatal error, because we may have gotten a non-systematic chunk + // from a validator by mistake. We may have enough chunks for the next strategy + // to pass. 
+ Err(RecoveryError::Unavailable) + } + }, + Err(err) => { + recovery_duration.map(|rd| rd.stop_and_discard()); + gum::trace!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + erasure_root = ?common_params.erasure_root, + ?err, + "Systematic data recovery error ", + ); + + // Return a non-fatal error, because we may have gotten a non-systematic chunk from + // a validator by mistake. We may have enough chunks for the next strategy to pass. + Err(RecoveryError::Unavailable) + }, + } + } +} + +#[async_trait::async_trait] +impl RecoveryStrategy + for FetchSystematicChunks +{ + fn display_name(&self) -> &'static str { + "Fetch systematic chunks" + } + + async fn run( + &mut self, + state: &mut State, + sender: &mut Sender, + common_params: &RecoveryParams, + ) -> Result { + // First query the store for any chunks we've got. + if !common_params.bypass_availability_store { + let local_chunk_indices = state.populate_from_av_store(common_params, sender).await; + self.validators.retain(|i| !local_chunk_indices.contains(i)); + + for i in &local_chunk_indices { + // If we are among the systematic validators but hold an invalid chunk, we cannot + // perform the systematic recovery. Fall through to the next strategy. + if self.params.validators.contains(i) && !state.received_chunks.contains_key(i) { + gum::debug!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + erasure_root = ?common_params.erasure_root, + requesting = %state.requesting_chunks.len(), + total_requesting = %state.requesting_chunks.total_len(), + n_validators = %common_params.n_validators, + "Systematic chunk recovery is not possible. We are among the systematic validators but hold an invalid chunk", + ); + return Err(RecoveryError::Unavailable) + } + } + } + + // No need to query the validators that have the chunks we already received. 
+ self.validators.retain(|i| !state.received_chunks.contains_key(i)); + + let mut systematic_chunk_count = self + .params + .validators + .iter() + .filter(|i| state.received_chunks.contains_key(i)) + .count(); + let systematic_chunk_threshold = self.params.validators.len(); + + loop { + // If received_chunks has `systematic_chunk_threshold` entries, attempt to recover the + // data. If that fails, or a re-encoding of it doesn't match the expected erasure root, + // return the non-fatal Err(RecoveryError::Unavailable) so that the next strategy can run. + if systematic_chunk_count >= systematic_chunk_threshold { + return self.attempt_systematic_recovery(state, common_params).await + } + + if Self::is_unavailable( + self.validators.len(), + state.requesting_chunks.total_len(), + systematic_chunk_count, + systematic_chunk_threshold, + ) { + gum::debug!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + erasure_root = ?common_params.erasure_root, + received = %systematic_chunk_count, + requesting = %state.requesting_chunks.len(), + total_requesting = %state.requesting_chunks.total_len(), + n_validators = %common_params.n_validators, + threshold = ?systematic_chunk_threshold, + "Data recovery is not possible", + ); + + return Err(RecoveryError::Unavailable) + } + + let desired_requests_count = + self.get_desired_request_count(systematic_chunk_count, systematic_chunk_threshold); + let already_requesting_count = state.requesting_chunks.len(); + gum::debug!( + target: LOG_TARGET, + ?common_params.candidate_hash, + ?desired_requests_count, + total_received = ?systematic_chunk_count, + threshold = ?systematic_chunk_threshold, + ?already_requesting_count, + "Requesting systematic availability chunks for a candidate", + ); + + state + .launch_parallel_chunk_requests( + common_params, + sender, + desired_requests_count, + &mut self.validators, + ) + .await; + + let (total_responses, error_count) = state + .wait_for_chunks( + common_params, + &mut self.validators, + |unrequested_validators, reqs, 
chunk_count, _params, error_count| { + error_count != 0 || + chunk_count >= systematic_chunk_threshold || + Self::is_unavailable( + unrequested_validators, + reqs, + chunk_count, + systematic_chunk_threshold, + ) + }, + ) + .await; + + // We can't afford any errors, as we need all the systematic chunks for this to work. + if error_count > 0 { + gum::debug!( + target: LOG_TARGET, + candidate_hash = ?common_params.candidate_hash, + erasure_root = ?common_params.erasure_root, + received = %systematic_chunk_count, + requesting = %state.requesting_chunks.len(), + total_requesting = %state.requesting_chunks.total_len(), + n_validators = %common_params.n_validators, + threshold = ?systematic_chunk_threshold, + "Systematic chunk recovery is not possible. ", + ); + + return Err(RecoveryError::Unavailable) + } + + systematic_chunk_count += total_responses; + } + } +} + /// `RecoveryStrategy` that requests chunks from validators, in parallel. pub struct FetchChunks { /// How many requests have been unsuccessful so far.