This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

[DNM] scale testing #6530

Closed · wants to merge 23 commits
Changes from all commits (23 commits)
5f0b8e9 Make all subsystems run in blocking threads (sandreim, Jan 10, 2023)
a61710d blocking rx worker (sandreim, Jan 12, 2023)
8db8224 Decrease bitfield signing timer to 1s. (sandreim, Jan 16, 2023)
3b67955 Bump message channel capacity. (sandreim, Jan 17, 2023)
1aa0bd2 Test: Use unbounded channel to make sure we don't delay peer view upd… (sandreim, Jan 17, 2023)
4f18158 Revert "Test: Use unbounded channel to make sure we don't delay peer …" (sandreim, Jan 19, 2023)
451d156 ridiculous channel size 32k (sandreim, Jan 19, 2023)
ecfda93 128k tryhard. (sandreim, Jan 26, 2023)
4d4fbc3 blocking recovery-task due to chunk reconstruction (sandreim, Jan 27, 2023)
76cc400 Merge branch 'master' of github.com:paritytech/polkadot into sandreim… (sandreim, Jan 27, 2023)
906a1a7 Revert "blocking recovery-task due to chunk reconstruction" (sandreim, Jan 27, 2023)
8e181f1 channel size override only for approval distribution (sandreim, Jan 27, 2023)
020b5ad use fast path for av recovery (sandreim, Jan 30, 2023)
036fa25 disable aggression (sandreim, Jan 31, 2023)
7ebf744 Disable approval/assignment checks (sandreim, Feb 2, 2023)
919cf69 fix test (sandreim, Feb 2, 2023)
435a104 disable approval distribution gossip (sandreim, Feb 3, 2023)
3e76631 Merge branch 'master' of github.com:paritytech/polkadot into sandreim… (sandreim, Feb 3, 2023)
1fc461a Revert "Disable approval/assignment checks" (sandreim, Feb 3, 2023)
3975adb Revert "fix test" (sandreim, Feb 3, 2023)
a760cbc fmt and comment (sandreim, Feb 3, 2023)
63c304c larger statement dist queue (sandreim, Feb 4, 2023)
d68f99c bitfield distribution gets a queue size bump (sandreim, Feb 4, 2023)
10 changes: 6 additions & 4 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion node/core/bitfield-signing/src/lib.rs
@@ -48,7 +48,7 @@ use self::metrics::Metrics;
mod tests;

/// Delay between starting a bitfield signing job and its attempting to create a bitfield.
-const SPAWNED_TASK_DELAY: Duration = Duration::from_millis(1500);
+const SPAWNED_TASK_DELAY: Duration = Duration::from_millis(1000);
const LOG_TARGET: &str = "parachain::bitfield-signing";

// TODO: use `fatality` (https://github.com/paritytech/polkadot/issues/5540).
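This matches commit 8db8224 ("Decrease bitfield signing timer to 1s."): the job now waits 1 s instead of 1.5 s before signing, so bitfields go out earlier in the slot. A minimal sketch of the pattern this constant gates, assuming the usual shape of the signing job (the function name and the `futures-timer` dependency are illustrative, not taken from this diff):

```rust
use std::time::Duration;

const SPAWNED_TASK_DELAY: Duration = Duration::from_millis(1000);

async fn bitfield_signing_job() {
    // Give availability-distribution time to fetch chunks before we
    // inspect which candidates we actually hold data for.
    futures_timer::Delay::new(SPAWNED_TASK_DELAY).await;
    // ... query availability, then construct and sign the bitfield ...
}
```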
2 changes: 2 additions & 0 deletions node/network/approval-distribution/Cargo.toml
@@ -9,7 +9,9 @@ polkadot-node-metrics = { path = "../../metrics" }
polkadot-node-network-protocol = { path = "../protocol" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-subsystem = { path = "../../subsystem" }
+polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-primitives = { path = "../../../primitives" }
+sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
rand = "0.8"

futures = "0.3.21"
134 changes: 90 additions & 44 deletions node/network/approval-distribution/src/lib.rs
@@ -37,10 +37,13 @@ use polkadot_node_subsystem::{
},
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
};
+use polkadot_node_subsystem_util::{runtime, runtime::RuntimeInfo};
use polkadot_primitives::{
-BlockNumber, CandidateIndex, Hash, SessionIndex, ValidatorIndex, ValidatorSignature,
+AuthorityDiscoveryId, BlockNumber, CandidateIndex, Hash, SessionIndex, ValidatorIndex,
+ValidatorSignature,
};
use rand::{CryptoRng, Rng, SeedableRng};
+use sp_keystore::SyncCryptoStorePtr;
use std::collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque};

use self::metrics::Metrics;
@@ -66,6 +69,7 @@ const BENEFIT_VALID_MESSAGE_FIRST: Rep =
/// The Approval Distribution subsystem.
pub struct ApprovalDistribution {
metrics: Metrics,
+keystore: SyncCryptoStorePtr,
}

/// Contains recently finalized
@@ -138,8 +142,8 @@ impl AggressionConfig {
impl Default for AggressionConfig {
fn default() -> Self {
AggressionConfig {
-l1_threshold: Some(13),
-l2_threshold: Some(28),
+l1_threshold: Some(130),
+l2_threshold: Some(280),
resend_unfinalized_period: Some(8),
}
}
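Raising `l1_threshold`/`l2_threshold` tenfold (13 to 130 and 28 to 280 blocks) effectively keeps distribution aggression dormant for the whole test, consistent with the later "disable aggression" commit. A reduced sketch of how such thresholds gate the aggression levels; `age` (the unfinalized depth of the oldest unapproved block) and the function shape are assumptions for illustration:

```rust
struct AggressionConfig {
    l1_threshold: Option<u32>,
    l2_threshold: Option<u32>,
}

// 0 = normal grid routing, 1 = L1 aggression, 2 = L2 aggression.
fn aggression_level(cfg: &AggressionConfig, age: u32) -> u8 {
    match (cfg.l2_threshold, cfg.l1_threshold) {
        (Some(l2), _) if age >= l2 => 2,
        (_, Some(l1)) if age >= l1 => 1,
        _ => 0,
    }
}
```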
@@ -180,6 +184,9 @@ struct State {

/// Config for aggression.
aggression_config: AggressionConfig,

+/// PeerId to AuthorityId mapping.
+maybe_authority_ids: HashMap<PeerId, Option<HashSet<AuthorityDiscoveryId>>>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -329,16 +336,28 @@ impl State {
metrics: &Metrics,
event: NetworkBridgeEvent<net_protocol::ApprovalDistributionMessage>,
rng: &mut (impl CryptoRng + Rng),
+_runtime: &RuntimeInfo,
) {
match event {
-NetworkBridgeEvent::PeerConnected(peer_id, role, _, _) => {
+NetworkBridgeEvent::PeerConnected(peer_id, role, _, maybe_authority_id) => {
// insert a blank view if none already present
gum::trace!(target: LOG_TARGET, ?peer_id, ?role, "Peer connected");
-self.peer_views.entry(peer_id).or_default();
+if maybe_authority_id.is_none() {
+// Just for debug, but for scale testing we run with info levels only.
+gum::warn!(
+target: LOG_TARGET,
+?peer_id,
+?role,
+"No authority ID found for peer"
+);
+}
+self.peer_views.entry(peer_id.clone()).or_default();
+*self.maybe_authority_ids.entry(peer_id).or_default() = maybe_authority_id;
},
NetworkBridgeEvent::PeerDisconnected(peer_id) => {
gum::trace!(target: LOG_TARGET, ?peer_id, "Peer disconnected");
self.peer_views.remove(&peer_id);
+self.maybe_authority_ids.remove(&peer_id);
self.blocks.iter_mut().for_each(|(_hash, entry)| {
entry.known_by.remove(&peer_id);
})
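The new `maybe_authority_ids` map simply mirrors the peer lifecycle: populated on `PeerConnected`, dropped on `PeerDisconnected`. A self-contained sketch of that bookkeeping with placeholder types (the real `PeerId` and `AuthorityDiscoveryId` come from the network and crypto crates):

```rust
use std::collections::{HashMap, HashSet};

type PeerId = u64;                  // placeholder for the sketch
type AuthorityDiscoveryId = String; // placeholder for the sketch

#[derive(Default)]
struct PeerAuthorities {
    ids: HashMap<PeerId, Option<HashSet<AuthorityDiscoveryId>>>,
}

impl PeerAuthorities {
    fn on_connected(&mut self, peer: PeerId, auth: Option<HashSet<AuthorityDiscoveryId>>) {
        // `None` records that authority discovery had no entry for this peer.
        self.ids.insert(peer, auth);
    }
    fn on_disconnected(&mut self, peer: &PeerId) {
        self.ids.remove(peer);
    }
}
```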
@@ -513,18 +532,19 @@ impl State {
}

self.topologies.insert_topology(session, topology, local_index);
-let topology = self.topologies.get_topology(session).expect("just inserted above; qed");
+// let topology = self.topologies.get_topology(session).expect("just inserted above; qed");

adjust_required_routing_and_propagate(
ctx,
&mut self.blocks,
&self.topologies,
|block_entry| block_entry.session == session,
-|required_routing, local, validator_index| {
+|required_routing, _local, _validator_index| {
if *required_routing == RequiredRouting::PendingTopology {
-*required_routing = topology
-.local_grid_neighbors()
-.required_routing_by_index(*validator_index, local);
+// *required_routing = topology
+// .local_grid_neighbors()
+// .required_routing_by_index(*validator_index, local);
+*required_routing = RequiredRouting::All;
}
},
)
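Instead of resolving `PendingTopology` into grid-based routing once the session topology arrives, every pending message is now routed to all peers. A simplified sketch of the two policies being swapped (the real enum in `polkadot-node-network-protocol` has more variants, e.g. `GridX`/`GridY`):

```rust
// Simplified model of the routing decision this hunk changes.
enum RequiredRouting {
    PendingTopology, // topology not yet known
    GridXY,          // forward along grid rows and columns only
    All,             // forward to every connected peer
}

fn resolve_pending(scale_test_broadcast: bool) -> RequiredRouting {
    if scale_test_broadcast {
        // This PR: measure worst-case fan-out by bypassing the grid.
        RequiredRouting::All
    } else {
        RequiredRouting::GridXY
    }
}
```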
@@ -708,6 +728,7 @@ impl State {
) where
R: CryptoRng + Rng,
{
+// TODO: implement checking to match authority against message validator index to not process gossiped assignments.
let block_hash = assignment.block_hash;
let validator_index = assignment.validator;
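The TODO above describes a check this branch does not implement: use the stored authority IDs to accept an assignment only when its claimed validator index actually belongs to the sending peer, so relayed (gossiped) assignments could be skipped. A hypothetical shape of that check; every name here is an assumption:

```rust
use std::collections::HashSet;

type AuthorityDiscoveryId = String; // placeholder
type ValidatorIndex = u32;

// Would the claimed validator index map back to the peer that sent it?
fn peer_owns_validator_index(
    peer_authorities: Option<&HashSet<AuthorityDiscoveryId>>,
    authority_of: impl Fn(ValidatorIndex) -> Option<AuthorityDiscoveryId>,
    claimed: ValidatorIndex,
) -> bool {
    match (peer_authorities, authority_of(claimed)) {
        (Some(peers), Some(owner)) => peers.contains(&owner),
        _ => false,
    }
}
```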

@@ -846,6 +867,10 @@ impl State {
return
},
}

+// Do not gossip assignments, just send our own.
+metrics.on_assignment_imported();
+return
} else {
if !entry.knowledge.insert(message_subject.clone(), message_kind) {
// if we already imported an assignment, there is no need to distribute it again
@@ -870,9 +895,10 @@ impl State {
let topology = self.topologies.get_topology(entry.session);
let local = source == MessageSource::Local;

-let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| {
-t.local_grid_neighbors().required_routing_by_index(validator_index, local)
-});
+// let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| {
+// t.local_grid_neighbors().required_routing_by_index(validator_index, local)
+// });
+let required_routing = RequiredRouting::All;

let message_state = match entry.candidates.get_mut(claimed_candidate_index as usize) {
Some(candidate_entry) => {
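Together with `RequiredRouting::All` above, this turns the subsystem into "everyone broadcasts only their own messages" (commit 435a104, "disable approval distribution gossip"): peer-sourced assignments are still imported and counted, but the function returns before the relay step. A condensed sketch of the control flow, with types simplified:

```rust
enum MessageSource { Local, Peer }

#[derive(Default)]
struct Metrics { assignments_imported: u64 }
impl Metrics {
    fn on_assignment_imported(&mut self) { self.assignments_imported += 1; }
}

fn import_assignment(source: MessageSource, metrics: &mut Metrics) {
    // ... knowledge/duplicate checks elided ...
    if matches!(source, MessageSource::Peer) {
        metrics.on_assignment_imported();
        return // imported, but never re-gossiped
    }
    // Local messages fall through and are distributed to all peers.
}
```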
@@ -1081,6 +1107,9 @@ impl State {
return
},
}
+// Do not gossip.
+metrics.on_approval_imported();
+return
} else {
if !entry.knowledge.insert(message_subject.clone(), message_kind) {
// if we already imported an approval, there is no need to distribute it again
@@ -1126,7 +1155,7 @@ impl State {
},
);

-required_routing
+RequiredRouting::All
},
Some(_) => {
unreachable!(
@@ -1270,11 +1299,11 @@ impl State {
sender: &mut impl overseer::ApprovalDistributionSenderTrait,
metrics: &Metrics,
entries: &mut HashMap<Hash, BlockEntry>,
-topologies: &SessionGridTopologies,
-total_peers: usize,
+_topologies: &SessionGridTopologies,
+_total_peers: usize,
peer_id: PeerId,
view: View,
-rng: &mut (impl CryptoRng + Rng),
+_rng: &mut (impl CryptoRng + Rng),
) {
metrics.on_unify_with_peer();
let _timer = metrics.time_unify_with_peer();
@@ -1300,37 +1329,42 @@ impl State {

let peer_knowledge = entry.known_by.entry(peer_id).or_default();

-let topology = topologies.get_topology(entry.session);
+// let topology = topologies.get_topology(entry.session);

// Iterate all messages in all candidates.
for (candidate_index, validator, message_state) in
entry.candidates.iter_mut().enumerate().flat_map(|(c_i, c)| {
c.messages.iter_mut().map(move |(k, v)| (c_i as _, k, v))
}) {
+// No gossip.
+if !message_state.local {
+continue
+}

// Propagate the message to all peers in the required routing set OR
// randomly sample peers.
-{
-let random_routing = &mut message_state.random_routing;
-let required_routing = message_state.required_routing;
-let rng = &mut *rng;
-let mut peer_filter = move |peer_id| {
-let in_topology = topology.as_ref().map_or(false, |t| {
-t.local_grid_neighbors().route_to_peer(required_routing, peer_id)
-});
-in_topology || {
-let route_random = random_routing.sample(total_peers, rng);
-if route_random {
-random_routing.inc_sent();
-}
-
-route_random
-}
-};
-
-if !peer_filter(&peer_id) {
-continue
-}
-}
+// {
+// let random_routing = &mut message_state.random_routing;
+// let required_routing = message_state.required_routing;
+// let rng = &mut *rng;
+// let mut peer_filter = move |peer_id| {
+// let in_topology = topology.as_ref().map_or(false, |t| {
+// t.local_grid_neighbors().route_to_peer(required_routing, peer_id)
+// });
+// in_topology || {
+// let route_random = random_routing.sample(total_peers, rng);
+// if route_random {
+// random_routing.inc_sent();
+// }
+
+// route_random
+// }
+// };
+
+// if !peer_filter(&peer_id) {
+// continue
+// }
+// }

let message_subject = MessageSubject(block, candidate_index, *validator);
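The commented-out block above was the grid-or-random peer filter; what remains forwards only locally-originated messages to every peer. For reference, a self-contained sketch of the random-routing fallback that gets disabled here, as a simplified model of `RandomRouting::sample` (the real implementation may differ in detail):

```rust
use rand::Rng;

struct RandomRouting { target: usize, sent: usize }

impl RandomRouting {
    // Send to a random subset of roughly `target` peers outside the grid.
    fn sample(&self, total_peers: usize, rng: &mut impl Rng) -> bool {
        if self.sent >= self.target {
            return false
        }
        if total_peers <= self.target {
            return true
        }
        rng.gen_ratio(self.target as u32, total_peers as u32)
    }
    fn inc_sent(&mut self) { self.sent += 1; }
}
```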

@@ -1528,6 +1562,11 @@ async fn adjust_required_routing_and_propagate<Context, BlockFilter, RoutingModi
.enumerate()
.flat_map(|(c_i, c)| c.messages.iter_mut().map(move |(k, v)| (c_i as _, k, v)))
{
+// No gossip.
+if !message_state.local {
+continue
+}

routing_modifier(&mut message_state.required_routing, message_state.local, validator);

if message_state.required_routing.is_empty() {
@@ -1619,8 +1658,8 @@ async fn modify_reputation(
#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
impl ApprovalDistribution {
/// Create a new instance of the [`ApprovalDistribution`] subsystem.
-pub fn new(metrics: Metrics) -> Self {
-Self { metrics }
+pub fn new(metrics: Metrics, keystore: SyncCryptoStorePtr) -> Self {
+Self { metrics, keystore }
}

async fn run<Context>(self, ctx: Context) {
@@ -1639,6 +1678,12 @@ impl ApprovalDistribution {
state: &mut State,
rng: &mut (impl CryptoRng + Rng),
) {
+let runtime = RuntimeInfo::new_with_config(runtime::Config {
+keystore: Some(self.keystore),
+session_cache_lru_size: std::num::NonZeroUsize::new(6 as usize)
+.expect("Cache size can not be 0; qed"),
+});

loop {
let message = match ctx.recv().await {
Ok(message) => message,
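`RuntimeInfo` here is the helper from `polkadot-node-subsystem-util` that caches per-session runtime data behind a small LRU; with the keystore attached it can also resolve the local validator identity. The LRU size of 6 is an ad-hoc test choice, not a derived constant. A sketch of the non-zero-size idiom used above:

```rust
use std::num::NonZeroUsize;

fn session_cache_lru_size() -> NonZeroUsize {
    // NonZeroUsize encodes "the cache cannot be empty" in the type;
    // for the literal 6 the `expect` can never fire.
    NonZeroUsize::new(6).expect("Cache size can not be 0; qed")
}
```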
@@ -1649,7 +1694,7 @@ impl ApprovalDistribution {
};
match message {
FromOrchestra::Communication { msg } =>
-Self::handle_incoming(&mut ctx, state, msg, &self.metrics, rng).await,
+Self::handle_incoming(&mut ctx, state, msg, &self.metrics, rng, &runtime).await,
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
..
})) => {
@@ -1673,10 +1718,11 @@ impl ApprovalDistribution {
msg: ApprovalDistributionMessage,
metrics: &Metrics,
rng: &mut (impl CryptoRng + Rng),
+runtime: &RuntimeInfo,
) {
match msg {
ApprovalDistributionMessage::NetworkBridgeUpdate(event) => {
-state.handle_network_msg(ctx, metrics, event, rng).await;
+state.handle_network_msg(ctx, metrics, event, rng, runtime).await;
},
ApprovalDistributionMessage::NewBlocks(metas) => {
state.handle_new_blocks(ctx, metrics, metas, rng).await;