diff --git a/Cargo.lock b/Cargo.lock
index 4e37115acc04..54f27915d8a7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6862,7 +6862,7 @@ dependencies = [
"nix 0.26.2",
"polkadot-cli",
"polkadot-core-primitives",
- "polkadot-node-core-pvf-worker",
+ "polkadot-node-core-pvf-prepare-worker",
"polkadot-overseer",
"substrate-rpc-client",
"tempfile",
@@ -6988,7 +6988,8 @@ dependencies = [
"futures",
"log",
"polkadot-client",
- "polkadot-node-core-pvf-worker",
+ "polkadot-node-core-pvf-execute-worker",
+ "polkadot-node-core-pvf-prepare-worker",
"polkadot-node-metrics",
"polkadot-performance-test",
"polkadot-service",
@@ -7467,6 +7468,9 @@ dependencies = [
"parity-scale-codec",
"pin-project",
"polkadot-core-primitives",
+ "polkadot-node-core-pvf-common",
+ "polkadot-node-core-pvf-execute-worker",
+ "polkadot-node-core-pvf-prepare-worker",
"polkadot-node-metrics",
"polkadot-node-primitives",
"polkadot-parachain",
@@ -7479,6 +7483,8 @@ dependencies = [
"sp-wasm-interface",
"substrate-build-script-utils",
"tempfile",
+ "test-parachain-adder",
+ "test-parachain-halt",
"tokio",
"tracing-gum",
]
@@ -7507,15 +7513,32 @@ dependencies = [
]
[[package]]
-name = "polkadot-node-core-pvf-worker"
+name = "polkadot-node-core-pvf-common"
version = "0.9.41"
dependencies = [
- "assert_matches",
"cpu-time",
"futures",
"libc",
"parity-scale-codec",
- "polkadot-node-core-pvf",
+ "polkadot-parachain",
+ "polkadot-primitives",
+ "sc-executor-common",
+ "sc-executor-wasmtime",
+ "sp-core",
+ "sp-tracing",
+ "substrate-build-script-utils",
+ "tokio",
+ "tracing-gum",
+]
+
+[[package]]
+name = "polkadot-node-core-pvf-execute-worker"
+version = "0.9.41"
+dependencies = [
+ "cpu-time",
+ "futures",
+ "parity-scale-codec",
+ "polkadot-node-core-pvf-common",
"polkadot-parachain",
"polkadot-primitives",
"rayon",
@@ -7527,10 +7550,28 @@ dependencies = [
"sp-io",
"sp-maybe-compressed-blob",
"sp-tracing",
- "substrate-build-script-utils",
- "tempfile",
- "test-parachain-adder",
- "test-parachain-halt",
+ "tikv-jemalloc-ctl",
+ "tokio",
+ "tracing-gum",
+]
+
+[[package]]
+name = "polkadot-node-core-pvf-prepare-worker"
+version = "0.9.41"
+dependencies = [
+ "futures",
+ "libc",
+ "parity-scale-codec",
+ "polkadot-node-core-pvf-common",
+ "polkadot-parachain",
+ "polkadot-primitives",
+ "rayon",
+ "sc-executor",
+ "sc-executor-common",
+ "sc-executor-wasmtime",
+ "sp-io",
+ "sp-maybe-compressed-blob",
+ "sp-tracing",
"tikv-jemalloc-ctl",
"tokio",
"tracing-gum",
@@ -7786,7 +7827,7 @@ dependencies = [
"kusama-runtime",
"log",
"polkadot-erasure-coding",
- "polkadot-node-core-pvf-worker",
+ "polkadot-node-core-pvf-prepare-worker",
"polkadot-node-primitives",
"polkadot-primitives",
"quote",
@@ -8292,7 +8333,8 @@ dependencies = [
"polkadot-node-core-backing",
"polkadot-node-core-candidate-validation",
"polkadot-node-core-dispute-coordinator",
- "polkadot-node-core-pvf-worker",
+ "polkadot-node-core-pvf-execute-worker",
+ "polkadot-node-core-pvf-prepare-worker",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
@@ -12464,7 +12506,7 @@ dependencies = [
"log",
"parity-scale-codec",
"polkadot-cli",
- "polkadot-node-core-pvf-worker",
+ "polkadot-node-core-pvf",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-parachain",
@@ -12512,7 +12554,7 @@ dependencies = [
"log",
"parity-scale-codec",
"polkadot-cli",
- "polkadot-node-core-pvf-worker",
+ "polkadot-node-core-pvf",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-parachain",
diff --git a/Cargo.toml b/Cargo.toml
index 02722fd28653..cc3b4e4c1d35 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,7 +24,7 @@ tikv-jemallocator = "0.5.0"
# Crates in our workspace, defined as dependencies so we can pass them feature flags.
polkadot-cli = { path = "cli", features = [ "kusama-native", "westend-native", "rococo-native" ] }
-polkadot-node-core-pvf-worker = { path = "node/core/pvf/worker" }
+polkadot-node-core-pvf-prepare-worker = { path = "node/core/pvf/prepare-worker" }
polkadot-overseer = { path = "node/overseer" }
[dev-dependencies]
@@ -81,7 +81,9 @@ members = [
"node/core/parachains-inherent",
"node/core/provisioner",
"node/core/pvf",
- "node/core/pvf/worker",
+ "node/core/pvf/common",
+ "node/core/pvf/execute-worker",
+ "node/core/pvf/prepare-worker",
"node/core/pvf-checker",
"node/core/runtime-api",
"node/network/approval-distribution",
@@ -208,7 +210,7 @@ try-runtime = [ "polkadot-cli/try-runtime" ]
fast-runtime = [ "polkadot-cli/fast-runtime" ]
runtime-metrics = [ "polkadot-cli/runtime-metrics" ]
pyroscope = ["polkadot-cli/pyroscope"]
-jemalloc-allocator = ["polkadot-node-core-pvf-worker/jemalloc-allocator", "polkadot-overseer/jemalloc-allocator"]
+jemalloc-allocator = ["polkadot-node-core-pvf-prepare-worker/jemalloc-allocator", "polkadot-overseer/jemalloc-allocator"]
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index 04596d5a6d0b..1fe9fa696cfd 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -23,7 +23,8 @@ pyroscope_pprofrs = { version = "0.2", optional = true }
service = { package = "polkadot-service", path = "../node/service", default-features = false, optional = true }
polkadot-client = { path = "../node/client", optional = true }
-polkadot-node-core-pvf-worker = { path = "../node/core/pvf/worker", optional = true }
+polkadot-node-core-pvf-execute-worker = { path = "../node/core/pvf/execute-worker", optional = true }
+polkadot-node-core-pvf-prepare-worker = { path = "../node/core/pvf/prepare-worker", optional = true }
polkadot-performance-test = { path = "../node/test/performance-test", optional = true }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
@@ -54,7 +55,8 @@ cli = [
"frame-benchmarking-cli",
"try-runtime-cli",
"polkadot-client",
- "polkadot-node-core-pvf-worker",
+ "polkadot-node-core-pvf-execute-worker",
+ "polkadot-node-core-pvf-prepare-worker",
]
runtime-benchmarks = [
"service/runtime-benchmarks",
diff --git a/cli/src/command.rs b/cli/src/command.rs
index 802ba93941c3..132d8279eb13 100644
--- a/cli/src/command.rs
+++ b/cli/src/command.rs
@@ -495,7 +495,7 @@ pub fn run() -> Result<()> {
#[cfg(not(target_os = "android"))]
{
- polkadot_node_core_pvf_worker::prepare_worker_entrypoint(
+ polkadot_node_core_pvf_prepare_worker::worker_entrypoint(
&cmd.socket_path,
Some(&cmd.node_impl_version),
);
@@ -517,7 +517,7 @@ pub fn run() -> Result<()> {
#[cfg(not(target_os = "android"))]
{
- polkadot_node_core_pvf_worker::execute_worker_entrypoint(
+ polkadot_node_core_pvf_execute_worker::worker_entrypoint(
&cmd.socket_path,
Some(&cmd.node_impl_version),
);
diff --git a/node/core/pvf/Cargo.toml b/node/core/pvf/Cargo.toml
index 026930758b86..d00c13fda2b0 100644
--- a/node/core/pvf/Cargo.toml
+++ b/node/core/pvf/Cargo.toml
@@ -4,6 +4,10 @@ version.workspace = true
authors.workspace = true
edition.workspace = true
+[[bin]]
+name = "puppet_worker"
+path = "bin/puppet_worker.rs"
+
[dependencies]
always-assert = "0.1"
futures = "0.3.21"
@@ -13,12 +17,16 @@ libc = "0.2.139"
pin-project = "1.0.9"
rand = "0.8.5"
slotmap = "1.0"
+tempfile = "3.3.0"
tokio = { version = "1.24.2", features = ["fs", "process"] }
parity-scale-codec = { version = "3.4.0", default-features = false, features = ["derive"] }
polkadot-parachain = { path = "../../../parachain" }
polkadot-core-primitives = { path = "../../../core-primitives" }
+polkadot-node-core-pvf-common = { path = "common" }
+polkadot-node-core-pvf-execute-worker = { path = "execute-worker" }
+polkadot-node-core-pvf-prepare-worker = { path = "prepare-worker" }
polkadot-node-metrics = { path = "../../metrics" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-primitives = { path = "../../../primitives" }
@@ -34,4 +42,6 @@ substrate-build-script-utils = { git = "https://github.com/paritytech/substrate"
[dev-dependencies]
assert_matches = "1.4.0"
hex-literal = "0.3.4"
-tempfile = "3.3.0"
+
+adder = { package = "test-parachain-adder", path = "../../../parachain/test-parachains/adder" }
+halt = { package = "test-parachain-halt", path = "../../../parachain/test-parachains/halt" }
diff --git a/node/core/pvf/worker/bin/puppet_worker.rs b/node/core/pvf/bin/puppet_worker.rs
similarity index 92%
rename from node/core/pvf/worker/bin/puppet_worker.rs
rename to node/core/pvf/bin/puppet_worker.rs
index ddd81971292b..7f93519d8454 100644
--- a/node/core/pvf/worker/bin/puppet_worker.rs
+++ b/node/core/pvf/bin/puppet_worker.rs
@@ -14,4 +14,4 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
-polkadot_node_core_pvf_worker::decl_puppet_worker_main!();
+polkadot_node_core_pvf::decl_puppet_worker_main!();
diff --git a/node/core/pvf/common/Cargo.toml b/node/core/pvf/common/Cargo.toml
new file mode 100644
index 000000000000..de9fa10804c7
--- /dev/null
+++ b/node/core/pvf/common/Cargo.toml
@@ -0,0 +1,26 @@
+[package]
+name = "polkadot-node-core-pvf-common"
+version.workspace = true
+authors.workspace = true
+edition.workspace = true
+
+[dependencies]
+cpu-time = "1.0.0"
+futures = "0.3.21"
+gum = { package = "tracing-gum", path = "../../../gum" }
+libc = "0.2.139"
+tokio = { version = "1.24.2", features = ["fs", "process", "io-util"] }
+
+parity-scale-codec = { version = "3.4.0", default-features = false, features = ["derive"] }
+
+polkadot-parachain = { path = "../../../../parachain" }
+polkadot-primitives = { path = "../../../../primitives" }
+
+sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sc-executor-wasmtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
+
+sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
+
+[build-dependencies]
+substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" }
diff --git a/node/core/pvf/worker/build.rs b/node/core/pvf/common/build.rs
similarity index 100%
rename from node/core/pvf/worker/build.rs
rename to node/core/pvf/common/build.rs
diff --git a/node/core/pvf/common/src/error.rs b/node/core/pvf/common/src/error.rs
new file mode 100644
index 000000000000..56353c53b4d2
--- /dev/null
+++ b/node/core/pvf/common/src/error.rs
@@ -0,0 +1,106 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+use crate::prepare::PrepareStats;
+use parity_scale_codec::{Decode, Encode};
+use std::fmt;
+
+/// Result of PVF preparation performed by the validation host. Contains stats about the preparation if
+/// successful
+pub type PrepareResult = Result;
+
+/// An error that occurred during the prepare part of the PVF pipeline.
+#[derive(Debug, Clone, Encode, Decode)]
+pub enum PrepareError {
+ /// During the prevalidation stage of preparation an issue was found with the PVF.
+ Prevalidation(String),
+ /// Compilation failed for the given PVF.
+ Preparation(String),
+ /// An unexpected panic has occurred in the preparation worker.
+ Panic(String),
+ /// Failed to prepare the PVF due to the time limit.
+ TimedOut,
+ /// An IO error occurred. This state is reported by either the validation host or by the worker.
+ IoErr(String),
+ /// The temporary file for the artifact could not be created at the given cache path. This state is reported by the
+ /// validation host (not by the worker).
+ CreateTmpFileErr(String),
+ /// The response from the worker is received, but the file cannot be renamed (moved) to the final destination
+ /// location. This state is reported by the validation host (not by the worker).
+ RenameTmpFileErr(String),
+}
+
+impl PrepareError {
+ /// Returns whether this is a deterministic error, i.e. one that should trigger reliably. Those
+ /// errors depend on the PVF itself and the sc-executor/wasmtime logic.
+ ///
+ /// Non-deterministic errors can happen spuriously. Typically, they occur due to resource
+ /// starvation, e.g. under heavy load or memory pressure. Those errors are typically transient
+ /// but may persist e.g. if the node is run by overwhelmingly underpowered machine.
+ pub fn is_deterministic(&self) -> bool {
+ use PrepareError::*;
+ match self {
+ Prevalidation(_) | Preparation(_) | Panic(_) => true,
+ TimedOut | IoErr(_) | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false,
+ }
+ }
+}
+
+impl fmt::Display for PrepareError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ use PrepareError::*;
+ match self {
+ Prevalidation(err) => write!(f, "prevalidation: {}", err),
+ Preparation(err) => write!(f, "preparation: {}", err),
+ Panic(err) => write!(f, "panic: {}", err),
+ TimedOut => write!(f, "prepare: timeout"),
+ IoErr(err) => write!(f, "prepare: io error while receiving response: {}", err),
+ CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err),
+ RenameTmpFileErr(err) => write!(f, "prepare: error renaming tmp file: {}", err),
+ }
+ }
+}
+
+/// Some internal error occurred.
+///
+/// Should only ever be used for validation errors independent of the candidate and PVF, or for errors we ruled out
+/// during pre-checking (so preparation errors are fine).
+#[derive(Debug, Clone, Encode, Decode)]
+pub enum InternalValidationError {
+ /// Some communication error occurred with the host.
+ HostCommunication(String),
+ /// Could not find or open compiled artifact file.
+ CouldNotOpenFile(String),
+ /// An error occurred in the CPU time monitor thread. Should be totally unrelated to validation.
+ CpuTimeMonitorThread(String),
+ /// Some non-deterministic preparation error occurred.
+ NonDeterministicPrepareError(PrepareError),
+}
+
+impl fmt::Display for InternalValidationError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ use InternalValidationError::*;
+ match self {
+ HostCommunication(err) =>
+ write!(f, "validation: some communication error occurred with the host: {}", err),
+ CouldNotOpenFile(err) =>
+ write!(f, "validation: could not find or open compiled artifact file: {}", err),
+ CpuTimeMonitorThread(err) =>
+ write!(f, "validation: an error occurred in the CPU time monitor thread: {}", err),
+ NonDeterministicPrepareError(err) => write!(f, "validation: prepare: {}", err),
+ }
+ }
+}
diff --git a/node/core/pvf/common/src/execute.rs b/node/core/pvf/common/src/execute.rs
new file mode 100644
index 000000000000..de5ce39f7838
--- /dev/null
+++ b/node/core/pvf/common/src/execute.rs
@@ -0,0 +1,60 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+use crate::error::InternalValidationError;
+use parity_scale_codec::{Decode, Encode};
+use polkadot_parachain::primitives::ValidationResult;
+use polkadot_primitives::ExecutorParams;
+use std::time::Duration;
+
+/// The payload of the one-time handshake that is done when a worker process is created. Carries
+/// data from the host to the worker.
+#[derive(Encode, Decode)]
+pub struct Handshake {
+ /// The executor parameters.
+ pub executor_params: ExecutorParams,
+}
+
+/// The response from an execution job on the worker.
+#[derive(Encode, Decode)]
+pub enum Response {
+ /// The job completed successfully.
+ Ok {
+ /// The result of parachain validation.
+ result_descriptor: ValidationResult,
+ /// The amount of CPU time taken by the job.
+ duration: Duration,
+ },
+ /// The candidate is invalid.
+ InvalidCandidate(String),
+ /// The job timed out.
+ TimedOut,
+ /// An unexpected panic has occurred in the execution worker.
+ Panic(String),
+ /// Some internal error occurred.
+ InternalError(InternalValidationError),
+}
+
+impl Response {
+ /// Creates an invalid response from a context `ctx` and a message `msg` (which can be empty).
+ pub fn format_invalid(ctx: &'static str, msg: &str) -> Self {
+ if msg.is_empty() {
+ Self::InvalidCandidate(ctx.to_string())
+ } else {
+ Self::InvalidCandidate(format!("{}: {}", ctx, msg))
+ }
+ }
+}
diff --git a/node/core/pvf/common/src/executor_intf.rs b/node/core/pvf/common/src/executor_intf.rs
new file mode 100644
index 000000000000..5926f3c5dbc7
--- /dev/null
+++ b/node/core/pvf/common/src/executor_intf.rs
@@ -0,0 +1,114 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+//! Interface to the Substrate Executor
+
+use polkadot_primitives::{ExecutorParam, ExecutorParams};
+use sc_executor_common::wasm_runtime::HeapAllocStrategy;
+use sc_executor_wasmtime::{Config, DeterministicStackLimit, Semantics};
+
+// Memory configuration
+//
+// When Substrate Runtime is instantiated, a number of WASM pages are allocated for the Substrate
+// Runtime instance's linear memory. The exact number of pages is a sum of whatever the WASM blob
+// itself requests (by default at least enough to hold the data section as well as have some space
+// left for the stack; this is, of course, overridable at link time when compiling the runtime)
+// plus the number of pages specified in the `extra_heap_pages` passed to the executor.
+//
+// By default, rustc (or `lld` specifically) should allocate 1 MiB for the shadow stack, or 16 pages.
+// The data section for runtimes are typically rather small and can fit in a single digit number of
+// WASM pages, so let's say an extra 16 pages. Thus let's assume that 32 pages or 2 MiB are used for
+// these needs by default.
+const DEFAULT_HEAP_PAGES_ESTIMATE: u32 = 32;
+const EXTRA_HEAP_PAGES: u32 = 2048;
+
+/// The number of bytes devoted for the stack during wasm execution of a PVF.
+pub const NATIVE_STACK_MAX: u32 = 256 * 1024 * 1024;
+
+// VALUES OF THE DEFAULT CONFIGURATION SHOULD NEVER BE CHANGED
+// They are used as base values for the execution environment parametrization.
+// To overwrite them, add new ones to `EXECUTOR_PARAMS` in the `session_info` pallet and perform
+// a runtime upgrade to make them active.
+pub const DEFAULT_CONFIG: Config = Config {
+ allow_missing_func_imports: true,
+ cache_path: None,
+ semantics: Semantics {
+ heap_alloc_strategy: sc_executor_common::wasm_runtime::HeapAllocStrategy::Dynamic {
+ maximum_pages: Some(DEFAULT_HEAP_PAGES_ESTIMATE + EXTRA_HEAP_PAGES),
+ },
+
+ instantiation_strategy:
+ sc_executor_wasmtime::InstantiationStrategy::RecreateInstanceCopyOnWrite,
+
+ // Enable deterministic stack limit to pin down the exact number of items the wasmtime stack
+ // can contain before it traps with stack overflow.
+ //
+ // Here is how the values below were chosen.
+ //
+ // At the moment of writing, the default native stack size limit is 1 MiB. Assuming a logical item
+ // (see the docs about the field and the instrumentation algorithm) is 8 bytes, 1 MiB can
+ // fit 2x 65536 logical items.
+ //
+ // Since reaching the native stack limit is undesirable, we halve the logical item limit and
+ // also increase the native 256x. This hopefully should preclude wasm code from reaching
+ // the stack limit set by the wasmtime.
+ deterministic_stack_limit: Some(DeterministicStackLimit {
+ logical_max: 65536,
+ native_stack_max: NATIVE_STACK_MAX,
+ }),
+ canonicalize_nans: true,
+ // Rationale for turning the multi-threaded compilation off is to make the preparation time
+ // easily reproducible and as deterministic as possible.
+ //
+ // Currently the prepare queue doesn't distinguish between precheck and prepare requests.
+ // On the one hand, it simplifies the code, on the other, however, slows down compile times
+ // for execute requests. This behavior may change in future.
+ parallel_compilation: false,
+
+ // WASM extensions. Only those that are meaningful to us may be controlled here. By default,
+ // we're using WASM MVP, which means all the extensions are disabled. Nevertheless, some
+ // extensions (e.g., sign extension ops) are enabled by Wasmtime and cannot be disabled.
+ wasm_reference_types: false,
+ wasm_simd: false,
+ wasm_bulk_memory: false,
+ wasm_multi_value: false,
+ },
+};
+
+pub fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Result {
+ let mut sem = DEFAULT_CONFIG.semantics.clone();
+ let mut stack_limit = if let Some(stack_limit) = sem.deterministic_stack_limit.clone() {
+ stack_limit
+ } else {
+ return Err("No default stack limit set".to_owned())
+ };
+
+ for p in par.iter() {
+ match p {
+ ExecutorParam::MaxMemoryPages(max_pages) =>
+ sem.heap_alloc_strategy =
+ HeapAllocStrategy::Dynamic { maximum_pages: Some(*max_pages) },
+ ExecutorParam::StackLogicalMax(slm) => stack_limit.logical_max = *slm,
+ ExecutorParam::StackNativeMax(snm) => stack_limit.native_stack_max = *snm,
+ ExecutorParam::WasmExtBulkMemory => sem.wasm_bulk_memory = true,
+ // TODO: Not implemented yet; .
+ ExecutorParam::PrecheckingMaxMemory(_) => (),
+ ExecutorParam::PvfPrepTimeout(_, _) | ExecutorParam::PvfExecTimeout(_, _) => (), // Not used here
+ }
+ }
+ sem.deterministic_stack_limit = Some(stack_limit);
+ Ok(sem)
+}
diff --git a/node/core/pvf/common/src/lib.rs b/node/core/pvf/common/src/lib.rs
new file mode 100644
index 000000000000..028fd9b17947
--- /dev/null
+++ b/node/core/pvf/common/src/lib.rs
@@ -0,0 +1,57 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+//! Functionality that is shared by the host and the workers.
+
+pub mod error;
+pub mod execute;
+pub mod executor_intf;
+pub mod prepare;
+pub mod pvf;
+pub mod worker;
+
+pub use cpu_time::ProcessTime;
+
+const LOG_TARGET: &str = "parachain::pvf-common";
+
+use std::mem;
+use tokio::io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
+
+#[doc(hidden)]
+pub mod tests {
+ use std::time::Duration;
+
+ pub const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
+ pub const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30);
+}
+
+/// Write some data prefixed by its length into `w`.
+pub async fn framed_send(w: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> io::Result<()> {
+ let len_buf = buf.len().to_le_bytes();
+ w.write_all(&len_buf).await?;
+ w.write_all(buf).await?;
+ Ok(())
+}
+
+/// Read some data prefixed by its length from `r`.
+pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result> {
+ let mut len_buf = [0u8; mem::size_of::()];
+ r.read_exact(&mut len_buf).await?;
+ let len = usize::from_le_bytes(len_buf);
+ let mut buf = vec![0; len];
+ r.read_exact(&mut buf).await?;
+ Ok(buf)
+}
diff --git a/node/core/pvf/common/src/prepare.rs b/node/core/pvf/common/src/prepare.rs
new file mode 100644
index 000000000000..ac64e2927a16
--- /dev/null
+++ b/node/core/pvf/common/src/prepare.rs
@@ -0,0 +1,48 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+use parity_scale_codec::{Decode, Encode};
+
+/// Preparation statistics, including the CPU time and memory taken.
+#[derive(Debug, Clone, Default, Encode, Decode)]
+pub struct PrepareStats {
+ /// The CPU time that elapsed for the preparation job.
+ pub cpu_time_elapsed: std::time::Duration,
+ /// The observed memory statistics for the preparation job.
+ pub memory_stats: MemoryStats,
+}
+
+/// Helper struct to contain all the memory stats, including `MemoryAllocationStats` and, if
+/// supported by the OS, `ru_maxrss`.
+#[derive(Clone, Debug, Default, Encode, Decode)]
+pub struct MemoryStats {
+ /// Memory stats from `tikv_jemalloc_ctl`.
+ #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+ pub memory_tracker_stats: Option,
+ /// `ru_maxrss` from `getrusage`. `None` if an error occurred.
+ #[cfg(target_os = "linux")]
+ pub max_rss: Option,
+}
+
+/// Statistics of collected memory metrics.
+#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+#[derive(Clone, Debug, Default, Encode, Decode)]
+pub struct MemoryAllocationStats {
+ /// Total resident memory, in bytes.
+ pub resident: u64,
+ /// Total allocated memory, in bytes.
+ pub allocated: u64,
+}
diff --git a/node/core/pvf/src/pvf.rs b/node/core/pvf/common/src/pvf.rs
similarity index 81%
rename from node/core/pvf/src/pvf.rs
rename to node/core/pvf/common/src/pvf.rs
index c134cacb4acf..1661f324083a 100644
--- a/node/core/pvf/src/pvf.rs
+++ b/node/core/pvf/common/src/pvf.rs
@@ -14,7 +14,6 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
-use crate::artifacts::ArtifactId;
use parity_scale_codec::{Decode, Encode};
use polkadot_parachain::primitives::ValidationCodeHash;
use polkadot_primitives::ExecutorParams;
@@ -26,9 +25,6 @@ use std::{
time::Duration,
};
-#[cfg(test)]
-use crate::host::tests::TEST_PREPARATION_TIMEOUT;
-
/// A struct that carries the exhaustive set of data to prepare an artifact out of plain
/// Wasm binary
///
@@ -58,13 +54,8 @@ impl PvfPrepData {
Self { code, code_hash, executor_params, prep_timeout }
}
- /// Returns artifact ID that corresponds to the PVF with given executor params
- pub(crate) fn as_artifact_id(&self) -> ArtifactId {
- ArtifactId::new(self.code_hash, self.executor_params.hash())
- }
-
/// Returns validation code hash for the PVF
- pub(crate) fn code_hash(&self) -> ValidationCodeHash {
+ pub fn code_hash(&self) -> ValidationCodeHash {
self.code_hash
}
@@ -83,16 +74,17 @@ impl PvfPrepData {
self.prep_timeout
}
- /// Creates a structure for tests
- #[cfg(test)]
- pub(crate) fn from_discriminator_and_timeout(num: u32, timeout: Duration) -> Self {
+ /// Creates a structure for tests.
+ #[doc(hidden)]
+ pub fn from_discriminator_and_timeout(num: u32, timeout: Duration) -> Self {
let descriminator_buf = num.to_le_bytes().to_vec();
Self::from_code(descriminator_buf, ExecutorParams::default(), timeout)
}
- #[cfg(test)]
- pub(crate) fn from_discriminator(num: u32) -> Self {
- Self::from_discriminator_and_timeout(num, TEST_PREPARATION_TIMEOUT)
+ /// Creates a structure for tests.
+ #[doc(hidden)]
+ pub fn from_discriminator(num: u32) -> Self {
+ Self::from_discriminator_and_timeout(num, crate::tests::TEST_PREPARATION_TIMEOUT)
}
}
diff --git a/node/core/pvf/worker/src/common.rs b/node/core/pvf/common/src/worker.rs
similarity index 90%
rename from node/core/pvf/worker/src/common.rs
rename to node/core/pvf/common/src/worker.rs
index 00289737a5c8..debe18985b37 100644
--- a/node/core/pvf/worker/src/common.rs
+++ b/node/core/pvf/common/src/worker.rs
@@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
+//! Functionality common to both prepare and execute workers.
+
use crate::LOG_TARGET;
use cpu_time::ProcessTime;
use futures::never::Never;
@@ -25,6 +27,42 @@ use std::{
};
use tokio::{io, net::UnixStream, runtime::Runtime};
+/// Use this macro to declare a `fn main() {}` that will create an executable that can be used for
+/// spawning the desired worker.
+#[macro_export]
+macro_rules! decl_worker_main {
+ ($expected_command:expr, $entrypoint:expr) => {
+ fn main() {
+ ::sp_tracing::try_init_simple();
+
+ let args = std::env::args().collect::>();
+ if args.len() < 3 {
+ panic!("wrong number of arguments");
+ }
+
+ let mut version = None;
+ let mut socket_path: &str = "";
+
+ for i in 2..args.len() {
+ match args[i].as_ref() {
+ "--socket-path" => socket_path = args[i + 1].as_str(),
+ "--node-version" => version = Some(args[i + 1].as_str()),
+ _ => (),
+ }
+ }
+
+ let subcommand = &args[1];
+ if subcommand != $expected_command {
+ panic!(
+ "trying to run {} binary with the {} subcommand",
+ $expected_command, subcommand
+ )
+ }
+ $entrypoint(&socket_path, version);
+ }
+ };
+}
+
/// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the
/// child process.
pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50);
diff --git a/node/core/pvf/worker/Cargo.toml b/node/core/pvf/execute-worker/Cargo.toml
similarity index 67%
rename from node/core/pvf/worker/Cargo.toml
rename to node/core/pvf/execute-worker/Cargo.toml
index 53d548dbac6f..c360cee8bf5d 100644
--- a/node/core/pvf/worker/Cargo.toml
+++ b/node/core/pvf/execute-worker/Cargo.toml
@@ -1,27 +1,20 @@
[package]
-name = "polkadot-node-core-pvf-worker"
+name = "polkadot-node-core-pvf-execute-worker"
version.workspace = true
authors.workspace = true
edition.workspace = true
-[[bin]]
-name = "puppet_worker"
-path = "bin/puppet_worker.rs"
-
[dependencies]
-assert_matches = "1.4.0"
cpu-time = "1.0.0"
futures = "0.3.21"
gum = { package = "tracing-gum", path = "../../../gum" }
-libc = "0.2.139"
rayon = "1.5.1"
-tempfile = "3.3.0"
tikv-jemalloc-ctl = { version = "0.5.0", optional = true }
-tokio = "1.24.2"
+tokio = { version = "1.24.2", features = ["fs", "process"] }
parity-scale-codec = { version = "3.4.0", default-features = false, features = ["derive"] }
-polkadot-node-core-pvf = { path = ".." }
+polkadot-node-core-pvf-common = { path = "../common" }
polkadot-parachain = { path = "../../../../parachain" }
polkadot-primitives = { path = "../../../../primitives" }
@@ -37,12 +30,5 @@ sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master
[target.'cfg(target_os = "linux")'.dependencies]
tikv-jemalloc-ctl = "0.5.0"
-[build-dependencies]
-substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" }
-
-[dev-dependencies]
-adder = { package = "test-parachain-adder", path = "../../../../parachain/test-parachains/adder" }
-halt = { package = "test-parachain-halt", path = "../../../../parachain/test-parachains/halt" }
-
[features]
-jemalloc-allocator = ["dep:tikv-jemalloc-ctl"]
+builder = []
diff --git a/node/core/pvf/worker/src/executor_intf.rs b/node/core/pvf/execute-worker/src/executor_intf.rs
similarity index 65%
rename from node/core/pvf/worker/src/executor_intf.rs
rename to node/core/pvf/execute-worker/src/executor_intf.rs
index ff286dd74d64..98424a3dcd1d 100644
--- a/node/core/pvf/worker/src/executor_intf.rs
+++ b/node/core/pvf/execute-worker/src/executor_intf.rs
@@ -16,13 +16,16 @@
//! Interface to the Substrate Executor
-use polkadot_primitives::{ExecutorParam, ExecutorParams};
+use polkadot_node_core_pvf_common::executor_intf::{
+ params_to_wasmtime_semantics, DEFAULT_CONFIG, NATIVE_STACK_MAX,
+};
+use polkadot_primitives::ExecutorParams;
use sc_executor_common::{
error::WasmError,
runtime_blob::RuntimeBlob,
- wasm_runtime::{HeapAllocStrategy, InvokeMethod, WasmModule as _},
+ wasm_runtime::{InvokeMethod, WasmModule as _},
};
-use sc_executor_wasmtime::{Config, DeterministicStackLimit, Semantics, WasmtimeRuntime};
+use sc_executor_wasmtime::{Config, WasmtimeRuntime};
use sp_core::storage::{ChildInfo, TrackedStorageKey};
use sp_externalities::MultiRemovalResults;
use std::any::{Any, TypeId};
@@ -63,119 +66,6 @@ use std::any::{Any, TypeId};
/// The stack size for the execute thread.
pub const EXECUTE_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024 + NATIVE_STACK_MAX as usize;
-// Memory configuration
-//
-// When Substrate Runtime is instantiated, a number of WASM pages are allocated for the Substrate
-// Runtime instance's linear memory. The exact number of pages is a sum of whatever the WASM blob
-// itself requests (by default at least enough to hold the data section as well as have some space
-// left for the stack; this is, of course, overridable at link time when compiling the runtime)
-// plus the number of pages specified in the `extra_heap_pages` passed to the executor.
-//
-// By default, rustc (or `lld` specifically) should allocate 1 MiB for the shadow stack, or 16 pages.
-// The data section for runtimes are typically rather small and can fit in a single digit number of
-// WASM pages, so let's say an extra 16 pages. Thus let's assume that 32 pages or 2 MiB are used for
-// these needs by default.
-const DEFAULT_HEAP_PAGES_ESTIMATE: u32 = 32;
-const EXTRA_HEAP_PAGES: u32 = 2048;
-
-/// The number of bytes devoted for the stack during wasm execution of a PVF.
-const NATIVE_STACK_MAX: u32 = 256 * 1024 * 1024;
-
-// VALUES OF THE DEFAULT CONFIGURATION SHOULD NEVER BE CHANGED
-// They are used as base values for the execution environment parametrization.
-// To overwrite them, add new ones to `EXECUTOR_PARAMS` in the `session_info` pallet and perform
-// a runtime upgrade to make them active.
-const DEFAULT_CONFIG: Config = Config {
- allow_missing_func_imports: true,
- cache_path: None,
- semantics: Semantics {
- heap_alloc_strategy: sc_executor_common::wasm_runtime::HeapAllocStrategy::Dynamic {
- maximum_pages: Some(DEFAULT_HEAP_PAGES_ESTIMATE + EXTRA_HEAP_PAGES),
- },
-
- instantiation_strategy:
- sc_executor_wasmtime::InstantiationStrategy::RecreateInstanceCopyOnWrite,
-
- // Enable deterministic stack limit to pin down the exact number of items the wasmtime stack
- // can contain before it traps with stack overflow.
- //
- // Here is how the values below were chosen.
- //
- // At the moment of writing, the default native stack size limit is 1 MiB. Assuming a logical item
- // (see the docs about the field and the instrumentation algorithm) is 8 bytes, 1 MiB can
- // fit 2x 65536 logical items.
- //
- // Since reaching the native stack limit is undesirable, we halve the logical item limit and
- // also increase the native 256x. This hopefully should preclude wasm code from reaching
- // the stack limit set by the wasmtime.
- deterministic_stack_limit: Some(DeterministicStackLimit {
- logical_max: 65536,
- native_stack_max: NATIVE_STACK_MAX,
- }),
- canonicalize_nans: true,
- // Rationale for turning the multi-threaded compilation off is to make the preparation time
- // easily reproducible and as deterministic as possible.
- //
- // Currently the prepare queue doesn't distinguish between precheck and prepare requests.
- // On the one hand, it simplifies the code, on the other, however, slows down compile times
- // for execute requests. This behavior may change in future.
- parallel_compilation: false,
-
- // WASM extensions. Only those that are meaningful to us may be controlled here. By default,
- // we're using WASM MVP, which means all the extensions are disabled. Nevertheless, some
- // extensions (e.g., sign extension ops) are enabled by Wasmtime and cannot be disabled.
- wasm_reference_types: false,
- wasm_simd: false,
- wasm_bulk_memory: false,
- wasm_multi_value: false,
- },
-};
-
-/// Runs the prevalidation on the given code. Returns a [`RuntimeBlob`] if it succeeds.
-pub fn prevalidate(code: &[u8]) -> Result {
- let blob = RuntimeBlob::new(code)?;
- // It's assumed this function will take care of any prevalidation logic
- // that needs to be done.
- //
- // Do nothing for now.
- Ok(blob)
-}
-
-/// Runs preparation on the given runtime blob. If successful, it returns a serialized compiled
-/// artifact which can then be used to pass into `Executor::execute` after writing it to the disk.
-pub fn prepare(
- blob: RuntimeBlob,
- executor_params: &ExecutorParams,
-) -> Result, sc_executor_common::error::WasmError> {
- let semantics = params_to_wasmtime_semantics(executor_params)
- .map_err(|e| sc_executor_common::error::WasmError::Other(e))?;
- sc_executor_wasmtime::prepare_runtime_artifact(blob, &semantics)
-}
-
-fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Result {
- let mut sem = DEFAULT_CONFIG.semantics.clone();
- let mut stack_limit = if let Some(stack_limit) = sem.deterministic_stack_limit.clone() {
- stack_limit
- } else {
- return Err("No default stack limit set".to_owned())
- };
-
- for p in par.iter() {
- match p {
- ExecutorParam::MaxMemoryPages(max_pages) =>
- sem.heap_alloc_strategy =
- HeapAllocStrategy::Dynamic { maximum_pages: Some(*max_pages) },
- ExecutorParam::StackLogicalMax(slm) => stack_limit.logical_max = *slm,
- ExecutorParam::StackNativeMax(snm) => stack_limit.native_stack_max = *snm,
- ExecutorParam::WasmExtBulkMemory => sem.wasm_bulk_memory = true,
- ExecutorParam::PrecheckingMaxMemory(_) => (), // TODO: Not implemented yet
- ExecutorParam::PvfPrepTimeout(_, _) | ExecutorParam::PvfExecTimeout(_, _) => (), // Not used here
- }
- }
- sem.deterministic_stack_limit = Some(stack_limit);
- Ok(sem)
-}
-
#[derive(Clone)]
pub struct Executor {
config: Config,
diff --git a/node/core/pvf/worker/src/execute.rs b/node/core/pvf/execute-worker/src/lib.rs
similarity index 93%
rename from node/core/pvf/worker/src/execute.rs
rename to node/core/pvf/execute-worker/src/lib.rs
index c5b8ddc9dd18..0ac39aafb0c9 100644
--- a/node/core/pvf/worker/src/execute.rs
+++ b/node/core/pvf/execute-worker/src/lib.rs
@@ -14,20 +14,26 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
-use crate::{
- common::{
+mod executor_intf;
+
+pub use executor_intf::Executor;
+
+// NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are
+// separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-execute-worker=trace`.
+const LOG_TARGET: &str = "parachain::pvf-execute-worker";
+
+use crate::executor_intf::EXECUTE_THREAD_STACK_SIZE;
+use cpu_time::ProcessTime;
+use parity_scale_codec::{Decode, Encode};
+use polkadot_node_core_pvf_common::{
+ error::InternalValidationError,
+ execute::{Handshake, Response},
+ framed_recv, framed_send,
+ worker::{
bytes_to_path, cpu_time_monitor_loop, stringify_panic_payload,
thread::{self, WaitOutcome},
worker_event_loop,
},
- executor_intf::{Executor, EXECUTE_THREAD_STACK_SIZE},
- LOG_TARGET,
-};
-use cpu_time::ProcessTime;
-use parity_scale_codec::{Decode, Encode};
-use polkadot_node_core_pvf::{
- framed_recv, framed_send, ExecuteHandshake as Handshake, ExecuteResponse as Response,
- InternalValidationError,
};
use polkadot_parachain::primitives::ValidationResult;
use std::{
diff --git a/node/core/pvf/prepare-worker/Cargo.toml b/node/core/pvf/prepare-worker/Cargo.toml
new file mode 100644
index 000000000000..07386de35962
--- /dev/null
+++ b/node/core/pvf/prepare-worker/Cargo.toml
@@ -0,0 +1,33 @@
+[package]
+name = "polkadot-node-core-pvf-prepare-worker"
+version.workspace = true
+authors.workspace = true
+edition.workspace = true
+
+[dependencies]
+futures = "0.3.21"
+gum = { package = "tracing-gum", path = "../../../gum" }
+libc = "0.2.139"
+rayon = "1.5.1"
+tikv-jemalloc-ctl = { version = "0.5.0", optional = true }
+tokio = { version = "1.24.2", features = ["fs", "process"] }
+
+parity-scale-codec = { version = "3.4.0", default-features = false, features = ["derive"] }
+
+polkadot-node-core-pvf-common = { path = "../common" }
+polkadot-parachain = { path = "../../../../parachain" }
+polkadot-primitives = { path = "../../../../primitives" }
+
+sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sc-executor-wasmtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
+
+[target.'cfg(target_os = "linux")'.dependencies]
+tikv-jemalloc-ctl = "0.5.0"
+
+[features]
+builder = []
+jemalloc-allocator = ["dep:tikv-jemalloc-ctl"]
diff --git a/node/core/pvf/prepare-worker/src/executor_intf.rs b/node/core/pvf/prepare-worker/src/executor_intf.rs
new file mode 100644
index 000000000000..1f88f6a6dd6e
--- /dev/null
+++ b/node/core/pvf/prepare-worker/src/executor_intf.rs
@@ -0,0 +1,42 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+//! Interface to the Substrate Executor
+
+use polkadot_node_core_pvf_common::executor_intf::params_to_wasmtime_semantics;
+use polkadot_primitives::ExecutorParams;
+use sc_executor_common::runtime_blob::RuntimeBlob;
+
+/// Runs the prevalidation on the given code. Returns a [`RuntimeBlob`] if it succeeds.
+pub fn prevalidate(code: &[u8]) -> Result {
+ let blob = RuntimeBlob::new(code)?;
+ // It's assumed this function will take care of any prevalidation logic
+ // that needs to be done.
+ //
+ // Do nothing for now.
+ Ok(blob)
+}
+
+/// Runs preparation on the given runtime blob. If successful, it returns a serialized compiled
+/// artifact which can then be used to pass into `Executor::execute` after writing it to the disk.
+pub fn prepare(
+ blob: RuntimeBlob,
+ executor_params: &ExecutorParams,
+) -> Result, sc_executor_common::error::WasmError> {
+ let semantics = params_to_wasmtime_semantics(executor_params)
+ .map_err(|e| sc_executor_common::error::WasmError::Other(e))?;
+ sc_executor_wasmtime::prepare_runtime_artifact(blob, &semantics)
+}
diff --git a/node/core/pvf/worker/src/prepare.rs b/node/core/pvf/prepare-worker/src/lib.rs
similarity index 90%
rename from node/core/pvf/worker/src/prepare.rs
rename to node/core/pvf/prepare-worker/src/lib.rs
index fe9c1a85545a..8f36ef397cfb 100644
--- a/node/core/pvf/worker/src/prepare.rs
+++ b/node/core/pvf/prepare-worker/src/lib.rs
@@ -14,23 +14,31 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
+mod executor_intf;
+mod memory_stats;
+
+pub use executor_intf::{prepare, prevalidate};
+
+// NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are
+// separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-prepare-worker=trace`.
+const LOG_TARGET: &str = "parachain::pvf-prepare-worker";
+
#[cfg(target_os = "linux")]
use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread};
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
-use crate::{
- common::{
+use parity_scale_codec::{Decode, Encode};
+use polkadot_node_core_pvf_common::{
+ error::{PrepareError, PrepareResult},
+ framed_recv, framed_send,
+ prepare::{MemoryStats, PrepareStats},
+ pvf::PvfPrepData,
+ worker::{
bytes_to_path, cpu_time_monitor_loop, stringify_panic_payload,
thread::{self, WaitOutcome},
worker_event_loop,
},
- prepare, prevalidate, LOG_TARGET,
-};
-use cpu_time::ProcessTime;
-use parity_scale_codec::{Decode, Encode};
-use polkadot_node_core_pvf::{
- framed_recv, framed_send, CompiledArtifact, MemoryStats, PrepareError, PrepareResult,
- PrepareStats, PvfPrepData,
+ ProcessTime,
};
use std::{
path::PathBuf,
@@ -39,6 +47,22 @@ use std::{
};
use tokio::{io, net::UnixStream};
+/// Contains the bytes for a successfully compiled artifact.
+pub struct CompiledArtifact(Vec);
+
+impl CompiledArtifact {
+ /// Creates a `CompiledArtifact`.
+ pub fn new(code: Vec) -> Self {
+ Self(code)
+ }
+}
+
+impl AsRef<[u8]> for CompiledArtifact {
+ fn as_ref(&self) -> &[u8] {
+ self.0.as_slice()
+ }
+}
+
async fn recv_request(stream: &mut UnixStream) -> io::Result<(PvfPrepData, PathBuf)> {
let pvf = framed_recv(stream).await?;
let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| {
diff --git a/node/core/pvf/worker/src/memory_stats.rs b/node/core/pvf/prepare-worker/src/memory_stats.rs
similarity index 97%
rename from node/core/pvf/worker/src/memory_stats.rs
rename to node/core/pvf/prepare-worker/src/memory_stats.rs
index 907f793d87af..e6dc8572c4a3 100644
--- a/node/core/pvf/worker/src/memory_stats.rs
+++ b/node/core/pvf/prepare-worker/src/memory_stats.rs
@@ -33,11 +33,11 @@
/// NOTE: Requires jemalloc enabled.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
pub mod memory_tracker {
- use crate::{
- common::{stringify_panic_payload, thread},
- LOG_TARGET,
+ use crate::LOG_TARGET;
+ use polkadot_node_core_pvf_common::{
+ prepare::MemoryAllocationStats,
+ worker::{stringify_panic_payload, thread},
};
- use polkadot_node_core_pvf::MemoryAllocationStats;
use std::{thread::JoinHandle, time::Duration};
use tikv_jemalloc_ctl::{epoch, stats, Error};
diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs
index d5a660cc3aa5..78d2f88941b8 100644
--- a/node/core/pvf/src/artifacts.rs
+++ b/node/core/pvf/src/artifacts.rs
@@ -55,8 +55,9 @@
//! older by a predefined parameter. This process is run very rarely (say, once a day). Once the
//! artifact is expired it is removed from disk eagerly atomically.
-use crate::{error::PrepareError, host::PrepareResultSender, prepare::PrepareStats};
+use crate::host::PrepareResultSender;
use always_assert::always;
+use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareStats, pvf::PvfPrepData};
use polkadot_parachain::primitives::ValidationCodeHash;
use polkadot_primitives::ExecutorParamsHash;
use std::{
@@ -65,22 +66,6 @@ use std::{
time::{Duration, SystemTime},
};
-/// Contains the bytes for a successfully compiled artifact.
-pub struct CompiledArtifact(Vec);
-
-impl CompiledArtifact {
- /// Creates a `CompiledArtifact`.
- pub fn new(code: Vec) -> Self {
- Self(code)
- }
-}
-
-impl AsRef<[u8]> for CompiledArtifact {
- fn as_ref(&self) -> &[u8] {
- self.0.as_slice()
- }
-}
-
/// Identifier of an artifact. Encodes a code hash of the PVF and a hash of executor parameter set.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ArtifactId {
@@ -96,6 +81,11 @@ impl ArtifactId {
Self { code_hash, executor_params_hash }
}
+ /// Returns an artifact ID that corresponds to the PVF with given executor params.
+ pub fn from_pvf_prep_data(pvf: &PvfPrepData) -> Self {
+ Self::new(pvf.code_hash(), pvf.executor_params().hash())
+ }
+
/// Tries to recover the artifact id from the given file name.
#[cfg(test)]
pub fn from_file_name(file_name: &str) -> Option {
@@ -304,7 +294,7 @@ mod tests {
#[tokio::test]
async fn artifacts_removes_cache_on_startup() {
- let fake_cache_path = crate::worker_common::tmpfile("test-cache").await.unwrap();
+ let fake_cache_path = crate::worker_intf::tmpfile("test-cache").await.unwrap();
let fake_artifact_path = {
let mut p = fake_cache_path.clone();
p.push("wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234");
diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs
index 33f3f00810f2..7372cd233c49 100644
--- a/node/core/pvf/src/error.rs
+++ b/node/core/pvf/src/error.rs
@@ -14,65 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
-use crate::prepare::PrepareStats;
-use parity_scale_codec::{Decode, Encode};
-use std::fmt;
-
-/// Result of PVF preparation performed by the validation host. Contains stats about the preparation if
-/// successful
-pub type PrepareResult = Result;
-
-/// An error that occurred during the prepare part of the PVF pipeline.
-#[derive(Debug, Clone, Encode, Decode)]
-pub enum PrepareError {
- /// During the prevalidation stage of preparation an issue was found with the PVF.
- Prevalidation(String),
- /// Compilation failed for the given PVF.
- Preparation(String),
- /// An unexpected panic has occurred in the preparation worker.
- Panic(String),
- /// Failed to prepare the PVF due to the time limit.
- TimedOut,
- /// An IO error occurred. This state is reported by either the validation host or by the worker.
- IoErr(String),
- /// The temporary file for the artifact could not be created at the given cache path. This state is reported by the
- /// validation host (not by the worker).
- CreateTmpFileErr(String),
- /// The response from the worker is received, but the file cannot be renamed (moved) to the final destination
- /// location. This state is reported by the validation host (not by the worker).
- RenameTmpFileErr(String),
-}
-
-impl PrepareError {
- /// Returns whether this is a deterministic error, i.e. one that should trigger reliably. Those
- /// errors depend on the PVF itself and the sc-executor/wasmtime logic.
- ///
- /// Non-deterministic errors can happen spuriously. Typically, they occur due to resource
- /// starvation, e.g. under heavy load or memory pressure. Those errors are typically transient
- /// but may persist e.g. if the node is run by overwhelmingly underpowered machine.
- pub fn is_deterministic(&self) -> bool {
- use PrepareError::*;
- match self {
- Prevalidation(_) | Preparation(_) | Panic(_) => true,
- TimedOut | IoErr(_) | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false,
- }
- }
-}
-
-impl fmt::Display for PrepareError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- use PrepareError::*;
- match self {
- Prevalidation(err) => write!(f, "prevalidation: {}", err),
- Preparation(err) => write!(f, "preparation: {}", err),
- Panic(err) => write!(f, "panic: {}", err),
- TimedOut => write!(f, "prepare: timeout"),
- IoErr(err) => write!(f, "prepare: io error while receiving response: {}", err),
- CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err),
- RenameTmpFileErr(err) => write!(f, "prepare: error renaming tmp file: {}", err),
- }
- }
-}
+use polkadot_node_core_pvf_common::error::{InternalValidationError, PrepareError};
/// A error raised during validation of the candidate.
#[derive(Debug, Clone)]
@@ -122,37 +64,6 @@ pub enum InvalidCandidate {
Panic(String),
}
-/// Some internal error occurred.
-///
-/// Should only ever be used for validation errors independent of the candidate and PVF, or for errors we ruled out
-/// during pre-checking (so preparation errors are fine).
-#[derive(Debug, Clone, Encode, Decode)]
-pub enum InternalValidationError {
- /// Some communication error occurred with the host.
- HostCommunication(String),
- /// Could not find or open compiled artifact file.
- CouldNotOpenFile(String),
- /// An error occurred in the CPU time monitor thread. Should be totally unrelated to validation.
- CpuTimeMonitorThread(String),
- /// Some non-deterministic preparation error occurred.
- NonDeterministicPrepareError(PrepareError),
-}
-
-impl fmt::Display for InternalValidationError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- use InternalValidationError::*;
- match self {
- HostCommunication(err) =>
- write!(f, "validation: some communication error occurred with the host: {}", err),
- CouldNotOpenFile(err) =>
- write!(f, "validation: could not find or open compiled artifact file: {}", err),
- CpuTimeMonitorThread(err) =>
- write!(f, "validation: an error occurred in the CPU time monitor thread: {}", err),
- NonDeterministicPrepareError(err) => write!(f, "validation: prepare: {}", err),
- }
- }
-}
-
impl From for ValidationError {
fn from(error: InternalValidationError) -> Self {
Self::InternalError(error)
diff --git a/node/core/pvf/src/execute/mod.rs b/node/core/pvf/src/execute/mod.rs
index 8e3b17d71569..669b9dc04d7c 100644
--- a/node/core/pvf/src/execute/mod.rs
+++ b/node/core/pvf/src/execute/mod.rs
@@ -24,4 +24,3 @@ mod queue;
mod worker_intf;
pub use queue::{start, PendingExecutionRequest, ToQueue};
-pub use worker_intf::{Handshake as ExecuteHandshake, Response as ExecuteResponse};
diff --git a/node/core/pvf/src/execute/queue.rs b/node/core/pvf/src/execute/queue.rs
index 61cebc5e2c46..395697616b36 100644
--- a/node/core/pvf/src/execute/queue.rs
+++ b/node/core/pvf/src/execute/queue.rs
@@ -21,7 +21,7 @@ use crate::{
artifacts::{ArtifactId, ArtifactPathId},
host::ResultSender,
metrics::Metrics,
- worker_common::{IdleWorker, WorkerHandle},
+ worker_intf::{IdleWorker, WorkerHandle},
InvalidCandidate, ValidationError, LOG_TARGET,
};
use futures::{
diff --git a/node/core/pvf/src/execute/worker_intf.rs b/node/core/pvf/src/execute/worker_intf.rs
index 4c26aeb0260a..6e54e17e515a 100644
--- a/node/core/pvf/src/execute/worker_intf.rs
+++ b/node/core/pvf/src/execute/worker_intf.rs
@@ -18,17 +18,20 @@
use crate::{
artifacts::ArtifactPathId,
- error::InternalValidationError,
- worker_common::{
- framed_recv, framed_send, path_to_bytes, spawn_with_program_path, IdleWorker, SpawnErr,
- WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
+ worker_intf::{
+ path_to_bytes, spawn_with_program_path, IdleWorker, SpawnErr, WorkerHandle,
+ JOB_TIMEOUT_WALL_CLOCK_FACTOR,
},
LOG_TARGET,
};
use futures::FutureExt;
use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode};
-
+use polkadot_node_core_pvf_common::{
+ error::InternalValidationError,
+ execute::{Handshake, Response},
+ framed_recv, framed_send,
+};
use polkadot_parachain::primitives::ValidationResult;
use polkadot_primitives::ExecutorParams;
use std::{path::Path, time::Duration};
@@ -208,42 +211,3 @@ async fn recv_response(stream: &mut UnixStream) -> io::Result {
)
})
}
-
-/// The payload of the one-time handshake that is done when a worker process is created. Carries
-/// data from the host to the worker.
-#[derive(Encode, Decode)]
-pub struct Handshake {
- /// The executor parameters.
- pub executor_params: ExecutorParams,
-}
-
-/// The response from an execution job on the worker.
-#[derive(Encode, Decode)]
-pub enum Response {
- /// The job completed successfully.
- Ok {
- /// The result of parachain validation.
- result_descriptor: ValidationResult,
- /// The amount of CPU time taken by the job.
- duration: Duration,
- },
- /// The candidate is invalid.
- InvalidCandidate(String),
- /// The job timed out.
- TimedOut,
- /// An unexpected panic has occurred in the execution worker.
- Panic(String),
- /// Some internal error occurred.
- InternalError(InternalValidationError),
-}
-
-impl Response {
- /// Creates an invalid response from a context `ctx` and a message `msg` (which can be empty).
- pub fn format_invalid(ctx: &'static str, msg: &str) -> Self {
- if msg.is_empty() {
- Self::InvalidCandidate(ctx.to_string())
- } else {
- Self::InvalidCandidate(format!("{}: {}", ctx, msg))
- }
- }
-}
diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs
index bfc775a32dee..67f4a66e9748 100644
--- a/node/core/pvf/src/host.rs
+++ b/node/core/pvf/src/host.rs
@@ -22,16 +22,19 @@
use crate::{
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
- error::PrepareError,
execute::{self, PendingExecutionRequest},
metrics::Metrics,
- prepare, PrepareResult, Priority, PvfPrepData, ValidationError, LOG_TARGET,
+ prepare, Priority, ValidationError, LOG_TARGET,
};
use always_assert::never;
use futures::{
channel::{mpsc, oneshot},
Future, FutureExt, SinkExt, StreamExt,
};
+use polkadot_node_core_pvf_common::{
+ error::{PrepareError, PrepareResult},
+ pvf::PvfPrepData,
+};
use polkadot_parachain::primitives::ValidationResult;
use std::{
collections::HashMap,
@@ -423,7 +426,7 @@ async fn handle_precheck_pvf(
pvf: PvfPrepData,
result_sender: PrepareResultSender,
) -> Result<(), Fatal> {
- let artifact_id = pvf.as_artifact_id();
+ let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state {
@@ -467,7 +470,7 @@ async fn handle_execute_pvf(
inputs: ExecutePvfInputs,
) -> Result<(), Fatal> {
let ExecutePvfInputs { pvf, exec_timeout, params, priority, result_tx } = inputs;
- let artifact_id = pvf.as_artifact_id();
+ let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
let executor_params = (*pvf.executor_params()).clone();
if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
@@ -590,7 +593,7 @@ async fn handle_heads_up(
let now = SystemTime::now();
for active_pvf in active_pvfs {
- let artifact_id = active_pvf.as_artifact_id();
+ let artifact_id = ArtifactId::from_pvf_prep_data(&active_pvf);
if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state {
ArtifactState::Prepared { last_time_needed, .. } => {
@@ -854,9 +857,10 @@ fn pulse_every(interval: std::time::Duration) -> impl futures::Stream-
#[cfg(test)]
pub(crate) mod tests {
use super::*;
- use crate::{prepare::PrepareStats, InvalidCandidate, PrepareError};
+ use crate::InvalidCandidate;
use assert_matches::assert_matches;
use futures::future::BoxFuture;
+ use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareStats};
const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30);
@@ -877,7 +881,7 @@ pub(crate) mod tests {
/// Creates a new PVF which artifact id can be uniquely identified by the given number.
fn artifact_id(descriminator: u32) -> ArtifactId {
- PvfPrepData::from_discriminator(descriminator).as_artifact_id()
+ ArtifactId::from_pvf_prep_data(&PvfPrepData::from_discriminator(descriminator))
}
fn artifact_path(descriminator: u32) -> PathBuf {
diff --git a/node/core/pvf/src/lib.rs b/node/core/pvf/src/lib.rs
index 9b302150fd36..d8b801292ca8 100644
--- a/node/core/pvf/src/lib.rs
+++ b/node/core/pvf/src/lib.rs
@@ -95,27 +95,31 @@ mod host;
mod metrics;
mod prepare;
mod priority;
-mod pvf;
-mod worker_common;
+mod worker_intf;
-pub use artifacts::CompiledArtifact;
-pub use error::{
- InternalValidationError, InvalidCandidate, PrepareError, PrepareResult, ValidationError,
-};
-pub use execute::{ExecuteHandshake, ExecuteResponse};
-#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
-pub use prepare::MemoryAllocationStats;
-pub use prepare::{MemoryStats, PrepareStats};
-pub use priority::Priority;
-pub use pvf::PvfPrepData;
+#[doc(hidden)]
+pub mod testing;
+
+// Used by `decl_puppet_worker_main!`.
+#[doc(hidden)]
+pub use sp_tracing;
+pub use error::{InvalidCandidate, ValidationError};
pub use host::{start, Config, ValidationHost};
pub use metrics::Metrics;
-pub use worker_common::{framed_recv, framed_send, JOB_TIMEOUT_WALL_CLOCK_FACTOR};
+pub use priority::Priority;
+pub use worker_intf::{framed_recv, framed_send, JOB_TIMEOUT_WALL_CLOCK_FACTOR};
+
+// Re-export some common types.
+pub use polkadot_node_core_pvf_common::{
+ error::{InternalValidationError, PrepareError},
+ prepare::PrepareStats,
+ pvf::PvfPrepData,
+};
-const LOG_TARGET: &str = "parachain::pvf";
+// Re-export worker entrypoints.
+pub use polkadot_node_core_pvf_execute_worker::worker_entrypoint as execute_worker_entrypoint;
+pub use polkadot_node_core_pvf_prepare_worker::worker_entrypoint as prepare_worker_entrypoint;
-#[doc(hidden)]
-pub mod testing {
- pub use crate::worker_common::{spawn_with_program_path, SpawnErr};
-}
+/// The log target for this crate.
+pub const LOG_TARGET: &str = "parachain::pvf";
diff --git a/node/core/pvf/src/metrics.rs b/node/core/pvf/src/metrics.rs
index 12bcd9eadad3..62f8c6dc5157 100644
--- a/node/core/pvf/src/metrics.rs
+++ b/node/core/pvf/src/metrics.rs
@@ -16,7 +16,7 @@
//! Prometheus metrics related to the validation host.
-use crate::prepare::MemoryStats;
+use polkadot_node_core_pvf_common::prepare::MemoryStats;
use polkadot_node_metrics::metrics::{self, prometheus};
/// Validation host metrics.
diff --git a/node/core/pvf/src/prepare/mod.rs b/node/core/pvf/src/prepare/mod.rs
index de40c48464c4..580f67f73fa0 100644
--- a/node/core/pvf/src/prepare/mod.rs
+++ b/node/core/pvf/src/prepare/mod.rs
@@ -28,36 +28,3 @@ mod worker_intf;
pub use pool::start as start_pool;
pub use queue::{start as start_queue, FromQueue, ToQueue};
-
-use parity_scale_codec::{Decode, Encode};
-
-/// Preparation statistics, including the CPU time and memory taken.
-#[derive(Debug, Clone, Default, Encode, Decode)]
-pub struct PrepareStats {
- /// The CPU time that elapsed for the preparation job.
- pub cpu_time_elapsed: std::time::Duration,
- /// The observed memory statistics for the preparation job.
- pub memory_stats: MemoryStats,
-}
-
-/// Helper struct to contain all the memory stats, including `MemoryAllocationStats` and, if
-/// supported by the OS, `ru_maxrss`.
-#[derive(Clone, Debug, Default, Encode, Decode)]
-pub struct MemoryStats {
- /// Memory stats from `tikv_jemalloc_ctl`.
- #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
- pub memory_tracker_stats: Option,
- /// `ru_maxrss` from `getrusage`. `None` if an error occurred.
- #[cfg(target_os = "linux")]
- pub max_rss: Option,
-}
-
-/// Statistics of collected memory metrics.
-#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
-#[derive(Clone, Debug, Default, Encode, Decode)]
-pub struct MemoryAllocationStats {
- /// Total resident memory, in bytes.
- pub resident: u64,
- /// Total allocated memory, in bytes.
- pub allocated: u64,
-}
diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs
index d151f097805e..ae8ecff5285c 100644
--- a/node/core/pvf/src/prepare/pool.rs
+++ b/node/core/pvf/src/prepare/pool.rs
@@ -16,16 +16,18 @@
use super::worker_intf::{self, Outcome};
use crate::{
- error::{PrepareError, PrepareResult},
metrics::Metrics,
- pvf::PvfPrepData,
- worker_common::{IdleWorker, WorkerHandle},
+ worker_intf::{IdleWorker, WorkerHandle},
LOG_TARGET,
};
use always_assert::never;
use futures::{
channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt,
};
+use polkadot_node_core_pvf_common::{
+ error::{PrepareError, PrepareResult},
+ pvf::PvfPrepData,
+};
use slotmap::HopSlotMap;
use std::{
fmt,
diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs
index f84d5ab0e56e..5e19a4c7217a 100644
--- a/node/core/pvf/src/prepare/queue.rs
+++ b/node/core/pvf/src/prepare/queue.rs
@@ -17,11 +17,10 @@
//! A queue that handles requests for PVF preparation.
use super::pool::{self, Worker};
-use crate::{
- artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, PvfPrepData, LOG_TARGET,
-};
+use crate::{artifacts::ArtifactId, metrics::Metrics, Priority, LOG_TARGET};
use always_assert::{always, never};
use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt};
+use polkadot_node_core_pvf_common::{error::PrepareResult, pvf::PvfPrepData};
use std::{
collections::{HashMap, VecDeque},
path::PathBuf,
@@ -231,7 +230,7 @@ async fn handle_enqueue(
);
queue.metrics.prepare_enqueued();
- let artifact_id = pvf.as_artifact_id();
+ let artifact_id = ArtifactId::from_pvf_prep_data(&pvf);
if never!(
queue.artifact_id_to_job.contains_key(&artifact_id),
"second Enqueue sent for a known artifact"
@@ -339,7 +338,7 @@ async fn handle_worker_concluded(
// this can't be None;
// qed.
let job_data = never_none!(queue.jobs.remove(job));
- let artifact_id = job_data.pvf.as_artifact_id();
+ let artifact_id = ArtifactId::from_pvf_prep_data(&job_data.pvf);
queue.artifact_id_to_job.remove(&artifact_id);
@@ -425,7 +424,7 @@ async fn spawn_extra_worker(queue: &mut Queue, critical: bool) -> Result<(), Fat
async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal> {
let job_data = &mut queue.jobs[job];
- let artifact_id = job_data.pvf.as_artifact_id();
+ let artifact_id = ArtifactId::from_pvf_prep_data(&job_data.pvf);
let artifact_path = artifact_id.path(&queue.cache_path);
job_data.worker = Some(worker);
@@ -488,11 +487,10 @@ pub fn start(
#[cfg(test)]
mod tests {
use super::*;
- use crate::{
- error::PrepareError, host::tests::TEST_PREPARATION_TIMEOUT, prepare::PrepareStats,
- };
+ use crate::host::tests::TEST_PREPARATION_TIMEOUT;
use assert_matches::assert_matches;
use futures::{future::BoxFuture, FutureExt};
+ use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareStats};
use slotmap::SlotMap;
use std::task::Poll;
@@ -616,7 +614,10 @@ mod tests {
result: Ok(PrepareStats::default()),
});
- assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
+ assert_eq!(
+ test.poll_and_recv_from_queue().await.artifact_id,
+ ArtifactId::from_pvf_prep_data(&pvf(1))
+ );
}
#[tokio::test]
@@ -735,7 +736,10 @@ mod tests {
// Since there is still work, the queue requested one extra worker to spawn to handle the
// remaining enqueued work items.
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
- assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
+ assert_eq!(
+ test.poll_and_recv_from_queue().await.artifact_id,
+ ArtifactId::from_pvf_prep_data(&pvf(1))
+ );
}
#[tokio::test]
diff --git a/node/core/pvf/src/prepare/worker_intf.rs b/node/core/pvf/src/prepare/worker_intf.rs
index daf94aadc672..47522d3f0856 100644
--- a/node/core/pvf/src/prepare/worker_intf.rs
+++ b/node/core/pvf/src/prepare/worker_intf.rs
@@ -17,17 +17,20 @@
//! Host interface to the prepare worker.
use crate::{
- error::{PrepareError, PrepareResult},
metrics::Metrics,
- prepare::PrepareStats,
- pvf::PvfPrepData,
- worker_common::{
- framed_recv, framed_send, path_to_bytes, spawn_with_program_path, tmpfile_in, IdleWorker,
- SpawnErr, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
+ worker_intf::{
+ path_to_bytes, spawn_with_program_path, tmpfile_in, IdleWorker, SpawnErr, WorkerHandle,
+ JOB_TIMEOUT_WALL_CLOCK_FACTOR,
},
LOG_TARGET,
};
use parity_scale_codec::{Decode, Encode};
+use polkadot_node_core_pvf_common::{
+ error::{PrepareError, PrepareResult},
+ framed_recv, framed_send,
+ prepare::PrepareStats,
+ pvf::PvfPrepData,
+};
use sp_core::hexdisplay::HexDisplay;
use std::{
diff --git a/node/core/pvf/worker/src/testing.rs b/node/core/pvf/src/testing.rs
similarity index 93%
rename from node/core/pvf/worker/src/testing.rs
rename to node/core/pvf/src/testing.rs
index 7497d4aed31c..cc07d7aeef02 100644
--- a/node/core/pvf/worker/src/testing.rs
+++ b/node/core/pvf/src/testing.rs
@@ -19,6 +19,9 @@
//! N.B. This is not guarded with some feature flag. Overexposing items here may affect the final
//! artifact even for production builds.
+#[doc(hidden)]
+pub use crate::worker_intf::{spawn_with_program_path, SpawnErr};
+
use polkadot_primitives::ExecutorParams;
/// A function that emulates the stitches together behaviors of the preparation and the execution
@@ -27,7 +30,8 @@ pub fn validate_candidate(
code: &[u8],
params: &[u8],
) -> Result, Box> {
- use crate::executor_intf::{prepare, prevalidate, Executor};
+ use polkadot_node_core_pvf_execute_worker::Executor;
+ use polkadot_node_core_pvf_prepare_worker::{prepare, prevalidate};
let code = sp_maybe_compressed_blob::decompress(code, 10 * 1024 * 1024)
.expect("Decompressing code failed");
diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_intf.rs
similarity index 100%
rename from node/core/pvf/src/worker_common.rs
rename to node/core/pvf/src/worker_intf.rs
diff --git a/node/core/pvf/worker/tests/it/adder.rs b/node/core/pvf/tests/it/adder.rs
similarity index 100%
rename from node/core/pvf/worker/tests/it/adder.rs
rename to node/core/pvf/tests/it/adder.rs
diff --git a/node/core/pvf/worker/tests/it/main.rs b/node/core/pvf/tests/it/main.rs
similarity index 100%
rename from node/core/pvf/worker/tests/it/main.rs
rename to node/core/pvf/tests/it/main.rs
diff --git a/node/core/pvf/worker/tests/it/worker_common.rs b/node/core/pvf/tests/it/worker_common.rs
similarity index 100%
rename from node/core/pvf/worker/tests/it/worker_common.rs
rename to node/core/pvf/tests/it/worker_common.rs
diff --git a/node/core/pvf/worker/src/lib.rs b/node/core/pvf/worker/src/lib.rs
deleted file mode 100644
index 456362cf8f57..000000000000
--- a/node/core/pvf/worker/src/lib.rs
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright (C) Parity Technologies (UK) Ltd.
-// This file is part of Polkadot.
-
-// Polkadot is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Polkadot is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Polkadot. If not, see .
-
-mod common;
-mod execute;
-mod executor_intf;
-mod memory_stats;
-mod prepare;
-
-#[doc(hidden)]
-pub mod testing;
-
-#[doc(hidden)]
-pub use sp_tracing;
-
-pub use execute::worker_entrypoint as execute_worker_entrypoint;
-pub use prepare::worker_entrypoint as prepare_worker_entrypoint;
-
-pub use executor_intf::{prepare, prevalidate};
-
-// NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are
-// separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-worker=trace`.
-const LOG_TARGET: &str = "parachain::pvf-worker";
-
-/// Use this macro to declare a `fn main() {}` that will create an executable that can be used for
-/// spawning the desired worker.
-#[macro_export(local_inner_macros)]
-macro_rules! decl_worker_main {
- ($command:tt) => {
- fn main() {
- $crate::sp_tracing::try_init_simple();
-
- let args = std::env::args().collect::>();
-
- let mut version = None;
- let mut socket_path: &str = "";
-
- for i in 1..args.len() {
- match args[i].as_ref() {
- "--socket-path" => socket_path = args[i + 1].as_str(),
- "--node-version" => version = Some(args[i + 1].as_str()),
- _ => (),
- }
- }
-
- decl_worker_main_command!($command, socket_path, version)
- }
- };
-}
-
-#[macro_export]
-#[doc(hidden)]
-macro_rules! decl_worker_main_command {
- (prepare, $socket_path:expr, $version: expr) => {
- $crate::prepare_worker_entrypoint(&$socket_path, $version)
- };
- (execute, $socket_path:expr, $version: expr) => {
- $crate::execute_worker_entrypoint(&$socket_path, $version)
- };
-}
diff --git a/node/malus/Cargo.toml b/node/malus/Cargo.toml
index a36822b041a3..8e23e623174f 100644
--- a/node/malus/Cargo.toml
+++ b/node/malus/Cargo.toml
@@ -20,7 +20,8 @@ polkadot-node-subsystem-types = { path = "../subsystem-types" }
polkadot-node-core-dispute-coordinator = { path = "../core/dispute-coordinator" }
polkadot-node-core-candidate-validation = { path = "../core/candidate-validation" }
polkadot-node-core-backing = { path = "../core/backing" }
-polkadot-node-core-pvf-worker = { path = "../core/pvf/worker" }
+polkadot-node-core-pvf-execute-worker = { path = "../core/pvf/execute-worker" }
+polkadot-node-core-pvf-prepare-worker = { path = "../core/pvf/prepare-worker" }
polkadot-node-primitives = { path = "../primitives" }
polkadot-primitives = { path = "../../primitives" }
color-eyre = { version = "0.6.1", default-features = false }
diff --git a/node/malus/src/malus.rs b/node/malus/src/malus.rs
index 36cf0cca06bf..d09f8be990a4 100644
--- a/node/malus/src/malus.rs
+++ b/node/malus/src/malus.rs
@@ -97,7 +97,7 @@ impl MalusCli {
#[cfg(not(target_os = "android"))]
{
- polkadot_node_core_pvf_worker::prepare_worker_entrypoint(
+ polkadot_node_core_pvf_prepare_worker::worker_entrypoint(
&cmd.socket_path,
None,
);
@@ -111,7 +111,7 @@ impl MalusCli {
#[cfg(not(target_os = "android"))]
{
- polkadot_node_core_pvf_worker::execute_worker_entrypoint(
+ polkadot_node_core_pvf_execute_worker::worker_entrypoint(
&cmd.socket_path,
None,
);
diff --git a/node/test/performance-test/Cargo.toml b/node/test/performance-test/Cargo.toml
index 70f072c03ae1..4e3001b3ee66 100644
--- a/node/test/performance-test/Cargo.toml
+++ b/node/test/performance-test/Cargo.toml
@@ -10,7 +10,7 @@ quote = "1.0.26"
env_logger = "0.9"
log = "0.4"
-polkadot-node-core-pvf-worker = { path = "../../core/pvf/worker" }
+polkadot-node-core-pvf-prepare-worker = { path = "../../core/pvf/prepare-worker" }
polkadot-erasure-coding = { path = "../../../erasure-coding" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-primitives = { path = "../../../primitives" }
diff --git a/node/test/performance-test/src/lib.rs b/node/test/performance-test/src/lib.rs
index 1afa43cc62ba..15073912654a 100644
--- a/node/test/performance-test/src/lib.rs
+++ b/node/test/performance-test/src/lib.rs
@@ -65,9 +65,9 @@ pub fn measure_pvf_prepare(wasm_code: &[u8]) -> Result
.or(Err(PerfCheckError::CodeDecompressionFailed))?;
// Recreate the pipeline from the pvf prepare worker.
- let blob =
- polkadot_node_core_pvf_worker::prevalidate(code.as_ref()).map_err(PerfCheckError::from)?;
- polkadot_node_core_pvf_worker::prepare(blob, &ExecutorParams::default())
+ let blob = polkadot_node_core_pvf_prepare_worker::prevalidate(code.as_ref())
+ .map_err(PerfCheckError::from)?;
+ polkadot_node_core_pvf_prepare_worker::prepare(blob, &ExecutorParams::default())
.map_err(PerfCheckError::from)?;
Ok(start.elapsed())
diff --git a/parachain/test-parachains/adder/collator/Cargo.toml b/parachain/test-parachains/adder/collator/Cargo.toml
index ee20cb0b0d17..7fe4aefc688d 100644
--- a/parachain/test-parachains/adder/collator/Cargo.toml
+++ b/parachain/test-parachains/adder/collator/Cargo.toml
@@ -34,7 +34,7 @@ sc-service = { git = "https://github.com/paritytech/substrate", branch = "master
# This one is tricky. Even though it is not used directly by the collator, we still need it for the
# `puppet_worker` binary, which is required for the integration test. However, this shouldn't be
# a big problem since it is used transitively anyway.
-polkadot-node-core-pvf-worker = { path = "../../../../node/core/pvf/worker" }
+polkadot-node-core-pvf = { path = "../../../../node/core/pvf" }
[dev-dependencies]
polkadot-parachain = { path = "../../.." }
diff --git a/parachain/test-parachains/adder/collator/bin/puppet_worker.rs b/parachain/test-parachains/adder/collator/bin/puppet_worker.rs
index ddd81971292b..7f93519d8454 100644
--- a/parachain/test-parachains/adder/collator/bin/puppet_worker.rs
+++ b/parachain/test-parachains/adder/collator/bin/puppet_worker.rs
@@ -14,4 +14,4 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
-polkadot_node_core_pvf_worker::decl_puppet_worker_main!();
+polkadot_node_core_pvf::decl_puppet_worker_main!();
diff --git a/parachain/test-parachains/adder/collator/src/lib.rs b/parachain/test-parachains/adder/collator/src/lib.rs
index 4b2b9248de22..02a4598f9e47 100644
--- a/parachain/test-parachains/adder/collator/src/lib.rs
+++ b/parachain/test-parachains/adder/collator/src/lib.rs
@@ -272,7 +272,7 @@ mod tests {
}
fn validate_collation(collator: &Collator, parent_head: HeadData, collation: Collation) {
- use polkadot_node_core_pvf_worker::testing::validate_candidate;
+ use polkadot_node_core_pvf::testing::validate_candidate;
let block_data = match collation.proof_of_validity {
MaybeCompressedPoV::Raw(pov) => pov.block_data,
diff --git a/parachain/test-parachains/undying/collator/Cargo.toml b/parachain/test-parachains/undying/collator/Cargo.toml
index 1b2ccf3be0ca..2b9d80401f5d 100644
--- a/parachain/test-parachains/undying/collator/Cargo.toml
+++ b/parachain/test-parachains/undying/collator/Cargo.toml
@@ -34,7 +34,7 @@ sc-service = { git = "https://github.com/paritytech/substrate", branch = "master
# This one is tricky. Even though it is not used directly by the collator, we still need it for the
# `puppet_worker` binary, which is required for the integration test. However, this shouldn't be
# a big problem since it is used transitively anyway.
-polkadot-node-core-pvf-worker = { path = "../../../../node/core/pvf/worker" }
+polkadot-node-core-pvf = { path = "../../../../node/core/pvf" }
[dev-dependencies]
polkadot-parachain = { path = "../../.." }
diff --git a/parachain/test-parachains/undying/collator/bin/puppet_worker.rs b/parachain/test-parachains/undying/collator/bin/puppet_worker.rs
index ddd81971292b..7f93519d8454 100644
--- a/parachain/test-parachains/undying/collator/bin/puppet_worker.rs
+++ b/parachain/test-parachains/undying/collator/bin/puppet_worker.rs
@@ -14,4 +14,4 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
-polkadot_node_core_pvf_worker::decl_puppet_worker_main!();
+polkadot_node_core_pvf::decl_puppet_worker_main!();
diff --git a/parachain/test-parachains/undying/collator/src/lib.rs b/parachain/test-parachains/undying/collator/src/lib.rs
index dcaf9b63296d..838590fa16f5 100644
--- a/parachain/test-parachains/undying/collator/src/lib.rs
+++ b/parachain/test-parachains/undying/collator/src/lib.rs
@@ -354,7 +354,7 @@ mod tests {
}
fn validate_collation(collator: &Collator, parent_head: HeadData, collation: Collation) {
- use polkadot_node_core_pvf_worker::testing::validate_candidate;
+ use polkadot_node_core_pvf::testing::validate_candidate;
let block_data = match collation.proof_of_validity {
MaybeCompressedPoV::Raw(pov) => pov.block_data,