Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dispute-coordinator: disabling in participation #2637

Merged
merged 9 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions polkadot/node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1030,9 +1030,10 @@ async fn construct_per_relay_parent_state<Context>(
// Once runtime ver `DISABLED_VALIDATORS_RUNTIME_REQUIREMENT` is released remove this call to
// `get_disabled_validators_with_fallback`, add `request_disabled_validators` call to the
// `try_join!` above and use `try_runtime_api!` to get `disabled_validators`
let disabled_validators = get_disabled_validators_with_fallback(ctx.sender(), parent)
.await
.map_err(Error::UtilError)?;
let disabled_validators =
get_disabled_validators_with_fallback(ctx.sender(), parent).await.map_err(|e| {
Error::UtilError(TryFrom::try_from(e).expect("the conversion is infallible; qed"))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the error conversion is a bit ugly, but I'm open for suggestions how to improve it

})?;

let signing_context = SigningContext { parent_hash: parent, session_index };
let validator = match Validator::construct(
Expand Down
27 changes: 26 additions & 1 deletion polkadot/node/core/dispute-coordinator/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub struct CandidateEnvironment<'a> {
executor_params: &'a ExecutorParams,
/// Validator indices controlled by this node.
controlled_indices: HashSet<ValidatorIndex>,
/// Indices of disabled validators at the `relay_parent`.
disabled_indices: HashSet<ValidatorIndex>,
}

#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
Expand All @@ -66,6 +68,16 @@ impl<'a> CandidateEnvironment<'a> {
session_index: SessionIndex,
relay_parent: Hash,
) -> Option<CandidateEnvironment<'a>> {
let disabled_indices = runtime_info
.get_disabled_validators(ctx.sender(), relay_parent)
.await
.unwrap_or_else(|err| {
gum::info!(target: LOG_TARGET, ?err, "Failed to get disabled validators");
Vec::new()
})
.into_iter()
.collect();

let (session, executor_params) = match runtime_info
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
.await
Expand All @@ -76,7 +88,7 @@ impl<'a> CandidateEnvironment<'a> {
};

let controlled_indices = find_controlled_validator_indices(keystore, &session.validators);
Some(Self { session_index, session, executor_params, controlled_indices })
Some(Self { session_index, session, executor_params, controlled_indices, disabled_indices })
}

