Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Accept PVF code hashes in validation host #3655

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
2880898
Refactor runtime API requests
slumber Aug 17, 2021
2c6a031
Try to use cached compiled PVF
slumber Aug 17, 2021
da26bf0
Merge remote-tracking branch 'origin/master' into slumber-use-cached-pvf
slumber Aug 17, 2021
162be9b
fmt
slumber Aug 17, 2021
427bc1a
Refactor runtime API requests
slumber Aug 19, 2021
7311c27
Introduce PvfPreimage
slumber Aug 19, 2021
094d164
Reliable error handling
slumber Aug 19, 2021
50fcfca
Improve candidate validation readability
slumber Aug 22, 2021
2c39c37
Send correct hash to the PVF host
slumber Aug 22, 2021
8d40599
Merge remote-tracking branch 'origin/master' into slumber-use-cached-pvf
slumber Aug 22, 2021
c7cf9f5
Test validation by hash feature
slumber Aug 23, 2021
2d0d047
Remove extra comma
slumber Aug 30, 2021
828af04
Rename
slumber Oct 25, 2021
d2a2dbf
Merge remote-tracking branch 'origin/master' into slumber-use-cached-pvf
slumber Oct 25, 2021
8da537a
Rework candidate validation to use new runtime API endpoint
slumber Oct 25, 2021
bec11a3
fmt
slumber Oct 25, 2021
33560ca
Remove extra line
slumber Oct 26, 2021
0a93b6a
Get rid of unreachable
slumber Oct 27, 2021
e58b1ec
Remove mutable result
slumber Oct 27, 2021
0e9b0fe
Log the error
slumber Oct 27, 2021
a11d53a
Merge remote-tracking branch 'origin/master' into slumber-use-cached-pvf
slumber Dec 17, 2021
e4edd44
Resolve precheck todo
slumber Jan 18, 2022
9c91729
Host tests
slumber Jan 19, 2022
cb82bd4
Update implementers guide
slumber Jan 19, 2022
fb7e4e7
Merge remote-tracking branch 'origin/master' into slumber-use-cached-pvf
slumber Jan 19, 2022
c38eb81
Merge remote-tracking branch 'origin/master' into slumber-use-cached-pvf
slumber Jan 21, 2022
2a48e29
Remove unnecessary log message
slumber Feb 14, 2022
24c043c
Improve naming
slumber Feb 14, 2022
c3cf617
Merge remote-tracking branch 'origin/master' into slumber-use-cached-pvf
slumber Feb 14, 2022
249fc28
Revert error handling
slumber Feb 14, 2022
47e9e7e
Merge remote-tracking branch 'origin/master' into slumber-use-cached-pvf
slumber Mar 30, 2022
0a847ab
Replace tracing usage
slumber Mar 30, 2022
bccbbc3
Explain force enacting
slumber Apr 1, 2022
3668cfa
Merge branch 'master' into slumber-use-cached-pvf
mrcnski Oct 11, 2022
32680ca
Fix leftover errors from merge
mrcnski Oct 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
302 changes: 155 additions & 147 deletions node/core/candidate-validation/src/lib.rs

Large diffs are not rendered by default.

516 changes: 305 additions & 211 deletions node/core/candidate-validation/src/tests.rs

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions node/core/pvf/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub enum ValidationError {
InvalidCandidate(InvalidCandidate),
/// This error is raised due to inability to serve the request.
InternalError(String),
/// Provided validation code hash is not present in the artifacts cache.
ArtifactNotFound,
}

