Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

refactor grid topology to expose more info to subsystems #6140

Merged
merged 8 commits into from
Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 31 additions & 12 deletions node/network/approval-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,13 @@ impl State {
})
},
NetworkBridgeEvent::NewGossipTopology(topology) => {
let session = topology.session;
self.handle_new_session_topology(ctx, session, SessionGridTopology::from(topology))
.await;
self.handle_new_session_topology(
ctx,
topology.session,
topology.topology,
topology.local_index,
)
.await;
},
NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
self.handle_peer_view_change(ctx, metrics, peer_id, view, rng).await;
Expand Down Expand Up @@ -500,8 +504,14 @@ impl State {
ctx: &mut Context,
session: SessionIndex,
topology: SessionGridTopology,
local_index: Option<ValidatorIndex>,
) {
self.topologies.insert_topology(session, topology);
if local_index.is_none() {
// this subsystem only matters to validators.
return
}

self.topologies.insert_topology(session, topology, local_index);
let topology = self.topologies.get_topology(session).expect("just inserted above; qed");

adjust_required_routing_and_propagate(
Expand All @@ -511,7 +521,9 @@ impl State {
|block_entry| block_entry.session == session,
|required_routing, local, validator_index| {
if *required_routing == RequiredRouting::PendingTopology {
*required_routing = topology.required_routing_by_index(*validator_index, local);
*required_routing = topology
.local_grid_neighbors()
.required_routing_by_index(*validator_index, local);
}
},
)
Expand Down Expand Up @@ -861,7 +873,7 @@ impl State {
let local = source == MessageSource::Local;

let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| {
t.required_routing_by_index(validator_index, local)
t.local_grid_neighbors().required_routing_by_index(validator_index, local)
});

let message_state = match entry.candidates.get_mut(claimed_candidate_index as usize) {
Expand Down Expand Up @@ -902,7 +914,10 @@ impl State {
return false
}

if let Some(true) = topology.as_ref().map(|t| t.route_to_peer(required_routing, peer)) {
if let Some(true) = topology
.as_ref()
.map(|t| t.local_grid_neighbors().route_to_peer(required_routing, peer))
{
return true
}

Expand Down Expand Up @@ -1169,7 +1184,8 @@ impl State {
// the assignment to all aware peers in the required routing _except_ the original
// source of the assignment. Hence the `in_topology_check`.
// 3. Any randomly selected peers have been sent the assignment already.
let in_topology = topology.map_or(false, |t| t.route_to_peer(required_routing, peer));
let in_topology = topology
.map_or(false, |t| t.local_grid_neighbors().route_to_peer(required_routing, peer));
in_topology || knowledge.sent.contains(message_subject, MessageKind::Assignment)
};

Expand Down Expand Up @@ -1301,9 +1317,9 @@ impl State {
let required_routing = message_state.required_routing;
let rng = &mut *rng;
let mut peer_filter = move |peer_id| {
let in_topology = topology
.as_ref()
.map_or(false, |t| t.route_to_peer(required_routing, peer_id));
let in_topology = topology.as_ref().map_or(false, |t| {
t.local_grid_neighbors().route_to_peer(required_routing, peer_id)
});
in_topology || {
let route_random = random_routing.sample(total_peers, rng);
if route_random {
Expand Down Expand Up @@ -1564,7 +1580,10 @@ async fn adjust_required_routing_and_propagate<Context, BlockFilter, RoutingModi
});

for (peer, peer_knowledge) in &mut block_entry.known_by {
if !topology.route_to_peer(message_state.required_routing, peer) {
if !topology
.local_grid_neighbors()
.route_to_peer(message_state.required_routing, peer)
{
continue
}

Expand Down
95 changes: 73 additions & 22 deletions node/network/approval-distribution/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
use super::*;
use assert_matches::assert_matches;
use futures::{executor, future, Future};
use polkadot_node_network_protocol::{our_view, peer_set::ValidationVersion, view, ObservedRole};
use polkadot_node_network_protocol::{
grid_topology::{SessionGridTopology, TopologyPeerInfo},
our_view,
peer_set::ValidationVersion,
view, ObservedRole,
};
use polkadot_node_primitives::approval::{
AssignmentCertKind, VRFOutput, VRFProof, RELAY_VRF_MODULO_CONTEXT,
};
Expand Down Expand Up @@ -119,33 +124,79 @@ fn make_gossip_topology(
neighbors_x: &[usize],
neighbors_y: &[usize],
) -> network_bridge_event::NewGossipTopology {
let mut t = network_bridge_event::NewGossipTopology {
session,
our_neighbors_x: HashMap::new(),
our_neighbors_y: HashMap::new(),
// This builds a grid topology which is a square matrix.
// The local validator occupies the top left-hand corner.
// The X peers occupy the same row and the Y peers occupy
// the same column.

let local_index = 1;

assert_eq!(
neighbors_x.len(),
neighbors_y.len(),
"mocking grid topology only implemented for squares",
);

let d = neighbors_x.len() + 1;

let grid_size = d * d;
assert!(grid_size > 0);
assert!(all_peers.len() >= grid_size);

let peer_info = |i: usize| TopologyPeerInfo {
peer_ids: vec![all_peers[i].0.clone()],
validator_index: ValidatorIndex::from(i as u32),
discovery_id: all_peers[i].1.clone(),
};

for &i in neighbors_x {
t.our_neighbors_x.insert(
all_peers[i].1.clone(),
network_bridge_event::TopologyPeerInfo {
peer_ids: vec![all_peers[i].0.clone()],
validator_index: ValidatorIndex::from(i as u32),
},
);
let mut canonical_shuffling: Vec<_> = (0..)
.filter(|i| local_index != *i)
.filter(|i| !neighbors_x.contains(i))
.filter(|i| !neighbors_y.contains(i))
.take(grid_size)
.map(peer_info)
.collect();

// filled with junk except for own.
let mut shuffled_indices = vec![d + 1; grid_size];
shuffled_indices[local_index] = 0;
canonical_shuffling[0] = peer_info(local_index);

for (x_pos, v) in neighbors_x.iter().enumerate() {
let pos = 1 + x_pos;
canonical_shuffling[pos] = peer_info(*v);
}

for &i in neighbors_y {
t.our_neighbors_y.insert(
all_peers[i].1.clone(),
network_bridge_event::TopologyPeerInfo {
peer_ids: vec![all_peers[i].0.clone()],
validator_index: ValidatorIndex::from(i as u32),
},
);
for (y_pos, v) in neighbors_y.iter().enumerate() {
let pos = d * (1 + y_pos);
canonical_shuffling[pos] = peer_info(*v);
}

let topology = SessionGridTopology::new(shuffled_indices, canonical_shuffling);

// sanity check.
{
let g_n = topology
.compute_grid_neighbors_for(ValidatorIndex(local_index as _))
.expect("topology just constructed with this validator index");

assert_eq!(g_n.validator_indices_x.len(), neighbors_x.len());
assert_eq!(g_n.validator_indices_y.len(), neighbors_y.len());

for i in neighbors_x {
assert!(g_n.validator_indices_x.contains(&ValidatorIndex(*i as _)));
}

for i in neighbors_y {
assert!(g_n.validator_indices_y.contains(&ValidatorIndex(*i as _)));
}
}

t
network_bridge_event::NewGossipTopology {
session,
topology,
local_index: Some(ValidatorIndex(local_index as _)),
}
}

async fn setup_gossip_topology(
Expand Down
1 change: 1 addition & 0 deletions node/network/bitfield-distribution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
maplit = "1.0.2"
Expand Down
33 changes: 22 additions & 11 deletions node/network/bitfield-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use futures::{channel::oneshot, FutureExt};
use polkadot_node_network_protocol::{
self as net_protocol,
grid_topology::{
RandomRouting, RequiredRouting, SessionBoundGridTopologyStorage, SessionGridTopology,
GridNeighbors, RandomRouting, RequiredRouting, SessionBoundGridTopologyStorage,
},
v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, Versioned, View,
};
Expand Down Expand Up @@ -327,7 +327,7 @@ async fn handle_bitfield_distribution<Context>(
};

let msg = BitfieldGossipMessage { relay_parent, signed_availability };
let topology = state.topologies.get_topology_or_fallback(session_idx);
let topology = state.topologies.get_topology_or_fallback(session_idx).local_grid_neighbors();
let required_routing = topology.required_routing_by_index(validator_index, true);

relay_message(
Expand All @@ -352,7 +352,7 @@ async fn handle_bitfield_distribution<Context>(
async fn relay_message<Context>(
ctx: &mut Context,
job_data: &mut PerRelayParentData,
topology: &SessionGridTopology,
topology_neighbors: &GridNeighbors,
peer_views: &mut HashMap<PeerId, View>,
validator: ValidatorId,
message: BitfieldGossipMessage,
Expand Down Expand Up @@ -384,7 +384,7 @@ async fn relay_message<Context>(
let message_needed =
job_data.message_from_validator_needed_by_peer(&peer, &validator);
if message_needed {
let in_topology = topology.route_to_peer(required_routing, &peer);
let in_topology = topology_neighbors.route_to_peer(required_routing, &peer);
let need_routing = in_topology || {
let route_random = random_routing.sample(total_peers, rng);
if route_random {
Expand Down Expand Up @@ -533,7 +533,8 @@ async fn process_incoming_peer_message<Context>(

let topology = state
.topologies
.get_topology_or_fallback(job_data.signing_context.session_index);
.get_topology_or_fallback(job_data.signing_context.session_index)
.local_grid_neighbors();
let required_routing = topology.required_routing_by_index(validator_index, false);

metrics.on_bitfield_received();
Expand Down Expand Up @@ -579,14 +580,24 @@ async fn handle_network_msg<Context>(
},
NetworkBridgeEvent::NewGossipTopology(gossip_topology) => {
let session_index = gossip_topology.session;
let new_topology = SessionGridTopology::from(gossip_topology);
let newly_added = new_topology.peers_diff(&new_topology);
state.topologies.update_topology(session_index, new_topology);
let new_topology = gossip_topology.topology;
let prev_neighbors =
state.topologies.get_current_topology().local_grid_neighbors().clone();

state.topologies.update_topology(
session_index,
new_topology,
gossip_topology.local_index,
);
let current_topology = state.topologies.get_current_topology();

let newly_added = current_topology.local_grid_neighbors().peers_diff(&prev_neighbors);

gum::debug!(
target: LOG_TARGET,
?session_index,
"New gossip topology received {} unseen peers",
newly_added.len()
newly_added_peers = ?newly_added.len(),
"New gossip topology received",
);

for new_peer in newly_added {
Expand Down Expand Up @@ -651,7 +662,7 @@ async fn handle_peer_view_change<Context>(
.cloned()
.collect::<Vec<_>>();

let topology = state.topologies.get_current_topology();
let topology = state.topologies.get_current_topology().local_grid_neighbors();
let is_gossip_peer = topology.route_to_peer(RequiredRouting::GridXY, &origin);
let lucky = is_gossip_peer ||
util::gen_ratio_rng(
Expand Down
Loading