From f28baaa1ed1166cda95a3828444a516a7e2133dd Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Sun, 27 Aug 2023 11:27:14 +0200 Subject: [PATCH 1/6] Import changes from archieved repo --- .../node/core/approval-voting/src/import.rs | 60 +++- polkadot/node/core/approval-voting/src/lib.rs | 21 +- .../node/core/approval-voting/src/tests.rs | 15 +- polkadot/node/core/backing/src/lib.rs | 19 +- polkadot/node/core/backing/src/tests/mod.rs | 258 +++++++++++++- .../src/tests/prospective_parachains.rs | 35 +- .../node/core/candidate-validation/src/lib.rs | 39 +- .../core/candidate-validation/src/tests.rs | 336 +++++++----------- .../core/dispute-coordinator/src/import.rs | 22 +- .../dispute-coordinator/src/initialized.rs | 1 + .../node/core/dispute-coordinator/src/lib.rs | 1 + .../src/participation/mod.rs | 1 + .../src/participation/queues/mod.rs | 29 +- .../src/participation/queues/tests.rs | 3 +- .../src/participation/tests.rs | 11 +- .../node/core/pvf/common/src/worker/mod.rs | 56 +-- polkadot/node/core/pvf/tests/it/main.rs | 2 +- polkadot/node/malus/src/variants/common.rs | 10 + .../src/pov_requester/mod.rs | 8 +- .../src/requester/tests.rs | 8 +- .../src/tests/state.rs | 8 +- .../src/collator_side/tests/mod.rs | 14 +- .../dispute-distribution/src/tests/mod.rs | 43 ++- .../node/overseer/examples/minimal-example.rs | 1 + polkadot/node/overseer/src/tests.rs | 2 + polkadot/node/subsystem-types/src/messages.rs | 2 + .../node/subsystem-util/src/runtime/error.rs | 4 + .../node/subsystem-util/src/runtime/mod.rs | 22 +- 28 files changed, 679 insertions(+), 352 deletions(-) diff --git a/polkadot/node/core/approval-voting/src/import.rs b/polkadot/node/core/approval-voting/src/import.rs index c504ba71b3c2..5a9f193f1668 100644 --- a/polkadot/node/core/approval-voting/src/import.rs +++ b/polkadot/node/core/approval-voting/src/import.rs @@ -598,7 +598,9 @@ pub(crate) mod tests { use polkadot_node_subsystem::messages::{AllMessages, ApprovalVotingMessage}; use polkadot_node_subsystem_test_helpers::make_subsystem_context; use polkadot_node_subsystem_util::database::Database; - use polkadot_primitives::{Id as ParaId, IndexedVec, SessionInfo, ValidatorId, ValidatorIndex}; + use polkadot_primitives::{ + ExecutorParams, Id as ParaId, IndexedVec, SessionInfo, ValidatorId, ValidatorIndex, + }; pub(crate) use sp_consensus_babe::{ digests::{CompatibleDigestItem, PreDigest, SecondaryVRFPreDigest}, AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch, @@ -835,6 +837,20 @@ pub(crate) mod tests { si_tx.send(Ok(Some(session_info.clone()))).unwrap(); } ); + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + req_block_hash, + RuntimeApiRequest::SessionExecutorParams(idx, si_tx), + ) + ) => { + assert_eq!(session, idx); + assert_eq!(req_block_hash, hash); + si_tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); }); futures::executor::block_on(futures::future::join(test_fut, aux_fut)); @@ -952,6 +968,20 @@ pub(crate) mod tests { si_tx.send(Ok(Some(session_info.clone()))).unwrap(); } ); + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + req_block_hash, + RuntimeApiRequest::SessionExecutorParams(idx, si_tx), + ) + ) => { + assert_eq!(session, idx); + assert_eq!(req_block_hash, hash); + si_tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); }); futures::executor::block_on(futures::future::join(test_fut, aux_fut)); @@ -1172,6 +1202,20 @@ pub(crate) mod tests { si_tx.send(Ok(Some(session_info.clone()))).unwrap(); } ); + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + req_block_hash, + RuntimeApiRequest::SessionExecutorParams(idx, si_tx), + ) + ) => { + assert_eq!(session, idx); + assert_eq!(req_block_hash, hash); + si_tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); }); futures::executor::block_on(futures::future::join(test_fut, aux_fut)); @@ -1374,6 +1418,20 @@ pub(crate) mod tests { } ); + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + req_block_hash, + RuntimeApiRequest::SessionExecutorParams(idx, si_tx), + ) + ) => { + assert_eq!(session, idx); + assert_eq!(req_block_hash, hash); + si_tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + assert_matches!( handle.recv().await, AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NewBlocks( diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index b29e47b4c435..24a8c4b456ee 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -50,8 +50,8 @@ use polkadot_node_subsystem_util::{ }; use polkadot_primitives::{ ApprovalVote, BlockNumber, CandidateHash, CandidateIndex, CandidateReceipt, DisputeStatement, - GroupIndex, Hash, PvfExecTimeoutKind, SessionIndex, SessionInfo, ValidDisputeStatementKind, - ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature, + ExecutorParams, GroupIndex, Hash, PvfExecTimeoutKind, SessionIndex, SessionInfo, + ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature, }; use sc_keystore::LocalKeystore; use sp_application_crypto::Pair; @@ -1009,6 +1009,15 @@ async fn handle_actions( }, None => { let ctx = &mut *ctx; + // FIXME: Should we request at `block_hash` or `relay_block_hash`? + let executor_params = match session_info_provider + .get_session_info_by_index(ctx.sender(), block_hash, session) + .await + { + Err(_) => None, + Ok(extended_session_info) => + Some(extended_session_info.executor_params.clone()), + }; currently_checking_set .insert_relay_block_hash( candidate_hash, @@ -1023,6 +1032,7 @@ async fn handle_actions( validator_index, block_hash, backing_group, + executor_params, &launch_approval_span, ) .await @@ -2467,6 +2477,7 @@ async fn launch_approval( validator_index: ValidatorIndex, block_hash: Hash, backing_group: GroupIndex, + executor_params: Option, span: &jaeger::Span, ) -> SubsystemResult> { let (a_tx, a_rx) = oneshot::channel(); @@ -2537,6 +2548,11 @@ async fn launch_approval( // Force the move of the timer into the background task. let _timer = timer; + let executor_params = match executor_params { + Some(ep) => ep, + None => return ApprovalState::failed(validator_index, candidate_hash), + }; + let available_data = match a_rx.await { Err(_) => return ApprovalState::failed(validator_index, candidate_hash), Ok(Ok(a)) => a, @@ -2612,6 +2628,7 @@ async fn launch_approval( validation_code, candidate.clone(), available_data.pov, + executor_params, PvfExecTimeoutKind::Approval, val_tx, )) diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs index f58e60c6a487..3c7b74b26fbe 100644 --- a/polkadot/node/core/approval-voting/src/tests.rs +++ b/polkadot/node/core/approval-voting/src/tests.rs @@ -910,6 +910,19 @@ async fn import_block( si_tx.send(Ok(Some(session_info.clone()))).unwrap(); } ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + req_block_hash, + RuntimeApiRequest::SessionExecutorParams(_, si_tx), + ) + ) => { + // Make sure all SessionExecutorParams calls are not made for the leaf (but for its relay parent) + assert_ne!(req_block_hash, hashes[(number-1) as usize].0); + si_tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); } assert_matches!( @@ -2376,7 +2389,7 @@ async fn handle_double_assignment_import( assert_matches!( overseer_recv(virtual_overseer).await, - AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx)) if timeout == PvfExecTimeoutKind::Approval => { + AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, _, timeout, tx)) if timeout == PvfExecTimeoutKind::Approval => { tx.send(Ok(ValidationResult::Valid(Default::default(), Default::default()))) .unwrap(); } diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 58763e6d80cc..28b1635ebc60 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -96,16 +96,16 @@ use polkadot_node_subsystem::{ use polkadot_node_subsystem_util::{ self as util, backing_implicit_view::{FetchError as ImplicitViewFetchError, View as ImplicitView}, - request_from_runtime, request_session_index_for_child, request_validator_groups, - request_validators, + executor_params_at_relay_parent, request_from_runtime, request_session_index_for_child, + request_validator_groups, request_validators, runtime::{prospective_parachains_mode, ProspectiveParachainsMode}, Validator, }; use polkadot_primitives::{ BackedCandidate, CandidateCommitments, CandidateHash, CandidateReceipt, - CommittedCandidateReceipt, CoreIndex, CoreState, Hash, Id as ParaId, PersistedValidationData, - PvfExecTimeoutKind, SigningContext, ValidationCode, ValidatorId, ValidatorIndex, - ValidatorSignature, ValidityAttestation, + CommittedCandidateReceipt, CoreIndex, CoreState, ExecutorParams, Hash, Id as ParaId, + PersistedValidationData, PvfExecTimeoutKind, SigningContext, ValidationCode, ValidatorId, + ValidatorIndex, ValidatorSignature, ValidityAttestation, }; use sp_keystore::KeystorePtr; use statement_table::{ @@ -551,6 +551,7 @@ async fn request_candidate_validation( code: ValidationCode, candidate_receipt: CandidateReceipt, pov: Arc, + executor_params: ExecutorParams, ) -> Result { let (tx, rx) = oneshot::channel(); @@ -560,6 +561,7 @@ async fn request_candidate_validation( code, candidate_receipt, pov, + executor_params, PvfExecTimeoutKind::Backing, tx, )) @@ -626,6 +628,12 @@ async fn validate_and_make_available( } }; + let executor_params = match executor_params_at_relay_parent(relay_parent, &mut sender).await { + Ok(ep) => ep, + Err(e) => return Err(Error::UtilError(e)), /* FIXME: Is it enough to just proparate + * `UtilError` here? */ + }; + let pov = match pov { PoVData::Ready(pov) => pov, PoVData::FetchFromValidator { from_validator, candidate_hash, pov_hash } => @@ -661,6 +669,7 @@ async fn validate_and_make_available( validation_code, candidate.clone(), pov.clone(), + executor_params, ) .await? }; diff --git a/polkadot/node/core/backing/src/tests/mod.rs b/polkadot/node/core/backing/src/tests/mod.rs index 054337669c07..41f88491bea1 100644 --- a/polkadot/node/core/backing/src/tests/mod.rs +++ b/polkadot/node/core/backing/src/tests/mod.rs @@ -335,6 +335,24 @@ fn backing_second_works() { } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(sess_idx, tx)) + ) if sess_idx == 1 => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + assert_matches!( virtual_overseer.recv().await, AllMessages::CandidateValidation( @@ -343,6 +361,7 @@ fn backing_second_works() { _validation_code, candidate_receipt, _pov, + _, timeout, tx, ), @@ -481,6 +500,24 @@ fn backing_works() { } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(sess_idx, tx)) + ) if sess_idx == 1 => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + // Sending a `Statement::Seconded` for our assignment will start // validation process. The first thing requested is the PoV. assert_matches!( @@ -506,6 +543,7 @@ fn backing_works() { _validation_code, candidate_receipt, _pov, + _, timeout, tx, ), @@ -665,6 +703,24 @@ fn backing_works_while_validation_ongoing() { } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(sess_idx, tx)) + ) if sess_idx == 1 => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + // Sending a `Statement::Seconded` for our assignment will start // validation process. The first thing requested is PoV from the // `PoVDistribution`. @@ -691,6 +747,7 @@ fn backing_works_while_validation_ongoing() { _validation_code, candidate_receipt, _pov, + _, timeout, tx, ), @@ -837,6 +894,24 @@ fn backing_misbehavior_works() { } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(sess_idx, tx)) + ) if sess_idx == 1 => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + assert_matches!( virtual_overseer.recv().await, AllMessages::AvailabilityDistribution( @@ -858,6 +933,7 @@ fn backing_misbehavior_works() { _validation_code, candidate_receipt, _pov, + _, timeout, tx, ), @@ -1023,6 +1099,24 @@ fn backing_dont_second_invalid() { } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(sess_idx, tx)) + ) if sess_idx == 1 => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + assert_matches!( virtual_overseer.recv().await, AllMessages::CandidateValidation( @@ -1031,6 +1125,7 @@ fn backing_dont_second_invalid() { _validation_code, candidate_receipt, _pov, + _, timeout, tx, ), @@ -1069,6 +1164,24 @@ fn backing_dont_second_invalid() { } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(sess_idx, tx)) + ) if sess_idx == 1 => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + assert_matches!( virtual_overseer.recv().await, AllMessages::CandidateValidation( @@ -1077,6 +1190,7 @@ fn backing_dont_second_invalid() { _validation_code, candidate_receipt, _pov, + _, timeout, tx, ), @@ -1187,6 +1301,24 @@ fn backing_second_after_first_fails_works() { } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(sess_idx, tx)) + ) if sess_idx == 1 => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + // Subsystem requests PoV and requests validation. assert_matches!( virtual_overseer.recv().await, @@ -1210,6 +1342,7 @@ fn backing_second_after_first_fails_works() { _validation_code, candidate_receipt, _pov, + _, timeout, tx, ), @@ -1276,6 +1409,24 @@ fn backing_second_after_first_fails_works() { } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(sess_idx, tx)) + ) if sess_idx == 1 => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + assert_matches!( virtual_overseer.recv().await, AllMessages::CandidateValidation( @@ -1344,6 +1495,24 @@ fn backing_works_after_failed_validation() { } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(sess_idx, tx)) + ) if sess_idx == 1 => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + // Subsystem requests PoV and requests validation. assert_matches!( virtual_overseer.recv().await, @@ -1367,6 +1536,7 @@ fn backing_works_after_failed_validation() { _validation_code, candidate_receipt, _pov, + _, timeout, tx, ), @@ -1553,6 +1723,24 @@ fn retry_works() { } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(sess_idx, tx)) + ) if sess_idx == 1 => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + // Subsystem requests PoV and requests validation. // We cancel - should mean retry on next backing statement. assert_matches!( @@ -1573,7 +1761,7 @@ fn retry_works() { virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await; // Not deterministic which message comes first: - for _ in 0u32..3 { + for _ in 0u32..5 { match virtual_overseer.recv().await { AllMessages::Provisioner(ProvisionerMessage::ProvisionableData( _, @@ -1592,6 +1780,18 @@ fn retry_works() { )) if hash == validation_code.hash() => { tx.send(Ok(Some(validation_code.clone()))).unwrap(); }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + tx.send(Ok(1u32.into())).unwrap(); + }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionExecutorParams(sess_idx, tx), + )) if sess_idx == 1 => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + }, msg => { assert!(false, "Unexpected message: {:?}", msg); }, @@ -1611,6 +1811,24 @@ fn retry_works() { } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(sess_idx, tx)) + ) if sess_idx == 1 => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + assert_matches!( virtual_overseer.recv().await, AllMessages::AvailabilityDistribution( @@ -1634,6 +1852,7 @@ fn retry_works() { _validation_code, candidate_receipt, _pov, + _, timeout, .. ), @@ -1808,6 +2027,24 @@ fn cannot_second_multiple_candidates_per_parent() { } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(sess_idx, tx)) + ) if sess_idx == 1 => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + assert_matches!( virtual_overseer.recv().await, AllMessages::CandidateValidation( @@ -1816,6 +2053,7 @@ fn cannot_second_multiple_candidates_per_parent() { _validation_code, candidate_receipt, _pov, + _, timeout, tx, ), @@ -1894,6 +2132,24 @@ fn cannot_second_multiple_candidates_per_parent() { } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(sess_idx, tx)) + ) if sess_idx == 1 => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + assert_matches!( virtual_overseer.recv().await, AllMessages::CandidateValidation( diff --git a/polkadot/node/core/backing/src/tests/prospective_parachains.rs b/polkadot/node/core/backing/src/tests/prospective_parachains.rs index 7c2773c8e3b6..d479e1bf14a1 100644 --- a/polkadot/node/core/backing/src/tests/prospective_parachains.rs +++ b/polkadot/node/core/backing/src/tests/prospective_parachains.rs @@ -205,6 +205,24 @@ async fn assert_validate_seconded_candidate( } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) + ) => { + tx.send(Ok(1u32.into())).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(sess_idx, tx)) + ) if sess_idx == 1 => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + if fetch_pov { assert_matches!( virtual_overseer.recv().await, @@ -227,6 +245,7 @@ async fn assert_validate_seconded_candidate( _validation_code, candidate_receipt, _pov, + _, timeout, tx, )) if &_pvd == pvd && @@ -1327,7 +1346,7 @@ fn concurrent_dependent_candidates() { tx.send(pov.clone()).unwrap(); }, AllMessages::CandidateValidation( - CandidateValidationMessage::ValidateFromExhaustive(.., candidate, _, _, tx), + CandidateValidationMessage::ValidateFromExhaustive(.., candidate, _, _, _, tx), ) => { let candidate_hash = candidate.hash(); let (head_data, pvd) = if candidate_hash == candidate_a_hash { @@ -1384,6 +1403,20 @@ fn concurrent_dependent_candidates() { break } }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + tx.send(Ok(1u32.into())).unwrap(); + }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionExecutorParams(sess_idx, tx), + )) => { + assert_eq!(sess_idx, 1); + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + }, + _ => panic!("unexpected message received from overseer: {:?}", msg), } } diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index f53f2a6aee06..f47c9b9771ec 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -165,6 +165,7 @@ async fn run( CandidateValidationMessage::ValidateFromChainState( candidate_receipt, pov, + executor_params, timeout, response_sender, ) => { @@ -180,6 +181,7 @@ async fn run( validation_host, candidate_receipt, pov, + executor_params, timeout, &metrics, ) @@ -197,23 +199,23 @@ async fn run( validation_code, candidate_receipt, pov, + executor_params, timeout, response_sender, ) => { let bg = { - let mut sender = ctx.sender().clone(); let metrics = metrics.clone(); let validation_host = validation_host.clone(); async move { let _timer = metrics.time_validate_from_exhaustive(); let res = validate_candidate_exhaustive( - &mut sender, validation_host, persisted_validation_data, validation_code, candidate_receipt, pov, + executor_params, timeout, &metrics, ) @@ -498,6 +500,7 @@ async fn validate_from_chain_state( validation_host: ValidationHost, candidate_receipt: CandidateReceipt, pov: Arc, + executor_params: ExecutorParams, exec_timeout_kind: PvfExecTimeoutKind, metrics: &Metrics, ) -> Result @@ -512,12 +515,12 @@ where }; let validation_result = validate_candidate_exhaustive( - sender, validation_host, validation_data, validation_code, candidate_receipt.clone(), pov, + executor_params, exec_timeout_kind, metrics, ) @@ -547,19 +550,16 @@ where validation_result } -async fn validate_candidate_exhaustive( - sender: &mut Sender, +async fn validate_candidate_exhaustive( mut validation_backend: impl ValidationBackend + Send, persisted_validation_data: PersistedValidationData, validation_code: ValidationCode, candidate_receipt: CandidateReceipt, pov: Arc, + executor_params: ExecutorParams, exec_timeout_kind: PvfExecTimeoutKind, metrics: &Metrics, -) -> Result -where - Sender: SubsystemSender, -{ +) -> Result { let _timer = metrics.time_validate_candidate_exhaustive(); let validation_code_hash = validation_code.hash(); @@ -616,27 +616,6 @@ where relay_parent_storage_root: persisted_validation_data.relay_parent_storage_root, }; - let executor_params = if let Ok(executor_params) = - executor_params_at_relay_parent(candidate_receipt.descriptor.relay_parent, sender).await - { - gum::debug!( - target: LOG_TARGET, - ?validation_code_hash, - ?para_id, - "Acquired executor params for the session: {:?}", - executor_params, - ); - executor_params - } else { - gum::warn!( - target: LOG_TARGET, - ?validation_code_hash, - ?para_id, - "Failed to acquire executor params for the session", - ); - return Ok(ValidationResult::Invalid(InvalidCandidate::BadParent)) - }; - let result = validation_backend .validate_candidate_with_retry( raw_validation_code.to_vec(), diff --git a/polkadot/node/core/candidate-validation/src/tests.rs b/polkadot/node/core/candidate-validation/src/tests.rs index 2dcead4db466..af530a20c4e0 100644 --- a/polkadot/node/core/candidate-validation/src/tests.rs +++ b/polkadot/node/core/candidate-validation/src/tests.rs @@ -26,37 +26,6 @@ use polkadot_primitives::{HeadData, Id as ParaId, UpwardMessage}; use sp_core::testing::TaskExecutor; use sp_keyring::Sr25519Keyring; -fn test_with_executor_params, R, M>( - mut ctx_handle: test_helpers::TestSubsystemContextHandle, - test: impl FnOnce() -> T, -) -> R { - let test_fut = test(); - - let overseer = async move { - assert_matches!( - ctx_handle.recv().await, - AllMessages::RuntimeApi( - RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) - ) => { - tx.send(Ok(1u32.into())).unwrap(); - } - ); - assert_matches!( - ctx_handle.recv().await, - AllMessages::RuntimeApi( - RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(_, tx)) - ) => { - tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); - } - ); - }; - - futures::pin_mut!(test_fut); - futures::pin_mut!(overseer); - let v = executor::block_on(future::join(test_fut, overseer)); - v.0 -} - #[test] fn correctly_checks_included_assumption() { let validation_data: PersistedValidationData = Default::default(); @@ -460,23 +429,16 @@ fn candidate_validation_ok_is_ok() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() }; - let pool = TaskExecutor::new(); - let (mut ctx, ctx_handle) = - test_helpers::make_subsystem_context::(pool.clone()); - let metrics = Metrics::default(); - - let v = test_with_executor_params(ctx_handle, || { - validate_candidate_exhaustive( - ctx.sender(), - MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), - validation_data.clone(), - validation_code, - candidate_receipt, - Arc::new(pov), - PvfExecTimeoutKind::Backing, - &metrics, - ) - }) + let v = executor::block_on(validate_candidate_exhaustive( + MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), + validation_data.clone(), + validation_code, + candidate_receipt, + Arc::new(pov), + ExecutorParams::default(), + PvfExecTimeoutKind::Backing, + &Default::default(), + )) .unwrap(); assert_matches!(v, ValidationResult::Valid(outputs, used_validation_data) => { @@ -517,27 +479,21 @@ fn candidate_validation_bad_return_is_invalid() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; - let pool = TaskExecutor::new(); - let (mut ctx, ctx_handle) = - test_helpers::make_subsystem_context::(pool.clone()); - let metrics = Metrics::default(); - - let v = test_with_executor_params(ctx_handle, || { - validate_candidate_exhaustive( - ctx.sender(), - MockValidateCandidateBackend::with_hardcoded_result(Err( - ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout), - )), - validation_data, - validation_code, - candidate_receipt, - Arc::new(pov), - PvfExecTimeoutKind::Backing, - &metrics, - ) - }); + let v = executor::block_on(validate_candidate_exhaustive( + MockValidateCandidateBackend::with_hardcoded_result(Err( + ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout), + )), + validation_data, + validation_code, + candidate_receipt, + Arc::new(pov), + ExecutorParams::default(), + PvfExecTimeoutKind::Backing, + &Default::default(), + )) + .unwrap(); - assert_matches!(v, Ok(ValidationResult::Invalid(InvalidCandidate::Timeout))); + assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::Timeout)); } // Test that we vote valid if we get `AmbiguousWorkerDeath`, retry, and then succeed. @@ -588,26 +544,19 @@ fn candidate_validation_one_ambiguous_error_is_valid() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() }; - let pool = TaskExecutor::new(); - let (mut ctx, ctx_handle) = - test_helpers::make_subsystem_context::(pool.clone()); - let metrics = Metrics::default(); - - let v = test_with_executor_params(ctx_handle, || { - validate_candidate_exhaustive( - ctx.sender(), - MockValidateCandidateBackend::with_hardcoded_result_list(vec![ - Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), - Ok(validation_result), - ]), - validation_data.clone(), - validation_code, - candidate_receipt, - Arc::new(pov), - PvfExecTimeoutKind::Backing, - &metrics, - ) - }) + let v = executor::block_on(validate_candidate_exhaustive( + MockValidateCandidateBackend::with_hardcoded_result_list(vec![ + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), + Ok(validation_result), + ]), + validation_data.clone(), + validation_code, + candidate_receipt, + Arc::new(pov), + ExecutorParams::default(), + PvfExecTimeoutKind::Backing, + &Default::default(), + )) .unwrap(); assert_matches!(v, ValidationResult::Valid(outputs, used_validation_data) => { @@ -648,26 +597,19 @@ fn candidate_validation_multiple_ambiguous_errors_is_invalid() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; - let pool = TaskExecutor::new(); - let (mut ctx, ctx_handle) = - test_helpers::make_subsystem_context::(pool.clone()); - let metrics = Metrics::default(); - - let v = test_with_executor_params(ctx_handle, || { - validate_candidate_exhaustive( - ctx.sender(), - MockValidateCandidateBackend::with_hardcoded_result_list(vec![ - Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), - Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), - ]), - validation_data, - validation_code, - candidate_receipt, - Arc::new(pov), - PvfExecTimeoutKind::Backing, - &metrics, - ) - }) + let v = executor::block_on(validate_candidate_exhaustive( + MockValidateCandidateBackend::with_hardcoded_result_list(vec![ + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), + ]), + validation_data, + validation_code, + candidate_receipt, + Arc::new(pov), + ExecutorParams::default(), + PvfExecTimeoutKind::Backing, + &Default::default(), + )) .unwrap(); assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::ExecutionError(_))); @@ -702,29 +644,22 @@ fn candidate_validation_retry_internal_errors() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; - let pool = TaskExecutor::new(); - let (mut ctx, ctx_handle) = - test_helpers::make_subsystem_context::(pool.clone()); - let metrics = Metrics::default(); - - let v = test_with_executor_params(ctx_handle, || { - validate_candidate_exhaustive( - ctx.sender(), - MockValidateCandidateBackend::with_hardcoded_result_list(vec![ - Err(InternalValidationError::HostCommunication("foo".into()).into()), - // Throw an AWD error, we should still retry again. - Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), - // Throw another internal error. - Err(InternalValidationError::HostCommunication("bar".into()).into()), - ]), - validation_data, - validation_code, - candidate_receipt, - Arc::new(pov), - PvfExecTimeoutKind::Backing, - &metrics, - ) - }); + let v = executor::block_on(validate_candidate_exhaustive( + MockValidateCandidateBackend::with_hardcoded_result_list(vec![ + Err(InternalValidationError::HostCommunication("foo".into()).into()), + // Throw an AWD error, we should still retry again. + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), + // Throw another internal error. + Err(InternalValidationError::HostCommunication("bar".into()).into()), + ]), + validation_data, + validation_code, + candidate_receipt, + Arc::new(pov), + ExecutorParams::default(), + PvfExecTimeoutKind::Backing, + &Default::default(), + )); assert_matches!(v, Err(ValidationFailed(s)) if s.contains("bar")); } @@ -758,29 +693,22 @@ fn candidate_validation_retry_panic_errors() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; - let pool = TaskExecutor::new(); - let (mut ctx, ctx_handle) = - test_helpers::make_subsystem_context::(pool.clone()); - let metrics = Metrics::default(); - - let v = test_with_executor_params(ctx_handle, || { - validate_candidate_exhaustive( - ctx.sender(), - MockValidateCandidateBackend::with_hardcoded_result_list(vec![ - Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic("foo".into()))), - // Throw an AWD error, we should still retry again. - Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), - // Throw another panic error. - Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic("bar".into()))), - ]), - validation_data, - validation_code, - candidate_receipt, - Arc::new(pov), - PvfExecTimeoutKind::Backing, - &metrics, - ) - }); + let v = executor::block_on(validate_candidate_exhaustive( + MockValidateCandidateBackend::with_hardcoded_result_list(vec![ + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic("foo".into()))), + // Throw an AWD error, we should still retry again. + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), + // Throw another panic error. + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic("bar".into()))), + ]), + validation_data, + validation_code, + candidate_receipt, + Arc::new(pov), + ExecutorParams::default(), + PvfExecTimeoutKind::Backing, + &Default::default(), + )); assert_matches!(v, Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(s))) if s == "bar".to_string()); } @@ -813,25 +741,18 @@ fn candidate_validation_timeout_is_internal_error() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; - let pool = TaskExecutor::new(); - let (mut ctx, ctx_handle) = - test_helpers::make_subsystem_context::(pool.clone()); - let metrics = Metrics::default(); - - let v = test_with_executor_params(ctx_handle, || { - validate_candidate_exhaustive( - ctx.sender(), - MockValidateCandidateBackend::with_hardcoded_result(Err( - ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout), - )), - validation_data, - validation_code, - candidate_receipt, - Arc::new(pov), - PvfExecTimeoutKind::Backing, - &metrics, - ) - }); + let v = executor::block_on(validate_candidate_exhaustive( + MockValidateCandidateBackend::with_hardcoded_result(Err( + ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout), + )), + validation_data, + validation_code, + candidate_receipt, + Arc::new(pov), + ExecutorParams::default(), + PvfExecTimeoutKind::Backing, + &Default::default(), + )); assert_matches!(v, Ok(ValidationResult::Invalid(InvalidCandidate::Timeout))); } @@ -867,23 +788,16 @@ fn candidate_validation_commitment_hash_mismatch_is_invalid() { hrmp_watermark: 12345, }; - let pool = TaskExecutor::new(); - let (mut ctx, ctx_handle) = - test_helpers::make_subsystem_context::(pool.clone()); - let metrics = Metrics::default(); - - let result = test_with_executor_params(ctx_handle, || { - validate_candidate_exhaustive( - ctx.sender(), - MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), - validation_data, - validation_code, - candidate_receipt, - Arc::new(pov), - PvfExecTimeoutKind::Backing, - &metrics, - ) - }) + let result = executor::block_on(validate_candidate_exhaustive( + MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), + validation_data, + validation_code, + candidate_receipt, + Arc::new(pov), + ExecutorParams::default(), + PvfExecTimeoutKind::Backing, + &Default::default(), + )) .unwrap(); // Ensure `post validation` check on the commitments hash works as expected. @@ -919,11 +833,9 @@ fn candidate_validation_code_mismatch_is_invalid() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; let pool = TaskExecutor::new(); - let (mut ctx, _ctx_handle) = - test_helpers::make_subsystem_context::(pool.clone()); + let (_ctx, _ctx_handle) = test_helpers::make_subsystem_context::(pool.clone()); let v = executor::block_on(validate_candidate_exhaustive( - ctx.sender(), MockValidateCandidateBackend::with_hardcoded_result(Err( ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout), )), @@ -931,6 +843,7 @@ fn candidate_validation_code_mismatch_is_invalid() { validation_code, candidate_receipt, Arc::new(pov), + ExecutorParams::default(), PvfExecTimeoutKind::Backing, &Default::default(), )) @@ -981,23 +894,16 @@ fn compressed_code_works() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() }; - let pool = TaskExecutor::new(); - let (mut ctx, ctx_handle) = - test_helpers::make_subsystem_context::(pool.clone()); - let metrics = Metrics::default(); - - let v = test_with_executor_params(ctx_handle, || { - validate_candidate_exhaustive( - ctx.sender(), - MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), - validation_data, - validation_code, - candidate_receipt, - Arc::new(pov), - PvfExecTimeoutKind::Backing, - &metrics, - ) - }); + let v = executor::block_on(validate_candidate_exhaustive( + MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), + validation_data, + validation_code, + candidate_receipt, + Arc::new(pov), + ExecutorParams::default(), + PvfExecTimeoutKind::Backing, + &Default::default(), + )); assert_matches!(v, Ok(ValidationResult::Valid(_, _))); } @@ -1037,16 +943,15 @@ fn code_decompression_failure_is_error() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; let pool = TaskExecutor::new(); - let (mut ctx, _ctx_handle) = - test_helpers::make_subsystem_context::(pool.clone()); + let (_ctx, _ctx_handle) = test_helpers::make_subsystem_context::(pool.clone()); let v = executor::block_on(validate_candidate_exhaustive( - ctx.sender(), MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), validation_data, validation_code, candidate_receipt, Arc::new(pov), + ExecutorParams::default(), PvfExecTimeoutKind::Backing, &Default::default(), )); @@ -1090,16 +995,15 @@ fn pov_decompression_failure_is_invalid() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; let pool = TaskExecutor::new(); - let (mut ctx, _ctx_handle) = - test_helpers::make_subsystem_context::(pool.clone()); + let (_ctx, _ctx_handle) = test_helpers::make_subsystem_context::(pool.clone()); let v = executor::block_on(validate_candidate_exhaustive( - ctx.sender(), MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), validation_data, validation_code, candidate_receipt, Arc::new(pov), + ExecutorParams::default(), PvfExecTimeoutKind::Backing, &Default::default(), )); diff --git a/polkadot/node/core/dispute-coordinator/src/import.rs b/polkadot/node/core/dispute-coordinator/src/import.rs index 0da3723ebf22..cf51ae4d3655 100644 --- a/polkadot/node/core/dispute-coordinator/src/import.rs +++ b/polkadot/node/core/dispute-coordinator/src/import.rs @@ -34,8 +34,9 @@ use polkadot_node_primitives::{ use polkadot_node_subsystem::overseer; use polkadot_node_subsystem_util::runtime::RuntimeInfo; use polkadot_primitives::{ - CandidateReceipt, DisputeStatement, Hash, IndexedVec, SessionIndex, SessionInfo, - ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature, + CandidateReceipt, DisputeStatement, ExecutorParams, Hash, IndexedVec, SessionIndex, + SessionInfo, ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorPair, + ValidatorSignature, }; use sc_keystore::LocalKeystore; @@ -47,6 +48,8 @@ pub struct CandidateEnvironment<'a> { session_index: SessionIndex, /// Session for above index. session: &'a SessionInfo, + /// Executor parameters for the session. + executor_params: &'a ExecutorParams, /// Validator indices controlled by this node. controlled_indices: HashSet, } @@ -63,17 +66,17 @@ impl<'a> CandidateEnvironment<'a> { session_index: SessionIndex, relay_parent: Hash, ) -> Option> { - let session_info = match runtime_info + let (session, executor_params) = match runtime_info .get_session_info_by_index(ctx.sender(), relay_parent, session_index) .await { - Ok(extended_session_info) => &extended_session_info.session_info, + Ok(extended_session_info) => + (&extended_session_info.session_info, &extended_session_info.executor_params), Err(_) => return None, }; - let controlled_indices = - find_controlled_validator_indices(keystore, &session_info.validators); - Some(Self { session_index, session: session_info, controlled_indices }) + let controlled_indices = find_controlled_validator_indices(keystore, &session.validators); + Some(Self { session_index, session, executor_params, controlled_indices }) } /// Validators in the candidate's session. @@ -86,6 +89,11 @@ impl<'a> CandidateEnvironment<'a> { &self.session } + /// Executor parameters for the candidate's session + pub fn executor_params(&self) -> &ExecutorParams { + &self.executor_params + } + /// Retrieve `SessionIndex` for this environment. pub fn session_index(&self) -> SessionIndex { self.session_index diff --git a/polkadot/node/core/dispute-coordinator/src/initialized.rs b/polkadot/node/core/dispute-coordinator/src/initialized.rs index c1d02ef976cb..dd7aef19f6ed 100644 --- a/polkadot/node/core/dispute-coordinator/src/initialized.rs +++ b/polkadot/node/core/dispute-coordinator/src/initialized.rs @@ -1160,6 +1160,7 @@ impl Initialized { ParticipationRequest::new( new_state.candidate_receipt().clone(), session, + env.executor_params().clone(), request_timer, ), ) diff --git a/polkadot/node/core/dispute-coordinator/src/lib.rs b/polkadot/node/core/dispute-coordinator/src/lib.rs index a2c500e08e28..a423490fc534 100644 --- a/polkadot/node/core/dispute-coordinator/src/lib.rs +++ b/polkadot/node/core/dispute-coordinator/src/lib.rs @@ -399,6 +399,7 @@ impl DisputeCoordinatorSubsystem { ParticipationRequest::new( vote_state.votes().candidate_receipt.clone(), session, + env.executor_params().clone(), request_timer, ), )); diff --git a/polkadot/node/core/dispute-coordinator/src/participation/mod.rs b/polkadot/node/core/dispute-coordinator/src/participation/mod.rs index 25b7352807f6..5a3c4be90aa0 100644 --- a/polkadot/node/core/dispute-coordinator/src/participation/mod.rs +++ b/polkadot/node/core/dispute-coordinator/src/participation/mod.rs @@ -366,6 +366,7 @@ async fn participate( validation_code, req.candidate_receipt().clone(), available_data.pov, + req.executor_params(), PvfExecTimeoutKind::Approval, validation_tx, )) diff --git a/polkadot/node/core/dispute-coordinator/src/participation/queues/mod.rs b/polkadot/node/core/dispute-coordinator/src/participation/queues/mod.rs index 8a4374999f88..1105135a747f 100644 --- a/polkadot/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/polkadot/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -21,7 +21,9 @@ use std::{ use futures::channel::oneshot; use polkadot_node_subsystem::{messages::ChainApiMessage, overseer}; -use polkadot_primitives::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex}; +use polkadot_primitives::{ + BlockNumber, CandidateHash, CandidateReceipt, ExecutorParams, Hash, SessionIndex, +}; use crate::{ error::{FatalError, FatalResult, Result}, @@ -73,6 +75,7 @@ pub struct ParticipationRequest { candidate_hash: CandidateHash, candidate_receipt: CandidateReceipt, session: SessionIndex, + executor_params: ExecutorParams, request_timer: Option, // Sends metric data when request is dropped } @@ -120,9 +123,16 @@ impl ParticipationRequest { pub fn new( candidate_receipt: CandidateReceipt, session: SessionIndex, + executor_params: ExecutorParams, request_timer: Option, ) -> Self { - Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session, request_timer } + Self { + candidate_hash: candidate_receipt.hash(), + candidate_receipt, + session, + executor_params, + request_timer, + } } pub fn candidate_receipt(&'_ self) -> &'_ CandidateReceipt { @@ -134,6 +144,9 @@ impl ParticipationRequest { pub fn session(&self) -> SessionIndex { self.session } + pub fn executor_params(&self) -> ExecutorParams { + self.executor_params.clone() + } pub fn discard_timer(&mut self) { if let Some(timer) = self.request_timer.take() { timer.stop_and_discard(); @@ -150,11 +163,17 @@ impl ParticipationRequest { #[cfg(test)] impl PartialEq for ParticipationRequest { fn eq(&self, other: &Self) -> bool { - let ParticipationRequest { candidate_receipt, candidate_hash, session, request_timer: _ } = - self; + let ParticipationRequest { + candidate_receipt, + candidate_hash, + session, + executor_params, + request_timer: _, + } = self; candidate_receipt == other.candidate_receipt() && candidate_hash == other.candidate_hash() && - *session == other.session() + *session == other.session() && + executor_params.hash() == other.executor_params.hash() } } #[cfg(test)] diff --git a/polkadot/node/core/dispute-coordinator/src/participation/queues/tests.rs b/polkadot/node/core/dispute-coordinator/src/participation/queues/tests.rs index d4f43639ce87..63bfc1d7d026 100644 --- a/polkadot/node/core/dispute-coordinator/src/participation/queues/tests.rs +++ b/polkadot/node/core/dispute-coordinator/src/participation/queues/tests.rs @@ -27,7 +27,7 @@ fn make_participation_request(hash: Hash) -> ParticipationRequest { // make it differ: receipt.commitments_hash = hash; let request_timer = Metrics::default().time_participation_pipeline(); - ParticipationRequest::new(receipt, 1, request_timer) + ParticipationRequest::new(receipt, 1, Default::default(), request_timer) } /// Make dummy comparator for request, based on the given block number. @@ -46,6 +46,7 @@ fn clone_request(request: &ParticipationRequest) -> ParticipationRequest { candidate_receipt: request.candidate_receipt.clone(), candidate_hash: request.candidate_hash, session: request.session, + executor_params: request.executor_params.clone(), request_timer: None, } } diff --git a/polkadot/node/core/dispute-coordinator/src/participation/tests.rs b/polkadot/node/core/dispute-coordinator/src/participation/tests.rs index 32725a3ac658..a2e8e88cb674 100644 --- a/polkadot/node/core/dispute-coordinator/src/participation/tests.rs +++ b/polkadot/node/core/dispute-coordinator/src/participation/tests.rs @@ -73,7 +73,8 @@ async fn participate_with_commitments_hash( let session = 1; let request_timer = participation.metrics.time_participation_pipeline(); - let req = ParticipationRequest::new(candidate_receipt, session, request_timer); + let req = + ParticipationRequest::new(candidate_receipt, session, Default::default(), request_timer); participation .queue_participation(ctx, ParticipationPriority::BestEffort, req) @@ -120,7 +121,7 @@ pub async fn participation_full_happy_path( assert_matches!( ctx_handle.recv().await, AllMessages::CandidateValidation( - CandidateValidationMessage::ValidateFromExhaustive(_, _, candidate_receipt, _, timeout, tx) + CandidateValidationMessage::ValidateFromExhaustive(_, _, candidate_receipt, _, _, timeout, tx) ) if timeout == PvfExecTimeoutKind::Approval => { if expected_commitments_hash != candidate_receipt.commitments_hash { tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))).unwrap(); @@ -455,7 +456,7 @@ fn cast_invalid_vote_if_validation_fails_or_is_invalid() { assert_matches!( ctx_handle.recv().await, AllMessages::CandidateValidation( - CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx) + CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, _, timeout, tx) ) if timeout == PvfExecTimeoutKind::Approval => { tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::Timeout))).unwrap(); }, @@ -492,7 +493,7 @@ fn cast_invalid_vote_if_commitments_dont_match() { assert_matches!( ctx_handle.recv().await, AllMessages::CandidateValidation( - CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx) + CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, _, timeout, tx) ) if timeout == PvfExecTimeoutKind::Approval => { tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))).unwrap(); }, @@ -529,7 +530,7 @@ fn cast_valid_vote_if_validation_passes() { assert_matches!( ctx_handle.recv().await, AllMessages::CandidateValidation( - CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx) + CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, _, timeout, tx) ) if timeout == PvfExecTimeoutKind::Approval => { tx.send(Ok(ValidationResult::Valid(dummy_candidate_commitments(None), PersistedValidationData::default()))).unwrap(); }, diff --git a/polkadot/node/core/pvf/common/src/worker/mod.rs b/polkadot/node/core/pvf/common/src/worker/mod.rs index 4ea0e5aa1a9a..f8cb063d7c7d 100644 --- a/polkadot/node/core/pvf/common/src/worker/mod.rs +++ b/polkadot/node/core/pvf/common/src/worker/mod.rs @@ -121,14 +121,13 @@ pub fn worker_event_loop( "Node and worker version mismatch, node needs restarting, forcing shutdown", ); kill_parent_node_in_emergency(); - let err = io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"); - worker_shutdown_message(debug_id, worker_pid, err); + let err: io::Result = + Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch")); + gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker({}): {:?}", debug_id, err); return } } - remove_env_vars(debug_id); - // Run the main worker loop. let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it."); let err = rt @@ -143,7 +142,7 @@ pub fn worker_event_loop( // It's never `Ok` because it's `Ok(Never)`. .unwrap_err(); - worker_shutdown_message(debug_id, worker_pid, err); + gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker ({}): {:?}", debug_id, err); // We don't want tokio to wait for the tasks to finish. We want to bring down the worker as fast // as possible and not wait for stalled validation to finish. This isn't strictly necessary now, @@ -151,53 +150,6 @@ pub fn worker_event_loop( rt.shutdown_background(); } -/// Delete all env vars to prevent malicious code from accessing them. -fn remove_env_vars(debug_id: &'static str) { - for (key, value) in std::env::vars_os() { - // TODO: *theoretically* the value (or mere presence) of `RUST_LOG` can be a source of - // randomness for malicious code. In the future we can remove it also and log in the host; - // see . - if key == "RUST_LOG" { - continue - } - - // In case of a key or value that would cause [`env::remove_var` to - // panic](https://doc.rust-lang.org/std/env/fn.remove_var.html#panics), we first log a - // warning and then proceed to attempt to remove the env var. - let mut err_reasons = vec![]; - let (key_str, value_str) = (key.to_str(), value.to_str()); - if key.is_empty() { - err_reasons.push("key is empty"); - } - if key_str.is_some_and(|s| s.contains('=')) { - err_reasons.push("key contains '='"); - } - if key_str.is_some_and(|s| s.contains('\0')) { - err_reasons.push("key contains null character"); - } - if value_str.is_some_and(|s| s.contains('\0')) { - err_reasons.push("value contains null character"); - } - if !err_reasons.is_empty() { - gum::warn!( - target: LOG_TARGET, - %debug_id, - ?key, - ?value, - "Attempting to remove badly-formatted env var, this may cause the PVF worker to crash. Please remove it yourself. Reasons: {:?}", - err_reasons - ); - } - - std::env::remove_var(key); - } -} - -/// Provide a consistent message on worker shutdown. -fn worker_shutdown_message(debug_id: &'static str, worker_pid: u32, err: io::Error) { - gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker ({}): {:?}", debug_id, err); -} - /// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up /// and then either blocks for the remaining CPU time, or returns if we exceed the CPU timeout. /// diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs index 72c459c2f632..0f30efefc4cd 100644 --- a/polkadot/node/core/pvf/tests/it/main.rs +++ b/polkadot/node/core/pvf/tests/it/main.rs @@ -258,7 +258,7 @@ async fn execute_queue_doesnt_stall_with_varying_executor_params() { #[tokio::test] async fn deleting_prepared_artifact_does_not_dispute() { let host = TestHost::new(); - let cache_dir = host.cache_dir.path().clone(); + let cache_dir = host.cache_dir.path(); let result = host .validate_candidate( diff --git a/polkadot/node/malus/src/variants/common.rs b/polkadot/node/malus/src/variants/common.rs index 475ca8f31452..bc5f6f92aedb 100644 --- a/polkadot/node/malus/src/variants/common.rs +++ b/polkadot/node/malus/src/variants/common.rs @@ -278,6 +278,7 @@ where validation_code, candidate_receipt, pov, + executor_params, timeout, sender, ), @@ -292,6 +293,7 @@ where validation_code, candidate_receipt, pov, + executor_params, timeout, sender, ), @@ -329,6 +331,7 @@ where validation_code, candidate_receipt, pov, + executor_params, timeout, sender, ), @@ -368,6 +371,7 @@ where validation_code, candidate_receipt, pov, + executor_params, timeout, sender, ), @@ -382,6 +386,7 @@ where validation_code, candidate_receipt, pov, + executor_params, timeout, sender, ), @@ -394,6 +399,7 @@ where CandidateValidationMessage::ValidateFromChainState( candidate_receipt, pov, + executor_params, timeout, response_sender, ), @@ -406,6 +412,7 @@ where msg: CandidateValidationMessage::ValidateFromChainState( candidate_receipt, pov, + executor_params, timeout, response_sender, ), @@ -435,6 +442,7 @@ where msg: CandidateValidationMessage::ValidateFromChainState( candidate_receipt, pov, + executor_params, timeout, response_sender, ), @@ -468,6 +476,7 @@ where msg: CandidateValidationMessage::ValidateFromChainState( candidate_receipt, pov, + executor_params, timeout, response_sender, ), @@ -479,6 +488,7 @@ where msg: CandidateValidationMessage::ValidateFromChainState( candidate_receipt, pov, + executor_params, timeout, response_sender, ), diff --git a/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs b/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs index 8fc9fa82153e..12a97a1fb5a1 100644 --- a/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs @@ -146,7 +146,7 @@ mod tests { AllMessages, AvailabilityDistributionMessage, RuntimeApiMessage, RuntimeApiRequest, }; use polkadot_node_subsystem_test_helpers as test_helpers; - use polkadot_primitives::{CandidateHash, Hash, ValidatorIndex}; + use polkadot_primitives::{CandidateHash, ExecutorParams, Hash, ValidatorIndex}; use test_helpers::mock::make_ferdie_keystore; use super::*; @@ -208,6 +208,12 @@ mod tests { )) => { tx.send(Ok(Some(make_session_info()))).unwrap(); }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionExecutorParams(_, tx), + )) => { + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + }, AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendRequests( mut reqs, _, diff --git a/polkadot/node/network/availability-distribution/src/requester/tests.rs b/polkadot/node/network/availability-distribution/src/requester/tests.rs index b74a69ea0769..5f7e4c36f063 100644 --- a/polkadot/node/network/availability-distribution/src/requester/tests.rs +++ b/polkadot/node/network/availability-distribution/src/requester/tests.rs @@ -24,8 +24,8 @@ use polkadot_node_network_protocol::jaeger; use polkadot_node_primitives::{BlockData, ErasureChunk, PoV}; use polkadot_node_subsystem_util::runtime::RuntimeInfo; use polkadot_primitives::{ - BlockNumber, CoreState, GroupIndex, Hash, Id as ParaId, ScheduledCore, SessionIndex, - SessionInfo, + BlockNumber, CoreState, ExecutorParams, GroupIndex, Hash, Id as ParaId, ScheduledCore, + SessionIndex, SessionInfo, }; use sp_core::traits::SpawnNamed; @@ -120,6 +120,10 @@ fn spawn_virtual_overseer( tx.send(Ok(Some(test_state.session_info.clone()))) .expect("Receiver should be alive."); }, + RuntimeApiRequest::SessionExecutorParams(_, tx) => { + tx.send(Ok(Some(ExecutorParams::default()))) + .expect("Receiver should be alive."); + }, RuntimeApiRequest::AvailabilityCores(tx) => { let para_id = ParaId::from(1_u32); let maybe_block_position = diff --git a/polkadot/node/network/availability-distribution/src/tests/state.rs b/polkadot/node/network/availability-distribution/src/tests/state.rs index 706ec13a3e9b..ecde44788c25 100644 --- a/polkadot/node/network/availability-distribution/src/tests/state.rs +++ b/polkadot/node/network/availability-distribution/src/tests/state.rs @@ -48,8 +48,8 @@ use polkadot_node_subsystem::{ }; use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_primitives::{ - CandidateHash, CoreState, GroupIndex, Hash, Id as ParaId, ScheduledCore, SessionInfo, - ValidatorIndex, + CandidateHash, CoreState, ExecutorParams, GroupIndex, Hash, Id as ParaId, ScheduledCore, + SessionInfo, ValidatorIndex, }; use test_helpers::mock::make_ferdie_keystore; @@ -267,6 +267,10 @@ impl TestState { tx.send(Ok(Some(self.session_info.clone()))) .expect("Receiver should be alive."); }, + RuntimeApiRequest::SessionExecutorParams(_, tx) => { + tx.send(Ok(Some(ExecutorParams::default()))) + .expect("Receiver should be alive."); + }, RuntimeApiRequest::AvailabilityCores(tx) => { gum::trace!(target: LOG_TARGET, cores= ?self.cores[&hash], hash = ?hash, "Sending out cores for hash"); tx.send(Ok(self.cores[&hash].clone())) diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs index fae719b2b4a1..8b28730c9012 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs @@ -45,8 +45,8 @@ use polkadot_node_subsystem::{ use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_node_subsystem_util::{reputation::add_reputation, TimeoutExt}; use polkadot_primitives::{ - AuthorityDiscoveryId, CollatorPair, GroupIndex, GroupRotationInfo, IndexedVec, ScheduledCore, - SessionIndex, SessionInfo, ValidatorId, ValidatorIndex, + AuthorityDiscoveryId, CollatorPair, ExecutorParams, GroupIndex, GroupRotationInfo, IndexedVec, + ScheduledCore, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex, }; use polkadot_primitives_test_helpers::TestCandidateBuilder; @@ -398,6 +398,16 @@ async fn distribute_collation_with_receipt( tx.send(Ok(Some(test_state.session_info.clone()))).unwrap(); }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionExecutorParams(session_index, tx), + )) => { + assert_eq!(relay_parent, relay_parent); + assert_eq!(session_index, test_state.current_session_index()); + + tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( _relay_parent, RuntimeApiRequest::ValidatorGroups(tx), diff --git a/polkadot/node/network/dispute-distribution/src/tests/mod.rs b/polkadot/node/network/dispute-distribution/src/tests/mod.rs index bc8e8e8be160..f53b9c0dd4b5 100644 --- a/polkadot/node/network/dispute-distribution/src/tests/mod.rs +++ b/polkadot/node/network/dispute-distribution/src/tests/mod.rs @@ -57,7 +57,8 @@ use polkadot_node_subsystem_test_helpers::{ mock::make_ferdie_keystore, subsystem_test_harness, TestSubsystemContextHandle, }; use polkadot_primitives::{ - AuthorityDiscoveryId, CandidateHash, CandidateReceipt, Hash, SessionIndex, SessionInfo, + AuthorityDiscoveryId, CandidateHash, CandidateReceipt, ExecutorParams, Hash, SessionIndex, + SessionInfo, }; use self::mock::{ @@ -635,6 +636,16 @@ async fn nested_network_dispute_request<'a, F, O>( }, unexpected => panic!("Unexpected message {:?}", unexpected), } + match handle.recv().await { + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionExecutorParams(_, tx), + )) => { + tx.send(Ok(Some(ExecutorParams::default()))) + .expect("Receiver should stay alive."); + }, + unexpected => panic!("Unexpected message {:?}", unexpected), + } } // Import should get initiated: @@ -746,15 +757,27 @@ async fn activate_leaf( if let Some(session_info) = new_session { assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionInfo(session_idx, tx) - )) => { - assert_eq!(h, activate); - assert_eq!(session_index, session_idx); - tx.send(Ok(Some(session_info))).expect("Receiver should stay alive."); - }); + handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionInfo(session_idx, tx) + )) => { + assert_eq!(h, activate); + assert_eq!(session_index, session_idx); + tx.send(Ok(Some(session_info))).expect("Receiver should stay alive."); + } + ); + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionExecutorParams(session_idx, tx) + )) => { + assert_eq!(h, activate); + assert_eq!(session_index, session_idx); + tx.send(Ok(Some(ExecutorParams::default()))).expect("Receiver should stay alive."); + } + ); } assert_matches!( diff --git a/polkadot/node/overseer/examples/minimal-example.rs b/polkadot/node/overseer/examples/minimal-example.rs index a823b1d3961e..e78941776d5e 100644 --- a/polkadot/node/overseer/examples/minimal-example.rs +++ b/polkadot/node/overseer/examples/minimal-example.rs @@ -76,6 +76,7 @@ impl Subsystem1 { let msg = CandidateValidationMessage::ValidateFromChainState( candidate_receipt, PoV { block_data: BlockData(Vec::new()) }.into(), + Default::default(), PvfExecTimeoutKind::Backing, tx, ); diff --git a/polkadot/node/overseer/src/tests.rs b/polkadot/node/overseer/src/tests.rs index 4ac538a7fd3a..22d9bf0a708d 100644 --- a/polkadot/node/overseer/src/tests.rs +++ b/polkadot/node/overseer/src/tests.rs @@ -106,6 +106,7 @@ where ctx.send_message(CandidateValidationMessage::ValidateFromChainState( candidate_receipt, PoV { block_data: BlockData(Vec::new()) }.into(), + Default::default(), PvfExecTimeoutKind::Backing, tx, )) @@ -779,6 +780,7 @@ fn test_candidate_validation_msg() -> CandidateValidationMessage { CandidateValidationMessage::ValidateFromChainState( candidate_receipt, pov, + Default::default(), PvfExecTimeoutKind::Backing, sender, ) diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index 8adc39eed56d..c0abaf27b6ce 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -143,6 +143,7 @@ pub enum CandidateValidationMessage { ValidateFromChainState( CandidateReceipt, Arc, + ExecutorParams, /// Execution timeout PvfExecTimeoutKind, oneshot::Sender>, @@ -161,6 +162,7 @@ pub enum CandidateValidationMessage { ValidationCode, CandidateReceipt, Arc, + ExecutorParams, /// Execution timeout PvfExecTimeoutKind, oneshot::Sender>, diff --git a/polkadot/node/subsystem-util/src/runtime/error.rs b/polkadot/node/subsystem-util/src/runtime/error.rs index db3eacd68514..1702a2dbd60a 100644 --- a/polkadot/node/subsystem-util/src/runtime/error.rs +++ b/polkadot/node/subsystem-util/src/runtime/error.rs @@ -38,6 +38,10 @@ pub enum Error { /// We tried fetching a session info which was not available. #[error("There was no session with the given index {0}")] NoSuchSession(SessionIndex), + + /// We tried fetching executor params for a session which were not available. + #[error("There was no executor parameters for session with the given index {0}")] + NoExecutorParams(SessionIndex), } pub type Result = std::result::Result; diff --git a/polkadot/node/subsystem-util/src/runtime/mod.rs b/polkadot/node/subsystem-util/src/runtime/mod.rs index 1f5641e3ea95..be79d551acfd 100644 --- a/polkadot/node/subsystem-util/src/runtime/mod.rs +++ b/polkadot/node/subsystem-util/src/runtime/mod.rs @@ -29,16 +29,16 @@ use polkadot_node_subsystem::{ errors::RuntimeApiError, messages::RuntimeApiMessage, overseer, SubsystemSender, }; use polkadot_primitives::{ - vstaging, CandidateEvent, CandidateHash, CoreState, EncodeAs, GroupIndex, GroupRotationInfo, - Hash, IndexedVec, OccupiedCore, ScrapedOnChainVotes, SessionIndex, SessionInfo, Signed, - SigningContext, UncheckedSigned, ValidationCode, ValidationCodeHash, ValidatorId, - ValidatorIndex, + vstaging, CandidateEvent, CandidateHash, CoreState, EncodeAs, ExecutorParams, GroupIndex, + GroupRotationInfo, Hash, IndexedVec, OccupiedCore, ScrapedOnChainVotes, SessionIndex, + SessionInfo, Signed, SigningContext, UncheckedSigned, ValidationCode, ValidationCodeHash, + ValidatorId, ValidatorIndex, }; use crate::{ request_availability_cores, request_candidate_events, request_key_ownership_proof, - request_on_chain_votes, request_session_index_for_child, request_session_info, - request_staging_async_backing_params, request_submit_report_dispute_lost, + request_on_chain_votes, request_session_executor_params, request_session_index_for_child, + request_session_info, request_staging_async_backing_params, request_submit_report_dispute_lost, request_unapplied_slashes, request_validation_code_by_hash, request_validator_groups, }; @@ -84,6 +84,8 @@ pub struct ExtendedSessionInfo { pub session_info: SessionInfo, /// Contains useful information about ourselves, in case this node is a validator. pub validator_info: ValidatorInfo, + /// Session executor parameters + pub executor_params: ExecutorParams, } /// Information about ourselves, in case we are an `Authority`. @@ -177,9 +179,15 @@ impl RuntimeInfo { recv_runtime(request_session_info(parent, session_index, sender).await) .await? .ok_or(JfyiError::NoSuchSession(session_index))?; + + let executor_params = + recv_runtime(request_session_executor_params(parent, session_index, sender).await) + .await? + .ok_or(JfyiError::NoExecutorParams(session_index))?; + let validator_info = self.get_validator_info(&session_info)?; - let full_info = ExtendedSessionInfo { session_info, validator_info }; + let full_info = ExtendedSessionInfo { session_info, validator_info, executor_params }; self.session_info_cache.put(session_index, full_info); } From e85262a1de30a9dad6454dd3d0755a3fc815e731 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Sun, 27 Aug 2023 13:09:22 +0200 Subject: [PATCH 2/6] Revert erroneous changes --- .../node/core/pvf/common/src/worker/mod.rs | 56 +++++++++++++++++-- 1 file changed, 52 insertions(+), 4 deletions(-) diff --git a/polkadot/node/core/pvf/common/src/worker/mod.rs b/polkadot/node/core/pvf/common/src/worker/mod.rs index f8cb063d7c7d..4ea0e5aa1a9a 100644 --- a/polkadot/node/core/pvf/common/src/worker/mod.rs +++ b/polkadot/node/core/pvf/common/src/worker/mod.rs @@ -121,13 +121,14 @@ pub fn worker_event_loop( "Node and worker version mismatch, node needs restarting, forcing shutdown", ); kill_parent_node_in_emergency(); - let err: io::Result = - Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch")); - gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker({}): {:?}", debug_id, err); + let err = io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"); + worker_shutdown_message(debug_id, worker_pid, err); return } } + remove_env_vars(debug_id); + // Run the main worker loop. let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it."); let err = rt @@ -142,7 +143,7 @@ pub fn worker_event_loop( // It's never `Ok` because it's `Ok(Never)`. .unwrap_err(); - gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker ({}): {:?}", debug_id, err); + worker_shutdown_message(debug_id, worker_pid, err); // We don't want tokio to wait for the tasks to finish. We want to bring down the worker as fast // as possible and not wait for stalled validation to finish. This isn't strictly necessary now, @@ -150,6 +151,53 @@ pub fn worker_event_loop( rt.shutdown_background(); } +/// Delete all env vars to prevent malicious code from accessing them. +fn remove_env_vars(debug_id: &'static str) { + for (key, value) in std::env::vars_os() { + // TODO: *theoretically* the value (or mere presence) of `RUST_LOG` can be a source of + // randomness for malicious code. In the future we can remove it also and log in the host; + // see . + if key == "RUST_LOG" { + continue + } + + // In case of a key or value that would cause [`env::remove_var` to + // panic](https://doc.rust-lang.org/std/env/fn.remove_var.html#panics), we first log a + // warning and then proceed to attempt to remove the env var. + let mut err_reasons = vec![]; + let (key_str, value_str) = (key.to_str(), value.to_str()); + if key.is_empty() { + err_reasons.push("key is empty"); + } + if key_str.is_some_and(|s| s.contains('=')) { + err_reasons.push("key contains '='"); + } + if key_str.is_some_and(|s| s.contains('\0')) { + err_reasons.push("key contains null character"); + } + if value_str.is_some_and(|s| s.contains('\0')) { + err_reasons.push("value contains null character"); + } + if !err_reasons.is_empty() { + gum::warn!( + target: LOG_TARGET, + %debug_id, + ?key, + ?value, + "Attempting to remove badly-formatted env var, this may cause the PVF worker to crash. Please remove it yourself. Reasons: {:?}", + err_reasons + ); + } + + std::env::remove_var(key); + } +} + +/// Provide a consistent message on worker shutdown. +fn worker_shutdown_message(debug_id: &'static str, worker_pid: u32, err: io::Error) { + gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker ({}): {:?}", debug_id, err); +} + /// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up /// and then either blocks for the remaining CPU time, or returns if we exceed the CPU timeout. /// From 3dd3ce7583eba19095560e7ab2d0a501878dcbfd Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Wed, 30 Aug 2023 11:55:39 +0200 Subject: [PATCH 3/6] Fix more tests --- .../core/dispute-coordinator/src/tests.rs | 47 +++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/polkadot/node/core/dispute-coordinator/src/tests.rs b/polkadot/node/core/dispute-coordinator/src/tests.rs index 75eae8200dc6..8d6a2e103962 100644 --- a/polkadot/node/core/dispute-coordinator/src/tests.rs +++ b/polkadot/node/core/dispute-coordinator/src/tests.rs @@ -63,9 +63,9 @@ use polkadot_node_subsystem_test_helpers::{ }; use polkadot_primitives::{ ApprovalVote, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash, - CandidateReceipt, CoreIndex, DisputeStatement, GroupIndex, Hash, HeadData, Header, IndexedVec, - MultiDisputeStatementSet, ScrapedOnChainVotes, SessionIndex, SessionInfo, SigningContext, - ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorSignature, + CandidateReceipt, CoreIndex, DisputeStatement, ExecutorParams, GroupIndex, Hash, HeadData, + Header, IndexedVec, MultiDisputeStatementSet, ScrapedOnChainVotes, SessionIndex, SessionInfo, + SigningContext, ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorSignature, }; use crate::{ @@ -347,6 +347,17 @@ impl TestState { let _ = tx.send(Ok(Some(self.session_info()))); } ); + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionExecutorParams(session_index, tx), + )) => { + assert_eq!(h, block_hash); + assert_eq!(session_index, i); + let _ = tx.send(Ok(Some(ExecutorParams::default()))); + } + ); } } @@ -3482,6 +3493,16 @@ fn session_info_is_requested_only_once() { let _ = tx.send(Ok(Some(test_state.session_info()))); } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionExecutorParams(session_index, tx), + )) => { + assert_eq!(session_index, 2); + let _ = tx.send(Ok(Some(ExecutorParams::default()))); + } + ); test_state }) }); @@ -3532,6 +3553,16 @@ fn session_info_big_jump_works() { let _ = tx.send(Ok(Some(test_state.session_info()))); } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionExecutorParams(session_index, tx), + )) => { + assert_eq!(session_index, expected_idx); + let _ = tx.send(Ok(Some(ExecutorParams::default()))); + } + ); } test_state }) @@ -3582,6 +3613,16 @@ fn session_info_small_jump_works() { let _ = tx.send(Ok(Some(test_state.session_info()))); } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionExecutorParams(session_index, tx), + )) => { + assert_eq!(session_index, expected_idx); + let _ = tx.send(Ok(Some(ExecutorParams::default()))); + } + ); } test_state }) From cf5885960fa675b82c38d47185145f47817e2518 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Wed, 30 Aug 2023 12:19:17 +0200 Subject: [PATCH 4/6] Resolve discussions --- polkadot/node/core/approval-voting/src/lib.rs | 1 - polkadot/node/core/backing/src/lib.rs | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 6691713f1966..c59d49341ca8 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -1008,7 +1008,6 @@ async fn handle_actions( }, None => { let ctx = &mut *ctx; - // FIXME: Should we request at `block_hash` or `relay_block_hash`? let executor_params = match session_info_provider .get_session_info_by_index(ctx.sender(), block_hash, session) .await diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 28b1635ebc60..39eb8b5b77b7 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -630,8 +630,7 @@ async fn validate_and_make_available( let executor_params = match executor_params_at_relay_parent(relay_parent, &mut sender).await { Ok(ep) => ep, - Err(e) => return Err(Error::UtilError(e)), /* FIXME: Is it enough to just proparate - * `UtilError` here? */ + Err(e) => return Err(Error::UtilError(e)), }; let pov = match pov { From 9abb36fc0a90a25324987c29fdab6c1fa884e0e9 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Wed, 30 Aug 2023 13:41:07 +0200 Subject: [PATCH 5/6] Fix MORE tests --- .../src/legacy_v1/tests.rs | 69 ++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/polkadot/node/network/statement-distribution/src/legacy_v1/tests.rs b/polkadot/node/network/statement-distribution/src/legacy_v1/tests.rs index a8ce65f29861..63ac1febde56 100644 --- a/polkadot/node/network/statement-distribution/src/legacy_v1/tests.rs +++ b/polkadot/node/network/statement-distribution/src/legacy_v1/tests.rs @@ -44,7 +44,8 @@ use polkadot_node_subsystem::{ }; use polkadot_node_subsystem_test_helpers::mock::make_ferdie_keystore; use polkadot_primitives::{ - GroupIndex, Hash, HeadData, Id as ParaId, IndexedVec, SessionInfo, ValidationCode, + ExecutorParams, GroupIndex, Hash, HeadData, Id as ParaId, IndexedVec, SessionInfo, + ValidationCode, }; use polkadot_primitives_test_helpers::{ dummy_committed_candidate_receipt, dummy_hash, AlwaysZeroRng, @@ -828,6 +829,17 @@ fn receiving_from_one_sends_to_another_and_to_candidate_backing() { } ); + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionExecutorParams(sess_index, tx)) + ) + if r == hash_a && sess_index == session_index + => { + let _ = tx.send(Ok(Some(ExecutorParams::default()))); + } + ); + // notify of peers and view handle .send(FromOrchestra::Communication { @@ -1062,6 +1074,17 @@ fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing( } ); + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionExecutorParams(sess_index, tx)) + ) + if r == hash_a && sess_index == session_index + => { + let _ = tx.send(Ok(Some(ExecutorParams::default()))); + } + ); + // notify of peers and view handle .send(FromOrchestra::Communication { @@ -1586,6 +1609,17 @@ fn delay_reputation_changes() { } ); + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionExecutorParams(sess_index, tx)) + ) + if r == hash_a && sess_index == session_index + => { + let _ = tx.send(Ok(Some(ExecutorParams::default()))); + } + ); + // notify of peers and view handle .send(FromOrchestra::Communication { @@ -2060,6 +2094,17 @@ fn share_prioritizes_backing_group() { } ); + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionExecutorParams(sess_index, tx)) + ) + if r == hash_a && sess_index == session_index + => { + let _ = tx.send(Ok(Some(ExecutorParams::default()))); + } + ); + // notify of dummy peers and view for (peer, pair) in dummy_peers.clone().into_iter().zip(dummy_pairs) { handle @@ -2376,6 +2421,17 @@ fn peer_cant_flood_with_large_statements() { } ); + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionExecutorParams(sess_index, tx)) + ) + if r == hash_a && sess_index == session_index + => { + let _ = tx.send(Ok(Some(ExecutorParams::default()))); + } + ); + // notify of peers and view handle .send(FromOrchestra::Communication { @@ -2595,6 +2651,17 @@ fn handle_multiple_seconded_statements() { } ); + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionExecutorParams(sess_index, tx)) + ) + if r == relay_parent_hash && sess_index == session_index + => { + let _ = tx.send(Ok(Some(ExecutorParams::default()))); + } + ); + // notify of peers and view for peer in all_peers.iter() { handle From f32c7c98ee5b4db71d1475787a92be7c4309815b Mon Sep 17 00:00:00 2001 From: ordian Date: Fri, 1 Sep 2023 19:18:41 +0200 Subject: [PATCH 6/6] approval-voting: launch_approval better interface (#1355) --- polkadot/node/core/approval-voting/src/lib.rs | 66 ++++++++++--------- 1 file changed, 36 insertions(+), 30 deletions(-) diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index c59d49341ca8..0087f8e14350 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -45,7 +45,7 @@ use polkadot_node_subsystem_util::{ self, database::Database, metrics::{self, prometheus}, - runtime::{Config as RuntimeInfoConfig, RuntimeInfo}, + runtime::{Config as RuntimeInfoConfig, ExtendedSessionInfo, RuntimeInfo}, TimeoutExt, }; use polkadot_primitives::{ @@ -643,12 +643,12 @@ impl CurrentlyCheckingSet { } } -async fn get_session_info<'a, Sender>( +async fn get_extended_session_info<'a, Sender>( runtime_info: &'a mut RuntimeInfo, sender: &mut Sender, relay_parent: Hash, session_index: SessionIndex, -) -> Option<&'a SessionInfo> +) -> Option<&'a ExtendedSessionInfo> where Sender: SubsystemSender, { @@ -656,19 +656,33 @@ where .get_session_info_by_index(sender, relay_parent, session_index) .await { - Ok(extended_info) => Some(&extended_info.session_info), + Ok(extended_info) => Some(&extended_info), Err(_) => { gum::debug!( target: LOG_TARGET, session = session_index, ?relay_parent, - "Can't obtain SessionInfo" + "Can't obtain SessionInfo or ExecutorParams" ); None }, } } +async fn get_session_info<'a, Sender>( + runtime_info: &'a mut RuntimeInfo, + sender: &mut Sender, + relay_parent: Hash, + session_index: SessionIndex, +) -> Option<&'a SessionInfo> +where + Sender: SubsystemSender, +{ + get_extended_session_info(runtime_info, sender, relay_parent, session_index) + .await + .map(|extended_info| &extended_info.session_info) +} + struct State { keystore: Arc, slot_duration_millis: u64, @@ -746,6 +760,7 @@ enum Action { relay_block_hash: Hash, candidate_index: CandidateIndex, session: SessionIndex, + executor_params: ExecutorParams, candidate: CandidateReceipt, backing_group: GroupIndex, }, @@ -968,6 +983,7 @@ async fn handle_actions( relay_block_hash, candidate_index, session, + executor_params, candidate, backing_group, } => { @@ -1008,14 +1024,7 @@ async fn handle_actions( }, None => { let ctx = &mut *ctx; - let executor_params = match session_info_provider - .get_session_info_by_index(ctx.sender(), block_hash, session) - .await - { - Err(_) => None, - Ok(extended_session_info) => - Some(extended_session_info.executor_params.clone()), - }; + currently_checking_set .insert_relay_block_hash( candidate_hash, @@ -2337,17 +2346,18 @@ async fn process_wakeup( _ => return Ok(Vec::new()), }; - let session_info = match get_session_info( - session_info_provider, - ctx.sender(), - block_entry.parent_hash(), - block_entry.session(), - ) - .await - { - Some(i) => i, - None => return Ok(Vec::new()), - }; + let ExtendedSessionInfo { ref session_info, ref executor_params, .. } = + match get_extended_session_info( + session_info_provider, + ctx.sender(), + block_entry.parent_hash(), + block_entry.session(), + ) + .await + { + Some(i) => i, + None => return Ok(Vec::new()), + }; let block_tick = slot_number_to_tick(state.slot_duration_millis, block_entry.slot()); let no_show_duration = slot_number_to_tick( @@ -2434,6 +2444,7 @@ async fn process_wakeup( relay_block_hash: relay_block, candidate_index: i as _, session: block_entry.session(), + executor_params: executor_params.clone(), candidate: candidate_receipt, backing_group, }); @@ -2475,7 +2486,7 @@ async fn launch_approval( validator_index: ValidatorIndex, block_hash: Hash, backing_group: GroupIndex, - executor_params: Option, + executor_params: ExecutorParams, span: &jaeger::Span, ) -> SubsystemResult> { let (a_tx, a_rx) = oneshot::channel(); @@ -2546,11 +2557,6 @@ async fn launch_approval( // Force the move of the timer into the background task. let _timer = timer; - let executor_params = match executor_params { - Some(ep) => ep, - None => return ApprovalState::failed(validator_index, candidate_hash), - }; - let available_data = match a_rx.await { Err(_) => return ApprovalState::failed(validator_index, candidate_hash), Ok(Ok(a)) => a,