Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
PVF: Don't dispute on missing artifact (#7011)
Browse files Browse the repository at this point in the history
* PVF: Don't dispute on missing artifact

A dispute should never be raised if the local cache doesn't provide a certain
artifact. You can not dispute based on this reason, as it is a local hardware
issue and not related to the candidate to check.

Design:

Currently we assume that if we prepared an artifact, it remains there on-disk
until we prune it, i.e. we never check again if it's still there.

We can change it so that instead of artifact-not-found triggering a dispute, we
retry once (like we do for AmbiguousWorkerDeath, except we don't dispute if it
still doesn't work). And when enqueuing an execute job, we check for the
artifact on-disk, and start preparation if not found.

Changes:

- [x] Integration test (should fail without the following changes)
- [x] Check if artifact exists when executing, prepare if not
- [x] Return an internal error when file is missing
- [x] Retry once on internal errors
- [x] Document design (update impl guide)

* Add some context to wasm error message (it is quite long)

* Fix impl guide

* Add check for missing/inaccessible file

* Add comment referencing Substrate issue

* Add test for retrying internal errors

---------

Co-authored-by: parity-processbot <>
  • Loading branch information
mrcnski authored Apr 20, 2023
1 parent a5e253d commit 67278e0
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 99 deletions.
50 changes: 33 additions & 17 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,38 +691,54 @@ trait ValidationBackend {

/// Tries executing a PVF. Will retry once if an error is encountered that may have been
/// transient.
///
/// NOTE: Should retry only on errors that are a result of execution itself, and not of
/// preparation.
async fn validate_candidate_with_retry(
&mut self,
raw_validation_code: Vec<u8>,
exec_timeout: Duration,
params: ValidationParams,
executor_params: ExecutorParams,
) -> Result<WasmValidationResult, ValidationError> {
// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepTimeoutKind::Lenient);
// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
let pvf = PvfPrepData::from_code(raw_validation_code, executor_params, prep_timeout);

let mut validation_result =
self.validate_candidate(pvf.clone(), exec_timeout, params.encode()).await;

// If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the
// assumption that the conditions that caused this error may have been transient. Note that
// this error is only a result of execution itself and not of preparation.
if let Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) =
validation_result
{
// Wait a brief delay before retrying.
futures_timer::Delay::new(PVF_EXECUTION_RETRY_DELAY).await;
// Allow limited retries for each kind of error.
let mut num_internal_retries_left = 1;
let mut num_awd_retries_left = 1;
loop {
match validation_result {
Err(ValidationError::InvalidCandidate(
WasmInvalidCandidate::AmbiguousWorkerDeath,
)) if num_awd_retries_left > 0 => num_awd_retries_left -= 1,
Err(ValidationError::InternalError(_)) if num_internal_retries_left > 0 =>
num_internal_retries_left -= 1,
_ => break,
}

// If we got a possibly transient error, retry once after a brief delay, on the assumption
// that the conditions that caused this error may have resolved on their own.
{
// Wait a brief delay before retrying.
futures_timer::Delay::new(PVF_EXECUTION_RETRY_DELAY).await;

gum::warn!(
target: LOG_TARGET,
?pvf,
"Re-trying failed candidate validation due to AmbiguousWorkerDeath."
);
gum::warn!(
target: LOG_TARGET,
?pvf,
"Re-trying failed candidate validation due to possible transient error: {:?}",
validation_result
);

// Encode the params again when re-trying. We expect the retry case to be relatively
// rare, and we want to avoid unconditionally cloning data.
validation_result = self.validate_candidate(pvf, exec_timeout, params.encode()).await;
// Encode the params again when re-trying. We expect the retry case to be relatively
// rare, and we want to avoid unconditionally cloning data.
validation_result =
self.validate_candidate(pvf.clone(), exec_timeout, params.encode()).await;
}
}

validation_result
Expand Down
56 changes: 56 additions & 0 deletions node/core/candidate-validation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,62 @@ fn candidate_validation_multiple_ambiguous_errors_is_invalid() {
assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::ExecutionError(_)));
}

// Test that we retry on internal errors.
#[test]
fn candidate_validation_retry_internal_errors() {
let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() };

let pov = PoV { block_data: BlockData(vec![1; 32]) };
let validation_code = ValidationCode(vec![2; 16]);

let descriptor = make_valid_candidate_descriptor(
ParaId::from(1_u32),
dummy_hash(),
validation_data.hash(),
pov.hash(),
validation_code.hash(),
dummy_hash(),
dummy_hash(),
Sr25519Keyring::Alice,
);

let check = perform_basic_checks(
&descriptor,
validation_data.max_pov_size,
&pov,
&validation_code.hash(),
);
assert!(check.is_ok());

let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() };

