
Resolve some todos in async backing statement-distribution branch #6482

Merged
12 changes: 2 additions & 10 deletions node/core/backing/src/lib.rs
@@ -80,8 +80,8 @@ use futures::{

use error::{Error, FatalResult};
use polkadot_node_primitives::{
AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD,
ValidationResult, BACKING_EXECUTION_TIMEOUT,
minimum_votes, AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD,
StatementWithPVD, ValidationResult, BACKING_EXECUTION_TIMEOUT,
};
use polkadot_node_subsystem::{
messages::{
@@ -374,14 +374,6 @@ struct AttestingData {
backing: Vec<ValidatorIndex>,
}

/// How many votes we need to consider a candidate backed.
///
/// WARNING: This has to be kept in sync with the runtime check in the inclusion module and
/// statement distribution.
fn minimum_votes(n_validators: usize) -> usize {
std::cmp::min(2, n_validators)
}

#[derive(Default)]
struct TableContext {
validator: Option<Validator>,
53 changes: 52 additions & 1 deletion node/network/statement-distribution/src/vstaging/cluster.rs
@@ -172,6 +172,7 @@ impl ClusterTracker {
}

/// Note that we accepted an incoming statement. This updates internal structures.
///
/// Should only be called after a successful `can_receive` call.
pub fn note_received(
&mut self,
@@ -748,5 +749,55 @@ mod tests {
);
}

// TODO [now] ensure statements received with prejudice don't prevent sending later
// Ensure statements received with prejudice don't prevent sending later.
#[test]
fn can_send_statements_received_with_prejudice() {
let group =
vec![ValidatorIndex(5), ValidatorIndex(200), ValidatorIndex(24), ValidatorIndex(146)];

let seconding_limit = 1;

let mut tracker = ClusterTracker::new(group.clone(), seconding_limit).expect("not empty");
let hash_a = CandidateHash(Hash::repeat_byte(1));
let hash_b = CandidateHash(Hash::repeat_byte(2));

assert_eq!(
tracker.can_receive(
ValidatorIndex(200),
ValidatorIndex(5),
CompactStatement::Seconded(hash_a),
),
Ok(Accept::Ok),
);

tracker.note_received(
ValidatorIndex(200),
ValidatorIndex(5),
CompactStatement::Seconded(hash_a),
);

assert_eq!(
tracker.can_receive(
ValidatorIndex(24),
ValidatorIndex(5),
CompactStatement::Seconded(hash_b),
),
Ok(Accept::WithPrejudice),
);

tracker.note_received(
ValidatorIndex(24),
ValidatorIndex(5),
CompactStatement::Seconded(hash_b),
);

assert_eq!(
tracker.can_send(
ValidatorIndex(24),
ValidatorIndex(5),
CompactStatement::Seconded(hash_a),
),
Ok(()),
);
}
}
3 changes: 2 additions & 1 deletion node/network/statement-distribution/src/vstaging/groups.rs
@@ -16,6 +16,7 @@

//! A utility for tracking groups and their members within a session.

use polkadot_node_primitives::minimum_votes;
use polkadot_primitives::vstaging::{AuthorityDiscoveryId, GroupIndex, IndexedVec, ValidatorIndex};

use std::collections::HashMap;
@@ -68,7 +69,7 @@ impl Groups {
&self,
group_index: GroupIndex,
) -> Option<(usize, usize)> {
self.get(group_index).map(|g| (g.len(), super::minimum_votes(g.len())))
self.get(group_index).map(|g| (g.len(), minimum_votes(g.len())))
}

/// Get the group index for a validator by index.
19 changes: 5 additions & 14 deletions node/network/statement-distribution/src/vstaging/mod.rs
@@ -75,7 +75,7 @@ mod statement_store;

const COST_UNEXPECTED_STATEMENT: Rep = Rep::CostMinor("Unexpected Statement");
const COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE: Rep =
Rep::CostMinor("Unexpected Statement, missing knowlege for relay parent");
Rep::CostMinor("Unexpected Statement, missing knowledge for relay parent");
const COST_UNEXPECTED_STATEMENT_UNKNOWN_CANDIDATE: Rep =
Rep::CostMinor("Unexpected Statement, unknown candidate");
const COST_UNEXPECTED_STATEMENT_REMOTE: Rep =
@@ -202,8 +202,8 @@ pub(crate) struct State {
request_manager: RequestManager,
}

// For the provided validator index, if there is a connected peer
// controlling the given authority ID,
// For the provided validator index, if there is a connected peer controlling the given authority
// ID, then return that peer's `PeerId`.
fn connected_validator_peer(
authorities: &HashMap<AuthorityDiscoveryId, PeerId>,
per_session: &PerSessionState,
@@ -278,14 +278,6 @@ impl PeerState {
}
}

/// How many votes we need to consider a candidate backed.
///
/// WARNING: This has to be kept in sync with the runtime check in the inclusion module.
// TODO [now]: extract to shared primitives
fn minimum_votes(n_validators: usize) -> usize {
std::cmp::min(2, n_validators)
}

#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
pub(crate) async fn handle_network_update<Context>(
ctx: &mut Context,
@@ -1118,11 +1110,11 @@ async fn handle_incoming_statement<Context>(
.request_manager
.get_or_insert(relay_parent, candidate_hash, originator_group);

request_entry.get_mut().add_peer(peer);
request_entry.add_peer(peer);

// We only successfully accept statements from the grid on confirmed candidates,
// so this check only passes if the statement is from the cluster.
request_entry.get_mut().set_cluster_priority();
request_entry.set_cluster_priority();
}

let was_fresh = match per_relay_parent.statement_store.insert(
@@ -1835,7 +1827,6 @@ async fn handle_incoming_manifest<Context>(
state
.request_manager
.get_or_insert(manifest.relay_parent, manifest.candidate_hash, manifest.group_index)
.get_mut()
.add_peer(peer);
}
}
175 changes: 152 additions & 23 deletions node/network/statement-distribution/src/vstaging/requests.rs
@@ -13,7 +13,18 @@

//! A requester for full information on candidates.
//!
// TODO [now]: some module docs.
//! 1. We use `RequestManager::get_or_insert` to add and mutate [`RequestedCandidate`]s through the returned
//! [`Entry`], either setting the priority or adding a peer we know has the candidate. We currently prioritize
//! "cluster" candidates (those from our own group, although the cluster mechanism could be made to include multiple
//! groups in the future) over "grid" candidates (those from other groups).
//!
//! 2. The main loop of the module will invoke [`RequestManager::next_request`] in a loop until it returns `None`,
//! dispatching each request via `NetworkBridgeTxMessage`. The receiving half of the response channel is owned by the
//! [`RequestManager`].
//!
//! 3. The main loop of the module will also select over [`RequestManager::await_incoming`] to receive
//! [`UnhandledResponse`]s, which it then validates using [`UnhandledResponse::validate_response`] (which requires state
//! not owned by the request manager).
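//!
//! A condensed sketch of that flow (illustrative only: `state`, `ctx`, `from_cluster`, and all
//! elided arguments are assumptions for this example, not the exact call sites or signatures):
//!
//! ```ignore
//! // (1) Register the candidate and record a peer we can fetch it from.
//! let mut entry = state.request_manager.get_or_insert(relay_parent, candidate_hash, group_index);
//! entry.add_peer(peer);
//! if from_cluster {
//!     entry.set_cluster_priority();
//! }
//!
//! // (2) Dispatch every request that is currently ready.
//! while let Some(request) = state.request_manager.next_request(/* ... */) {
//!     ctx.send_message(NetworkBridgeTxMessage::SendRequests(vec![request], /* ... */)).await;
//! }
//!
//! // (3) Await a response and validate it against state the manager does not own.
//! let unhandled = state.request_manager.await_incoming(/* ... */).await;
//! let output = unhandled.validate_response(/* ... */);
//! ```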

use super::{
BENEFIT_VALID_RESPONSE, BENEFIT_VALID_STATEMENT, COST_IMPROPERLY_DECODED_RESPONSE,
Expand Down Expand Up @@ -78,20 +89,6 @@ pub struct RequestedCandidate {
in_flight: bool,
}

impl RequestedCandidate {
/// Add a peer to the set of known peers.
pub fn add_peer(&mut self, peer: PeerId) {
if !self.known_by.contains(&peer) {
self.known_by.push_back(peer);
}
}

/// Note that the candidate is required for the cluster.
pub fn set_cluster_priority(&mut self) {
self.priority.origin = Origin::Cluster;
}
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum Origin {
Cluster = 0,
@@ -113,14 +110,17 @@ pub struct Entry<'a> {
}

impl<'a> Entry<'a> {
/// Access the underlying requested candidate.
pub fn get_mut(&mut self) -> &mut RequestedCandidate {
&mut self.requested
/// Add a peer to the set of known peers.
pub fn add_peer(&mut self, peer: PeerId) {
if !self.requested.known_by.contains(&peer) {
self.requested.known_by.push_back(peer);
}
}
}

impl<'a> Drop for Entry<'a> {
fn drop(&mut self) {
/// Note that the candidate is required for the cluster.
pub fn set_cluster_priority(&mut self) {
self.requested.priority.origin = Origin::Cluster;

insert_or_update_priority(
&mut *self.by_priority,
Some(self.prev_index),
@@ -213,7 +213,35 @@ impl RequestManager {
}
}

// TODO [now]: removal based on relay-parent.
/// Remove based on relay-parent.
pub fn remove_by_relay_parent(&mut self, relay_parent: Hash) {
let mut candidate_hashes = HashSet::new();

// Remove from `by_priority` and `requests`.
self.by_priority.retain(|(_priority, id)| {
let retain = relay_parent != id.relay_parent;
if !retain {
self.requests.remove(id);
candidate_hashes.insert(id.candidate_hash);
}
retain
});

// Remove from `unique_identifiers`.
for candidate_hash in candidate_hashes {
match self.unique_identifiers.entry(candidate_hash) {
HEntry::Occupied(mut entry) => {
entry.get_mut().retain(|id| relay_parent != id.relay_parent);
if entry.get().is_empty() {
entry.remove();
}
},
// We can expect to encounter vacant entries, but only if nodes are misbehaving and
// we don't use a deduplicating collection; there are no issues from ignoring them.
HEntry::Vacant(_) => (),
}
}
}

/// Yields the next request to dispatch, if there is any.
///
@@ -622,5 +650,106 @@
mod tests {
use super::*;

// TODO [now]: test priority ordering.
#[test]
fn test_remove_by_relay_parent() {
let parent_a = Hash::from_low_u64_le(1);
let parent_b = Hash::from_low_u64_le(2);
let parent_c = Hash::from_low_u64_le(3);

let candidate_a1 = CandidateHash(Hash::from_low_u64_le(11));
let candidate_a2 = CandidateHash(Hash::from_low_u64_le(12));
let candidate_b1 = CandidateHash(Hash::from_low_u64_le(21));
let candidate_b2 = CandidateHash(Hash::from_low_u64_le(22));
let candidate_c1 = CandidateHash(Hash::from_low_u64_le(31));
let duplicate_hash = CandidateHash(Hash::from_low_u64_le(31));

let mut request_manager = RequestManager::new();
request_manager.get_or_insert(parent_a, candidate_a1, 1.into());
request_manager.get_or_insert(parent_a, candidate_a2, 1.into());
request_manager.get_or_insert(parent_b, candidate_b1, 1.into());
request_manager.get_or_insert(parent_b, candidate_b2, 2.into());
request_manager.get_or_insert(parent_c, candidate_c1, 2.into());
request_manager.get_or_insert(parent_a, duplicate_hash, 1.into());

assert_eq!(request_manager.requests.len(), 6);
assert_eq!(request_manager.by_priority.len(), 6);
assert_eq!(request_manager.unique_identifiers.len(), 5);

request_manager.remove_by_relay_parent(parent_a);

assert_eq!(request_manager.requests.len(), 3);
assert_eq!(request_manager.by_priority.len(), 3);
assert_eq!(request_manager.unique_identifiers.len(), 3);

assert!(!request_manager.unique_identifiers.contains_key(&candidate_a1));
assert!(!request_manager.unique_identifiers.contains_key(&candidate_a2));
// Duplicate hash should still be there (under a different parent).
assert!(request_manager.unique_identifiers.contains_key(&duplicate_hash));

request_manager.remove_by_relay_parent(parent_b);

assert_eq!(request_manager.requests.len(), 1);
assert_eq!(request_manager.by_priority.len(), 1);
assert_eq!(request_manager.unique_identifiers.len(), 1);

assert!(!request_manager.unique_identifiers.contains_key(&candidate_b1));
assert!(!request_manager.unique_identifiers.contains_key(&candidate_b2));

request_manager.remove_by_relay_parent(parent_c);

assert!(request_manager.requests.is_empty());
assert!(request_manager.by_priority.is_empty());
assert!(request_manager.unique_identifiers.is_empty());
}

#[test]
fn test_priority_ordering() {
let parent_a = Hash::from_low_u64_le(1);
let parent_b = Hash::from_low_u64_le(2);
let parent_c = Hash::from_low_u64_le(3);

let candidate_a1 = CandidateHash(Hash::from_low_u64_le(11));
let candidate_a2 = CandidateHash(Hash::from_low_u64_le(12));
let candidate_b1 = CandidateHash(Hash::from_low_u64_le(21));
let candidate_b2 = CandidateHash(Hash::from_low_u64_le(22));
let candidate_c1 = CandidateHash(Hash::from_low_u64_le(31));

let mut request_manager = RequestManager::new();

// Add some entries, set a couple of them to cluster (high) priority.
let identifier_a1 = request_manager
.get_or_insert(parent_a, candidate_a1, 1.into())
.identifier
.clone();
let identifier_a2 = {
let mut entry = request_manager.get_or_insert(parent_a, candidate_a2, 1.into());
entry.set_cluster_priority();
entry.identifier.clone()
};
let identifier_b1 = request_manager
.get_or_insert(parent_b, candidate_b1, 1.into())
.identifier
.clone();
let identifier_b2 = request_manager
.get_or_insert(parent_b, candidate_b2, 2.into())
.identifier
.clone();
let identifier_c1 = {
let mut entry = request_manager.get_or_insert(parent_c, candidate_c1, 2.into());
entry.set_cluster_priority();
entry.identifier.clone()
};

let attempts = 0;
assert_eq!(
request_manager.by_priority,
vec![
(Priority { origin: Origin::Cluster, attempts }, identifier_a2),
(Priority { origin: Origin::Cluster, attempts }, identifier_c1),
(Priority { origin: Origin::Unspecified, attempts }, identifier_a1),
(Priority { origin: Origin::Unspecified, attempts }, identifier_b1),
(Priority { origin: Origin::Unspecified, attempts }, identifier_b2),
]
);
}
}
7 changes: 7 additions & 0 deletions node/primitives/src/lib.rs
@@ -623,3 +623,10 @@ pub fn maybe_compress_pov(pov: PoV) -> PoV {
let pov = PoV { block_data: BlockData(raw) };
pov
}

/// How many votes we need to consider a candidate backed.
///
/// WARNING: This has to be kept in sync with the runtime check in the inclusion module.
pub fn minimum_votes(n_validators: usize) -> usize {
std::cmp::min(2, n_validators)
}
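The threshold saturates at two votes, so only a single-validator group needs fewer. A quick hypothetical check (not part of this diff) illustrating the behavior:

```rust
use polkadot_node_primitives::minimum_votes;

#[test]
fn backing_threshold_saturates_at_two() {
    assert_eq!(minimum_votes(1), 1); // a lone validator backs by itself
    assert_eq!(minimum_votes(2), 2);
    assert_eq!(minimum_votes(100), 2); // capped at two, regardless of group size
}
```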
3 changes: 1 addition & 2 deletions runtime/parachains/src/inclusion/mod.rs
@@ -172,8 +172,7 @@ impl<H> Default for ProcessedCandidates<H> {

/// Number of backing votes we need for a valid backing.
///
/// WARNING: This check has to be kept in sync with the node side check in the backing
/// subsystem.
/// WARNING: This check has to be kept in sync with the node-side checks.
pub fn minimum_backing_votes(n_validators: usize) -> usize {
// For considerations on this value see:
// https://github.com/paritytech/polkadot/pull/1656#issuecomment-999734650
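Both warnings ask that the node-side and runtime values stay in agreement. A hedged sketch of a guard test one could add (hypothetical, and it assumes both functions are reachable from a single test crate):

```rust
use polkadot_node_primitives::minimum_votes;

// Hypothetical consistency check between the node-side and runtime thresholds.
#[test]
fn node_and_runtime_backing_thresholds_agree() {
    for n_validators in 0..16 {
        assert_eq!(minimum_votes(n_validators), minimum_backing_votes(n_validators));
    }
}
```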