Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Resolve some todos in async backing statement-distribution branch #6482

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ use futures::{

use error::{Error, FatalResult};
use polkadot_node_primitives::{
AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD,
ValidationResult, BACKING_EXECUTION_TIMEOUT,
minimum_votes, AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD,
StatementWithPVD, ValidationResult, BACKING_EXECUTION_TIMEOUT,
};
use polkadot_node_subsystem::{
messages::{
Expand Down Expand Up @@ -385,14 +385,6 @@ struct AttestingData {
backing: Vec<ValidatorIndex>,
}

/// How many votes we need to consider a candidate backed.
///
/// WARNING: This has to be kept in sync with the runtime check in the inclusion module and
/// statement distribution.
fn minimum_votes(n_validators: usize) -> usize {
std::cmp::min(2, n_validators)
}

#[derive(Default)]
struct TableContext {
validator: Option<Validator>,
Expand Down
53 changes: 52 additions & 1 deletion node/network/statement-distribution/src/vstaging/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ impl ClusterTracker {
}

/// Note that we accepted an incoming statement. This updates internal structures.
///
/// Should only be called after a successful `can_receive` call.
pub fn note_received(
&mut self,
Expand Down Expand Up @@ -748,5 +749,55 @@ mod tests {
);
}

// TODO [now] ensure statements received with prejudice don't prevent sending later
// Ensure statements received with prejudice don't prevent sending later.
#[test]
fn can_send_statements_received_with_prejudice() {
let group =
vec![ValidatorIndex(5), ValidatorIndex(200), ValidatorIndex(24), ValidatorIndex(146)];

let seconding_limit = 1;

let mut tracker = ClusterTracker::new(group.clone(), seconding_limit).expect("not empty");
let hash_a = CandidateHash(Hash::repeat_byte(1));
let hash_b = CandidateHash(Hash::repeat_byte(2));

assert_eq!(
tracker.can_receive(
ValidatorIndex(200),
ValidatorIndex(5),
CompactStatement::Seconded(hash_a),
),
Ok(Accept::Ok),
);

tracker.note_received(
ValidatorIndex(200),
ValidatorIndex(5),
CompactStatement::Seconded(hash_a),
);

assert_eq!(
tracker.can_receive(
ValidatorIndex(24),
ValidatorIndex(5),
CompactStatement::Seconded(hash_b),
),
Ok(Accept::WithPrejudice),
);

tracker.note_received(
ValidatorIndex(24),
ValidatorIndex(5),
CompactStatement::Seconded(hash_b),
);

assert_eq!(
tracker.can_send(
ValidatorIndex(24),
ValidatorIndex(5),
CompactStatement::Seconded(hash_a),
),
Ok(()),
);
}
}
10 changes: 1 addition & 9 deletions node/network/statement-distribution/src/vstaging/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

//! A utility for tracking groups and their members within a session.

use polkadot_node_primitives::minimum_votes;
use polkadot_primitives::vstaging::{AuthorityDiscoveryId, GroupIndex, ValidatorIndex};

use std::collections::HashMap;
Expand Down Expand Up @@ -78,12 +79,3 @@ impl Groups {
self.by_discovery_key.get(&discovery_key).map(|x| *x)
}
}

/// How many votes we need to consider a candidate backed.
///
/// WARNING: This has to be kept in sync with the runtime check in the inclusion module and
/// the backing subsystem.
// TODO [now]: extract to shared primitives.
fn minimum_votes(n_validators: usize) -> usize {
std::cmp::min(2, n_validators)
}
10 changes: 2 additions & 8 deletions node/network/statement-distribution/src/vstaging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ mod statement_store;

const COST_UNEXPECTED_STATEMENT: Rep = Rep::CostMinor("Unexpected Statement");
const COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE: Rep =
Rep::CostMinor("Unexpected Statement, missing knowlege for relay parent");
Rep::CostMinor("Unexpected Statement, missing knowledge for relay parent");
const COST_UNEXPECTED_STATEMENT_UNKNOWN_CANDIDATE: Rep =
Rep::CostMinor("Unexpected Statement, unknown candidate");
const COST_UNEXPECTED_STATEMENT_REMOTE: Rep =
Expand Down Expand Up @@ -180,6 +180,7 @@ pub(crate) struct State {

// For the provided validator index, if there is a connected peer
// controlling the given authority ID,
// TODO [now]: finish above doc.
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
fn connected_validator_peer(
authorities: &HashMap<AuthorityDiscoveryId, PeerId>,
per_session: &PerSessionState,
Expand Down Expand Up @@ -214,13 +215,6 @@ impl PeerState {
}
}

/// How many votes we need to consider a candidate backed.
///
/// WARNING: This has to be kept in sync with the runtime check in the inclusion module.
fn minimum_votes(n_validators: usize) -> usize {
std::cmp::min(2, n_validators)
}

#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
pub(crate) async fn handle_network_update<Context>(
ctx: &mut Context,
Expand Down
70 changes: 69 additions & 1 deletion node/network/statement-distribution/src/vstaging/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,24 @@ impl RequestManager {
}
}

// TODO [now]: removal based on relay-parent.
/// Remove based on relay-parent.
pub fn remove_by_relay_parent(&mut self, relay_parent: Hash) {
// Remove from `by_priority` and `requests`.
self.by_priority.retain(|(_priority, id)| {
let retain = relay_parent != id.relay_parent;
if !retain {
self.requests.remove(id);
}
retain
});
// Remove from `unique_identifiers`.
for (_, candidate_identifier_set) in self.unique_identifiers.iter_mut() {
candidate_identifier_set.retain(|id| relay_parent != id.relay_parent);
}
// TODO: is this necessary?
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
// If any candidates don't have unique identifiers, remove them.
self.unique_identifiers.retain(|_, set| !set.is_empty());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't totally sure about this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems correct, since we would keep around empty unique_identifiers sets over time and gradually leak memory.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the implementation could be made more efficient with some code like this:

  1. In the first step, when pruning by_priority, construct a Vec<CandidateHash> or HashSet of candidates to inspect in unique_identifiers.
  2. Loop over this set, invoking self.unique_identifiers.entry(candidate_hash), and then in a single branch on Entry::Occupied
    a. remove all entries with the relay_parent from the candidate identifier set
    b. if the set is empty, invoke entry.remove()
    c. we can ignore Entry::Vacant. I can explain why we can expect to encounter it but only if nodes are misbehaving and we don't use a deduplicating collection, and there are no issues from ignoring it

}

/// Yields the next request to dispatch, if there is any.
///
Expand Down Expand Up @@ -615,5 +632,56 @@ fn insert_or_update_priority(
mod tests {
use super::*;

#[test]
fn test_remove_by_relay_parent() {
let parent_a = Hash::from_low_u64_le(10);
let parent_b = Hash::from_low_u64_le(11);
let parent_c = Hash::from_low_u64_le(12);

let candidate_a1 = CandidateHash(Hash::from_low_u64_le(101));
let candidate_a2 = CandidateHash(Hash::from_low_u64_le(102));
let candidate_b1 = CandidateHash(Hash::from_low_u64_le(111));
let candidate_b2 = CandidateHash(Hash::from_low_u64_le(112));
let candidate_c1 = CandidateHash(Hash::from_low_u64_le(121));
let duplicate_hash = CandidateHash(Hash::from_low_u64_le(121));

let mut request_manager = RequestManager::new();
request_manager.get_or_insert(parent_a, candidate_a1, 1.into());
request_manager.get_or_insert(parent_a, candidate_a2, 1.into());
request_manager.get_or_insert(parent_b, candidate_b1, 1.into());
request_manager.get_or_insert(parent_b, candidate_b2, 2.into());
request_manager.get_or_insert(parent_c, candidate_c1, 2.into());
request_manager.get_or_insert(parent_a, duplicate_hash, 1.into());

assert_eq!(request_manager.requests.len(), 6);
assert_eq!(request_manager.by_priority.len(), 6);
assert_eq!(request_manager.unique_identifiers.len(), 5);

request_manager.remove_by_relay_parent(parent_a);

assert_eq!(request_manager.requests.len(), 3);
assert_eq!(request_manager.by_priority.len(), 3);
assert_eq!(request_manager.unique_identifiers.len(), 3);

assert!(!request_manager.unique_identifiers.contains_key(&candidate_a1));
assert!(!request_manager.unique_identifiers.contains_key(&candidate_a2));
assert!(!request_manager.unique_identifiers.contains_key(&duplicate_hash));

request_manager.remove_by_relay_parent(parent_b);

assert_eq!(request_manager.requests.len(), 1);
assert_eq!(request_manager.by_priority.len(), 1);
assert_eq!(request_manager.unique_identifiers.len(), 1);

assert!(!request_manager.unique_identifiers.contains_key(&candidate_b1));
assert!(!request_manager.unique_identifiers.contains_key(&candidate_b2));

request_manager.remove_by_relay_parent(parent_c);

assert!(request_manager.requests.is_empty());
assert!(request_manager.requests.is_empty());
assert!(request_manager.requests.is_empty());
}

// TODO [now]: test priority ordering.
}
8 changes: 8 additions & 0 deletions node/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,3 +605,11 @@ pub fn maybe_compress_pov(pov: PoV) -> PoV {
let pov = PoV { block_data: BlockData(raw) };
pov
}

/// How many votes we need to consider a candidate backed.
///
/// WARNING: This has to be kept in sync with the runtime check in the inclusion module and
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
/// the backing subsystem.
pub fn minimum_votes(n_validators: usize) -> usize {
std::cmp::min(2, n_validators)
}