diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index da98a1d29c17..c4778fe6f639 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -68,7 +68,9 @@ use futures::{ }; use std::{ - collections::{btree_map::Entry, BTreeMap, HashMap, HashSet}, + collections::{ + btree_map::Entry as BTMEntry, hash_map::Entry as HMEntry, BTreeMap, HashMap, HashSet, + }, sync::Arc, time::Duration, }; @@ -400,7 +402,7 @@ impl Wakeups { } // we are replacing previous wakeup with an earlier one. - if let Entry::Occupied(mut entry) = self.wakeups.entry(*prev) { + if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(*prev) { if let Some(pos) = entry.get().iter().position(|x| x == &(block_hash, candidate_hash)) { @@ -436,7 +438,7 @@ impl Wakeups { }); for (tick, pruned) in pruned_wakeups { - if let Entry::Occupied(mut entry) = self.wakeups.entry(tick) { + if let BTMEntry::Occupied(mut entry) = self.wakeups.entry(tick) { entry.get_mut().retain(|wakeup| !pruned.contains(wakeup)); if entry.get().is_empty() { let _ = entry.remove(); @@ -457,10 +459,10 @@ impl Wakeups { Some(tick) => { clock.wait(tick).await; match self.wakeups.entry(tick) { - Entry::Vacant(_) => { + BTMEntry::Vacant(_) => { panic!("entry is known to exist since `first` was `Some`; qed") }, - Entry::Occupied(mut entry) => { + BTMEntry::Occupied(mut entry) => { let (hash, candidate_hash) = entry.get_mut().pop() .expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed"); @@ -507,9 +509,7 @@ impl ApprovalState { } struct CurrentlyCheckingSet { - /// Invariant: The contained `Vec` needs to stay sorted as we are using `binary_search_by_key` - /// on it. - candidate_hash_map: HashMap>, + candidate_hash_map: HashMap>, currently_checking: FuturesUnordered>, } @@ -529,21 +529,26 @@ impl CurrentlyCheckingSet { relay_block: Hash, launch_work: impl Future>>, ) -> SubsystemResult<()> { - let val = self.candidate_hash_map.entry(candidate_hash).or_insert(Default::default()); - - if let Err(k) = val.binary_search_by_key(&relay_block, |v| *v) { - let _ = val.insert(k, relay_block); - let work = launch_work.await?; - self.currently_checking.push(Box::pin(async move { - match work.timeout(APPROVAL_CHECKING_TIMEOUT).await { - None => ApprovalState { - candidate_hash, - validator_index, - approval_outcome: ApprovalOutcome::TimedOut, - }, - Some(approval_state) => approval_state, - } - })); + match self.candidate_hash_map.entry(candidate_hash) { + HMEntry::Occupied(mut entry) => { + // validation already undergoing. just add the relay hash if unknown. + entry.get_mut().insert(relay_block); + }, + HMEntry::Vacant(entry) => { + // validation not ongoing. launch work and time out the remote handle. + entry.insert(HashSet::new()).insert(relay_block); + let work = launch_work.await?; + self.currently_checking.push(Box::pin(async move { + match work.timeout(APPROVAL_CHECKING_TIMEOUT).await { + None => ApprovalState { + candidate_hash, + validator_index, + approval_outcome: ApprovalOutcome::TimedOut, + }, + Some(approval_state) => approval_state, + } + })); + }, } Ok(()) @@ -552,7 +557,7 @@ impl CurrentlyCheckingSet { pub async fn next( &mut self, approvals_cache: &mut lru::LruCache, - ) -> (Vec, ApprovalState) { + ) -> (HashSet, ApprovalState) { if !self.currently_checking.is_empty() { if let Some(approval_state) = self.currently_checking.next().await { let out = self diff --git a/node/core/approval-voting/src/tests.rs b/node/core/approval-voting/src/tests.rs index b746052655d0..8e69cb88aa01 100644 --- a/node/core/approval-voting/src/tests.rs +++ b/node/core/approval-voting/src/tests.rs @@ -15,18 +15,25 @@ // along with Polkadot. If not, see . use super::*; -use polkadot_node_primitives::approval::{ - AssignmentCert, AssignmentCertKind, DelayTranche, VRFOutput, VRFProof, RELAY_VRF_MODULO_CONTEXT, +use polkadot_node_primitives::{ + approval::{ + AssignmentCert, AssignmentCertKind, DelayTranche, VRFOutput, VRFProof, + RELAY_VRF_MODULO_CONTEXT, + }, + AvailableData, BlockData, PoV, }; use polkadot_node_subsystem::{ - messages::{AllMessages, ApprovalVotingMessage, AssignmentCheckResult}, + messages::{ + AllMessages, ApprovalVotingMessage, AssignmentCheckResult, AvailabilityRecoveryMessage, + }, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, }; use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_overseer::HeadSupportsParachains; use polkadot_primitives::v1::{ - CandidateEvent, CoreIndex, GroupIndex, Header, Id as ParaId, ValidatorSignature, + CandidateCommitments, CandidateEvent, CoreIndex, GroupIndex, Header, Id as ParaId, + ValidationCode, ValidatorSignature, }; use std::time::Duration; @@ -2156,7 +2163,7 @@ fn subsystem_approved_ancestor_missing_approval() { } #[test] -fn subsystem_process_wakeup_trigger_assignment_launch_approval() { +fn subsystem_validate_approvals_cache() { let assignment_criteria = Box::new(MockAssignmentCriteria( || { let mut assignments = HashMap::new(); @@ -2186,7 +2193,10 @@ fn subsystem_process_wakeup_trigger_assignment_launch_approval() { } = test_harness; let block_hash = Hash::repeat_byte(0x01); - let candidate_receipt = dummy_candidate_receipt(block_hash); + let fork_block_hash = Hash::repeat_byte(0x02); + let candidate_commitments = CandidateCommitments::default(); + let mut candidate_receipt = dummy_candidate_receipt(block_hash); + candidate_receipt.commitments_hash = candidate_commitments.hash(); let candidate_hash = candidate_receipt.hash(); let slot = Slot::from(1); let candidate_index = 0; @@ -2215,6 +2225,7 @@ fn subsystem_process_wakeup_trigger_assignment_launch_approval() { no_show_slots: 2, }; + let candidates = Some(vec![(candidate_receipt.clone(), CoreIndex(0), GroupIndex(0))]); ChainBuilder::new() .add_block( block_hash, @@ -2222,10 +2233,16 @@ fn subsystem_process_wakeup_trigger_assignment_launch_approval() { 1, BlockConfig { slot, - candidates: Some(vec![(candidate_receipt, CoreIndex(0), GroupIndex(0))]), - session_info: Some(session_info), + candidates: candidates.clone(), + session_info: Some(session_info.clone()), }, ) + .add_block( + fork_block_hash, + ChainBuilder::GENESIS_HASH, + 1, + BlockConfig { slot, candidates, session_info: Some(session_info) }, + ) .build(&mut virtual_overseer) .await; @@ -2237,19 +2254,8 @@ fn subsystem_process_wakeup_trigger_assignment_launch_approval() { futures_timer::Delay::new(Duration::from_millis(200)).await; - assert!(clock.inner.lock().current_wakeup_is(slot_to_tick(slot + 2))); clock.inner.lock().wakeup_all(slot_to_tick(slot + 2)); - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment( - _, - c_index, - )) => { - assert_eq!(candidate_index, c_index); - } - ); - assert_eq!(clock.inner.lock().wakeups.len(), 0); futures_timer::Delay::new(Duration::from_millis(200)).await; @@ -2259,10 +2265,92 @@ fn subsystem_process_wakeup_trigger_assignment_launch_approval() { candidate_entry.approval_entry(&block_hash).unwrap().our_assignment().unwrap(); assert!(our_assignment.triggered()); + // Handle the the next two assignment imports, where only one should trigger approvals work + handle_double_assignment_import(&mut virtual_overseer, candidate_index).await; + virtual_overseer }); } +/// Ensure that when two assignments are imported, only one triggers the Approval Checking work +pub async fn handle_double_assignment_import( + virtual_overseer: &mut VirtualOverseer, + candidate_index: CandidateIndex, +) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment( + _, + c_index, + )) => { + assert_eq!(candidate_index, c_index); + } + ); + + recover_available_data(virtual_overseer).await; + fetch_validation_code(virtual_overseer).await; + + let first_message = virtual_overseer.recv().await; + let second_message = virtual_overseer.recv().await; + + for msg in vec![first_message, second_message].into_iter() { + match msg { + AllMessages::ApprovalDistribution( + ApprovalDistributionMessage::DistributeAssignment(_, c_index), + ) => { + assert_eq!(candidate_index, c_index); + }, + AllMessages::CandidateValidation( + CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx), + ) if timeout == APPROVAL_EXECUTION_TIMEOUT => { + tx.send(Ok(ValidationResult::Valid(Default::default(), Default::default()))) + .unwrap(); + }, + _ => panic! {}, + } + } + + // Assert that there are no more messages being sent by the subsystem + assert!(overseer_recv(virtual_overseer).timeout(TIMEOUT / 2).await.is_none()); +} + +/// Handles validation code fetch, returns the received relay parent hash. +async fn fetch_validation_code(virtual_overseer: &mut VirtualOverseer) -> Hash { + let validation_code = ValidationCode(Vec::new()); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + hash, + RuntimeApiRequest::ValidationCodeByHash( + _, + tx, + ) + )) => { + tx.send(Ok(Some(validation_code))).unwrap(); + hash + }, + "overseer did not receive runtime API request for validation code", + ) +} + +async fn recover_available_data(virtual_overseer: &mut VirtualOverseer) { + let pov_block = PoV { block_data: BlockData(Vec::new()) }; + + let available_data = + AvailableData { pov: Arc::new(pov_block), validation_data: Default::default() }; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::AvailabilityRecovery( + AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, tx) + ) => { + tx.send(Ok(available_data)).unwrap(); + }, + "overseer did not receive recover available data message", + ); +} + struct TriggersAssignmentConfig { our_assigned_tranche: DelayTranche, assign_validator_tranche: F1, @@ -2445,6 +2533,21 @@ async fn step_until_done(clock: &MockClock) { println!("relevant_ticks: {:?}", relevant_ticks); } +#[test] +fn subsystem_process_wakeup_trigger_assignment_launch_approval() { + triggers_assignment_test(TriggersAssignmentConfig { + our_assigned_tranche: 0, + assign_validator_tranche: |_| Ok(0), + no_show_slots: 0, + assignments_to_import: vec![1], + approvals_to_import: vec![1], + ticks: vec![ + 10, // Alice wakeup, assignment triggered + ], + should_be_triggered: |_| true, + }); +} + #[test] fn subsystem_assignment_triggered_solo_zero_tranche() { triggers_assignment_test(TriggersAssignmentConfig {