From f58e2b80c901176ff66376b49973195120559669 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Mon, 2 Sep 2024 16:08:23 +0200 Subject: [PATCH] collator-protocol: Handle unknown validator heads (#5538) There is a race condition when a validator sends its heads to the collator, but the collator doesn't yet know these heads. Before it is aware of these heads by importing the block(s), any collation registered on the collator is not announced to the validators. The collations aren't advertised, because the collator doesn't know yet that these heads of the validator are descendants of the collations relay parent. The solution is to store these unknown heads of the validators and to handle them when the collator updates its own view. --- Cargo.lock | 5 +- Cargo.toml | 2 +- .../node/network/collator-protocol/Cargo.toml | 1 + .../src/collator_side/mod.rs | 78 ++++-- .../src/collator_side/tests/mod.rs | 87 ++++-- .../tests/prospective_parachains.rs | 262 ++++++++++-------- .../node/subsystem-test-helpers/src/lib.rs | 22 ++ prdoc/pr_5538.prdoc | 11 + 8 files changed, 297 insertions(+), 171 deletions(-) create mode 100644 prdoc/pr_5538.prdoc diff --git a/Cargo.lock b/Cargo.lock index c2cbb0f6d4f6..2074bb1d67ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13150,6 +13150,7 @@ dependencies = [ "rstest", "sc-keystore", "sc-network", + "schnellru", "sp-core", "sp-keyring", "sp-keystore", @@ -18797,9 +18798,9 @@ dependencies = [ [[package]] name = "schnellru" -version = "0.2.1" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "772575a524feeb803e5b0fcbc6dd9f367e579488197c94c6e4023aad2305774d" +checksum = "c9a8ef13a93c54d20580de1e5c413e624e53121d42fc7e2c11d10ef7f8b02367" dependencies = [ "ahash 0.8.11", "cfg-if", diff --git a/Cargo.toml b/Cargo.toml index f8c430f58254..5ed5ce2f13cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1178,7 +1178,7 @@ sc-transaction-pool-api = { path = "substrate/client/transaction-pool/api", defa sc-utils = { path = "substrate/client/utils", default-features = false } scale-info = { version = "2.11.1", default-features = false } schemars = { version = "0.8.13", default-features = false } -schnellru = { version = "0.2.1" } +schnellru = { version = "0.2.3" } schnorrkel = { version = "0.11.4", default-features = false } seccompiler = { version = "0.4.0" } secp256k1 = { version = "0.28.0", default-features = false } diff --git a/polkadot/node/network/collator-protocol/Cargo.toml b/polkadot/node/network/collator-protocol/Cargo.toml index 8a7c384dcbe8..304cb23bb6aa 100644 --- a/polkadot/node/network/collator-protocol/Cargo.toml +++ b/polkadot/node/network/collator-protocol/Cargo.toml @@ -14,6 +14,7 @@ bitvec = { features = ["alloc"], workspace = true } futures = { workspace = true } futures-timer = { workspace = true } gum = { workspace = true, default-features = true } +schnellru.workspace = true sp-core = { workspace = true, default-features = true } sp-runtime = { workspace = true, default-features = true } diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index 401525d02f11..97bc66d6058c 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -23,6 +23,7 @@ use bitvec::{bitvec, vec::BitVec}; use futures::{ channel::oneshot, future::Fuse, pin_mut, select, stream::FuturesUnordered, FutureExt, StreamExt, }; +use schnellru::{ByLength, LruMap}; use sp_core::Pair; use polkadot_node_network_protocol::{ @@ -42,7 +43,7 @@ use polkadot_node_subsystem::{ CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeTxMessage, ParentHeadData, RuntimeApiMessage, }, - overseer, CollatorProtocolSenderTrait, FromOrchestra, OverseerSignal, PerLeafSpan, + overseer, FromOrchestra, OverseerSignal, PerLeafSpan, }; use polkadot_node_subsystem_util::{ backing_implicit_view::View as ImplicitView, @@ -200,6 +201,11 @@ struct PeerData { view: View, /// Network protocol version. version: CollationVersion, + /// Unknown heads in the view. + /// + /// This can happen when the validator is faster at importing a block and sending out its + /// `View` than the collator is able to import a block. + unknown_heads: LruMap, } /// A type wrapping a collation and it's designated core index. @@ -1198,9 +1204,10 @@ async fn handle_peer_view_change( peer_id: PeerId, view: View, ) { - let PeerData { view: current, version } = match state.peer_data.get_mut(&peer_id) { - Some(peer_data) => peer_data, - None => return, + let Some(PeerData { view: current, version, unknown_heads }) = + state.peer_data.get_mut(&peer_id) + else { + return }; let added: Vec = view.difference(&*current).cloned().collect(); @@ -1228,15 +1235,18 @@ async fn handle_peer_view_change( new_leaf = ?added, "New leaf in peer's view is unknown", ); + + unknown_heads.insert(added, ()); + continue }, }; for block_hash in block_hashes { - let per_relay_parent = match state.per_relay_parent.get_mut(block_hash) { - Some(per_relay_parent) => per_relay_parent, - None => continue, + let Some(per_relay_parent) = state.per_relay_parent.get_mut(block_hash) else { + continue }; + advertise_collation( ctx, *block_hash, @@ -1282,10 +1292,13 @@ async fn handle_network_msg( return Ok(()) }, }; - state - .peer_data - .entry(peer_id) - .or_insert_with(|| PeerData { view: View::default(), version }); + state.peer_data.entry(peer_id).or_insert_with(|| PeerData { + view: View::default(), + version, + // Unlikely that the collator is falling 10 blocks behind and if so, it probably is + // not able to keep up any way. + unknown_heads: LruMap::new(ByLength::new(10)), + }); if let Some(authority_ids) = maybe_authority { gum::trace!( @@ -1310,7 +1323,7 @@ async fn handle_network_msg( }, OurViewChange(view) => { gum::trace!(target: LOG_TARGET, ?view, "Own view change"); - handle_our_view_change(ctx.sender(), state, view).await?; + handle_our_view_change(ctx, state, view).await?; }, PeerMessage(remote, msg) => { handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?; @@ -1332,21 +1345,19 @@ async fn handle_network_msg( } /// Handles our view changes. -async fn handle_our_view_change( - sender: &mut Sender, +#[overseer::contextbounds(CollatorProtocol, prefix = crate::overseer)] +async fn handle_our_view_change( + ctx: &mut Context, state: &mut State, view: OurView, -) -> Result<()> -where - Sender: CollatorProtocolSenderTrait, -{ +) -> Result<()> { let current_leaves = state.active_leaves.clone(); let removed = current_leaves.iter().filter(|(h, _)| !view.contains(h)); let added = view.iter().filter(|h| !current_leaves.contains_key(h)); for leaf in added { - let mode = prospective_parachains_mode(sender, *leaf).await?; + let mode = prospective_parachains_mode(ctx.sender(), *leaf).await?; if let Some(span) = view.span_per_head().get(leaf).cloned() { let per_leaf_span = PerLeafSpan::new(span, "collator-side"); @@ -1359,7 +1370,7 @@ where if mode.is_enabled() { if let Some(ref mut implicit_view) = state.implicit_view { implicit_view - .activate_leaf(sender, *leaf) + .activate_leaf(ctx.sender(), *leaf) .await .map_err(Error::ImplicitViewFetchError)?; @@ -1367,11 +1378,36 @@ where .known_allowed_relay_parents_under(leaf, state.collating_on) .unwrap_or_default(); + // Get the peers that already reported us this head, but we didn't knew it at this + // point. + let peers = state + .peer_data + .iter_mut() + .filter_map(|(id, data)| { + data.unknown_heads.remove(leaf).map(|_| (id, data.version)) + }) + .collect::>(); + for block_hash in allowed_ancestry { - state + let per_relay_parent = state .per_relay_parent .entry(*block_hash) .or_insert_with(|| PerRelayParent::new(mode)); + + // Announce relevant collations to these peers. + for (peer_id, peer_version) in &peers { + advertise_collation( + ctx, + *block_hash, + per_relay_parent, + &peer_id, + *peer_version, + &state.peer_ids, + &mut state.advertisement_timeouts, + &state.metrics, + ) + .await; + } } } } diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs index 74a151c168dc..2f4c768b89e0 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs @@ -299,21 +299,33 @@ async fn overseer_send(overseer: &mut VirtualOverseer, msg: CollatorProtocolMess } async fn overseer_recv(overseer: &mut VirtualOverseer) -> AllMessages { - let msg = overseer_recv_with_timeout(overseer, TIMEOUT) + overseer_recv_with_timeout(overseer, TIMEOUT) .await - .expect(&format!("{:?} is more than enough to receive messages", TIMEOUT)); + .expect(&format!("{:?} is more than enough to receive messages", TIMEOUT)) +} + +async fn overseer_recv_with_timeout( + overseer: &mut VirtualOverseer, + timeout: Duration, +) -> Option { + gum::trace!("waiting for message..."); + let msg = overseer.recv().timeout(timeout).await; gum::trace!(?msg, "received message"); msg } -async fn overseer_recv_with_timeout( +async fn overseer_peek_with_timeout( overseer: &mut VirtualOverseer, timeout: Duration, -) -> Option { - gum::trace!("waiting for message..."); - overseer.recv().timeout(timeout).await +) -> Option<&AllMessages> { + gum::trace!("peeking for message..."); + let msg = overseer.peek().timeout(timeout).await; + + gum::trace!(?msg, "received message"); + + msg.flatten() } async fn overseer_signal(overseer: &mut VirtualOverseer, signal: OverseerSignal) { @@ -603,7 +615,7 @@ async fn expect_declare_msg( /// Expects v2 message if `expected_candidate_hashes` is `Some`, v1 otherwise. async fn expect_advertise_collation_msg( virtual_overseer: &mut VirtualOverseer, - peer: &PeerId, + any_peers: &[PeerId], expected_relay_parent: Hash, expected_candidate_hashes: Option>, ) { @@ -620,7 +632,7 @@ async fn expect_advertise_collation_msg( wire_message, ) ) => { - assert_eq!(to[0], *peer); + assert!(any_peers.iter().any(|p| to.contains(p))); match (candidate_hashes.as_mut(), wire_message) { (None, Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol(wire_message))) => { assert_matches!( @@ -739,7 +751,7 @@ fn advertise_and_send_collation() { // advertise it. expect_advertise_collation_msg( &mut virtual_overseer, - &peer, + &[peer], test_state.relay_parent, None, ) @@ -849,7 +861,7 @@ fn advertise_and_send_collation() { expect_advertise_collation_msg( &mut virtual_overseer, - &peer, + &[peer], test_state.relay_parent, None, ) @@ -910,7 +922,7 @@ fn delay_reputation_change() { // advertise it. expect_advertise_collation_msg( &mut virtual_overseer, - &peer, + &[peer], test_state.relay_parent, None, ) @@ -1031,7 +1043,7 @@ fn advertise_collation_v2_protocol() { // Versioned advertisements work. expect_advertise_collation_msg( virtual_overseer, - &peer_ids[0], + &[peer_ids[0]], test_state.relay_parent, None, ) @@ -1039,7 +1051,7 @@ fn advertise_collation_v2_protocol() { for peer_id in peer_ids.iter().skip(1) { expect_advertise_collation_msg( virtual_overseer, - peer_id, + &[*peer_id], test_state.relay_parent, Some(vec![candidate.hash()]), // This is `Some`, advertisement is v2. ) @@ -1142,15 +1154,25 @@ fn collations_are_only_advertised_to_validators_with_correct_view() { distribute_collation(virtual_overseer, &test_state, test_state.relay_parent, true) .await; - expect_advertise_collation_msg(virtual_overseer, &peer2, test_state.relay_parent, None) - .await; + expect_advertise_collation_msg( + virtual_overseer, + &[peer2], + test_state.relay_parent, + None, + ) + .await; // The other validator announces that it changed its view. send_peer_view_change(virtual_overseer, &peer, vec![test_state.relay_parent]).await; // After changing the view we should receive the advertisement - expect_advertise_collation_msg(virtual_overseer, &peer, test_state.relay_parent, None) - .await; + expect_advertise_collation_msg( + virtual_overseer, + &[peer], + test_state.relay_parent, + None, + ) + .await; test_harness }, ) @@ -1199,12 +1221,17 @@ fn collate_on_two_different_relay_chain_blocks() { .await; send_peer_view_change(virtual_overseer, &peer, vec![old_relay_parent]).await; - expect_advertise_collation_msg(virtual_overseer, &peer, old_relay_parent, None).await; + expect_advertise_collation_msg(virtual_overseer, &[peer], old_relay_parent, None).await; send_peer_view_change(virtual_overseer, &peer2, vec![test_state.relay_parent]).await; - expect_advertise_collation_msg(virtual_overseer, &peer2, test_state.relay_parent, None) - .await; + expect_advertise_collation_msg( + virtual_overseer, + &[peer2], + test_state.relay_parent, + None, + ) + .await; test_harness }, ) @@ -1237,8 +1264,13 @@ fn validator_reconnect_does_not_advertise_a_second_time() { .await; send_peer_view_change(virtual_overseer, &peer, vec![test_state.relay_parent]).await; - expect_advertise_collation_msg(virtual_overseer, &peer, test_state.relay_parent, None) - .await; + expect_advertise_collation_msg( + virtual_overseer, + &[peer], + test_state.relay_parent, + None, + ) + .await; // Disconnect and reconnect directly disconnect_peer(virtual_overseer, peer).await; @@ -1361,14 +1393,14 @@ where // advertise it. expect_advertise_collation_msg( virtual_overseer, - &validator_0, + &[validator_0], test_state.relay_parent, None, ) .await; expect_advertise_collation_msg( virtual_overseer, - &validator_1, + &[validator_1], test_state.relay_parent, None, ) @@ -1498,9 +1530,10 @@ fn connect_to_buffered_groups() { } // Update views. - for peed_id in &peers_a { - send_peer_view_change(&mut virtual_overseer, peed_id, vec![head_a]).await; - expect_advertise_collation_msg(&mut virtual_overseer, peed_id, head_a, None).await; + for peer_id in &peers_a { + send_peer_view_change(&mut virtual_overseer, peer_id, vec![head_a]).await; + expect_advertise_collation_msg(&mut virtual_overseer, &[*peer_id], head_a, None) + .await; } let peer = peers_a[0]; diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs index ea8fdb0e04fb..d3eae9dbba6e 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs @@ -45,7 +45,6 @@ async fn update_view( ) .await; - let mut next_overseer_message = None; for _ in 0..activated { let (leaf_hash, leaf_number) = assert_matches!( overseer_recv(virtual_overseer).await, @@ -147,18 +146,10 @@ async fn update_view( let parent_hash = ancestry_iter.peek().map(|(h, _)| *h).unwrap_or_else(|| get_parent_hash(hash)); - let msg = match next_overseer_message.take() { - Some(msg) => Some(msg), - None => - overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(50)).await, - }; - - let msg = match msg { - Some(msg) => msg, - None => { - // We're done. - return - }, + let Some(msg) = + overseer_peek_with_timeout(virtual_overseer, Duration::from_millis(50)).await + else { + return }; if !matches!( @@ -167,12 +158,11 @@ async fn update_view( if *_hash == hash ) { // Ancestry has already been cached for this leaf. - next_overseer_message.replace(msg); break } assert_matches!( - msg, + overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(50)).await.unwrap(), AllMessages::ChainApi(ChainApiMessage::BlockHeader(.., tx)) => { let header = Header { parent_hash, @@ -238,124 +228,156 @@ fn distribute_collation_from_implicit_view() { let head_c = Hash::from_low_u64_be(130); let head_c_num = 62; - let group_rotation_info = GroupRotationInfo { - session_start_block: head_c_num - 2, - group_rotation_frequency: 3, - now: head_c_num, - }; + // Run once with validators sending their view first and then the collator setting their own + // view first. + for validator_sends_view_first in [true, false] { + let group_rotation_info = GroupRotationInfo { + session_start_block: head_c_num - 2, + group_rotation_frequency: 3, + now: head_c_num, + }; + + let mut test_state = TestState::default(); + test_state.group_rotation_info = group_rotation_info; + + let local_peer_id = test_state.local_peer_id; + let collator_pair = test_state.collator_pair.clone(); + + test_harness( + local_peer_id, + collator_pair, + ReputationAggregator::new(|_| true), + |mut test_harness| async move { + let virtual_overseer = &mut test_harness.virtual_overseer; + + // Set collating para id. + overseer_send( + virtual_overseer, + CollatorProtocolMessage::CollateOn(test_state.para_id), + ) + .await; - let mut test_state = TestState::default(); - test_state.group_rotation_info = group_rotation_info; + if validator_sends_view_first { + // Activate leaf `c` to accept at least the collation. + update_view(virtual_overseer, vec![(head_c, head_c_num)], 1).await; + } else { + // Activated leaf is `b`, but the collation will be based on `c`. + update_view(virtual_overseer, vec![(head_b, head_b_num)], 1).await; + } - let local_peer_id = test_state.local_peer_id; - let collator_pair = test_state.collator_pair.clone(); + let validator_peer_ids = test_state.current_group_validator_peer_ids(); + for (val, peer) in test_state + .current_group_validator_authority_ids() + .into_iter() + .zip(validator_peer_ids.clone()) + { + connect_peer(virtual_overseer, peer, CollationVersion::V2, Some(val.clone())) + .await; + } - test_harness( - local_peer_id, - collator_pair, - ReputationAggregator::new(|_| true), - |mut test_harness| async move { - let virtual_overseer = &mut test_harness.virtual_overseer; + // Collator declared itself to each peer. + for peer_id in &validator_peer_ids { + expect_declare_msg_v2(virtual_overseer, &test_state, peer_id).await; + } - // Set collating para id. - overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) - .await; - // Activated leaf is `b`, but the collation will be based on `c`. - update_view(virtual_overseer, vec![(head_b, head_b_num)], 1).await; - - let validator_peer_ids = test_state.current_group_validator_peer_ids(); - for (val, peer) in test_state - .current_group_validator_authority_ids() - .into_iter() - .zip(validator_peer_ids.clone()) - { - connect_peer(virtual_overseer, peer, CollationVersion::V2, Some(val.clone())).await; - } + let pov = PoV { block_data: BlockData(vec![1, 2, 3]) }; + let parent_head_data_hash = Hash::repeat_byte(0xAA); + let candidate = TestCandidateBuilder { + para_id: test_state.para_id, + relay_parent: head_c, + pov_hash: pov.hash(), + ..Default::default() + } + .build(); + let DistributeCollation { candidate, pov_block: _ } = + distribute_collation_with_receipt( + virtual_overseer, + &test_state, + head_c, + false, // Check the group manually. + candidate, + pov, + parent_head_data_hash, + ) + .await; + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. } + ) => { + let expected_validators = test_state.current_group_validator_authority_ids(); - // Collator declared itself to each peer. - for peer_id in &validator_peer_ids { - expect_declare_msg_v2(virtual_overseer, &test_state, peer_id).await; - } + assert_eq!(expected_validators, validator_ids); + } + ); - let pov = PoV { block_data: BlockData(vec![1, 2, 3]) }; - let parent_head_data_hash = Hash::repeat_byte(0xAA); - let candidate = TestCandidateBuilder { - para_id: test_state.para_id, - relay_parent: head_c, - pov_hash: pov.hash(), - ..Default::default() - } - .build(); - let DistributeCollation { candidate, pov_block: _ } = - distribute_collation_with_receipt( - virtual_overseer, - &test_state, - head_c, - false, // Check the group manually. - candidate, - pov, - parent_head_data_hash, - ) - .await; - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridgeTx( - NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. } - ) => { - let expected_validators = test_state.current_group_validator_authority_ids(); + let candidate_hash = candidate.hash(); + + // Update peer views. + for peer_id in &validator_peer_ids { + send_peer_view_change(virtual_overseer, peer_id, vec![head_b]).await; - assert_eq!(expected_validators, validator_ids); + if !validator_sends_view_first { + expect_advertise_collation_msg( + virtual_overseer, + &[*peer_id], + head_c, + Some(vec![candidate_hash]), + ) + .await; + } } - ); - let candidate_hash = candidate.hash(); + if validator_sends_view_first { + // Activated leaf is `b`, but the collation will be based on `c`. + update_view(virtual_overseer, vec![(head_b, head_b_num)], 1).await; + + for _ in &validator_peer_ids { + expect_advertise_collation_msg( + virtual_overseer, + &validator_peer_ids, + head_c, + Some(vec![candidate_hash]), + ) + .await; + } + } - // Update peer views. - for peed_id in &validator_peer_ids { - send_peer_view_change(virtual_overseer, peed_id, vec![head_b]).await; - expect_advertise_collation_msg( + // Head `c` goes out of view. + // Build a different candidate for this relay parent and attempt to distribute it. + update_view(virtual_overseer, vec![(head_a, head_a_num)], 1).await; + + let pov = PoV { block_data: BlockData(vec![4, 5, 6]) }; + let parent_head_data_hash = Hash::repeat_byte(0xBB); + let candidate = TestCandidateBuilder { + para_id: test_state.para_id, + relay_parent: head_c, + pov_hash: pov.hash(), + ..Default::default() + } + .build(); + overseer_send( virtual_overseer, - peed_id, - head_c, - Some(vec![candidate_hash]), + CollatorProtocolMessage::DistributeCollation { + candidate_receipt: candidate.clone(), + parent_head_data_hash, + pov: pov.clone(), + parent_head_data: HeadData(vec![1, 2, 3]), + result_sender: None, + core_index: CoreIndex(0), + }, ) .await; - } - - // Head `c` goes out of view. - // Build a different candidate for this relay parent and attempt to distribute it. - update_view(virtual_overseer, vec![(head_a, head_a_num)], 1).await; - - let pov = PoV { block_data: BlockData(vec![4, 5, 6]) }; - let parent_head_data_hash = Hash::repeat_byte(0xBB); - let candidate = TestCandidateBuilder { - para_id: test_state.para_id, - relay_parent: head_c, - pov_hash: pov.hash(), - ..Default::default() - } - .build(); - overseer_send( - virtual_overseer, - CollatorProtocolMessage::DistributeCollation { - candidate_receipt: candidate.clone(), - parent_head_data_hash, - pov: pov.clone(), - parent_head_data: HeadData(vec![1, 2, 3]), - result_sender: None, - core_index: CoreIndex(0), - }, - ) - .await; - // Parent out of view, nothing happens. - assert!(overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(100)) - .await - .is_none()); + // Parent out of view, nothing happens. + assert!(overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(100)) + .await + .is_none()); - test_harness - }, - ) + test_harness + }, + ); + } } /// Tests that collator can distribute up to `MAX_CANDIDATE_DEPTH + 1` candidates @@ -505,7 +527,7 @@ fn send_parent_head_data_for_elastic_scaling() { send_peer_view_change(&mut virtual_overseer, &peer, vec![head_b]).await; let hashes: Vec<_> = vec![candidate.hash()]; - expect_advertise_collation_msg(&mut virtual_overseer, &peer, head_b, Some(hashes)) + expect_advertise_collation_msg(&mut virtual_overseer, &[peer], head_b, Some(hashes)) .await; let (pending_response, rx) = oneshot::channel(); @@ -625,7 +647,7 @@ fn advertise_and_send_collation_by_hash() { // Head `b` is not a leaf, but both advertisements are still relevant. send_peer_view_change(&mut virtual_overseer, &peer, vec![head_b]).await; let hashes: Vec<_> = candidates.iter().map(|(candidate, _)| candidate.hash()).collect(); - expect_advertise_collation_msg(&mut virtual_overseer, &peer, head_b, Some(hashes)) + expect_advertise_collation_msg(&mut virtual_overseer, &[peer], head_b, Some(hashes)) .await; for (candidate, pov_block) in candidates { diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs index bdb0647fee6f..5b1f8d3223d1 100644 --- a/polkadot/node/subsystem-test-helpers/src/lib.rs +++ b/polkadot/node/subsystem-test-helpers/src/lib.rs @@ -294,6 +294,9 @@ pub struct TestSubsystemContextHandle { /// Message counter over subsystems. pub message_counter: MessageCounter, + + /// Intermediate buffer for a message when using `peek`. + message_buffer: Option, } impl TestSubsystemContextHandle { @@ -323,12 +326,30 @@ impl TestSubsystemContextHandle { /// Receive the next message from the subsystem, or `None` if the channel has been closed. pub async fn try_recv(&mut self) -> Option { + if let Some(msg) = self.message_buffer.take() { + return Some(msg) + } + self.rx .next() .timeout(Self::TIMEOUT) .await .expect("`try_recv` does not timeout") } + + /// Peek into the next message from the subsystem or `None` if the channel has been closed. + pub async fn peek(&mut self) -> Option<&AllMessages> { + if self.message_buffer.is_none() { + self.message_buffer = self + .rx + .next() + .timeout(Self::TIMEOUT) + .await + .expect("`try_recv` does not timeout"); + } + + self.message_buffer.as_ref() + } } /// Make a test subsystem context with `buffer_size == 0`. This is used by most @@ -392,6 +413,7 @@ pub fn make_buffered_subsystem_context( tx: overseer_tx, rx: all_messages_rx, message_counter: message_counter.clone(), + message_buffer: None, }, ) } diff --git a/prdoc/pr_5538.prdoc b/prdoc/pr_5538.prdoc new file mode 100644 index 000000000000..5924f9789040 --- /dev/null +++ b/prdoc/pr_5538.prdoc @@ -0,0 +1,11 @@ +title: "collator-protocol: Remove race condition" + +doc: + - audience: Node Dev + description: | + Remove a race condition in the collator protocol that could lead + to collations not being announced to a validator. + +crates: + - name: polkadot-collator-protocol + bump: patch