This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

PVF: Vote invalid on panics in execution thread (after a retry) #7155

Merged: 29 commits, May 16, 2023

Commits
866a690  PVF: Remove `rayon` and some uses of `tokio` (mrcnski, May 1, 2023)
aff4779  PVF: Vote invalid on panics in execution thread (after a retry) (mrcnski, May 1, 2023)
fc481bc  Merge branch 'mrcnski/pvf-remove-rayon-tokio' into mrcnski/pvf-catch-… (mrcnski, May 1, 2023)
b03e1b0  Address a couple of TODOs (mrcnski, May 1, 2023)
0fcedbe  Add some documentation to implementer's guide (mrcnski, May 1, 2023)
c7e44d1  Fix compile error (mrcnski, May 1, 2023)
dc0e9c9  Fix compile errors (mrcnski, May 1, 2023)
67994f5  Fix compile error (mrcnski, May 1, 2023)
181e89f  Update roadmap/implementers-guide/src/node/utility/candidate-validati… (mrcnski, May 2, 2023)
df22727  Address comments + couple other changes (see message) (mrcnski, May 2, 2023)
5223995  Implement proper thread synchronization (mrcnski, May 2, 2023)
d4eb740  Catch panics in threads so we always notify condvar (mrcnski, May 2, 2023)
850d2c0  Use `WaitOutcome` enum instead of bool condition variable (mrcnski, May 2, 2023)
5885222  Merge branch 'mrcnski/pvf-remove-rayon-tokio' into mrcnski/pvf-catch-… (mrcnski, May 2, 2023)
4f8f1cc  Fix retry timeouts to depend on exec timeout kind (mrcnski, May 2, 2023)
87437b8  Merge remote-tracking branch 'origin/mrcnski/pvf-catch-panics-in-exec… (mrcnski, May 2, 2023)
f4c0b4b  Address review comments (mrcnski, May 2, 2023)
883952c  Make the API for condvars in workers nicer (mrcnski, May 2, 2023)
cc89ab8  Add a doc (mrcnski, May 3, 2023)
1967ddb  Use condvar for memory stats thread (mrcnski, May 3, 2023)
6e7a13c  Small refactor (mrcnski, May 3, 2023)
566a438  Merge branch 'mrcnski/pvf-remove-rayon-tokio' into mrcnski/pvf-catch-… (mrcnski, May 3, 2023)
660dadd  Enumerate internal validation errors in an enum (mrcnski, May 3, 2023)
187528c  Fix comment (mrcnski, May 3, 2023)
6d7cbf0  Add a log (mrcnski, May 3, 2023)
a2d0c72  Fix test (mrcnski, May 3, 2023)
6c54e42  Update variant naming (mrcnski, May 4, 2023)
7da8cdf  Merge branch 'master' into mrcnski/pvf-catch-panics-in-execution (mrcnski, May 16, 2023)
da7d191  Address a missed TODO (mrcnski, May 16, 2023)
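Note on the threading commits above (5223995, d4eb740, 850d2c0, 1967ddb): together they replace ad-hoc signalling between worker threads with a condition variable guarding a `WaitOutcome` enum, and catch panics inside spawned threads so the condvar is always notified. A minimal sketch of that pattern, with names and structure assumed for illustration rather than taken from the worker code:

```rust
use std::{
    panic,
    sync::{Arc, Condvar, Mutex},
    thread,
    time::Duration,
};

/// Hypothetical outcome flag; the real workers define their own `WaitOutcome`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum WaitOutcome {
    Pending,
    Finished,
}

fn main() {
    let state = Arc::new((Mutex::new(WaitOutcome::Pending), Condvar::new()));

    // Job thread: wrap the work in `catch_unwind` so that even a panic still
    // flips the outcome and notifies the condvar; otherwise the waiting
    // thread below could block forever.
    let job_state = Arc::clone(&state);
    let job = thread::spawn(move || {
        let result = panic::catch_unwind(|| {
            thread::sleep(Duration::from_millis(10)); // stand-in for real work
        });
        let (lock, cvar) = &*job_state;
        let mut outcome = lock.lock().unwrap();
        // Only the first finisher gets to set the outcome.
        if *outcome == WaitOutcome::Pending {
            *outcome = WaitOutcome::Finished;
        }
        cvar.notify_all();
        result
    });

    // Main thread: sleep until some thread reports an outcome.
    let (lock, cvar) = &*state;
    let mut outcome = lock.lock().unwrap();
    while *outcome == WaitOutcome::Pending {
        outcome = cvar.wait(outcome).unwrap();
    }
    drop(outcome); // release the lock before joining

    // A panic in the job surfaces here as `Err`, after the condvar fired.
    let _ = job.join();
}
```

The key property is that every exit path of the job thread, including a panic, sets the outcome and notifies, so the waiting thread cannot deadlock on a crashed job.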
70 changes: 56 additions & 14 deletions node/core/candidate-validation/src/lib.rs
@@ -24,8 +24,8 @@
 #![warn(missing_docs)]
 
 use polkadot_node_core_pvf::{
-    InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, PvfPrepData,
-    ValidationError, ValidationHost,
+    InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats,
+    PvfPrepData, ValidationError, ValidationHost,
 };
 use polkadot_node_primitives::{
     BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT,
@@ -51,7 +51,11 @@ use parity_scale_codec::Encode;
 
 use futures::{channel::oneshot, prelude::*};
 
-use std::{path::PathBuf, sync::Arc, time::Duration};
+use std::{
+    path::PathBuf,
+    sync::Arc,
+    time::{Duration, Instant},
+};
 
 use async_trait::async_trait;
 
@@ -63,11 +67,19 @@ mod tests;
 
 const LOG_TARGET: &'static str = "parachain::candidate-validation";
 
-/// The amount of time to wait before retrying after an AmbiguousWorkerDeath validation error.
+/// The amount of time to wait before retrying after a retry-able backing validation error. We use a lower value for the
+/// backing case, to fit within the lower backing timeout.
+#[cfg(not(test))]
+const PVF_BACKING_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(500);
+#[cfg(test)]
+const PVF_BACKING_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);
+/// The amount of time to wait before retrying after a retry-able approval validation error. We use a higher value for
+/// the approval case since we have more time, and if we wait longer it is more likely that transient conditions will
+/// resolve.
 #[cfg(not(test))]
-const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3);
+const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3);
 #[cfg(test)]
-const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);
+const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);
 
 // Default PVF timeouts. Must never be changed! Use executor environment parameters in
 // `session_info` pallet to adjust them. See also `PvfTimeoutKind` docs.
@@ -617,6 +629,7 @@ where
         .validate_candidate_with_retry(
             raw_validation_code.to_vec(),
             pvf_exec_timeout(&executor_params, exec_timeout_kind),
+            exec_timeout_kind,
             params,
             executor_params,
         )
@@ -627,7 +640,7 @@
     }
 
     match result {
-        Err(ValidationError::InternalError(e)) => Err(ValidationFailed(e)),
+        Err(ValidationError::InternalError(e)) => Err(ValidationFailed(e.to_string())),
         Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout)) =>
            Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)),
        Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::WorkerReportedError(e))) =>
@@ -636,6 +649,8 @@
             Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(
                 "ambiguous worker death".to_string(),
             ))),
+        Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic(err))) =>
+            Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))),
         Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::PrepareError(e))) => {
             // In principle if preparation of the `WASM` fails, the current candidate can not be the
             // reason for that. So we can't say whether it is invalid or not. In addition, with
@@ -698,24 +713,44 @@ trait ValidationBackend {
         &mut self,
         raw_validation_code: Vec<u8>,
         exec_timeout: Duration,
+        exec_timeout_kind: PvfExecTimeoutKind,
         params: ValidationParams,
         executor_params: ExecutorParams,
     ) -> Result<WasmValidationResult, ValidationError> {
         let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepTimeoutKind::Lenient);
         // Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
         let pvf = PvfPrepData::from_code(raw_validation_code, executor_params, prep_timeout);
+        // We keep track of the total time that has passed and stop retrying if we are taking too long.
+        let total_time_start = Instant::now();
 
         let mut validation_result =
             self.validate_candidate(pvf.clone(), exec_timeout, params.encode()).await;
+        if validation_result.is_ok() {
+            return validation_result
+        }
 
+        let retry_delay = match exec_timeout_kind {
+            PvfExecTimeoutKind::Backing => PVF_BACKING_EXECUTION_RETRY_DELAY,
+            PvfExecTimeoutKind::Approval => PVF_APPROVAL_EXECUTION_RETRY_DELAY,
+        };
+
         // Allow limited retries for each kind of error.
+        let mut num_internal_retries_left = 1;
         let mut num_awd_retries_left = 1;
+        let mut num_panic_retries_left = 1;
         loop {
+            // Stop retrying if we exceeded the timeout.
+            if total_time_start.elapsed() + retry_delay > exec_timeout {
+                break
+            }
+
             match validation_result {
                 Err(ValidationError::InvalidCandidate(
                     WasmInvalidCandidate::AmbiguousWorkerDeath,
                 )) if num_awd_retries_left > 0 => num_awd_retries_left -= 1,
+                Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic(_)))
+                    if num_panic_retries_left > 0 =>
+                    num_panic_retries_left -= 1,
                 Err(ValidationError::InternalError(_)) if num_internal_retries_left > 0 =>
                     num_internal_retries_left -= 1,
                 _ => break,
@@ -725,19 +760,22 @@ trait ValidationBackend {
             // that the conditions that caused this error may have resolved on their own.
             {
                 // Wait a brief delay before retrying.
-                futures_timer::Delay::new(PVF_EXECUTION_RETRY_DELAY).await;
+                futures_timer::Delay::new(retry_delay).await;
 
+                let new_timeout = exec_timeout.saturating_sub(total_time_start.elapsed());
+
                 gum::warn!(
                     target: LOG_TARGET,
                     ?pvf,
+                    ?new_timeout,
                     "Re-trying failed candidate validation due to possible transient error: {:?}",
                     validation_result
                 );
 
                 // Encode the params again when re-trying. We expect the retry case to be relatively
                 // rare, and we want to avoid unconditionally cloning data.
                 validation_result =
-                    self.validate_candidate(pvf.clone(), exec_timeout, params.encode()).await;
+                    self.validate_candidate(pvf.clone(), new_timeout, params.encode()).await;
             }
         }
 
@@ -760,14 +798,18 @@ impl ValidationBackend for ValidationHost {
 
         let (tx, rx) = oneshot::channel();
         if let Err(err) = self.execute_pvf(pvf, exec_timeout, encoded_params, priority, tx).await {
-            return Err(ValidationError::InternalError(format!(
-                "cannot send pvf to the validation host: {:?}",
+            return Err(InternalValidationError::HostCommunication(format!(
+                "cannot send pvf to the validation host, it might have shut down: {:?}",
                 err
-            )))
+            ))
+            .into())
         }
 
-        rx.await
-            .map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?
+        rx.await.map_err(|_| {
+            ValidationError::from(InternalValidationError::HostCommunication(
+                "validation was cancelled".into(),
+            ))
+        })?
     }
 
     async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<PrepareStats, PrepareError> {
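A note on the retry arithmetic in `validate_candidate_with_retry` above: the loop gives up once even the retry delay would overrun the total execution budget, and each retry runs with only the remaining budget. A toy, self-contained illustration of that bookkeeping (the constant and budget here are invented for the example; the real values are the `PVF_*_EXECUTION_RETRY_DELAY` constants and the executor-parameter timeout):

```rust
use std::time::{Duration, Instant};

// Invented example value standing in for a PVF_*_EXECUTION_RETRY_DELAY constant.
const RETRY_DELAY: Duration = Duration::from_millis(500);

fn main() {
    let exec_timeout = Duration::from_secs(2); // invented total budget
    let total_time_start = Instant::now();

    loop {
        // Same guard as above: if waiting out the delay would already exceed
        // the budget, stop retrying instead of sleeping pointlessly.
        if total_time_start.elapsed() + RETRY_DELAY > exec_timeout {
            break
        }

        std::thread::sleep(RETRY_DELAY); // stand-in for futures_timer::Delay

        // Each retry gets only the remaining budget, so repeated failures can
        // never take longer than `exec_timeout` in total.
        let new_timeout = exec_timeout.saturating_sub(total_time_start.elapsed());
        println!("retrying with shrunken timeout: {:?}", new_timeout);

        break; // a real attempt would run here and loop on retry-able errors
    }
}
```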
61 changes: 59 additions & 2 deletions node/core/candidate-validation/src/tests.rs
@@ -540,6 +540,7 @@ fn candidate_validation_bad_return_is_invalid() {
     assert_matches!(v, Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)));
 }
 
+// Test that we vote valid if we get `AmbiguousWorkerDeath`, retry, and then succeed.
 #[test]
 fn candidate_validation_one_ambiguous_error_is_valid() {
     let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() };
@@ -710,11 +711,11 @@ fn candidate_validation_retry_internal_errors() {
         validate_candidate_exhaustive(
             ctx.sender(),
             MockValidateCandidateBackend::with_hardcoded_result_list(vec![
-                Err(ValidationError::InternalError("foo".into())),
+                Err(InternalValidationError::HostCommunication("foo".into()).into()),
                 // Throw an AWD error, we should still retry again.
                 Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
                 // Throw another internal error.
-                Err(ValidationError::InternalError("bar".into())),
+                Err(InternalValidationError::HostCommunication("bar".into()).into()),
             ]),
             validation_data,
             validation_code,
@@ -728,6 +729,62 @@
     assert_matches!(v, Err(ValidationFailed(s)) if s == "bar".to_string());
 }
 
+// Test that we retry on panic errors.
+#[test]
+fn candidate_validation_retry_panic_errors() {
+    let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() };
+
+    let pov = PoV { block_data: BlockData(vec![1; 32]) };
+    let validation_code = ValidationCode(vec![2; 16]);
+
+    let descriptor = make_valid_candidate_descriptor(
+        ParaId::from(1_u32),
+        dummy_hash(),
+        validation_data.hash(),
+        pov.hash(),
+        validation_code.hash(),
+        dummy_hash(),
+        dummy_hash(),
+        Sr25519Keyring::Alice,
+    );
+
+    let check = perform_basic_checks(
+        &descriptor,
+        validation_data.max_pov_size,
+        &pov,
+        &validation_code.hash(),
+    );
+    assert!(check.is_ok());
+
+    let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() };
+
+    let pool = TaskExecutor::new();
+    let (mut ctx, ctx_handle) =
+        test_helpers::make_subsystem_context::<AllMessages, _>(pool.clone());
+    let metrics = Metrics::default();
+
+    let v = test_with_executor_params(ctx_handle, || {
+        validate_candidate_exhaustive(
+            ctx.sender(),
+            MockValidateCandidateBackend::with_hardcoded_result_list(vec![
+                Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic("foo".into()))),
+                // Throw an AWD error, we should still retry again.
+                Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
+                // Throw another panic error.
+                Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic("bar".into()))),
+            ]),
+            validation_data,
+            validation_code,
+            candidate_receipt,
+            Arc::new(pov),
+            PvfExecTimeoutKind::Backing,
+            &metrics,
+        )
+    });
+
+    assert_matches!(v, Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(s))) if s == "bar".to_string());
+}
+
 #[test]
 fn candidate_validation_timeout_is_internal_error() {
     let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() };
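`MockValidateCandidateBackend::with_hardcoded_result_list` is a test helper defined elsewhere in tests.rs. Its assumed shape is a replay mock: each `validate_candidate` call consumes the next scripted result, which is what lets the tests above stage sequences like panic, then worker death, then panic. A generic, self-contained sketch of the pattern:

```rust
/// Sketch of a replay mock (assumed shape; the real helper lives in tests.rs):
/// each call consumes the next hardcoded result in order.
struct ReplayMock<T> {
    results: Vec<T>,
    num_times_called: usize,
}

impl<T: Clone> ReplayMock<T> {
    fn with_hardcoded_result_list(results: Vec<T>) -> Self {
        Self { results, num_times_called: 0 }
    }

    /// Panics if called more times than there are scripted results,
    /// which in a test is exactly the failure you want to see.
    fn next(&mut self) -> T {
        let result = self.results[self.num_times_called].clone();
        self.num_times_called += 1;
        result
    }
}

fn main() {
    let mut mock = ReplayMock::with_hardcoded_result_list(vec![
        Err("foo"), // first attempt fails
        Err("bar"), // retry fails too
        Ok(42),     // second retry succeeds
    ]);
    assert_eq!(mock.next(), Err("foo"));
    assert_eq!(mock.next(), Err("bar"));
    assert_eq!(mock.next(), Ok(42));
}
```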
54 changes: 49 additions & 5 deletions node/core/pvf/src/error.rs
@@ -78,9 +78,11 @@ impl fmt::Display for PrepareError {
 #[derive(Debug, Clone)]
 pub enum ValidationError {
     /// The error was raised because the candidate is invalid.
+    ///
+    /// Whenever we are unsure if the error was due to the candidate or not, we must vote invalid.
     InvalidCandidate(InvalidCandidate),
-    /// This error is raised due to inability to serve the request.
-    InternalError(String),
+    /// Some internal error occurred.
+    InternalError(InternalValidationError),
 }
 
 /// A description of an error raised during executing a PVF and can be attributed to the combination
@@ -103,7 +105,7 @@ pub enum InvalidCandidate {
     /// an `rlimit` (if set) or, again, invited OOM killer. Another possibility is a bug in
     /// wasmtime allowed the PVF to gain control over the execution worker.
     ///
-    /// We attribute such an event to an invalid candidate in either case.
+    /// We attribute such an event to an *invalid candidate* in either case.
     ///
     /// The rationale for this is that a glitch may lead to unfair rejecting candidate by a single
     /// validator. If the glitch is somewhat more persistent the validator will reject all candidate
@@ -113,16 +115,58 @@
     AmbiguousWorkerDeath,
     /// PVF execution (compilation is not included) took more time than was allotted.
     HardTimeout,
+    /// A panic occurred and we can't be sure whether the candidate is really invalid or some internal glitch occurred.
+    /// Whenever we are unsure, we can never treat an error as internal as we would abstain from voting. This is bad
+    /// because if the issue was due to the candidate, then all validators would abstain, stalling finality on the
+    /// chain. So we will first retry the candidate, and if the issue persists we are forced to vote invalid.
+    Panic(String),
 }
 
+/// Some internal error occurred.
+///
+/// Should only ever be used for validation errors independent of the candidate and PVF, or for errors we ruled out
+/// during pre-checking (so preparation errors are fine).
+#[derive(Debug, Clone, Encode, Decode)]
+pub enum InternalValidationError {
+    /// Some communication error occurred with the host.
+    HostCommunication(String),
+    /// Could not find or open compiled artifact file.
+    CouldNotOpenFile(String),
+    /// An error occurred in the CPU time monitor thread. Should be totally unrelated to validation.
+    CpuTimeMonitorThread(String),
+    /// Some non-deterministic preparation error occurred.
+    PrepareError(PrepareError),
+}
+
+impl fmt::Display for InternalValidationError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        use InternalValidationError::*;
+        match self {
+            HostCommunication(err) =>
+                write!(f, "validation: some communication error occurred with the host: {}", err),
+            CouldNotOpenFile(err) =>
+                write!(f, "validation: could not find or open compiled artifact file: {}", err),
+            CpuTimeMonitorThread(err) =>
+                write!(f, "validation: an error occurred in the CPU time monitor thread: {}", err),
+            PrepareError(err) => write!(f, "validation: prepare: {}", err),
+        }
+    }
+}
+
+impl From<InternalValidationError> for ValidationError {
+    fn from(error: InternalValidationError) -> Self {
+        Self::InternalError(error)
+    }
+}
+
 impl From<PrepareError> for ValidationError {
     fn from(error: PrepareError) -> Self {
         // Here we need to classify the errors into two errors: deterministic and non-deterministic.
         // See [`PrepareError::is_deterministic`].
         if error.is_deterministic() {
-            ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(error.to_string()))
+            Self::InvalidCandidate(InvalidCandidate::PrepareError(error.to_string()))
         } else {
-            ValidationError::InternalError(error.to_string())
+            Self::InternalError(InternalValidationError::PrepareError(error))
         }
     }
 }
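To produce the new `Panic(String)` variant, the execution worker has to turn a panic payload (a `Box<dyn Any + Send>`) into a message. A conventional way to do that, shown as an assumption about the approach rather than the PR's exact helper:

```rust
use std::any::Any;

// Panic payloads are usually a `&'static str` (from `panic!("literal")`) or a
// `String` (from `panic!("{}", x)`); anything else gets a fallback label.
fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String {
    match payload.downcast::<&'static str>() {
        Ok(msg) => msg.to_string(),
        Err(payload) => match payload.downcast::<String>() {
            Ok(msg) => *msg,
            Err(_) => "unknown panic payload".to_string(),
        },
    }
}

fn main() {
    let payload = std::panic::catch_unwind(|| {
        panic!("boom");
    })
    .unwrap_err();
    assert_eq!(stringify_panic_payload(payload), "boom");
}
```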
8 changes: 5 additions & 3 deletions node/core/pvf/src/execute/queue.rs
@@ -334,15 +334,17 @@ fn handle_job_finish(
             Err(ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedError(err))),
             None,
         ),
-        Outcome::InternalError { err, idle_worker } =>
-            (Some(idle_worker), Err(ValidationError::InternalError(err)), None),
+        Outcome::InternalError { err } => (None, Err(ValidationError::InternalError(err)), None),
         Outcome::HardTimeout =>
             (None, Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)), None),
+        // "Maybe invalid" errors (will retry).
         Outcome::IoErr => (
             None,
             Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath)),
             None,
         ),
+        Outcome::Panic { err } =>
+            (None, Err(ValidationError::InvalidCandidate(InvalidCandidate::Panic(err))), None),
     };
 
     queue.metrics.execute_finished();
@@ -356,7 +358,7 @@
             err
         );
     } else {
-        gum::debug!(
+        gum::trace!(
             target: LOG_TARGET,
             ?artifact_id,
             ?worker,
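For context, the `Outcome` values matched in `handle_job_finish` above imply roughly the following classification. The sketch below is simplified and assumed (the real enum lives in the execute worker code, carries richer types, and has more variants):

```rust
#[derive(Debug)]
enum Outcome {
    InternalError { err: String }, // after this PR, no longer returns the worker for reuse
    HardTimeout,
    IoErr,
    Panic { err: String },
}

// Mirrors the classification above: which outcomes candidate-validation will
// retry once before settling on a final result.
fn will_be_retried(outcome: &Outcome) -> bool {
    match outcome {
        Outcome::InternalError { .. } => true, // retried, then surfaces as ValidationFailed
        Outcome::HardTimeout => false,         // final: vote invalid with `Timeout`
        Outcome::IoErr => true,                // retried as `AmbiguousWorkerDeath`
        Outcome::Panic { .. } => true,         // new in this PR: retried, then vote invalid
    }
}

fn main() {
    assert!(will_be_retried(&Outcome::Panic { err: "boom".into() }));
    assert!(!will_be_retried(&Outcome::HardTimeout));
}
```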