This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

relay chain selection and dispute-coordinator fixes and improvements #4752

Merged · 27 commits · Jan 26, 2022
Changes from all commits
178 changes: 159 additions & 19 deletions node/core/dispute-coordinator/src/real/initialized.rs
@@ -16,12 +16,16 @@

//! Dispute coordinator subsystem in initialized state (after first active leaf is received).

use std::{collections::HashSet, sync::Arc};
use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
};

use futures::{
channel::{mpsc, oneshot},
FutureExt, StreamExt,
};
use lru::LruCache;

use sc_keystore::LocalKeystore;

@@ -37,7 +41,7 @@ use polkadot_node_subsystem::{
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SubsystemContext,
};
use polkadot_node_subsystem_util::rolling_session_window::{
RollingSessionWindow, SessionWindowUpdate,
RollingSessionWindow, SessionWindowUpdate, SessionsUnavailable,
};
use polkadot_primitives::{
v1::{
@@ -48,11 +52,12 @@ use polkadot_primitives::{
v2::SessionInfo,
};

use crate::{metrics::Metrics, real::DisputeCoordinatorSubsystem, LOG_TARGET};

use crate::{
error::{log_error, Fatal, FatalResult, NonFatal, NonFatalResult, Result},
error::{log_error, Error, Fatal, FatalResult, NonFatal, NonFatalResult, Result},
metrics::Metrics,
real::{ordering::get_finalized_block_number, DisputeCoordinatorSubsystem},
status::{get_active_with_status, Clock, DisputeStatus, Timestamp},
LOG_TARGET,
};

use super::{
@@ -66,6 +71,11 @@ use super::{
OverlayedBackend,
};

// The capacity and scrape depth are equal to the maximum allowed unfinalized depth.
const LRU_SCRAPED_BLOCKS_CAPACITY: usize = 500;
// This is in sync with `MAX_FINALITY_LAG` in relay chain selection.
const MAX_BATCH_SCRAPE_ANCESTORS: u32 = 500;

/// After the first active leaves update we transition to `Initialized` state.
///
/// Before the first active leaves update we can't really do much. We cannot check incoming
@@ -80,6 +90,11 @@ pub struct Initialized {
ordering_provider: OrderingProvider,
participation_receiver: WorkerMessageReceiver,
metrics: Metrics,
// This tracks only rolling session window failures.
// It can be made a `Vec` if the need to track more error kinds arises.
error: Option<SessionsUnavailable>,
/// Latest relay blocks that have been successfully scraped.
last_scraped_blocks: LruCache<Hash, ()>,
}

impl Initialized {
@@ -105,6 +120,8 @@ impl Initialized {
participation,
participation_receiver,
metrics,
error: None,
last_scraped_blocks: LruCache::new(LRU_SCRAPED_BLOCKS_CAPACITY),
}
}

@@ -245,22 +262,26 @@ impl Initialized {
.await?;
self.participation.process_active_leaves_update(ctx, &update).await?;

let new_activations = update.activated.into_iter().map(|a| a.hash);
for new_leaf in new_activations {
match self.rolling_session_window.cache_session_info_for_head(ctx, new_leaf).await {
if let Some(new_leaf) = update.activated {
match self
.rolling_session_window
.cache_session_info_for_head(ctx, new_leaf.hash)
.await
{
Err(e) => {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Failed to update session cache for disputes",
target: LOG_TARGET,
err = ?e,
"Failed to update session cache for disputes",
);
continue
self.error = Some(e);
},
Ok(SessionWindowUpdate::Advanced {
new_window_end: window_end,
new_window_start,
..
}) => {
self.error = None;
let session = window_end;
if self.highest_session < session {
tracing::trace!(
Expand All @@ -277,7 +298,82 @@ impl Initialized {
},
Ok(SessionWindowUpdate::Unchanged) => {},
};
self.scrape_on_chain_votes(ctx, overlay_db, new_leaf, now).await?;

// Scrape the head if the rolling session update above went well.
if self.error.is_none() {
let _ = self
.scrape_on_chain_votes(ctx, overlay_db, new_leaf.hash, now)
.await
.map_err(|err| {
tracing::warn!(
target: LOG_TARGET,
"Skipping scraping block #{}({}) due to error: {}",
new_leaf.number,
new_leaf.hash,
err
);
});
}

// Try to scrape any blocks for which we could not get the current session or did not receive an
// active leaves update.
let ancestors = match get_finalized_block_number(ctx.sender()).await {
Ok(block_number) => {
// Limit our search to the last finalized block, or at most `MAX_BATCH_SCRAPE_ANCESTORS` below the leaf.
let block_number = std::cmp::max(
block_number,
new_leaf.number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS),
);
// Fetch ancestry up to and including the last finalized block.
// `get_block_ancestors()` doesn't include the target block in the ancestry, so we'll need to
// pass in its parent.
OrderingProvider::get_block_ancestors(
ctx.sender(),
new_leaf.hash,
new_leaf.number,
block_number.saturating_sub(1),
&mut self.last_scraped_blocks,
)
.await
.unwrap_or_else(|err| {
tracing::debug!(
target: LOG_TARGET,
activated_leaf = ?new_leaf,
error = ?err,
"Skipping leaf ancestors due to an error",
);
// We assume this is a spurious error so we'll move forward with an
// empty ancestry.
Vec::new()
})
},
Err(err) => {
tracing::debug!(
target: LOG_TARGET,
activated_leaf = ?new_leaf,
error = ?err,
"Skipping leaf ancestors scraping",
);
// We assume this is a spurious error so we'll move forward with an
// empty ancestry.
Vec::new()
},
};

// The `runtime-api` subsystem has an internal queue which serializes the execution,
// so there is no point in running these in parallel.
for ancestor in ancestors {
let _ = self.scrape_on_chain_votes(ctx, overlay_db, ancestor, now).await.map_err(
|err| {
tracing::warn!(
target: LOG_TARGET,
hash = ?ancestor,
error = ?err,
"Skipping scraping block due to error",
);
},
);
}
}

Ok(())
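The ancestry batch above is bounded from below by whichever is higher: the last finalized block or the activated leaf minus `MAX_BATCH_SCRAPE_ANCESTORS`. A minimal sketch of that bound, assuming plain `u32` block numbers in place of the node's `BlockNumber` type:

```rust
const MAX_BATCH_SCRAPE_ANCESTORS: u32 = 500;

/// Lowest block number the batch scrape will walk back to.
fn scrape_base(finalized: u32, leaf_number: u32) -> u32 {
    // Never go below finality, and never more than
    // `MAX_BATCH_SCRAPE_ANCESTORS` blocks below the leaf.
    std::cmp::max(finalized, leaf_number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS))
}

fn main() {
    // Synced node: finality lag is small, so finality is the bound.
    assert_eq!(scrape_base(1_000, 1_010), 1_000);
    // Finality lagging badly: the depth is capped at 500 ancestors.
    assert_eq!(scrape_base(0, 10_000), 9_500);
}
```

The diff then passes this bound minus one (saturating) to `get_block_ancestors`, since that helper excludes its target block from the ancestry and the finalized block itself should still be scraped.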
@@ -293,6 +389,11 @@ impl Initialized {
new_leaf: Hash,
now: u64,
) -> Result<()> {
// Avoid scraping twice.
if self.last_scraped_blocks.get(&new_leaf).is_some() {
return Ok(())
}

// obtain the concluded disputes as well as the candidate backing votes
// from the new leaf
let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = {
@@ -331,6 +432,9 @@ impl Initialized {
};

if backing_validators_per_candidate.is_empty() && disputes.is_empty() {
// This block is not interesting as it doesn't contain any backing votes or disputes. We'll
// mark it here as scraped to prevent further processing.
self.last_scraped_blocks.put(new_leaf, ());
return Ok(())
}

@@ -413,6 +517,7 @@ impl Initialized {
}

if disputes.is_empty() {
self.last_scraped_blocks.put(new_leaf, ());
return Ok(())
}

@@ -490,6 +595,8 @@ impl Initialized {
"Attempted import of on-chain statement of concluded dispute failed"),
}
}

self.last_scraped_blocks.put(new_leaf, ());
Ok(())
}
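The `last_scraped_blocks` bookkeeping above follows a simple pattern: bail out early if the block hash is cached, and only cache a block once it has actually been processed, so a failed scrape is retried when the block reappears as an ancestor. A self-contained sketch of that pattern, assuming the `lru` crate's `LruCache::new(usize)` API as used in this diff and a `[u8; 32]` stand-in for the real `Hash` type:

```rust
use lru::LruCache;

type Hash = [u8; 32]; // stand-in for the relay-chain block hash type

const LRU_SCRAPED_BLOCKS_CAPACITY: usize = 500;

struct Scraper {
    last_scraped_blocks: LruCache<Hash, ()>,
}

impl Scraper {
    fn new() -> Self {
        Self { last_scraped_blocks: LruCache::new(LRU_SCRAPED_BLOCKS_CAPACITY) }
    }

    fn scrape(&mut self, block: Hash) {
        // Mirrors the early return in `scrape_on_chain_votes`: a cached
        // block has already been handled and is skipped.
        if self.last_scraped_blocks.get(&block).is_some() {
            return
        }
        // ... fetch `ScrapedOnChainVotes` and import statements here ...

        // Mark the block only after successful processing.
        self.last_scraped_blocks.put(block, ());
    }
}

fn main() {
    let mut scraper = Scraper::new();
    scraper.scrape([0u8; 32]);
    scraper.scrape([0u8; 32]); // second call hits the cache and returns early
}
```

Capping the cache at 500 entries matches the maximum allowed unfinalized depth, so blocks that can still be re-encountered as ancestors are not evicted prematurely.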

@@ -533,18 +640,39 @@ impl Initialized {
}
},
DisputeCoordinatorMessage::RecentDisputes(tx) => {
let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
// Return error if session information is missing.
self.ensure_available_session_info()?;

let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
disputes
} else {
BTreeMap::new()
};

let _ = tx.send(recent_disputes.keys().cloned().collect());
},
DisputeCoordinatorMessage::ActiveDisputes(tx) => {
let recent_disputes =
overlay_db.load_recent_disputes()?.unwrap_or_default().into_iter();
let _ =
tx.send(get_active_with_status(recent_disputes, now).map(|(k, _)| k).collect());
// Return error if session information is missing.
self.ensure_available_session_info()?;

let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
disputes
} else {
BTreeMap::new()
};

let _ = tx.send(
get_active_with_status(recent_disputes.into_iter(), now)
.map(|(k, _)| k)
.collect(),
);
},
DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => {
// Return error if session information is missing.
self.ensure_available_session_info()?;

let mut query_output = Vec::new();
for (session_index, candidate_hash) in query.into_iter() {
for (session_index, candidate_hash) in query {
if let Some(v) =
overlay_db.load_candidate_votes(session_index, &candidate_hash)?
{
@@ -581,6 +709,9 @@ impl Initialized {
block_descriptions,
tx,
} => {
// Return error if session information is missing.
self.ensure_available_session_info()?;

let undisputed_chain = determine_undisputed_chain(
overlay_db,
base_number,
@@ -595,6 +726,15 @@ impl Initialized {
Ok(Box::new(|| Ok(())))
}

// Helper function for checking subsystem errors in message processing.
fn ensure_available_session_info(&self) -> Result<()> {
if let Some(subsystem_error) = self.error.clone() {
return Err(Error::NonFatal(NonFatal::RollingSessionWindow(subsystem_error)))
}

Ok(())
}

async fn handle_import_statements(
&mut self,
ctx: &mut impl SubsystemContext,
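Taken together, the `error` field, the rolling-session-window handling, and `ensure_available_session_info` form a gate: while the session window could not be updated, dispute queries fail fast instead of answering from possibly stale data. A reduced sketch of the pattern, with `SessionsUnavailable` as a hypothetical stand-in for the error type re-exported from `rolling_session_window`:

```rust
#[derive(Clone, Debug)]
struct SessionsUnavailable;

struct Coordinator {
    // Set when the last rolling session window update failed,
    // cleared again on the next successful update.
    error: Option<SessionsUnavailable>,
}

impl Coordinator {
    fn ensure_available_session_info(&self) -> Result<(), SessionsUnavailable> {
        match self.error.clone() {
            Some(e) => Err(e),
            None => Ok(()),
        }
    }

    fn recent_disputes(&self) -> Result<Vec<u32>, SessionsUnavailable> {
        // Refuse to answer while session information is missing, so callers
        // such as relay chain selection can fall back to a safe default.
        self.ensure_available_session_info()?;
        Ok(Vec::new()) // the real code loads recent disputes from the DB here
    }
}

fn main() {
    let gated = Coordinator { error: Some(SessionsUnavailable) };
    assert!(gated.recent_disputes().is_err());

    let healthy = Coordinator { error: None };
    assert!(healthy.recent_disputes().is_ok());
}
```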
4 changes: 4 additions & 0 deletions node/core/dispute-coordinator/src/real/mod.rs
@@ -203,6 +203,10 @@ impl DisputeCoordinatorSubsystem {
},
};

// Before we move to the initialized state we need to check if we got at
// least one finality notification to prevent large ancestry block scraping
// when the node is syncing.

let mut overlay_db = OverlayedBackend::new(&mut backend);
let (participations, spam_slots, ordering_provider) = match self
.handle_startup(
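The comment about waiting for a finality notification describes a startup gate: the subsystem should not enter its initialized state (and start batch-scraping ancestry) until it has seen at least one finalized block, otherwise a syncing node would walk huge ancestries from a zero finality baseline. A hedged, self-contained sketch of that gate, with `Signal` as a hypothetical stand-in for the overseer signal stream:

```rust
enum Signal {
    BlockFinalized(u32), // finalized block number
    ActiveLeaves(u32),   // activated leaf block number
    Conclude,            // shutdown request
}

/// Waits for the first finality notification before initialization;
/// returns `None` if the subsystem is asked to conclude first.
fn first_finalized(signals: impl IntoIterator<Item = Signal>) -> Option<u32> {
    for signal in signals {
        match signal {
            // The gate: only a finality notification lets startup proceed.
            Signal::BlockFinalized(n) => return Some(n),
            // Leaves arriving before the first finalized block are not
            // enough to bound the ancestry scrape, so keep waiting.
            Signal::ActiveLeaves(_) => continue,
            Signal::Conclude => return None,
        }
    }
    None
}

fn main() {
    let signals = [Signal::ActiveLeaves(7), Signal::BlockFinalized(5)];
    assert_eq!(first_finalized(signals), Some(5));
}
```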