Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Move PVF timeouts to executor environment parameters #6823

Merged
merged 12 commits into from
Mar 8, 2023
8 changes: 4 additions & 4 deletions node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use polkadot_node_primitives::{
approval::{
BlockApprovalMeta, DelayTranche, IndirectAssignmentCert, IndirectSignedApprovalVote,
},
ValidationResult, APPROVAL_EXECUTION_TIMEOUT,
ValidationResult,
};
use polkadot_node_subsystem::{
errors::RecoveryError,
Expand All @@ -50,8 +50,8 @@ use polkadot_node_subsystem_util::{
};
use polkadot_primitives::{
ApprovalVote, BlockNumber, CandidateHash, CandidateIndex, CandidateReceipt, DisputeStatement,
GroupIndex, Hash, SessionIndex, SessionInfo, ValidDisputeStatementKind, ValidatorId,
ValidatorIndex, ValidatorPair, ValidatorSignature,
GroupIndex, Hash, PvfExecTimeoutKind, SessionIndex, SessionInfo, ValidDisputeStatementKind,
ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature,
};
use sc_keystore::LocalKeystore;
use sp_application_crypto::Pair;
Expand Down Expand Up @@ -2399,7 +2399,7 @@ async fn launch_approval<Context>(
validation_code,
candidate.clone(),
available_data.pov,
APPROVAL_EXECUTION_TIMEOUT,
PvfExecTimeoutKind::Approval,
val_tx,
))
.await;
Expand Down
2 changes: 1 addition & 1 deletion node/core/approval-voting/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2427,7 +2427,7 @@ pub async fn handle_double_assignment_import(
},
AllMessages::CandidateValidation(
CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx),
) if timeout == APPROVAL_EXECUTION_TIMEOUT => {
) if timeout == PvfExecTimeoutKind::Approval => {
tx.send(Ok(ValidationResult::Valid(Default::default(), Default::default())))
.unwrap();
},
Expand Down
7 changes: 3 additions & 4 deletions node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use futures::{
use error::{Error, FatalResult};
use polkadot_node_primitives::{
AvailableData, InvalidCandidate, PoV, SignedFullStatement, Statement, ValidationResult,
BACKING_EXECUTION_TIMEOUT,
};
use polkadot_node_subsystem::{
jaeger,
Expand All @@ -50,8 +49,8 @@ use polkadot_node_subsystem_util::{
};
use polkadot_primitives::{
BackedCandidate, CandidateCommitments, CandidateHash, CandidateReceipt, CollatorId,
CommittedCandidateReceipt, CoreIndex, CoreState, Hash, Id as ParaId, SigningContext,
ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation,
CommittedCandidateReceipt, CoreIndex, CoreState, Hash, Id as ParaId, PvfExecTimeoutKind,
SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation,
};
use sp_keystore::SyncCryptoStorePtr;
use statement_table::{
Expand Down Expand Up @@ -650,7 +649,7 @@ async fn request_candidate_validation(
.send_message(CandidateValidationMessage::ValidateFromChainState(
candidate_receipt,
pov,
BACKING_EXECUTION_TIMEOUT,
PvfExecTimeoutKind::Backing,
tx,
))
.await;
Expand Down
20 changes: 10 additions & 10 deletions node/core/backing/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use polkadot_node_subsystem::{
use polkadot_node_subsystem_test_helpers as test_helpers;
use polkadot_primitives::{
CandidateDescriptor, CollatorId, GroupRotationInfo, HeadData, PersistedValidationData,
ScheduledCore,
PvfExecTimeoutKind, ScheduledCore,
};
use sp_application_crypto::AppKey;
use sp_keyring::Sr25519Keyring;
Expand Down Expand Up @@ -307,7 +307,7 @@ fn backing_second_works() {
timeout,
tx,
)
) if pov == pov && &candidate_receipt.descriptor == candidate.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && candidate.commitments.hash() == candidate_receipt.commitments_hash => {
) if pov == pov && &candidate_receipt.descriptor == candidate.descriptor() && timeout == PvfExecTimeoutKind::Backing && candidate.commitments.hash() == candidate_receipt.commitments_hash => {
tx.send(Ok(
ValidationResult::Valid(CandidateCommitments {
head_data: expected_head_data.clone(),
Expand Down Expand Up @@ -453,7 +453,7 @@ fn backing_works() {
timeout,
tx,
)
) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && c.commitments_hash == candidate_a_commitments_hash=> {
) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == PvfExecTimeoutKind::Backing && c.commitments_hash == candidate_a_commitments_hash=> {
tx.send(Ok(
ValidationResult::Valid(CandidateCommitments {
head_data: expected_head_data.clone(),
Expand Down Expand Up @@ -625,7 +625,7 @@ fn backing_works_while_validation_ongoing() {
timeout,
tx,
)
) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && candidate_a_commitments_hash == c.commitments_hash => {
) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == PvfExecTimeoutKind::Backing && candidate_a_commitments_hash == c.commitments_hash => {
// we never validate the candidate. our local node
// shouldn't issue any statements.
std::mem::forget(tx);
Expand Down Expand Up @@ -777,7 +777,7 @@ fn backing_misbehavior_works() {
timeout,
tx,
)
) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && candidate_a_commitments_hash == c.commitments_hash => {
) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == PvfExecTimeoutKind::Backing && candidate_a_commitments_hash == c.commitments_hash => {
tx.send(Ok(
ValidationResult::Valid(CandidateCommitments {
head_data: expected_head_data.clone(),
Expand Down Expand Up @@ -921,7 +921,7 @@ fn backing_dont_second_invalid() {
timeout,
tx,
)
) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT => {
) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == PvfExecTimeoutKind::Backing => {
tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::BadReturn))).unwrap();
}
);
Expand Down Expand Up @@ -950,7 +950,7 @@ fn backing_dont_second_invalid() {
timeout,
tx,
)
) if pov == pov && c.descriptor() == candidate_b.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT => {
) if pov == pov && c.descriptor() == candidate_b.descriptor() && timeout == PvfExecTimeoutKind::Backing => {
tx.send(Ok(
ValidationResult::Valid(CandidateCommitments {
head_data: expected_head_data.clone(),
Expand Down Expand Up @@ -1065,7 +1065,7 @@ fn backing_second_after_first_fails_works() {
timeout,
tx,
)
) if pov == pov && c.descriptor() == candidate.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && c.commitments_hash == candidate.commitments.hash() => {
) if pov == pov && c.descriptor() == candidate.descriptor() && timeout == PvfExecTimeoutKind::Backing && c.commitments_hash == candidate.commitments.hash() => {
tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::BadReturn))).unwrap();
}
);
Expand Down Expand Up @@ -1191,7 +1191,7 @@ fn backing_works_after_failed_validation() {
timeout,
tx,
)
) if pov == pov && c.descriptor() == candidate.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && c.commitments_hash == candidate.commitments.hash() => {
) if pov == pov && c.descriptor() == candidate.descriptor() && timeout == PvfExecTimeoutKind::Backing && c.commitments_hash == candidate.commitments.hash() => {
tx.send(Err(ValidationFailed("Internal test error".into()))).unwrap();
}
);
Expand Down Expand Up @@ -1544,7 +1544,7 @@ fn retry_works() {
timeout,
_tx,
)
) if pov == pov && c.descriptor() == candidate.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && c.commitments_hash == candidate.commitments.hash()
) if pov == pov && c.descriptor() == candidate.descriptor() && timeout == PvfExecTimeoutKind::Backing && c.commitments_hash == candidate.commitments.hash()
);
virtual_overseer
});
Expand Down
85 changes: 53 additions & 32 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#![warn(missing_docs)]

use polkadot_node_core_pvf::{
InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, PvfWithExecutorParams,
InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, PvfPrepData,
ValidationError, ValidationHost,
};
use polkadot_node_primitives::{
Expand All @@ -43,7 +43,8 @@ use polkadot_node_subsystem_util::executor_params_at_relay_parent;
use polkadot_parachain::primitives::{ValidationParams, ValidationResult as WasmValidationResult};
use polkadot_primitives::{
vstaging::ExecutorParams, CandidateCommitments, CandidateDescriptor, CandidateReceipt, Hash,
OccupiedCoreAssumption, PersistedValidationData, ValidationCode, ValidationCodeHash,
OccupiedCoreAssumption, PersistedValidationData, PvfExecTimeoutKind, PvfPrepTimeoutKind,
ValidationCode, ValidationCodeHash,
};

use parity_scale_codec::Encode;
Expand All @@ -68,6 +69,13 @@ const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3);
#[cfg(test)]
const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);

/// Default PVF timeouts. Must never be changed! Use executor environment parameters in
/// `session_info` pallet to adjust them. See also `PvfTimeoutKind` docs.
const DEFAULT_PRECHECK_PREPARATION_TIMEOUT: Duration = Duration::from_secs(60);
s0me0ne-unkn0wn marked this conversation as resolved.
Show resolved Hide resolved
const DEFAULT_LENIENT_PREPARATION_TIMEOUT: Duration = Duration::from_secs(360);
const DEFAULT_BACKING_EXECUTION_TIMEOUT: Duration = Duration::from_secs(2);
const DEFAULT_APPROVAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(12);

/// Configuration for the candidate validation subsystem
#[derive(Clone)]
pub struct Config {
Expand Down Expand Up @@ -330,18 +338,20 @@ where
return PreCheckOutcome::Invalid
};

let pvf_with_params = match sp_maybe_compressed_blob::decompress(
let timeout = pvf_prep_timeout(&executor_params, PvfPrepTimeoutKind::Precheck);

let pvf = match sp_maybe_compressed_blob::decompress(
&validation_code.0,
VALIDATION_CODE_BOMB_LIMIT,
) {
Ok(code) => PvfWithExecutorParams::from_code(code.into_owned(), executor_params),
Ok(code) => PvfPrepData::from_code(code.into_owned(), executor_params, timeout),
Err(e) => {
gum::debug!(target: LOG_TARGET, err=?e, "precheck: cannot decompress validation code");
return PreCheckOutcome::Invalid
},
};

match validation_backend.precheck_pvf(pvf_with_params).await {
match validation_backend.precheck_pvf(pvf).await {
Ok(_) => PreCheckOutcome::Valid,
Err(prepare_err) =>
if prepare_err.is_deterministic() {
Expand Down Expand Up @@ -465,7 +475,7 @@ async fn validate_from_chain_state<Sender>(
validation_host: ValidationHost,
candidate_receipt: CandidateReceipt,
pov: Arc<PoV>,
timeout: Duration,
exec_timeout_kind: PvfExecTimeoutKind,
metrics: &Metrics,
) -> Result<ValidationResult, ValidationFailed>
where
Expand All @@ -485,7 +495,7 @@ where
validation_code,
candidate_receipt.clone(),
pov,
timeout,
exec_timeout_kind,
metrics,
)
.await;
Expand Down Expand Up @@ -521,7 +531,7 @@ async fn validate_candidate_exhaustive<Sender>(
validation_code: ValidationCode,
candidate_receipt: CandidateReceipt,
pov: Arc<PoV>,
timeout: Duration,
exec_timeout_kind: PvfExecTimeoutKind,
metrics: &Metrics,
) -> Result<ValidationResult, ValidationFailed>
where
Expand Down Expand Up @@ -606,7 +616,7 @@ where
let result = validation_backend
.validate_candidate_with_retry(
raw_validation_code.to_vec(),
timeout,
pvf_exec_timeout(&executor_params, exec_timeout_kind),
params,
executor_params,
)
Expand Down Expand Up @@ -667,8 +677,8 @@ trait ValidationBackend {
/// Tries executing a PVF a single time (no retries).
async fn validate_candidate(
&mut self,
pvf_with_params: PvfWithExecutorParams,
timeout: Duration,
pvf: PvfPrepData,
exec_timeout: Duration,
encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError>;

Expand All @@ -677,16 +687,16 @@ trait ValidationBackend {
async fn validate_candidate_with_retry(
&mut self,
raw_validation_code: Vec<u8>,
timeout: Duration,
exec_timeout: Duration,
params: ValidationParams,
executor_params: ExecutorParams,
) -> Result<WasmValidationResult, ValidationError> {
// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
let pvf_with_params =
PvfWithExecutorParams::from_code(raw_validation_code, executor_params);
let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepTimeoutKind::Lenient);
let pvf = PvfPrepData::from_code(raw_validation_code, executor_params, prep_timeout);

let mut validation_result =
self.validate_candidate(pvf_with_params.clone(), timeout, params.encode()).await;
self.validate_candidate(pvf.clone(), exec_timeout, params.encode()).await;

// If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the
// assumption that the conditions that caused this error may have been transient. Note that
Expand All @@ -699,40 +709,34 @@ trait ValidationBackend {

gum::warn!(
target: LOG_TARGET,
?pvf_with_params,
?pvf,
"Re-trying failed candidate validation due to AmbiguousWorkerDeath."
);

// Encode the params again when re-trying. We expect the retry case to be relatively
// rare, and we want to avoid unconditionally cloning data.
validation_result =
self.validate_candidate(pvf_with_params, timeout, params.encode()).await;
validation_result = self.validate_candidate(pvf, exec_timeout, params.encode()).await;
}

validation_result
}

async fn precheck_pvf(
&mut self,
pvf_with_params: PvfWithExecutorParams,
) -> Result<PrepareStats, PrepareError>;
async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<PrepareStats, PrepareError>;
}

#[async_trait]
impl ValidationBackend for ValidationHost {
/// Tries executing a PVF a single time (no retries).
async fn validate_candidate(
&mut self,
pvf_with_params: PvfWithExecutorParams,
timeout: Duration,
pvf: PvfPrepData,
exec_timeout: Duration,
encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError> {
let priority = polkadot_node_core_pvf::Priority::Normal;

let (tx, rx) = oneshot::channel();
if let Err(err) =
self.execute_pvf(pvf_with_params, timeout, encoded_params, priority, tx).await
{
if let Err(err) = self.execute_pvf(pvf, exec_timeout, encoded_params, priority, tx).await {
return Err(ValidationError::InternalError(format!(
"cannot send pvf to the validation host: {:?}",
err
Expand All @@ -743,12 +747,9 @@ impl ValidationBackend for ValidationHost {
.map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?
}

async fn precheck_pvf(
&mut self,
pvf_with_params: PvfWithExecutorParams,
) -> Result<PrepareStats, PrepareError> {
async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<PrepareStats, PrepareError> {
let (tx, rx) = oneshot::channel();
if let Err(err) = self.precheck_pvf(pvf_with_params, tx).await {
if let Err(err) = self.precheck_pvf(pvf, tx).await {
// Return an IO error if there was an error communicating with the host.
return Err(PrepareError::IoErr(err))
}
Expand Down Expand Up @@ -788,3 +789,23 @@ fn perform_basic_checks(

Ok(())
}

fn pvf_prep_timeout(executor_params: &ExecutorParams, kind: PvfPrepTimeoutKind) -> Duration {
if let Some(timeout) = executor_params.pvf_prep_timeout(kind) {
return timeout
}
match kind {
PvfPrepTimeoutKind::Precheck => DEFAULT_PRECHECK_PREPARATION_TIMEOUT,
PvfPrepTimeoutKind::Lenient => DEFAULT_LENIENT_PREPARATION_TIMEOUT,
}
}

fn pvf_exec_timeout(executor_params: &ExecutorParams, kind: PvfExecTimeoutKind) -> Duration {
if let Some(timeout) = executor_params.pvf_exec_timeout(kind) {
return timeout
}
match kind {
PvfExecTimeoutKind::Backing => DEFAULT_BACKING_EXECUTION_TIMEOUT,
PvfExecTimeoutKind::Approval => DEFAULT_APPROVAL_EXECUTION_TIMEOUT,
}
}
Loading