/// A description of an error raised during executing a PVF and can be attributed to the combination
Expand Down
133 changes: 102 additions & 31 deletions node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use crate::{
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
execute,
metrics::Metrics,
prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET,
prepare,
pvf::{PvfDescriptor, PvfPreimage},
PrepareResult, Priority, ValidationError, LOG_TARGET,
};
use always_assert::never;
use async_std::path::{Path, PathBuf};
Expand Down Expand Up @@ -60,7 +62,7 @@ impl ValidationHost {
/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
pub async fn precheck_pvf(
&mut self,
pvf: Pvf,
pvf: PvfPreimage,
result_tx: PrepareResultSender,
) -> Result<(), String> {
self.to_host_tx
Expand All @@ -78,7 +80,7 @@ impl ValidationHost {
/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
pub async fn execute_pvf(
&mut self,
pvf: Pvf,
pvf: PvfDescriptor,
execution_timeout: Duration,
params: Vec<u8>,
priority: Priority,
Expand All @@ -96,7 +98,7 @@ impl ValidationHost {
/// situations this function should return immediately.
///
/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
pub async fn heads_up(&mut self, active_pvfs: Vec<Pvf>) -> Result<(), String> {
pub async fn heads_up(&mut self, active_pvfs: Vec<PvfPreimage>) -> Result<(), String> {
slumber marked this conversation as resolved.
Show resolved Hide resolved
self.to_host_tx
.send(ToHost::HeadsUp { active_pvfs })
.await
Expand All @@ -106,18 +108,18 @@ impl ValidationHost {

enum ToHost {
PrecheckPvf {
pvf: Pvf,
pvf: PvfPreimage,
result_tx: PrepareResultSender,
},
ExecutePvf {
pvf: Pvf,
pvf: PvfDescriptor,
execution_timeout: Duration,
params: Vec<u8>,
priority: Priority,
result_tx: ResultSender,
},
HeadsUp {
active_pvfs: Vec<Pvf>,
active_pvfs: Vec<PvfPreimage>,
},
}

Expand Down Expand Up @@ -421,7 +423,7 @@ async fn handle_to_host(
async fn handle_precheck_pvf(
artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
pvf: Pvf,
pvf: PvfPreimage,
result_sender: PrepareResultSender,
) -> Result<(), Fatal> {
let artifact_id = pvf.as_artifact_id();
Expand Down Expand Up @@ -452,7 +454,7 @@ async fn handle_execute_pvf(
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
execute_queue: &mut mpsc::Sender<execute::ToQueue>,
awaiting_prepare: &mut AwaitingPrepare,
pvf: Pvf,
pvf: PvfDescriptor,
execution_timeout: Duration,
params: Vec<u8>,
priority: Priority,
Expand Down Expand Up @@ -484,12 +486,16 @@ async fn handle_execute_pvf(
},
}
} else {
// Artifact is unknown: register it and enqueue a job with the corresponding priority and
//
artifacts.insert_preparing(artifact_id.clone(), Vec::new());
send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?;
if let PvfDescriptor::Preimage(code) = pvf {
// Artifact is unknown: register it and enqueue a job with the corresponding priority and
artifacts.insert_preparing(artifact_id.clone(), Vec::new());
send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf: code }).await?;

awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
} else {
// Expect another request with PVF code provided.
let _ = result_tx.send(Err(ValidationError::ArtifactNotFound));
}
}

return Ok(())
Expand All @@ -498,7 +504,7 @@ async fn handle_execute_pvf(
async fn handle_heads_up(
artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
active_pvfs: Vec<Pvf>,
active_pvfs: Vec<PvfPreimage>,
) -> Result<(), Fatal> {
let now = SystemTime::now();

Expand Down Expand Up @@ -701,12 +707,12 @@ mod tests {
}

/// Creates a new PVF which artifact id can be uniquely identified by the given number.
fn artifact_id(descriminator: u32) -> ArtifactId {
Pvf::from_discriminator(descriminator).as_artifact_id()
fn artifact_id(discriminator: u32) -> ArtifactId {
PvfPreimage::from_discriminator(discriminator).as_artifact_id()
}

fn artifact_path(descriminator: u32) -> PathBuf {
artifact_id(descriminator).path(&PathBuf::from(std::env::temp_dir())).to_owned()
fn artifact_path(discriminator: u32) -> PathBuf {
artifact_id(discriminator).path(&PathBuf::from(std::env::temp_dir())).to_owned()
}

struct Builder {
Expand Down Expand Up @@ -857,6 +863,11 @@ mod tests {
}
}

/// Creates a new PVF which artifact id can be uniquely identified by the given number.
fn pvf(discriminator: u32) -> PvfPreimage {
PvfPreimage::from_discriminator(discriminator)
}

#[async_std::test]
async fn shutdown_on_handle_drop() {
let test = Builder::default().build();
Expand All @@ -881,7 +892,7 @@ mod tests {
let mut test = builder.build();
let mut host = test.host_handle();

host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();
host.heads_up(vec![pvf(1)]).await.unwrap();

let to_sweeper_rx = &mut test.to_sweeper_rx;
run_until(
Expand All @@ -895,7 +906,7 @@ mod tests {

// Extend TTL for the first artifact and make sure we don't receive another file removal
// request.
host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();
host.heads_up(vec![pvf(1)]).await.unwrap();
test.poll_ensure_to_sweeper_is_empty().await;
}

Expand All @@ -906,7 +917,7 @@ mod tests {

let (result_tx, result_rx_pvf_1_1) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
PvfDescriptor::Preimage(PvfPreimage::from_discriminator(1)),
TEST_EXECUTION_TIMEOUT,
b"pvf1".to_vec(),
Priority::Normal,
Expand All @@ -917,7 +928,7 @@ mod tests {

let (result_tx, result_rx_pvf_1_2) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
PvfDescriptor::Preimage(PvfPreimage::from_discriminator(1)),
TEST_EXECUTION_TIMEOUT,
b"pvf1".to_vec(),
Priority::Critical,
Expand All @@ -928,7 +939,7 @@ mod tests {

let (result_tx, result_rx_pvf_2) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(2),
PvfDescriptor::Preimage(PvfPreimage::from_discriminator(2)),
TEST_EXECUTION_TIMEOUT,
b"pvf2".to_vec(),
Priority::Normal,
Expand Down Expand Up @@ -1000,7 +1011,7 @@ mod tests {

// First, test a simple precheck request.
let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap();
host.precheck_pvf(PvfPreimage::from_discriminator(1), result_tx).await.unwrap();

// The queue received the prepare request.
assert_matches!(
Expand All @@ -1021,7 +1032,7 @@ mod tests {
let mut precheck_receivers = Vec::new();
for _ in 0..3 {
let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(2), result_tx).await.unwrap();
host.precheck_pvf(PvfPreimage::from_discriminator(2), result_tx).await.unwrap();
precheck_receivers.push(result_rx);
}
// Received prepare request.
Expand Down Expand Up @@ -1056,7 +1067,7 @@ mod tests {
// Send PVF for the execution and request the prechecking for it.
let (result_tx, result_rx_execute) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
PvfDescriptor::Preimage(PvfPreimage::from_discriminator(1)),
TEST_EXECUTION_TIMEOUT,
b"pvf2".to_vec(),
Priority::Critical,
Expand All @@ -1071,7 +1082,7 @@ mod tests {
);

let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap();
host.precheck_pvf(PvfPreimage::from_discriminator(1), result_tx).await.unwrap();

// Suppose the preparation failed, the execution queue is empty and both
// "clients" receive their results.
Expand All @@ -1093,13 +1104,13 @@ mod tests {
let mut precheck_receivers = Vec::new();
for _ in 0..3 {
let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(Pvf::from_discriminator(2), result_tx).await.unwrap();
host.precheck_pvf(PvfPreimage::from_discriminator(2), result_tx).await.unwrap();
precheck_receivers.push(result_rx);
}

let (result_tx, _result_rx_execute) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(2),
PvfDescriptor::Preimage(PvfPreimage::from_discriminator(2)),
TEST_EXECUTION_TIMEOUT,
b"pvf2".to_vec(),
Priority::Critical,
Expand Down Expand Up @@ -1134,7 +1145,7 @@ mod tests {

let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
PvfDescriptor::Preimage(PvfPreimage::from_discriminator(1)),
TEST_EXECUTION_TIMEOUT,
b"pvf1".to_vec(),
Priority::Normal,
Expand All @@ -1157,4 +1168,64 @@ mod tests {

test.poll_ensure_to_execute_queue_is_empty().await;
}

#[async_std::test]
async fn artifact_cache() {
let mut test = Builder::default().build();
let mut host = test.host_handle();

let pvf_code = PvfDescriptor::Preimage(PvfPreimage::from_discriminator(1));
let pvf_hash = PvfDescriptor::Hash(pvf_code.hash());

// First, ensure that we receive an `ArtifactNotFound` error when sending
// an unknown code hash.
let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf(
pvf_hash.clone(),
TEST_EXECUTION_TIMEOUT,
Vec::new(),
Priority::Normal,
result_tx,
)
.await
.unwrap();

test.poll_ensure_to_execute_queue_is_empty().await;
assert_matches!(
result_rx.now_or_never().unwrap().unwrap(),
Err(ValidationError::ArtifactNotFound)
);

// Supply the code and retry the request.
let (result_tx, _result_rx) = oneshot::channel();
host.execute_pvf(
pvf_code, // Code.
TEST_EXECUTION_TIMEOUT,
Vec::new(),
Priority::Normal,
result_tx,
)
.await
.unwrap();

assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
prepare::ToQueue::Enqueue { .. }
);

let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf(
pvf_hash, // Hash.
TEST_EXECUTION_TIMEOUT,
Vec::new(),
Priority::Normal,
result_tx,
)
.await
.unwrap();

test.poll_ensure_to_execute_queue_is_empty().await;
// The execution is queued, no error expected.
assert_matches!(result_rx.now_or_never(), None);
}
}
2 changes: 1 addition & 1 deletion node/core/pvf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub use sp_tracing;

pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError};
pub use priority::Priority;
pub use pvf::Pvf;
pub use pvf::{PvfDescriptor, PvfPreimage};

pub use host::{start, Config, ValidationHost};
pub use metrics::Metrics;
Expand Down
18 changes: 12 additions & 6 deletions node/core/pvf/src/prepare/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
//! A queue that handles requests for PVF preparation.

use super::pool::{self, Worker};
use crate::{artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, Pvf, LOG_TARGET};
use crate::{
artifacts::ArtifactId, metrics::Metrics, pvf::PvfPreimage, PrepareResult, Priority, LOG_TARGET,
};
use always_assert::{always, never};
use async_std::path::PathBuf;
use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt};
Expand All @@ -30,7 +32,7 @@ pub enum ToQueue {
///
/// Note that it is incorrect to enqueue the same PVF again without first receiving the
/// [`FromQueue`] response.
Enqueue { priority: Priority, pvf: Pvf },
Enqueue { priority: Priority, pvf: PvfPreimage },
}

/// A response from queue.
Expand Down Expand Up @@ -75,7 +77,7 @@ slotmap::new_key_type! { pub struct Job; }
struct JobData {
/// The priority of this job. Can be bumped.
priority: Priority,
pvf: Pvf,
pvf: PvfPreimage,
worker: Option<Worker>,
}

Expand Down Expand Up @@ -210,7 +212,11 @@ async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fat
Ok(())
}

async fn handle_enqueue(queue: &mut Queue, priority: Priority, pvf: Pvf) -> Result<(), Fatal> {
async fn handle_enqueue(
queue: &mut Queue,
priority: Priority,
pvf: PvfPreimage,
) -> Result<(), Fatal> {
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?pvf.code_hash,
Expand Down Expand Up @@ -485,8 +491,8 @@ mod tests {
use std::task::Poll;

/// Creates a new PVF which artifact id can be uniquely identified by the given number.
fn pvf(descriminator: u32) -> Pvf {
Pvf::from_discriminator(descriminator)
fn pvf(discriminator: u32) -> PvfPreimage {
PvfPreimage::from_discriminator(discriminator)
}

async fn run_until<R>(
Expand Down
Loading