Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Use PVF code paired with executor params wherever possible #6742

Merged
merged 2 commits into from
Feb 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
#![warn(missing_docs)]

use polkadot_node_core_pvf::{
InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, Pvf,
PvfWithExecutorParams, ValidationError, ValidationHost,
InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, PvfWithExecutorParams,
ValidationError, ValidationHost,
};
use polkadot_node_primitives::{
BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT,
Expand Down Expand Up @@ -334,7 +334,7 @@ where
&validation_code.0,
VALIDATION_CODE_BOMB_LIMIT,
) {
Ok(code) => PvfWithExecutorParams::new(Pvf::from_code(code.into_owned()), executor_params),
Ok(code) => PvfWithExecutorParams::from_code(code.into_owned(), executor_params),
Err(e) => {
gum::debug!(target: LOG_TARGET, err=?e, "precheck: cannot decompress validation code");
return PreCheckOutcome::Invalid
Expand Down Expand Up @@ -683,7 +683,7 @@ trait ValidationBackend {
) -> Result<WasmValidationResult, ValidationError> {
// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
let pvf_with_params =
PvfWithExecutorParams::new(Pvf::from_code(raw_validation_code), executor_params);
PvfWithExecutorParams::from_code(raw_validation_code, executor_params);

let mut validation_result =
self.validate_candidate(pvf_with_params.clone(), timeout, params.encode()).await;
Expand Down
6 changes: 3 additions & 3 deletions node/core/pvf/src/executor_intf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,14 @@ pub fn prevalidate(code: &[u8]) -> Result<RuntimeBlob, sc_executor_common::error
/// artifact which can then be used to pass into `Executor::execute` after writing it to the disk.
pub fn prepare(
blob: RuntimeBlob,
executor_params: ExecutorParams,
executor_params: &ExecutorParams,
) -> Result<Vec<u8>, sc_executor_common::error::WasmError> {
let semantics = params_to_wasmtime_semantics(executor_params)
.map_err(|e| sc_executor_common::error::WasmError::Other(e))?;
sc_executor_wasmtime::prepare_runtime_artifact(blob, &semantics)
}

fn params_to_wasmtime_semantics(par: ExecutorParams) -> Result<Semantics, String> {
fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Result<Semantics, String> {
let mut sem = DEFAULT_CONFIG.semantics.clone();
let mut stack_limit = if let Some(stack_limit) = sem.deterministic_stack_limit.clone() {
stack_limit
Expand Down Expand Up @@ -186,7 +186,7 @@ impl Executor {
TaskSpawner::new().map_err(|e| format!("cannot create task spawner: {}", e))?;

let mut config = DEFAULT_CONFIG.clone();
config.semantics = params_to_wasmtime_semantics(params)?;
config.semantics = params_to_wasmtime_semantics(&params)?;

Ok(Self { thread_pool, spawner, config })
}
Expand Down
8 changes: 4 additions & 4 deletions node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ async fn handle_execute_pvf(
artifact: ArtifactPathId::new(artifact_id, cache_path),
execution_timeout,
params,
executor_params: pvf_with_params.executor_params(),
executor_params: (*pvf_with_params.executor_params()).clone(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is executor_params an Arc? I played around with removing it and it seems to work fine, and not having an Arc simplified a lot of these clones etc. Or am I missing something? 🙂

https://github.com/paritytech/polkadot/compare/s0me0ne/pvf-with-executor-params...mrcnski/no-arc?expand=1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two considerations:

  1. I think you remember why that Arc appeared in the first place: because we want the whole structure to be easily clonable;
  2. There is already a method called code() (inherited from older Pvf struct) which returns Arc, and having two getters returning different types of references seems like an inconsistent design to me.

Probably you're right, and things shouldn't be overcomplicated here. I just need to either persuade myself it's okay or use some name like executor_params_as_ref().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you remember why that Arc appeared in the first place: because we want the whole structure to be easily clonable

Oh right, I remember that now. It's even there in the docstring.

result_tx,
},
)
Expand All @@ -534,7 +534,7 @@ async fn handle_execute_pvf(
artifact_id,
execution_timeout,
params,
pvf_with_params.executor_params(),
(*pvf_with_params.executor_params()).clone(),
result_tx,
);
},
Expand All @@ -556,7 +556,7 @@ async fn handle_execute_pvf(
waiting_for_response: Vec::new(),
num_failures: *num_failures,
};
let executor_params = pvf_with_params.executor_params().clone();
let executor_params = (*pvf_with_params.executor_params()).clone();
send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue {
Expand Down Expand Up @@ -584,7 +584,7 @@ async fn handle_execute_pvf(
} else {
// Artifact is unknown: register it and enqueue a job with the corresponding priority and
// PVF.
let executor_params = pvf_with_params.executor_params();
let executor_params = (*pvf_with_params.executor_params()).clone();
artifacts.insert_preparing(artifact_id.clone(), Vec::new());
send_prepare(
prepare_queue,
Expand Down
2 changes: 1 addition & 1 deletion node/core/pvf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub use sp_tracing;
pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError};
pub use prepare::PrepareStats;
pub use priority::Priority;
pub use pvf::{Pvf, PvfWithExecutorParams};
pub use pvf::PvfWithExecutorParams;

pub use host::{start, Config, ValidationHost};
pub use metrics::Metrics;
Expand Down
17 changes: 6 additions & 11 deletions node/core/pvf/src/prepare/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@ use super::worker::{self, Outcome};
use crate::{
error::{PrepareError, PrepareResult},
metrics::Metrics,
pvf::PvfWithExecutorParams,
worker_common::{IdleWorker, WorkerHandle},
LOG_TARGET,
};
use always_assert::never;
use futures::{
channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt,
};
use polkadot_primitives::vstaging::ExecutorParams;
use slotmap::HopSlotMap;
use std::{
fmt,
path::{Path, PathBuf},
sync::Arc,
task::Poll,
time::Duration,
};
Expand Down Expand Up @@ -68,9 +67,8 @@ pub enum ToPool {
/// sent until either `Concluded` or `Rip` message is received.
StartWork {
worker: Worker,
code: Arc<Vec<u8>>,
pvf_with_params: PvfWithExecutorParams,
artifact_path: PathBuf,
executor_params: ExecutorParams,
preparation_timeout: Duration,
},
}
Expand Down Expand Up @@ -216,7 +214,7 @@ fn handle_to_pool(
metrics.prepare_worker().on_begin_spawn();
mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
},
ToPool::StartWork { worker, code, artifact_path, executor_params, preparation_timeout } => {
ToPool::StartWork { worker, pvf_with_params, artifact_path, preparation_timeout } => {
if let Some(data) = spawned.get_mut(worker) {
if let Some(idle) = data.idle.take() {
let preparation_timer = metrics.time_preparation();
Expand All @@ -225,10 +223,9 @@ fn handle_to_pool(
metrics.clone(),
worker,
idle,
code,
pvf_with_params,
cache_path.to_owned(),
artifact_path,
executor_params,
preparation_timeout,
preparation_timer,
)
Expand Down Expand Up @@ -275,20 +272,18 @@ async fn start_work_task<Timer>(
metrics: Metrics,
worker: Worker,
idle: IdleWorker,
code: Arc<Vec<u8>>,
pvf_with_params: PvfWithExecutorParams,
cache_path: PathBuf,
artifact_path: PathBuf,
executor_params: ExecutorParams,
preparation_timeout: Duration,
_preparation_timer: Option<Timer>,
) -> PoolEvent {
let outcome = worker::start_work(
&metrics,
idle,
code,
pvf_with_params,
&cache_path,
artifact_path,
executor_params,
preparation_timeout,
)
.await;
Expand Down
3 changes: 1 addition & 2 deletions node/core/pvf/src/prepare/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,8 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal
&mut queue.to_pool_tx,
pool::ToPool::StartWork {
worker,
code: job_data.pvf_with_params.code(),
pvf_with_params: job_data.pvf_with_params.clone(),
artifact_path,
executor_params: job_data.pvf_with_params.executor_params(),
preparation_timeout: job_data.preparation_timeout,
},
)
Expand Down
48 changes: 22 additions & 26 deletions node/core/pvf/src/prepare/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
error::{PrepareError, PrepareResult},
metrics::Metrics,
prepare::PrepareStats,
pvf::PvfWithExecutorParams,
worker_common::{
bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
Expand All @@ -34,12 +35,12 @@ use crate::{
use cpu_time::ProcessTime;
use futures::{pin_mut, select_biased, FutureExt};
use parity_scale_codec::{Decode, Encode};
use polkadot_primitives::vstaging::ExecutorParams;

use sp_core::hexdisplay::HexDisplay;
use std::{
panic,
path::{Path, PathBuf},
sync::{mpsc::channel, Arc},
sync::mpsc::channel,
time::Duration,
};
use tokio::{io, net::UnixStream};
Expand Down Expand Up @@ -83,10 +84,9 @@ pub enum Outcome {
pub async fn start_work(
metrics: &Metrics,
worker: IdleWorker,
code: Arc<Vec<u8>>,
pvf_with_params: PvfWithExecutorParams,
cache_path: &Path,
artifact_path: PathBuf,
executor_params: ExecutorParams,
preparation_timeout: Duration,
) -> Outcome {
let IdleWorker { stream, pid } = worker;
Expand All @@ -100,7 +100,7 @@ pub async fn start_work(

with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move {
if let Err(err) =
send_request(&mut stream, code, &tmp_file, &executor_params, preparation_timeout).await
send_request(&mut stream, pvf_with_params, &tmp_file, preparation_timeout).await
{
gum::warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -273,44 +273,42 @@ where

async fn send_request(
stream: &mut UnixStream,
code: Arc<Vec<u8>>,
pvf_with_params: PvfWithExecutorParams,
tmp_file: &Path,
executor_params: &ExecutorParams,
preparation_timeout: Duration,
) -> io::Result<()> {
framed_send(stream, &code).await?;
framed_send(stream, &pvf_with_params.encode()).await?;
framed_send(stream, path_to_bytes(tmp_file)).await?;
framed_send(stream, &executor_params.encode()).await?;
framed_send(stream, &preparation_timeout.encode()).await?;
Ok(())
}

async fn recv_request(
stream: &mut UnixStream,
) -> io::Result<(Vec<u8>, PathBuf, ExecutorParams, Duration)> {
let code = framed_recv(stream).await?;
) -> io::Result<(PvfWithExecutorParams, PathBuf, Duration)> {
let pvf_with_params = framed_recv(stream).await?;
let pvf_with_params =
PvfWithExecutorParams::decode(&mut &pvf_with_params[..]).map_err(|e| {
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
io::Error::new(
io::ErrorKind::Other,
format!("prepare pvf recv_request: failed to decode PvfWithExecutorParams: {}", e),
)
})?;
let tmp_file = framed_recv(stream).await?;
let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"prepare pvf recv_request: non utf-8 artifact path".to_string(),
)
})?;
let executor_params_enc = framed_recv(stream).await?;
let executor_params = ExecutorParams::decode(&mut &executor_params_enc[..]).map_err(|_| {
io::Error::new(
io::ErrorKind::Other,
"prepare pvf recv_request: failed to decode ExecutorParams".to_string(),
)
})?;
let preparation_timeout = framed_recv(stream).await?;
let preparation_timeout = Duration::decode(&mut &preparation_timeout[..]).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("prepare pvf recv_request: failed to decode duration: {:?}", e),
)
})?;
Ok((code, tmp_file, executor_params, preparation_timeout))
Ok((pvf_with_params, tmp_file, preparation_timeout))
}

async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> {
Expand Down Expand Up @@ -362,8 +360,7 @@ pub fn worker_entrypoint(socket_path: &str) {
worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move {
loop {
let worker_pid = std::process::id();
let (code, dest, executor_params, preparation_timeout) =
recv_request(&mut stream).await?;
let (pvf_with_params, dest, preparation_timeout) = recv_request(&mut stream).await?;
gum::debug!(
target: LOG_TARGET,
%worker_pid,
Expand All @@ -388,7 +385,7 @@ pub fn worker_entrypoint(socket_path: &str) {
// Spawn another thread for preparation.
let prepare_fut = rt_handle
.spawn_blocking(move || {
let result = prepare_artifact(&code, executor_params);
let result = prepare_artifact(pvf_with_params);

// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -471,16 +468,15 @@ pub fn worker_entrypoint(socket_path: &str) {
}

fn prepare_artifact(
code: &[u8],
executor_params: ExecutorParams,
pvf_with_params: PvfWithExecutorParams,
) -> Result<CompiledArtifact, PrepareError> {
panic::catch_unwind(|| {
let blob = match crate::executor_intf::prevalidate(code) {
let blob = match crate::executor_intf::prevalidate(&pvf_with_params.code()) {
Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
Ok(b) => b,
};

match crate::executor_intf::prepare(blob, executor_params) {
match crate::executor_intf::prepare(blob, &pvf_with_params.executor_params()) {
Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)),
Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
}
Expand Down
Loading