diff --git a/.gitlab/pipeline/publish.yml b/.gitlab/pipeline/publish.yml index 8b27c7247487..44cd1933a9cf 100644 --- a/.gitlab/pipeline/publish.yml +++ b/.gitlab/pipeline/publish.yml @@ -76,6 +76,8 @@ publish-subsystem-benchmarks: artifacts: true - job: subsystem-benchmark-approval-voting artifacts: true + - job: subsystem-benchmark-statement-distribution + artifacts: true - job: publish-rustdoc artifacts: false script: @@ -119,6 +121,8 @@ trigger_workflow: artifacts: true - job: subsystem-benchmark-approval-voting artifacts: true + - job: subsystem-benchmark-statement-distribution + artifacts: true script: - echo "Triggering workflow" - > diff --git a/.gitlab/pipeline/test.yml b/.gitlab/pipeline/test.yml index eea561ff8baa..fbb4dafc370d 100644 --- a/.gitlab/pipeline/test.yml +++ b/.gitlab/pipeline/test.yml @@ -619,3 +619,10 @@ subsystem-benchmark-approval-voting: script: - cargo bench -p polkadot-node-core-approval-voting --bench approval-voting-regression-bench --features subsystem-benchmarks allow_failure: true + +subsystem-benchmark-statement-distribution: + extends: + - .subsystem-benchmark-template + script: + - cargo bench -p polkadot-statement-distribution --bench statement-distribution-regression-bench --features subsystem-benchmarks + allow_failure: true diff --git a/Cargo.lock b/Cargo.lock index 642fe88db006..ba028f2c569a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14097,6 +14097,7 @@ dependencies = [ "polkadot-node-subsystem-util", "polkadot-primitives", "polkadot-primitives-test-helpers", + "polkadot-subsystem-bench", "rand_chacha 0.3.1", "sc-keystore", "sc-network", @@ -14161,6 +14162,7 @@ dependencies = [ "polkadot-overseer", "polkadot-primitives", "polkadot-primitives-test-helpers", + "polkadot-statement-distribution", "prometheus", "pyroscope", "pyroscope_pprofrs", diff --git a/polkadot/node/network/statement-distribution/Cargo.toml b/polkadot/node/network/statement-distribution/Cargo.toml index 1fe761bd0e3b..65224f9e2be6 100644 --- a/polkadot/node/network/statement-distribution/Cargo.toml +++ b/polkadot/node/network/statement-distribution/Cargo.toml @@ -42,3 +42,13 @@ sc-network = { path = "../../../../substrate/client/network" } futures-timer = "3.0.2" polkadot-primitives-test-helpers = { path = "../../../primitives/test-helpers" } rand_chacha = "0.3" +polkadot-subsystem-bench = { path = "../../subsystem-bench" } + +[[bench]] +name = "statement-distribution-regression-bench" +path = "benches/statement-distribution-regression-bench.rs" +harness = false +required-features = ["subsystem-benchmarks"] + +[features] +subsystem-benchmarks = [] diff --git a/polkadot/node/network/statement-distribution/benches/statement-distribution-regression-bench.rs b/polkadot/node/network/statement-distribution/benches/statement-distribution-regression-bench.rs new file mode 100644 index 000000000000..abcb1e6783fb --- /dev/null +++ b/polkadot/node/network/statement-distribution/benches/statement-distribution-regression-bench.rs @@ -0,0 +1,78 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! statement-distribution regression tests +//! +//! Statement distribution benchmark based on Kusama parameters and scale. + +use polkadot_subsystem_bench::{ + configuration::TestConfiguration, + statement::{benchmark_statement_distribution, prepare_test, TestState}, + usage::BenchmarkUsage, + utils::save_to_file, +}; +use std::io::Write; + +const BENCH_COUNT: usize = 50; + +fn main() -> Result<(), String> { + let mut messages = vec![]; + let mut config = TestConfiguration::default(); + config.n_cores = 100; + config.n_validators = 500; + config.num_blocks = 10; + config.connectivity = 100; + config.generate_pov_sizes(); + let state = TestState::new(&config); + + println!("Benchmarking..."); + let usages: Vec = (0..BENCH_COUNT) + .map(|n| { + print!("\r[{}{}]", "#".repeat(n), "_".repeat(BENCH_COUNT - n)); + std::io::stdout().flush().unwrap(); + let (mut env, _cfgs) = prepare_test(&state, false); + env.runtime().block_on(benchmark_statement_distribution( + "statement-distribution", + &mut env, + &state, + )) + }) + .collect(); + println!("\rDone!{}", " ".repeat(BENCH_COUNT)); + + let average_usage = BenchmarkUsage::average(&usages); + save_to_file( + "charts/statement-distribution-regression-bench.json", + average_usage.to_chart_json().map_err(|e| e.to_string())?, + ) + .map_err(|e| e.to_string())?; + println!("{}", average_usage); + + // We expect no variance for received and sent + // but use 0.001 because we operate with floats + messages.extend(average_usage.check_network_usage(&[ + ("Received from peers", 106.4000, 0.001), + ("Sent to peers", 127.9100, 0.001), + ])); + messages.extend(average_usage.check_cpu_usage(&[("statement-distribution", 0.0390, 0.1)])); + + if messages.is_empty() { + Ok(()) + } else { + eprintln!("{}", messages.join("\n")); + Err("Regressions found".to_string()) + } +} diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 4ca199c3378b..4d56c795f13b 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -19,7 +19,6 @@ //! This is responsible for distributing signed statements about candidate //! validity among validators. -#![deny(unused_crate_dependencies)] #![warn(missing_docs)] use error::{log_error, FatalResult}; diff --git a/polkadot/node/subsystem-bench/Cargo.toml b/polkadot/node/subsystem-bench/Cargo.toml index 37c6681b2734..21eaed832c4b 100644 --- a/polkadot/node/subsystem-bench/Cargo.toml +++ b/polkadot/node/subsystem-bench/Cargo.toml @@ -28,6 +28,7 @@ polkadot-primitives = { path = "../../primitives" } polkadot-node-network-protocol = { path = "../network/protocol" } polkadot-availability-recovery = { path = "../network/availability-recovery", features = ["subsystem-benchmarks"] } polkadot-availability-distribution = { path = "../network/availability-distribution" } +polkadot-statement-distribution = { path = "../network/statement-distribution" } polkadot-node-core-av-store = { path = "../core/av-store" } polkadot-node-core-chain-api = { path = "../core/chain-api" } polkadot-availability-bitfield-distribution = { path = "../network/bitfield-distribution" } diff --git a/polkadot/node/subsystem-bench/examples/statement_distribution.yaml b/polkadot/node/subsystem-bench/examples/statement_distribution.yaml new file mode 100644 index 000000000000..e86669ffefc3 --- /dev/null +++ b/polkadot/node/subsystem-bench/examples/statement_distribution.yaml @@ -0,0 +1,5 @@ +TestConfiguration: +- objective: StatementDistribution + num_blocks: 10 + n_cores: 100 + n_validators: 500 diff --git a/polkadot/node/subsystem-bench/src/cli/subsystem-bench.rs b/polkadot/node/subsystem-bench/src/cli/subsystem-bench.rs index 10953b6c7839..1e921500a4d2 100644 --- a/polkadot/node/subsystem-bench/src/cli/subsystem-bench.rs +++ b/polkadot/node/subsystem-bench/src/cli/subsystem-bench.rs @@ -20,7 +20,7 @@ use clap::Parser; use color_eyre::eyre; use colored::Colorize; -use polkadot_subsystem_bench::{approval, availability, configuration}; +use polkadot_subsystem_bench::{approval, availability, configuration, statement}; use pyroscope::PyroscopeAgent; use pyroscope_pprofrs::{pprof_backend, PprofConfig}; use serde::{Deserialize, Serialize}; @@ -40,6 +40,8 @@ pub enum TestObjective { DataAvailabilityWrite, /// Benchmark the approval-voting and approval-distribution subsystems. ApprovalVoting(approval::ApprovalsOptions), + // Benchmark the statement-distribution subsystem + StatementDistribution, } impl std::fmt::Display for TestObjective { @@ -51,6 +53,7 @@ impl std::fmt::Display for TestObjective { Self::DataAvailabilityRead(_) => "DataAvailabilityRead", Self::DataAvailabilityWrite => "DataAvailabilityWrite", Self::ApprovalVoting(_) => "ApprovalVoting", + Self::StatementDistribution => "StatementDistribution", } ) } @@ -170,6 +173,15 @@ impl BenchCli { state, )) }, + TestObjective::StatementDistribution => { + let state = statement::TestState::new(&test_config); + let (mut env, _protocol_config) = statement::prepare_test(&state, true); + env.runtime().block_on(statement::benchmark_statement_distribution( + &benchmark_name, + &mut env, + &state, + )) + }, }; println!("{}", usage); } diff --git a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs index 6ac0776d2d35..4a479b6af29e 100644 --- a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs +++ b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs @@ -30,7 +30,7 @@ use crate::{ mock::{ chain_api::{ChainApiState, MockChainApi}, network_bridge::{MockNetworkBridgeRx, MockNetworkBridgeTx}, - runtime_api::MockRuntimeApi, + runtime_api::{MockRuntimeApi, MockRuntimeApiCoreState}, AlwaysSupportsParachains, TestSyncOracle, }, network::{ @@ -465,8 +465,9 @@ impl ApprovalTestState { } } +#[async_trait::async_trait] impl HandleNetworkMessage for ApprovalTestState { - fn handle( + async fn handle( &self, _message: crate::network::NetworkMessage, _node_sender: &mut futures::channel::mpsc::UnboundedSender, @@ -807,6 +808,7 @@ fn build_overseer( state.candidate_events_by_block(), Some(state.babe_epoch.clone()), 1, + MockRuntimeApiCoreState::Occupied, ); let mock_tx_bridge = MockNetworkBridgeTx::new( network.clone(), @@ -915,7 +917,9 @@ pub async fn bench_approvals_run( // First create the initialization messages that make sure that then node under // tests receives notifications about the topology used and the connected peers. - let mut initialization_messages = env.network().generate_peer_connected(); + let mut initialization_messages = env.network().generate_peer_connected(|e| { + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(e)) + }); initialization_messages.extend(generate_new_session_topology( &state.test_authorities, ValidatorIndex(NODE_UNDER_TEST), diff --git a/polkadot/node/subsystem-bench/src/lib/availability/mod.rs b/polkadot/node/subsystem-bench/src/lib/availability/mod.rs index 5b93c3d862de..f7d65589565b 100644 --- a/polkadot/node/subsystem-bench/src/lib/availability/mod.rs +++ b/polkadot/node/subsystem-bench/src/lib/availability/mod.rs @@ -22,7 +22,7 @@ use crate::{ av_store::{self, MockAvailabilityStore, NetworkAvailabilityState}, chain_api::{ChainApiState, MockChainApi}, network_bridge::{self, MockNetworkBridgeRx, MockNetworkBridgeTx}, - runtime_api::{self, MockRuntimeApi}, + runtime_api::{self, MockRuntimeApi, MockRuntimeApiCoreState}, AlwaysSupportsParachains, }, network::new_network, @@ -189,6 +189,7 @@ pub fn prepare_test( Default::default(), Default::default(), 0, + MockRuntimeApiCoreState::Occupied, ); let (overseer, overseer_handle) = match &mode { diff --git a/polkadot/node/subsystem-bench/src/lib/configuration.rs b/polkadot/node/subsystem-bench/src/lib/configuration.rs index 1e0efb72a7df..f614a5e552a8 100644 --- a/polkadot/node/subsystem-bench/src/lib/configuration.rs +++ b/polkadot/node/subsystem-bench/src/lib/configuration.rs @@ -18,12 +18,13 @@ use crate::keyring::Keyring; use itertools::Itertools; -use polkadot_primitives::{AssignmentId, AuthorityDiscoveryId, ValidatorId}; +use polkadot_primitives::{AssignmentId, AuthorityDiscoveryId, ValidatorId, ValidatorPair}; use rand::thread_rng; use rand_distr::{Distribution, Normal, Uniform}; use sc_network_types::PeerId; use serde::{Deserialize, Serialize}; use sp_consensus_babe::AuthorityId; +use sp_core::Pair; use std::collections::HashMap; /// Peer networking latency configuration. @@ -89,6 +90,15 @@ fn default_n_delay_tranches() -> usize { fn default_no_show_slots() -> usize { 3 } +fn default_minimum_backing_votes() -> u32 { + 2 +} +fn default_max_candidate_depth() -> u32 { + 3 +} +fn default_allowed_ancestry_len() -> u32 { + 2 +} /// The test input parameters #[derive(Clone, Debug, Serialize, Deserialize)] @@ -137,6 +147,15 @@ pub struct TestConfiguration { pub connectivity: usize, /// Number of blocks to run the test for pub num_blocks: usize, + /// Number of minimum backing votes + #[serde(default = "default_minimum_backing_votes")] + pub minimum_backing_votes: u32, + /// Async Backing max_candidate_depth + #[serde(default = "default_max_candidate_depth")] + pub max_candidate_depth: u32, + /// Async Backing allowed_ancestry_len + #[serde(default = "default_allowed_ancestry_len")] + pub allowed_ancestry_len: u32, } impl Default for TestConfiguration { @@ -158,6 +177,9 @@ impl Default for TestConfiguration { latency: default_peer_latency(), connectivity: default_connectivity(), num_blocks: Default::default(), + minimum_backing_votes: default_minimum_backing_votes(), + max_candidate_depth: default_max_candidate_depth(), + allowed_ancestry_len: default_allowed_ancestry_len(), } } } @@ -208,6 +230,11 @@ impl TestConfiguration { .map(|(peer_id, authority_id)| (*peer_id, authority_id.clone())) .collect(); + let validator_pairs = key_seeds + .iter() + .map(|seed| ValidatorPair::from_string_with_seed(seed, None).unwrap().0) + .collect(); + TestAuthorities { keyring, validator_public, @@ -217,6 +244,7 @@ impl TestConfiguration { validator_assignment_id, key_seeds, peer_id_to_authority, + validator_pairs, } } } @@ -246,6 +274,7 @@ pub struct TestAuthorities { pub key_seeds: Vec, pub peer_ids: Vec, pub peer_id_to_authority: HashMap, + pub validator_pairs: Vec, } /// Sample latency (in milliseconds) from a normal distribution with parameters diff --git a/polkadot/node/subsystem-bench/src/lib/lib.rs b/polkadot/node/subsystem-bench/src/lib/lib.rs index ef2724abc989..e18227af8be3 100644 --- a/polkadot/node/subsystem-bench/src/lib/lib.rs +++ b/polkadot/node/subsystem-bench/src/lib/lib.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -// The validator index that represent the node that is under test. +// The validator index that represents the node that is under test. pub const NODE_UNDER_TEST: u32 = 0; pub mod approval; @@ -25,5 +25,6 @@ pub(crate) mod environment; pub(crate) mod keyring; pub(crate) mod mock; pub(crate) mod network; +pub mod statement; pub mod usage; pub mod utils; diff --git a/polkadot/node/subsystem-bench/src/lib/mock/av_store.rs b/polkadot/node/subsystem-bench/src/lib/mock/av_store.rs index fba33523be85..a035bf018977 100644 --- a/polkadot/node/subsystem-bench/src/lib/mock/av_store.rs +++ b/polkadot/node/subsystem-bench/src/lib/mock/av_store.rs @@ -49,8 +49,9 @@ pub struct NetworkAvailabilityState { } // Implement access to the state. +#[async_trait::async_trait] impl HandleNetworkMessage for NetworkAvailabilityState { - fn handle( + async fn handle( &self, message: NetworkMessage, _node_sender: &mut futures::channel::mpsc::UnboundedSender, diff --git a/polkadot/node/subsystem-bench/src/lib/mock/candidate_backing.rs b/polkadot/node/subsystem-bench/src/lib/mock/candidate_backing.rs new file mode 100644 index 000000000000..51494016e185 --- /dev/null +++ b/polkadot/node/subsystem-bench/src/lib/mock/candidate_backing.rs @@ -0,0 +1,171 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! A generic candidate backing subsystem mockup suitable to be used in benchmarks. + +use crate::{configuration::TestConfiguration, NODE_UNDER_TEST}; +use futures::FutureExt; +use polkadot_node_primitives::{SignedFullStatementWithPVD, Statement, StatementWithPVD}; +use polkadot_node_subsystem::{ + messages::CandidateBackingMessage, overseer, SpawnedSubsystem, SubsystemError, +}; +use polkadot_node_subsystem_types::OverseerSignal; +use polkadot_primitives::{ + CandidateHash, Hash, PersistedValidationData, SigningContext, ValidatorIndex, ValidatorPair, +}; +use sp_core::Pair; +use std::collections::HashMap; + +const LOG_TARGET: &str = "subsystem-bench::candidate-backing-mock"; + +struct MockCandidateBackingState { + pair: ValidatorPair, + pvd: PersistedValidationData, + own_backing_group: Vec, +} + +pub struct MockCandidateBacking { + config: TestConfiguration, + state: MockCandidateBackingState, +} + +impl MockCandidateBacking { + pub fn new( + config: TestConfiguration, + pair: ValidatorPair, + pvd: PersistedValidationData, + own_backing_group: Vec, + ) -> Self { + Self { config, state: MockCandidateBackingState { pair, pvd, own_backing_group } } + } + + fn handle_statement( + &self, + relay_parent: Hash, + statement: SignedFullStatementWithPVD, + statements_tracker: &mut HashMap, + ) -> Vec { + let mut messages = vec![]; + let validator_id = statement.validator_index(); + let is_own_backing_group = self.state.own_backing_group.contains(&validator_id); + + match statement.payload() { + StatementWithPVD::Seconded(receipt, _pvd) => { + let candidate_hash = receipt.hash(); + statements_tracker + .entry(candidate_hash) + .and_modify(|v| { + *v += 1; + }) + .or_insert(1); + + let statements_received_count = *statements_tracker.get(&candidate_hash).unwrap(); + if statements_received_count == (self.config.minimum_backing_votes - 1) && + is_own_backing_group + { + let statement = Statement::Valid(candidate_hash); + let context = SigningContext { parent_hash: relay_parent, session_index: 0 }; + let payload = statement.to_compact().signing_payload(&context); + let message = + polkadot_node_subsystem::messages::StatementDistributionMessage::Share( + relay_parent, + SignedFullStatementWithPVD::new( + statement.supply_pvd(self.state.pvd.clone()), + ValidatorIndex(NODE_UNDER_TEST), + self.state.pair.sign(&payload[..]), + &context, + &self.state.pair.public(), + ) + .unwrap(), + ); + messages.push(message); + } + + if statements_received_count == self.config.minimum_backing_votes { + let message = + polkadot_node_subsystem::messages::StatementDistributionMessage::Backed( + candidate_hash, + ); + messages.push(message); + } + }, + StatementWithPVD::Valid(candidate_hash) => { + statements_tracker + .entry(*candidate_hash) + .and_modify(|v| { + *v += 1; + }) + .or_insert(1); + + let statements_received_count = *statements_tracker.get(candidate_hash).unwrap(); + if statements_received_count == self.config.minimum_backing_votes { + let message = + polkadot_node_subsystem::messages::StatementDistributionMessage::Backed( + *candidate_hash, + ); + messages.push(message); + } + }, + } + + messages + } +} + +#[overseer::subsystem(CandidateBacking, error=SubsystemError, prefix=self::overseer)] +impl MockCandidateBacking { + fn start(self, ctx: Context) -> SpawnedSubsystem { + let future = self.run(ctx).map(|_| Ok(())).boxed(); + + SpawnedSubsystem { name: "test-environment", future } + } +} + +#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)] +impl MockCandidateBacking { + async fn run(self, mut ctx: Context) { + let mut statements_tracker: HashMap = Default::default(); + + loop { + let msg = ctx.recv().await.expect("Overseer never fails us"); + match msg { + orchestra::FromOrchestra::Signal(signal) => + if signal == OverseerSignal::Conclude { + return + }, + orchestra::FromOrchestra::Communication { msg } => { + gum::trace!(target: LOG_TARGET, msg=?msg, "recv message"); + + match msg { + CandidateBackingMessage::Statement(relay_parent, statement) => { + let messages = self.handle_statement( + relay_parent, + statement, + &mut statements_tracker, + ); + for message in messages { + ctx.send_message(message).await; + } + }, + _ => { + unimplemented!("Unexpected candidate-backing message") + }, + } + }, + } + } + } +} diff --git a/polkadot/node/subsystem-bench/src/lib/mock/mod.rs b/polkadot/node/subsystem-bench/src/lib/mock/mod.rs index 6dda9a47d398..12766374bfa9 100644 --- a/polkadot/node/subsystem-bench/src/lib/mock/mod.rs +++ b/polkadot/node/subsystem-bench/src/lib/mock/mod.rs @@ -19,9 +19,11 @@ use polkadot_node_subsystem_types::Hash; use sp_consensus::SyncOracle; pub mod av_store; +pub mod candidate_backing; pub mod chain_api; pub mod dummy; pub mod network_bridge; +pub mod prospective_parachains; pub mod runtime_api; pub struct AlwaysSupportsParachains {} diff --git a/polkadot/node/subsystem-bench/src/lib/mock/network_bridge.rs b/polkadot/node/subsystem-bench/src/lib/mock/network_bridge.rs index ec66ad4e279c..10508f456a48 100644 --- a/polkadot/node/subsystem-bench/src/lib/mock/network_bridge.rs +++ b/polkadot/node/subsystem-bench/src/lib/mock/network_bridge.rs @@ -27,14 +27,19 @@ use polkadot_node_subsystem::{ messages::NetworkBridgeTxMessage, overseer, SpawnedSubsystem, SubsystemError, }; use polkadot_node_subsystem_types::{ - messages::{ApprovalDistributionMessage, BitfieldDistributionMessage, NetworkBridgeEvent}, + messages::{ + ApprovalDistributionMessage, BitfieldDistributionMessage, NetworkBridgeEvent, + StatementDistributionMessage, + }, OverseerSignal, }; use sc_network::{request_responses::ProtocolConfig, RequestFailure}; const LOG_TARGET: &str = "subsystem-bench::network-bridge"; -const CHUNK_REQ_PROTOCOL_NAME_V1: &str = - "/ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/req_chunk/1"; +const ALLOWED_PROTOCOLS: &[&str] = &[ + "/ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/req_chunk/1", + "/ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/req_attested_candidate/2", +]; /// A mock of the network bridge tx subsystem. pub struct MockNetworkBridgeTx { @@ -106,8 +111,15 @@ impl MockNetworkBridgeTx { NetworkBridgeTxMessage::SendRequests(requests, _if_disconnected) => { for request in requests { gum::debug!(target: LOG_TARGET, request = ?request, "Processing request"); - let peer_id = - request.authority_id().expect("all nodes are authorities").clone(); + let peer_id = match request.authority_id() { + Some(v) => v.clone(), + None => self + .test_authorities + .peer_id_to_authority + .get(request.peer_id().expect("Should exist")) + .expect("Should exist") + .clone(), + }; if !self.network.is_peer_connected(&peer_id) { // Attempting to send a request to a disconnected peer. @@ -141,7 +153,23 @@ impl MockNetworkBridgeTx { .expect("Should not fail"); } }, - _ => unimplemented!("Unexpected network bridge message"), + NetworkBridgeTxMessage::SendValidationMessages(messages) => { + for (peers, message) in messages { + for peer in peers { + self.to_network_interface + .unbounded_send(NetworkMessage::MessageFromNode( + self.test_authorities + .peer_id_to_authority + .get(&peer) + .unwrap() + .clone(), + message.clone(), + )) + .expect("Should not fail"); + } + } + }, + message => unimplemented!("Unexpected network bridge message {:?}", message), }, } } @@ -175,13 +203,20 @@ impl MockNetworkBridgeRx { ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::Versioned::V3(msg))) ).await; } + Versioned::V3( + polkadot_node_network_protocol::v3::ValidationProtocol::StatementDistribution(msg) + ) => { + ctx.send_message( + StatementDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::Versioned::V3(msg))) + ).await; + } _ => { unimplemented!("We only talk v2 network protocol") }, }, NetworkMessage::RequestFromPeer(request) => { if let Some(protocol) = self.chunk_request_sender.as_mut() { - assert_eq!(&*protocol.name, CHUNK_REQ_PROTOCOL_NAME_V1); + assert!(ALLOWED_PROTOCOLS.contains(&&*protocol.name)); if let Some(inbound_queue) = protocol.inbound_queue.as_ref() { inbound_queue .send(request) diff --git a/polkadot/node/subsystem-bench/src/lib/mock/prospective_parachains.rs b/polkadot/node/subsystem-bench/src/lib/mock/prospective_parachains.rs new file mode 100644 index 000000000000..8a865af21a07 --- /dev/null +++ b/polkadot/node/subsystem-bench/src/lib/mock/prospective_parachains.rs @@ -0,0 +1,74 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! A generic prospective parachains subsystem mockup suitable to be used in benchmarks. + +use futures::FutureExt; +use polkadot_node_subsystem::{ + messages::ProspectiveParachainsMessage, overseer, SpawnedSubsystem, SubsystemError, +}; +use polkadot_node_subsystem_types::OverseerSignal; +use polkadot_primitives::Hash; + +pub struct MockProspectiveParachains {} + +impl MockProspectiveParachains { + pub fn new() -> Self { + Self {} + } +} + +#[overseer::subsystem(ProspectiveParachains, error=SubsystemError, prefix=self::overseer)] +impl MockProspectiveParachains { + fn start(self, ctx: Context) -> SpawnedSubsystem { + let future = self.run(ctx).map(|_| Ok(())).boxed(); + + SpawnedSubsystem { name: "test-environment", future } + } +} + +#[overseer::contextbounds(ProspectiveParachains, prefix = self::overseer)] +impl MockProspectiveParachains { + async fn run(self, mut ctx: Context) { + loop { + let msg = ctx.recv().await.expect("Overseer never fails us"); + match msg { + orchestra::FromOrchestra::Signal(signal) => + if signal == OverseerSignal::Conclude { + return + }, + orchestra::FromOrchestra::Communication { msg } => match msg { + ProspectiveParachainsMessage::GetMinimumRelayParents(_relay_parent, tx) => { + tx.send(vec![]).unwrap(); + }, + ProspectiveParachainsMessage::GetHypotheticalMembership(req, tx) => { + tx.send( + req.candidates + .iter() + .cloned() + .map(|candidate| (candidate, vec![Hash::repeat_byte(0)])) + .collect(), + ) + .unwrap(); + }, + _ => { + unimplemented!("Unexpected chain-api message") + }, + }, + } + } + } +} diff --git a/polkadot/node/subsystem-bench/src/lib/mock/runtime_api.rs b/polkadot/node/subsystem-bench/src/lib/mock/runtime_api.rs index b73d61321cd3..9788a1123ec0 100644 --- a/polkadot/node/subsystem-bench/src/lib/mock/runtime_api.rs +++ b/polkadot/node/subsystem-bench/src/lib/mock/runtime_api.rs @@ -26,8 +26,9 @@ use polkadot_node_subsystem::{ }; use polkadot_node_subsystem_types::OverseerSignal; use polkadot_primitives::{ - CandidateEvent, CandidateReceipt, CoreState, GroupIndex, IndexedVec, NodeFeatures, - OccupiedCore, SessionIndex, SessionInfo, ValidatorIndex, + AsyncBackingParams, CandidateEvent, CandidateReceipt, CoreState, GroupIndex, GroupRotationInfo, + IndexedVec, NodeFeatures, OccupiedCore, ScheduledCore, SessionIndex, SessionInfo, + ValidatorIndex, }; use sp_consensus_babe::Epoch as BabeEpoch; use sp_core::H256; @@ -49,11 +50,20 @@ pub struct RuntimeApiState { session_index: SessionIndex, } +#[derive(Clone)] +pub enum MockRuntimeApiCoreState { + Occupied, + Scheduled, + #[allow(dead_code)] + Free, +} + /// A mocked `runtime-api` subsystem. #[derive(Clone)] pub struct MockRuntimeApi { state: RuntimeApiState, config: TestConfiguration, + core_state: MockRuntimeApiCoreState, } impl MockRuntimeApi { @@ -64,6 +74,7 @@ impl MockRuntimeApi { included_candidates: HashMap>, babe_epoch: Option, session_index: SessionIndex, + core_state: MockRuntimeApiCoreState, ) -> MockRuntimeApi { Self { state: RuntimeApiState { @@ -74,6 +85,7 @@ impl MockRuntimeApi { session_index, }, config, + core_state, } } @@ -198,16 +210,26 @@ impl MockRuntimeApi { // Ensure test breaks if badly configured. assert!(index < validator_group_count); - CoreState::Occupied(OccupiedCore { - next_up_on_available: None, - occupied_since: 0, - time_out_at: 0, - next_up_on_time_out: None, - availability: BitVec::default(), - group_responsible: GroupIndex(index as u32), - candidate_hash: candidate_receipt.hash(), - candidate_descriptor: candidate_receipt.descriptor.clone(), - }) + use MockRuntimeApiCoreState::*; + match self.core_state { + Occupied => CoreState::Occupied(OccupiedCore { + next_up_on_available: None, + occupied_since: 0, + time_out_at: 0, + next_up_on_time_out: None, + availability: BitVec::default(), + group_responsible: GroupIndex(index as u32), + candidate_hash: candidate_receipt.hash(), + candidate_descriptor: candidate_receipt + .descriptor + .clone(), + }), + Scheduled => CoreState::Scheduled(ScheduledCore { + para_id: (index + 1).into(), + collator: None, + }), + Free => todo!(), + } }) .collect::>(); @@ -223,6 +245,43 @@ impl MockRuntimeApi { .clone() .expect("Babe epoch unpopulated"))); }, + RuntimeApiMessage::Request( + _block_hash, + RuntimeApiRequest::AsyncBackingParams(sender), + ) => { + let _ = sender.send(Ok(AsyncBackingParams { + max_candidate_depth: self.config.max_candidate_depth, + allowed_ancestry_len: self.config.allowed_ancestry_len, + })); + }, + RuntimeApiMessage::Request(_parent, RuntimeApiRequest::Version(tx)) => { + tx.send(Ok(RuntimeApiRequest::DISABLED_VALIDATORS_RUNTIME_REQUIREMENT)) + .unwrap(); + }, + RuntimeApiMessage::Request( + _parent, + RuntimeApiRequest::DisabledValidators(tx), + ) => { + tx.send(Ok(vec![])).unwrap(); + }, + RuntimeApiMessage::Request( + _parent, + RuntimeApiRequest::MinimumBackingVotes(_session_index, tx), + ) => { + tx.send(Ok(self.config.minimum_backing_votes)).unwrap(); + }, + RuntimeApiMessage::Request( + _parent, + RuntimeApiRequest::ValidatorGroups(tx), + ) => { + let groups = self.session_info().validator_groups.to_vec(); + let group_rotation_info = GroupRotationInfo { + session_start_block: 1, + group_rotation_frequency: 12, + now: 1, + }; + tx.send(Ok((groups, group_rotation_info))).unwrap(); + }, // Long term TODO: implement more as needed. message => { unimplemented!("Unexpected runtime-api message: {:?}", message) diff --git a/polkadot/node/subsystem-bench/src/lib/network.rs b/polkadot/node/subsystem-bench/src/lib/network.rs index 9bf2415e5a86..9686f456b9e6 100644 --- a/polkadot/node/subsystem-bench/src/lib/network.rs +++ b/polkadot/node/subsystem-bench/src/lib/network.rs @@ -51,13 +51,14 @@ use futures::{ }; use itertools::Itertools; use net_protocol::{ - peer_set::{ProtocolVersion, ValidationVersion}, + peer_set::ValidationVersion, request_response::{Recipient, Requests, ResponseSender}, - ObservedRole, VersionedValidationProtocol, + ObservedRole, VersionedValidationProtocol, View, }; use parity_scale_codec::Encode; use polkadot_node_network_protocol::{self as net_protocol, Versioned}; -use polkadot_node_subsystem_types::messages::{ApprovalDistributionMessage, NetworkBridgeEvent}; +use polkadot_node_subsystem::messages::StatementDistributionMessage; +use polkadot_node_subsystem_types::messages::NetworkBridgeEvent; use polkadot_node_subsystem_util::metrics::prometheus::{ self, CounterVec, Opts, PrometheusError, Registry, }; @@ -437,6 +438,7 @@ pub struct EmulatedPeerHandle { /// Send actions to be performed by the peer. actions_tx: UnboundedSender, peer_id: PeerId, + authority_id: AuthorityDiscoveryId, } impl EmulatedPeerHandle { @@ -496,29 +498,31 @@ impl EmulatedPeer { } /// Interceptor pattern for handling messages. +#[async_trait::async_trait] pub trait HandleNetworkMessage { /// Returns `None` if the message was handled, or the `message` /// otherwise. /// /// `node_sender` allows sending of messages to the node in response /// to the handled message. - fn handle( + async fn handle( &self, message: NetworkMessage, node_sender: &mut UnboundedSender, ) -> Option; } +#[async_trait::async_trait] impl HandleNetworkMessage for Arc where - T: HandleNetworkMessage, + T: HandleNetworkMessage + Sync + Send, { - fn handle( + async fn handle( &self, message: NetworkMessage, node_sender: &mut UnboundedSender, ) -> Option { - self.as_ref().handle(message, node_sender) + T::handle(self, message, node_sender).await } } @@ -551,7 +555,7 @@ async fn emulated_peer_loop( for handler in handlers.iter() { // The check below guarantees that message is always `Some`: we are still // inside the loop. - message = handler.handle(message.unwrap(), &mut to_network_interface); + message = handler.handle(message.unwrap(), &mut to_network_interface).await; if message.is_none() { break } @@ -613,6 +617,7 @@ async fn emulated_peer_loop( } /// Creates a new peer emulator task and returns a handle to it. +#[allow(clippy::too_many_arguments)] pub fn new_peer( bandwidth: usize, spawn_task_handle: SpawnTaskHandle, @@ -621,6 +626,7 @@ pub fn new_peer( to_network_interface: UnboundedSender, latency_ms: usize, peer_id: PeerId, + authority_id: AuthorityDiscoveryId, ) -> EmulatedPeerHandle { let (messages_tx, messages_rx) = mpsc::unbounded::(); let (actions_tx, actions_rx) = mpsc::unbounded::(); @@ -649,7 +655,7 @@ pub fn new_peer( .boxed(), ); - EmulatedPeerHandle { messages_tx, actions_tx, peer_id } + EmulatedPeerHandle { messages_tx, actions_tx, peer_id, authority_id } } /// Book keeping of sent and received bytes. @@ -714,6 +720,18 @@ impl Peer { Peer::Disconnected(ref emulator) => emulator, } } + + pub fn authority_id(&self) -> AuthorityDiscoveryId { + match self { + Peer::Connected(handle) | Peer::Disconnected(handle) => handle.authority_id.clone(), + } + } + + pub fn peer_id(&self) -> PeerId { + match self { + Peer::Connected(handle) | Peer::Disconnected(handle) => handle.peer_id, + } + } } /// A ha emulated network implementation. @@ -728,21 +746,34 @@ pub struct NetworkEmulatorHandle { } impl NetworkEmulatorHandle { + pub fn generate_statement_distribution_peer_view_change(&self, view: View) -> Vec { + self.peers + .iter() + .filter(|peer| peer.is_connected()) + .map(|peer| { + AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerViewChange(peer.peer_id(), view.clone()), + ), + ) + }) + .collect_vec() + } + /// Generates peer_connected messages for all peers in `test_authorities` - pub fn generate_peer_connected(&self) -> Vec { + pub fn generate_peer_connected(&self, mapper: F) -> Vec + where + F: Fn(NetworkBridgeEvent) -> AllMessages, + { self.peers .iter() .filter(|peer| peer.is_connected()) .map(|peer| { - let network = NetworkBridgeEvent::PeerConnected( + mapper(NetworkBridgeEvent::PeerConnected( peer.handle().peer_id, - ObservedRole::Full, - ProtocolVersion::from(ValidationVersion::V3), - None, - ); - - AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate( - network, + ObservedRole::Authority, + ValidationVersion::V3.into(), + Some(vec![peer.authority_id()].into_iter().collect()), )) }) .collect_vec() @@ -772,7 +803,7 @@ pub fn new_network( let (stats, mut peers): (_, Vec<_>) = (0..n_peers) .zip(authorities.validator_authority_id.clone()) .map(|(peer_index, authority_id)| { - validator_authority_id_mapping.insert(authority_id, peer_index); + validator_authority_id_mapping.insert(authority_id.clone(), peer_index); let stats = Arc::new(PeerEmulatorStats::new(peer_index, metrics.clone())); ( stats.clone(), @@ -784,6 +815,7 @@ pub fn new_network( to_network_interface.clone(), random_latency(config.latency.as_ref()), *authorities.peer_ids.get(peer_index).unwrap(), + authority_id, )), ) }) @@ -971,6 +1003,8 @@ impl Metrics { pub trait RequestExt { /// Get the authority id if any from the request. fn authority_id(&self) -> Option<&AuthorityDiscoveryId>; + /// Get the peer id if any from the request. + fn peer_id(&self) -> Option<&PeerId>; /// Consume self and return the response sender. fn into_response_sender(self) -> ResponseSender; /// Allows to change the `ResponseSender` in place. @@ -996,12 +1030,26 @@ impl RequestExt for Requests { None } }, + // Requested by PeerId + Requests::AttestedCandidateV2(_) => None, request => { unimplemented!("RequestAuthority not implemented for {:?}", request) }, } } + fn peer_id(&self) -> Option<&PeerId> { + match self { + Requests::AttestedCandidateV2(request) => match &request.peer { + Recipient::Authority(_) => None, + Recipient::Peer(peer_id) => Some(peer_id), + }, + request => { + unimplemented!("peer_id() is not implemented for {:?}", request) + }, + } + } + fn into_response_sender(self) -> ResponseSender { match self { Requests::ChunkFetchingV1(outgoing_request) => outgoing_request.pending_response, @@ -1018,6 +1066,8 @@ impl RequestExt for Requests { std::mem::replace(&mut outgoing_request.pending_response, new_sender), Requests::AvailableDataFetchingV1(outgoing_request) => std::mem::replace(&mut outgoing_request.pending_response, new_sender), + Requests::AttestedCandidateV2(outgoing_request) => + std::mem::replace(&mut outgoing_request.pending_response, new_sender), _ => unimplemented!("unsupported request type"), } } @@ -1028,6 +1078,8 @@ impl RequestExt for Requests { Requests::ChunkFetchingV1(outgoing_request) => outgoing_request.payload.encoded_size(), Requests::AvailableDataFetchingV1(outgoing_request) => outgoing_request.payload.encoded_size(), + Requests::AttestedCandidateV2(outgoing_request) => + outgoing_request.payload.encoded_size(), _ => unimplemented!("received an unexpected request"), } } diff --git a/polkadot/node/subsystem-bench/src/lib/statement/mod.rs b/polkadot/node/subsystem-bench/src/lib/statement/mod.rs new file mode 100644 index 000000000000..508dd9179f7b --- /dev/null +++ b/polkadot/node/subsystem-bench/src/lib/statement/mod.rs @@ -0,0 +1,450 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use crate::{ + configuration::TestAuthorities, + dummy_builder, + environment::{TestEnvironment, TestEnvironmentDependencies, GENESIS_HASH}, + mock::{ + candidate_backing::MockCandidateBacking, + chain_api::{ChainApiState, MockChainApi}, + network_bridge::{MockNetworkBridgeRx, MockNetworkBridgeTx}, + prospective_parachains::MockProspectiveParachains, + runtime_api::{MockRuntimeApi, MockRuntimeApiCoreState}, + AlwaysSupportsParachains, + }, + network::{new_network, NetworkEmulatorHandle, NetworkInterface, NetworkInterfaceReceiver}, + usage::BenchmarkUsage, + NODE_UNDER_TEST, +}; +use bitvec::vec::BitVec; +use colored::Colorize; +use itertools::Itertools; +use polkadot_node_metrics::metrics::Metrics; +use polkadot_node_network_protocol::{ + grid_topology::{SessionGridTopology, TopologyPeerInfo}, + request_response::{IncomingRequest, ReqProtocolNames}, + v3::{self, BackedCandidateManifest, StatementFilter}, + view, Versioned, View, +}; +use polkadot_node_subsystem::messages::{ + network_bridge_event::NewGossipTopology, AllMessages, NetworkBridgeEvent, + StatementDistributionMessage, +}; +use polkadot_overseer::{ + Handle as OverseerHandle, Overseer, OverseerConnector, OverseerMetrics, SpawnGlue, +}; +use polkadot_primitives::{ + AuthorityDiscoveryId, Block, GroupIndex, Hash, Id, ValidatorId, ValidatorIndex, +}; +use polkadot_statement_distribution::StatementDistributionSubsystem; +use rand::SeedableRng; +use sc_keystore::LocalKeystore; +use sc_network::request_responses::ProtocolConfig; +use sc_network_types::PeerId; +use sc_service::SpawnTaskHandle; +use sp_keystore::{Keystore, KeystorePtr}; +use sp_runtime::RuntimeAppPublic; +use std::{ + sync::{atomic::Ordering, Arc}, + time::{Duration, Instant}, +}; +pub use test_state::TestState; + +mod test_state; + +const LOG_TARGET: &str = "subsystem-bench::statement"; + +pub fn make_keystore() -> KeystorePtr { + let keystore: KeystorePtr = Arc::new(LocalKeystore::in_memory()); + Keystore::sr25519_generate_new(&*keystore, ValidatorId::ID, Some("//Node0")) + .expect("Insert key into keystore"); + Keystore::sr25519_generate_new(&*keystore, AuthorityDiscoveryId::ID, Some("//Node0")) + .expect("Insert key into keystore"); + keystore +} + +fn build_overseer( + state: &TestState, + network: NetworkEmulatorHandle, + network_interface: NetworkInterface, + network_receiver: NetworkInterfaceReceiver, + dependencies: &TestEnvironmentDependencies, +) -> ( + Overseer, AlwaysSupportsParachains>, + OverseerHandle, + Vec, +) { + let overseer_connector = OverseerConnector::with_event_capacity(64000); + let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap(); + let spawn_task_handle = dependencies.task_manager.spawn_handle(); + let mock_runtime_api = MockRuntimeApi::new( + state.config.clone(), + state.test_authorities.clone(), + state.candidate_receipts.clone(), + Default::default(), + Default::default(), + 0, + MockRuntimeApiCoreState::Scheduled, + ); + let chain_api_state = ChainApiState { block_headers: state.block_headers.clone() }; + let mock_chain_api = MockChainApi::new(chain_api_state); + let mock_prospective_parachains = MockProspectiveParachains::new(); + let mock_candidate_backing = MockCandidateBacking::new( + state.config.clone(), + state + .test_authorities + .validator_pairs + .get(NODE_UNDER_TEST as usize) + .unwrap() + .clone(), + state.pvd.clone(), + state.own_backing_group.clone(), + ); + let (statement_req_receiver, statement_req_cfg) = IncomingRequest::get_config_receiver::< + Block, + sc_network::NetworkWorker, + >(&ReqProtocolNames::new(GENESIS_HASH, None)); + let (candidate_req_receiver, candidate_req_cfg) = IncomingRequest::get_config_receiver::< + Block, + sc_network::NetworkWorker, + >(&ReqProtocolNames::new(GENESIS_HASH, None)); + let keystore = make_keystore(); + let subsystem = StatementDistributionSubsystem::new( + keystore.clone(), + statement_req_receiver, + candidate_req_receiver, + Metrics::try_register(&dependencies.registry).unwrap(), + rand::rngs::StdRng::from_entropy(), + ); + let network_bridge_tx = MockNetworkBridgeTx::new( + network, + network_interface.subsystem_sender(), + state.test_authorities.clone(), + ); + let network_bridge_rx = MockNetworkBridgeRx::new(network_receiver, Some(candidate_req_cfg)); + + let dummy = dummy_builder!(spawn_task_handle, overseer_metrics) + .replace_runtime_api(|_| mock_runtime_api) + .replace_chain_api(|_| mock_chain_api) + .replace_prospective_parachains(|_| mock_prospective_parachains) + .replace_candidate_backing(|_| mock_candidate_backing) + .replace_statement_distribution(|_| subsystem) + .replace_network_bridge_tx(|_| network_bridge_tx) + .replace_network_bridge_rx(|_| network_bridge_rx); + let (overseer, raw_handle) = dummy.build_with_connector(overseer_connector).unwrap(); + let overseer_handle = OverseerHandle::new(raw_handle); + + (overseer, overseer_handle, vec![statement_req_cfg]) +} + +pub fn prepare_test( + state: &TestState, + with_prometheus_endpoint: bool, +) -> (TestEnvironment, Vec) { + let dependencies = TestEnvironmentDependencies::default(); + let (network, network_interface, network_receiver) = new_network( + &state.config, + &dependencies, + &state.test_authorities, + vec![Arc::new(state.clone())], + ); + let (overseer, overseer_handle, cfg) = + build_overseer(state, network.clone(), network_interface, network_receiver, &dependencies); + + ( + TestEnvironment::new( + dependencies, + state.config.clone(), + network, + overseer, + overseer_handle, + state.test_authorities.clone(), + with_prometheus_endpoint, + ), + cfg, + ) +} + +pub fn generate_peer_view_change(block_hash: Hash, peer_id: PeerId) -> AllMessages { + let network = NetworkBridgeEvent::PeerViewChange(peer_id, View::new([block_hash], 0)); + + AllMessages::StatementDistribution(StatementDistributionMessage::NetworkBridgeUpdate(network)) +} + +pub fn generate_new_session_topology( + topology: &SessionGridTopology, + test_node: ValidatorIndex, +) -> Vec { + let event = NetworkBridgeEvent::NewGossipTopology(NewGossipTopology { + session: 0, + topology: topology.clone(), + local_index: Some(test_node), + }); + vec![AllMessages::StatementDistribution(StatementDistributionMessage::NetworkBridgeUpdate( + event, + ))] +} + +/// Generates a topology to be used for this benchmark. +pub fn generate_topology(test_authorities: &TestAuthorities) -> SessionGridTopology { + let keyrings = test_authorities + .validator_authority_id + .clone() + .into_iter() + .zip(test_authorities.peer_ids.clone()) + .collect_vec(); + + let topology = keyrings + .clone() + .into_iter() + .enumerate() + .map(|(index, (discovery_id, peer_id))| TopologyPeerInfo { + peer_ids: vec![peer_id], + validator_index: ValidatorIndex(index as u32), + discovery_id, + }) + .collect_vec(); + let shuffled = (0..keyrings.len()).collect_vec(); + + SessionGridTopology::new(shuffled, topology) +} + +pub async fn benchmark_statement_distribution( + benchmark_name: &str, + env: &mut TestEnvironment, + state: &TestState, +) -> BenchmarkUsage { + state.reset_trackers(); + + let connected_validators = state + .test_authorities + .validator_authority_id + .iter() + .enumerate() + .filter_map(|(i, id)| if env.network().is_peer_connected(id) { Some(i) } else { None }) + .collect_vec(); + let seconding_validator_in_own_backing_group = state + .own_backing_group + .iter() + .find(|v| connected_validators.contains(&(v.0 as usize))) + .unwrap() + .to_owned(); + + let config = env.config().clone(); + let groups = state.session_info.validator_groups.clone(); + let own_backing_group_index = groups + .iter() + .position(|group| group.iter().any(|v| v.0 == NODE_UNDER_TEST)) + .unwrap(); + + env.metrics().set_n_validators(config.n_validators); + env.metrics().set_n_cores(config.n_cores); + + let topology = generate_topology(&state.test_authorities); + let peer_connected_messages = env.network().generate_peer_connected(|e| { + AllMessages::StatementDistribution(StatementDistributionMessage::NetworkBridgeUpdate(e)) + }); + let new_session_topology_messages = + generate_new_session_topology(&topology, ValidatorIndex(NODE_UNDER_TEST)); + for message in peer_connected_messages.into_iter().chain(new_session_topology_messages) { + env.send_message(message).await; + } + + let test_start = Instant::now(); + let mut candidates_advertised = 0; + for block_info in state.block_infos.iter() { + let block_num = block_info.number as usize; + gum::info!(target: LOG_TARGET, "Current block {}/{} {:?}", block_num, config.num_blocks, block_info.hash); + env.metrics().set_current_block(block_num); + env.import_block(block_info.clone()).await; + + for peer_view_change in env + .network() + .generate_statement_distribution_peer_view_change(view![block_info.hash]) + { + env.send_message(peer_view_change).await; + } + + let seconding_peer_id = *state + .test_authorities + .peer_ids + .get(seconding_validator_in_own_backing_group.0 as usize) + .unwrap(); + let candidate = state.candidate_receipts.get(&block_info.hash).unwrap().first().unwrap(); + let candidate_hash = candidate.hash(); + let statement = state + .statements + .get(&candidate_hash) + .unwrap() + .get(seconding_validator_in_own_backing_group.0 as usize) + .unwrap() + .clone(); + let message = AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage( + seconding_peer_id, + Versioned::V3(v3::StatementDistributionMessage::Statement( + block_info.hash, + statement, + )), + )), + ); + env.send_message(message).await; + + let max_messages_per_candidate = state.config.max_candidate_depth + 1; + // One was just sent for the own backing group + let mut messages_tracker = (0..groups.len()) + .map(|i| if i == own_backing_group_index { max_messages_per_candidate } else { 0 }) + .collect_vec(); + + let neighbors = + topology.compute_grid_neighbors_for(ValidatorIndex(NODE_UNDER_TEST)).unwrap(); + let connected_neighbors_x = neighbors + .validator_indices_x + .iter() + .filter(|&v| connected_validators.contains(&(v.0 as usize))) + .cloned() + .collect_vec(); + let connected_neighbors_y = neighbors + .validator_indices_y + .iter() + .filter(|&v| connected_validators.contains(&(v.0 as usize))) + .cloned() + .collect_vec(); + let one_hop_peers_and_groups = connected_neighbors_x + .iter() + .chain(connected_neighbors_y.iter()) + .map(|validator_index| { + let peer_id = + *state.test_authorities.peer_ids.get(validator_index.0 as usize).unwrap(); + let group_index = + groups.iter().position(|group| group.contains(validator_index)).unwrap(); + (peer_id, group_index) + }) + .collect_vec(); + let two_hop_x_peers_and_groups = connected_neighbors_x + .iter() + .flat_map(|validator_index| { + let peer_id = + *state.test_authorities.peer_ids.get(validator_index.0 as usize).unwrap(); + topology + .compute_grid_neighbors_for(*validator_index) + .unwrap() + .validator_indices_y + .iter() + .map(|validator_neighbor| { + let group_index = groups + .iter() + .position(|group| group.contains(validator_neighbor)) + .unwrap(); + (peer_id, group_index) + }) + .collect_vec() + }) + .collect_vec(); + let two_hop_y_peers_and_groups = connected_neighbors_y + .iter() + .flat_map(|validator_index| { + let peer_id = + *state.test_authorities.peer_ids.get(validator_index.0 as usize).unwrap(); + topology + .compute_grid_neighbors_for(*validator_index) + .unwrap() + .validator_indices_x + .iter() + .map(|validator_neighbor| { + let group_index = groups + .iter() + .position(|group| group.contains(validator_neighbor)) + .unwrap(); + (peer_id, group_index) + }) + .collect_vec() + }) + .collect_vec(); + + for (seconding_peer_id, group_index) in one_hop_peers_and_groups + .into_iter() + .chain(two_hop_x_peers_and_groups) + .chain(two_hop_y_peers_and_groups) + { + let messages_sent_count = messages_tracker.get_mut(group_index).unwrap(); + if *messages_sent_count == max_messages_per_candidate { + continue + } + *messages_sent_count += 1; + + let candidate_hash = state + .candidate_receipts + .get(&block_info.hash) + .unwrap() + .get(group_index) + .unwrap() + .hash(); + let manifest = BackedCandidateManifest { + relay_parent: block_info.hash, + candidate_hash, + group_index: GroupIndex(group_index as u32), + para_id: Id::new(group_index as u32 + 1), + parent_head_data_hash: state.pvd.parent_head.hash(), + statement_knowledge: StatementFilter { + seconded_in_group: BitVec::from_iter( + groups.get(GroupIndex(group_index as u32)).unwrap().iter().map(|_| true), + ), + validated_in_group: BitVec::from_iter( + groups.get(GroupIndex(group_index as u32)).unwrap().iter().map(|_| false), + ), + }, + }; + let message = AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage( + seconding_peer_id, + Versioned::V3(v3::StatementDistributionMessage::BackedCandidateManifest( + manifest, + )), + )), + ); + env.send_message(message).await; + } + + candidates_advertised += messages_tracker.iter().filter(|&&v| v > 0).collect_vec().len(); + + loop { + let manifests_count = state + .manifests_tracker + .values() + .filter(|v| v.load(Ordering::SeqCst)) + .collect::>() + .len(); + gum::debug!(target: LOG_TARGET, "{}/{} manifest exchanges", manifests_count, candidates_advertised); + + if manifests_count == candidates_advertised { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + } + + let duration: u128 = test_start.elapsed().as_millis(); + gum::info!(target: LOG_TARGET, "All blocks processed in {}", format!("{:?}ms", duration).cyan()); + gum::info!(target: LOG_TARGET, + "Avg block time: {}", + format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red() + ); + + env.stop().await; + env.collect_resource_usage(benchmark_name, &["statement-distribution"]) +} diff --git a/polkadot/node/subsystem-bench/src/lib/statement/test_state.rs b/polkadot/node/subsystem-bench/src/lib/statement/test_state.rs new file mode 100644 index 000000000000..b8ea64c7e331 --- /dev/null +++ b/polkadot/node/subsystem-bench/src/lib/statement/test_state.rs @@ -0,0 +1,436 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use crate::{ + configuration::{TestAuthorities, TestConfiguration}, + mock::runtime_api::session_info_for_peers, + network::{HandleNetworkMessage, NetworkMessage}, + NODE_UNDER_TEST, +}; +use bitvec::vec::BitVec; +use futures::channel::oneshot; +use itertools::Itertools; +use parity_scale_codec::{Decode, Encode}; +use polkadot_node_network_protocol::{ + request_response::{ + v2::{AttestedCandidateRequest, AttestedCandidateResponse}, + Requests, + }, + v3::{ + BackedCandidateAcknowledgement, StatementDistributionMessage, StatementFilter, + ValidationProtocol, + }, + Versioned, +}; +use polkadot_node_primitives::{AvailableData, BlockData, PoV}; +use polkadot_node_subsystem_test_helpers::{ + derive_erasure_chunks_with_proofs_and_root, mock::new_block_import_info, +}; +use polkadot_overseer::BlockInfo; +use polkadot_primitives::{ + BlockNumber, CandidateHash, CandidateReceipt, CommittedCandidateReceipt, CompactStatement, + Hash, Header, Id, PersistedValidationData, SessionInfo, SignedStatement, SigningContext, + UncheckedSigned, ValidatorIndex, ValidatorPair, +}; +use polkadot_primitives_test_helpers::{ + dummy_committed_candidate_receipt, dummy_hash, dummy_head_data, dummy_pvd, +}; +use sc_network::{config::IncomingRequest, ProtocolName}; +use sp_core::{Pair, H256}; +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; + +#[derive(Clone)] +pub struct TestState { + // Full test config + pub config: TestConfiguration, + // Authority keys for the network emulation. + pub test_authorities: TestAuthorities, + // Relay chain block infos + pub block_infos: Vec, + // Map from generated candidate receipts + pub candidate_receipts: HashMap>, + // Map from generated commited candidate receipts + pub commited_candidate_receipts: HashMap>, + // PersistedValidationData, we use one for all candidates + pub pvd: PersistedValidationData, + // Relay chain block headers + pub block_headers: HashMap, + // Session info + pub session_info: SessionInfo, + // Pregenerated statements + pub statements: HashMap>>, + // Indices in the backing group where the node under test is + pub own_backing_group: Vec, + // Tracks how many statements we received for a candidates + pub statements_tracker: HashMap>>, + // Tracks if manifest exchange happened + pub manifests_tracker: HashMap>, +} + +impl TestState { + pub fn new(config: &TestConfiguration) -> Self { + let test_authorities = config.generate_authorities(); + let session_info = session_info_for_peers(config, &test_authorities); + let own_backing_group = session_info + .validator_groups + .iter() + .find(|g| g.contains(&ValidatorIndex(NODE_UNDER_TEST))) + .unwrap() + .clone(); + let mut state = Self { + config: config.clone(), + test_authorities, + block_infos: (1..=config.num_blocks).map(generate_block_info).collect(), + candidate_receipts: Default::default(), + commited_candidate_receipts: Default::default(), + pvd: dummy_pvd(dummy_head_data(), 0), + block_headers: Default::default(), + statements_tracker: Default::default(), + manifests_tracker: Default::default(), + session_info, + own_backing_group, + statements: Default::default(), + }; + + state.block_headers = state.block_infos.iter().map(generate_block_header).collect(); + + // For each unique pov we create a candidate receipt. + let pov_sizes = Vec::from(config.pov_sizes()); // For n_cores + let pov_size_to_candidate = generate_pov_size_to_candidate(&pov_sizes); + let receipt_templates = + generate_receipt_templates(&pov_size_to_candidate, config.n_validators, &state.pvd); + + for block_info in state.block_infos.iter() { + for core_idx in 0..config.n_cores { + let pov_size = pov_sizes.get(core_idx).expect("This is a cycle; qed"); + let candidate_index = + *pov_size_to_candidate.get(pov_size).expect("pov_size always exists; qed"); + let mut receipt = receipt_templates[candidate_index].clone(); + receipt.descriptor.para_id = Id::new(core_idx as u32 + 1); + receipt.descriptor.relay_parent = block_info.hash; + + state.candidate_receipts.entry(block_info.hash).or_default().push( + CandidateReceipt { + descriptor: receipt.descriptor.clone(), + commitments_hash: receipt.commitments.hash(), + }, + ); + state.statements_tracker.entry(receipt.hash()).or_default().extend( + (0..config.n_validators) + .map(|_| Arc::new(AtomicBool::new(false))) + .collect_vec(), + ); + state.manifests_tracker.insert(receipt.hash(), Arc::new(AtomicBool::new(false))); + state + .commited_candidate_receipts + .entry(block_info.hash) + .or_default() + .push(receipt); + } + } + + let groups = state.session_info.validator_groups.clone(); + + for block_info in state.block_infos.iter() { + for (index, group) in groups.iter().enumerate() { + let candidate = + state.candidate_receipts.get(&block_info.hash).unwrap().get(index).unwrap(); + let statements = group + .iter() + .map(|&v| { + sign_statement( + CompactStatement::Seconded(candidate.hash()), + block_info.hash, + v, + state.test_authorities.validator_pairs.get(v.0 as usize).unwrap(), + ) + }) + .collect_vec(); + state.statements.insert(candidate.hash(), statements); + } + } + + state + } + + pub fn reset_trackers(&self) { + self.statements_tracker.values().for_each(|v| { + v.iter() + .enumerate() + .for_each(|(index, v)| v.as_ref().store(index <= 1, Ordering::SeqCst)) + }); + self.manifests_tracker + .values() + .for_each(|v| v.as_ref().store(false, Ordering::SeqCst)); + } +} + +fn sign_statement( + statement: CompactStatement, + relay_parent: H256, + validator_index: ValidatorIndex, + pair: &ValidatorPair, +) -> UncheckedSigned { + let context = SigningContext { parent_hash: relay_parent, session_index: 0 }; + let payload = statement.signing_payload(&context); + + SignedStatement::new( + statement, + validator_index, + pair.sign(&payload[..]), + &context, + &pair.public(), + ) + .unwrap() + .as_unchecked() + .to_owned() +} + +fn generate_block_info(block_num: usize) -> BlockInfo { + new_block_import_info(Hash::repeat_byte(block_num as u8), block_num as BlockNumber) +} + +fn generate_block_header(info: &BlockInfo) -> (H256, Header) { + ( + info.hash, + Header { + digest: Default::default(), + number: info.number, + parent_hash: info.parent_hash, + extrinsics_root: Default::default(), + state_root: Default::default(), + }, + ) +} + +fn generate_pov_size_to_candidate(pov_sizes: &[usize]) -> HashMap { + pov_sizes + .iter() + .cloned() + .unique() + .enumerate() + .map(|(index, pov_size)| (pov_size, index)) + .collect() +} + +fn generate_receipt_templates( + pov_size_to_candidate: &HashMap, + n_validators: usize, + pvd: &PersistedValidationData, +) -> Vec { + pov_size_to_candidate + .iter() + .map(|(&pov_size, &index)| { + let mut receipt = dummy_committed_candidate_receipt(dummy_hash()); + let (_, erasure_root) = derive_erasure_chunks_with_proofs_and_root( + n_validators, + &AvailableData { + validation_data: pvd.clone(), + pov: Arc::new(PoV { block_data: BlockData(vec![index as u8; pov_size]) }), + }, + |_, _| {}, + ); + receipt.descriptor.persisted_validation_data_hash = pvd.hash(); + receipt.descriptor.erasure_root = erasure_root; + receipt + }) + .collect() +} + +#[async_trait::async_trait] +impl HandleNetworkMessage for TestState { + async fn handle( + &self, + message: NetworkMessage, + node_sender: &mut futures::channel::mpsc::UnboundedSender, + ) -> Option { + match message { + NetworkMessage::RequestFromNode(_authority_id, Requests::AttestedCandidateV2(req)) => { + let payload = req.payload; + let candidate_receipt = self + .commited_candidate_receipts + .values() + .flatten() + .find(|v| v.hash() == payload.candidate_hash) + .unwrap() + .clone(); + let persisted_validation_data = self.pvd.clone(); + let statements = self.statements.get(&payload.candidate_hash).unwrap().clone(); + let res = AttestedCandidateResponse { + candidate_receipt, + persisted_validation_data, + statements, + }; + let _ = req.pending_response.send(Ok((res.encode(), ProtocolName::from("")))); + None + }, + NetworkMessage::MessageFromNode( + authority_id, + Versioned::V3(ValidationProtocol::StatementDistribution( + StatementDistributionMessage::Statement(relay_parent, statement), + )), + ) => { + let index = self + .test_authorities + .validator_authority_id + .iter() + .position(|v| v == &authority_id) + .unwrap(); + let candidate_hash = *statement.unchecked_payload().candidate_hash(); + + let statements_sent_count = self + .statements_tracker + .get(&candidate_hash) + .unwrap() + .get(index) + .unwrap() + .as_ref(); + if statements_sent_count.load(Ordering::SeqCst) { + return None + } else { + statements_sent_count.store(true, Ordering::SeqCst); + } + + let group_statements = self.statements.get(&candidate_hash).unwrap(); + if !group_statements.iter().any(|s| s.unchecked_validator_index().0 == index as u32) + { + return None + } + + let statement = CompactStatement::Valid(candidate_hash); + let context = SigningContext { parent_hash: relay_parent, session_index: 0 }; + let payload = statement.signing_payload(&context); + let pair = self.test_authorities.validator_pairs.get(index).unwrap(); + let signature = pair.sign(&payload[..]); + let statement = SignedStatement::new( + statement, + ValidatorIndex(index as u32), + signature, + &context, + &pair.public(), + ) + .unwrap() + .as_unchecked() + .to_owned(); + + node_sender + .start_send(NetworkMessage::MessageFromPeer( + *self.test_authorities.peer_ids.get(index).unwrap(), + Versioned::V3(ValidationProtocol::StatementDistribution( + StatementDistributionMessage::Statement(relay_parent, statement), + )), + )) + .unwrap(); + None + }, + NetworkMessage::MessageFromNode( + authority_id, + Versioned::V3(ValidationProtocol::StatementDistribution( + StatementDistributionMessage::BackedCandidateManifest(manifest), + )), + ) => { + let index = self + .test_authorities + .validator_authority_id + .iter() + .position(|v| v == &authority_id) + .unwrap(); + let backing_group = + self.session_info.validator_groups.get(manifest.group_index).unwrap(); + let group_size = backing_group.len(); + let is_own_backing_group = backing_group.contains(&ValidatorIndex(NODE_UNDER_TEST)); + let mut seconded_in_group = + BitVec::from_iter((0..group_size).map(|_| !is_own_backing_group)); + let mut validated_in_group = BitVec::from_iter((0..group_size).map(|_| false)); + + if is_own_backing_group { + let (pending_response, response_receiver) = oneshot::channel(); + let peer_id = self.test_authorities.peer_ids.get(index).unwrap().to_owned(); + node_sender + .start_send(NetworkMessage::RequestFromPeer(IncomingRequest { + peer: peer_id, + payload: AttestedCandidateRequest { + candidate_hash: manifest.candidate_hash, + mask: StatementFilter::blank(self.own_backing_group.len()), + } + .encode(), + pending_response, + })) + .unwrap(); + + let response = response_receiver.await.unwrap(); + let response = + AttestedCandidateResponse::decode(&mut response.result.unwrap().as_ref()) + .unwrap(); + + for statement in response.statements { + let validator_index = statement.unchecked_validator_index(); + let position_in_group = + backing_group.iter().position(|v| *v == validator_index).unwrap(); + match statement.unchecked_payload() { + CompactStatement::Seconded(_) => + seconded_in_group.set(position_in_group, true), + CompactStatement::Valid(_) => + validated_in_group.set(position_in_group, true), + } + } + } + + let ack = BackedCandidateAcknowledgement { + candidate_hash: manifest.candidate_hash, + statement_knowledge: StatementFilter { seconded_in_group, validated_in_group }, + }; + node_sender + .start_send(NetworkMessage::MessageFromPeer( + *self.test_authorities.peer_ids.get(index).unwrap(), + Versioned::V3(ValidationProtocol::StatementDistribution( + StatementDistributionMessage::BackedCandidateKnown(ack), + )), + )) + .unwrap(); + + self.manifests_tracker + .get(&manifest.candidate_hash) + .unwrap() + .as_ref() + .store(true, Ordering::SeqCst); + + None + }, + NetworkMessage::MessageFromNode( + _authority_id, + Versioned::V3(ValidationProtocol::StatementDistribution( + StatementDistributionMessage::BackedCandidateKnown(ack), + )), + ) => { + self.manifests_tracker + .get(&ack.candidate_hash) + .unwrap() + .as_ref() + .store(true, Ordering::SeqCst); + + None + }, + _ => Some(message), + } + } +}