diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index c3775ba1c453..f538f3a4929f 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -24,7 +24,8 @@ #![warn(missing_docs)] use polkadot_node_core_pvf::{ - InvalidCandidate as WasmInvalidCandidate, PrepareError, Pvf, ValidationError, ValidationHost, + InvalidCandidate as WasmInvalidCandidate, PrepareError, PvfDescriptor, PvfPreimage, + ValidationError, ValidationHost, }; use polkadot_node_primitives::{ BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT, @@ -40,7 +41,7 @@ use polkadot_node_subsystem::{ }; use polkadot_parachain::primitives::{ValidationParams, ValidationResult as WasmValidationResult}; use polkadot_primitives::v2::{ - CandidateCommitments, CandidateDescriptor, CandidateReceipt, Hash, OccupiedCoreAssumption, + BlockNumber, CandidateCommitments, CandidateDescriptor, CandidateReceipt, Hash, PersistedValidationData, ValidationCode, ValidationCodeHash, }; @@ -168,15 +169,17 @@ async fn run( response_sender, ) => { let bg = { + let mut sender = ctx.sender().clone(); let metrics = metrics.clone(); let validation_host = validation_host.clone(); async move { let _timer = metrics.time_validate_from_exhaustive(); let res = validate_candidate_exhaustive( + &mut sender, validation_host, persisted_validation_data, - validation_code, + Some(validation_code), candidate_receipt, pov, timeout, @@ -220,6 +223,7 @@ async fn run( } } +#[derive(Debug)] struct RuntimeRequestFailed; async fn runtime_api_request( @@ -274,6 +278,30 @@ where .await } +async fn request_assumed_validation_data( + sender: &mut Sender, + descriptor: &CandidateDescriptor, +) -> Result< + Option<(PersistedValidationData, ValidationCodeHash)>, + RuntimeRequestFailed, +> +where + Sender: SubsystemSender, +{ + let (tx, rx) = oneshot::channel(); + runtime_api_request( + sender, + descriptor.relay_parent, + RuntimeApiRequest::AssumedValidationData( + descriptor.para_id, + descriptor.persisted_validation_data_hash, + tx, + ), + rx, + ) + .await +} + async fn precheck_pvf( sender: &mut Sender, mut validation_backend: impl ValidationBackend, @@ -283,6 +311,10 @@ async fn precheck_pvf( where Sender: SubsystemSender, { + // Even though the PVF host is capable of looking for code by its hash, we still + // request the validation code from the runtime right away. The reasoning is that + // the host is only expected to have the PVF cached in case there was a transient error + // that allows precheck retrying, otherwise the Runtime API request is necessary. 
let validation_code = match request_validation_code_by_hash(sender, relay_parent, validation_code_hash).await { Ok(Some(code)) => code, @@ -305,7 +337,7 @@ where &validation_code.0, VALIDATION_CODE_BOMB_LIMIT, ) { - Ok(code) => Pvf::from_code(code.into_owned()), + Ok(code) => PvfPreimage::from_code(code.into_owned()), Err(e) => { gum::debug!(target: LOG_TARGET, err=?e, "precheck: cannot decompress validation code"); return PreCheckOutcome::Invalid @@ -323,117 +355,9 @@ where } } -#[derive(Debug)] -enum AssumptionCheckOutcome { - Matches(PersistedValidationData, ValidationCode), - DoesNotMatch, - BadRequest, -} - -async fn check_assumption_validation_data( - sender: &mut Sender, - descriptor: &CandidateDescriptor, - assumption: OccupiedCoreAssumption, -) -> AssumptionCheckOutcome -where - Sender: SubsystemSender, -{ - let validation_data = { - let (tx, rx) = oneshot::channel(); - let d = runtime_api_request( - sender, - descriptor.relay_parent, - RuntimeApiRequest::PersistedValidationData(descriptor.para_id, assumption, tx), - rx, - ) - .await; - - match d { - Ok(None) | Err(RuntimeRequestFailed) => return AssumptionCheckOutcome::BadRequest, - Ok(Some(d)) => d, - } - }; - - let persisted_validation_data_hash = validation_data.hash(); - - if descriptor.persisted_validation_data_hash == persisted_validation_data_hash { - let (code_tx, code_rx) = oneshot::channel(); - let validation_code = runtime_api_request( - sender, - descriptor.relay_parent, - RuntimeApiRequest::ValidationCode(descriptor.para_id, assumption, code_tx), - code_rx, - ) - .await; - - match validation_code { - Ok(None) | Err(RuntimeRequestFailed) => AssumptionCheckOutcome::BadRequest, - Ok(Some(v)) => AssumptionCheckOutcome::Matches(validation_data, v), - } - } else { - AssumptionCheckOutcome::DoesNotMatch - } -} - -async fn find_assumed_validation_data( - sender: &mut Sender, - descriptor: &CandidateDescriptor, -) -> AssumptionCheckOutcome -where - Sender: SubsystemSender, -{ - // The candidate descriptor has a `persisted_validation_data_hash` which corresponds to - // one of up to two possible values that we can derive from the state of the - // relay-parent. We can fetch these values by getting the persisted validation data - // based on the different `OccupiedCoreAssumption`s. - - const ASSUMPTIONS: &[OccupiedCoreAssumption] = &[ - OccupiedCoreAssumption::Included, - OccupiedCoreAssumption::TimedOut, - // `TimedOut` and `Free` both don't perform any speculation and therefore should be the same - // for our purposes here. In other words, if `TimedOut` matched then the `Free` must be - // matched as well. - ]; - - // Consider running these checks in parallel to reduce validation latency. - for assumption in ASSUMPTIONS { - let outcome = check_assumption_validation_data(sender, descriptor, *assumption).await; - - match outcome { - AssumptionCheckOutcome::Matches(_, _) => return outcome, - AssumptionCheckOutcome::BadRequest => return outcome, - AssumptionCheckOutcome::DoesNotMatch => continue, - } - } - - AssumptionCheckOutcome::DoesNotMatch -} - -/// Returns validation data for a given candidate. 
-pub async fn find_validation_data<Sender>( - sender: &mut Sender, - descriptor: &CandidateDescriptor, -) -> Result<Option<(PersistedValidationData, ValidationCode)>, ValidationFailed> -where - Sender: SubsystemSender, -{ - match find_assumed_validation_data(sender, &descriptor).await { - AssumptionCheckOutcome::Matches(validation_data, validation_code) => - Ok(Some((validation_data, validation_code))), - AssumptionCheckOutcome::DoesNotMatch => { - // If neither the assumption of the occupied core having the para included or the assumption - // of the occupied core timing out are valid, then the persisted_validation_data_hash in the descriptor - // is not based on the relay parent and is thus invalid. - Ok(None) - }, - AssumptionCheckOutcome::BadRequest => - Err(ValidationFailed("Assumption Check: Bad request".into())), - } -} - async fn validate_from_chain_state<Sender>( sender: &mut Sender, - validation_host: ValidationHost, + validation_host: impl ValidationBackend, candidate_receipt: CandidateReceipt, pov: Arc<PoV>, timeout: Duration, @@ -442,17 +366,30 @@ async fn validate_from_chain_state( where Sender: SubsystemSender, { - let mut new_sender = sender.clone(); - let (validation_data, validation_code) = - match find_validation_data(&mut new_sender, &candidate_receipt.descriptor).await? { - Some((validation_data, validation_code)) => (validation_data, validation_code), - None => return Ok(ValidationResult::Invalid(InvalidCandidate::BadParent)), + let (validation_data, validation_code_hash) = + match request_assumed_validation_data(sender, &candidate_receipt.descriptor).await { + Ok(Some((data, code_hash))) => (data, code_hash), + Ok(None) => { + // If neither the assumption of the occupied core having the para included nor the assumption + // of the occupied core timing out is valid, then the persisted_validation_data_hash in the descriptor + // is not based on the relay parent and is thus invalid.
+ return Ok(ValidationResult::Invalid(InvalidCandidate::BadParent)) + }, + Err(_) => + return Err(ValidationFailed( + "Failed to request persisted validation data from the Runtime API".into(), + )), }; + if candidate_receipt.descriptor.validation_code_hash != validation_code_hash { + return Ok(ValidationResult::Invalid(InvalidCandidate::CodeHashMismatch)) + } + let validation_result = validate_candidate_exhaustive( + sender, validation_host, validation_data, - validation_code, + None, candidate_receipt.clone(), pov, timeout, @@ -489,19 +426,24 @@ where validation_result } -async fn validate_candidate_exhaustive( +async fn validate_candidate_exhaustive( + sender: &mut Sender, mut validation_backend: impl ValidationBackend, persisted_validation_data: PersistedValidationData, - validation_code: ValidationCode, + validation_code: Option, candidate_receipt: CandidateReceipt, pov: Arc, timeout: Duration, metrics: &Metrics, -) -> Result { +) -> Result +where + Sender: SubsystemSender, +{ let _timer = metrics.time_validate_candidate_exhaustive(); - let validation_code_hash = validation_code.hash(); - let para_id = candidate_receipt.descriptor.para_id.clone(); + let descriptor = &candidate_receipt.descriptor; + let validation_code_hash = validation_code.as_ref().map(ValidationCode::hash); + let para_id = descriptor.para_id.clone(); gum::debug!( target: LOG_TARGET, ?validation_code_hash, @@ -510,26 +452,34 @@ async fn validate_candidate_exhaustive( ); if let Err(e) = perform_basic_checks( - &candidate_receipt.descriptor, + &descriptor, persisted_validation_data.max_pov_size, &*pov, - &validation_code_hash, + validation_code_hash.as_ref(), ) { gum::info!(target: LOG_TARGET, ?para_id, "Invalid candidate (basic checks)"); return Ok(ValidationResult::Invalid(e)) } - let raw_validation_code = match sp_maybe_compressed_blob::decompress( - &validation_code.0, - VALIDATION_CODE_BOMB_LIMIT, - ) { - Ok(code) => code, - Err(e) => { - gum::info!(target: LOG_TARGET, ?para_id, err=?e, "Invalid candidate (validation code)"); + let validation_code = if let Some(validation_code) = validation_code { + let raw_code = match sp_maybe_compressed_blob::decompress( + &validation_code.0, + VALIDATION_CODE_BOMB_LIMIT, + ) { + Ok(code) => code, + Err(e) => { + gum::info!(target: LOG_TARGET, ?para_id, err=?e, "Invalid candidate (code decompression failed)"); - // If the validation code is invalid, the candidate certainly is. - return Ok(ValidationResult::Invalid(InvalidCandidate::CodeDecompressionFailure)) - }, + // If the validation code is invalid, the candidate certainly is. + return Ok(ValidationResult::Invalid(InvalidCandidate::CodeDecompressionFailure)) + }, + } + .to_vec(); + PvfDescriptor::from_code(raw_code) + } else { + // In case validation code is not provided, ask the backend to obtain + // it from the cache using the hash. + PvfDescriptor::Hash(ValidationCodeHash::from(descriptor.validation_code_hash)) }; let raw_block_data = @@ -550,9 +500,59 @@ async fn validate_candidate_exhaustive( relay_parent_storage_root: persisted_validation_data.relay_parent_storage_root, }; - let result = validation_backend - .validate_candidate(raw_validation_code.to_vec(), timeout, params) - .await; + let result = match validation_backend + .validate_candidate(validation_code, timeout, params.clone()) + .await + { + Err(ValidationError::ArtifactNotFound) => { + // In case preimage for the supplied code hash was not found by the + // validation host, request the code from Runtime API and try again. 
+ gum::debug!( + target: LOG_TARGET, + validation_code_hash = ?descriptor.validation_code_hash, + "Validation host failed to find artifact by provided hash", + ); + + let validation_code = match request_validation_code_by_hash( + sender, + descriptor.relay_parent, + descriptor.validation_code_hash, + ) + .await + { + Ok(Some(validation_code)) => validation_code, + Ok(None) => { + // This path can only happen when validating candidate from chain state. + // Earlier we obtained the PVF hash from the runtime and verified + // that it matches with the one in candidate descriptor. + // As a result, receiving `None` is unexpected and should hint on a major + // bug in Runtime API. + return Err(ValidationFailed( + "Runtime API returned `None` for the validation \ + code with a known hash" + .into(), + )) + }, + Err(_) => return Err(ValidationFailed("Runtime API request failed".into())), + }; + + let raw_code = match sp_maybe_compressed_blob::decompress( + &validation_code.0, + VALIDATION_CODE_BOMB_LIMIT, + ) { + Ok(code) => code, + Err(e) => { + gum::debug!(target: LOG_TARGET, err=?e, "Code decompression failed"); + + // If the validation code is invalid, the candidate certainly is. + return Ok(ValidationResult::Invalid(InvalidCandidate::CodeDecompressionFailure)) + }, + }; + let validation_code = PvfDescriptor::from_code(raw_code.to_vec()); + validation_backend.validate_candidate(validation_code, timeout, params).await + }, + result => result, + }; if let Err(ref error) = result { gum::info!(target: LOG_TARGET, ?para_id, ?error, "Failed to validate candidate",); @@ -569,9 +569,17 @@ async fn validate_candidate_exhaustive( Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError( "ambiguous worker death".to_string(), ))), + Err(ValidationError::ArtifactNotFound) => { + // The code was supplied on the second attempt, this + // error should be unreachable. 
+ let err = ValidationFailed( + "Validation host failed to find artifact even though it was supplied".to_string(), + ); + gum::error!(target: LOG_TARGET, error = ?err); + Err(err) + }, Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::PrepareError(e))) => Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(e))), - Ok(res) => if res.head_data.hash() != candidate_receipt.descriptor.para_head { gum::info!(target: LOG_TARGET, ?para_id, "Invalid candidate (para_head)"); @@ -605,26 +613,26 @@ async fn validate_candidate_exhaustive( trait ValidationBackend { async fn validate_candidate( &mut self, - raw_validation_code: Vec, + validation_code: PvfDescriptor, timeout: Duration, params: ValidationParams, ) -> Result; - async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError>; + async fn precheck_pvf(&mut self, pvf: PvfPreimage) -> Result<(), PrepareError>; } #[async_trait] impl ValidationBackend for ValidationHost { async fn validate_candidate( &mut self, - raw_validation_code: Vec, + validation_code: PvfDescriptor, timeout: Duration, params: ValidationParams, ) -> Result { let (tx, rx) = oneshot::channel(); if let Err(err) = self .execute_pvf( - Pvf::from_code(raw_validation_code), + validation_code, timeout, params.encode(), polkadot_node_core_pvf::Priority::Normal, @@ -645,7 +653,7 @@ impl ValidationBackend for ValidationHost { validation_result } - async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError> { + async fn precheck_pvf(&mut self, pvf: PvfPreimage) -> Result<(), PrepareError> { let (tx, rx) = oneshot::channel(); if let Err(_) = self.precheck_pvf(pvf, tx).await { return Err(PrepareError::DidNotMakeIt) @@ -663,7 +671,7 @@ fn perform_basic_checks( candidate: &CandidateDescriptor, max_pov_size: u32, pov: &PoV, - validation_code_hash: &ValidationCodeHash, + validation_code_hash: Option<&ValidationCodeHash>, ) -> Result<(), InvalidCandidate> { let pov_hash = pov.hash(); @@ -676,7 +684,7 @@ fn perform_basic_checks( return Err(InvalidCandidate::PoVHashMismatch) } - if *validation_code_hash != candidate.validation_code_hash { + if let Some(false) = validation_code_hash.map(|hash| *hash == candidate.validation_code_hash) { return Err(InvalidCandidate::CodeHashMismatch) } diff --git a/node/core/candidate-validation/src/tests.rs b/node/core/candidate-validation/src/tests.rs index ecac13d1440d..21e8398eef31 100644 --- a/node/core/candidate-validation/src/tests.rs +++ b/node/core/candidate-validation/src/tests.rs @@ -27,9 +27,8 @@ use sp_core::testing::TaskExecutor; use sp_keyring::Sr25519Keyring; #[test] -fn correctly_checks_included_assumption() { +fn correctly_finds_assumed_validation_data() { let validation_data: PersistedValidationData = Default::default(); - let validation_code: ValidationCode = vec![1, 2, 3].into(); let persisted_validation_data_hash = validation_data.hash(); let relay_parent = [2; 32].into(); @@ -46,51 +45,36 @@ fn correctly_checks_included_assumption() { Sr25519Keyring::Alice, ); + let code_hash = ValidationCodeHash::from(Hash::zero()); + let pool = TaskExecutor::new(); let (mut ctx, mut ctx_handle) = test_helpers::make_subsystem_context::(pool.clone()); - let (check_fut, check_result) = check_assumption_validation_data( - ctx.sender(), - &descriptor, - OccupiedCoreAssumption::Included, - ) - .remote_handle(); + let (check_fut, check_result) = + request_assumed_validation_data(ctx.sender(), &descriptor).remote_handle(); let test_fut = async move { assert_matches!( ctx_handle.recv().await, 
AllMessages::RuntimeApi(RuntimeApiMessage::Request( rp, - RuntimeApiRequest::PersistedValidationData( + RuntimeApiRequest::AssumedValidationData( p, - OccupiedCoreAssumption::Included, + hash, tx ), )) => { assert_eq!(rp, relay_parent); assert_eq!(p, para_id); + assert_eq!(hash, persisted_validation_data_hash); - let _ = tx.send(Ok(Some(validation_data.clone()))); + let _ = tx.send(Ok(Some((validation_data.clone(), code_hash)))); } ); - assert_matches!( - ctx_handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - rp, - RuntimeApiRequest::ValidationCode(p, OccupiedCoreAssumption::Included, tx) - )) => { - assert_eq!(rp, relay_parent); - assert_eq!(p, para_id); - - let _ = tx.send(Ok(Some(validation_code.clone()))); - } - ); - - assert_matches!(check_result.await, AssumptionCheckOutcome::Matches(o, v) => { - assert_eq!(o, validation_data); - assert_eq!(v, validation_code); + assert_matches!(check_result.await, Ok(Some((d, _))) => { + assert_eq!(d, validation_data); }); }; @@ -99,10 +83,8 @@ fn correctly_checks_included_assumption() { } #[test] -fn correctly_checks_timed_out_assumption() { +fn check_is_none_if_no_validation_data() { let validation_data: PersistedValidationData = Default::default(); - let validation_code: ValidationCode = vec![1, 2, 3].into(); - let persisted_validation_data_hash = validation_data.hash(); let relay_parent = [2; 32].into(); let para_id = ParaId::from(5_u32); @@ -122,48 +104,28 @@ fn correctly_checks_timed_out_assumption() { let (mut ctx, mut ctx_handle) = test_helpers::make_subsystem_context::(pool.clone()); - let (check_fut, check_result) = check_assumption_validation_data( - ctx.sender(), - &descriptor, - OccupiedCoreAssumption::TimedOut, - ) - .remote_handle(); + let (check_fut, check_result) = + request_assumed_validation_data(ctx.sender(), &descriptor).remote_handle(); let test_fut = async move { assert_matches!( ctx_handle.recv().await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( rp, - RuntimeApiRequest::PersistedValidationData( + RuntimeApiRequest::AssumedValidationData( p, - OccupiedCoreAssumption::TimedOut, + _, tx ), )) => { assert_eq!(rp, relay_parent); assert_eq!(p, para_id); - let _ = tx.send(Ok(Some(validation_data.clone()))); - } - ); - - assert_matches!( - ctx_handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - rp, - RuntimeApiRequest::ValidationCode(p, OccupiedCoreAssumption::TimedOut, tx) - )) => { - assert_eq!(rp, relay_parent); - assert_eq!(p, para_id); - - let _ = tx.send(Ok(Some(validation_code.clone()))); + let _ = tx.send(Ok(None)); } ); - assert_matches!(check_result.await, AssumptionCheckOutcome::Matches(o, v) => { - assert_eq!(o, validation_data); - assert_eq!(v, validation_code); - }); + assert_matches!(check_result.await, Ok(None)); }; let test_fut = future::join(test_fut, check_fut); @@ -171,16 +133,14 @@ fn correctly_checks_timed_out_assumption() { } #[test] -fn check_is_bad_request_if_no_validation_data() { - let validation_data: PersistedValidationData = Default::default(); - let persisted_validation_data_hash = validation_data.hash(); +fn check_runtime_api_error() { let relay_parent = [2; 32].into(); - let para_id = ParaId::from(5_u32); + let para_id = 5.into(); let descriptor = make_valid_candidate_descriptor( para_id, relay_parent, - persisted_validation_data_hash, + Hash::from([3; 32]), dummy_hash(), dummy_hash(), dummy_hash(), @@ -192,130 +152,109 @@ fn check_is_bad_request_if_no_validation_data() { let (mut ctx, mut ctx_handle) = 
test_helpers::make_subsystem_context::(pool.clone()); - let (check_fut, check_result) = check_assumption_validation_data( - ctx.sender(), - &descriptor, - OccupiedCoreAssumption::Included, - ) - .remote_handle(); + let (check_fut, check_result) = + request_assumed_validation_data(ctx.sender(), &descriptor).remote_handle(); let test_fut = async move { assert_matches!( ctx_handle.recv().await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( rp, - RuntimeApiRequest::PersistedValidationData( + RuntimeApiRequest::AssumedValidationData( p, - OccupiedCoreAssumption::Included, + _, tx ), )) => { assert_eq!(rp, relay_parent); assert_eq!(p, para_id); - let _ = tx.send(Ok(None)); + let _ = tx.send(Err(RuntimeApiError::NotSupported { runtime_api_name: "" })); } ); - assert_matches!(check_result.await, AssumptionCheckOutcome::BadRequest); + assert_matches!(check_result.await, Err(RuntimeRequestFailed)); }; let test_fut = future::join(test_fut, check_fut); executor::block_on(test_fut); } -#[test] -fn check_is_bad_request_if_no_validation_code() { - let validation_data: PersistedValidationData = Default::default(); - let persisted_validation_data_hash = validation_data.hash(); - let relay_parent = [2; 32].into(); - let para_id = ParaId::from(5_u32); - - let descriptor = make_valid_candidate_descriptor( - para_id, - relay_parent, - persisted_validation_data_hash, - dummy_hash(), - dummy_hash(), - dummy_hash(), - dummy_hash(), - Sr25519Keyring::Alice, - ); - - let pool = TaskExecutor::new(); - let (mut ctx, mut ctx_handle) = - test_helpers::make_subsystem_context::(pool.clone()); - - let (check_fut, check_result) = check_assumption_validation_data( - ctx.sender(), - &descriptor, - OccupiedCoreAssumption::TimedOut, - ) - .remote_handle(); - - let test_fut = async move { - assert_matches!( - ctx_handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - rp, - RuntimeApiRequest::PersistedValidationData( - p, - OccupiedCoreAssumption::TimedOut, - tx - ), - )) => { - assert_eq!(rp, relay_parent); - assert_eq!(p, para_id); - - let _ = tx.send(Ok(Some(validation_data.clone()))); - } - ); - - assert_matches!( - ctx_handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - rp, - RuntimeApiRequest::ValidationCode(p, OccupiedCoreAssumption::TimedOut, tx) - )) => { - assert_eq!(rp, relay_parent); - assert_eq!(p, para_id); +struct MockValidateCandidateBackend { + result: Result, +} - let _ = tx.send(Ok(None)); - } - ); +impl MockValidateCandidateBackend { + fn with_hardcoded_result(result: Result) -> Self { + Self { result } + } +} - assert_matches!(check_result.await, AssumptionCheckOutcome::BadRequest); - }; +#[async_trait] +impl ValidationBackend for MockValidateCandidateBackend { + async fn validate_candidate( + &mut self, + raw_validation_code: PvfDescriptor, + _timeout: Duration, + _params: ValidationParams, + ) -> Result { + match raw_validation_code { + PvfDescriptor::Preimage(_) => self.result.clone(), + PvfDescriptor::Hash(_) => Err(ValidationError::ArtifactNotFound), + } + } - let test_fut = future::join(test_fut, check_fut); - executor::block_on(test_fut); + async fn precheck_pvf(&mut self, _pvf: PvfPreimage) -> Result<(), PrepareError> { + unreachable!() + } } #[test] -fn check_does_not_match() { - let validation_data: PersistedValidationData = Default::default(); - let relay_parent = Hash::repeat_byte(0x02); - let para_id = ParaId::from(5_u32); +fn check_runtime_validation_code_request() { + let metrics = Default::default(); + let validation_data = 
PersistedValidationData { max_pov_size: 1024, ..Default::default() }; + + let pov = PoV { block_data: BlockData(vec![1; 32]) }; + let validation_code = ValidationCode(vec![2; 16]); + let head_data = HeadData(vec![1, 1, 1]); let descriptor = make_valid_candidate_descriptor( - para_id, - relay_parent, - Hash::from([3; 32]), - dummy_hash(), - dummy_hash(), + 1.into(), dummy_hash(), + validation_data.hash(), + pov.hash(), + validation_code.hash(), + head_data.hash(), dummy_hash(), Sr25519Keyring::Alice, ); + let validation_result = WasmValidationResult { + head_data, + new_validation_code: Some(vec![2, 2, 2].into()), + upward_messages: Vec::new(), + horizontal_messages: Vec::new(), + processed_downward_messages: 0, + hrmp_watermark: 0, + }; + + let commitments = make_commitments_from_validation_result(&validation_result); + + let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() }; + let pool = TaskExecutor::new(); let (mut ctx, mut ctx_handle) = test_helpers::make_subsystem_context::(pool.clone()); - let (check_fut, check_result) = check_assumption_validation_data( + let (check_fut, check_result) = validate_candidate_exhaustive( ctx.sender(), - &descriptor, - OccupiedCoreAssumption::Included, + MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), + validation_data.clone(), + None, + candidate_receipt, + Arc::new(pov), + Duration::from_secs(0), + &metrics, ) .remote_handle(); @@ -323,55 +262,31 @@ fn check_does_not_match() { assert_matches!( ctx_handle.recv().await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( - rp, - RuntimeApiRequest::PersistedValidationData( - p, - OccupiedCoreAssumption::Included, + _, + RuntimeApiRequest::ValidationCodeByHash( + validation_code_hash, tx ), )) => { - assert_eq!(rp, relay_parent); - assert_eq!(p, para_id); + assert_eq!(validation_code_hash, validation_code.hash()); - let _ = tx.send(Ok(Some(validation_data.clone()))); + let _ = tx.send(Ok(Some(validation_code))); } ); - assert_matches!(check_result.await, AssumptionCheckOutcome::DoesNotMatch); + assert_matches!(check_result.await, Ok(ValidationResult::Valid(_, _))); }; let test_fut = future::join(test_fut, check_fut); executor::block_on(test_fut); } -struct MockValidateCandidateBackend { - result: Result, -} - -impl MockValidateCandidateBackend { - fn with_hardcoded_result(result: Result) -> Self { - Self { result } - } -} - -#[async_trait] -impl ValidationBackend for MockValidateCandidateBackend { - async fn validate_candidate( - &mut self, - _raw_validation_code: Vec, - _timeout: Duration, - _params: ValidationParams, - ) -> Result { - self.result.clone() - } - - async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<(), PrepareError> { - unreachable!() - } -} - #[test] fn candidate_validation_ok_is_ok() { + let pool = TaskExecutor::new(); + let (mut ctx, mut _ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; let pov = PoV { block_data: BlockData(vec![1; 32]) }; @@ -393,7 +308,7 @@ fn candidate_validation_ok_is_ok() { &descriptor, validation_data.max_pov_size, &pov, - &validation_code.hash(), + Some(&validation_code.hash()), ); assert!(check.is_ok()); @@ -406,21 +321,15 @@ fn candidate_validation_ok_is_ok() { hrmp_watermark: 0, }; - let commitments = CandidateCommitments { - head_data: validation_result.head_data.clone(), - upward_messages: validation_result.upward_messages.clone(), - horizontal_messages: 
validation_result.horizontal_messages.clone(), - new_validation_code: validation_result.new_validation_code.clone(), - processed_downward_messages: validation_result.processed_downward_messages, - hrmp_watermark: validation_result.hrmp_watermark, - }; + let commitments = make_commitments_from_validation_result(&validation_result); let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() }; let v = executor::block_on(validate_candidate_exhaustive( + ctx.sender(), MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), validation_data.clone(), - validation_code, + Some(validation_code), candidate_receipt, Arc::new(pov), Duration::from_secs(0), @@ -440,6 +349,10 @@ fn candidate_validation_ok_is_ok() { #[test] fn candidate_validation_bad_return_is_invalid() { + let pool = TaskExecutor::new(); + let (mut ctx, mut _ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; let pov = PoV { block_data: BlockData(vec![1; 32]) }; @@ -460,18 +373,19 @@ fn candidate_validation_bad_return_is_invalid() { &descriptor, validation_data.max_pov_size, &pov, - &validation_code.hash(), + Some(&validation_code.hash()), ); assert!(check.is_ok()); let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; let v = executor::block_on(validate_candidate_exhaustive( + ctx.sender(), MockValidateCandidateBackend::with_hardcoded_result(Err( ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath), )), validation_data, - validation_code, + Some(validation_code), candidate_receipt, Arc::new(pov), Duration::from_secs(0), @@ -484,6 +398,10 @@ fn candidate_validation_bad_return_is_invalid() { #[test] fn candidate_validation_timeout_is_internal_error() { + let pool = TaskExecutor::new(); + let (mut ctx, mut _ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; let pov = PoV { block_data: BlockData(vec![1; 32]) }; @@ -504,18 +422,19 @@ fn candidate_validation_timeout_is_internal_error() { &descriptor, validation_data.max_pov_size, &pov, - &validation_code.hash(), + Some(&validation_code.hash()), ); assert!(check.is_ok()); let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; let v = executor::block_on(validate_candidate_exhaustive( + ctx.sender(), MockValidateCandidateBackend::with_hardcoded_result(Err( ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout), )), validation_data, - validation_code, + Some(validation_code), candidate_receipt, Arc::new(pov), Duration::from_secs(0), @@ -556,10 +475,15 @@ fn candidate_validation_commitment_hash_mismatch_is_invalid() { hrmp_watermark: 12345, }; + let pool = TaskExecutor::new(); + let (mut ctx, mut _ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let result = executor::block_on(validate_candidate_exhaustive( + ctx.sender(), MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), validation_data, - validation_code, + Some(validation_code), candidate_receipt, Arc::new(pov), Duration::from_secs(0), @@ -573,6 +497,10 @@ fn candidate_validation_commitment_hash_mismatch_is_invalid() { #[test] fn candidate_validation_code_mismatch_is_invalid() { + let pool = TaskExecutor::new(); + let (mut ctx, mut _ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let 
validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; let pov = PoV { block_data: BlockData(vec![1; 32]) }; @@ -593,18 +521,19 @@ fn candidate_validation_code_mismatch_is_invalid() { &descriptor, validation_data.max_pov_size, &pov, - &validation_code.hash(), + Some(&validation_code.hash()), ); assert_matches!(check, Err(InvalidCandidate::CodeHashMismatch)); let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; let v = executor::block_on(validate_candidate_exhaustive( + ctx.sender(), MockValidateCandidateBackend::with_hardcoded_result(Err( ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout), )), validation_data, - validation_code, + Some(validation_code), candidate_receipt, Arc::new(pov), Duration::from_secs(0), @@ -617,6 +546,10 @@ fn candidate_validation_code_mismatch_is_invalid() { #[test] fn compressed_code_works() { + let pool = TaskExecutor::new(); + let (mut ctx, mut _ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; let pov = PoV { block_data: BlockData(vec![1; 32]) }; let head_data = HeadData(vec![1, 1, 1]); @@ -646,21 +579,15 @@ fn compressed_code_works() { hrmp_watermark: 0, }; - let commitments = CandidateCommitments { - head_data: validation_result.head_data.clone(), - upward_messages: validation_result.upward_messages.clone(), - horizontal_messages: validation_result.horizontal_messages.clone(), - new_validation_code: validation_result.new_validation_code.clone(), - processed_downward_messages: validation_result.processed_downward_messages, - hrmp_watermark: validation_result.hrmp_watermark, - }; + let commitments = make_commitments_from_validation_result(&validation_result); let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() }; let v = executor::block_on(validate_candidate_exhaustive( + ctx.sender(), MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), validation_data, - validation_code, + Some(validation_code), candidate_receipt, Arc::new(pov), Duration::from_secs(0), @@ -672,6 +599,10 @@ fn compressed_code_works() { #[test] fn code_decompression_failure_is_invalid() { + let pool = TaskExecutor::new(); + let (mut ctx, mut _ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; let pov = PoV { block_data: BlockData(vec![1; 32]) }; let head_data = HeadData(vec![1, 1, 1]); @@ -705,9 +636,10 @@ fn code_decompression_failure_is_invalid() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; let v = executor::block_on(validate_candidate_exhaustive( + ctx.sender(), MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), validation_data, - validation_code, + Some(validation_code), candidate_receipt, Arc::new(pov), Duration::from_secs(0), @@ -719,6 +651,10 @@ fn code_decompression_failure_is_invalid() { #[test] fn pov_decompression_failure_is_invalid() { + let pool = TaskExecutor::new(); + let (mut ctx, mut _ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let validation_data = PersistedValidationData { max_pov_size: POV_BOMB_LIMIT as u32, ..Default::default() }; let head_data = HeadData(vec![1, 1, 1]); @@ -753,18 +689,163 @@ fn pov_decompression_failure_is_invalid() { let candidate_receipt = CandidateReceipt { descriptor, 
commitments_hash: Hash::zero() }; let v = executor::block_on(validate_candidate_exhaustive( + ctx.sender(), MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), validation_data, - validation_code, + Some(validation_code), candidate_receipt, Arc::new(pov), Duration::from_secs(0), &Default::default(), )); - assert_matches!(v, Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure))); } +#[test] +fn requests_code_from_api_if_not_found() { + let pool = TaskExecutor::new(); + let (mut ctx, mut ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + + let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; + + let pov = PoV { block_data: BlockData(vec![1; 32]) }; + let head_data = HeadData(vec![1, 1, 1]); + let validation_code = ValidationCode(vec![2; 16]); + let validation_code_hash = validation_code.hash(); + + let descriptor = make_valid_candidate_descriptor( + 1.into(), + dummy_hash(), + validation_data.hash(), + pov.hash(), + validation_code.hash(), + head_data.hash(), + dummy_hash(), + Sr25519Keyring::Alice, + ); + + let validation_result = WasmValidationResult { + head_data, + new_validation_code: Some(vec![2, 2, 2].into()), + upward_messages: Vec::new(), + horizontal_messages: Vec::new(), + processed_downward_messages: 0, + hrmp_watermark: 0, + }; + + let commitments = make_commitments_from_validation_result(&validation_result); + + let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() }; + + let metrics = Default::default(); + + let (check_fut, check_result) = validate_candidate_exhaustive( + ctx.sender(), + MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), + validation_data, + None, + candidate_receipt, + Arc::new(pov), + Duration::from_secs(0), + &metrics, + ) + .remote_handle(); + + let test_fut = async move { + assert_matches!( + ctx_handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _rp, + RuntimeApiRequest::ValidationCodeByHash( + code_hash, + tx + ), + )) => { + assert_eq!(code_hash, validation_code_hash); + + let _ = tx.send(Ok(Some(validation_code))); + } + ); + + assert_matches!(check_result.await, Ok(ValidationResult::Valid(_, _))); + }; + + let test_fut = future::join(test_fut, check_fut); + executor::block_on(test_fut); +} + +#[test] +fn code_hash_mismatch_error() { + let pool = TaskExecutor::new(); + let (mut ctx, mut ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; + + let pov = PoV { block_data: BlockData(vec![1; 32]) }; + let head_data = HeadData(vec![1, 1, 1]); + let validation_code = ValidationCode(vec![2; 16]); + + let descriptor = make_valid_candidate_descriptor( + 1.into(), + dummy_hash(), + validation_data.hash(), + pov.hash(), + validation_code.hash(), + head_data.hash(), + dummy_hash(), + Sr25519Keyring::Alice, + ); + + let validation_result = WasmValidationResult { + head_data, + new_validation_code: Some(vec![2, 2, 2].into()), + upward_messages: Vec::new(), + horizontal_messages: Vec::new(), + processed_downward_messages: 0, + hrmp_watermark: 0, + }; + + let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; + + let metrics = Default::default(); + + let (check_fut, check_result) = validate_from_chain_state( + ctx.sender(), + MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), + candidate_receipt, + Arc::new(pov), 
+ Duration::from_secs(0), + &metrics, + ) + .remote_handle(); + + let test_fut = async move { + assert_matches!( + ctx_handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _rp, + RuntimeApiRequest::AssumedValidationData( + _p, + _hash, + tx + ), + )) => { + let code_hash = ValidationCodeHash::from(Hash::zero()); + let _ = tx.send(Ok(Some((validation_data, code_hash)))); + } + ); + + assert_matches!( + check_result.await, + Ok(ValidationResult::Invalid(InvalidCandidate::CodeHashMismatch)) + ); + }; + + let test_fut = future::join(test_fut, check_fut); + executor::block_on(test_fut); +} + struct MockPreCheckBackend { result: Result<(), PrepareError>, } @@ -779,14 +860,14 @@ impl MockPreCheckBackend { impl ValidationBackend for MockPreCheckBackend { async fn validate_candidate( &mut self, - _raw_validation_code: Vec, + _raw_validation_code: PvfDescriptor, _timeout: Duration, _params: ValidationParams, ) -> Result { unreachable!() } - async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<(), PrepareError> { + async fn precheck_pvf(&mut self, _pvf: PvfPreimage) -> Result<(), PrepareError> { self.result.clone() } } @@ -813,6 +894,7 @@ fn precheck_works() { assert_matches!( ctx_handle.recv().await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( + rp, RuntimeApiRequest::ValidationCodeByHash( vch, @@ -846,7 +928,6 @@ fn precheck_invalid_pvf_blob_compression() { let pool = TaskExecutor::new(); let (mut ctx, mut ctx_handle) = test_helpers::make_subsystem_context::(pool.clone()); - let (check_fut, check_result) = precheck_pvf( ctx.sender(), MockPreCheckBackend::with_hardcoded_result(Ok(())), @@ -927,3 +1008,16 @@ fn precheck_properly_classifies_outcomes() { inner(Err(PrepareError::TimedOut), PreCheckOutcome::Failed); inner(Err(PrepareError::DidNotMakeIt), PreCheckOutcome::Failed); } + +fn make_commitments_from_validation_result( + validation_result: &WasmValidationResult, +) -> CandidateCommitments { + CandidateCommitments { + head_data: validation_result.head_data.clone(), + upward_messages: validation_result.upward_messages.clone(), + horizontal_messages: validation_result.horizontal_messages.clone(), + new_validation_code: validation_result.new_validation_code.clone(), + processed_downward_messages: validation_result.processed_downward_messages, + hrmp_watermark: validation_result.hrmp_watermark, + } +} diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs index 4aca2da4b3ba..3b14caf16e77 100644 --- a/node/core/pvf/src/error.rs +++ b/node/core/pvf/src/error.rs @@ -43,6 +43,8 @@ pub enum ValidationError { InvalidCandidate(InvalidCandidate), /// This error is raised due to inability to serve the request. InternalError(String), + /// Provided validation code hash is not present in the artifacts cache. 
+ ArtifactNotFound, } /// A description of an error raised during executing a PVF and can be attributed to the combination diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 464f8d322648..b1f548192b66 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -24,7 +24,9 @@ use crate::{ artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts}, execute, metrics::Metrics, - prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET, + prepare, + pvf::{PvfDescriptor, PvfPreimage}, + PrepareResult, Priority, ValidationError, LOG_TARGET, }; use always_assert::never; use async_std::path::{Path, PathBuf}; @@ -60,7 +62,7 @@ impl ValidationHost { /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down. pub async fn precheck_pvf( &mut self, - pvf: Pvf, + pvf: PvfPreimage, result_tx: PrepareResultSender, ) -> Result<(), String> { self.to_host_tx @@ -78,7 +80,7 @@ impl ValidationHost { /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down. pub async fn execute_pvf( &mut self, - pvf: Pvf, + pvf: PvfDescriptor, execution_timeout: Duration, params: Vec, priority: Priority, @@ -96,7 +98,7 @@ impl ValidationHost { /// situations this function should return immediately. /// /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down. - pub async fn heads_up(&mut self, active_pvfs: Vec) -> Result<(), String> { + pub async fn heads_up(&mut self, active_pvfs: Vec) -> Result<(), String> { self.to_host_tx .send(ToHost::HeadsUp { active_pvfs }) .await @@ -106,18 +108,18 @@ impl ValidationHost { enum ToHost { PrecheckPvf { - pvf: Pvf, + pvf: PvfPreimage, result_tx: PrepareResultSender, }, ExecutePvf { - pvf: Pvf, + pvf: PvfDescriptor, execution_timeout: Duration, params: Vec, priority: Priority, result_tx: ResultSender, }, HeadsUp { - active_pvfs: Vec, + active_pvfs: Vec, }, } @@ -421,7 +423,7 @@ async fn handle_to_host( async fn handle_precheck_pvf( artifacts: &mut Artifacts, prepare_queue: &mut mpsc::Sender, - pvf: Pvf, + pvf: PvfPreimage, result_sender: PrepareResultSender, ) -> Result<(), Fatal> { let artifact_id = pvf.as_artifact_id(); @@ -452,7 +454,7 @@ async fn handle_execute_pvf( prepare_queue: &mut mpsc::Sender, execute_queue: &mut mpsc::Sender, awaiting_prepare: &mut AwaitingPrepare, - pvf: Pvf, + pvf: PvfDescriptor, execution_timeout: Duration, params: Vec, priority: Priority, @@ -484,12 +486,16 @@ async fn handle_execute_pvf( }, } } else { - // Artifact is unknown: register it and enqueue a job with the corresponding priority and - // - artifacts.insert_preparing(artifact_id.clone(), Vec::new()); - send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?; + if let PvfDescriptor::Preimage(code) = pvf { + // Artifact is unknown: register it and enqueue a job with the corresponding priority and + artifacts.insert_preparing(artifact_id.clone(), Vec::new()); + send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf: code }).await?; - awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); + awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); + } else { + // Expect another request with PVF code provided. 
+ let _ = result_tx.send(Err(ValidationError::ArtifactNotFound)); + } } return Ok(()) @@ -498,7 +504,7 @@ async fn handle_execute_pvf( async fn handle_heads_up( artifacts: &mut Artifacts, prepare_queue: &mut mpsc::Sender, - active_pvfs: Vec, + active_pvfs: Vec, ) -> Result<(), Fatal> { let now = SystemTime::now(); @@ -701,12 +707,12 @@ mod tests { } /// Creates a new PVF which artifact id can be uniquely identified by the given number. - fn artifact_id(descriminator: u32) -> ArtifactId { - Pvf::from_discriminator(descriminator).as_artifact_id() + fn artifact_id(discriminator: u32) -> ArtifactId { + PvfPreimage::from_discriminator(discriminator).as_artifact_id() } - fn artifact_path(descriminator: u32) -> PathBuf { - artifact_id(descriminator).path(&PathBuf::from(std::env::temp_dir())).to_owned() + fn artifact_path(discriminator: u32) -> PathBuf { + artifact_id(discriminator).path(&PathBuf::from(std::env::temp_dir())).to_owned() } struct Builder { @@ -857,6 +863,11 @@ mod tests { } } + /// Creates a new PVF which artifact id can be uniquely identified by the given number. + fn pvf(discriminator: u32) -> PvfPreimage { + PvfPreimage::from_discriminator(discriminator) + } + #[async_std::test] async fn shutdown_on_handle_drop() { let test = Builder::default().build(); @@ -881,7 +892,7 @@ mod tests { let mut test = builder.build(); let mut host = test.host_handle(); - host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap(); + host.heads_up(vec![pvf(1)]).await.unwrap(); let to_sweeper_rx = &mut test.to_sweeper_rx; run_until( @@ -895,7 +906,7 @@ mod tests { // Extend TTL for the first artifact and make sure we don't receive another file removal // request. - host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap(); + host.heads_up(vec![pvf(1)]).await.unwrap(); test.poll_ensure_to_sweeper_is_empty().await; } @@ -906,7 +917,7 @@ mod tests { let (result_tx, result_rx_pvf_1_1) = oneshot::channel(); host.execute_pvf( - Pvf::from_discriminator(1), + PvfDescriptor::Preimage(PvfPreimage::from_discriminator(1)), TEST_EXECUTION_TIMEOUT, b"pvf1".to_vec(), Priority::Normal, @@ -917,7 +928,7 @@ mod tests { let (result_tx, result_rx_pvf_1_2) = oneshot::channel(); host.execute_pvf( - Pvf::from_discriminator(1), + PvfDescriptor::Preimage(PvfPreimage::from_discriminator(1)), TEST_EXECUTION_TIMEOUT, b"pvf1".to_vec(), Priority::Critical, @@ -928,7 +939,7 @@ mod tests { let (result_tx, result_rx_pvf_2) = oneshot::channel(); host.execute_pvf( - Pvf::from_discriminator(2), + PvfDescriptor::Preimage(PvfPreimage::from_discriminator(2)), TEST_EXECUTION_TIMEOUT, b"pvf2".to_vec(), Priority::Normal, @@ -1000,7 +1011,7 @@ mod tests { // First, test a simple precheck request. let (result_tx, result_rx) = oneshot::channel(); - host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap(); + host.precheck_pvf(PvfPreimage::from_discriminator(1), result_tx).await.unwrap(); // The queue received the prepare request. assert_matches!( @@ -1021,7 +1032,7 @@ mod tests { let mut precheck_receivers = Vec::new(); for _ in 0..3 { let (result_tx, result_rx) = oneshot::channel(); - host.precheck_pvf(Pvf::from_discriminator(2), result_tx).await.unwrap(); + host.precheck_pvf(PvfPreimage::from_discriminator(2), result_tx).await.unwrap(); precheck_receivers.push(result_rx); } // Received prepare request. @@ -1056,7 +1067,7 @@ mod tests { // Send PVF for the execution and request the prechecking for it. 
let (result_tx, result_rx_execute) = oneshot::channel(); host.execute_pvf( - Pvf::from_discriminator(1), + PvfDescriptor::Preimage(PvfPreimage::from_discriminator(1)), TEST_EXECUTION_TIMEOUT, b"pvf2".to_vec(), Priority::Critical, @@ -1071,7 +1082,7 @@ mod tests { ); let (result_tx, result_rx) = oneshot::channel(); - host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap(); + host.precheck_pvf(PvfPreimage::from_discriminator(1), result_tx).await.unwrap(); // Suppose the preparation failed, the execution queue is empty and both // "clients" receive their results. @@ -1093,13 +1104,13 @@ mod tests { let mut precheck_receivers = Vec::new(); for _ in 0..3 { let (result_tx, result_rx) = oneshot::channel(); - host.precheck_pvf(Pvf::from_discriminator(2), result_tx).await.unwrap(); + host.precheck_pvf(PvfPreimage::from_discriminator(2), result_tx).await.unwrap(); precheck_receivers.push(result_rx); } let (result_tx, _result_rx_execute) = oneshot::channel(); host.execute_pvf( - Pvf::from_discriminator(2), + PvfDescriptor::Preimage(PvfPreimage::from_discriminator(2)), TEST_EXECUTION_TIMEOUT, b"pvf2".to_vec(), Priority::Critical, @@ -1134,7 +1145,7 @@ mod tests { let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( - Pvf::from_discriminator(1), + PvfDescriptor::Preimage(PvfPreimage::from_discriminator(1)), TEST_EXECUTION_TIMEOUT, b"pvf1".to_vec(), Priority::Normal, @@ -1157,4 +1168,64 @@ mod tests { test.poll_ensure_to_execute_queue_is_empty().await; } + + #[async_std::test] + async fn artifact_cache() { + let mut test = Builder::default().build(); + let mut host = test.host_handle(); + + let pvf_code = PvfDescriptor::Preimage(PvfPreimage::from_discriminator(1)); + let pvf_hash = PvfDescriptor::Hash(pvf_code.hash()); + + // First, ensure that we receive an `ArtifactNotFound` error when sending + // an unknown code hash. + let (result_tx, result_rx) = oneshot::channel(); + host.execute_pvf( + pvf_hash.clone(), + TEST_EXECUTION_TIMEOUT, + Vec::new(), + Priority::Normal, + result_tx, + ) + .await + .unwrap(); + + test.poll_ensure_to_execute_queue_is_empty().await; + assert_matches!( + result_rx.now_or_never().unwrap().unwrap(), + Err(ValidationError::ArtifactNotFound) + ); + + // Supply the code and retry the request. + let (result_tx, _result_rx) = oneshot::channel(); + host.execute_pvf( + pvf_code, // Code. + TEST_EXECUTION_TIMEOUT, + Vec::new(), + Priority::Normal, + result_tx, + ) + .await + .unwrap(); + + assert_matches!( + test.poll_and_recv_to_prepare_queue().await, + prepare::ToQueue::Enqueue { .. } + ); + + let (result_tx, result_rx) = oneshot::channel(); + host.execute_pvf( + pvf_hash, // Hash. + TEST_EXECUTION_TIMEOUT, + Vec::new(), + Priority::Normal, + result_tx, + ) + .await + .unwrap(); + + test.poll_ensure_to_execute_queue_is_empty().await; + // The execution is queued, no error expected. 
+ assert_matches!(result_rx.now_or_never(), None); + } } diff --git a/node/core/pvf/src/lib.rs b/node/core/pvf/src/lib.rs index ef5f31889237..28b69acb9364 100644 --- a/node/core/pvf/src/lib.rs +++ b/node/core/pvf/src/lib.rs @@ -94,7 +94,7 @@ pub use sp_tracing; pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError}; pub use priority::Priority; -pub use pvf::Pvf; +pub use pvf::{PvfDescriptor, PvfPreimage}; pub use host::{start, Config, ValidationHost}; pub use metrics::Metrics; diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index 5aa1402916d6..9a2ca433aeac 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -17,7 +17,9 @@ //! A queue that handles requests for PVF preparation. use super::pool::{self, Worker}; -use crate::{artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, Pvf, LOG_TARGET}; +use crate::{ + artifacts::ArtifactId, metrics::Metrics, pvf::PvfPreimage, PrepareResult, Priority, LOG_TARGET, +}; use always_assert::{always, never}; use async_std::path::PathBuf; use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt}; @@ -30,7 +32,7 @@ pub enum ToQueue { /// /// Note that it is incorrect to enqueue the same PVF again without first receiving the /// [`FromQueue`] response. - Enqueue { priority: Priority, pvf: Pvf }, + Enqueue { priority: Priority, pvf: PvfPreimage }, } /// A response from queue. @@ -75,7 +77,7 @@ slotmap::new_key_type! { pub struct Job; } struct JobData { /// The priority of this job. Can be bumped. priority: Priority, - pvf: Pvf, + pvf: PvfPreimage, worker: Option, } @@ -210,7 +212,11 @@ async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fat Ok(()) } -async fn handle_enqueue(queue: &mut Queue, priority: Priority, pvf: Pvf) -> Result<(), Fatal> { +async fn handle_enqueue( + queue: &mut Queue, + priority: Priority, + pvf: PvfPreimage, +) -> Result<(), Fatal> { gum::debug!( target: LOG_TARGET, validation_code_hash = ?pvf.code_hash, @@ -485,8 +491,8 @@ mod tests { use std::task::Poll; /// Creates a new PVF which artifact id can be uniquely identified by the given number. - fn pvf(descriminator: u32) -> Pvf { - Pvf::from_discriminator(descriminator) + fn pvf(discriminator: u32) -> PvfPreimage { + PvfPreimage::from_discriminator(discriminator) } async fn run_until( diff --git a/node/core/pvf/src/pvf.rs b/node/core/pvf/src/pvf.rs index 901cc1c70d6e..ff7ae09a1d12 100644 --- a/node/core/pvf/src/pvf.rs +++ b/node/core/pvf/src/pvf.rs @@ -23,18 +23,12 @@ use std::{fmt, sync::Arc}; /// /// Should be cheap to clone. #[derive(Clone)] -pub struct Pvf { +pub struct PvfPreimage { pub(crate) code: Arc>, pub(crate) code_hash: ValidationCodeHash, } -impl fmt::Debug for Pvf { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Pvf {{ code, code_hash: {:?} }}", self.code_hash) - } -} - -impl Pvf { +impl PvfPreimage { /// Returns an instance of the PVF out of the given PVF code. pub fn from_code(code: Vec) -> Self { let code = Arc::new(code); @@ -45,12 +39,52 @@ impl Pvf { /// Creates a new PVF which artifact id can be uniquely identified by the given number. #[cfg(test)] pub(crate) fn from_discriminator(num: u32) -> Self { - let descriminator_buf = num.to_le_bytes().to_vec(); - Pvf::from_code(descriminator_buf) + let discriminator_buf = num.to_le_bytes().to_vec(); + Self::from_code(discriminator_buf) } +} - /// Returns the artifact ID that corresponds to this PVF. 
+impl fmt::Debug for PvfPreimage { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Pvf {{ code, code_hash: {:?} }}", self.code_hash) + } +} + +impl PvfPreimage { pub(crate) fn as_artifact_id(&self) -> ArtifactId { ArtifactId::new(self.code_hash) } } + +/// An enum that either contains full preimage of the validation function +/// (see [`PvfPreimage`]) or the hash only. +#[derive(Clone, Debug)] +pub enum PvfDescriptor { + /// Hash-preimage of the validation function, carries the full bytecode. + Preimage(PvfPreimage), + /// Hash of the validation function. + Hash(ValidationCodeHash), +} + +impl PvfDescriptor { + /// Returns an instance of the PVF out of the given PVF code. + pub fn from_code(code: Vec) -> Self { + Self::Preimage(PvfPreimage::from_code(code)) + } + + /// Returns the validation code hash of the given PVF. + pub fn hash(&self) -> ValidationCodeHash { + match self { + Self::Preimage(code) => code.code_hash, + Self::Hash(hash) => *hash, + } + } + + /// Returns the artifact ID that corresponds to this PVF. + pub(crate) fn as_artifact_id(&self) -> ArtifactId { + match self { + Self::Preimage(ref inner) => inner.as_artifact_id(), + Self::Hash(code_hash) => ArtifactId::new(*code_hash), + } + } +} diff --git a/node/core/pvf/tests/it/main.rs b/node/core/pvf/tests/it/main.rs index bf0983d50874..bda85e5412d8 100644 --- a/node/core/pvf/tests/it/main.rs +++ b/node/core/pvf/tests/it/main.rs @@ -17,7 +17,7 @@ use async_std::sync::Mutex; use parity_scale_codec::Encode as _; use polkadot_node_core_pvf::{ - start, Config, InvalidCandidate, Metrics, Pvf, ValidationError, ValidationHost, + start, Config, InvalidCandidate, Metrics, PvfDescriptor, ValidationError, ValidationHost, }; use polkadot_parachain::primitives::{BlockData, ValidationParams, ValidationResult}; use std::time::Duration; @@ -65,7 +65,7 @@ impl TestHost { .lock() .await .execute_pvf( - Pvf::from_code(code.into()), + PvfDescriptor::from_code(code.into()), TEST_EXECUTION_TIMEOUT, params.encode(), polkadot_node_core_pvf::Priority::Normal, diff --git a/roadmap/implementers-guide/src/node/subsystems-and-jobs.md b/roadmap/implementers-guide/src/node/subsystems-and-jobs.md index 6e3b4cd2d166..20310bcfc620 100644 --- a/roadmap/implementers-guide/src/node/subsystems-and-jobs.md +++ b/roadmap/implementers-guide/src/node/subsystems-and-jobs.md @@ -90,8 +90,8 @@ digraph { cand_sel -> cand_back [arrowhead = "onormal", label = "Second"] cand_sel -> coll_prot [arrowhead = "onormal", label = "ReportCollator"] - cand_val -> runt_api [arrowhead = "diamond", label = "Request::PersistedValidationData"] - cand_val -> runt_api [arrowhead = "diamond", label = "Request::ValidationCode"] + cand_val -> runt_api [arrowhead = "diamond", label = "Request::AssumedValidationData"] + cand_val -> runt_api [arrowhead = "diamond", label = "Request::ValidationCodeByHash"] cand_val -> runt_api [arrowhead = "diamond", label = "Request::CheckValidationOutputs"] coll_gen -> coll_prot [arrowhead = "onormal", label = "DistributeCollation"] diff --git a/roadmap/implementers-guide/src/node/utility/candidate-validation.md b/roadmap/implementers-guide/src/node/utility/candidate-validation.md index 5393368c5c6b..404b18c09fb3 100644 --- a/roadmap/implementers-guide/src/node/utility/candidate-validation.md +++ b/roadmap/implementers-guide/src/node/utility/candidate-validation.md @@ -32,11 +32,11 @@ For a [`CandidateValidationMessage`][CVM]`::ValidateFromExhaustive`, these param For a [`CandidateValidationMessage`][CVM]`::ValidateFromChainState`, some 
more work needs to be done. Due to the uncertainty of Availability Cores (implemented in the [`Scheduler`](../../runtime/scheduler.md) module of the runtime), a candidate at a particular relay-parent and for a particular para may have two different valid validation-data to be executed under depending on what is assumed to happen if the para is occupying a core at the onset of the new block. This is encoded as an `OccupiedCoreAssumption` in the runtime API. -The way that we can determine which assumption the candidate is meant to be executed under is simply to do an exhaustive check of both possibilities based on the state of the relay-parent. First we fetch the validation data under the assumption that the block occupying becomes available. If the `validation_data_hash` of the `CandidateDescriptor` matches this validation data, we use that. Otherwise, if the `validation_data_hash` matches the validation data fetched under the `TimedOut` assumption, we use that. Otherwise, we return a `ValidationResult::Invalid` response and conclude. +For this reason this subsystem uses a dedicated Runtime API endpoint, [`AssumedValidationData`](../../types/overseer-protocol.md#runtime-api-message). It accepts two parameters: the parachain ID and the expected hash of the persisted validation data, i.e. the one we obtain from the `CandidateDescriptor`. The runtime first tries to construct the validation data for the given parachain under the assumption that the block occupying the core didn't advance the para, i.e. it didn't reach availability. If the resulting data hash doesn't match the expected one, the runtime tries again while force-enacting the core, temporarily updating the state as if the block had been deemed available. In case of a match the API returns the constructed validation data along with the corresponding validation code hash, or `None` otherwise. The latter means that the validation data hash in the descriptor is not based on the relay parent and the given candidate is thus invalid. -Then, we can fetch the validation code from the runtime based on which type of candidate this is. This gives us all the parameters. The descriptor and PoV come from the request itself, and the other parameters have been derived from the state. +The validation backend, which is responsible for actually compiling and executing the Wasm code, keeps an artifact cache. This allows the subsystem to attempt validation using only the code hash obtained earlier. If the code with the given hash is missing from the cache, we have to issue another Runtime API request, `ValidationCodeByHash`, to obtain the validation function. -> TODO: This would be a great place for caching to avoid making lots of runtime requests. That would need a job, though. +This gives us all the parameters. The descriptor and PoV come from the request itself, and the other parameters have been derived from the state. ### Execution of the Parachain Wasm diff --git a/roadmap/implementers-guide/src/types/overseer-protocol.md b/roadmap/implementers-guide/src/types/overseer-protocol.md index b2559c4cfda7..655ce2cfaa98 100644 --- a/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -700,6 +700,13 @@ enum RuntimeApiRequest { OccupiedCoreAssumption, ResponseChannel<Option<PersistedValidationData>>, ), + /// Get the persisted validation data for a particular para along with the current validation code + /// hash, matching the data hash against an expected one.
+ AssumedValidationData( + ParaId, + Hash, + RuntimeApiSender<Option<(PersistedValidationData, ValidationCodeHash)>>, + ), /// Sends back `true` if the commitments pass all acceptance criteria checks. CheckValidationOutputs( ParaId,