diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs index 957953cf38e2..c79bfdada8cb 100644 --- a/node/core/backing/src/lib.rs +++ b/node/core/backing/src/lib.rs @@ -1606,10 +1606,7 @@ async fn sign_import_and_distribute_statement( if let Some(signed_statement) = sign_statement(&*rp_state, statement, keystore, metrics).await { import_statement(ctx, rp_state, per_candidate, &signed_statement).await?; - let smsg = StatementDistributionMessage::Share( - rp_state.parent, - StatementWithPVD::drop_pvd_from_signed(signed_statement.clone()), - ); + let smsg = StatementDistributionMessage::Share(rp_state.parent, signed_statement.clone()); ctx.send_unbounded_message(smsg); Ok(Some(signed_statement)) diff --git a/node/core/backing/src/tests/mod.rs b/node/core/backing/src/tests/mod.rs index 81ebcf5c6dc4..2e2a5878a888 100644 --- a/node/core/backing/src/tests/mod.rs +++ b/node/core/backing/src/tests/mod.rs @@ -928,7 +928,7 @@ fn backing_misbehavior_works() { signed_statement, ) ) if relay_parent == test_state.relay_parent => { - assert_eq!(*signed_statement.payload(), Statement::Valid(candidate_a_hash)); + assert_eq!(*signed_statement.payload(), StatementWithPVD::Valid(candidate_a_hash)); } ); @@ -1092,14 +1092,14 @@ fn backing_dont_second_invalid() { virtual_overseer.recv().await, AllMessages::CandidateValidation( CandidateValidationMessage::ValidateFromExhaustive( - _pvd, + pvd, _validation_code, candidate_receipt, _pov, timeout, tx, ), - ) if _pvd == pvd_b && + ) if pvd == pvd_b && _validation_code == validation_code_b && *_pov == pov_block_b && &candidate_receipt.descriptor == candidate_b.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && @@ -1135,7 +1135,7 @@ fn backing_dont_second_invalid() { signed_statement, ) ) if parent_hash == test_state.relay_parent => { - assert_eq!(*signed_statement.payload(), Statement::Seconded(candidate_b)); + assert_eq!(*signed_statement.payload(), StatementWithPVD::Seconded(candidate_b, pvd_b.clone())); } ); diff 
--git a/node/core/backing/src/tests/prospective_parachains.rs b/node/core/backing/src/tests/prospective_parachains.rs index 212c7d7167ae..3f7065d7d5df 100644 --- a/node/core/backing/src/tests/prospective_parachains.rs +++ b/node/core/backing/src/tests/prospective_parachains.rs @@ -1276,7 +1276,7 @@ fn concurrent_dependent_candidates() { let payload = statement.payload(); assert_matches!( payload.clone(), - Statement::Valid(hash) + StatementWithPVD::Valid(hash) if hash == candidate_a_hash || hash == candidate_b_hash => { assert!(valid_statements.insert(hash)); diff --git a/node/network/bridge/src/rx/tests.rs b/node/network/bridge/src/rx/tests.rs index dc22bf66bf22..5b43695d6ff6 100644 --- a/node/network/bridge/src/rx/tests.rs +++ b/node/network/bridge/src/rx/tests.rs @@ -1414,14 +1414,16 @@ fn network_protocol_versioning_subsystem_msg() { } ); - let metadata = protocol_vstaging::StatementMetadata { + let metadata = protocol_v1::StatementMetadata { relay_parent: Hash::zero(), candidate_hash: CandidateHash::default(), signed_by: ValidatorIndex(0), signature: sp_core::crypto::UncheckedFrom::unchecked_from([1u8; 64]), }; let statement_distribution_message = - protocol_vstaging::StatementDistributionMessage::LargeStatement(metadata); + protocol_vstaging::StatementDistributionMessage::V1Compatibility( + protocol_v1::StatementDistributionMessage::LargeStatement(metadata), + ); let msg = protocol_vstaging::ValidationProtocol::StatementDistribution( statement_distribution_message.clone(), ); diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index 1c561905ecf9..9b8545a14e2c 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -946,7 +946,7 @@ async fn handle_network_msg( PeerMessage(remote, Versioned::V1(msg)) => { handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?; }, - PeerMessage(_, 
Versioned::VStaging(msg)) => {}, + PeerMessage(_, Versioned::VStaging(_msg)) => {}, NewGossipTopology { .. } => { // impossible! }, diff --git a/node/network/collator-protocol/src/validator_side/mod.rs b/node/network/collator-protocol/src/validator_side/mod.rs index 5996ebd1d2b3..806581a2dd89 100644 --- a/node/network/collator-protocol/src/validator_side/mod.rs +++ b/node/network/collator-protocol/src/validator_side/mod.rs @@ -1084,7 +1084,7 @@ async fn handle_network_msg( PeerMessage(remote, Versioned::V1(msg)) => { process_incoming_peer_message(ctx, state, remote, msg).await; }, - PeerMessage(_, Versioned::VStaging(msg)) => {}, + PeerMessage(_, Versioned::VStaging(_msg)) => {}, } Ok(()) diff --git a/node/network/protocol/src/lib.rs b/node/network/protocol/src/lib.rs index b9f40241a4d7..a45bca82df49 100644 --- a/node/network/protocol/src/lib.rs +++ b/node/network/protocol/src/lib.rs @@ -592,8 +592,8 @@ pub mod vstaging { use parity_scale_codec::{Decode, Encode}; use polkadot_primitives::vstaging::{ - CandidateHash, CandidateIndex, CollatorId, CollatorSignature, CompactStatement, Hash, - Id as ParaId, UncheckedSignedAvailabilityBitfield, ValidatorIndex, ValidatorSignature, + CandidateIndex, CollatorId, CollatorSignature, Hash, Id as ParaId, + UncheckedSignedAvailabilityBitfield, }; use polkadot_node_primitives::{ @@ -612,67 +612,16 @@ pub mod vstaging { /// Network messages used by the statement distribution subsystem. #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum StatementDistributionMessage { - /// A signed full statement under a given relay-parent. - #[codec(index = 0)] - Statement(Hash, UncheckedSignedFullStatement), - /// Seconded statement with large payload (e.g. containing a runtime upgrade). + // TODO [now]: notifications for v2 + /// All messages for V1 for compatibility with the statement distribution + /// protocol, for relay-parents that don't support asynchronous backing. 
/// - /// We only gossip the hash in that case, actual payloads can be fetched from sending node - /// via request/response. - #[codec(index = 1)] - LargeStatement(StatementMetadata), - } - - /// Data that makes a statement unique. - #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq, Hash)] - pub struct StatementMetadata { - /// Relay parent this statement is relevant under. - pub relay_parent: Hash, - /// Hash of the candidate that got validated. - pub candidate_hash: CandidateHash, - /// Validator that attested the validity. - pub signed_by: ValidatorIndex, - /// Signature of seconding validator. - pub signature: ValidatorSignature, - } - - impl StatementDistributionMessage { - /// Get fingerprint describing the contained statement uniquely. - pub fn get_fingerprint(&self) -> (CompactStatement, ValidatorIndex) { - match self { - Self::Statement(_, statement) => ( - statement.unchecked_payload().to_compact(), - statement.unchecked_validator_index(), - ), - Self::LargeStatement(meta) => - (CompactStatement::Seconded(meta.candidate_hash), meta.signed_by), - } - } - - /// Get the signature from the statement. - pub fn get_signature(&self) -> ValidatorSignature { - match self { - Self::Statement(_, statement) => statement.unchecked_signature().clone(), - Self::LargeStatement(metadata) => metadata.signature.clone(), - } - } - - /// Get contained relay parent. - pub fn get_relay_parent(&self) -> Hash { - match self { - Self::Statement(r, _) => *r, - Self::LargeStatement(meta) => meta.relay_parent, - } - } - - /// Whether this message contains a large statement. - pub fn is_large_statement(&self) -> bool { - if let Self::LargeStatement(_) = self { - true - } else { - false - } - } + /// These are illegal to send to V1 peers, and illegal to send concerning relay-parents + /// which support asynchronous backing. 
This backwards compatibility should be + /// considered immediately deprecated and can be removed once the node software + /// is not required to support logic from before asynchronous backing anymore. + #[codec(index = 255)] + V1Compatibility(crate::v1::StatementDistributionMessage), } /// Network messages used by the approval distribution subsystem. diff --git a/node/network/statement-distribution/src/legacy_v1/mod.rs b/node/network/statement-distribution/src/legacy_v1/mod.rs new file mode 100644 index 000000000000..e07f4489dfba --- /dev/null +++ b/node/network/statement-distribution/src/legacy_v1/mod.rs @@ -0,0 +1,2118 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+ +use parity_scale_codec::Encode; + +use polkadot_node_network_protocol::{ + self as net_protocol, + grid_topology::{RequiredRouting, SessionBoundGridTopologyStorage, SessionGridTopology}, + peer_set::{IsAuthority, PeerSet, ValidationVersion}, + v1::{self as protocol_v1, StatementMetadata}, + vstaging as protocol_vstaging, IfDisconnected, PeerId, UnifiedReputationChange as Rep, + Versioned, View, +}; +use polkadot_node_primitives::{ + SignedFullStatement, Statement, StatementWithPVD, UncheckedSignedFullStatement, +}; +use polkadot_node_subsystem_util::{self as util, rand, MIN_GOSSIP_PEERS}; + +use polkadot_node_subsystem::{ + jaeger, + messages::{CandidateBackingMessage, NetworkBridgeEvent, NetworkBridgeTxMessage}, + overseer, ActivatedLeaf, PerLeafSpan, StatementDistributionSenderTrait, +}; +use polkadot_primitives::v2::{ + AuthorityDiscoveryId, CandidateHash, CommittedCandidateReceipt, CompactStatement, Hash, + Id as ParaId, OccupiedCoreAssumption, PersistedValidationData, SignedStatement, SigningContext, + UncheckedSignedStatement, ValidatorId, ValidatorIndex, ValidatorSignature, +}; + +use futures::{ + channel::{mpsc, oneshot}, + future::RemoteHandle, + prelude::*, +}; +use indexmap::{map::Entry as IEntry, IndexMap}; +use rand::Rng; +use sp_keystore::SyncCryptoStorePtr; +use util::runtime::RuntimeInfo; + +use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque}; + +use crate::error::{Error, JfyiError, JfyiErrorResult, Result}; + +/// Background task logic for requesting of large statements. +mod requester; +use requester::fetch; + +/// Background task logic for responding for large statements. 
+mod responder; + +use crate::{metrics::Metrics, LOG_TARGET}; + +pub use requester::RequesterMessage; +pub use responder::{respond, ResponderMessage}; + +#[cfg(test)] +mod tests; + +const COST_UNEXPECTED_STATEMENT: Rep = Rep::CostMinor("Unexpected Statement"); +const COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE: Rep = + Rep::CostMinor("Unexpected Statement, missing knowlege for relay parent"); +const COST_UNEXPECTED_STATEMENT_UNKNOWN_CANDIDATE: Rep = + Rep::CostMinor("Unexpected Statement, unknown candidate"); +const COST_UNEXPECTED_STATEMENT_REMOTE: Rep = + Rep::CostMinor("Unexpected Statement, remote not allowed"); + +const COST_FETCH_FAIL: Rep = + Rep::CostMinor("Requesting `CommittedCandidateReceipt` from peer failed"); +const COST_INVALID_SIGNATURE: Rep = Rep::CostMajor("Invalid Statement Signature"); +const COST_WRONG_HASH: Rep = Rep::CostMajor("Received candidate had wrong hash"); +const COST_DUPLICATE_STATEMENT: Rep = + Rep::CostMajorRepeated("Statement sent more than once by peer"); +const COST_APPARENT_FLOOD: Rep = Rep::Malicious("Peer appears to be flooding us with statements"); + +const BENEFIT_VALID_STATEMENT: Rep = Rep::BenefitMajor("Peer provided a valid statement"); +const BENEFIT_VALID_STATEMENT_FIRST: Rep = + Rep::BenefitMajorFirst("Peer was the first to provide a valid statement"); +const BENEFIT_VALID_RESPONSE: Rep = + Rep::BenefitMajor("Peer provided a valid large statement response"); + +/// The maximum amount of candidates each validator is allowed to second at any relay-parent. +/// Short for "Validator Candidate Threshold". +/// +/// This is the amount of candidates we keep per validator at any relay-parent. +/// Typically we will only keep 1, but when a validator equivocates we will need to track 2. +const VC_THRESHOLD: usize = 2; + +/// Large statements should be rare. +const MAX_LARGE_STATEMENTS_PER_SENDER: usize = 20; + +/// Overall state of the legacy-v1 portion of the subsystem. 
+pub(crate) struct State { + peers: HashMap, + topology_storage: SessionBoundGridTopologyStorage, + authorities: HashMap, + active_heads: HashMap, + recent_outdated_heads: RecentOutdatedHeads, + runtime: RuntimeInfo, +} + +impl State { + /// Create a new state. + pub(crate) fn new(keystore: SyncCryptoStorePtr) -> Self { + State { + peers: HashMap::new(), + topology_storage: Default::default(), + authorities: HashMap::new(), + active_heads: HashMap::new(), + recent_outdated_heads: RecentOutdatedHeads::default(), + runtime: RuntimeInfo::new(Some(keystore)), + } + } + + /// Query whether the state contains some relay-parent. + pub(crate) fn contains_relay_parent(&self, relay_parent: &Hash) -> bool { + self.active_heads.contains_key(relay_parent) + } +} + +#[derive(Default)] +struct RecentOutdatedHeads { + buf: VecDeque, +} + +impl RecentOutdatedHeads { + fn note_outdated(&mut self, hash: Hash) { + const MAX_BUF_LEN: usize = 10; + + self.buf.push_back(hash); + + while self.buf.len() > MAX_BUF_LEN { + let _ = self.buf.pop_front(); + } + } + + fn is_recent_outdated(&self, hash: &Hash) -> bool { + self.buf.contains(hash) + } +} + +/// Tracks our impression of a single peer's view of the candidates a validator has seconded +/// for a given relay-parent. +/// +/// It is expected to receive at most `VC_THRESHOLD` from us and be aware of at most `VC_THRESHOLD` +/// via other means. +#[derive(Default)] +struct VcPerPeerTracker { + local_observed: arrayvec::ArrayVec<[CandidateHash; VC_THRESHOLD]>, + remote_observed: arrayvec::ArrayVec<[CandidateHash; VC_THRESHOLD]>, +} + +impl VcPerPeerTracker { + /// Note that the remote should now be aware that a validator has seconded a given candidate (by hash) + /// based on a message that we have sent it from our local pool. 
+ fn note_local(&mut self, h: CandidateHash) { + if !note_hash(&mut self.local_observed, h) { + gum::warn!( + target: LOG_TARGET, + "Statement distribution is erroneously attempting to distribute more \ + than {} candidate(s) per validator index. Ignoring", + VC_THRESHOLD, + ); + } + } + + /// Note that the remote should now be aware that a validator has seconded a given candidate (by hash) + /// based on a message that it has sent us. + /// + /// Returns `true` if the peer was allowed to send us such a message, `false` otherwise. + fn note_remote(&mut self, h: CandidateHash) -> bool { + note_hash(&mut self.remote_observed, h) + } + + /// Returns `true` if the peer is allowed to send us such a message, `false` otherwise. + fn is_wanted_candidate(&self, h: &CandidateHash) -> bool { + !self.remote_observed.contains(h) && !self.remote_observed.is_full() + } +} + +fn note_hash( + observed: &mut arrayvec::ArrayVec<[CandidateHash; VC_THRESHOLD]>, + h: CandidateHash, +) -> bool { + if observed.contains(&h) { + return true + } + + observed.try_push(h).is_ok() +} + +/// knowledge that a peer has about goings-on in a relay parent. +#[derive(Default)] +struct PeerRelayParentKnowledge { + /// candidates that the peer is aware of because we sent statements to it. This indicates that we can + /// send other statements pertaining to that candidate. + sent_candidates: HashSet, + /// candidates that peer is aware of, because we received statements from it. + received_candidates: HashSet, + /// fingerprints of all statements a peer should be aware of: those that + /// were sent to the peer by us. + sent_statements: HashSet<(CompactStatement, ValidatorIndex)>, + /// fingerprints of all statements a peer should be aware of: those that + /// were sent to us by the peer. + received_statements: HashSet<(CompactStatement, ValidatorIndex)>, + /// How many candidates this peer is aware of for each given validator index. 
+ seconded_counts: HashMap, + /// How many statements we've received for each candidate that we're aware of. + received_message_count: HashMap, + + /// How many large statements this peer already sent us. + /// + /// Flood protection for large statements is rather hard and as soon as we get + /// `https://github.com/paritytech/polkadot/issues/2979` implemented also no longer necessary. + /// Reason: We keep messages around until we fetched the payload, but if a node makes up + /// statements and never provides the data, we will keep it around for the slot duration. Not + /// even signature checking would help, as the sender, if a validator, can just sign arbitrary + /// invalid statements and will not face any consequences as long as it won't provide the + /// payload. + /// + /// Quick and temporary fix, only accept `MAX_LARGE_STATEMENTS_PER_SENDER` per connected node. + /// + /// Large statements should be rare, if they were not, we would run into problems anyways, as + /// we would not be able to distribute them in a timely manner. Therefore + /// `MAX_LARGE_STATEMENTS_PER_SENDER` can be set to a relatively small number. It is also not + /// per candidate hash, but in total as candidate hashes can be made up, as illustrated above. + /// + /// An attacker could still try to fill up our memory, by repeatedly disconnecting and + /// connecting again with new peer ids, but we assume that the resulting effective bandwidth + /// for such an attack would be too low. + large_statement_count: usize, + + /// We have seen a message that is unexpected from this peer, so note this fact + /// and stop subsequent logging and peer reputation flood. + unexpected_count: usize, +} + +impl PeerRelayParentKnowledge { + /// Updates our view of the peer's knowledge with this statement's fingerprint based + /// on something that we would like to send to the peer. + /// + /// NOTE: assumes `self.can_send` returned true before this call.
+ /// + /// Once the knowledge has incorporated a statement, it cannot be incorporated again. + /// + /// This returns `true` if this is the first time the peer has become aware of a + /// candidate with the given hash. + fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> bool { + debug_assert!( + self.can_send(fingerprint), + "send is only called after `can_send` returns true; qed", + ); + + let new_known = match fingerprint.0 { + CompactStatement::Seconded(ref h) => { + self.seconded_counts.entry(fingerprint.1).or_default().note_local(h.clone()); + + let was_known = self.is_known_candidate(h); + self.sent_candidates.insert(h.clone()); + !was_known + }, + CompactStatement::Valid(_) => false, + }; + + self.sent_statements.insert(fingerprint.clone()); + + new_known + } + + /// This returns `true` if the peer can accept this statement, without altering internal + /// state, `false` otherwise. + fn can_send(&self, fingerprint: &(CompactStatement, ValidatorIndex)) -> bool { + let already_known = self.sent_statements.contains(fingerprint) || + self.received_statements.contains(fingerprint); + + if already_known { + return false + } + + match fingerprint.0 { + CompactStatement::Valid(ref h) => { + // The peer can only accept Valid statements for which it is aware + // of the corresponding candidate. + self.is_known_candidate(h) + }, + CompactStatement::Seconded(_) => true, + } + } + + /// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on + /// a message we are receiving from the peer. + /// + /// Provide the maximum message count that we can receive per candidate. In practice we should + /// not receive more statements for any one candidate than there are members in the group assigned + /// to that para, but this maximum needs to be lenient to account for equivocations that may be + /// cross-group. As such, a maximum of 2 * `n_validators` is recommended.
+ /// + /// This returns an error if the peer should not have sent us this message according to protocol + /// rules for flood protection. + /// + /// If this returns `Ok`, the internal state has been altered. After `receive`ing a new + /// candidate, we are then cleared to send the peer further statements about that candidate. + /// + /// This returns `Ok(true)` if this is the first time the peer has become aware of a + /// candidate with given hash. + fn receive( + &mut self, + fingerprint: &(CompactStatement, ValidatorIndex), + max_message_count: usize, + ) -> std::result::Result { + // We don't check `sent_statements` because a statement could be in-flight from both + // sides at the same time. + if self.received_statements.contains(fingerprint) { + return Err(COST_DUPLICATE_STATEMENT) + } + + let (candidate_hash, fresh) = match fingerprint.0 { + CompactStatement::Seconded(ref h) => { + let allowed_remote = self + .seconded_counts + .entry(fingerprint.1) + .or_insert_with(Default::default) + .note_remote(h.clone()); + + if !allowed_remote { + return Err(COST_UNEXPECTED_STATEMENT_REMOTE) + } + + (h, !self.is_known_candidate(h)) + }, + CompactStatement::Valid(ref h) => { + if !self.is_known_candidate(h) { + return Err(COST_UNEXPECTED_STATEMENT_UNKNOWN_CANDIDATE) + } + + (h, false) + }, + }; + + { + let received_per_candidate = + self.received_message_count.entry(*candidate_hash).or_insert(0); + + if *received_per_candidate >= max_message_count { + return Err(COST_APPARENT_FLOOD) + } + + *received_per_candidate += 1; + } + + self.received_statements.insert(fingerprint.clone()); + self.received_candidates.insert(candidate_hash.clone()); + Ok(fresh) + } + + /// Note a received large statement metadata. 
+ fn receive_large_statement(&mut self) -> std::result::Result<(), Rep> { + if self.large_statement_count >= MAX_LARGE_STATEMENTS_PER_SENDER { + return Err(COST_APPARENT_FLOOD) + } + self.large_statement_count += 1; + Ok(()) + } + + /// This method does the same checks as `receive` without modifying the internal state. + /// Returns an error if the peer should not have sent us this message according to protocol + /// rules for flood protection. + fn check_can_receive( + &self, + fingerprint: &(CompactStatement, ValidatorIndex), + max_message_count: usize, + ) -> std::result::Result<(), Rep> { + // We don't check `sent_statements` because a statement could be in-flight from both + // sides at the same time. + if self.received_statements.contains(fingerprint) { + return Err(COST_DUPLICATE_STATEMENT) + } + + let candidate_hash = match fingerprint.0 { + CompactStatement::Seconded(ref h) => { + let allowed_remote = self + .seconded_counts + .get(&fingerprint.1) + .map_or(true, |r| r.is_wanted_candidate(h)); + + if !allowed_remote { + return Err(COST_UNEXPECTED_STATEMENT_REMOTE) + } + + h + }, + CompactStatement::Valid(ref h) => { + if !self.is_known_candidate(&h) { + return Err(COST_UNEXPECTED_STATEMENT_UNKNOWN_CANDIDATE) + } + + h + }, + }; + + let received_per_candidate = self.received_message_count.get(candidate_hash).unwrap_or(&0); + + if *received_per_candidate >= max_message_count { + Err(COST_APPARENT_FLOOD) + } else { + Ok(()) + } + } + + /// Check for candidates that the peer is aware of. This indicates that we can + /// send other statements pertaining to that candidate. + fn is_known_candidate(&self, candidate: &CandidateHash) -> bool { + self.sent_candidates.contains(candidate) || self.received_candidates.contains(candidate) + } +} + +struct PeerData { + view: View, + protocol_version: ValidationVersion, + view_knowledge: HashMap, + /// Peer might be known as authority with the given ids. 
+ maybe_authority: Option>, +} + +impl PeerData { + /// Updates our view of the peer's knowledge with this statement's fingerprint based + /// on something that we would like to send to the peer. + /// + /// NOTE: assumes `self.can_send` returned true before this call. + /// + /// Once the knowledge has incorporated a statement, it cannot be incorporated again. + /// + /// This returns `true` if this is the first time the peer has become aware of a + /// candidate with the given hash. + fn send( + &mut self, + relay_parent: &Hash, + fingerprint: &(CompactStatement, ValidatorIndex), + ) -> bool { + debug_assert!( + self.can_send(relay_parent, fingerprint), + "send is only called after `can_send` returns true; qed", + ); + self.view_knowledge + .get_mut(relay_parent) + .expect("send is only called after `can_send` returns true; qed") + .send(fingerprint) + } + + /// This returns `false` if the peer cannot accept this statement, without altering internal + /// state. + fn can_send( + &self, + relay_parent: &Hash, + fingerprint: &(CompactStatement, ValidatorIndex), + ) -> bool { + self.view_knowledge.get(relay_parent).map_or(false, |k| k.can_send(fingerprint)) + } + + /// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on + /// a message we are receiving from the peer. + /// + /// Provide the maximum message count that we can receive per candidate. In practice we should + /// not receive more statements for any one candidate than there are members in the group assigned + /// to that para, but this maximum needs to be lenient to account for equivocations that may be + /// cross-group. As such, a maximum of 2 * `n_validators` is recommended. + /// + /// This returns an error if the peer should not have sent us this message according to protocol + /// rules for flood protection. + /// + /// If this returns `Ok`, the internal state has been altered.
After `receive`ing a new + /// candidate, we are then cleared to send the peer further statements about that candidate. + /// + /// This returns `Ok(true)` if this is the first time the peer has become aware of a + /// candidate with given hash. + fn receive( + &mut self, + relay_parent: &Hash, + fingerprint: &(CompactStatement, ValidatorIndex), + max_message_count: usize, + ) -> std::result::Result { + self.view_knowledge + .get_mut(relay_parent) + .ok_or(COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE)? + .receive(fingerprint, max_message_count) + } + + /// This method does the same checks as `receive` without modifying the internal state. + /// Returns an error if the peer should not have sent us this message according to protocol + /// rules for flood protection. + fn check_can_receive( + &self, + relay_parent: &Hash, + fingerprint: &(CompactStatement, ValidatorIndex), + max_message_count: usize, + ) -> std::result::Result<(), Rep> { + self.view_knowledge + .get(relay_parent) + .ok_or(COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE)? + .check_can_receive(fingerprint, max_message_count) + } + + /// Receives a notice about an out-of-view statement and returns the previous unexpected count. + fn receive_unexpected(&mut self, relay_parent: &Hash) -> usize { + self.view_knowledge + .get_mut(relay_parent) + .map_or(0_usize, |relay_parent_peer_knowledge| { + let old = relay_parent_peer_knowledge.unexpected_count; + relay_parent_peer_knowledge.unexpected_count += 1_usize; + old + }) + } + + /// Basic flood protection for large statements. + fn receive_large_statement(&mut self, relay_parent: &Hash) -> std::result::Result<(), Rep> { + self.view_knowledge + .get_mut(relay_parent) + .ok_or(COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE)? + .receive_large_statement() + } +} + +// A statement stored while a relay chain head is active.
+#[derive(Debug, Copy, Clone)] +struct StoredStatement<'a> { + comparator: &'a StoredStatementComparator, + statement: &'a SignedFullStatement, +} + +// A value used for comparison of stored statements to each other. +// +// The compact version of the statement, the validator index, and the signature of the validator +// is enough to differentiate between all types of equivocations, as long as the signature is +// actually checked to be valid. The same statement with 2 signatures and 2 statements with +// different (or same) signatures will all be correctly judged to be unequal with this comparator. +#[derive(PartialEq, Eq, Hash, Clone, Debug)] +struct StoredStatementComparator { + compact: CompactStatement, + validator_index: ValidatorIndex, + signature: ValidatorSignature, +} + +impl<'a> From<(&'a StoredStatementComparator, &'a SignedFullStatement)> for StoredStatement<'a> { + fn from( + (comparator, statement): (&'a StoredStatementComparator, &'a SignedFullStatement), + ) -> Self { + Self { comparator, statement } + } +} + +impl<'a> StoredStatement<'a> { + fn compact(&self) -> &'a CompactStatement { + &self.comparator.compact + } + + fn fingerprint(&self) -> (CompactStatement, ValidatorIndex) { + (self.comparator.compact.clone(), self.statement.validator_index()) + } +} + +#[derive(Debug)] +enum NotedStatement<'a> { + NotUseful, + Fresh(StoredStatement<'a>), + UsefulButKnown, +} + +/// Large statement fetching status. +enum LargeStatementStatus { + /// We are currently fetching the statement data from a remote peer. We keep a list of other nodes + /// claiming to have that data and will fallback on them. + Fetching(FetchingInfo), + /// Statement data is fetched or we got it locally via `StatementDistributionMessage::Share`. + FetchedOrShared(CommittedCandidateReceipt), +} + +/// Info about a fetch in progress.
+struct FetchingInfo { + /// All peers that send us a `LargeStatement` or a `Valid` statement for the given + /// `CandidateHash`, together with their originally sent messages. + /// + /// We use an `IndexMap` here to preserve the ordering of peers sending us messages. This is + /// desirable because we reward first sending peers with reputation. + available_peers: IndexMap>, + /// Peers left to try in case the background task needs it. + peers_to_try: Vec, + /// Sender for sending fresh peers to the fetching task in case of failure. + peer_sender: Option>>, + /// Task taking care of the request. + /// + /// Will be killed once dropped. + #[allow(dead_code)] + fetching_task: RemoteHandle<()>, +} + +#[derive(Debug, PartialEq, Eq)] +enum DeniedStatement { + NotUseful, + UsefulButKnown, +} + +pub(crate) struct ActiveHeadData { + /// All candidates we are aware of for this head, keyed by hash. + candidates: HashSet, + /// Persisted validation data cache. + cached_validation_data: HashMap, + /// Stored statements for circulation to peers. + /// + /// These are iterable in insertion order, and `Seconded` statements are always + /// accepted before dependent statements. + statements: IndexMap, + /// Large statements we are waiting for with associated meta data. + waiting_large_statements: HashMap, + /// The parachain validators at the head's child session index. + validators: Vec, + /// The current session index of this fork. + session_index: sp_staking::SessionIndex, + /// How many `Seconded` statements we've seen per validator. + seconded_counts: HashMap, + /// A Jaeger span for this head, so we can attach data to it. 
+ span: PerLeafSpan, +} + +impl ActiveHeadData { + fn new( + validators: Vec, + session_index: sp_staking::SessionIndex, + span: PerLeafSpan, + ) -> Self { + ActiveHeadData { + candidates: Default::default(), + cached_validation_data: Default::default(), + statements: Default::default(), + waiting_large_statements: Default::default(), + validators, + session_index, + seconded_counts: Default::default(), + span, + } + } + + /// Fetches the `PersistedValidationData` from the runtime, assuming + /// that the core is free. The relay parent must match that of the active + /// head. + async fn fetch_persisted_validation_data( + &mut self, + sender: &mut Sender, + relay_parent: Hash, + para_id: ParaId, + ) -> Result> + where + Sender: StatementDistributionSenderTrait, + { + if let Entry::Vacant(entry) = self.cached_validation_data.entry(para_id) { + let persisted_validation_data = + polkadot_node_subsystem_util::request_persisted_validation_data( + relay_parent, + para_id, + OccupiedCoreAssumption::Free, + sender, + ) + .await + .await + .map_err(Error::RuntimeApiUnavailable)? + .map_err(|err| Error::FetchPersistedValidationData(para_id, err))?; + + match persisted_validation_data { + Some(pvd) => entry.insert(pvd), + None => return Ok(None), + }; + } + + Ok(self.cached_validation_data.get(&para_id)) + } + + /// Note the given statement. + /// + /// If it was not already known and can be accepted, returns `NotedStatement::Fresh`, + /// with a handle to the statement. + /// + /// If it can be accepted, but we already know it, returns `NotedStatement::UsefulButKnown`. + /// + /// We accept up to `VC_THRESHOLD` (2 at time of writing) `Seconded` statements + /// per validator. These will be the first ones we see. The statement is assumed + /// to have been checked, including that the validator index is not out-of-bounds and + /// the signature is valid.
+ /// + /// Any other statements or those that reference a candidate we are not aware of cannot be accepted + /// and will return `NotedStatement::NotUseful`. + fn note_statement(&mut self, statement: SignedFullStatement) -> NotedStatement { + let validator_index = statement.validator_index(); + let comparator = StoredStatementComparator { + compact: statement.payload().to_compact(), + validator_index, + signature: statement.signature().clone(), + }; + + match comparator.compact { + CompactStatement::Seconded(h) => { + let seconded_so_far = self.seconded_counts.entry(validator_index).or_insert(0); + if *seconded_so_far >= VC_THRESHOLD { + gum::trace!( + target: LOG_TARGET, + ?validator_index, + ?statement, + "Extra statement is ignored" + ); + return NotedStatement::NotUseful + } + + self.candidates.insert(h); + if let Some(old) = self.statements.insert(comparator.clone(), statement) { + gum::trace!( + target: LOG_TARGET, + ?validator_index, + statement = ?old, + "Known statement" + ); + NotedStatement::UsefulButKnown + } else { + *seconded_so_far += 1; + + gum::trace!( + target: LOG_TARGET, + ?validator_index, + statement = ?self.statements.last().expect("Just inserted").1, + "Noted new statement" + ); + // This will always return `Some` because it was just inserted. 
+ let key_value = self + .statements + .get_key_value(&comparator) + .expect("Statement was just inserted; qed"); + + NotedStatement::Fresh(key_value.into()) + } + }, + CompactStatement::Valid(h) => { + if !self.candidates.contains(&h) { + gum::trace!( + target: LOG_TARGET, + ?validator_index, + ?statement, + "Statement for unknown candidate" + ); + return NotedStatement::NotUseful + } + + if let Some(old) = self.statements.insert(comparator.clone(), statement) { + gum::trace!( + target: LOG_TARGET, + ?validator_index, + statement = ?old, + "Known statement" + ); + NotedStatement::UsefulButKnown + } else { + gum::trace!( + target: LOG_TARGET, + ?validator_index, + statement = ?self.statements.last().expect("Just inserted").1, + "Noted new statement" + ); + // This will always return `Some` because it was just inserted. + NotedStatement::Fresh( + self.statements + .get_key_value(&comparator) + .expect("Statement was just inserted; qed") + .into(), + ) + } + }, + } + } + + /// Returns an error if the statement is already known or not useful + /// without modifying the internal state. 
+ fn check_useful_or_unknown( + &self, + statement: &UncheckedSignedStatement, + ) -> std::result::Result<(), DeniedStatement> { + let validator_index = statement.unchecked_validator_index(); + let compact = statement.unchecked_payload(); + let comparator = StoredStatementComparator { + compact: compact.clone(), + validator_index, + signature: statement.unchecked_signature().clone(), + }; + + match compact { + CompactStatement::Seconded(_) => { + let seconded_so_far = self.seconded_counts.get(&validator_index).unwrap_or(&0); + if *seconded_so_far >= VC_THRESHOLD { + gum::trace!( + target: LOG_TARGET, + ?validator_index, + ?statement, + "Extra statement is ignored", + ); + return Err(DeniedStatement::NotUseful) + } + + if self.statements.contains_key(&comparator) { + gum::trace!( + target: LOG_TARGET, + ?validator_index, + ?statement, + "Known statement", + ); + return Err(DeniedStatement::UsefulButKnown) + } + }, + CompactStatement::Valid(h) => { + if !self.candidates.contains(&h) { + gum::trace!( + target: LOG_TARGET, + ?validator_index, + ?statement, + "Statement for unknown candidate", + ); + return Err(DeniedStatement::NotUseful) + } + + if self.statements.contains_key(&comparator) { + gum::trace!( + target: LOG_TARGET, + ?validator_index, + ?statement, + "Known statement", + ); + return Err(DeniedStatement::UsefulButKnown) + } + }, + } + Ok(()) + } + + /// Get an iterator over all statements for the active head. Seconded statements come first. + fn statements(&self) -> impl Iterator> + '_ { + self.statements.iter().map(Into::into) + } + + /// Get an iterator over all statements for the active head that are for a particular candidate. + fn statements_about( + &self, + candidate_hash: CandidateHash, + ) -> impl Iterator> + '_ { + self.statements() + .filter(move |s| s.compact().candidate_hash() == &candidate_hash) + } +} + +/// Check a statement signature under this parent hash. 
+fn check_statement_signature( + head: &ActiveHeadData, + relay_parent: Hash, + statement: UncheckedSignedStatement, +) -> std::result::Result { + let signing_context = + SigningContext { session_index: head.session_index, parent_hash: relay_parent }; + + head.validators + .get(statement.unchecked_validator_index().0 as usize) + .ok_or_else(|| statement.clone()) + .and_then(|v| statement.try_into_checked(&signing_context, v)) +} + +/// Places the statement in storage if it is new, and then +/// circulates the statement to all peers who have not seen it yet, and +/// sends all statements dependent on that statement to peers who could previously not receive +/// them but now can. +#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] +async fn circulate_statement_and_dependents( + topology_store: &SessionBoundGridTopologyStorage, + peers: &mut HashMap, + active_heads: &mut HashMap, + ctx: &mut Context, + relay_parent: Hash, + statement: SignedFullStatement, + priority_peers: Vec, + metrics: &Metrics, + rng: &mut impl rand::Rng, +) { + let active_head = match active_heads.get_mut(&relay_parent) { + Some(res) => res, + None => return, + }; + + let _span = active_head + .span + .child("circulate-statement") + .with_candidate(statement.payload().candidate_hash()) + .with_stage(jaeger::Stage::StatementDistribution); + + let topology = topology_store.get_topology_or_fallback(active_head.session_index); + // First circulate the statement directly to all peers needing it. + // The borrow of `active_head` needs to encompass only this (Rust) statement. 
+ let outputs: Option<(CandidateHash, Vec)> = { + match active_head.note_statement(statement) { + NotedStatement::Fresh(stored) => Some(( + *stored.compact().candidate_hash(), + circulate_statement( + RequiredRouting::GridXY, + topology, + peers, + ctx, + relay_parent, + stored, + priority_peers, + metrics, + rng, + ) + .await, + )), + _ => None, + } + }; + + let _span = _span.child("send-to-peers"); + // Now send dependent statements to all peers needing them, if any. + if let Some((candidate_hash, peers_needing_dependents)) = outputs { + for peer in peers_needing_dependents { + if let Some(peer_data) = peers.get_mut(&peer) { + let _span_loop = _span.child("to-peer").with_peer_id(&peer); + // defensive: the peer data should always be some because the iterator + // of peers is derived from the set of peers. + send_statements_about( + peer, + peer_data, + ctx, + relay_parent, + candidate_hash, + &*active_head, + metrics, + ) + .await; + } + } + } +} + +/// Create a network message from a given statement. +fn v1_statement_message( + relay_parent: Hash, + statement: SignedFullStatement, + metrics: &Metrics, +) -> protocol_v1::StatementDistributionMessage { + let (is_large, size) = is_statement_large(&statement); + if let Some(size) = size { + metrics.on_created_message(size); + } + + if is_large { + protocol_v1::StatementDistributionMessage::LargeStatement(StatementMetadata { + relay_parent, + candidate_hash: statement.payload().candidate_hash(), + signed_by: statement.validator_index(), + signature: statement.signature().clone(), + }) + } else { + protocol_v1::StatementDistributionMessage::Statement(relay_parent, statement.into()) + } +} + +/// Check whether a statement should be treated as large statement. +/// +/// Also report size of statement - if it is a `Seconded` statement, otherwise `None`. 
+fn is_statement_large(statement: &SignedFullStatement) -> (bool, Option) { + match &statement.payload() { + Statement::Seconded(committed) => { + let size = statement.as_unchecked().encoded_size(); + // Runtime upgrades will always be large and even if not - no harm done. + if committed.commitments.new_validation_code.is_some() { + return (true, Some(size)) + } + + // Half max size seems to be a good threshold to start not using notifications: + let threshold = + PeerSet::Validation.get_max_notification_size(IsAuthority::Yes) as usize / 2; + + (size >= threshold, Some(size)) + }, + Statement::Valid(_) => (false, None), + } +} + +/// Circulates a statement to all peers who have not seen it yet, and returns +/// an iterator over peers who need to have dependent statements sent. +#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] +async fn circulate_statement<'a, Context>( + required_routing: RequiredRouting, + topology: &SessionGridTopology, + peers: &mut HashMap, + ctx: &mut Context, + relay_parent: Hash, + stored: StoredStatement<'a>, + mut priority_peers: Vec, + metrics: &Metrics, + rng: &mut impl rand::Rng, +) -> Vec { + let fingerprint = stored.fingerprint(); + + let mut peers_to_send: Vec = peers + .iter() + .filter_map(|(peer, data)| { + if data.can_send(&relay_parent, &fingerprint) { + Some(peer.clone()) + } else { + None + } + }) + .collect(); + + let good_peers: HashSet<&PeerId> = peers_to_send.iter().collect(); + // Only take priority peers we can send data to: + priority_peers.retain(|p| good_peers.contains(p)); + + // Avoid duplicates: + let priority_set: HashSet<&PeerId> = priority_peers.iter().collect(); + peers_to_send.retain(|p| !priority_set.contains(p)); + + util::choose_random_subset_with_rng( + |e| topology.route_to_peer(required_routing, e), + &mut peers_to_send, + rng, + MIN_GOSSIP_PEERS, + ); + // We don't want to use less peers, than we would without any priority peers: + let min_size = 
std::cmp::max(peers_to_send.len(), MIN_GOSSIP_PEERS); + // Make set full: + let needed_peers = min_size as i64 - priority_peers.len() as i64; + if needed_peers > 0 { + peers_to_send.truncate(needed_peers as usize); + // Order important here - priority peers are placed first, so will be sent first. + // This gives backers a chance to be among the first in requesting any large statement + // data. + priority_peers.append(&mut peers_to_send); + } + peers_to_send = priority_peers; + // We must not have duplicates: + debug_assert!( + peers_to_send.len() == peers_to_send.clone().into_iter().collect::>().len(), + "We filter out duplicates above. qed.", + ); + + let (v1_peers_to_send, vstaging_peers_to_send) = peers_to_send + .into_iter() + .filter_map(|peer_id| { + let peer_data = + peers.get_mut(&peer_id).expect("a subset is taken above, so it exists; qed"); + + let new = peer_data.send(&relay_parent, &fingerprint); + + Some((peer_id, new, peer_data.protocol_version)) + }) + .partition::, _>(|(_, _, version)| match version { + ValidationVersion::V1 => true, + ValidationVersion::VStaging => false, + }); // partition is handy here but not if we add more protocol versions + + let payload = v1_statement_message(relay_parent, stored.statement.clone(), metrics); + + // Send all these peers the initial statement. 
+ if !v1_peers_to_send.is_empty() { + gum::trace!( + target: LOG_TARGET, + ?v1_peers_to_send, + ?relay_parent, + statement = ?stored.statement, + "Sending statement to v1 peers", + ); + ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage( + v1_peers_to_send.iter().map(|(p, _, _)| p.clone()).collect(), + compatible_v1_message(ValidationVersion::V1, payload.clone()).into(), + )) + .await; + } + if !vstaging_peers_to_send.is_empty() { + gum::trace!( + target: LOG_TARGET, + ?vstaging_peers_to_send, + ?relay_parent, + statement = ?stored.statement, + "Sending statement to vstaging peers", + ); + ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage( + vstaging_peers_to_send.iter().map(|(p, _, _)| p.clone()).collect(), + compatible_v1_message(ValidationVersion::VStaging, payload.clone()).into(), + )) + .await; + } + + v1_peers_to_send + .into_iter() + .chain(vstaging_peers_to_send) + .filter_map(|(peer, needs_dependent, _)| if needs_dependent { Some(peer) } else { None }) + .collect() +} + +/// Send all statements about a given candidate hash to a peer. 
+#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] +async fn send_statements_about( + peer: PeerId, + peer_data: &mut PeerData, + ctx: &mut Context, + relay_parent: Hash, + candidate_hash: CandidateHash, + active_head: &ActiveHeadData, + metrics: &Metrics, +) { + for statement in active_head.statements_about(candidate_hash) { + let fingerprint = statement.fingerprint(); + if !peer_data.can_send(&relay_parent, &fingerprint) { + continue + } + peer_data.send(&relay_parent, &fingerprint); + let payload = v1_statement_message(relay_parent, statement.statement.clone(), metrics); + + gum::trace!( + target: LOG_TARGET, + ?peer, + ?relay_parent, + ?candidate_hash, + statement = ?statement.statement, + "Sending statement", + ); + ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage( + vec![peer.clone()], + compatible_v1_message(peer_data.protocol_version, payload).into(), + )) + .await; + + metrics.on_statement_distributed(); + } +} + +/// Send all statements at a given relay-parent to a peer. 
+#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] +async fn send_statements( + peer: PeerId, + peer_data: &mut PeerData, + ctx: &mut Context, + relay_parent: Hash, + active_head: &ActiveHeadData, + metrics: &Metrics, +) { + for statement in active_head.statements() { + let fingerprint = statement.fingerprint(); + if !peer_data.can_send(&relay_parent, &fingerprint) { + continue + } + peer_data.send(&relay_parent, &fingerprint); + let payload = v1_statement_message(relay_parent, statement.statement.clone(), metrics); + + gum::trace!( + target: LOG_TARGET, + ?peer, + ?relay_parent, + statement = ?statement.statement, + "Sending statement" + ); + ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage( + vec![peer.clone()], + compatible_v1_message(peer_data.protocol_version, payload).into(), + )) + .await; + + metrics.on_statement_distributed(); + } +} + +async fn report_peer( + sender: &mut impl overseer::StatementDistributionSenderTrait, + peer: PeerId, + rep: Rep, +) { + sender.send_message(NetworkBridgeTxMessage::ReportPeer(peer, rep)).await +} + +/// If message contains a statement, then retrieve it, otherwise fork task to fetch it. +/// +/// This function will also return `None` if the message did not pass some basic checks, in that +/// case no statement will be requested, on the flipside you get `ActiveHeadData` in addition to +/// your statement. +/// +/// If the message was large, but the result has been fetched already that one is returned. 
+#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] +async fn retrieve_statement_from_message<'a, Context>( + peer: PeerId, + peer_version: ValidationVersion, + message: protocol_v1::StatementDistributionMessage, + active_head: &'a mut ActiveHeadData, + ctx: &mut Context, + req_sender: &mpsc::Sender, + metrics: &Metrics, +) -> Option { + let fingerprint = message.get_fingerprint(); + let candidate_hash = *fingerprint.0.candidate_hash(); + + // Immediately return any Seconded statement: + let message = if let protocol_v1::StatementDistributionMessage::Statement(h, s) = message { + if let Statement::Seconded(_) = s.unchecked_payload() { + return Some(s) + } + protocol_v1::StatementDistributionMessage::Statement(h, s) + } else { + message + }; + + match active_head.waiting_large_statements.entry(candidate_hash) { + Entry::Occupied(mut occupied) => { + match occupied.get_mut() { + LargeStatementStatus::Fetching(info) => { + let is_large_statement = message.is_large_statement(); + + let is_new_peer = match info.available_peers.entry(peer) { + IEntry::Occupied(mut occupied) => { + occupied.get_mut().push(compatible_v1_message(peer_version, message)); + false + }, + IEntry::Vacant(vacant) => { + vacant.insert(vec![compatible_v1_message(peer_version, message)]); + true + }, + }; + + if is_new_peer & is_large_statement { + info.peers_to_try.push(peer); + // Answer any pending request for more peers: + if let Some(sender) = info.peer_sender.take() { + let to_send = std::mem::take(&mut info.peers_to_try); + if let Err(peers) = sender.send(to_send) { + // Requester no longer interested for now, might want them + // later: + info.peers_to_try = peers; + } + } + } + }, + LargeStatementStatus::FetchedOrShared(committed) => { + match message { + protocol_v1::StatementDistributionMessage::Statement(_, s) => { + // We can now immediately return any statements (should only be + // `Statement::Valid` ones, but we don't care at this point.) 
+ return Some(s) + }, + protocol_v1::StatementDistributionMessage::LargeStatement(metadata) => + return Some(UncheckedSignedFullStatement::new( + Statement::Seconded(committed.clone()), + metadata.signed_by, + metadata.signature.clone(), + )), + } + }, + } + }, + Entry::Vacant(vacant) => { + match message { + protocol_v1::StatementDistributionMessage::LargeStatement(metadata) => { + if let Some(new_status) = launch_request( + metadata, + peer, + peer_version, + req_sender.clone(), + ctx, + metrics, + ) + .await + { + vacant.insert(new_status); + } + }, + protocol_v1::StatementDistributionMessage::Statement(_, s) => { + // No fetch in progress, safe to return any statement immediately (we don't bother + // about normal network jitter which might cause `Valid` statements to arrive early + // for now.). + return Some(s) + }, + } + }, + } + None +} + +/// Launch request for a large statement and get tracking status. +/// +/// Returns `None` if spawning task failed. +#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] +async fn launch_request( + meta: StatementMetadata, + peer: PeerId, + peer_version: ValidationVersion, + req_sender: mpsc::Sender, + ctx: &mut Context, + metrics: &Metrics, +) -> Option { + let (task, handle) = + fetch(meta.relay_parent, meta.candidate_hash, vec![peer], req_sender, metrics.clone()) + .remote_handle(); + + let result = ctx.spawn("large-statement-fetcher", task.boxed()); + if let Err(err) = result { + gum::error!(target: LOG_TARGET, ?err, "Spawning task failed."); + return None + } + let available_peers = { + let mut m = IndexMap::new(); + m.insert( + peer, + vec![compatible_v1_message( + peer_version, + protocol_v1::StatementDistributionMessage::LargeStatement(meta), + )], + ); + m + }; + Some(LargeStatementStatus::Fetching(FetchingInfo { + available_peers, + peers_to_try: Vec::new(), + peer_sender: None, + fetching_task: handle, + })) +} + +/// Handle incoming message and circulate it to peers, if we did not know it 
already. +#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] +async fn handle_incoming_message_and_circulate<'a, Context, R>( + peer: PeerId, + topology_storage: &SessionBoundGridTopologyStorage, + peers: &mut HashMap, + active_heads: &'a mut HashMap, + recent_outdated_heads: &RecentOutdatedHeads, + ctx: &mut Context, + message: net_protocol::StatementDistributionMessage, + req_sender: &mpsc::Sender, + metrics: &Metrics, + runtime: &mut RuntimeInfo, + rng: &mut R, +) where + R: rand::Rng, +{ + let handled_incoming = match peers.get_mut(&peer) { + Some(data) => + handle_incoming_message( + peer, + data, + active_heads, + recent_outdated_heads, + ctx, + message, + req_sender, + metrics, + ) + .await, + None => None, + }; + + // if we got a fresh message, we need to circulate it to all peers. + if let Some((relay_parent, statement)) = handled_incoming { + // we can ignore the set of peers who this function returns as now expecting + // dependent statements. + // + // we have the invariant in this subsystem that we never store a `Valid` or `Invalid` + // statement before a `Seconded` statement. `Seconded` statements are the only ones + // that require dependents. Thus, if this is a `Seconded` statement for a candidate we + // were not aware of before, we cannot have any dependent statements from the candidate. 
+ let _ = metrics.time_network_bridge_update("circulate_statement"); + + let session_index = runtime.get_session_index_for_child(ctx.sender(), relay_parent).await; + let topology = match session_index { + Ok(session_index) => topology_storage.get_topology_or_fallback(session_index), + Err(e) => { + gum::debug!( + target: LOG_TARGET, + %relay_parent, + "cannot get session index for the specific relay parent: {:?}", + e + ); + + topology_storage.get_current_topology() + }, + }; + let required_routing = + topology.required_routing_by_index(statement.statement.validator_index(), false); + + let _ = circulate_statement( + required_routing, + topology, + peers, + ctx, + relay_parent, + statement, + Vec::new(), + metrics, + rng, + ) + .await; + } +} + +// Handle a statement. Returns a reference to a newly-stored statement +// if we were not already aware of it, along with the corresponding relay-parent. +// +// This function checks the signature and ensures the statement is compatible with our +// view. It also notifies candidate backing if the statement was previously unknown. 
+#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] +async fn handle_incoming_message<'a, Context>( + peer: PeerId, + peer_data: &mut PeerData, + active_heads: &'a mut HashMap, + recent_outdated_heads: &RecentOutdatedHeads, + ctx: &mut Context, + message: net_protocol::StatementDistributionMessage, + req_sender: &mpsc::Sender, + metrics: &Metrics, +) -> Option<(Hash, StoredStatement<'a>)> { + let _ = metrics.time_network_bridge_update("handle_incoming_message"); + + let message = match message { + Versioned::V1(m) => m, + Versioned::VStaging(protocol_vstaging::StatementDistributionMessage::V1Compatibility( + m, + )) => m, + }; + + let relay_parent = message.get_relay_parent(); + + let active_head = match active_heads.get_mut(&relay_parent) { + Some(h) => h, + None => { + gum::debug!( + target: LOG_TARGET, + %relay_parent, + "our view out-of-sync with active heads; head not found", + ); + + if !recent_outdated_heads.is_recent_outdated(&relay_parent) { + report_peer(ctx.sender(), peer, COST_UNEXPECTED_STATEMENT).await; + } + + return None + }, + }; + + if let protocol_v1::StatementDistributionMessage::LargeStatement(_) = message { + if let Err(rep) = peer_data.receive_large_statement(&relay_parent) { + gum::debug!(target: LOG_TARGET, ?peer, ?message, ?rep, "Unexpected large statement.",); + report_peer(ctx.sender(), peer, rep).await; + return None + } + } + + let fingerprint = message.get_fingerprint(); + let candidate_hash = fingerprint.0.candidate_hash().clone(); + let handle_incoming_span = active_head + .span + .child("handle-incoming") + .with_candidate(candidate_hash) + .with_peer_id(&peer); + + let max_message_count = active_head.validators.len() * 2; + + // perform only basic checks before verifying the signature + // as it's more computationally heavy + if let Err(rep) = peer_data.check_can_receive(&relay_parent, &fingerprint, max_message_count) { + // This situation can happen when a peer's Seconded message was lost + // but we have 
received the Valid statement. + // So we check it once and then ignore repeated violation to avoid + // reputation change flood. + let unexpected_count = peer_data.receive_unexpected(&relay_parent); + + gum::debug!( + target: LOG_TARGET, + ?relay_parent, + ?peer, + ?message, + ?rep, + ?unexpected_count, + "Error inserting received statement" + ); + + match rep { + // This happens when a Valid statement has been received but there is no corresponding Seconded + COST_UNEXPECTED_STATEMENT_UNKNOWN_CANDIDATE => { + metrics.on_unexpected_statement_valid(); + // Report peer merely if this is not a duplicate out-of-view statement that + // was caused by a missing Seconded statement from this peer + if unexpected_count == 0_usize { + report_peer(ctx.sender(), peer, rep).await; + } + }, + // This happens when we have an unexpected remote peer that announced Seconded + COST_UNEXPECTED_STATEMENT_REMOTE => { + metrics.on_unexpected_statement_seconded(); + report_peer(ctx.sender(), peer, rep).await; + }, + _ => { + report_peer(ctx.sender(), peer, rep).await; + }, + } + + return None + } + + let checked_compact = { + let (compact, validator_index) = message.get_fingerprint(); + let signature = message.get_signature(); + + let unchecked_compact = UncheckedSignedStatement::new(compact, validator_index, signature); + + match active_head.check_useful_or_unknown(&unchecked_compact) { + Ok(()) => {}, + Err(DeniedStatement::NotUseful) => return None, + Err(DeniedStatement::UsefulButKnown) => { + // Note a received statement in the peer data + peer_data + .receive(&relay_parent, &fingerprint, max_message_count) + .expect("checked in `check_can_receive` above; qed"); + report_peer(ctx.sender(), peer, BENEFIT_VALID_STATEMENT).await; + + return None + }, + } + + // check the signature on the statement. 
+ match check_statement_signature(&active_head, relay_parent, unchecked_compact) { + Err(statement) => { + gum::debug!(target: LOG_TARGET, ?peer, ?statement, "Invalid statement signature"); + report_peer(ctx.sender(), peer, COST_INVALID_SIGNATURE).await; + return None + }, + Ok(statement) => statement, + } + }; + + // Fetch from the network only after signature and usefulness checks are completed. + let is_large_statement = message.is_large_statement(); + let statement = retrieve_statement_from_message( + peer, + peer_data.protocol_version, + message, + active_head, + ctx, + req_sender, + metrics, + ) + .await?; + + let payload = statement.unchecked_into_payload(); + + // Upgrade the `Signed` wrapper from the compact payload to the full payload. + // This fails if the payload doesn't encode correctly. + let statement: SignedFullStatement = match checked_compact.convert_to_superpayload(payload) { + Err((compact, _)) => { + gum::debug!( + target: LOG_TARGET, + ?peer, + ?compact, + is_large_statement, + "Full statement had bad payload." + ); + report_peer(ctx.sender(), peer, COST_WRONG_HASH).await; + return None + }, + Ok(statement) => statement, + }; + + // Ensure the statement is stored in the peer data. + // + // Note that if the peer is sending us something that is not within their view, + // it will not be kept within their log. + match peer_data.receive(&relay_parent, &fingerprint, max_message_count) { + Err(_) => { + unreachable!("checked in `check_can_receive` above; qed"); + }, + Ok(true) => { + gum::trace!(target: LOG_TARGET, ?peer, ?statement, "Statement accepted"); + // Send the peer all statements concerning the candidate that we have, + // since it appears to have just learned about the candidate. 
+ send_statements_about( + peer.clone(), + peer_data, + ctx, + relay_parent, + candidate_hash, + &*active_head, + metrics, + ) + .await; + }, + Ok(false) => {}, + } + + // For `Seconded` statements `None` or `Err` means we couldn't fetch the PVD, which + // means the statement shouldn't be accepted. + // + // In case of `Valid` we should have it cached prior, therefore this performs + // no Runtime API calls and always returns `Ok(Some(_))`. + let pvd = if let Statement::Seconded(receipt) = statement.payload() { + let para_id = receipt.descriptor.para_id; + // Either call the Runtime API or check that validation data is cached. + let result = active_head + .fetch_persisted_validation_data(ctx.sender(), relay_parent, para_id) + .await; + + match result { + Ok(Some(pvd)) => Some(pvd.clone()), + Ok(None) | Err(_) => return None, + } + } else { + None + }; + + // Extend the payload with persisted validation data required by the backing + // subsystem. + // + // Do it in advance before noting the statement because we don't want to borrow active + // head mutable and use the cache. + let statement_with_pvd = statement + .clone() + .convert_to_superpayload_with(move |statement| match statement { + Statement::Seconded(receipt) => { + let persisted_validation_data = pvd + .expect("PVD is ensured to be `Some` for all `Seconded` messages above; qed"); + StatementWithPVD::Seconded(receipt, persisted_validation_data) + }, + Statement::Valid(candidate_hash) => StatementWithPVD::Valid(candidate_hash), + }) + .expect("payload was checked with conversion from compact; qed"); + + // Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation + // or unpinned to a seconded candidate. So it is safe to place it into the storage. 
+ match active_head.note_statement(statement) { + NotedStatement::NotUseful | NotedStatement::UsefulButKnown => { + unreachable!("checked in `is_useful_or_unknown` above; qed"); + }, + NotedStatement::Fresh(statement) => { + report_peer(ctx.sender(), peer, BENEFIT_VALID_STATEMENT_FIRST).await; + + let mut _span = handle_incoming_span.child("notify-backing"); + + // When we receive a new message from a peer, we forward it to the + // candidate backing subsystem. + ctx.send_message(CandidateBackingMessage::Statement(relay_parent, statement_with_pvd)) + .await; + + Some((relay_parent, statement)) + }, + } +} + +/// Update a peer's view. Sends all newly unlocked statements based on the previous +#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] +async fn update_peer_view_and_maybe_send_unlocked( + peer: PeerId, + topology: &SessionGridTopology, + peer_data: &mut PeerData, + ctx: &mut Context, + active_heads: &HashMap, + new_view: View, + metrics: &Metrics, + rng: &mut R, +) where + R: rand::Rng, +{ + let old_view = std::mem::replace(&mut peer_data.view, new_view); + + // Remove entries for all relay-parents in the old view but not the new. + for removed in old_view.difference(&peer_data.view) { + let _ = peer_data.view_knowledge.remove(removed); + } + + // Use both grid directions + let is_gossip_peer = topology.route_to_peer(RequiredRouting::GridXY, &peer); + let lucky = is_gossip_peer || + util::gen_ratio_rng( + util::MIN_GOSSIP_PEERS.saturating_sub(topology.len()), + util::MIN_GOSSIP_PEERS, + rng, + ); + + // Add entries for all relay-parents in the new view but not the old. + // Furthermore, send all statements we have for those relay parents. 
+ let new_view = peer_data.view.difference(&old_view).copied().collect::>(); + for new in new_view.iter().copied() { + peer_data.view_knowledge.insert(new, Default::default()); + if !lucky { + continue + } + if let Some(active_head) = active_heads.get(&new) { + send_statements(peer.clone(), peer_data, ctx, new, active_head, metrics).await; + } + } +} + +/// Handle a local network update. +#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] +pub(crate) async fn handle_network_update( + ctx: &mut Context, + state: &mut State, + req_sender: &mpsc::Sender, + update: NetworkBridgeEvent, + rng: &mut R, + metrics: &Metrics, +) where + R: rand::Rng, +{ + let peers = &mut state.peers; + let topology_storage = &mut state.topology_storage; + let authorities = &mut state.authorities; + let active_heads = &mut state.active_heads; + let recent_outdated_heads = &state.recent_outdated_heads; + let runtime = &mut state.runtime; + + match update { + NetworkBridgeEvent::PeerConnected(peer, role, protocol_version, maybe_authority) => { + gum::trace!(target: LOG_TARGET, ?peer, ?role, ?protocol_version, "Peer connected"); + + let protocol_version = match ValidationVersion::try_from(protocol_version).ok() { + Some(v) => v, + None => { + gum::trace!( + target: LOG_TARGET, + ?peer, + ?protocol_version, + "unknown protocol version, ignoring" + ); + return + }, + }; + + peers.insert( + peer, + PeerData { + view: Default::default(), + protocol_version, + view_knowledge: Default::default(), + maybe_authority: maybe_authority.clone(), + }, + ); + if let Some(authority_ids) = maybe_authority { + authority_ids.into_iter().for_each(|a| { + authorities.insert(a, peer); + }); + } + }, + NetworkBridgeEvent::PeerDisconnected(peer) => { + gum::trace!(target: LOG_TARGET, ?peer, "Peer disconnected"); + if let Some(auth_ids) = peers.remove(&peer).and_then(|p| p.maybe_authority) { + auth_ids.into_iter().for_each(|a| { + authorities.remove(&a); + }); + } + }, + 
NetworkBridgeEvent::NewGossipTopology(topology) => { + let _ = metrics.time_network_bridge_update("new_gossip_topology"); + + let new_session_index = topology.session; + let new_topology: SessionGridTopology = topology.into(); + let old_topology = topology_storage.get_current_topology(); + let newly_added = new_topology.peers_diff(old_topology); + topology_storage.update_topology(new_session_index, new_topology); + for peer in newly_added { + if let Some(data) = peers.get_mut(&peer) { + let view = std::mem::take(&mut data.view); + update_peer_view_and_maybe_send_unlocked( + peer, + topology_storage.get_current_topology(), + data, + ctx, + &*active_heads, + view, + metrics, + rng, + ) + .await + } + } + }, + NetworkBridgeEvent::PeerMessage(peer, message) => { + handle_incoming_message_and_circulate( + peer, + topology_storage, + peers, + active_heads, + &*recent_outdated_heads, + ctx, + message, + req_sender, + metrics, + runtime, + rng, + ) + .await; + }, + NetworkBridgeEvent::PeerViewChange(peer, view) => { + let _ = metrics.time_network_bridge_update("peer_view_change"); + gum::trace!(target: LOG_TARGET, ?peer, ?view, "Peer view change"); + match peers.get_mut(&peer) { + Some(data) => + update_peer_view_and_maybe_send_unlocked( + peer, + topology_storage.get_current_topology(), + data, + ctx, + &*active_heads, + view, + metrics, + rng, + ) + .await, + None => (), + } + }, + NetworkBridgeEvent::OurViewChange(_view) => { + // handled by `ActiveLeavesUpdate` + }, + } +} + +/// Handle messages from responder background task. +pub(crate) async fn handle_responder_message( + state: &mut State, + message: ResponderMessage, +) -> JfyiErrorResult<()> { + let peers = &state.peers; + let active_heads = &mut state.active_heads; + + match message { + ResponderMessage::GetData { requesting_peer, relay_parent, candidate_hash, tx } => { + if !requesting_peer_knows_about_candidate( + peers, + &requesting_peer, + &relay_parent, + &candidate_hash, + )? 
{ + return Err(JfyiError::RequestedUnannouncedCandidate( + requesting_peer, + candidate_hash, + )) + } + + let active_head = + active_heads.get(&relay_parent).ok_or(JfyiError::NoSuchHead(relay_parent))?; + + let committed = match active_head.waiting_large_statements.get(&candidate_hash) { + Some(LargeStatementStatus::FetchedOrShared(committed)) => committed.clone(), + _ => + return Err(JfyiError::NoSuchFetchedLargeStatement(relay_parent, candidate_hash)), + }; + + tx.send(committed).map_err(|_| JfyiError::ResponderGetDataCanceled)?; + }, + } + Ok(()) +} + +#[overseer::contextbounds(StatementDistribution, prefix = self::overseer)] +pub(crate) async fn handle_requester_message( + ctx: &mut Context, + state: &mut State, + req_sender: &mpsc::Sender, + rng: &mut R, + message: RequesterMessage, + metrics: &Metrics, +) -> JfyiErrorResult<()> { + let topology_storage = &state.topology_storage; + let peers = &mut state.peers; + let active_heads = &mut state.active_heads; + let recent_outdated_heads = &state.recent_outdated_heads; + let runtime = &mut state.runtime; + + match message { + RequesterMessage::Finished { + relay_parent, + candidate_hash, + from_peer, + response, + bad_peers, + } => { + for bad in bad_peers { + report_peer(ctx.sender(), bad, COST_FETCH_FAIL).await; + } + report_peer(ctx.sender(), from_peer, BENEFIT_VALID_RESPONSE).await; + + let active_head = + active_heads.get_mut(&relay_parent).ok_or(JfyiError::NoSuchHead(relay_parent))?; + + let status = active_head.waiting_large_statements.remove(&candidate_hash); + + let info = match status { + Some(LargeStatementStatus::Fetching(info)) => info, + Some(LargeStatementStatus::FetchedOrShared(_)) => { + // We are no longer interested in the data. 
+ return Ok(()) + }, + None => + return Err(JfyiError::NoSuchLargeStatementStatus(relay_parent, candidate_hash)), + }; + + active_head + .waiting_large_statements + .insert(candidate_hash, LargeStatementStatus::FetchedOrShared(response)); + + // Cache is now populated, send all messages: + for (peer, messages) in info.available_peers { + for message in messages { + handle_incoming_message_and_circulate( + peer, + topology_storage, + peers, + active_heads, + recent_outdated_heads, + ctx, + message, + req_sender, + &metrics, + runtime, + rng, + ) + .await; + } + } + }, + RequesterMessage::SendRequest(req) => { + ctx.send_message(NetworkBridgeTxMessage::SendRequests( + vec![req], + IfDisconnected::ImmediateError, + )) + .await; + }, + RequesterMessage::GetMorePeers { relay_parent, candidate_hash, tx } => { + let active_head = + active_heads.get_mut(&relay_parent).ok_or(JfyiError::NoSuchHead(relay_parent))?; + + let status = active_head.waiting_large_statements.get_mut(&candidate_hash); + + let info = match status { + Some(LargeStatementStatus::Fetching(info)) => info, + Some(LargeStatementStatus::FetchedOrShared(_)) => { + // This task is going to die soon - no need to send it anything. + gum::debug!(target: LOG_TARGET, "Zombie task wanted more peers."); + return Ok(()) + }, + None => + return Err(JfyiError::NoSuchLargeStatementStatus(relay_parent, candidate_hash)), + }; + + if info.peers_to_try.is_empty() { + info.peer_sender = Some(tx); + } else { + let peers_to_try = std::mem::take(&mut info.peers_to_try); + if let Err(peers) = tx.send(peers_to_try) { + // No longer interested for now - might want them later: + info.peers_to_try = peers; + } + } + }, + RequesterMessage::ReportPeer(peer, rep) => report_peer(ctx.sender(), peer, rep).await, + } + Ok(()) +} + +/// Handle a deactivated leaf. 
+pub(crate) fn handle_deactivate_leaf(state: &mut State, deactivated: Hash) { + if state.active_heads.remove(&deactivated).is_some() { + gum::trace!( + target: LOG_TARGET, + hash = ?deactivated, + "Deactivating leaf", + ); + + state.recent_outdated_heads.note_outdated(deactivated); + } +} + +/// Handle a new activated leaf. This assumes that the leaf does not +/// support prospective parachains. +#[overseer::contextbounds(StatementDistribution, prefix = self::overseer)] +pub(crate) async fn handle_activated_leaf( + ctx: &mut Context, + state: &mut State, + activated: ActivatedLeaf, +) -> Result<()> { + let relay_parent = activated.hash; + let span = PerLeafSpan::new(activated.span, "statement-distribution-legacy"); + gum::trace!( + target: LOG_TARGET, + hash = ?relay_parent, + "New active leaf", + ); + + // Retrieve the parachain validators at the child of the head we track. + let session_index = + state.runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?; + let info = state + .runtime + .get_session_info_by_index(ctx.sender(), relay_parent, session_index) + .await?; + let session_info = &info.session_info; + + state.active_heads.entry(relay_parent).or_insert(ActiveHeadData::new( + session_info.validators.clone(), + session_index, + span, + )); + + Ok(()) +} + +/// Share a local statement with the rest of the network. +#[overseer::contextbounds(StatementDistribution, prefix = self::overseer)] +pub(crate) async fn share_local_statement( + ctx: &mut Context, + state: &mut State, + relay_parent: Hash, + statement: SignedFullStatement, + rng: &mut R, + metrics: &Metrics, +) -> Result<()> { + // Make sure we have data in cache: + if is_statement_large(&statement).0 { + if let Statement::Seconded(committed) = &statement.payload() { + let active_head = state + .active_heads + .get_mut(&relay_parent) + // This should never be out-of-sync with our view if the view + // updates correspond to actual `StartWork` messages. 
+ .ok_or(JfyiError::NoSuchHead(relay_parent))?; + active_head.waiting_large_statements.insert( + statement.payload().candidate_hash(), + LargeStatementStatus::FetchedOrShared(committed.clone()), + ); + } + } + + let info = state.runtime.get_session_info(ctx.sender(), relay_parent).await?; + let session_info = &info.session_info; + let validator_info = &info.validator_info; + + // Get peers in our group, so we can make sure they get our statement + // directly: + let group_peers = { + if let Some(our_group) = validator_info.our_group { + let our_group = &session_info.validator_groups[our_group.0 as usize]; + + our_group + .into_iter() + .filter_map(|i| { + if Some(*i) == validator_info.our_index { + return None + } + let authority_id = &session_info.discovery_keys[i.0 as usize]; + state.authorities.get(authority_id).map(|p| *p) + }) + .collect() + } else { + Vec::new() + } + }; + circulate_statement_and_dependents( + &mut state.topology_storage, + &mut state.peers, + &mut state.active_heads, + ctx, + relay_parent, + statement, + group_peers, + metrics, + rng, + ) + .await; + + Ok(()) +} + +/// Check whether a peer knows about a candidate from us. +/// +/// If not, it is deemed illegal for it to request corresponding data from us. 
+fn requesting_peer_knows_about_candidate( + peers: &HashMap, + requesting_peer: &PeerId, + relay_parent: &Hash, + candidate_hash: &CandidateHash, +) -> JfyiErrorResult { + let peer_data = peers + .get(requesting_peer) + .ok_or_else(|| JfyiError::NoSuchPeer(*requesting_peer))?; + let knowledge = peer_data + .view_knowledge + .get(relay_parent) + .ok_or_else(|| JfyiError::NoSuchHead(*relay_parent))?; + Ok(knowledge.sent_candidates.get(&candidate_hash).is_some()) +} + +fn compatible_v1_message( + version: ValidationVersion, + message: protocol_v1::StatementDistributionMessage, +) -> net_protocol::StatementDistributionMessage { + match version { + ValidationVersion::V1 => Versioned::V1(message), + ValidationVersion::VStaging => Versioned::VStaging( + protocol_vstaging::StatementDistributionMessage::V1Compatibility(message), + ), + } +} diff --git a/node/network/statement-distribution/src/requester.rs b/node/network/statement-distribution/src/legacy_v1/requester.rs similarity index 98% rename from node/network/statement-distribution/src/requester.rs rename to node/network/statement-distribution/src/legacy_v1/requester.rs index 24ffa5e41742..f711b09d413a 100644 --- a/node/network/statement-distribution/src/requester.rs +++ b/node/network/statement-distribution/src/legacy_v1/requester.rs @@ -32,7 +32,10 @@ use polkadot_node_subsystem::{Span, Stage}; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_primitives::v2::{CandidateHash, CommittedCandidateReceipt, Hash}; -use crate::{metrics::Metrics, COST_WRONG_HASH, LOG_TARGET}; +use crate::{ + legacy_v1::{COST_WRONG_HASH, LOG_TARGET}, + metrics::Metrics, +}; // In case we failed fetching from our known peers, how long we should wait before attempting a // retry, even though we have not yet discovered any new peers. 
Or in other words how long to diff --git a/node/network/statement-distribution/src/responder.rs b/node/network/statement-distribution/src/legacy_v1/responder.rs similarity index 100% rename from node/network/statement-distribution/src/responder.rs rename to node/network/statement-distribution/src/legacy_v1/responder.rs diff --git a/node/network/statement-distribution/src/tests.rs b/node/network/statement-distribution/src/legacy_v1/tests.rs similarity index 98% rename from node/network/statement-distribution/src/tests.rs rename to node/network/statement-distribution/src/legacy_v1/tests.rs index f1a4c0562c94..f3db07757e06 100644 --- a/node/network/statement-distribution/src/tests.rs +++ b/node/network/statement-distribution/src/legacy_v1/tests.rs @@ -14,7 +14,9 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use super::{metrics::Metrics, *}; +use super::*; +use crate::{metrics::Metrics, *}; + use assert_matches::assert_matches; use futures::executor::{self, block_on}; use futures_timer::Delay; @@ -25,7 +27,7 @@ use polkadot_node_network_protocol::{ v1::{StatementFetchingRequest, StatementFetchingResponse}, IncomingRequest, Recipient, ReqProtocolNames, Requests, }, - view, ObservedRole, + view, ObservedRole, VersionedValidationProtocol, }; use polkadot_node_primitives::{ SignedFullStatementWithPVD, Statement, UncheckedSignedFullStatement, @@ -512,6 +514,7 @@ fn peer_view_update_sends_messages() { let mut peer_data = PeerData { view: old_view, + protocol_version: ValidationVersion::V1, view_knowledge: { let mut k = HashMap::new(); @@ -570,8 +573,9 @@ fn peer_view_update_sends_messages() { for statement in active_head.statements_about(candidate_hash) { let message = handle.recv().await; let expected_to = vec![peer.clone()]; - let expected_payload = - statement_message(hash_c, statement.statement.clone(), &Metrics::default()); + let expected_payload = VersionedValidationProtocol::from(Versioned::V1( + 
v1_statement_message(hash_c, statement.statement.clone(), &Metrics::default()), + )); assert_matches!( message, @@ -612,6 +616,7 @@ fn circulated_statement_goes_to_all_peers_with_view() { let peer_data_from_view = |view: View| PeerData { view: view.clone(), + protocol_version: ValidationVersion::V1, view_knowledge: view.iter().map(|v| (v.clone(), Default::default())).collect(), maybe_authority: None, }; @@ -716,7 +721,7 @@ fn circulated_statement_goes_to_all_peers_with_view() { assert_eq!( payload, - statement_message(hash_b, statement.statement.clone(), &Metrics::default()), + VersionedValidationProtocol::from(Versioned::V1(v1_statement_message(hash_b, statement.statement.clone(), &Metrics::default()))), ); } ) @@ -1688,9 +1693,17 @@ fn share_prioritizes_backing_group() { .await .unwrap(); - SignedFullStatement::sign( + // note: this is ignored by legacy-v1 code. + let pvd = PersistedValidationData { + parent_head: HeadData::from(vec![1, 2, 3]), + relay_parent_number: 0, + relay_parent_storage_root: Hash::repeat_byte(42), + max_pov_size: 100, + }; + + SignedFullStatementWithPVD::sign( &keystore, - Statement::Seconded(candidate.clone()), + Statement::Seconded(candidate.clone()).supply_pvd(pvd), &signing_context, ValidatorIndex(4), &ferdie_public.into(), @@ -1701,14 +1714,15 @@ fn share_prioritizes_backing_group() { .expect("should be signed") }; - let metadata = derive_metadata_assuming_seconded(hash_a, statement.clone().into()); - handle .send(FromOrchestra::Communication { msg: StatementDistributionMessage::Share(hash_a, statement.clone()), }) .await; + let statement = StatementWithPVD::drop_pvd_from_signed(statement); + let metadata = derive_metadata_assuming_seconded(hash_a, statement.clone().into()); + // Messages should go out: assert_matches!( handle.recv().await, @@ -2390,3 +2404,8 @@ fn derive_metadata_assuming_seconded( signature: statement.unchecked_signature().clone(), } } + +// TODO [now]: adapt most tests to v2 messages. 
+// TODO [now]: test that v2 peers send v1 messages to v1 peers +// TODO [now]: test that v2 peers handle v1 messages from v1 peers. +// TODO [now]: test that v2 peers send v2 messages to v2 peers. diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index 40497f77557f..9731818a459a 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -22,109 +22,45 @@ #![deny(unused_crate_dependencies)] #![warn(missing_docs)] -use error::{log_error, FatalResult, JfyiErrorResult}; -use parity_scale_codec::Encode; +use error::{log_error, FatalResult}; use polkadot_node_network_protocol::{ - self as net_protocol, - grid_topology::{RequiredRouting, SessionBoundGridTopologyStorage, SessionGridTopology}, - peer_set::{IsAuthority, PeerSet}, request_response::{v1 as request_v1, IncomingRequestReceiver}, - v1::{self as protocol_v1, StatementMetadata}, - vstaging as protocol_vstaging, IfDisconnected, PeerId, UnifiedReputationChange as Rep, - Versioned, View, + vstaging as protocol_vstaging, Versioned, }; -use polkadot_node_primitives::{ - SignedFullStatement, Statement, StatementWithPVD, UncheckedSignedFullStatement, -}; -use polkadot_node_subsystem_util::{self as util, rand, MIN_GOSSIP_PEERS}; - +use polkadot_node_primitives::StatementWithPVD; use polkadot_node_subsystem::{ - jaeger, - messages::{ - CandidateBackingMessage, NetworkBridgeEvent, NetworkBridgeTxMessage, - StatementDistributionMessage, - }, - overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, PerLeafSpan, SpawnedSubsystem, - StatementDistributionSenderTrait, SubsystemError, -}; -use polkadot_primitives::v2::{ - AuthorityDiscoveryId, CandidateHash, CommittedCandidateReceipt, CompactStatement, Hash, - Id as ParaId, OccupiedCoreAssumption, PersistedValidationData, SignedStatement, SigningContext, - UncheckedSignedStatement, ValidatorId, ValidatorIndex, ValidatorSignature, + 
messages::{NetworkBridgeEvent, StatementDistributionMessage}, + overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, }; +use polkadot_node_subsystem_util::rand; -use futures::{ - channel::{mpsc, oneshot}, - future::RemoteHandle, - prelude::*, -}; -use indexmap::{map::Entry as IEntry, IndexMap}; +use futures::{channel::mpsc, prelude::*}; use sp_keystore::SyncCryptoStorePtr; -use util::runtime::RuntimeInfo; - -use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque}; use fatality::Nested; mod error; pub use error::{Error, FatalError, JfyiError, Result}; -/// Background task logic for requesting of large statements. -mod requester; -use requester::{fetch, RequesterMessage}; - -/// Background task logic for responding for large statements. -mod responder; -use responder::{respond, ResponderMessage}; - /// Metrics for the statement distribution pub(crate) mod metrics; use metrics::Metrics; -#[cfg(test)] -mod tests; - -const COST_UNEXPECTED_STATEMENT: Rep = Rep::CostMinor("Unexpected Statement"); -const COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE: Rep = - Rep::CostMinor("Unexpected Statement, missing knowlege for relay parent"); -const COST_UNEXPECTED_STATEMENT_UNKNOWN_CANDIDATE: Rep = - Rep::CostMinor("Unexpected Statement, unknown candidate"); -const COST_UNEXPECTED_STATEMENT_REMOTE: Rep = - Rep::CostMinor("Unexpected Statement, remote not allowed"); - -const COST_FETCH_FAIL: Rep = - Rep::CostMinor("Requesting `CommittedCandidateReceipt` from peer failed"); -const COST_INVALID_SIGNATURE: Rep = Rep::CostMajor("Invalid Statement Signature"); -const COST_WRONG_HASH: Rep = Rep::CostMajor("Received candidate had wrong hash"); -const COST_DUPLICATE_STATEMENT: Rep = - Rep::CostMajorRepeated("Statement sent more than once by peer"); -const COST_APPARENT_FLOOD: Rep = Rep::Malicious("Peer appears to be flooding us with statements"); - -const BENEFIT_VALID_STATEMENT: Rep = Rep::BenefitMajor("Peer provided a valid statement"); 
-const BENEFIT_VALID_STATEMENT_FIRST: Rep = - Rep::BenefitMajorFirst("Peer was the first to provide a valid statement"); -const BENEFIT_VALID_RESPONSE: Rep = - Rep::BenefitMajor("Peer provided a valid large statement response"); - -/// The maximum amount of candidates each validator is allowed to second at any relay-parent. -/// Short for "Validator Candidate Threshold". -/// -/// This is the amount of candidates we keep per validator at any relay-parent. -/// Typically we will only keep 1, but when a validator equivocates we will need to track 2. -const VC_THRESHOLD: usize = 2; +mod legacy_v1; +use legacy_v1::{ + respond as v1_respond_task, RequesterMessage as V1RequesterMessage, + ResponderMessage as V1ResponderMessage, +}; const LOG_TARGET: &str = "parachain::statement-distribution"; -/// Large statements should be rare. -const MAX_LARGE_STATEMENTS_PER_SENDER: usize = 20; - /// The statement distribution subsystem. pub struct StatementDistributionSubsystem { /// Pointer to a keystore, which is required for determining this node's validator index. keystore: SyncCryptoStorePtr, /// Receiver for incoming large statement requests. - req_receiver: Option>, + v1_req_receiver: Option>, /// Prometheus metrics metrics: Metrics, /// Pseudo-random generator for peers selection logic @@ -146,1679 +82,35 @@ impl StatementDistributionSubsyst } } -#[derive(Default)] -struct RecentOutdatedHeads { - buf: VecDeque, -} - -impl RecentOutdatedHeads { - fn note_outdated(&mut self, hash: Hash) { - const MAX_BUF_LEN: usize = 10; - - self.buf.push_back(hash); - - while self.buf.len() > MAX_BUF_LEN { - let _ = self.buf.pop_front(); - } - } - - fn is_recent_outdated(&self, hash: &Hash) -> bool { - self.buf.contains(hash) - } -} - -/// Tracks our impression of a single peer's view of the candidates a validator has seconded -/// for a given relay-parent. -/// -/// It is expected to receive at most `VC_THRESHOLD` from us and be aware of at most `VC_THRESHOLD` -/// via other means. 
-#[derive(Default)] -struct VcPerPeerTracker { - local_observed: arrayvec::ArrayVec<[CandidateHash; VC_THRESHOLD]>, - remote_observed: arrayvec::ArrayVec<[CandidateHash; VC_THRESHOLD]>, -} - -impl VcPerPeerTracker { - /// Note that the remote should now be aware that a validator has seconded a given candidate (by hash) - /// based on a message that we have sent it from our local pool. - fn note_local(&mut self, h: CandidateHash) { - if !note_hash(&mut self.local_observed, h) { - gum::warn!( - target: LOG_TARGET, - "Statement distribution is erroneously attempting to distribute more \ - than {} candidate(s) per validator index. Ignoring", - VC_THRESHOLD, - ); - } - } - - /// Note that the remote should now be aware that a validator has seconded a given candidate (by hash) - /// based on a message that it has sent us. - /// - /// Returns `true` if the peer was allowed to send us such a message, `false` otherwise. - fn note_remote(&mut self, h: CandidateHash) -> bool { - note_hash(&mut self.remote_observed, h) - } - - /// Returns `true` if the peer is allowed to send us such a message, `false` otherwise. - fn is_wanted_candidate(&self, h: &CandidateHash) -> bool { - !self.remote_observed.contains(h) && !self.remote_observed.is_full() - } -} - -fn note_hash( - observed: &mut arrayvec::ArrayVec<[CandidateHash; VC_THRESHOLD]>, - h: CandidateHash, -) -> bool { - if observed.contains(&h) { - return true - } - - observed.try_push(h).is_ok() -} - -/// knowledge that a peer has about goings-on in a relay parent. -#[derive(Default)] -struct PeerRelayParentKnowledge { - /// candidates that the peer is aware of because we sent statements to it. This indicates that we can - /// send other statements pertaining to that candidate. - sent_candidates: HashSet, - /// candidates that peer is aware of, because we received statements from it. - received_candidates: HashSet, - /// fingerprints of all statements a peer should be aware of: those that - /// were sent to the peer by us. 
- sent_statements: HashSet<(CompactStatement, ValidatorIndex)>, - /// fingerprints of all statements a peer should be aware of: those that - /// were sent to us by the peer. - received_statements: HashSet<(CompactStatement, ValidatorIndex)>, - /// How many candidates this peer is aware of for each given validator index. - seconded_counts: HashMap, - /// How many statements we've received for each candidate that we're aware of. - received_message_count: HashMap, - - /// How many large statements this peer already sent us. - /// - /// Flood protection for large statements is rather hard and as soon as we get - /// `https://github.com/paritytech/polkadot/issues/2979` implemented also no longer necessary. - /// Reason: We keep messages around until we fetched the payload, but if a node makes up - /// statements and never provides the data, we will keep it around for the slot duration. Not - /// even signature checking would help, as the sender, if a validator, can just sign arbitrary - /// invalid statements and will not face any consequences as long as it won't provide the - /// payload. - /// - /// Quick and temporary fix, only accept `MAX_LARGE_STATEMENTS_PER_SENDER` per connected node. - /// - /// Large statements should be rare, if they were not, we would run into problems anyways, as - /// we would not be able to distribute them in a timely manner. Therefore - /// `MAX_LARGE_STATEMENTS_PER_SENDER` can be set to a relatively small number. It is also not - /// per candidate hash, but in total as candidate hashes can be made up, as illustrated above. - /// - /// An attacker could still try to fill up our memory, by repeatedly disconnecting and - /// connecting again with new peer ids, but we assume that the resulting effective bandwidth - /// for such an attack would be too low. - large_statement_count: usize, - - /// We have seen a message that that is unexpected from this peer, so note this fact - /// and stop subsequent logging and peer reputation flood. 
- unexpected_count: usize, -} - -impl PeerRelayParentKnowledge { - /// Updates our view of the peer's knowledge with this statement's fingerprint based - /// on something that we would like to send to the peer. - /// - /// NOTE: assumes `self.can_send` returned true before this call. - /// - /// Once the knowledge has incorporated a statement, it cannot be incorporated again. - /// - /// This returns `true` if this is the first time the peer has become aware of a - /// candidate with the given hash. - fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> bool { - debug_assert!( - self.can_send(fingerprint), - "send is only called after `can_send` returns true; qed", - ); - - let new_known = match fingerprint.0 { - CompactStatement::Seconded(ref h) => { - self.seconded_counts.entry(fingerprint.1).or_default().note_local(h.clone()); - - let was_known = self.is_known_candidate(h); - self.sent_candidates.insert(h.clone()); - !was_known - }, - CompactStatement::Valid(_) => false, - }; - - self.sent_statements.insert(fingerprint.clone()); - - new_known - } - - /// This returns `true` if the peer cannot accept this statement, without altering internal - /// state, `false` otherwise. - fn can_send(&self, fingerprint: &(CompactStatement, ValidatorIndex)) -> bool { - let already_known = self.sent_statements.contains(fingerprint) || - self.received_statements.contains(fingerprint); - - if already_known { - return false - } - - match fingerprint.0 { - CompactStatement::Valid(ref h) => { - // The peer can only accept Valid statements for which it is aware - // of the corresponding candidate. - self.is_known_candidate(h) - }, - CompactStatement::Seconded(_) => true, - } - } - - /// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on - /// a message we are receiving from the peer. - /// - /// Provide the maximum message count that we can receive per candidate. 
In practice we should - /// not receive more statements for any one candidate than there are members in the group assigned - /// to that para, but this maximum needs to be lenient to account for equivocations that may be - /// cross-group. As such, a maximum of 2 * `n_validators` is recommended. - /// - /// This returns an error if the peer should not have sent us this message according to protocol - /// rules for flood protection. - /// - /// If this returns `Ok`, the internal state has been altered. After `receive`ing a new - /// candidate, we are then cleared to send the peer further statements about that candidate. - /// - /// This returns `Ok(true)` if this is the first time the peer has become aware of a - /// candidate with given hash. - fn receive( - &mut self, - fingerprint: &(CompactStatement, ValidatorIndex), - max_message_count: usize, - ) -> std::result::Result { - // We don't check `sent_statements` because a statement could be in-flight from both - // sides at the same time. 
- if self.received_statements.contains(fingerprint) { - return Err(COST_DUPLICATE_STATEMENT) - } - - let (candidate_hash, fresh) = match fingerprint.0 { - CompactStatement::Seconded(ref h) => { - let allowed_remote = self - .seconded_counts - .entry(fingerprint.1) - .or_insert_with(Default::default) - .note_remote(h.clone()); - - if !allowed_remote { - return Err(COST_UNEXPECTED_STATEMENT_REMOTE) - } - - (h, !self.is_known_candidate(h)) - }, - CompactStatement::Valid(ref h) => { - if !self.is_known_candidate(h) { - return Err(COST_UNEXPECTED_STATEMENT_UNKNOWN_CANDIDATE) - } - - (h, false) - }, - }; - - { - let received_per_candidate = - self.received_message_count.entry(*candidate_hash).or_insert(0); - - if *received_per_candidate >= max_message_count { - return Err(COST_APPARENT_FLOOD) - } - - *received_per_candidate += 1; - } - - self.received_statements.insert(fingerprint.clone()); - self.received_candidates.insert(candidate_hash.clone()); - Ok(fresh) - } - - /// Note a received large statement metadata. - fn receive_large_statement(&mut self) -> std::result::Result<(), Rep> { - if self.large_statement_count >= MAX_LARGE_STATEMENTS_PER_SENDER { - return Err(COST_APPARENT_FLOOD) - } - self.large_statement_count += 1; - Ok(()) - } - - /// This method does the same checks as `receive` without modifying the internal state. - /// Returns an error if the peer should not have sent us this message according to protocol - /// rules for flood protection. - fn check_can_receive( - &self, - fingerprint: &(CompactStatement, ValidatorIndex), - max_message_count: usize, - ) -> std::result::Result<(), Rep> { - // We don't check `sent_statements` because a statement could be in-flight from both - // sides at the same time. 
- if self.received_statements.contains(fingerprint) { - return Err(COST_DUPLICATE_STATEMENT) - } - - let candidate_hash = match fingerprint.0 { - CompactStatement::Seconded(ref h) => { - let allowed_remote = self - .seconded_counts - .get(&fingerprint.1) - .map_or(true, |r| r.is_wanted_candidate(h)); - - if !allowed_remote { - return Err(COST_UNEXPECTED_STATEMENT_REMOTE) - } - - h - }, - CompactStatement::Valid(ref h) => { - if !self.is_known_candidate(&h) { - return Err(COST_UNEXPECTED_STATEMENT_UNKNOWN_CANDIDATE) - } - - h - }, - }; - - let received_per_candidate = self.received_message_count.get(candidate_hash).unwrap_or(&0); - - if *received_per_candidate >= max_message_count { - Err(COST_APPARENT_FLOOD) - } else { - Ok(()) - } - } - - /// Check for candidates that the peer is aware of. This indicates that we can - /// send other statements pertaining to that candidate. - fn is_known_candidate(&self, candidate: &CandidateHash) -> bool { - self.sent_candidates.contains(candidate) || self.received_candidates.contains(candidate) - } -} - -struct PeerData { - view: View, - view_knowledge: HashMap, - /// Peer might be known as authority with the given ids. - maybe_authority: Option>, -} - -impl PeerData { - /// Updates our view of the peer's knowledge with this statement's fingerprint based - /// on something that we would like to send to the peer. - /// - /// NOTE: assumes `self.can_send` returned true before this call. - /// - /// Once the knowledge has incorporated a statement, it cannot be incorporated again. - /// - /// This returns `true` if this is the first time the peer has become aware of a - /// candidate with the given hash. 
- fn send( - &mut self, - relay_parent: &Hash, - fingerprint: &(CompactStatement, ValidatorIndex), - ) -> bool { - debug_assert!( - self.can_send(relay_parent, fingerprint), - "send is only called after `can_send` returns true; qed", - ); - self.view_knowledge - .get_mut(relay_parent) - .expect("send is only called after `can_send` returns true; qed") - .send(fingerprint) - } - - /// This returns `None` if the peer cannot accept this statement, without altering internal - /// state. - fn can_send( - &self, - relay_parent: &Hash, - fingerprint: &(CompactStatement, ValidatorIndex), - ) -> bool { - self.view_knowledge.get(relay_parent).map_or(false, |k| k.can_send(fingerprint)) - } - - /// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on - /// a message we are receiving from the peer. - /// - /// Provide the maximum message count that we can receive per candidate. In practice we should - /// not receive more statements for any one candidate than there are members in the group assigned - /// to that para, but this maximum needs to be lenient to account for equivocations that may be - /// cross-group. As such, a maximum of 2 * `n_validators` is recommended. - /// - /// This returns an error if the peer should not have sent us this message according to protocol - /// rules for flood protection. - /// - /// If this returns `Ok`, the internal state has been altered. After `receive`ing a new - /// candidate, we are then cleared to send the peer further statements about that candidate. - /// - /// This returns `Ok(true)` if this is the first time the peer has become aware of a - /// candidate with given hash. - fn receive( - &mut self, - relay_parent: &Hash, - fingerprint: &(CompactStatement, ValidatorIndex), - max_message_count: usize, - ) -> std::result::Result { - self.view_knowledge - .get_mut(relay_parent) - .ok_or(COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE)? 
- .receive(fingerprint, max_message_count) - } - - /// This method does the same checks as `receive` without modifying the internal state. - /// Returns an error if the peer should not have sent us this message according to protocol - /// rules for flood protection. - fn check_can_receive( - &self, - relay_parent: &Hash, - fingerprint: &(CompactStatement, ValidatorIndex), - max_message_count: usize, - ) -> std::result::Result<(), Rep> { - self.view_knowledge - .get(relay_parent) - .ok_or(COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE)? - .check_can_receive(fingerprint, max_message_count) - } - - /// Receive a notice about out of view statement and returns the value of the old flag - fn receive_unexpected(&mut self, relay_parent: &Hash) -> usize { - self.view_knowledge - .get_mut(relay_parent) - .map_or(0_usize, |relay_parent_peer_knowledge| { - let old = relay_parent_peer_knowledge.unexpected_count; - relay_parent_peer_knowledge.unexpected_count += 1_usize; - old - }) - } - - /// Basic flood protection for large statements. - fn receive_large_statement(&mut self, relay_parent: &Hash) -> std::result::Result<(), Rep> { - self.view_knowledge - .get_mut(relay_parent) - .ok_or(COST_UNEXPECTED_STATEMENT_MISSING_KNOWLEDGE)? - .receive_large_statement() - } -} - -// A statement stored while a relay chain head is active. -#[derive(Debug, Copy, Clone)] -struct StoredStatement<'a> { - comparator: &'a StoredStatementComparator, - statement: &'a SignedFullStatement, -} - -// A value used for comparison of stored statements to each other. -// -// The compact version of the statement, the validator index, and the signature of the validator -// is enough to differentiate between all types of equivocations, as long as the signature is -// actually checked to be valid. The same statement with 2 signatures and 2 statements with -// different (or same) signatures wll all be correctly judged to be unequal with this comparator. 
-#[derive(PartialEq, Eq, Hash, Clone, Debug)] -struct StoredStatementComparator { - compact: CompactStatement, - validator_index: ValidatorIndex, - signature: ValidatorSignature, -} - -impl<'a> From<(&'a StoredStatementComparator, &'a SignedFullStatement)> for StoredStatement<'a> { - fn from( - (comparator, statement): (&'a StoredStatementComparator, &'a SignedFullStatement), - ) -> Self { - Self { comparator, statement } - } -} - -impl<'a> StoredStatement<'a> { - fn compact(&self) -> &'a CompactStatement { - &self.comparator.compact - } - - fn fingerprint(&self) -> (CompactStatement, ValidatorIndex) { - (self.comparator.compact.clone(), self.statement.validator_index()) - } -} - -#[derive(Debug)] -enum NotedStatement<'a> { - NotUseful, - Fresh(StoredStatement<'a>), - UsefulButKnown, -} - -/// Large statement fetching status. -enum LargeStatementStatus { - /// We are currently fetching the statement data from a remote peer. We keep a list of other nodes - /// claiming to have that data and will fallback on them. - Fetching(FetchingInfo), - /// Statement data is fetched or we got it locally via `StatementDistributionMessage::Share`. - FetchedOrShared(CommittedCandidateReceipt), -} - -/// Info about a fetch in progress. -struct FetchingInfo { - /// All peers that send us a `LargeStatement` or a `Valid` statement for the given - /// `CandidateHash`, together with their originally sent messages. - /// - /// We use an `IndexMap` here to preserve the ordering of peers sending us messages. This is - /// desirable because we reward first sending peers with reputation. - available_peers: IndexMap>, - /// Peers left to try in case the background task needs it. - peers_to_try: Vec, - /// Sender for sending fresh peers to the fetching task in case of failure. - peer_sender: Option>>, - /// Task taking care of the request. - /// - /// Will be killed once dropped. - #[allow(dead_code)] - fetching_task: RemoteHandle<()>, -} - /// Messages to be handled in this subsystem. 
enum MuxedMessage { /// Messages from other subsystems. Subsystem(FatalResult>), - /// Messages from spawned requester background tasks. - Requester(Option), - /// Messages from spawned responder background task. - Responder(Option), + /// Messages from spawned v1 (legacy) requester background tasks. + V1Requester(Option), + /// Messages from spawned v1 (legacy) responder background task. + V1Responder(Option), } #[overseer::contextbounds(StatementDistribution, prefix = self::overseer)] impl MuxedMessage { async fn receive( ctx: &mut Context, - from_requester: &mut mpsc::Receiver, - from_responder: &mut mpsc::Receiver, + from_v1_requester: &mut mpsc::Receiver, + from_v1_responder: &mut mpsc::Receiver, ) -> MuxedMessage { // We are only fusing here to make `select` happy, in reality we will quit if one of those // streams end: - let from_overseer = ctx.recv().fuse(); - let from_requester = from_requester.next(); - let from_responder = from_responder.next(); - futures::pin_mut!(from_overseer, from_requester, from_responder); + let from_orchestra = ctx.recv().fuse(); + let from_v1_requester = from_v1_requester.next(); + let from_v1_responder = from_v1_responder.next(); + futures::pin_mut!(from_orchestra, from_v1_requester, from_v1_responder); futures::select! { - msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)), - msg = from_requester => MuxedMessage::Requester(msg), - msg = from_responder => MuxedMessage::Responder(msg), - } - } -} - -#[derive(Debug, PartialEq, Eq)] -enum DeniedStatement { - NotUseful, - UsefulButKnown, -} - -struct ActiveHeadData { - /// All candidates we are aware of for this head, keyed by hash. - candidates: HashSet, - /// Persisted validation data cache. - cached_validation_data: HashMap, - /// Stored statements for circulation to peers. - /// - /// These are iterable in insertion order, and `Seconded` statements are always - /// accepted before dependent statements. 
- statements: IndexMap, - /// Large statements we are waiting for with associated meta data. - waiting_large_statements: HashMap, - /// The parachain validators at the head's child session index. - validators: Vec, - /// The current session index of this fork. - session_index: sp_staking::SessionIndex, - /// How many `Seconded` statements we've seen per validator. - seconded_counts: HashMap, - /// A Jaeger span for this head, so we can attach data to it. - span: PerLeafSpan, -} - -impl ActiveHeadData { - fn new( - validators: Vec, - session_index: sp_staking::SessionIndex, - span: PerLeafSpan, - ) -> Self { - ActiveHeadData { - candidates: Default::default(), - cached_validation_data: Default::default(), - statements: Default::default(), - waiting_large_statements: Default::default(), - validators, - session_index, - seconded_counts: Default::default(), - span, - } - } - - async fn fetch_persisted_validation_data( - &mut self, - sender: &mut Sender, - relay_parent: Hash, - para_id: ParaId, - ) -> Result> - where - Sender: StatementDistributionSenderTrait, - { - if let Entry::Vacant(entry) = self.cached_validation_data.entry(para_id) { - let persisted_validation_data = - polkadot_node_subsystem_util::request_persisted_validation_data( - relay_parent, - para_id, - OccupiedCoreAssumption::Free, - sender, - ) - .await - .await - .map_err(Error::RuntimeApiUnavailable)? - .map_err(|err| Error::FetchPersistedValidationData(para_id, err))?; - - match persisted_validation_data { - Some(pvd) => entry.insert(pvd), - None => return Ok(None), - }; - } - - Ok(self.cached_validation_data.get(¶_id)) - } - - /// Note the given statement. - /// - /// If it was not already known and can be accepted, returns `NotedStatement::Fresh`, - /// with a handle to the statement. - /// - /// If it can be accepted, but we already know it, returns `NotedStatement::UsefulButKnown`. - /// - /// We accept up to `VC_THRESHOLD` (2 at time of writing) `Seconded` statements - /// per validator. 
These will be the first ones we see. The statement is assumed - /// to have been checked, including that the validator index is not out-of-bounds and - /// the signature is valid. - /// - /// Any other statements or those that reference a candidate we are not aware of cannot be accepted - /// and will return `NotedStatement::NotUseful`. - fn note_statement(&mut self, statement: SignedFullStatement) -> NotedStatement { - let validator_index = statement.validator_index(); - let comparator = StoredStatementComparator { - compact: statement.payload().to_compact(), - validator_index, - signature: statement.signature().clone(), - }; - - match comparator.compact { - CompactStatement::Seconded(h) => { - let seconded_so_far = self.seconded_counts.entry(validator_index).or_insert(0); - if *seconded_so_far >= VC_THRESHOLD { - gum::trace!( - target: LOG_TARGET, - ?validator_index, - ?statement, - "Extra statement is ignored" - ); - return NotedStatement::NotUseful - } - - self.candidates.insert(h); - if let Some(old) = self.statements.insert(comparator.clone(), statement) { - gum::trace!( - target: LOG_TARGET, - ?validator_index, - statement = ?old, - "Known statement" - ); - NotedStatement::UsefulButKnown - } else { - *seconded_so_far += 1; - - gum::trace!( - target: LOG_TARGET, - ?validator_index, - statement = ?self.statements.last().expect("Just inserted").1, - "Noted new statement" - ); - // This will always return `Some` because it was just inserted. 
- let key_value = self - .statements - .get_key_value(&comparator) - .expect("Statement was just inserted; qed"); - - NotedStatement::Fresh(key_value.into()) - } - }, - CompactStatement::Valid(h) => { - if !self.candidates.contains(&h) { - gum::trace!( - target: LOG_TARGET, - ?validator_index, - ?statement, - "Statement for unknown candidate" - ); - return NotedStatement::NotUseful - } - - if let Some(old) = self.statements.insert(comparator.clone(), statement) { - gum::trace!( - target: LOG_TARGET, - ?validator_index, - statement = ?old, - "Known statement" - ); - NotedStatement::UsefulButKnown - } else { - gum::trace!( - target: LOG_TARGET, - ?validator_index, - statement = ?self.statements.last().expect("Just inserted").1, - "Noted new statement" - ); - // This will always return `Some` because it was just inserted. - NotedStatement::Fresh( - self.statements - .get_key_value(&comparator) - .expect("Statement was just inserted; qed") - .into(), - ) - } - }, - } - } - - /// Returns an error if the statement is already known or not useful - /// without modifying the internal state. 
- fn check_useful_or_unknown( - &self, - statement: &UncheckedSignedStatement, - ) -> std::result::Result<(), DeniedStatement> { - let validator_index = statement.unchecked_validator_index(); - let compact = statement.unchecked_payload(); - let comparator = StoredStatementComparator { - compact: compact.clone(), - validator_index, - signature: statement.unchecked_signature().clone(), - }; - - match compact { - CompactStatement::Seconded(_) => { - let seconded_so_far = self.seconded_counts.get(&validator_index).unwrap_or(&0); - if *seconded_so_far >= VC_THRESHOLD { - gum::trace!( - target: LOG_TARGET, - ?validator_index, - ?statement, - "Extra statement is ignored", - ); - return Err(DeniedStatement::NotUseful) - } - - if self.statements.contains_key(&comparator) { - gum::trace!( - target: LOG_TARGET, - ?validator_index, - ?statement, - "Known statement", - ); - return Err(DeniedStatement::UsefulButKnown) - } - }, - CompactStatement::Valid(h) => { - if !self.candidates.contains(&h) { - gum::trace!( - target: LOG_TARGET, - ?validator_index, - ?statement, - "Statement for unknown candidate", - ); - return Err(DeniedStatement::NotUseful) - } - - if self.statements.contains_key(&comparator) { - gum::trace!( - target: LOG_TARGET, - ?validator_index, - ?statement, - "Known statement", - ); - return Err(DeniedStatement::UsefulButKnown) - } - }, - } - Ok(()) - } - - /// Get an iterator over all statements for the active head. Seconded statements come first. - fn statements(&self) -> impl Iterator> + '_ { - self.statements.iter().map(Into::into) - } - - /// Get an iterator over all statements for the active head that are for a particular candidate. - fn statements_about( - &self, - candidate_hash: CandidateHash, - ) -> impl Iterator> + '_ { - self.statements() - .filter(move |s| s.compact().candidate_hash() == &candidate_hash) - } -} - -/// Check a statement signature under this parent hash. 
-fn check_statement_signature( - head: &ActiveHeadData, - relay_parent: Hash, - statement: UncheckedSignedStatement, -) -> std::result::Result { - let signing_context = - SigningContext { session_index: head.session_index, parent_hash: relay_parent }; - - head.validators - .get(statement.unchecked_validator_index().0 as usize) - .ok_or_else(|| statement.clone()) - .and_then(|v| statement.try_into_checked(&signing_context, v)) -} - -/// Places the statement in storage if it is new, and then -/// circulates the statement to all peers who have not seen it yet, and -/// sends all statements dependent on that statement to peers who could previously not receive -/// them but now can. -#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] -async fn circulate_statement_and_dependents( - topology_store: &SessionBoundGridTopologyStorage, - peers: &mut HashMap, - active_heads: &mut HashMap, - ctx: &mut Context, - relay_parent: Hash, - statement: SignedFullStatement, - priority_peers: Vec, - metrics: &Metrics, - rng: &mut impl rand::Rng, -) { - let active_head = match active_heads.get_mut(&relay_parent) { - Some(res) => res, - None => return, - }; - - let _span = active_head - .span - .child("circulate-statement") - .with_candidate(statement.payload().candidate_hash()) - .with_stage(jaeger::Stage::StatementDistribution); - - let topology = topology_store.get_topology_or_fallback(active_head.session_index); - // First circulate the statement directly to all peers needing it. - // The borrow of `active_head` needs to encompass only this (Rust) statement. 
- let outputs: Option<(CandidateHash, Vec)> = { - match active_head.note_statement(statement) { - NotedStatement::Fresh(stored) => Some(( - *stored.compact().candidate_hash(), - circulate_statement( - RequiredRouting::GridXY, - topology, - peers, - ctx, - relay_parent, - stored, - priority_peers, - metrics, - rng, - ) - .await, - )), - _ => None, - } - }; - - let _span = _span.child("send-to-peers"); - // Now send dependent statements to all peers needing them, if any. - if let Some((candidate_hash, peers_needing_dependents)) = outputs { - for peer in peers_needing_dependents { - if let Some(peer_data) = peers.get_mut(&peer) { - let _span_loop = _span.child("to-peer").with_peer_id(&peer); - // defensive: the peer data should always be some because the iterator - // of peers is derived from the set of peers. - send_statements_about( - peer, - peer_data, - ctx, - relay_parent, - candidate_hash, - &*active_head, - metrics, - ) - .await; - } - } - } -} - -/// Create a network message from a given statement. -fn statement_message( - relay_parent: Hash, - statement: SignedFullStatement, - metrics: &Metrics, -) -> net_protocol::VersionedValidationProtocol { - let (is_large, size) = is_statement_large(&statement); - if let Some(size) = size { - metrics.on_created_message(size); - } - - let msg = if is_large { - protocol_v1::StatementDistributionMessage::LargeStatement(StatementMetadata { - relay_parent, - candidate_hash: statement.payload().candidate_hash(), - signed_by: statement.validator_index(), - signature: statement.signature().clone(), - }) - } else { - protocol_v1::StatementDistributionMessage::Statement(relay_parent, statement.into()) - }; - - protocol_v1::ValidationProtocol::StatementDistribution(msg).into() -} - -/// Check whether a statement should be treated as large statement. -/// -/// Also report size of statement - if it is a `Seconded` statement, otherwise `None`. 
-fn is_statement_large(statement: &SignedFullStatement) -> (bool, Option) { - match &statement.payload() { - Statement::Seconded(committed) => { - let size = statement.as_unchecked().encoded_size(); - // Runtime upgrades will always be large and even if not - no harm done. - if committed.commitments.new_validation_code.is_some() { - return (true, Some(size)) - } - - // Half max size seems to be a good threshold to start not using notifications: - let threshold = - PeerSet::Validation.get_max_notification_size(IsAuthority::Yes) as usize / 2; - - (size >= threshold, Some(size)) - }, - Statement::Valid(_) => (false, None), - } -} - -/// Circulates a statement to all peers who have not seen it yet, and returns -/// an iterator over peers who need to have dependent statements sent. -#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] -async fn circulate_statement<'a, Context>( - required_routing: RequiredRouting, - topology: &SessionGridTopology, - peers: &mut HashMap, - ctx: &mut Context, - relay_parent: Hash, - stored: StoredStatement<'a>, - mut priority_peers: Vec, - metrics: &Metrics, - rng: &mut impl rand::Rng, -) -> Vec { - let fingerprint = stored.fingerprint(); - - let mut peers_to_send: Vec = peers - .iter() - .filter_map(|(peer, data)| { - if data.can_send(&relay_parent, &fingerprint) { - Some(peer.clone()) - } else { - None - } - }) - .collect(); - - let good_peers: HashSet<&PeerId> = peers_to_send.iter().collect(); - // Only take priority peers we can send data to: - priority_peers.retain(|p| good_peers.contains(p)); - - // Avoid duplicates: - let priority_set: HashSet<&PeerId> = priority_peers.iter().collect(); - peers_to_send.retain(|p| !priority_set.contains(p)); - - util::choose_random_subset_with_rng( - |e| topology.route_to_peer(required_routing, e), - &mut peers_to_send, - rng, - MIN_GOSSIP_PEERS, - ); - // We don't want to use less peers, than we would without any priority peers: - let min_size = 
std::cmp::max(peers_to_send.len(), MIN_GOSSIP_PEERS); - // Make set full: - let needed_peers = min_size as i64 - priority_peers.len() as i64; - if needed_peers > 0 { - peers_to_send.truncate(needed_peers as usize); - // Order important here - priority peers are placed first, so will be sent first. - // This gives backers a chance to be among the first in requesting any large statement - // data. - priority_peers.append(&mut peers_to_send); - } - peers_to_send = priority_peers; - // We must not have duplicates: - debug_assert!( - peers_to_send.len() == peers_to_send.clone().into_iter().collect::>().len(), - "We filter out duplicates above. qed.", - ); - let peers_to_send: Vec<(PeerId, bool)> = peers_to_send - .into_iter() - .map(|peer_id| { - let new = peers - .get_mut(&peer_id) - .expect("a subset is taken above, so it exists; qed") - .send(&relay_parent, &fingerprint); - (peer_id, new) - }) - .collect(); - - // Send all these peers the initial statement. - if !peers_to_send.is_empty() { - let payload = statement_message(relay_parent, stored.statement.clone(), metrics); - gum::trace!( - target: LOG_TARGET, - ?peers_to_send, - ?relay_parent, - statement = ?stored.statement, - "Sending statement", - ); - ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage( - peers_to_send.iter().map(|(p, _)| p.clone()).collect(), - payload, - )) - .await; - } - - peers_to_send - .into_iter() - .filter_map(|(peer, needs_dependent)| if needs_dependent { Some(peer) } else { None }) - .collect() -} - -/// Send all statements about a given candidate hash to a peer. 
-#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] -async fn send_statements_about( - peer: PeerId, - peer_data: &mut PeerData, - ctx: &mut Context, - relay_parent: Hash, - candidate_hash: CandidateHash, - active_head: &ActiveHeadData, - metrics: &Metrics, -) { - for statement in active_head.statements_about(candidate_hash) { - let fingerprint = statement.fingerprint(); - if !peer_data.can_send(&relay_parent, &fingerprint) { - continue - } - peer_data.send(&relay_parent, &fingerprint); - let payload = statement_message(relay_parent, statement.statement.clone(), metrics); - - gum::trace!( - target: LOG_TARGET, - ?peer, - ?relay_parent, - ?candidate_hash, - statement = ?statement.statement, - "Sending statement", - ); - ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage( - vec![peer.clone()], - payload, - )) - .await; - - metrics.on_statement_distributed(); - } -} - -/// Send all statements at a given relay-parent to a peer. -#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] -async fn send_statements( - peer: PeerId, - peer_data: &mut PeerData, - ctx: &mut Context, - relay_parent: Hash, - active_head: &ActiveHeadData, - metrics: &Metrics, -) { - for statement in active_head.statements() { - let fingerprint = statement.fingerprint(); - if !peer_data.can_send(&relay_parent, &fingerprint) { - continue - } - peer_data.send(&relay_parent, &fingerprint); - let payload = statement_message(relay_parent, statement.statement.clone(), metrics); - - gum::trace!( - target: LOG_TARGET, - ?peer, - ?relay_parent, - statement = ?statement.statement, - "Sending statement" - ); - ctx.send_message(NetworkBridgeTxMessage::SendValidationMessage( - vec![peer.clone()], - payload, - )) - .await; - - metrics.on_statement_distributed(); - } -} - -async fn report_peer( - sender: &mut impl overseer::StatementDistributionSenderTrait, - peer: PeerId, - rep: Rep, -) { - sender.send_message(NetworkBridgeTxMessage::ReportPeer(peer, rep)).await 
-} - -/// If message contains a statement, then retrieve it, otherwise fork task to fetch it. -/// -/// This function will also return `None` if the message did not pass some basic checks, in that -/// case no statement will be requested, on the flipside you get `ActiveHeadData` in addition to -/// your statement. -/// -/// If the message was large, but the result has been fetched already that one is returned. -#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] -async fn retrieve_statement_from_message<'a, Context>( - peer: PeerId, - message: protocol_v1::StatementDistributionMessage, - active_head: &'a mut ActiveHeadData, - ctx: &mut Context, - req_sender: &mpsc::Sender, - metrics: &Metrics, -) -> Option { - let fingerprint = message.get_fingerprint(); - let candidate_hash = *fingerprint.0.candidate_hash(); - - // Immediately return any Seconded statement: - let message = if let protocol_v1::StatementDistributionMessage::Statement(h, s) = message { - if let Statement::Seconded(_) = s.unchecked_payload() { - return Some(s) - } - protocol_v1::StatementDistributionMessage::Statement(h, s) - } else { - message - }; - - match active_head.waiting_large_statements.entry(candidate_hash) { - Entry::Occupied(mut occupied) => { - match occupied.get_mut() { - LargeStatementStatus::Fetching(info) => { - let is_large_statement = message.is_large_statement(); - - let is_new_peer = match info.available_peers.entry(peer) { - IEntry::Occupied(mut occupied) => { - occupied.get_mut().push(Versioned::V1(message)); - false - }, - IEntry::Vacant(vacant) => { - vacant.insert(vec![Versioned::V1(message)]); - true - }, - }; - - if is_new_peer & is_large_statement { - info.peers_to_try.push(peer); - // Answer any pending request for more peers: - if let Some(sender) = info.peer_sender.take() { - let to_send = std::mem::take(&mut info.peers_to_try); - if let Err(peers) = sender.send(to_send) { - // Requester no longer interested for now, might want them - // later: - 
info.peers_to_try = peers; - } - } - } - }, - LargeStatementStatus::FetchedOrShared(committed) => { - match message { - protocol_v1::StatementDistributionMessage::Statement(_, s) => { - // We can now immediately return any statements (should only be - // `Statement::Valid` ones, but we don't care at this point.) - return Some(s) - }, - protocol_v1::StatementDistributionMessage::LargeStatement(metadata) => - return Some(UncheckedSignedFullStatement::new( - Statement::Seconded(committed.clone()), - metadata.signed_by, - metadata.signature.clone(), - )), - } - }, - } - }, - Entry::Vacant(vacant) => { - match message { - protocol_v1::StatementDistributionMessage::LargeStatement(metadata) => { - if let Some(new_status) = - launch_request(metadata, peer, req_sender.clone(), ctx, metrics).await - { - vacant.insert(new_status); - } - }, - protocol_v1::StatementDistributionMessage::Statement(_, s) => { - // No fetch in progress, safe to return any statement immediately (we don't bother - // about normal network jitter which might cause `Valid` statements to arrive early - // for now.). - return Some(s) - }, - } - }, - } - None -} - -/// Launch request for a large statement and get tracking status. -/// -/// Returns `None` if spawning task failed. 
-#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] -async fn launch_request( - meta: StatementMetadata, - peer: PeerId, - req_sender: mpsc::Sender, - ctx: &mut Context, - metrics: &Metrics, -) -> Option { - let (task, handle) = - fetch(meta.relay_parent, meta.candidate_hash, vec![peer], req_sender, metrics.clone()) - .remote_handle(); - - let result = ctx.spawn("large-statement-fetcher", task.boxed()); - if let Err(err) = result { - gum::error!(target: LOG_TARGET, ?err, "Spawning task failed."); - return None - } - let available_peers = { - let mut m = IndexMap::new(); - m.insert( - peer, - vec![Versioned::V1(protocol_v1::StatementDistributionMessage::LargeStatement(meta))], - ); - m - }; - Some(LargeStatementStatus::Fetching(FetchingInfo { - available_peers, - peers_to_try: Vec::new(), - peer_sender: None, - fetching_task: handle, - })) -} - -/// Handle incoming message and circulate it to peers, if we did not know it already. -#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] -async fn handle_incoming_message_and_circulate<'a, Context, R>( - peer: PeerId, - topology_storage: &SessionBoundGridTopologyStorage, - peers: &mut HashMap, - active_heads: &'a mut HashMap, - recent_outdated_heads: &RecentOutdatedHeads, - ctx: &mut Context, - message: net_protocol::StatementDistributionMessage, - req_sender: &mpsc::Sender, - metrics: &Metrics, - runtime: &mut RuntimeInfo, - rng: &mut R, -) where - R: rand::Rng, -{ - let handled_incoming = match peers.get_mut(&peer) { - Some(data) => - handle_incoming_message( - peer, - data, - active_heads, - recent_outdated_heads, - ctx, - message, - req_sender, - metrics, - ) - .await, - None => None, - }; - - // if we got a fresh message, we need to circulate it to all peers. - if let Some((relay_parent, statement)) = handled_incoming { - // we can ignore the set of peers who this function returns as now expecting - // dependent statements. 
- // - // we have the invariant in this subsystem that we never store a `Valid` or `Invalid` - // statement before a `Seconded` statement. `Seconded` statements are the only ones - // that require dependents. Thus, if this is a `Seconded` statement for a candidate we - // were not aware of before, we cannot have any dependent statements from the candidate. - let _ = metrics.time_network_bridge_update("circulate_statement"); - - let session_index = runtime.get_session_index_for_child(ctx.sender(), relay_parent).await; - let topology = match session_index { - Ok(session_index) => topology_storage.get_topology_or_fallback(session_index), - Err(e) => { - gum::debug!( - target: LOG_TARGET, - %relay_parent, - "cannot get session index for the specific relay parent: {:?}", - e - ); - - topology_storage.get_current_topology() - }, - }; - let required_routing = - topology.required_routing_by_index(statement.statement.validator_index(), false); - - let _ = circulate_statement( - required_routing, - topology, - peers, - ctx, - relay_parent, - statement, - Vec::new(), - metrics, - rng, - ) - .await; - } -} - -// Handle a statement. Returns a reference to a newly-stored statement -// if we were not already aware of it, along with the corresponding relay-parent. -// -// This function checks the signature and ensures the statement is compatible with our -// view. It also notifies candidate backing if the statement was previously unknown. 
-#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] -async fn handle_incoming_message<'a, Context>( - peer: PeerId, - peer_data: &mut PeerData, - active_heads: &'a mut HashMap, - recent_outdated_heads: &RecentOutdatedHeads, - ctx: &mut Context, - message: net_protocol::StatementDistributionMessage, - req_sender: &mpsc::Sender, - metrics: &Metrics, -) -> Option<(Hash, StoredStatement<'a>)> { - let _ = metrics.time_network_bridge_update("handle_incoming_message"); - - // TODO [now] handle vstaging messages - let message = match message { - Versioned::V1(m) => m, - Versioned::VStaging(_) => unimplemented!(), - }; - - let relay_parent = message.get_relay_parent(); - - let active_head = match active_heads.get_mut(&relay_parent) { - Some(h) => h, - None => { - gum::debug!( - target: LOG_TARGET, - %relay_parent, - "our view out-of-sync with active heads; head not found", - ); - - if !recent_outdated_heads.is_recent_outdated(&relay_parent) { - report_peer(ctx.sender(), peer, COST_UNEXPECTED_STATEMENT).await; - } - - return None - }, - }; - - if let protocol_v1::StatementDistributionMessage::LargeStatement(_) = message { - if let Err(rep) = peer_data.receive_large_statement(&relay_parent) { - gum::debug!(target: LOG_TARGET, ?peer, ?message, ?rep, "Unexpected large statement.",); - report_peer(ctx.sender(), peer, rep).await; - return None - } - } - - let fingerprint = message.get_fingerprint(); - let candidate_hash = fingerprint.0.candidate_hash().clone(); - let handle_incoming_span = active_head - .span - .child("handle-incoming") - .with_candidate(candidate_hash) - .with_peer_id(&peer); - - let max_message_count = active_head.validators.len() * 2; - - // perform only basic checks before verifying the signature - // as it's more computationally heavy - if let Err(rep) = peer_data.check_can_receive(&relay_parent, &fingerprint, max_message_count) { - // This situation can happen when a peer's Seconded message was lost - // but we have received the Valid 
statement. - // So we check it once and then ignore repeated violation to avoid - // reputation change flood. - let unexpected_count = peer_data.receive_unexpected(&relay_parent); - - gum::debug!( - target: LOG_TARGET, - ?relay_parent, - ?peer, - ?message, - ?rep, - ?unexpected_count, - "Error inserting received statement" - ); - - match rep { - // This happens when a Valid statement has been received but there is no corresponding Seconded - COST_UNEXPECTED_STATEMENT_UNKNOWN_CANDIDATE => { - metrics.on_unexpected_statement_valid(); - // Report peer merely if this is not a duplicate out-of-view statement that - // was caused by a missing Seconded statement from this peer - if unexpected_count == 0_usize { - report_peer(ctx.sender(), peer, rep).await; - } - }, - // This happens when we have an unexpected remote peer that announced Seconded - COST_UNEXPECTED_STATEMENT_REMOTE => { - metrics.on_unexpected_statement_seconded(); - report_peer(ctx.sender(), peer, rep).await; - }, - _ => { - report_peer(ctx.sender(), peer, rep).await; - }, - } - - return None - } - - let checked_compact = { - let (compact, validator_index) = message.get_fingerprint(); - let signature = message.get_signature(); - - let unchecked_compact = UncheckedSignedStatement::new(compact, validator_index, signature); - - match active_head.check_useful_or_unknown(&unchecked_compact) { - Ok(()) => {}, - Err(DeniedStatement::NotUseful) => return None, - Err(DeniedStatement::UsefulButKnown) => { - // Note a received statement in the peer data - peer_data - .receive(&relay_parent, &fingerprint, max_message_count) - .expect("checked in `check_can_receive` above; qed"); - report_peer(ctx.sender(), peer, BENEFIT_VALID_STATEMENT).await; - - return None - }, - } - - // check the signature on the statement. 
- match check_statement_signature(&active_head, relay_parent, unchecked_compact) { - Err(statement) => { - gum::debug!(target: LOG_TARGET, ?peer, ?statement, "Invalid statement signature"); - report_peer(ctx.sender(), peer, COST_INVALID_SIGNATURE).await; - return None - }, - Ok(statement) => statement, - } - }; - - // Fetch from the network only after signature and usefulness checks are completed. - let is_large_statement = message.is_large_statement(); - let statement = - retrieve_statement_from_message(peer, message, active_head, ctx, req_sender, metrics) - .await?; - - let payload = statement.unchecked_into_payload(); - - // Upgrade the `Signed` wrapper from the compact payload to the full payload. - // This fails if the payload doesn't encode correctly. - let statement: SignedFullStatement = match checked_compact.convert_to_superpayload(payload) { - Err((compact, _)) => { - gum::debug!( - target: LOG_TARGET, - ?peer, - ?compact, - is_large_statement, - "Full statement had bad payload." - ); - report_peer(ctx.sender(), peer, COST_WRONG_HASH).await; - return None - }, - Ok(statement) => statement, - }; - - // Ensure the statement is stored in the peer data. - // - // Note that if the peer is sending us something that is not within their view, - // it will not be kept within their log. - match peer_data.receive(&relay_parent, &fingerprint, max_message_count) { - Err(_) => { - unreachable!("checked in `check_can_receive` above; qed"); - }, - Ok(true) => { - gum::trace!(target: LOG_TARGET, ?peer, ?statement, "Statement accepted"); - // Send the peer all statements concerning the candidate that we have, - // since it appears to have just learned about the candidate. 
- send_statements_about( - peer.clone(), - peer_data, - ctx, - relay_parent, - candidate_hash, - &*active_head, - metrics, - ) - .await; - }, - Ok(false) => {}, - } - - // TODO [https://github.com/paritytech/polkadot/issues/5055] - // - // For `Seconded` statements `None` or `Err` means we couldn't fetch the PVD, which - // means the statement shouldn't be accepted. - // - // In case of `Valid` we should have it cached prior, therefore this performs - // no Runtime API calls and always returns `Ok(Some(_))`. - if let Statement::Seconded(receipt) = statement.payload() { - let para_id = receipt.descriptor.para_id; - // Either call the Runtime API or check that validation data is cached. - let result = active_head - .fetch_persisted_validation_data(ctx.sender(), relay_parent, para_id) - .await; - if !matches!(result, Ok(Some(_))) { - return None + msg = from_orchestra => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)), + msg = from_v1_requester => MuxedMessage::V1Requester(msg), + msg = from_v1_responder => MuxedMessage::V1Responder(msg), } } - - // Extend the payload with persisted validation data required by the backing - // subsystem. - // - // Do it in advance before noting the statement because we don't want to borrow active - // head mutable and use the cache. 
- let statement_with_pvd = statement - .clone() - .convert_to_superpayload_with(|statement| match statement { - Statement::Seconded(receipt) => { - let para_id = &receipt.descriptor.para_id; - let persisted_validation_data = active_head - .cached_validation_data - .get(para_id) - .cloned() - .expect("pvd is ensured to be cached above; qed"); - StatementWithPVD::Seconded(receipt, persisted_validation_data) - }, - Statement::Valid(candidate_hash) => StatementWithPVD::Valid(candidate_hash), - }) - .expect("payload was checked with conversion from compact; qed"); - - // Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation - // or unpinned to a seconded candidate. So it is safe to place it into the storage. - match active_head.note_statement(statement) { - NotedStatement::NotUseful | NotedStatement::UsefulButKnown => { - unreachable!("checked in `is_useful_or_unknown` above; qed"); - }, - NotedStatement::Fresh(statement) => { - report_peer(ctx.sender(), peer, BENEFIT_VALID_STATEMENT_FIRST).await; - - let mut _span = handle_incoming_span.child("notify-backing"); - - // When we receive a new message from a peer, we forward it to the - // candidate backing subsystem. - ctx.send_message(CandidateBackingMessage::Statement( - relay_parent, - unimplemented!(), // TODO [now]: fixme - )) - .await; - - Some((relay_parent, statement)) - }, - } -} - -/// Update a peer's view. Sends all newly unlocked statements based on the previous -#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] -async fn update_peer_view_and_maybe_send_unlocked( - peer: PeerId, - topology: &SessionGridTopology, - peer_data: &mut PeerData, - ctx: &mut Context, - active_heads: &HashMap, - new_view: View, - metrics: &Metrics, - rng: &mut R, -) where - R: rand::Rng, -{ - let old_view = std::mem::replace(&mut peer_data.view, new_view); - - // Remove entries for all relay-parents in the old view but not the new. 
- for removed in old_view.difference(&peer_data.view) { - let _ = peer_data.view_knowledge.remove(removed); - } - - // Use both grid directions - let is_gossip_peer = topology.route_to_peer(RequiredRouting::GridXY, &peer); - let lucky = is_gossip_peer || - util::gen_ratio_rng( - util::MIN_GOSSIP_PEERS.saturating_sub(topology.len()), - util::MIN_GOSSIP_PEERS, - rng, - ); - - // Add entries for all relay-parents in the new view but not the old. - // Furthermore, send all statements we have for those relay parents. - let new_view = peer_data.view.difference(&old_view).copied().collect::>(); - for new in new_view.iter().copied() { - peer_data.view_knowledge.insert(new, Default::default()); - if !lucky { - continue - } - if let Some(active_head) = active_heads.get(&new) { - send_statements(peer.clone(), peer_data, ctx, new, active_head, metrics).await; - } - } -} - -#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] -async fn handle_network_update( - peers: &mut HashMap, - topology_storage: &mut SessionBoundGridTopologyStorage, - authorities: &mut HashMap, - active_heads: &mut HashMap, - recent_outdated_heads: &RecentOutdatedHeads, - ctx: &mut Context, - req_sender: &mpsc::Sender, - update: NetworkBridgeEvent, - metrics: &Metrics, - runtime: &mut RuntimeInfo, - rng: &mut R, -) where - R: rand::Rng, -{ - match update { - NetworkBridgeEvent::PeerConnected(peer, role, _, maybe_authority) => { - gum::trace!(target: LOG_TARGET, ?peer, ?role, "Peer connected"); - peers.insert( - peer, - PeerData { - view: Default::default(), - view_knowledge: Default::default(), - maybe_authority: maybe_authority.clone(), - }, - ); - if let Some(authority_ids) = maybe_authority { - authority_ids.into_iter().for_each(|a| { - authorities.insert(a, peer); - }); - } - }, - NetworkBridgeEvent::PeerDisconnected(peer) => { - gum::trace!(target: LOG_TARGET, ?peer, "Peer disconnected"); - if let Some(auth_ids) = peers.remove(&peer).and_then(|p| p.maybe_authority) { - 
auth_ids.into_iter().for_each(|a| { - authorities.remove(&a); - }); - } - }, - NetworkBridgeEvent::NewGossipTopology(topology) => { - let _ = metrics.time_network_bridge_update("new_gossip_topology"); - - let new_session_index = topology.session; - let new_topology: SessionGridTopology = topology.into(); - let old_topology = topology_storage.get_current_topology(); - let newly_added = new_topology.peers_diff(old_topology); - topology_storage.update_topology(new_session_index, new_topology); - for peer in newly_added { - if let Some(data) = peers.get_mut(&peer) { - let view = std::mem::take(&mut data.view); - update_peer_view_and_maybe_send_unlocked( - peer, - topology_storage.get_current_topology(), - data, - ctx, - &*active_heads, - view, - metrics, - rng, - ) - .await - } - } - }, - NetworkBridgeEvent::PeerMessage(peer, message) => { - handle_incoming_message_and_circulate( - peer, - topology_storage, - peers, - active_heads, - &*recent_outdated_heads, - ctx, - message, - req_sender, - metrics, - runtime, - rng, - ) - .await; - }, - NetworkBridgeEvent::PeerViewChange(peer, view) => { - let _ = metrics.time_network_bridge_update("peer_view_change"); - gum::trace!(target: LOG_TARGET, ?peer, ?view, "Peer view change"); - match peers.get_mut(&peer) { - Some(data) => - update_peer_view_and_maybe_send_unlocked( - peer, - topology_storage.get_current_topology(), - data, - ctx, - &*active_heads, - view, - metrics, - rng, - ) - .await, - None => (), - } - }, - NetworkBridgeEvent::OurViewChange(_view) => { - // handled by `ActiveLeavesUpdate` - }, - } } #[overseer::contextbounds(StatementDistribution, prefix = self::overseer)] @@ -1826,32 +118,26 @@ impl StatementDistributionSubsystem { /// Create a new Statement Distribution Subsystem pub fn new( keystore: SyncCryptoStorePtr, - req_receiver: IncomingRequestReceiver, + v1_req_receiver: IncomingRequestReceiver, metrics: Metrics, rng: R, ) -> Self { - Self { keystore, req_receiver: Some(req_receiver), metrics, rng } + Self { 
keystore, v1_req_receiver: Some(v1_req_receiver), metrics, rng } } async fn run(mut self, mut ctx: Context) -> std::result::Result<(), FatalError> { - let mut peers: HashMap = HashMap::new(); - let mut topology_storage: SessionBoundGridTopologyStorage = Default::default(); - let mut authorities: HashMap = HashMap::new(); - let mut active_heads: HashMap = HashMap::new(); - let mut recent_outdated_heads = RecentOutdatedHeads::default(); - - let mut runtime = RuntimeInfo::new(Some(self.keystore.clone())); + let mut legacy_v1_state = crate::legacy_v1::State::new(self.keystore.clone()); // Sender/Receiver for getting news from our statement fetching tasks. - let (req_sender, mut req_receiver) = mpsc::channel(1); + let (v1_req_sender, mut v1_req_receiver) = mpsc::channel(1); // Sender/Receiver for getting news from our responder task. - let (res_sender, mut res_receiver) = mpsc::channel(1); + let (v1_res_sender, mut v1_res_receiver) = mpsc::channel(1); ctx.spawn( "large-statement-responder", - respond( - self.req_receiver.take().expect("Mandatory argument to new. qed"), - res_sender.clone(), + v1_respond_task( + self.v1_req_receiver.take().expect("Mandatory argument to new. 
qed"), + v1_res_sender.clone(), ) .boxed(), ) @@ -1859,19 +145,14 @@ impl StatementDistributionSubsystem { loop { let message = - MuxedMessage::receive(&mut ctx, &mut req_receiver, &mut res_receiver).await; + MuxedMessage::receive(&mut ctx, &mut v1_req_receiver, &mut v1_res_receiver).await; match message { MuxedMessage::Subsystem(result) => { let result = self .handle_subsystem_message( &mut ctx, - &mut runtime, - &mut peers, - &mut topology_storage, - &mut authorities, - &mut active_heads, - &mut recent_outdated_heads, - &req_sender, + &mut legacy_v1_state, + &v1_req_sender, result?, ) .await; @@ -1881,29 +162,24 @@ impl StatementDistributionSubsystem { Err(jfyi) => gum::debug!(target: LOG_TARGET, error = ?jfyi), } }, - MuxedMessage::Requester(result) => { - let result = self - .handle_requester_message( - &mut ctx, - &topology_storage, - &mut peers, - &mut active_heads, - &recent_outdated_heads, - &req_sender, - &mut runtime, - result.ok_or(FatalError::RequesterReceiverFinished)?, - ) - .await; + MuxedMessage::V1Requester(result) => { + let result = crate::legacy_v1::handle_requester_message( + &mut ctx, + &mut legacy_v1_state, + &v1_req_sender, + &mut self.rng, + result.ok_or(FatalError::RequesterReceiverFinished)?, + &self.metrics, + ) + .await; log_error(result.map_err(From::from), "handle_requester_message")?; }, - MuxedMessage::Responder(result) => { - let result = self - .handle_responder_message( - &peers, - &mut active_heads, - result.ok_or(FatalError::ResponderReceiverFinished)?, - ) - .await; + MuxedMessage::V1Responder(result) => { + let result = crate::legacy_v1::handle_responder_message( + &mut legacy_v1_state, + result.ok_or(FatalError::ResponderReceiverFinished)?, + ) + .await; log_error(result.map_err(From::from), "handle_responder_message")?; }, }; @@ -1911,165 +187,11 @@ impl StatementDistributionSubsystem { Ok(()) } - /// Handle messages from responder background task. 
- async fn handle_responder_message( - &self, - peers: &HashMap, - active_heads: &mut HashMap, - message: ResponderMessage, - ) -> JfyiErrorResult<()> { - match message { - ResponderMessage::GetData { requesting_peer, relay_parent, candidate_hash, tx } => { - if !requesting_peer_knows_about_candidate( - peers, - &requesting_peer, - &relay_parent, - &candidate_hash, - )? { - return Err(JfyiError::RequestedUnannouncedCandidate( - requesting_peer, - candidate_hash, - )) - } - - let active_head = - active_heads.get(&relay_parent).ok_or(JfyiError::NoSuchHead(relay_parent))?; - - let committed = match active_head.waiting_large_statements.get(&candidate_hash) { - Some(LargeStatementStatus::FetchedOrShared(committed)) => committed.clone(), - _ => - return Err(JfyiError::NoSuchFetchedLargeStatement( - relay_parent, - candidate_hash, - )), - }; - - tx.send(committed).map_err(|_| JfyiError::ResponderGetDataCanceled)?; - }, - } - Ok(()) - } - - async fn handle_requester_message( - &mut self, - ctx: &mut Context, - topology_storage: &SessionBoundGridTopologyStorage, - peers: &mut HashMap, - active_heads: &mut HashMap, - recent_outdated_heads: &RecentOutdatedHeads, - req_sender: &mpsc::Sender, - runtime: &mut RuntimeInfo, - message: RequesterMessage, - ) -> JfyiErrorResult<()> { - match message { - RequesterMessage::Finished { - relay_parent, - candidate_hash, - from_peer, - response, - bad_peers, - } => { - for bad in bad_peers { - report_peer(ctx.sender(), bad, COST_FETCH_FAIL).await; - } - report_peer(ctx.sender(), from_peer, BENEFIT_VALID_RESPONSE).await; - - let active_head = active_heads - .get_mut(&relay_parent) - .ok_or(JfyiError::NoSuchHead(relay_parent))?; - - let status = active_head.waiting_large_statements.remove(&candidate_hash); - - let info = match status { - Some(LargeStatementStatus::Fetching(info)) => info, - Some(LargeStatementStatus::FetchedOrShared(_)) => { - // We are no longer interested in the data. 
- return Ok(()) - }, - None => - return Err(JfyiError::NoSuchLargeStatementStatus( - relay_parent, - candidate_hash, - )), - }; - - active_head - .waiting_large_statements - .insert(candidate_hash, LargeStatementStatus::FetchedOrShared(response)); - - // Cache is now populated, send all messages: - for (peer, messages) in info.available_peers { - for message in messages { - handle_incoming_message_and_circulate( - peer, - topology_storage, - peers, - active_heads, - recent_outdated_heads, - ctx, - message, - req_sender, - &self.metrics, - runtime, - &mut self.rng, - ) - .await; - } - } - }, - RequesterMessage::SendRequest(req) => { - ctx.send_message(NetworkBridgeTxMessage::SendRequests( - vec![req], - IfDisconnected::ImmediateError, - )) - .await; - }, - RequesterMessage::GetMorePeers { relay_parent, candidate_hash, tx } => { - let active_head = active_heads - .get_mut(&relay_parent) - .ok_or(JfyiError::NoSuchHead(relay_parent))?; - - let status = active_head.waiting_large_statements.get_mut(&candidate_hash); - - let info = match status { - Some(LargeStatementStatus::Fetching(info)) => info, - Some(LargeStatementStatus::FetchedOrShared(_)) => { - // This task is going to die soon - no need to send it anything. 
- gum::debug!(target: LOG_TARGET, "Zombie task wanted more peers."); - return Ok(()) - }, - None => - return Err(JfyiError::NoSuchLargeStatementStatus( - relay_parent, - candidate_hash, - )), - }; - - if info.peers_to_try.is_empty() { - info.peer_sender = Some(tx); - } else { - let peers_to_try = std::mem::take(&mut info.peers_to_try); - if let Err(peers) = tx.send(peers_to_try) { - // No longer interested for now - might want them later: - info.peers_to_try = peers; - } - } - }, - RequesterMessage::ReportPeer(peer, rep) => report_peer(ctx.sender(), peer, rep).await, - } - Ok(()) - } - async fn handle_subsystem_message( &mut self, ctx: &mut Context, - runtime: &mut RuntimeInfo, - peers: &mut HashMap, - topology_storage: &mut SessionBoundGridTopologyStorage, - authorities: &mut HashMap, - active_heads: &mut HashMap, - recent_outdated_heads: &mut RecentOutdatedHeads, - req_sender: &mpsc::Sender, + legacy_v1_state: &mut legacy_v1::State, + v1_req_sender: &mpsc::Sender, message: FromOrchestra, ) -> Result { let metrics = &self.metrics; @@ -2082,138 +204,66 @@ impl StatementDistributionSubsystem { let _timer = metrics.time_active_leaves_update(); for deactivated in deactivated { - if active_heads.remove(&deactivated).is_some() { - gum::trace!( - target: LOG_TARGET, - hash = ?deactivated, - "Deactivating leaf", - ); - - recent_outdated_heads.note_outdated(deactivated); - } + crate::legacy_v1::handle_deactivate_leaf(legacy_v1_state, deactivated); } for activated in activated { - let relay_parent = activated.hash; - let span = PerLeafSpan::new(activated.span, "statement-distribution"); - gum::trace!( - target: LOG_TARGET, - hash = ?relay_parent, - "New active leaf", - ); - - // Retrieve the parachain validators at the child of the head we track. 
- let session_index = - runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?; - let info = runtime - .get_session_info_by_index(ctx.sender(), relay_parent, session_index) + // TODO [now]: legacy, activate only if no prospective parachains support. + crate::legacy_v1::handle_activated_leaf(ctx, legacy_v1_state, activated) .await?; - let session_info = &info.session_info; - - active_heads.entry(relay_parent).or_insert(ActiveHeadData::new( - session_info.validators.clone(), - session_index, - span, - )); } }, FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => { // do nothing }, FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(true), - FromOrchestra::Communication { msg } => match msg { - StatementDistributionMessage::Share(relay_parent, statement) => { - let _timer = metrics.time_share(); - - // Make sure we have data in cache: - if is_statement_large(&statement).0 { - if let Statement::Seconded(committed) = &statement.payload() { - let active_head = active_heads - .get_mut(&relay_parent) - // This should never be out-of-sync with our view if the view - // updates correspond to actual `StartWork` messages. - .ok_or(JfyiError::NoSuchHead(relay_parent))?; - active_head.waiting_large_statements.insert( - statement.payload().candidate_hash(), - LargeStatementStatus::FetchedOrShared(committed.clone()), - ); + FromOrchestra::Communication { msg } => + match msg { + StatementDistributionMessage::Share(relay_parent, statement) => { + let _timer = metrics.time_share(); + + // pass to legacy if legacy state contains head. 
+ if legacy_v1_state.contains_relay_parent(&relay_parent) { + crate::legacy_v1::share_local_statement( + ctx, + legacy_v1_state, + relay_parent, + StatementWithPVD::drop_pvd_from_signed(statement), + &mut self.rng, + metrics, + ) + .await?; } - } - - let info = runtime.get_session_info(ctx.sender(), relay_parent).await?; - let session_info = &info.session_info; - let validator_info = &info.validator_info; - - // Get peers in our group, so we can make sure they get our statement - // directly: - let group_peers = { - if let Some(our_group) = validator_info.our_group { - let our_group = &session_info.validator_groups[our_group.0 as usize]; + }, + StatementDistributionMessage::NetworkBridgeUpdate(event) => { + // pass to legacy, but not if the message isn't + // v1. + let legacy = match &event { + &NetworkBridgeEvent::PeerMessage(_, ref message) => match message { + Versioned::VStaging(protocol_vstaging::StatementDistributionMessage::V1Compatibility(_)) => true, + Versioned::V1(_) => true, + // TODO [now]: _ => false, + }, + _ => true, + }; - our_group - .into_iter() - .filter_map(|i| { - if Some(*i) == validator_info.our_index { - return None - } - let authority_id = &session_info.discovery_keys[i.0 as usize]; - authorities.get(authority_id).map(|p| *p) - }) - .collect() - } else { - Vec::new() + if legacy { + crate::legacy_v1::handle_network_update( + ctx, + legacy_v1_state, + v1_req_sender, + event, + &mut self.rng, + metrics, + ) + .await; } - }; - circulate_statement_and_dependents( - topology_storage, - peers, - active_heads, - ctx, - relay_parent, - statement, - group_peers, - metrics, - &mut self.rng, - ) - .await; - }, - StatementDistributionMessage::NetworkBridgeUpdate(event) => { - handle_network_update( - peers, - topology_storage, - authorities, - active_heads, - &*recent_outdated_heads, - ctx, - req_sender, - event, - metrics, - runtime, - &mut self.rng, - ) - .await; + + // TODO [now]: pass to vstaging, but not if the message is + // v1 or the 
connecting peer is v1. + }, }, - }, } Ok(false) } } - -/// Check whether a peer knows about a candidate from us. -/// -/// If not, it is deemed illegal for it to request corresponding data from us. -fn requesting_peer_knows_about_candidate( - peers: &HashMap, - requesting_peer: &PeerId, - relay_parent: &Hash, - candidate_hash: &CandidateHash, -) -> JfyiErrorResult { - let peer_data = peers - .get(requesting_peer) - .ok_or_else(|| JfyiError::NoSuchPeer(*requesting_peer))?; - let knowledge = peer_data - .view_knowledge - .get(relay_parent) - .ok_or_else(|| JfyiError::NoSuchHead(*relay_parent))?; - Ok(knowledge.sent_candidates.get(&candidate_hash).is_some()) -} diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index 78477ed0e2d4..59a7c3a57e98 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -930,7 +930,7 @@ fn test_prospective_parachains_msg() -> ProspectiveParachainsMessage { // Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly. #[test] fn overseer_all_subsystems_receive_signals_and_messages() { - const NUM_SUBSYSTEMS: usize = 22; + const NUM_SUBSYSTEMS: usize = 23; // -4 for BitfieldSigning, GossipSupport, AvailabilityDistribution and PvfCheckerSubsystem. const NUM_SUBSYSTEMS_MESSAGED: usize = NUM_SUBSYSTEMS - 4; diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index cd3653ce4ae9..ae961701a8d6 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -180,6 +180,14 @@ impl Statement { Statement::Valid(hash) => CompactStatement::Valid(hash), } } + + /// Add the [`PersistedValidationData`] to the statement, if seconded. 
+ pub fn supply_pvd(self, pvd: PersistedValidationData) -> StatementWithPVD { + match self { + Statement::Seconded(c) => StatementWithPVD::Seconded(c, pvd), + Statement::Valid(hash) => StatementWithPVD::Valid(hash), + } + } } impl From<&'_ Statement> for CompactStatement { diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 7c907506803c..2ff8454acc3a 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -732,7 +732,7 @@ impl RuntimeApiMessage { pub enum StatementDistributionMessage { /// We have originated a signed statement in the context of /// given relay-parent hash and it should be distributed to other validators. - Share(Hash, SignedFullStatement), + Share(Hash, SignedFullStatementWithPVD), /// Event from the network bridge. #[from] NetworkBridgeUpdate(NetworkBridgeEvent), diff --git a/roadmap/implementers-guide/src/node/backing/candidate-backing.md b/roadmap/implementers-guide/src/node/backing/candidate-backing.md index 6c3eace313c3..0eee0cc532ef 100644 --- a/roadmap/implementers-guide/src/node/backing/candidate-backing.md +++ b/roadmap/implementers-guide/src/node/backing/candidate-backing.md @@ -130,7 +130,7 @@ Dispatch a `CandidateValidationMessage::Validate(validation function, candidate, ### Distribute Signed Statement -Dispatch a [`StatementDistributionMessage`][SDM]`::Share(relay_parent, SignedFullStatement)`. +Dispatch a [`StatementDistributionMessage`][SDM]`::Share(relay_parent, SignedFullStatementWithPVD)`. 
[OverseerSignal]: ../../types/overseer-protocol.md#overseer-signal [Statement]: ../../types/backing.md#statement-type diff --git a/roadmap/implementers-guide/src/types/overseer-protocol.md b/roadmap/implementers-guide/src/types/overseer-protocol.md index b2559c4cfda7..f47fefe23097 100644 --- a/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -753,7 +753,7 @@ enum StatementDistributionMessage { /// /// The statement distribution subsystem assumes that the statement should be correctly /// signed. - Share(Hash, SignedFullStatement), + Share(Hash, SignedFullStatementWithPVD), } ```