Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

validator_discovery: cache by (Hash, ParaId) #2402

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions node/network/collator-protocol/src/collator_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,13 @@ async fn distribute_collation(
}

// Issue a discovery request for the validators of the current group and the next group.
connect_to_validators(ctx, relay_parent, state, current_validators.union(&next_validators).cloned().collect()).await?;
connect_to_validators(
ctx,
relay_parent,
id,
state,
current_validators.union(&next_validators).cloned().collect(),
).await?;

state.our_validators_groups.insert(relay_parent, current_validators.into());

Expand Down Expand Up @@ -360,6 +366,7 @@ async fn declare(
async fn connect_to_validators(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
relay_parent: Hash,
para_id: ParaId,
state: &mut State,
validators: Vec<ValidatorId>,
) -> Result<()> {
Expand All @@ -370,7 +377,7 @@ async fn connect_to_validators(
PeerSet::Collation,
).await?;

state.connection_requests.put(relay_parent, request);
state.connection_requests.put(relay_parent, para_id, request);

Ok(())
}
Expand Down Expand Up @@ -680,7 +687,7 @@ async fn handle_our_view_change(
for removed in state.view.difference(&view) {
state.collations.remove(removed);
state.our_validators_groups.remove(removed);
state.connection_requests.remove(removed);
state.connection_requests.remove_all(removed);
state.span_per_relay_parent.remove(removed);
}

Expand Down
9 changes: 5 additions & 4 deletions node/network/pov-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ async fn handle_signal(
}

for relay_parent in deactivated {
state.connection_requests.remove(&relay_parent);
state.connection_requests.remove_all(&relay_parent);
state.relay_parent_state.remove(&relay_parent);
}

Expand Down Expand Up @@ -303,13 +303,14 @@ async fn connect_to_relevant_validators(
relay_parent: Hash,
descriptor: &CandidateDescriptor,
) {
let para_id = descriptor.para_id;
if let Ok(Some(relevant_validators)) =
determine_relevant_validators(ctx, relay_parent, descriptor.para_id).await
determine_relevant_validators(ctx, relay_parent, para_id).await
{
// We only need one connection request per (relay_parent, para_id)
// so here we take this shortcut to avoid calling `connect_to_validators`
// more than once.
if !connection_requests.contains_request(&relay_parent) {
if !connection_requests.contains_request(&relay_parent, para_id) {
tracing::debug!(target: LOG_TARGET, validators=?relevant_validators, "connecting to validators");
match validator_discovery::connect_to_validators(
ctx,
Expand All @@ -318,7 +319,7 @@ async fn connect_to_relevant_validators(
PeerSet::Validation,
).await {
Ok(new_connection_request) => {
connection_requests.put(relay_parent, new_connection_request);
connection_requests.put(relay_parent, para_id, new_connection_request);
}
Err(e) => {
tracing::debug!(
Expand Down
146 changes: 114 additions & 32 deletions node/subsystem-util/src/validator_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use polkadot_node_subsystem::{
messages::{AllMessages, NetworkBridgeMessage},
SubsystemContext,
};
use polkadot_primitives::v1::{Hash, ValidatorId, AuthorityDiscoveryId, SessionIndex};
use polkadot_primitives::v1::{
Hash, ValidatorId, AuthorityDiscoveryId, SessionIndex, Id as ParaId,
};
use polkadot_node_network_protocol::peer_set::PeerSet;
use sc_network::PeerId;
use crate::Error;
Expand Down Expand Up @@ -140,19 +142,22 @@ async fn connect_to_authorities<Context: SubsystemContext>(
pub struct DiscoveredValidator {
/// The relay parent associated with the connection request that returned a result.
pub relay_parent: Hash,
/// The para ID associated with the connection request that returned a result.
pub para_id: ParaId,
/// The [`ValidatorId`] that was resolved.
pub validator_id: ValidatorId,
/// The [`PeerId`] associated to the validator id.
pub peer_id: PeerId,
}

/// Used by [`ConnectionRequests::requests`] to map a [`ConnectionRequest`] item to a [`DiscoveredValidator`].
struct ConnectionRequestForRelayParent {
struct ConnectionRequestForRelayParentAndParaId {
request: ConnectionRequest,
relay_parent: Hash,
para_id: ParaId,
}

impl stream::Stream for ConnectionRequestForRelayParent {
impl stream::Stream for ConnectionRequestForRelayParentAndParaId {
type Item = DiscoveredValidator;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
Expand All @@ -162,45 +167,63 @@ impl stream::Stream for ConnectionRequestForRelayParent {
validator_id,
peer_id,
relay_parent: self.relay_parent,
para_id: self.para_id,
}))
}
}

/// A struct that assists performing multiple concurrent connection requests.
///
/// This allows concurrent connections to validator sets at different `relay_parents`.
/// This allows concurrent connections to validator sets at different `(relay_parents, para_id)`.
/// Use [`ConnectionRequests::next`] to wait for results of the added connection requests.
#[derive(Default)]
pub struct ConnectionRequests {
/// Connection requests relay_parent -> StreamUnordered token
id_map: HashMap<Hash, usize>,
/// Connection requests relay_parent -> para_id -> StreamUnordered token
///
/// Q: Why not (relay_parent, para_id) -> Stream?
/// A: So that we can remove from it by relay_parent only.
id_map: HashMap<Hash, HashMap<ParaId, usize>>,

/// Connection requests themselves.
requests: StreamUnordered<ConnectionRequestForRelayParent>,
requests: StreamUnordered<ConnectionRequestForRelayParentAndParaId>,
}

impl ConnectionRequests {
/// Insert a new connection request.
///
/// If a `ConnectionRequest` under a given `relay_parent` already exists it will
/// be revoked and substituted with the given one.
pub fn put(&mut self, relay_parent: Hash, request: ConnectionRequest) {
self.remove(&relay_parent);
let token = self.requests.push(ConnectionRequestForRelayParent { relay_parent, request });

self.id_map.insert(relay_parent, token);
/// If a `ConnectionRequest` under a given `relay_parent` and `para_id` already exists,
/// it will be revoked and substituted with the given one.
pub fn put(&mut self, relay_parent: Hash, para_id: ParaId, request: ConnectionRequest) {
self.remove(&relay_parent, para_id);
let token = self.requests.push(ConnectionRequestForRelayParentAndParaId {
relay_parent,
para_id,
request,
});

self.id_map.entry(relay_parent).or_default().insert(para_id, token);
}

/// Remove a connection request by a given `relay_parent`.
pub fn remove(&mut self, relay_parent: &Hash) {
if let Some(token) = self.id_map.remove(relay_parent) {
/// Remove all connection requests by a given `relay_parent`.
pub fn remove_all(&mut self, relay_parent: &Hash) {
let map = self.id_map.remove(relay_parent);
for token in map.map(|m| m.into_iter().map(|(_, v)| v)).into_iter().flatten() {
Pin::new(&mut self.requests).remove(token);
}
}

/// Is a connection at this relay parent already present in the request
pub fn contains_request(&self, relay_parent: &Hash) -> bool {
self.id_map.contains_key(relay_parent)
/// Remove a connection request by a given `relay_parent` and `para_id`.
pub fn remove(&mut self, relay_parent: &Hash, para_id: ParaId) {
if let Some(map) = self.id_map.get_mut(relay_parent) {
if let Some(token) = map.remove(&para_id) {
Pin::new(&mut self.requests).remove(token);
}
}
}

/// Is a connection at this relay parent and para_id already present in the request
pub fn contains_request(&self, relay_parent: &Hash, para_id: ParaId) -> bool {
self.id_map.get(relay_parent).map_or(false, |map| map.contains_key(&para_id))
}

/// Returns the next available connection request result.
Expand Down Expand Up @@ -298,22 +321,22 @@ mod tests {
};

let relay_parent_1 = Hash::repeat_byte(1);

connection_requests.put(relay_parent_1.clone(), connection_request_1);
let para_id = ParaId::from(3);
connection_requests.put(relay_parent_1.clone(), para_id, connection_request_1);

rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap();
rq1_tx.send((auth_2, peer_id_2.clone())).await.unwrap();

let res = connection_requests.next().await;
assert_eq!(
res,
DiscoveredValidator { relay_parent: relay_parent_1, validator_id: validator_1, peer_id: peer_id_1 },
DiscoveredValidator { relay_parent: relay_parent_1, para_id, validator_id: validator_1, peer_id: peer_id_1 },
);

let res = connection_requests.next().await;
assert_eq!(
res,
DiscoveredValidator { relay_parent: relay_parent_1, validator_id: validator_2, peer_id: peer_id_2 },
DiscoveredValidator { relay_parent: relay_parent_1, para_id, validator_id: validator_2, peer_id: peer_id_2 },
);

check_next_is_pending(&mut connection_requests).await;
Expand Down Expand Up @@ -358,23 +381,81 @@ mod tests {

let relay_parent_1 = Hash::repeat_byte(1);
let relay_parent_2 = Hash::repeat_byte(2);
let para_id = ParaId::from(3);

connection_requests.put(relay_parent_1.clone(), connection_request_1);
connection_requests.put(relay_parent_2.clone(), connection_request_2);
connection_requests.put(relay_parent_1.clone(), para_id, connection_request_1);
connection_requests.put(relay_parent_2.clone(), para_id, connection_request_2);

rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap();
rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap();

let res = connection_requests.next().await;
assert_eq!(
res,
DiscoveredValidator { relay_parent: relay_parent_1, validator_id: validator_1, peer_id: peer_id_1 },
DiscoveredValidator { relay_parent: relay_parent_1, para_id, validator_id: validator_1, peer_id: peer_id_1 },
);

let res = connection_requests.next().await;
assert_eq!(
res,
DiscoveredValidator { relay_parent: relay_parent_2, validator_id: validator_2, peer_id: peer_id_2 },
DiscoveredValidator { relay_parent: relay_parent_2, para_id, validator_id: validator_2, peer_id: peer_id_2 },
);

check_next_is_pending(&mut connection_requests).await;
});
}

#[test]
fn same_relay_parent_diffent_para_ids() {
let mut connection_requests = ConnectionRequests::default();

executor::block_on(async move {
check_next_is_pending(&mut connection_requests).await;

let validator_1 = ValidatorPair::generate().0.public();
let validator_2 = ValidatorPair::generate().0.public();

let auth_1 = AuthorityDiscoveryId::from_slice(&[1; 32]);
let auth_2 = AuthorityDiscoveryId::from_slice(&[2; 32]);

let mut validator_map_1 = HashMap::new();
let mut validator_map_2 = HashMap::new();

validator_map_1.insert(auth_1.clone(), validator_1.clone());
validator_map_2.insert(auth_2.clone(), validator_2.clone());

let (mut rq1_tx, rq1_rx) = mpsc::channel(8);
let (mut rq2_tx, rq2_rx) = mpsc::channel(8);

let peer_id_1 = PeerId::random();
let peer_id_2 = PeerId::random();

let connection_request_1 = ConnectionRequest {
validator_map: validator_map_1,
connections: rq1_rx,
};

let connection_request_2 = ConnectionRequest {
validator_map: validator_map_2,
connections: rq2_rx,
};

let relay_parent = Hash::repeat_byte(1);
let para_id_1 = ParaId::from(1);
let para_id_2 = ParaId::from(2);

connection_requests.put(relay_parent.clone(), para_id_1, connection_request_1);
connection_requests.put(relay_parent.clone(), para_id_2, connection_request_2);

rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap();
rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap();

connection_requests.remove(&relay_parent, para_id_1);

let res = connection_requests.next().await;
assert_eq!(
res,
DiscoveredValidator { relay_parent, para_id: para_id_2, validator_id: validator_2, peer_id: peer_id_2 },
);

check_next_is_pending(&mut connection_requests).await;
Expand Down Expand Up @@ -418,22 +499,23 @@ mod tests {
};

let relay_parent = Hash::repeat_byte(3);
let para_id = ParaId::from(3);

connection_requests.put(relay_parent.clone(), connection_request_1);
connection_requests.put(relay_parent.clone(), para_id, connection_request_1);

rq1_tx.send((auth_1.clone(), peer_id_1.clone())).await.unwrap();

let res = connection_requests.next().await;
assert_eq!(res, DiscoveredValidator { relay_parent, validator_id: validator_1, peer_id: peer_id_1.clone() });
assert_eq!(res, DiscoveredValidator { relay_parent, para_id, validator_id: validator_1, peer_id: peer_id_1.clone() });

connection_requests.put(relay_parent.clone(), connection_request_2);
connection_requests.put(relay_parent.clone(), para_id, connection_request_2);

assert!(rq1_tx.send((auth_1, peer_id_1.clone())).await.is_err());

rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap();

let res = connection_requests.next().await;
assert_eq!(res, DiscoveredValidator { relay_parent, validator_id: validator_2, peer_id: peer_id_2 });
assert_eq!(res, DiscoveredValidator { relay_parent, para_id, validator_id: validator_2, peer_id: peer_id_2 });

check_next_is_pending(&mut connection_requests).await;
});
Expand Down