diff --git a/polkadot/node/network/collator-protocol/src/validator_side/collation.rs b/polkadot/node/network/collator-protocol/src/validator_side/collation.rs index a53e0028b9e7..d6f34fc81b82 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side/collation.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side/collation.rs @@ -31,6 +31,7 @@ use std::{collections::VecDeque, future::Future, pin::Pin, task::Poll}; use futures::{future::BoxFuture, FutureExt}; use polkadot_node_network_protocol::{ + peer_set::CollationVersion, request_response::{outgoing::RequestError, v1 as request_v1, OutgoingResult}, PeerId, }; @@ -160,6 +161,8 @@ pub fn fetched_collation_sanity_check( pub struct CollationEvent { /// Collator id. pub collator_id: CollatorId, + /// The network protocol version the collator is using. + pub collator_protocol_version: CollationVersion, /// The requested collation data. pub pending_collation: PendingCollation, } @@ -307,6 +310,8 @@ pub(super) struct CollationFetchRequest { pub pending_collation: PendingCollation, /// Collator id. pub collator_id: CollatorId, + /// The network protocol version the collator is using. + pub collator_protocol_version: CollationVersion, /// Responses from collator. pub from_collator: BoxFuture<'static, OutgoingResult>, /// Handle used for checking if this request was cancelled. @@ -334,6 +339,7 @@ impl Future for CollationFetchRequest { self.span.as_mut().map(|s| s.add_string_tag("success", "false")); return Poll::Ready(( CollationEvent { + collator_protocol_version: self.collator_protocol_version, collator_id: self.collator_id.clone(), pending_collation: self.pending_collation, }, @@ -344,6 +350,7 @@ impl Future for CollationFetchRequest { let res = self.from_collator.poll_unpin(cx).map(|res| { ( CollationEvent { + collator_protocol_version: self.collator_protocol_version, collator_id: self.collator_id.clone(), pending_collation: self.pending_collation, }, diff --git a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs index fcb408d54b1b..00bf50013a53 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs @@ -85,6 +85,8 @@ const COST_NETWORK_ERROR: Rep = Rep::CostMinor("Some network error"); const COST_INVALID_SIGNATURE: Rep = Rep::Malicious("Invalid network message signature"); const COST_REPORT_BAD: Rep = Rep::Malicious("A collator was reported by another subsystem"); const COST_WRONG_PARA: Rep = Rep::Malicious("A collator provided a collation for the wrong para"); +const COST_PROTOCOL_MISUSE: Rep = + Rep::Malicious("A collator advertising a collation for an async backing relay parent using V1"); const COST_UNNEEDED_COLLATOR: Rep = Rep::CostMinor("An unneeded collator connected"); const BENEFIT_NOTIFY_GOOD: Rep = Rep::BenefitMinor("A collator was noted good by another subsystem"); @@ -144,9 +146,6 @@ enum InsertAdvertisementError { UndeclaredCollator, /// A limit for announcements per peer is reached. PeerLimitReached, - /// Mismatch of relay parent mode and advertisement arguments. - /// An internal error that should not happen. - ProtocolMismatch, } #[derive(Debug)] @@ -252,23 +251,41 @@ impl PeerData { }, ( ProspectiveParachainsMode::Enabled { max_candidate_depth, .. }, - Some(candidate_hash), + candidate_hash, ) => { - if state - .advertisements - .get(&on_relay_parent) - .map_or(false, |candidates| candidates.contains(&candidate_hash)) - { - return Err(InsertAdvertisementError::Duplicate) - } - let candidates = state.advertisements.entry(on_relay_parent).or_default(); - - if candidates.len() > max_candidate_depth { - return Err(InsertAdvertisementError::PeerLimitReached) - } - candidates.insert(candidate_hash); + if let Some(candidate_hash) = candidate_hash { + if state + .advertisements + .get(&on_relay_parent) + .map_or(false, |candidates| candidates.contains(&candidate_hash)) + { + return Err(InsertAdvertisementError::Duplicate) + } + + let candidates = + state.advertisements.entry(on_relay_parent).or_default(); + + if candidates.len() > max_candidate_depth { + return Err(InsertAdvertisementError::PeerLimitReached) + } + candidates.insert(candidate_hash); + } else { + if self.version != CollationVersion::V1 { + gum::error!( + target: LOG_TARGET, + "Programming error, `candidate_hash` can not be `None` \ + for non `V1` networking.", + ); + } + + if state.advertisements.contains_key(&on_relay_parent) { + return Err(InsertAdvertisementError::Duplicate) + } + state + .advertisements + .insert(on_relay_parent, HashSet::from_iter(candidate_hash)); + }; }, - _ => return Err(InsertAdvertisementError::ProtocolMismatch), } state.last_active = Instant::now(); @@ -705,6 +722,7 @@ async fn request_collation( let collation_request = CollationFetchRequest { pending_collation, collator_id: collator_id.clone(), + collator_protocol_version: peer_protocol_version, from_collator: response_recv.boxed(), cancellation_token: cancellation_token.clone(), span: state @@ -920,10 +938,11 @@ enum AdvertisementError { UndeclaredCollator, /// We're assigned to a different para at the given relay parent. InvalidAssignment, - /// An advertisement format doesn't match the relay parent. - ProtocolMismatch, /// Para reached a limit of seconded candidates for this relay parent. SecondedLimitReached, + /// Collator trying to advertise a collation using V1 protocol for an async backing relay + /// parent. + ProtocolMisuse, /// Advertisement is invalid. Invalid(InsertAdvertisementError), } @@ -933,8 +952,9 @@ impl AdvertisementError { use AdvertisementError::*; match self { InvalidAssignment => Some(COST_WRONG_PARA), + ProtocolMisuse => Some(COST_PROTOCOL_MISUSE), RelayParentUnknown | UndeclaredCollator | Invalid(_) => Some(COST_UNEXPECTED_MESSAGE), - UnknownPeer | ProtocolMismatch | SecondedLimitReached => None, + UnknownPeer | SecondedLimitReached => None, } } } @@ -1042,6 +1062,13 @@ where .get(&relay_parent) .map(|s| s.child("advertise-collation")); + let peer_data = state.peer_data.get_mut(&peer_id).ok_or(AdvertisementError::UnknownPeer)?; + + if peer_data.version == CollationVersion::V1 && !state.active_leaves.contains_key(&relay_parent) + { + return Err(AdvertisementError::ProtocolMisuse) + } + let per_relay_parent = state .per_relay_parent .get(&relay_parent) @@ -1050,20 +1077,12 @@ where let relay_parent_mode = per_relay_parent.prospective_parachains_mode; let assignment = &per_relay_parent.assignment; - let peer_data = state.peer_data.get_mut(&peer_id).ok_or(AdvertisementError::UnknownPeer)?; let collator_para_id = peer_data.collating_para().ok_or(AdvertisementError::UndeclaredCollator)?; - match assignment.current { - Some(id) if id == collator_para_id => { - // Our assignment. - }, - _ => return Err(AdvertisementError::InvalidAssignment), - }; - - if relay_parent_mode.is_enabled() && prospective_candidate.is_none() { - // Expected v2 advertisement. - return Err(AdvertisementError::ProtocolMismatch) + // Check if this is assigned to us. + if assignment.current.map_or(true, |id| id != collator_para_id) { + return Err(AdvertisementError::InvalidAssignment) } // Always insert advertisements that pass all the checks for spam protection. @@ -1077,13 +1096,17 @@ where &state.active_leaves, ) .map_err(AdvertisementError::Invalid)?; + if !per_relay_parent.collations.is_seconded_limit_reached(relay_parent_mode) { return Err(AdvertisementError::SecondedLimitReached) } if let Some((candidate_hash, parent_head_data_hash)) = prospective_candidate { - let is_seconding_allowed = !relay_parent_mode.is_enabled() || - can_second( + // We need to queue the advertisement if we are not allowed to second it. + // + // This is also only important when async backing is enabled. + let queue_advertisement = relay_parent_mode.is_enabled() && + !can_second( sender, collator_para_id, relay_parent, @@ -1092,7 +1115,7 @@ where ) .await; - if !is_seconding_allowed { + if queue_advertisement { gum::debug!( target: LOG_TARGET, relay_parent = ?relay_parent, @@ -1125,6 +1148,7 @@ where prospective_candidate, ) .await; + if let Err(fetch_error) = result { gum::debug!( target: LOG_TARGET, @@ -1477,7 +1501,7 @@ async fn process_msg( }, }; let fetched_collation = FetchedCollation::from(&receipt.to_plain()); - if let Some(CollationEvent { collator_id, pending_collation }) = + if let Some(CollationEvent { collator_id, pending_collation, .. }) = state.fetched_candidates.remove(&fetched_collation) { let PendingCollation { relay_parent, peer_id, prospective_candidate, .. } = @@ -1635,7 +1659,7 @@ async fn run_inner( Ok(res) => res }; - let CollationEvent {collator_id, pending_collation} = res.collation_event.clone(); + let CollationEvent {collator_id, pending_collation, .. } = res.collation_event.clone(); if let Err(err) = kick_off_seconding(&mut ctx, &mut state, res).await { gum::warn!( target: LOG_TARGET, @@ -1783,39 +1807,39 @@ async fn kick_off_seconding( }, }; let collations = &mut per_relay_parent.collations; - let relay_parent_mode = per_relay_parent.prospective_parachains_mode; let fetched_collation = FetchedCollation::from(&candidate_receipt); if let Entry::Vacant(entry) = state.fetched_candidates.entry(fetched_collation) { collation_event.pending_collation.commitments_hash = Some(candidate_receipt.commitments_hash); - let pvd = - match (relay_parent_mode, collation_event.pending_collation.prospective_candidate) { - ( - ProspectiveParachainsMode::Enabled { .. }, - Some(ProspectiveCandidate { parent_head_data_hash, .. }), - ) => - request_prospective_validation_data( - ctx.sender(), - relay_parent, - parent_head_data_hash, - pending_collation.para_id, - ) - .await?, - (ProspectiveParachainsMode::Disabled, _) => - request_persisted_validation_data( - ctx.sender(), - candidate_receipt.descriptor().relay_parent, - candidate_receipt.descriptor().para_id, - ) - .await?, - _ => { - // `handle_advertisement` checks for protocol mismatch. - return Ok(()) - }, - } - .ok_or(SecondingError::PersistedValidationDataNotFound)?; + let pvd = match ( + collation_event.collator_protocol_version, + collation_event.pending_collation.prospective_candidate, + ) { + (CollationVersion::V2, Some(ProspectiveCandidate { parent_head_data_hash, .. })) + if per_relay_parent.prospective_parachains_mode.is_enabled() => + request_prospective_validation_data( + ctx.sender(), + relay_parent, + parent_head_data_hash, + pending_collation.para_id, + ) + .await?, + // Support V2 collators without async backing enabled. + (CollationVersion::V2, Some(_)) | (CollationVersion::V1, _) => + request_persisted_validation_data( + ctx.sender(), + candidate_receipt.descriptor().relay_parent, + candidate_receipt.descriptor().para_id, + ) + .await?, + _ => { + // `handle_advertisement` checks for protocol mismatch. + return Ok(()) + }, + } + .ok_or(SecondingError::PersistedValidationDataNotFound)?; fetched_collation_sanity_check( &collation_event.pending_collation, @@ -1864,7 +1888,8 @@ async fn handle_collation_fetch_response( network_error_freq: &mut gum::Freq, canceled_freq: &mut gum::Freq, ) -> std::result::Result> { - let (CollationEvent { collator_id, pending_collation }, response) = response; + let (CollationEvent { collator_id, collator_protocol_version, pending_collation }, response) = + response; // Remove the cancellation handle, as the future already completed. state.collation_requests_cancel_handles.remove(&pending_collation); @@ -1970,7 +1995,11 @@ async fn handle_collation_fetch_response( metrics_result = Ok(()); Ok(PendingCollationFetch { - collation_event: CollationEvent { collator_id, pending_collation }, + collation_event: CollationEvent { + collator_id, + pending_collation, + collator_protocol_version, + }, candidate_receipt, pov, }) diff --git a/polkadot/node/network/collator-protocol/src/validator_side/tests/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side/tests/mod.rs index 9812998aab76..3a9740149948 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side/tests/mod.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side/tests/mod.rs @@ -269,15 +269,15 @@ async fn assert_candidate_backing_second( expected_relay_parent: Hash, expected_para_id: ParaId, expected_pov: &PoV, - mode: ProspectiveParachainsMode, + version: CollationVersion, ) -> CandidateReceipt { let pvd = dummy_pvd(); // Depending on relay parent mode pvd will be either requested // from the Runtime API or Prospective Parachains. let msg = overseer_recv(virtual_overseer).await; - match mode { - ProspectiveParachainsMode::Disabled => assert_matches!( + match version { + CollationVersion::V1 => assert_matches!( msg, AllMessages::RuntimeApi(RuntimeApiMessage::Request( hash, @@ -289,7 +289,7 @@ async fn assert_candidate_backing_second( tx.send(Ok(Some(pvd.clone()))).unwrap(); } ), - ProspectiveParachainsMode::Enabled { .. } => assert_matches!( + CollationVersion::V2 => assert_matches!( msg, AllMessages::ProspectiveParachains( ProspectiveParachainsMessage::GetProspectiveValidationData(request, tx), @@ -532,7 +532,14 @@ fn act_on_advertisement_v2() { ) .await; - let candidate_hash = CandidateHash::default(); + let pov = PoV { block_data: BlockData(vec![]) }; + let mut candidate_a = + dummy_candidate_receipt_bad_sig(dummy_hash(), Some(Default::default())); + candidate_a.descriptor.para_id = test_state.chain_ids[0]; + candidate_a.descriptor.relay_parent = test_state.relay_parent; + candidate_a.descriptor.persisted_validation_data_hash = dummy_pvd().hash(); + + let candidate_hash = candidate_a.hash(); let parent_head_data_hash = Hash::zero(); // v2 advertisement. advertise_collation( @@ -543,7 +550,7 @@ fn act_on_advertisement_v2() { ) .await; - assert_fetch_collation_request( + let response_channel = assert_fetch_collation_request( &mut virtual_overseer, test_state.relay_parent, test_state.chain_ids[0], @@ -551,6 +558,24 @@ fn act_on_advertisement_v2() { ) .await; + response_channel + .send(Ok(request_v1::CollationFetchingResponse::Collation( + candidate_a.clone(), + pov.clone(), + ) + .encode())) + .expect("Sending response should succeed"); + + assert_candidate_backing_second( + &mut virtual_overseer, + test_state.relay_parent, + test_state.chain_ids[0], + &pov, + // Async backing isn't enabled and thus it should do it the old way. + CollationVersion::V1, + ) + .await; + virtual_overseer }); } @@ -748,7 +773,7 @@ fn fetch_one_collation_at_a_time() { test_state.relay_parent, test_state.chain_ids[0], &pov, - ProspectiveParachainsMode::Disabled, + CollationVersion::V1, ) .await; @@ -880,7 +905,7 @@ fn fetches_next_collation() { second, test_state.chain_ids[0], &pov, - ProspectiveParachainsMode::Disabled, + CollationVersion::V1, ) .await; @@ -1010,7 +1035,7 @@ fn fetch_next_collation_on_invalid_collation() { test_state.relay_parent, test_state.chain_ids[0], &pov, - ProspectiveParachainsMode::Disabled, + CollationVersion::V1, ) .await; diff --git a/polkadot/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs b/polkadot/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs index 4da0f11da390..c5236ef3eb21 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs @@ -74,7 +74,7 @@ async fn assert_assign_incoming( } /// Handle a view update. -async fn update_view( +pub(super) async fn update_view( virtual_overseer: &mut VirtualOverseer, test_state: &TestState, new_view: Vec<(Hash, u32)>, // Hash and block number. @@ -212,6 +212,7 @@ async fn assert_collation_seconded( virtual_overseer: &mut VirtualOverseer, relay_parent: Hash, peer_id: PeerId, + version: CollationVersion, ) { assert_matches!( overseer_recv(virtual_overseer).await, @@ -222,29 +223,51 @@ async fn assert_collation_seconded( assert_eq!(rep.value, BENEFIT_NOTIFY_GOOD.cost_or_benefit()); } ); - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendCollationMessage( - peers, - Versioned::V2(protocol_v2::CollationProtocol::CollatorProtocol( - protocol_v2::CollatorProtocolMessage::CollationSeconded( - _relay_parent, - .., - ), - )), - )) => { - assert_eq!(peers, vec![peer_id]); - assert_eq!(relay_parent, _relay_parent); - } - ); + + match version { + CollationVersion::V1 => { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendCollationMessage( + peers, + Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol( + protocol_v1::CollatorProtocolMessage::CollationSeconded( + _relay_parent, + .., + ), + )), + )) => { + assert_eq!(peers, vec![peer_id]); + assert_eq!(relay_parent, _relay_parent); + } + ); + }, + CollationVersion::V2 => { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendCollationMessage( + peers, + Versioned::V2(protocol_v2::CollationProtocol::CollatorProtocol( + protocol_v2::CollatorProtocolMessage::CollationSeconded( + _relay_parent, + .., + ), + )), + )) => { + assert_eq!(peers, vec![peer_id]); + assert_eq!(relay_parent, _relay_parent); + } + ); + }, + } } #[test] -fn v1_advertisement_rejected() { +fn v1_advertisement_accepted_and_seconded() { let test_state = TestState::default(); test_harness(ReputationAggregator::new(|_| true), |test_harness| async move { - let TestHarness { mut virtual_overseer, .. } = test_harness; + let TestHarness { mut virtual_overseer, keystore } = test_harness; let pair_a = CollatorPair::generate().0; @@ -267,9 +290,94 @@ fn v1_advertisement_rejected() { advertise_collation(&mut virtual_overseer, peer_a, head_b, None).await; - // Not reported. - test_helpers::Yield::new().await; - assert_matches!(virtual_overseer.recv().now_or_never(), None); + let response_channel = assert_fetch_collation_request( + &mut virtual_overseer, + head_b, + test_state.chain_ids[0], + None, + ) + .await; + + let mut candidate = dummy_candidate_receipt_bad_sig(head_b, Some(Default::default())); + candidate.descriptor.para_id = test_state.chain_ids[0]; + candidate.descriptor.persisted_validation_data_hash = dummy_pvd().hash(); + let commitments = CandidateCommitments { + head_data: HeadData(vec![1 as u8]), + horizontal_messages: Default::default(), + upward_messages: Default::default(), + new_validation_code: None, + processed_downward_messages: 0, + hrmp_watermark: 0, + }; + candidate.commitments_hash = commitments.hash(); + + let pov = PoV { block_data: BlockData(vec![1]) }; + + response_channel + .send(Ok(request_v2::CollationFetchingResponse::Collation( + candidate.clone(), + pov.clone(), + ) + .encode())) + .expect("Sending response should succeed"); + + assert_candidate_backing_second( + &mut virtual_overseer, + head_b, + test_state.chain_ids[0], + &pov, + CollationVersion::V1, + ) + .await; + + let candidate = CommittedCandidateReceipt { descriptor: candidate.descriptor, commitments }; + + send_seconded_statement(&mut virtual_overseer, keystore.clone(), &candidate).await; + + assert_collation_seconded(&mut virtual_overseer, head_b, peer_a, CollationVersion::V1) + .await; + + virtual_overseer + }); +} + +#[test] +fn v1_advertisement_rejected_on_non_active_leave() { + let test_state = TestState::default(); + + test_harness(ReputationAggregator::new(|_| true), |test_harness| async move { + let TestHarness { mut virtual_overseer, .. } = test_harness; + + let pair_a = CollatorPair::generate().0; + + let head_b = Hash::from_low_u64_be(128); + let head_b_num: u32 = 5; + + update_view(&mut virtual_overseer, &test_state, vec![(head_b, head_b_num)], 1).await; + + let peer_a = PeerId::random(); + + // Accept both collators from the implicit view. + connect_and_declare_collator( + &mut virtual_overseer, + peer_a, + pair_a.clone(), + test_state.chain_ids[0], + CollationVersion::V1, + ) + .await; + + advertise_collation(&mut virtual_overseer, peer_a, get_parent_hash(head_b), None).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(peer, rep)), + ) => { + assert_eq!(peer, peer_a); + assert_eq!(rep.value, COST_PROTOCOL_MISUSE.cost_or_benefit()); + } + ); virtual_overseer }); @@ -469,10 +577,7 @@ fn second_multiple_candidates_per_relay_parent() { head_c, test_state.chain_ids[0], &pov, - ProspectiveParachainsMode::Enabled { - max_candidate_depth: ASYNC_BACKING_PARAMETERS.max_candidate_depth as _, - allowed_ancestry_len: ASYNC_BACKING_PARAMETERS.allowed_ancestry_len as _, - }, + CollationVersion::V2, ) .await; @@ -481,7 +586,8 @@ fn second_multiple_candidates_per_relay_parent() { send_seconded_statement(&mut virtual_overseer, keystore.clone(), &candidate).await; - assert_collation_seconded(&mut virtual_overseer, head_c, peer_a).await; + assert_collation_seconded(&mut virtual_overseer, head_c, peer_a, CollationVersion::V2) + .await; } // No more advertisements can be made for this relay parent.