diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index dce589e4c708..59c4aa562e74 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -691,6 +691,9 @@ trait ValidationBackend { /// Tries executing a PVF. Will retry once if an error is encountered that may have been /// transient. + /// + /// NOTE: Should retry only on errors that are a result of execution itself, and not of + /// preparation. async fn validate_candidate_with_retry( &mut self, raw_validation_code: Vec, @@ -698,31 +701,44 @@ trait ValidationBackend { params: ValidationParams, executor_params: ExecutorParams, ) -> Result { - // Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap. let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepTimeoutKind::Lenient); + // Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap. let pvf = PvfPrepData::from_code(raw_validation_code, executor_params, prep_timeout); let mut validation_result = self.validate_candidate(pvf.clone(), exec_timeout, params.encode()).await; - // If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the - // assumption that the conditions that caused this error may have been transient. Note that - // this error is only a result of execution itself and not of preparation. - if let Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) = - validation_result - { - // Wait a brief delay before retrying. - futures_timer::Delay::new(PVF_EXECUTION_RETRY_DELAY).await; + // Allow limited retries for each kind of error. + let mut num_internal_retries_left = 1; + let mut num_awd_retries_left = 1; + loop { + match validation_result { + Err(ValidationError::InvalidCandidate( + WasmInvalidCandidate::AmbiguousWorkerDeath, + )) if num_awd_retries_left > 0 => num_awd_retries_left -= 1, + Err(ValidationError::InternalError(_)) if num_internal_retries_left > 0 => + num_internal_retries_left -= 1, + _ => break, + } + + // If we got a possibly transient error, retry once after a brief delay, on the assumption + // that the conditions that caused this error may have resolved on their own. + { + // Wait a brief delay before retrying. + futures_timer::Delay::new(PVF_EXECUTION_RETRY_DELAY).await; - gum::warn!( - target: LOG_TARGET, - ?pvf, - "Re-trying failed candidate validation due to AmbiguousWorkerDeath." - ); + gum::warn!( + target: LOG_TARGET, + ?pvf, + "Re-trying failed candidate validation due to possible transient error: {:?}", + validation_result + ); - // Encode the params again when re-trying. We expect the retry case to be relatively - // rare, and we want to avoid unconditionally cloning data. - validation_result = self.validate_candidate(pvf, exec_timeout, params.encode()).await; + // Encode the params again when re-trying. We expect the retry case to be relatively + // rare, and we want to avoid unconditionally cloning data. + validation_result = + self.validate_candidate(pvf.clone(), exec_timeout, params.encode()).await; + } } validation_result diff --git a/node/core/candidate-validation/src/tests.rs b/node/core/candidate-validation/src/tests.rs index 4f0ebf037e3a..5d1cc75b7437 100644 --- a/node/core/candidate-validation/src/tests.rs +++ b/node/core/candidate-validation/src/tests.rs @@ -672,6 +672,62 @@ fn candidate_validation_multiple_ambiguous_errors_is_invalid() { assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::ExecutionError(_))); } +// Test that we retry on internal errors. +#[test] +fn candidate_validation_retry_internal_errors() { + let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; + + let pov = PoV { block_data: BlockData(vec![1; 32]) }; + let validation_code = ValidationCode(vec![2; 16]); + + let descriptor = make_valid_candidate_descriptor( + ParaId::from(1_u32), + dummy_hash(), + validation_data.hash(), + pov.hash(), + validation_code.hash(), + dummy_hash(), + dummy_hash(), + Sr25519Keyring::Alice, + ); + + let check = perform_basic_checks( + &descriptor, + validation_data.max_pov_size, + &pov, + &validation_code.hash(), + ); + assert!(check.is_ok()); + + let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; + + let pool = TaskExecutor::new(); + let (mut ctx, ctx_handle) = + test_helpers::make_subsystem_context::(pool.clone()); + let metrics = Metrics::default(); + + let v = test_with_executor_params(ctx_handle, || { + validate_candidate_exhaustive( + ctx.sender(), + MockValidateCandidateBackend::with_hardcoded_result_list(vec![ + Err(ValidationError::InternalError("foo".into())), + // Throw an AWD error, we should still retry again. + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), + // Throw another internal error. + Err(ValidationError::InternalError("bar".into())), + ]), + validation_data, + validation_code, + candidate_receipt, + Arc::new(pov), + PvfExecTimeoutKind::Backing, + &metrics, + ) + }); + + assert_matches!(v, Err(ValidationFailed(s)) if s == "bar".to_string()); +} + #[test] fn candidate_validation_timeout_is_internal_error() { let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; diff --git a/node/core/pvf/src/execute/mod.rs b/node/core/pvf/src/execute/mod.rs index e863b4e24e54..b0e8cc482561 100644 --- a/node/core/pvf/src/execute/mod.rs +++ b/node/core/pvf/src/execute/mod.rs @@ -23,5 +23,5 @@ mod queue; mod worker; -pub use queue::{start, ToQueue}; +pub use queue::{start, PendingExecutionRequest, ToQueue}; pub use worker::{worker_entrypoint, Response as ExecuteResponse}; diff --git a/node/core/pvf/src/execute/queue.rs b/node/core/pvf/src/execute/queue.rs index 3e977cfa4c5d..e1e02205256b 100644 --- a/node/core/pvf/src/execute/queue.rs +++ b/node/core/pvf/src/execute/queue.rs @@ -50,13 +50,17 @@ slotmap::new_key_type! { struct Worker; } #[derive(Debug)] pub enum ToQueue { - Enqueue { - artifact: ArtifactPathId, - exec_timeout: Duration, - params: Vec, - executor_params: ExecutorParams, - result_tx: ResultSender, - }, + Enqueue { artifact: ArtifactPathId, pending_execution_request: PendingExecutionRequest }, +} + +/// An execution request that should execute the PVF (known in the context) and send the results +/// to the given result sender. +#[derive(Debug)] +pub struct PendingExecutionRequest { + pub exec_timeout: Duration, + pub params: Vec, + pub executor_params: ExecutorParams, + pub result_tx: ResultSender, } struct ExecuteJob { @@ -259,7 +263,9 @@ async fn purge_dead(metrics: &Metrics, workers: &mut Workers) { } fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) { - let ToQueue::Enqueue { artifact, exec_timeout, params, executor_params, result_tx } = to_queue; + let ToQueue::Enqueue { artifact, pending_execution_request } = to_queue; + let PendingExecutionRequest { exec_timeout, params, executor_params, result_tx } = + pending_execution_request; gum::debug!( target: LOG_TARGET, validation_code_hash = ?artifact.id.code_hash, diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker.rs index c30ebceae693..f20874083be3 100644 --- a/node/core/pvf/src/execute/worker.rs +++ b/node/core/pvf/src/execute/worker.rs @@ -349,6 +349,13 @@ fn validate_using_artifact( executor: Arc, cpu_time_start: ProcessTime, ) -> Response { + // Check here if the file exists, because the error from Substrate is not match-able. + // TODO: Re-evaluate after . + let file_metadata = std::fs::metadata(artifact_path); + if let Err(err) = file_metadata { + return Response::format_internal("execute: could not find or open file", &err.to_string()) + } + let descriptor_bytes = match unsafe { // SAFETY: this should be safe since the compiled artifact passed here comes from the // file created by the prepare workers. These files are obtained by calling diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index db2592bdf585..bfc775a32dee 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -23,7 +23,7 @@ use crate::{ artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts}, error::PrepareError, - execute, + execute::{self, PendingExecutionRequest}, metrics::Metrics, prepare, PrepareResult, Priority, PvfPrepData, ValidationError, LOG_TARGET, }; @@ -33,7 +33,6 @@ use futures::{ Future, FutureExt, SinkExt, StreamExt, }; use polkadot_parachain::primitives::ValidationResult; -use polkadot_primitives::ExecutorParams; use std::{ collections::HashMap, path::{Path, PathBuf}, @@ -249,36 +248,14 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future, - executor_params: ExecutorParams, - result_tx: ResultSender, -} - /// A mapping from an artifact ID which is in preparation state to the list of pending execution /// requests that should be executed once the artifact's preparation is finished. #[derive(Default)] struct AwaitingPrepare(HashMap>); impl AwaitingPrepare { - fn add( - &mut self, - artifact_id: ArtifactId, - exec_timeout: Duration, - params: Vec, - executor_params: ExecutorParams, - result_tx: ResultSender, - ) { - self.0.entry(artifact_id).or_default().push(PendingExecutionRequest { - exec_timeout, - params, - executor_params, - result_tx, - }); + fn add(&mut self, artifact_id: ArtifactId, pending_execution_request: PendingExecutionRequest) { + self.0.entry(artifact_id).or_default().push(pending_execution_request); } fn take(&mut self, artifact_id: &ArtifactId) -> Vec { @@ -475,6 +452,8 @@ async fn handle_precheck_pvf( /// This will try to prepare the PVF, if a prepared artifact does not already exist. If there is already a /// preparation job, we coalesce the two preparation jobs. /// +/// If the prepare job succeeded previously, we will enqueue an execute job right away. +/// /// If the prepare job failed previously, we may retry it under certain conditions. /// /// When preparing for execution, we use a more lenient timeout ([`LENIENT_PREPARATION_TIMEOUT`]) @@ -489,32 +468,63 @@ async fn handle_execute_pvf( ) -> Result<(), Fatal> { let ExecutePvfInputs { pvf, exec_timeout, params, priority, result_tx } = inputs; let artifact_id = pvf.as_artifact_id(); + let executor_params = (*pvf.executor_params()).clone(); if let Some(state) = artifacts.artifact_state_mut(&artifact_id) { match state { ArtifactState::Prepared { last_time_needed, .. } => { - *last_time_needed = SystemTime::now(); + let file_metadata = std::fs::metadata(artifact_id.path(cache_path)); + + if file_metadata.is_ok() { + *last_time_needed = SystemTime::now(); + + // This artifact has already been prepared, send it to the execute queue. + send_execute( + execute_queue, + execute::ToQueue::Enqueue { + artifact: ArtifactPathId::new(artifact_id, cache_path), + pending_execution_request: PendingExecutionRequest { + exec_timeout, + params, + executor_params, + result_tx, + }, + }, + ) + .await?; + } else { + gum::warn!( + target: LOG_TARGET, + ?pvf, + ?artifact_id, + "handle_execute_pvf: Re-queuing PVF preparation for prepared artifact with missing file." + ); - // This artifact has already been prepared, send it to the execute queue. - send_execute( - execute_queue, - execute::ToQueue::Enqueue { - artifact: ArtifactPathId::new(artifact_id, cache_path), - exec_timeout, - params, - executor_params: (*pvf.executor_params()).clone(), - result_tx, - }, - ) - .await?; + // The artifact has been prepared previously but the file is missing, prepare it again. + *state = ArtifactState::Preparing { + waiting_for_response: Vec::new(), + num_failures: 0, + }; + enqueue_prepare_for_execute( + prepare_queue, + awaiting_prepare, + pvf, + priority, + artifact_id, + PendingExecutionRequest { + exec_timeout, + params, + executor_params, + result_tx, + }, + ) + .await?; + } }, ArtifactState::Preparing { .. } => { awaiting_prepare.add( artifact_id, - exec_timeout, - params, - (*pvf.executor_params()).clone(), - result_tx, + PendingExecutionRequest { exec_timeout, params, executor_params, result_tx }, ); }, ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => { @@ -535,19 +545,20 @@ async fn handle_execute_pvf( waiting_for_response: Vec::new(), num_failures: *num_failures, }; - let executor_params = (*pvf.executor_params()).clone(); - send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }) - .await?; - - // Add an execution request that will wait to run after this prepare job has - // finished. - awaiting_prepare.add( + enqueue_prepare_for_execute( + prepare_queue, + awaiting_prepare, + pvf, + priority, artifact_id, - exec_timeout, - params, - executor_params, - result_tx, - ); + PendingExecutionRequest { + exec_timeout, + params, + executor_params, + result_tx, + }, + ) + .await?; } else { let _ = result_tx.send(Err(ValidationError::from(error.clone()))); } @@ -556,12 +567,16 @@ async fn handle_execute_pvf( } else { // Artifact is unknown: register it and enqueue a job with the corresponding priority and // PVF. - let executor_params = (*pvf.executor_params()).clone(); artifacts.insert_preparing(artifact_id.clone(), Vec::new()); - send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?; - - // Add an execution request that will wait to run after this prepare job has finished. - awaiting_prepare.add(artifact_id, exec_timeout, params, executor_params, result_tx); + enqueue_prepare_for_execute( + prepare_queue, + awaiting_prepare, + pvf, + priority, + artifact_id, + PendingExecutionRequest { exec_timeout, params, executor_params, result_tx }, + ) + .await?; } Ok(()) @@ -700,10 +715,12 @@ async fn handle_prepare_done( execute_queue, execute::ToQueue::Enqueue { artifact: ArtifactPathId::new(artifact_id.clone(), cache_path), - exec_timeout, - params, - executor_params, - result_tx, + pending_execution_request: PendingExecutionRequest { + exec_timeout, + params, + executor_params, + result_tx, + }, }, ) .await?; @@ -745,6 +762,24 @@ async fn send_execute( execute_queue.send(to_queue).await.map_err(|_| Fatal) } +/// Sends a job to the preparation queue, and adds an execution request that will wait to run after +/// this prepare job has finished. +async fn enqueue_prepare_for_execute( + prepare_queue: &mut mpsc::Sender, + awaiting_prepare: &mut AwaitingPrepare, + pvf: PvfPrepData, + priority: Priority, + artifact_id: ArtifactId, + pending_execution_request: PendingExecutionRequest, +) -> Result<(), Fatal> { + send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?; + + // Add an execution request that will wait to run after this prepare job has finished. + awaiting_prepare.add(artifact_id, pending_execution_request); + + Ok(()) +} + async fn handle_cleanup_pulse( cache_path: &Path, sweeper_tx: &mut mpsc::Sender, @@ -1125,11 +1160,11 @@ pub(crate) mod tests { .unwrap(); let result_tx_pvf_1_1 = assert_matches!( test.poll_and_recv_to_execute_queue().await, - execute::ToQueue::Enqueue { result_tx, .. } => result_tx + execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx ); let result_tx_pvf_1_2 = assert_matches!( test.poll_and_recv_to_execute_queue().await, - execute::ToQueue::Enqueue { result_tx, .. } => result_tx + execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx ); test.from_prepare_queue_tx @@ -1141,7 +1176,7 @@ pub(crate) mod tests { .unwrap(); let result_tx_pvf_2 = assert_matches!( test.poll_and_recv_to_execute_queue().await, - execute::ToQueue::Enqueue { result_tx, .. } => result_tx + execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx ); result_tx_pvf_1_1 @@ -1456,7 +1491,7 @@ pub(crate) mod tests { // Preparation should have been retried and succeeded this time. let result_tx_3 = assert_matches!( test.poll_and_recv_to_execute_queue().await, - execute::ToQueue::Enqueue { result_tx, .. } => result_tx + execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx ); // Send an error for the execution here, just so we can check the result receiver is still diff --git a/node/core/pvf/tests/it/main.rs b/node/core/pvf/tests/it/main.rs index ecd885ab642e..91c93e35ccd8 100644 --- a/node/core/pvf/tests/it/main.rs +++ b/node/core/pvf/tests/it/main.rs @@ -33,7 +33,7 @@ const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(3); struct TestHost { - _cache_dir: tempfile::TempDir, + cache_dir: tempfile::TempDir, host: Mutex, } @@ -52,7 +52,7 @@ impl TestHost { f(&mut config); let (host, task) = start(config, Metrics::default()); let _ = tokio::task::spawn(task); - Self { _cache_dir: cache_dir, host: Mutex::new(host) } + Self { cache_dir, host: Mutex::new(host) } } async fn validate_candidate( @@ -240,3 +240,58 @@ async fn execute_queue_doesnt_stall_with_varying_executor_params() { max_duration.as_millis() ); } + +// Test that deleting a prepared artifact does not lead to a dispute when we try to execute it. +#[tokio::test] +async fn deleting_prepared_artifact_does_not_dispute() { + let host = TestHost::new(); + let cache_dir = host.cache_dir.path().clone(); + + let result = host + .validate_candidate( + halt::wasm_binary_unwrap(), + ValidationParams { + block_data: BlockData(Vec::new()), + parent_head: Default::default(), + relay_parent_number: 1, + relay_parent_storage_root: Default::default(), + }, + Default::default(), + ) + .await; + + match result { + Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)) => {}, + r => panic!("{:?}", r), + } + + // Delete the prepared artifact. + { + // Get the artifact path (asserting it exists). + let mut cache_dir: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect(); + assert_eq!(cache_dir.len(), 1); + let artifact_path = cache_dir.pop().unwrap().unwrap(); + + // Delete the artifact. + std::fs::remove_file(artifact_path.path()).unwrap(); + } + + // Try to validate again, artifact should get recreated. + let result = host + .validate_candidate( + halt::wasm_binary_unwrap(), + ValidationParams { + block_data: BlockData(Vec::new()), + parent_head: Default::default(), + relay_parent_number: 1, + relay_parent_storage_root: Default::default(), + }, + Default::default(), + ) + .await; + + match result { + Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)) => {}, + r => panic!("{:?}", r), + } +} diff --git a/roadmap/implementers-guide/src/node/utility/candidate-validation.md b/roadmap/implementers-guide/src/node/utility/candidate-validation.md index 6e7a5f3d0c8f..bc1d11d8f664 100644 --- a/roadmap/implementers-guide/src/node/utility/candidate-validation.md +++ b/roadmap/implementers-guide/src/node/utility/candidate-validation.md @@ -23,7 +23,7 @@ Upon receiving a validation request, the first thing the candidate validation su * The [`CandidateDescriptor`](../../types/candidate.md#candidatedescriptor). * The [`ValidationData`](../../types/candidate.md#validationdata). * The [`PoV`](../../types/availability.md#proofofvalidity). - + The second category is for PVF pre-checking. This is primarly used by the [PVF pre-checker](pvf-prechecker.md) subsystem. ### Determining Parameters @@ -67,8 +67,20 @@ or time out). We will only retry preparation if another request comes in after resolved. We will retry up to 5 times. If the actual **execution** of the artifact fails, we will retry once if it was -an ambiguous error after a brief delay, to allow any potential transient -conditions to clear. +a possibly transient error, to allow the conditions that led to the error to +hopefully resolve. We use a more brief delay here (1 second as opposed to 15 +minutes for preparation (see above)), because a successful execution must happen +in a short amount of time. + +We currently know of at least two specific cases that will lead to a retried +execution request: + +1. **OOM:** The host might have been temporarily low on memory due to other + processes running on the same machine. **NOTE:** This case will lead to + voting against the candidate (and possibly a dispute) if the retry is still + not successful. +2. **Artifact missing:** The prepared artifact might have been deleted due to + operator error or some bug in the system. We will re-create it on retry. #### Preparation timeouts