From 8af0035806e7ed8c797802829966a00368c302d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Wed, 23 Jun 2021 20:57:37 +0200 Subject: [PATCH 1/3] Only send one collation per relay parent at a time to validators This changes the way we are sending collations to validators. Before we answered every collation request immediatley. Now we only answer one pov request at a time per relay parent. This should bring down the bandwidth requirements and should help parachains to include bigger blocks more easily. --- .../src/collator_side/mod.rs | 144 +++++++++++++++--- .../src/collator_side/tests.rs | 143 ++++++++++++++++- 2 files changed, 260 insertions(+), 27 deletions(-) diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index 5fae120b6156..19bedbecd601 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -14,12 +14,15 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use std::collections::{HashMap, HashSet}; +use std::{collections::{HashMap, HashSet, VecDeque}, pin::Pin}; -use futures::{FutureExt, channel::oneshot}; +use futures::{FutureExt, StreamExt, channel::oneshot, stream::FuturesUnordered, select, Future}; use sp_core::Pair; -use polkadot_primitives::v1::{AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState, GroupIndex, Hash, Id as ParaId}; +use polkadot_primitives::v1::{ + AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState, GroupIndex, Hash, + Id as ParaId, +}; use polkadot_subsystem::{ FromOverseer, OverseerSignal, PerLeafSpan, SubsystemContext, jaeger, messages::{ @@ -27,13 +30,11 @@ use polkadot_subsystem::{ }, }; use polkadot_node_network_protocol::{ - OurView, PeerId, View, peer_set::PeerSet, + OurView, PeerId, UnifiedReputationChange as Rep, View, peer_set::PeerSet, request_response::{ - IncomingRequest, - v1::{CollationFetchingRequest, CollationFetchingResponse}, + IncomingRequest, request::OutgoingResponse, v1::{CollationFetchingRequest, CollationFetchingResponse} }, v1 as protocol_v1, - UnifiedReputationChange as Rep, }; use polkadot_node_subsystem_util::{ metrics::{self, prometheus}, @@ -59,6 +60,12 @@ impl Metrics { } } + fn on_collation_sent_requested(&self) { + if let Some(metrics) = &self.0 { + metrics.collations_sent_requested.inc(); + } + } + fn on_collation_sent(&self) { if let Some(metrics) = &self.0 { metrics.collations_sent.inc(); @@ -75,6 +82,7 @@ impl Metrics { struct MetricsInner { advertisements_made: prometheus::Counter, collations_sent: prometheus::Counter, + collations_sent_requested: prometheus::Counter, process_msg: prometheus::Histogram, } @@ -90,6 +98,13 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + collations_sent_requested: prometheus::register( + prometheus::Counter::new( + "parachain_collations_sent_requested_total", + "A number of collations requested to be sent to validators.", + )?, + registry, + )?, collations_sent: prometheus::register( prometheus::Counter::new( "parachain_collations_sent_total", @@ -185,6 +200,17 @@ struct Collation { status: CollationStatus, } +/// Stores the state for waiting collation fetches. +#[derive(Default)] +struct WaitingCollationFetches { + /// Is there currently a collation being fetched? + collation_fetch_active: bool, + /// The collation fetches waiting to be fulfilled. + waiting: VecDeque>, +} + +type ActiveCollationFetches = FuturesUnordered + Send + 'static>>>; + struct State { /// Our network peer id. local_peer_id: PeerId, @@ -217,11 +243,23 @@ struct State { /// Our validator groups per active leaf. our_validators_groups: HashMap, - /// The mapping from [`PeerId`] to [`ValidatorId`]. This is filled over time as we learn the [`PeerId`]'s by `PeerConnected` events. + /// The mapping from [`PeerId`] to [`ValidatorId`]. This is filled over time as we learn the [`PeerId`]'s + /// by `PeerConnected` events. peer_ids: HashMap, /// Metrics. metrics: Metrics, + + /// All collation fetching requests that are still waiting to be answered. + /// + /// They are stored per relay parent, when our view changes and the relay parent moves out, we will cancel the fetch + /// request. + waiting_collation_fetches: HashMap, + + /// Active collation fetches. + /// + /// Each future returns the relay parent of the finished collation fetch. + active_collation_fetches: ActiveCollationFetches, } impl State { @@ -240,6 +278,8 @@ impl State { collation_result_senders: Default::default(), our_validators_groups: Default::default(), peer_ids: Default::default(), + waiting_collation_fetches: Default::default(), + active_collation_fetches: Default::default(), } } @@ -349,8 +389,9 @@ async fn distribute_collation( state.collations.insert(relay_parent, Collation { receipt, pov, status: CollationStatus::Created }); + let interested = state.peers_interested_in_leaf(&relay_parent); // Make sure already connected peers get collations: - for peer_id in state.peers_interested_in_leaf(&relay_parent) { + for peer_id in interested { advertise_collation(ctx, state, relay_parent, peer_id).await; } @@ -373,6 +414,7 @@ async fn determine_core( } } } + Ok(None) } @@ -455,7 +497,7 @@ async fn declare( async fn connect_to_validators( ctx: &mut impl SubsystemContext, validator_ids: Vec, -) { +) { // ignore address resolution failure // will reissue a new request on new collation let (failed, _) = oneshot::channel(); @@ -607,8 +649,18 @@ async fn process_msg( return Ok(()); }; + state.metrics.on_collation_sent_requested(); + let _span = _span.as_ref().map(|s| s.child("sending")); - send_collation(state, incoming, receipt, pov).await; + + let waiting = state.waiting_collation_fetches.entry(incoming.payload.relay_parent).or_default(); + + if waiting.collation_fetch_active { + waiting.waiting.push_back(incoming); + } else { + waiting.collation_fetch_active = true; + send_collation(state, incoming, receipt, pov).await; + } } else { tracing::warn!( target: LOG_TARGET, @@ -640,12 +692,28 @@ async fn send_collation( receipt: CandidateReceipt, pov: PoV, ) { - if let Err(_) = request.send_response(CollationFetchingResponse::Collation(receipt, pov)) { + let (tx, rx) = oneshot::channel(); + + let relay_parent = request.payload.relay_parent; + + let response = OutgoingResponse { + result: Ok(CollationFetchingResponse::Collation(receipt, pov)), + reputation_changes: Vec::new(), + sent_feedback: Some(tx), + }; + + if let Err(_) = request.send_outgoing_response(response) { tracing::warn!( target: LOG_TARGET, "Sending collation response failed", ); } + + state.active_collation_fetches.push(async move { + let _ = rx.await; + relay_parent + }.boxed()); + state.metrics.on_collation_sent(); } @@ -840,6 +908,7 @@ async fn handle_our_view_change( } state.our_validators_groups.remove(removed); state.span_per_relay_parent.remove(removed); + state.waiting_collation_fetches.remove(removed); } state.view = view; @@ -861,17 +930,48 @@ pub(crate) async fn run( let mut runtime = RuntimeInfo::new(None); loop { - let msg = ctx.recv().fuse().await.map_err(Fatal::SubsystemReceive)?; - match msg { - Communication { msg } => { - log_error( - process_msg(&mut ctx, &mut runtime, &mut state, msg).await, - "Failed to process message" - )?; + async fn wait_for_collation_fetch(active: &mut ActiveCollationFetches) -> Hash { + loop { + if active.is_empty() { + futures::pending!() + } else if let Some(res) = StreamExt::next(active).await { + return res + } + } + } + + select! { + msg = ctx.recv().fuse() => match msg.map_err(Fatal::SubsystemReceive)? { + Communication { msg } => { + log_error( + process_msg(&mut ctx, &mut runtime, &mut state, msg).await, + "Failed to process message" + )?; + }, + Signal(ActiveLeaves(_update)) => {} + Signal(BlockFinalized(..)) => {} + Signal(Conclude) => return Ok(()), }, - Signal(ActiveLeaves(_update)) => {} - Signal(BlockFinalized(..)) => {} - Signal(Conclude) => return Ok(()), + relay_parent = wait_for_collation_fetch(&mut state.active_collation_fetches).fuse() => { + let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) { + if let Some(next) = waiting.waiting.pop_front() { + next + } else { + waiting.collation_fetch_active = false; + continue + } + } else { + // No waiting collation fetches means we already removed the relay parent from our view. + continue + }; + + if let Some(collation) = state.collations.get(&relay_parent) { + let receipt = collation.receipt.clone(); + let pov = collation.pov.clone(); + + send_collation(&mut state, next, receipt, pov).await; + } + } } } } diff --git a/node/network/collator-protocol/src/collator_side/tests.rs b/node/network/collator-protocol/src/collator_side/tests.rs index 1153ff5c73b9..a716dbc16a78 100644 --- a/node/network/collator-protocol/src/collator_side/tests.rs +++ b/node/network/collator-protocol/src/collator_side/tests.rs @@ -20,6 +20,7 @@ use std::{sync::Arc, time::Duration}; use assert_matches::assert_matches; use futures::{executor, future, Future}; +use futures_timer::Delay; use sp_core::{crypto::Pair, Decode}; use sp_keyring::Sr25519Keyring; @@ -31,7 +32,10 @@ use polkadot_node_network_protocol::{ request_response::request::IncomingRequest, }; use polkadot_node_subsystem_util::TimeoutExt; -use polkadot_primitives::v1::{AuthorityDiscoveryId, CandidateDescriptor, CollatorPair, GroupRotationInfo, ScheduledCore, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex}; +use polkadot_primitives::v1::{ + AuthorityDiscoveryId, CandidateDescriptor, CollatorPair, GroupRotationInfo, ScheduledCore, SessionIndex, + SessionInfo, ValidatorId, ValidatorIndex, +}; use polkadot_node_primitives::BlockData; use polkadot_subsystem::{ jaeger, @@ -196,6 +200,18 @@ fn test_harness>( collator_pair: CollatorPair, test: impl FnOnce(TestHarness) -> T, ) { + let _ = env_logger::builder() + .is_test(true) + .filter( + Some("polkadot_collator_protocol"), + log::LevelFilter::Trace, + ) + .filter( + Some(LOG_TARGET), + log::LevelFilter::Trace, + ) + .try_init(); + let pool = sp_core::testing::TaskExecutor::new(); let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); @@ -580,10 +596,7 @@ fn advertise_and_send_collation() { ) ).await; // Re-requesting collation should fail: - assert_matches!( - rx.await, - Err(_) => {} - ); + rx.await.unwrap_err(); assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none()); @@ -605,6 +618,126 @@ fn advertise_and_send_collation() { }); } +#[test] +fn send_only_one_collation_per_relay_parent_at_a_time() { + let test_state = TestState::default(); + let local_peer_id = test_state.local_peer_id.clone(); + let collator_pair = test_state.collator_pair.clone(); + + test_harness(local_peer_id, collator_pair, |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + + setup_system(&mut virtual_overseer, &test_state).await; + + let DistributeCollation { candidate, pov_block } = + distribute_collation(&mut virtual_overseer, &test_state, true).await; + + for (val, peer) in test_state.current_group_validator_authority_ids() + .into_iter() + .zip(test_state.current_group_validator_peer_ids()) + { + connect_peer(&mut virtual_overseer, peer.clone(), Some(val.clone())).await; + } + + // We declare to the connected validators that we are a collator. + // We need to catch all `Declare` messages to the validators we've + // previosly connected to. + for peer_id in test_state.current_group_validator_peer_ids() { + expect_declare_msg(&mut virtual_overseer, &test_state, &peer_id).await; + } + + let validator_0 = test_state.current_group_validator_peer_ids()[0].clone(); + let validator_1 = test_state.current_group_validator_peer_ids()[1].clone(); + + // Send info about peer's view. + send_peer_view_change(&mut virtual_overseer, &validator_0, vec![test_state.relay_parent]).await; + send_peer_view_change(&mut virtual_overseer, &validator_1, vec![test_state.relay_parent]).await; + + // The peer is interested in a leaf that we have a collation for; + // advertise it. + expect_advertise_collation_msg(&mut virtual_overseer, &validator_0, test_state.relay_parent).await; + expect_advertise_collation_msg(&mut virtual_overseer, &validator_1, test_state.relay_parent).await; + + // Request a collation. + let (tx, rx) = oneshot::channel(); + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::CollationFetchingRequest( + IncomingRequest::new( + validator_0, + CollationFetchingRequest { + relay_parent: test_state.relay_parent, + para_id: test_state.para_id, + }, + tx, + ) + ) + ).await; + + // Keep the feedback channel alive because we need to use it to inform about the finished transfer. + let feedback_tx = assert_matches!( + rx.await, + Ok(full_response) => { + let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse + = CollationFetchingResponse::decode( + &mut full_response.result + .expect("We should have a proper answer").as_ref() + ) + .expect("Decoding should work"); + assert_eq!(receipt, candidate); + assert_eq!(pov, pov_block); + + full_response.sent_feedback.expect("Feedback channel is always set") + } + ); + + // Let the second validator request the collation. + let (tx, mut rx) = oneshot::channel(); + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::CollationFetchingRequest( + IncomingRequest::new( + validator_1, + CollationFetchingRequest { + relay_parent: test_state.relay_parent, + para_id: test_state.para_id, + }, + tx, + ) + ) + ).await; + + Delay::new(Duration::from_millis(500)).await; + + assert!( + rx.try_recv().unwrap().is_none(), + "We should not have send the collation yet to the second validator", + ); + + // Signal that the collation fetch is finished + feedback_tx.send(()).expect("Sending collation fetch finished"); + + // Now we should send it to the second validator + assert_matches!( + rx.await, + Ok(full_response) => { + let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse + = CollationFetchingResponse::decode( + &mut full_response.result + .expect("We should have a proper answer").as_ref() + ) + .expect("Decoding should work"); + assert_eq!(receipt, candidate); + assert_eq!(pov, pov_block); + + full_response.sent_feedback.expect("Feedback channel is always set") + } + ); + + virtual_overseer + }); +} + #[test] fn collators_declare_to_connected_peers() { let test_state = TestState::default(); From ec6342785ca7565ccda71e88ad43b5abb9b6b881 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 24 Jun 2021 14:21:21 +0200 Subject: [PATCH 2/3] Guide updates --- .../src/node/collators/collator-protocol.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/roadmap/implementers-guide/src/node/collators/collator-protocol.md b/roadmap/implementers-guide/src/node/collators/collator-protocol.md index 1afbaeb77033..5ee273bc294a 100644 --- a/roadmap/implementers-guide/src/node/collators/collator-protocol.md +++ b/roadmap/implementers-guide/src/node/collators/collator-protocol.md @@ -60,7 +60,7 @@ As seen in the [Scheduler Module][SCH] of the runtime, validator groups are fixe * Determine the group on that core and the next group on that core. * Issue a discovery request for the validators of the current group and the next group with[`NetworkBridgeMessage`][NBM]`::ConnectToValidators`. -Once connected to the relevant peers for the current group assigned to the core (transitively, the para), advertise the collation to any of them which advertise the relay-parent in their view (as provided by the [Network Bridge][NB]). If any respond with a request for the full collation, provide it. Upon receiving a view update from any of these peers which includes a relay-parent for which we have a collation that they will find relevant, advertise the collation to them if we haven't already. +Once connected to the relevant peers for the current group assigned to the core (transitively, the para), advertise the collation to any of them which advertise the relay-parent in their view (as provided by the [Network Bridge][NB]). If any respond with a request for the full collation, provide it. However, we only send one collation at a time per relay parent, other requests need to wait. This is done to reduce the bandwidth requirements of a collator and also increases the chance to fully send the collation to at least one validator. From the point where one validator has received the collation and seconded it, it will also start to share this collation with other validators in its backing group. Upon receiving a view update from any of these peers which includes a relay-parent for which we have a collation that they will find relevant, advertise the collation to them if we haven't already. ### Validators @@ -104,7 +104,7 @@ The protocol tracks advertisements received and the source of the advertisement. As a validator, we will handle requests from other subsystems to fetch a collation on a specific `ParaId` and relay-parent. These requests are made with the request response protocol `CollationFetchingRequest` request. To do so, we need to first check if we have already gathered a collation on that `ParaId` and relay-parent. If not, we need to select one of the advertisements and issue a request for it. If we've already issued a request, we shouldn't issue another one until the first has returned. -When acting on an advertisement, we issue a `Requests::CollationFetching`. If the request times out, we need to note the collator as being unreliable and reduce its priority relative to other collators. +When acting on an advertisement, we issue a `Requests::CollationFetching`. However, we only request one collation at a time per relay parent. This reduces the bandwidth requirements and as we can second only one candidate per relay parent, the others are probably not required anyway. If the request times out, we need to note the collator as being unreliable and reduce its priority relative to other collators. As a validator, once the collation has been fetched some other subsystem will inspect and do deeper validation of the collation. The subsystem will report to this subsystem with a [`CollatorProtocolMessage`][CPM]`::ReportCollator`. In that case, if we are connected directly to the collator, we apply a cost to the `PeerId` associated with the collator and potentially disconnect or blacklist it. If the collation is seconded, we notify the collator and apply a benefit to the `PeerId` associated with the collator. From acc529f49ea917c6190c509e2275101f7b868968 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Mon, 28 Jun 2021 12:42:10 +0200 Subject: [PATCH 3/3] Review feedback. --- .../src/collator_side/mod.rs | 20 +++++-------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index 19bedbecd601..4cc52b33fde7 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -62,7 +62,7 @@ impl Metrics { fn on_collation_sent_requested(&self) { if let Some(metrics) = &self.0 { - metrics.collations_sent_requested.inc(); + metrics.collations_send_requested.inc(); } } @@ -82,7 +82,7 @@ impl Metrics { struct MetricsInner { advertisements_made: prometheus::Counter, collations_sent: prometheus::Counter, - collations_sent_requested: prometheus::Counter, + collations_send_requested: prometheus::Counter, process_msg: prometheus::Histogram, } @@ -98,7 +98,7 @@ impl metrics::Metrics for Metrics { )?, registry, )?, - collations_sent_requested: prometheus::register( + collations_send_requested: prometheus::register( prometheus::Counter::new( "parachain_collations_sent_requested_total", "A number of collations requested to be sent to validators.", @@ -203,7 +203,7 @@ struct Collation { /// Stores the state for waiting collation fetches. #[derive(Default)] struct WaitingCollationFetches { - /// Is there currently a collation being fetched? + /// Is there currently a collation getting fetched? collation_fetch_active: bool, /// The collation fetches waiting to be fulfilled. waiting: VecDeque>, @@ -930,16 +930,6 @@ pub(crate) async fn run( let mut runtime = RuntimeInfo::new(None); loop { - async fn wait_for_collation_fetch(active: &mut ActiveCollationFetches) -> Hash { - loop { - if active.is_empty() { - futures::pending!() - } else if let Some(res) = StreamExt::next(active).await { - return res - } - } - } - select! { msg = ctx.recv().fuse() => match msg.map_err(Fatal::SubsystemReceive)? { Communication { msg } => { @@ -952,7 +942,7 @@ pub(crate) async fn run( Signal(BlockFinalized(..)) => {} Signal(Conclude) => return Ok(()), }, - relay_parent = wait_for_collation_fetch(&mut state.active_collation_fetches).fuse() => { + relay_parent = state.active_collation_fetches.select_next_some() => { let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) { if let Some(next) = waiting.waiting.pop_front() { next