let pool = TaskExecutor::new();
let (mut ctx, ctx_handle) =
test_helpers::make_subsystem_context::<AllMessages, _>(pool.clone());
let metrics = Metrics::default();

let v = test_with_executor_params(ctx_handle, || {
validate_candidate_exhaustive(
ctx.sender(),
MockValidateCandidateBackend::with_hardcoded_result_list(vec![
Err(ValidationError::InternalError("foo".into())),
// Throw an AWD error, we should still retry again.
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
// Throw another internal error.
Err(ValidationError::InternalError("bar".into())),
]),
validation_data,
validation_code,
candidate_receipt,
Arc::new(pov),
PvfExecTimeoutKind::Backing,
&metrics,
)
});

assert_matches!(v, Err(ValidationFailed(s)) if s == "bar".to_string());
}

#[test]
fn candidate_validation_timeout_is_internal_error() {
let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() };
Expand Down
2 changes: 1 addition & 1 deletion node/core/pvf/src/execute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@
mod queue;
mod worker;

pub use queue::{start, ToQueue};
pub use queue::{start, PendingExecutionRequest, ToQueue};
pub use worker::{worker_entrypoint, Response as ExecuteResponse};
22 changes: 14 additions & 8 deletions node/core/pvf/src/execute/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,17 @@ slotmap::new_key_type! { struct Worker; }

#[derive(Debug)]
pub enum ToQueue {
Enqueue {
artifact: ArtifactPathId,
exec_timeout: Duration,
params: Vec<u8>,
executor_params: ExecutorParams,
result_tx: ResultSender,
},
Enqueue { artifact: ArtifactPathId, pending_execution_request: PendingExecutionRequest },
}

/// An execution request that should execute the PVF (known in the context) and send the results
/// to the given result sender.
#[derive(Debug)]
pub struct PendingExecutionRequest {
pub exec_timeout: Duration,
pub params: Vec<u8>,
pub executor_params: ExecutorParams,
pub result_tx: ResultSender,
}

struct ExecuteJob {
Expand Down Expand Up @@ -259,7 +263,9 @@ async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
}

fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
let ToQueue::Enqueue { artifact, exec_timeout, params, executor_params, result_tx } = to_queue;
let ToQueue::Enqueue { artifact, pending_execution_request } = to_queue;
let PendingExecutionRequest { exec_timeout, params, executor_params, result_tx } =
pending_execution_request;
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact.id.code_hash,
Expand Down
7 changes: 7 additions & 0 deletions node/core/pvf/src/execute/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,13 @@ fn validate_using_artifact(
executor: Arc<Executor>,
cpu_time_start: ProcessTime,
) -> Response {
// Check here if the file exists, because the error from Substrate is not match-able.
// TODO: Re-evaluate after <https://github.com/paritytech/substrate/issues/13860>.
let file_metadata = std::fs::metadata(artifact_path);
if let Err(err) = file_metadata {
return Response::format_internal("execute: could not find or open file", &err.to_string())
}

let descriptor_bytes = match unsafe {
// SAFETY: this should be safe since the compiled artifact passed here comes from the
// file created by the prepare workers. These files are obtained by calling
Expand Down
Loading

0 comments on commit 67278e0

Please sign in to comment.