
relay chain selection and dispute-coordinator fixes and improvements #4752

Merged: 27 commits, Jan 26, 2022
Changes from 14 commits
146 changes: 128 additions & 18 deletions node/core/dispute-coordinator/src/real/initialized.rs
@@ -22,6 +22,7 @@ use futures::{
channel::{mpsc, oneshot},
FutureExt, StreamExt,
};
use lru::LruCache;

use sc_keystore::LocalKeystore;

@@ -37,7 +38,7 @@ use polkadot_node_subsystem::{
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SubsystemContext,
};
use polkadot_node_subsystem_util::rolling_session_window::{
RollingSessionWindow, SessionWindowUpdate,
RollingSessionWindow, SessionWindowUpdate, SessionsUnavailable,
};
use polkadot_primitives::{
v1::{
@@ -48,11 +49,12 @@ use polkadot_primitives::{
v2::SessionInfo,
};

use crate::{metrics::Metrics, real::DisputeCoordinatorSubsystem, LOG_TARGET};

use crate::{
error::{log_error, Fatal, FatalResult, NonFatal, NonFatalResult, Result},
error::{log_error, Error, Fatal, FatalResult, NonFatal, NonFatalResult, Result},
metrics::Metrics,
real::DisputeCoordinatorSubsystem,
status::{get_active_with_status, Clock, DisputeStatus, Timestamp},
LOG_TARGET,
};

use super::{
@@ -66,6 +68,9 @@
OverlayedBackend,
};

const LRU_SCRAPED_BLOCKS_CAPACITY: usize = 40;
const MAX_BATCH_SCRAPE_ANCESTORS: u32 = 20;

/// After the first active leaves update we transition to `Initialized` state.
///
/// Before the first active leaves update we can't really do much. We cannot check incoming
@@ -80,6 +85,11 @@ pub struct Initialized {
ordering_provider: OrderingProvider,
participation_receiver: WorkerMessageReceiver,
metrics: Metrics,
// This tracks only rolling session window failures.
// It can be extended to a `Vec` if we ever need to track more than one error.
error: Option<SessionsUnavailable>,
/// Latest relay blocks that have been successfully scraped.
last_scraped_blocks: LruCache<Hash, ()>,
}

impl Initialized {
@@ -105,6 +115,8 @@ impl Initialized {
participation,
participation_receiver,
metrics,
error: None,
last_scraped_blocks: LruCache::new(LRU_SCRAPED_BLOCKS_CAPACITY),
}
}
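Reviewer note: `last_scraped_blocks` is used as a bounded "already seen" set. A minimal sketch of the `lru` semantics this relies on, with an illustrative capacity (the function and values below are not part of this diff):

use lru::LruCache;
use polkadot_primitives::v1::Hash;

fn sketch() {
    // The capacity bounds memory: once more than 40 distinct blocks are
    // inserted, the least recently used entry is evicted and that block
    // could be scraped again later.
    let mut last_scraped_blocks: LruCache<Hash, ()> = LruCache::new(40);
    let block = Hash::zero();
    assert!(last_scraped_blocks.get(&block).is_none()); // not scraped yet
    last_scraped_blocks.put(block, ());                 // mark as scraped
    assert!(last_scraped_blocks.get(&block).is_some()); // later checks hit the cache
}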

@@ -245,22 +257,26 @@
.await?;
self.participation.process_active_leaves_update(ctx, &update).await?;

let new_activations = update.activated.into_iter().map(|a| a.hash);
for new_leaf in new_activations {
match self.rolling_session_window.cache_session_info_for_head(ctx, new_leaf).await {
if let Some(new_leaf) = update.activated {
match self
.rolling_session_window
.cache_session_info_for_head(ctx, new_leaf.hash)
.await
{
Err(e) => {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Failed to update session cache for disputes",
target: LOG_TARGET,
err = ?e,
"Failed to update session cache for disputes",
);
continue
self.error = Some(e);
},
Ok(SessionWindowUpdate::Advanced {
new_window_end: window_end,
new_window_start,
..
}) => {
self.error = None;
let session = window_end;
if self.highest_session < session {
tracing::trace!(
@@ -277,7 +293,57 @@
},
Ok(SessionWindowUpdate::Unchanged) => {},
};
self.scrape_on_chain_votes(ctx, overlay_db, new_leaf, now).await?;

// Scrape the head if the rolling session window update above went well.
if self.error.is_none() {
let _ = self
.scrape_on_chain_votes(ctx, overlay_db, new_leaf.hash, now)
.await
.map_err(|err| {
tracing::warn!(
target: LOG_TARGET,
"Skipping scraping block #{}({}) due to error: {}",
new_leaf.number,
new_leaf.hash,
err
);
});
}

// Try to scrape any ancestor blocks for which we either could not get the current
// session or never received an active leaves update.
let target_block = new_leaf.number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS);
let ancestors = OrderingProvider::get_block_ancestors(
ctx.sender(),
new_leaf.hash,
new_leaf.number,
target_block,
&mut self.last_scraped_blocks,
)
.await
.unwrap_or_else(|err| {
tracing::debug!(
target: LOG_TARGET,
"Skipping leaf ancestors scraping due to error: {}",
err
);
Vec::new()
});

// We could do this in parallel, but we don't want to tie up too many wasm
// instances at once.
for ancestor in ancestors {
let _ = self.scrape_on_chain_votes(ctx, overlay_db, ancestor, now).await.map_err(
|err| {
tracing::warn!(
target: LOG_TARGET,
"Skipping scraping block {} due to error: {}",
ancestor,
err
);
},
);
}
}

Ok(())
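Reviewer note: the `saturating_sub` above keeps the batch window well-defined near genesis. A tiny self-contained illustration (the values are made up):

const MAX_BATCH_SCRAPE_ANCESTORS: u32 = 20;

fn main() {
    let target = |leaf: u32| leaf.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS);
    assert_eq!(target(100), 80); // scrape at most 20 ancestors below the leaf
    assert_eq!(target(5), 0);    // near genesis the subtraction saturates at zero
}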
Expand All @@ -293,6 +359,11 @@ impl Initialized {
new_leaf: Hash,
now: u64,
) -> Result<()> {
// Avoid scraping twice.
if self.last_scraped_blocks.get(&new_leaf).is_some() {
return Ok(())
}

// obtain the concluded disputes as well as the candidate backing votes
// from the new leaf
let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = {
@@ -331,6 +402,9 @@
};

if backing_validators_per_candidate.is_empty() && disputes.is_empty() {
// This block is not interesting as it doesn't contain any backing votes or disputes.
// We'll mark it here as scraped to prevent further processing.
self.last_scraped_blocks.put(new_leaf, ());
return Ok(())
}

@@ -413,6 +487,7 @@
}

if disputes.is_empty() {
self.last_scraped_blocks.put(new_leaf, ());
return Ok(())
}

@@ -490,6 +565,8 @@
"Attempted import of on-chain statement of concluded dispute failed"),
}
}

self.last_scraped_blocks.put(new_leaf, ());
Ok(())
}

@@ -533,18 +610,39 @@
}
},
DisputeCoordinatorMessage::RecentDisputes(tx) => {
let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
// Return error if session information is missing.
self.ensure_no_errors()?;

let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
disputes
} else {
std::collections::BTreeMap::new()
};

let _ = tx.send(recent_disputes.keys().cloned().collect());
},
DisputeCoordinatorMessage::ActiveDisputes(tx) => {
let recent_disputes =
overlay_db.load_recent_disputes()?.unwrap_or_default().into_iter();
let _ =
tx.send(get_active_with_status(recent_disputes, now).map(|(k, _)| k).collect());
// Return error if session information is missing.
self.ensure_no_errors()?;

let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
disputes
} else {
std::collections::BTreeMap::new()
};

let _ = tx.send(
get_active_with_status(recent_disputes.into_iter(), now)
.map(|(k, _)| k)
.collect(),
);
},
DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => {
// Return error if session information is missing.
self.ensure_no_errors()?;

let mut query_output = Vec::new();
for (session_index, candidate_hash) in query.into_iter() {
for (session_index, candidate_hash) in query {
if let Some(v) =
overlay_db.load_candidate_votes(session_index, &candidate_hash)?
{
Expand Down Expand Up @@ -581,6 +679,9 @@ impl Initialized {
block_descriptions,
tx,
} => {
// Return error if session information is missing.
self.ensure_no_errors()?;

let undisputed_chain = determine_undisputed_chain(
overlay_db,
base_number,
Expand All @@ -595,6 +696,15 @@ impl Initialized {
Ok(Box::new(|| Ok(())))
}

// Helper function for checking subsystem errors in message processing.
fn ensure_no_errors(&self) -> Result<()> {
if let Some(subsystem_error) = self.error.clone() {
return Err(Error::NonFatal(NonFatal::RollingSessionWindow(subsystem_error)))
}

Ok(())
}
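Reviewer note: when `ensure_no_errors` returns early, the reply sender of the query message is dropped unsent, so callers observe the oneshot channel as canceled rather than receiving stale data. A minimal stand-alone illustration of that failure mode (the subsystem message plumbing itself is elided here):

use futures::channel::oneshot;

async fn sketch() {
    // Stand-in for e.g. DisputeCoordinatorMessage::RecentDisputes(tx): on the
    // error path the coordinator drops `tx` without sending a reply.
    let (tx, rx) = oneshot::channel::<Vec<u32>>();
    drop(tx);
    // The requesting side sees a canceled channel and may retry later.
    assert!(matches!(rx.await, Err(oneshot::Canceled)));
}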

async fn handle_import_statements(
&mut self,
ctx: &mut impl SubsystemContext,
55 changes: 39 additions & 16 deletions node/core/dispute-coordinator/src/real/ordering/mod.rs
@@ -184,19 +184,43 @@ impl OrderingProvider {
update: &ActiveLeavesUpdate,
) -> Result<()> {
if let Some(activated) = update.activated.as_ref() {
// Fetch ancestors of the activated leaf.
let ancestors = self
.get_block_ancestors(sender, activated.hash, activated.number)
.await
.unwrap_or_else(|err| {
// Fetch last finalized block.
let ancestors = match get_finalized_block_number(sender).await {
Ok(block_number) => {
// Fetch ancestry up to last finalized block.
Self::get_block_ancestors(
sender,
activated.hash,
activated.number,
block_number,
&mut self.last_observed_blocks,
)
.await
.unwrap_or_else(|err| {
tracing::debug!(
target: LOG_TARGET,
activated_leaf = ?activated,
"Skipping leaf ancestors due to an error: {}",
err
);
// We assume this is a spurious error so we'll move forward with an
// empty ancestry.
Vec::new()
})
},
Err(err) => {
tracing::debug!(
target: LOG_TARGET,
activated_leaf = ?activated,
"Skipping leaf ancestors due to an error: {}",
"Failed to retrieve last finalized block number: {}",
err
);
// We assume this is a spurious error so we'll move forward with an
// empty ancestry.
Vec::new()
});
},
};

// Ancestors' block numbers are consecutive and descending.
let earliest_block_number = activated.number - ancestors.len() as u32;
let block_numbers = (earliest_block_number..=activated.number).rev();
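Reviewer note: a worked example of the numbering above, under made-up values — a leaf at height 100 with three returned ancestors gives `earliest_block_number = 97`, and the reversed range pairs each height with the leaf hash followed by the descending ancestors:

fn main() {
    let activated_number: u32 = 100;
    let ancestors_len: u32 = 3; // e.g. hashes of blocks 99, 98, 97, in that order
    let earliest_block_number = activated_number - ancestors_len;
    let block_numbers: Vec<u32> =
        (earliest_block_number..=activated_number).rev().collect();
    assert_eq!(block_numbers, vec![100, 99, 98, 97]);
}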
@@ -242,23 +266,22 @@
}

/// Returns ancestors of `head` in the descending order, stopping
/// either at the block present in cache or the latest finalized block.
/// either at the block present in cache or at `target_ancestor`.
///
/// Suited specifically for querying non-finalized chains, thus
/// doesn't rely on block numbers.
///
/// Both `head` and the block we stop at are **not** included in the result.
async fn get_block_ancestors<Sender: SubsystemSender>(
&mut self,
pub async fn get_block_ancestors<Sender: SubsystemSender>(
sender: &mut Sender,
mut head: Hash,
mut head_number: BlockNumber,
target_ancestor: BlockNumber,
lookup_cache: &mut LruCache<Hash, ()>,
) -> Result<Vec<Hash>> {
let mut ancestors = Vec::new();

let finalized_block_number = get_finalized_block_number(sender).await?;

if self.last_observed_blocks.get(&head).is_some() || head_number <= finalized_block_number {
if lookup_cache.get(&head).is_some() || head_number <= target_ancestor {
return Ok(ancestors)
}

@@ -297,10 +320,10 @@
let block_numbers = (earliest_block_number..head_number).rev();

for (block_number, hash) in block_numbers.zip(&hashes) {
// Return if we either met finalized/cached block or
// Return if we either met target/cached block or
// hit the size limit for the returned ancestry of head.
if self.last_observed_blocks.get(hash).is_some() ||
block_number <= finalized_block_number ||
if lookup_cache.get(hash).is_some() ||
block_number <= target_ancestor ||
ancestors.len() >= Self::ANCESTRY_SIZE_LIMIT
{
return Ok(ancestors)
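Reviewer note: condensed, the ancestry loop above stops on any of three conditions. The sketch below mirrors the names used in this file, but it is a reviewer aid under those assumptions, not a drop-in piece of the implementation:

use lru::LruCache;
use polkadot_primitives::v1::{BlockNumber, Hash};

fn should_stop(
    lookup_cache: &mut LruCache<Hash, ()>,
    hash: &Hash,
    block_number: BlockNumber,
    target_ancestor: BlockNumber,
    collected_so_far: usize,
    ancestry_size_limit: usize,
) -> bool {
    lookup_cache.get(hash).is_some()        // met an already-observed block
        || block_number <= target_ancestor  // reached the caller's target
        || collected_so_far >= ancestry_size_limit // hit ANCESTRY_SIZE_LIMIT
}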