/// Validators in the candidate's session.
Expand All @@ -103,6 +115,11 @@ impl<'a> CandidateEnvironment<'a> {
pub fn controlled_indices(&'a self) -> &'a HashSet<ValidatorIndex> {
&self.controlled_indices
}

/// Indices of disabled validators at the `relay_parent`.
pub fn disabled_indices(&'a self) -> &'a HashSet<ValidatorIndex> {
&self.disabled_indices
}
}

/// Whether or not we already issued some statement about a candidate.
Expand Down Expand Up @@ -344,6 +361,14 @@ impl CandidateVoteState<CandidateVotes> {
&self.votes.candidate_receipt
}

/// Returns true if all the invalid votes are from disabled validators.
pub fn invalid_votes_all_disabled(
&self,
mut is_disabled: impl FnMut(&ValidatorIndex) -> bool,
) -> bool {
self.votes.invalid.keys().all(|i| is_disabled(i))
}

/// Extract `CandidateVotes` for handling import of new statements.
fn into_old_state(self) -> (CandidateVotes, CandidateVoteState<()>) {
let CandidateVoteState { votes, own_vote, dispute_status, byzantine_threshold_against } =
Expand Down
155 changes: 140 additions & 15 deletions polkadot/node/core/dispute-coordinator/src/initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! Dispute coordinator subsystem in initialized state (after first active leaf is received).

use std::{
collections::{BTreeMap, VecDeque},
collections::{BTreeMap, HashSet, VecDeque},
sync::Arc,
};

Expand Down Expand Up @@ -47,6 +47,7 @@ use polkadot_primitives::{
DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, ValidDisputeStatementKind,
ValidatorId, ValidatorIndex,
};
use schnellru::{LruMap, UnlimitedCompact};

use crate::{
db,
Expand Down Expand Up @@ -92,6 +93,9 @@ pub struct InitialData {
pub(crate) struct Initialized {
keystore: Arc<LocalKeystore>,
runtime_info: RuntimeInfo,
/// We have the onchain state of disabled validators as well as the offchain
/// state that is based on the lost disputes.
offchain_disabled_validators: OffchainDisabledValidators,
/// This is the highest `SessionIndex` seen via `ActiveLeavesUpdate`. It doesn't matter if it
/// was cached successfully or not. It is used to detect ancient disputes.
highest_session_seen: SessionIndex,
Expand Down Expand Up @@ -130,10 +134,12 @@ impl Initialized {

let (participation_sender, participation_receiver) = mpsc::channel(1);
let participation = Participation::new(participation_sender, metrics.clone());
let offchain_disabled_validators = OffchainDisabledValidators::default();

Self {
keystore,
runtime_info,
offchain_disabled_validators,
highest_session_seen,
gaps_in_cache,
spam_slots,
Expand Down Expand Up @@ -319,13 +325,16 @@ impl Initialized {
self.runtime_info.pin_block(session_idx, new_leaf.unpin_handle);
// Fetch the last `DISPUTE_WINDOW` number of sessions unless there are no gaps
// in cache and we are not missing too many `SessionInfo`s
let mut lower_bound = session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1);
if !self.gaps_in_cache && self.highest_session_seen > lower_bound {
lower_bound = self.highest_session_seen + 1
}
let prune_up_to = session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1);
let fetch_lower_bound =
if !self.gaps_in_cache && self.highest_session_seen > prune_up_to {
self.highest_session_seen + 1
} else {
prune_up_to
};

// There is a new session. Perform a dummy fetch to cache it.
for idx in lower_bound..=session_idx {
for idx in fetch_lower_bound..=session_idx {
if let Err(err) = self
.runtime_info
.get_session_info_by_index(ctx.sender(), new_leaf.hash, idx)
Expand All @@ -344,11 +353,9 @@ impl Initialized {

self.highest_session_seen = session_idx;

db::v1::note_earliest_session(
overlay_db,
session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1),
)?;
self.spam_slots.prune_old(session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1));
db::v1::note_earliest_session(overlay_db, prune_up_to)?;
self.spam_slots.prune_old(prune_up_to);
self.offchain_disabled_validators.prune_old(prune_up_to);
},
Ok(_) => { /* no new session => nothing to cache */ },
Err(err) => {
Expand Down Expand Up @@ -978,11 +985,13 @@ impl Initialized {
Some(env) => env,
};

let n_validators = env.validators().len();

gum::trace!(
target: LOG_TARGET,
?candidate_hash,
?session,
num_validators = ?env.session_info().validators.len(),
?n_validators,
"Number of validators"
);

Expand Down Expand Up @@ -1084,18 +1093,42 @@ impl Initialized {
target: LOG_TARGET,
?candidate_hash,
?session,
num_validators = ?env.session_info().validators.len(),
?n_validators,
"Import result ready"
);

let new_state = import_result.new_state();

let byzantine_threshold = polkadot_primitives::byzantine_threshold(n_validators);
// combine on-chain with off-chain disabled validators
// process disabled validators in the following order:
// - on-chain disabled validators
// - prioritized order of off-chain disabled validators
// deduplicate the list and take at most `byzantine_threshold` validators
let disabled_validators = {
let mut d: HashSet<ValidatorIndex> = HashSet::new();
for v in env
.disabled_indices()
.iter()
.cloned()
.chain(self.offchain_disabled_validators.iter(session))
{
if d.len() == byzantine_threshold {
break
}
d.insert(v);
}
d
};

let is_included = self.scraper.is_candidate_included(&candidate_hash);
let is_backed = self.scraper.is_candidate_backed(&candidate_hash);
let own_vote_missing = new_state.own_vote_missing();
let is_disputed = new_state.is_disputed();
let is_confirmed = new_state.is_confirmed();
let potential_spam = is_potential_spam(&self.scraper, &new_state, &candidate_hash);
// We participate only in disputes which are not potential spam.
let potential_spam = is_potential_spam(&self.scraper, &new_state, &candidate_hash, |v| {
disabled_validators.contains(v)
});
let allow_participation = !potential_spam;

gum::trace!(
Expand All @@ -1106,6 +1139,7 @@ impl Initialized {
?candidate_hash,
confirmed = ?new_state.is_confirmed(),
has_invalid_voters = ?!import_result.new_invalid_voters().is_empty(),
n_disabled_validators = ?disabled_validators.len(),
"Is spam?"
);

Expand Down Expand Up @@ -1337,6 +1371,10 @@ impl Initialized {
);
}
}
for validator_index in new_state.votes().invalid.keys() {
self.offchain_disabled_validators
.insert_against_valid(session, *validator_index);
}
self.metrics.on_concluded_valid();
}
if import_result.is_freshly_concluded_against() {
Expand All @@ -1356,6 +1394,14 @@ impl Initialized {
);
}
}
for (validator_index, (kind, _sig)) in new_state.votes().valid.raw() {
let is_backer = kind.is_backing();
self.offchain_disabled_validators.insert_for_invalid(
session,
*validator_index,
is_backer,
);
}
self.metrics.on_concluded_invalid();
}

Expand Down Expand Up @@ -1591,3 +1637,82 @@ fn determine_undisputed_chain(

Ok(last)
}

#[derive(Default)]
struct OffchainDisabledValidators {
// Ideally, we want to use the top `byzantine_threshold` offenders here based on the amount of
// stake slashed. However, given that slashing might be applied with a delay, we want to have
// some list of offenders as soon as disputes conclude offchain. This list only approximates
// the top offenders and only accounts for lost disputes. But that should be good enough to
// prevent spam attacks.
per_session: BTreeMap<SessionIndex, LostSessionDisputes>,
}

struct LostSessionDisputes {
// We separate lost disputes to prioritize "for invalid" offenders. And among those, we
// prioritize backing votes the most. There's no need to limit the size of these sets, as they
// are already limited by the number of validators in the session. We use `LruMap` to ensure
// the iteration order prioritizes most recently disputes lost over older ones in case we reach
// the limit.
backers_for_invalid: LruMap<ValidatorIndex, (), UnlimitedCompact>,
for_invalid: LruMap<ValidatorIndex, (), UnlimitedCompact>,
against_valid: LruMap<ValidatorIndex, (), UnlimitedCompact>,
ordian marked this conversation as resolved.
Show resolved Hide resolved
}

impl Default for LostSessionDisputes {
fn default() -> Self {
Self {
backers_for_invalid: LruMap::new(UnlimitedCompact),
for_invalid: LruMap::new(UnlimitedCompact),
against_valid: LruMap::new(UnlimitedCompact),
}
}
}

impl OffchainDisabledValidators {
fn prune_old(&mut self, up_to_excluding: SessionIndex) {
eskimor marked this conversation as resolved.
Show resolved Hide resolved
// split_off returns everything after the given key, including the key.
let mut relevant = self.per_session.split_off(&up_to_excluding);
std::mem::swap(&mut relevant, &mut self.per_session);
}

fn insert_for_invalid(
&mut self,
session_index: SessionIndex,
validator_index: ValidatorIndex,
is_backer: bool,
) {
let entry = self.per_session.entry(session_index).or_default();
if is_backer {
entry.backers_for_invalid.insert(validator_index, ());
} else {
entry.for_invalid.insert(validator_index, ());
}
}

fn insert_against_valid(
&mut self,
session_index: SessionIndex,
validator_index: ValidatorIndex,
) {
self.per_session
.entry(session_index)
.or_default()
.against_valid
.insert(validator_index, ());
}

/// Iterate over all validators that are offchain disabled.
/// The order of iteration prioritizes `for_invalid` offenders (and backers among those) over
/// `against_valid` offenders. And most recently lost disputes over older ones.
/// NOTE: the iterator might contain duplicates.
fn iter(&self, session_index: SessionIndex) -> impl Iterator<Item = ValidatorIndex> + '_ {
self.per_session.get(&session_index).into_iter().flat_map(|e| {
e.backers_for_invalid
.iter()
.chain(e.for_invalid.iter())
.chain(e.against_valid.iter())
.map(|(i, _)| *i)
})
}
}
15 changes: 10 additions & 5 deletions polkadot/node/core/dispute-coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,10 @@ impl DisputeCoordinatorSubsystem {
},
};
let vote_state = CandidateVoteState::new(votes, &env, now);

let potential_spam = is_potential_spam(&scraper, &vote_state, candidate_hash);
let onchain_disabled = env.disabled_indices();
let potential_spam = is_potential_spam(&scraper, &vote_state, candidate_hash, |v| {
onchain_disabled.contains(v)
});
let is_included =
scraper.is_candidate_included(&vote_state.votes().candidate_receipt.hash());

Expand Down Expand Up @@ -462,17 +464,20 @@ async fn wait_for_first_leaf<Context>(ctx: &mut Context) -> Result<Option<Activa
/// Check wheter a dispute for the given candidate could be spam.
///
/// That is the candidate could be made up.
pub fn is_potential_spam<V>(
pub fn is_potential_spam(
scraper: &ChainScraper,
vote_state: &CandidateVoteState<V>,
vote_state: &CandidateVoteState<CandidateVotes>,
candidate_hash: &CandidateHash,
is_disabled: impl FnMut(&ValidatorIndex) -> bool,
) -> bool {
let is_disputed = vote_state.is_disputed();
let is_included = scraper.is_candidate_included(candidate_hash);
let is_backed = scraper.is_candidate_backed(candidate_hash);
let is_confirmed = vote_state.is_confirmed();
let all_invalid_votes_disabled = vote_state.invalid_votes_all_disabled(is_disabled);
let ignore_disabled = !is_confirmed && all_invalid_votes_disabled;

is_disputed && !is_included && !is_backed && !is_confirmed
(is_disputed && !is_included && !is_backed && !is_confirmed) || ignore_disabled
}

/// Tell dispute-distribution to send all our votes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,6 @@ fn cannot_participate_if_cannot_recover_validation_code() {
let mut participation = Participation::new(sender, Metrics::default());
activate_leaf(&mut ctx, &mut participation, 10).await.unwrap();
participate(&mut ctx, &mut participation).await.unwrap();

recover_available_data(&mut ctx_handle).await;

assert_matches!(
Expand Down
Loading