diff --git a/Cargo.lock b/Cargo.lock index 0a004b3a7364..73d5acd6f6dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9028,16 +9028,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "os_pipe" -version = "1.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ae859aa07428ca9a929b936690f8b12dc5f11dd8c6992a18ca93919f28bc177" -dependencies = [ - "libc", - "windows-sys 0.48.0", -] - [[package]] name = "os_str_bytes" version = "6.5.1" @@ -12744,6 +12734,7 @@ dependencies = [ "futures", "landlock", "libc", + "nix 0.27.1", "parity-scale-codec", "polkadot-parachain-primitives", "polkadot-primitives", @@ -12764,10 +12755,10 @@ dependencies = [ name = "polkadot-node-core-pvf-execute-worker" version = "1.0.0" dependencies = [ + "cfg-if", "cpu-time", "libc", "nix 0.27.1", - "os_pipe", "parity-scale-codec", "polkadot-node-core-pvf-common", "polkadot-parachain-primitives", @@ -12784,7 +12775,6 @@ dependencies = [ "criterion 0.4.0", "libc", "nix 0.27.1", - "os_pipe", "parity-scale-codec", "polkadot-node-core-pvf-common", "polkadot-primitives", diff --git a/polkadot/node/core/pvf/common/Cargo.toml b/polkadot/node/core/pvf/common/Cargo.toml index b4f6ada9dbb7..f2ec505e623c 100644 --- a/polkadot/node/core/pvf/common/Cargo.toml +++ b/polkadot/node/core/pvf/common/Cargo.toml @@ -33,6 +33,7 @@ sp-tracing = { path = "../../../../../substrate/primitives/tracing" } [target.'cfg(target_os = "linux")'.dependencies] landlock = "0.3.0" +nix = { version = "0.27.1", features = ["sched"] } seccompiler = "0.4.0" [dev-dependencies] diff --git a/polkadot/node/core/pvf/common/src/execute.rs b/polkadot/node/core/pvf/common/src/execute.rs index 5ba5b443e6a1..6b3becf524d7 100644 --- a/polkadot/node/core/pvf/common/src/execute.rs +++ b/polkadot/node/core/pvf/common/src/execute.rs @@ -92,10 +92,11 @@ pub enum JobError { TimedOut, #[error("An unexpected panic has occurred in the execution job: {0}")] Panic(String), + /// Some error occurred when interfacing with the kernel. + #[error("Error interfacing with the kernel: {0}")] + Kernel(String), #[error("Could not spawn the requested thread: {0}")] CouldNotSpawnThread(String), #[error("An error occurred in the CPU time monitor thread: {0}")] CpuTimeMonitorThread(String), - #[error("Could not set pdeathsig: {0}")] - CouldNotSetPdeathsig(String), } diff --git a/polkadot/node/core/pvf/common/src/executor_interface.rs b/polkadot/node/core/pvf/common/src/executor_interface.rs index e634940dbe65..4cd2f5c85eec 100644 --- a/polkadot/node/core/pvf/common/src/executor_interface.rs +++ b/polkadot/node/core/pvf/common/src/executor_interface.rs @@ -140,7 +140,7 @@ pub unsafe fn create_runtime_from_artifact_bytes( executor_params: &ExecutorParams, ) -> Result { let mut config = DEFAULT_CONFIG.clone(); - config.semantics = params_to_wasmtime_semantics(executor_params); + config.semantics = params_to_wasmtime_semantics(executor_params).0; sc_executor_wasmtime::create_runtime_from_artifact_bytes::( compiled_artifact_blob, @@ -148,7 +148,10 @@ pub unsafe fn create_runtime_from_artifact_bytes( ) } -pub fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Semantics { +/// Takes the default config and overwrites any settings with existing executor parameters. +/// +/// Returns the semantics as well as the stack limit (since we are guaranteed to have it). 
+pub fn params_to_wasmtime_semantics(par: &ExecutorParams) -> (Semantics, DeterministicStackLimit) { let mut sem = DEFAULT_CONFIG.semantics.clone(); let mut stack_limit = sem .deterministic_stack_limit @@ -169,8 +172,8 @@ pub fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Semantics { ExecutorParam::PvfExecTimeout(_, _) => (), /* Not used here */ } } - sem.deterministic_stack_limit = Some(stack_limit); - sem + sem.deterministic_stack_limit = Some(stack_limit.clone()); + (sem, stack_limit) } /// Runs the prevalidation on the given code. Returns a [`RuntimeBlob`] if it succeeds. @@ -187,7 +190,7 @@ pub fn prepare( blob: RuntimeBlob, executor_params: &ExecutorParams, ) -> Result, sc_executor_common::error::WasmError> { - let semantics = params_to_wasmtime_semantics(executor_params); + let (semantics, _) = params_to_wasmtime_semantics(executor_params); sc_executor_wasmtime::prepare_runtime_artifact(blob, &semantics) } diff --git a/polkadot/node/core/pvf/common/src/lib.rs b/polkadot/node/core/pvf/common/src/lib.rs index af4a7526e553..d891c06bf2ad 100644 --- a/polkadot/node/core/pvf/common/src/lib.rs +++ b/polkadot/node/core/pvf/common/src/lib.rs @@ -57,6 +57,8 @@ pub struct SecurityStatus { pub can_enable_seccomp: bool, /// Whether we are able to unshare the user namespace and change the filesystem root. pub can_unshare_user_namespace_and_change_root: bool, + /// Whether we are able to call `clone` with all sandboxing flags. + pub can_do_secure_clone: bool, } /// A handshake with information for the worker. diff --git a/polkadot/node/core/pvf/common/src/worker/mod.rs b/polkadot/node/core/pvf/common/src/worker/mod.rs index 9dcd535d5335..d7c95d9e7047 100644 --- a/polkadot/node/core/pvf/common/src/worker/mod.rs +++ b/polkadot/node/core/pvf/common/src/worker/mod.rs @@ -18,14 +18,19 @@ pub mod security; -use crate::{framed_recv_blocking, WorkerHandshake, LOG_TARGET}; +use crate::{framed_recv_blocking, SecurityStatus, WorkerHandshake, LOG_TARGET}; use cpu_time::ProcessTime; use futures::never::Never; use parity_scale_codec::Decode; use std::{ any::Any, - fmt, io, - os::unix::net::UnixStream, + fmt::{self}, + fs::File, + io::{self, Read, Write}, + os::{ + fd::{AsRawFd, FromRawFd, RawFd}, + unix::net::UnixStream, + }, path::PathBuf, sync::mpsc::{Receiver, RecvTimeoutError}, time::Duration, @@ -78,7 +83,7 @@ macro_rules! decl_worker_main { "--check-can-enable-landlock" => { #[cfg(target_os = "linux")] - let status = if let Err(err) = security::landlock::check_is_fully_enabled() { + let status = if let Err(err) = security::landlock::check_can_fully_enable() { // Write the error to stderr, log it on the host-side. eprintln!("{}", err); -1 @@ -91,7 +96,7 @@ macro_rules! decl_worker_main { }, "--check-can-enable-seccomp" => { #[cfg(all(target_os = "linux", target_arch = "x86_64"))] - let status = if let Err(err) = security::seccomp::check_is_fully_enabled() { + let status = if let Err(err) = security::seccomp::check_can_fully_enable() { // Write the error to stderr, log it on the host-side. eprintln!("{}", err); -1 @@ -107,7 +112,7 @@ macro_rules! decl_worker_main { let cache_path_tempdir = std::path::Path::new(&args[2]); #[cfg(target_os = "linux")] let status = if let Err(err) = - security::change_root::check_is_fully_enabled(&cache_path_tempdir) + security::change_root::check_can_fully_enable(&cache_path_tempdir) { // Write the error to stderr, log it on the host-side. eprintln!("{}", err); @@ -119,6 +124,21 @@ macro_rules! 
decl_worker_main { let status = -1; std::process::exit(status) }, + "--check-can-do-secure-clone" => { + #[cfg(target_os = "linux")] + // SAFETY: new process is spawned within a single threaded process. This + // invariant is enforced by tests. + let status = if let Err(err) = unsafe { security::clone::check_can_fully_clone() } { + // Write the error to stderr, log it on the host-side. + eprintln!("{}", err); + -1 + } else { + 0 + }; + #[cfg(not(target_os = "linux"))] + let status = -1; + std::process::exit(status) + }, "test-sleep" => { std::thread::sleep(std::time::Duration::from_secs(5)); @@ -171,6 +191,84 @@ macro_rules! decl_worker_main { }; } +//taken from the os_pipe crate. Copied here to reduce one dependency and +// because its type-safe abstractions do not play well with nix's clone +#[cfg(not(target_os = "macos"))] +pub fn pipe2_cloexec() -> io::Result<(libc::c_int, libc::c_int)> { + let mut fds: [libc::c_int; 2] = [0; 2]; + let res = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC) }; + if res != 0 { + return Err(io::Error::last_os_error()) + } + Ok((fds[0], fds[1])) +} + +#[cfg(target_os = "macos")] +pub fn pipe2_cloexec() -> io::Result<(libc::c_int, libc::c_int)> { + let mut fds: [libc::c_int; 2] = [0; 2]; + let res = unsafe { libc::pipe(fds.as_mut_ptr()) }; + if res != 0 { + return Err(io::Error::last_os_error()) + } + let res = unsafe { libc::fcntl(fds[0], libc::F_SETFD, libc::FD_CLOEXEC) }; + if res != 0 { + return Err(io::Error::last_os_error()) + } + let res = unsafe { libc::fcntl(fds[1], libc::F_SETFD, libc::FD_CLOEXEC) }; + if res != 0 { + return Err(io::Error::last_os_error()) + } + Ok((fds[0], fds[1])) +} + +/// A wrapper around a file descriptor used to encapsulate and restrict +/// functionality for pipe operations. +pub struct PipeFd { + file: File, +} + +impl AsRawFd for PipeFd { + /// Returns the raw file descriptor associated with this `PipeFd` + fn as_raw_fd(&self) -> RawFd { + self.file.as_raw_fd() + } +} + +impl FromRawFd for PipeFd { + /// Creates a new `PipeFd` instance from a raw file descriptor. + /// + /// # Safety + /// + /// The fd passed in must be an owned file descriptor; in particular, it must be open. + unsafe fn from_raw_fd(fd: RawFd) -> Self { + PipeFd { file: File::from_raw_fd(fd) } + } +} + +impl Read for PipeFd { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.file.read(buf) + } + + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + self.file.read_to_end(buf) + } +} + +impl Write for PipeFd { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.file.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.file.flush() + } + + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + self.file.write_all(buf) + } +} + /// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the /// child process. pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50); @@ -192,14 +290,12 @@ impl fmt::Display for WorkerKind { } } -// Some fields are only used for logging, and dead-code analysis ignores Debug. 
-#[allow(dead_code)] #[derive(Debug)] pub struct WorkerInfo { - pid: u32, - kind: WorkerKind, - version: Option, - worker_dir_path: PathBuf, + pub pid: u32, + pub kind: WorkerKind, + pub version: Option, + pub worker_dir_path: PathBuf, } // NOTE: The worker version must be passed in so that we accurately get the version of the worker, @@ -218,7 +314,7 @@ pub fn run_worker( worker_version: Option<&str>, mut event_loop: F, ) where - F: FnMut(UnixStream, PathBuf) -> io::Result, + F: FnMut(UnixStream, &WorkerInfo, SecurityStatus) -> io::Result, { #[cfg_attr(not(target_os = "linux"), allow(unused_mut))] let mut worker_info = WorkerInfo { @@ -250,11 +346,8 @@ pub fn run_worker( } // Make sure that we can read the worker dir path, and log its contents. - let entries = || -> Result, io::Error> { - std::fs::read_dir(&worker_info.worker_dir_path)? - .map(|res| res.map(|e| e.file_name())) - .collect() - }(); + let entries: io::Result> = std::fs::read_dir(&worker_info.worker_dir_path) + .and_then(|d| d.map(|res| res.map(|e| e.file_name())).collect()); match entries { Ok(entries) => gum::trace!(target: LOG_TARGET, ?worker_info, "content of worker dir: {:?}", entries), @@ -284,6 +377,22 @@ pub fn run_worker( { gum::trace!(target: LOG_TARGET, ?security_status, "Enabling security features"); + // First, make sure env vars were cleared, to match the environment we perform the checks + // within. (In theory, running checks with different env vars could result in different + // outcomes of the checks.) + if !security::check_env_vars_were_cleared(&worker_info) { + let err = "not all env vars were cleared when spawning the process"; + gum::error!( + target: LOG_TARGET, + ?worker_info, + "{}", + err + ); + if security_status.secure_validator_mode { + worker_shutdown(worker_info, err); + } + } + // Call based on whether we can change root. Error out if it should work but fails. // // NOTE: This should not be called in a multi-threaded context (i.e. inside the tokio @@ -337,23 +446,10 @@ pub fn run_worker( } } } - - if !security::check_env_vars_were_cleared(&worker_info) { - let err = "not all env vars were cleared when spawning the process"; - gum::error!( - target: LOG_TARGET, - ?worker_info, - "{}", - err - ); - if security_status.secure_validator_mode { - worker_shutdown(worker_info, err); - } - } } // Run the main worker loop. - let err = event_loop(stream, worker_info.worker_dir_path.clone()) + let err = event_loop(stream, &worker_info, security_status) // It's never `Ok` because it's `Ok(Never)`. .unwrap_err(); diff --git a/polkadot/node/core/pvf/common/src/worker/security/change_root.rs b/polkadot/node/core/pvf/common/src/worker/security/change_root.rs index 375cc8ff6f28..9ec66906819f 100644 --- a/polkadot/node/core/pvf/common/src/worker/security/change_root.rs +++ b/polkadot/node/core/pvf/common/src/worker/security/change_root.rs @@ -54,8 +54,7 @@ pub fn enable_for_worker(worker_info: &WorkerInfo) -> Result<()> { /// /// NOTE: This should not be called in a multi-threaded context. `unshare(2)`: /// "CLONE_NEWUSER requires that the calling process is not threaded." -#[cfg(target_os = "linux")] -pub fn check_is_fully_enabled(tempdir: &Path) -> Result<()> { +pub fn check_can_fully_enable(tempdir: &Path) -> Result<()> { let worker_dir_path = tempdir.to_owned(); try_restrict(&WorkerInfo { pid: std::process::id(), @@ -69,7 +68,6 @@ pub fn check_is_fully_enabled(tempdir: &Path) -> Result<()> { /// /// NOTE: This should not be called in a multi-threaded context. 
`unshare(2)`: /// "CLONE_NEWUSER requires that the calling process is not threaded." -#[cfg(target_os = "linux")] fn try_restrict(worker_info: &WorkerInfo) -> Result<()> { // TODO: Remove this once this is stable: https://github.com/rust-lang/rust/issues/105723 macro_rules! cstr_ptr { @@ -78,12 +76,6 @@ fn try_restrict(worker_info: &WorkerInfo) -> Result<()> { }; } - gum::trace!( - target: LOG_TARGET, - ?worker_info, - "unsharing the user namespace and calling pivot_root", - ); - let worker_dir_path_c = CString::new(worker_info.worker_dir_path.as_os_str().as_bytes()) .expect("on unix; the path will never contain 0 bytes; qed"); diff --git a/polkadot/node/core/pvf/common/src/worker/security/clone.rs b/polkadot/node/core/pvf/common/src/worker/security/clone.rs new file mode 100644 index 000000000000..707f68d18591 --- /dev/null +++ b/polkadot/node/core/pvf/common/src/worker/security/clone.rs @@ -0,0 +1,93 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Functionality for securing the job processes spawned by the workers using `clone`. If +//! unsupported, falls back to `fork`. + +use crate::{worker::WorkerInfo, LOG_TARGET}; +use nix::{ + errno::Errno, + sched::{CloneCb, CloneFlags}, + unistd::Pid, +}; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("could not clone, errno: {0}")] + Clone(Errno), +} + +pub type Result = std::result::Result; + +/// Try to run clone(2) on the current worker. +/// +/// SAFETY: new process should be either spawned within a single threaded process, or use only +/// async-signal-safe functions. +pub unsafe fn clone_on_worker( + worker_info: &WorkerInfo, + have_unshare_newuser: bool, + cb: CloneCb, +) -> Result { + let flags = clone_flags(have_unshare_newuser); + + gum::trace!( + target: LOG_TARGET, + ?worker_info, + "calling clone with flags: {:?}", + flags + ); + + try_clone(cb, flags) +} + +/// Runs a check for clone(2) with all sandboxing flags and returns an error indicating whether it +/// can be fully enabled on the current Linux environment. +/// +/// SAFETY: new process should be either spawned within a single threaded process, or use only +/// async-signal-safe functions. +pub unsafe fn check_can_fully_clone() -> Result<()> { + try_clone(Box::new(|| 0), clone_flags(false)).map(|_pid| ()) +} + +/// Runs clone(2) with all sandboxing flags. +/// +/// SAFETY: new process should be either spawned within a single threaded process, or use only +/// async-signal-safe functions. +unsafe fn try_clone(cb: CloneCb, flags: CloneFlags) -> Result { + let mut stack = [0u8; 2 * 1024 * 1024]; + + nix::sched::clone(cb, stack.as_mut_slice(), flags, None).map_err(|errno| Error::Clone(errno)) +} + +/// Returns flags for `clone(2)`, including all the sandbox-related ones. 
+fn clone_flags(have_unshare_newuser: bool) -> CloneFlags { + // NOTE: CLONE_NEWUSER does not work in `clone` if we previously called `unshare` with this + // flag. On the other hand, if we did not call `unshare` we need this flag for the CAP_SYS_ADMIN + // capability. + let maybe_clone_newuser = + if have_unshare_newuser { CloneFlags::empty() } else { CloneFlags::CLONE_NEWUSER }; + // SIGCHLD flag is used to inform clone that the parent process is + // expecting a child termination signal, without this flag `waitpid` function + // return `ECHILD` error. + maybe_clone_newuser | + CloneFlags::CLONE_NEWCGROUP | + CloneFlags::CLONE_NEWIPC | + CloneFlags::CLONE_NEWNET | + CloneFlags::CLONE_NEWNS | + CloneFlags::CLONE_NEWPID | + CloneFlags::CLONE_NEWUTS | + CloneFlags::from_bits_retain(libc::SIGCHLD) +} diff --git a/polkadot/node/core/pvf/common/src/worker/security/landlock.rs b/polkadot/node/core/pvf/common/src/worker/security/landlock.rs index 211d12c2e443..10d00a0e2c66 100644 --- a/polkadot/node/core/pvf/common/src/worker/security/landlock.rs +++ b/polkadot/node/core/pvf/common/src/worker/security/landlock.rs @@ -112,7 +112,7 @@ pub fn enable_for_worker(worker_info: &WorkerInfo) -> Result<()> { // TODO: /// Runs a check for landlock in its own thread, and returns an error indicating whether the given /// landlock ABI is fully enabled on the current Linux environment. -pub fn check_is_fully_enabled() -> Result<()> { +pub fn check_can_fully_enable() -> Result<()> { match std::thread::spawn(|| try_restrict(std::iter::empty::<(PathBuf, AccessFs)>())).join() { Ok(Ok(())) => Ok(()), Ok(Err(err)) => Err(err), @@ -165,7 +165,7 @@ mod tests { #[test] fn restricted_thread_cannot_read_file() { // TODO: This would be nice: . - if check_is_fully_enabled().is_err() { + if check_can_fully_enable().is_err() { return } @@ -230,7 +230,7 @@ mod tests { #[test] fn restricted_thread_cannot_write_file() { // TODO: This would be nice: . - if check_is_fully_enabled().is_err() { + if check_can_fully_enable().is_err() { return } @@ -289,7 +289,7 @@ mod tests { #[test] fn restricted_thread_can_truncate_file() { // TODO: This would be nice: . - if check_is_fully_enabled().is_err() { + if check_can_fully_enable().is_err() { return } diff --git a/polkadot/node/core/pvf/common/src/worker/security/mod.rs b/polkadot/node/core/pvf/common/src/worker/security/mod.rs index ff4c712f6bdc..72d47235d47a 100644 --- a/polkadot/node/core/pvf/common/src/worker/security/mod.rs +++ b/polkadot/node/core/pvf/common/src/worker/security/mod.rs @@ -27,15 +27,17 @@ //! - Restrict networking by blocking socket creation and io_uring. //! - Remove env vars -use crate::{worker::WorkerInfo, LOG_TARGET}; - #[cfg(target_os = "linux")] pub mod change_root; #[cfg(target_os = "linux")] +pub mod clone; +#[cfg(target_os = "linux")] pub mod landlock; #[cfg(all(target_os = "linux", target_arch = "x86_64"))] pub mod seccomp; +use crate::{worker::WorkerInfo, LOG_TARGET}; + /// Require env vars to have been removed when spawning the process, to prevent malicious code from /// accessing them. 
pub fn check_env_vars_were_cleared(worker_info: &WorkerInfo) -> bool { diff --git a/polkadot/node/core/pvf/common/src/worker/security/seccomp.rs b/polkadot/node/core/pvf/common/src/worker/security/seccomp.rs index 4f270f75b345..f6100d236c8b 100644 --- a/polkadot/node/core/pvf/common/src/worker/security/seccomp.rs +++ b/polkadot/node/core/pvf/common/src/worker/security/seccomp.rs @@ -110,7 +110,7 @@ pub fn enable_for_worker(worker_info: &WorkerInfo) -> Result<()> { /// Runs a check for seccomp in its own thread, and returns an error indicating whether seccomp with /// our rules is fully enabled on the current Linux environment. -pub fn check_is_fully_enabled() -> Result<()> { +pub fn check_can_fully_enable() -> Result<()> { match std::thread::spawn(|| try_restrict()).join() { Ok(Ok(())) => Ok(()), Ok(Err(err)) => Err(err), @@ -161,7 +161,7 @@ mod tests { #[test] fn sandboxed_thread_cannot_use_sockets() { // TODO: This would be nice: . - if check_is_fully_enabled().is_err() { + if check_can_fully_enable().is_err() { return } diff --git a/polkadot/node/core/pvf/execute-worker/Cargo.toml b/polkadot/node/core/pvf/execute-worker/Cargo.toml index 83118041fdff..a0a987fb439a 100644 --- a/polkadot/node/core/pvf/execute-worker/Cargo.toml +++ b/polkadot/node/core/pvf/execute-worker/Cargo.toml @@ -12,8 +12,8 @@ workspace = true [dependencies] cpu-time = "1.0.0" gum = { package = "tracing-gum", path = "../../../gum" } -os_pipe = "1.1.4" -nix = { version = "0.27.1", features = ["process", "resource"] } +cfg-if = "1.0" +nix = { version = "0.27.1", features = ["process", "resource", "sched"] } libc = "0.2.152" parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] } diff --git a/polkadot/node/core/pvf/execute-worker/src/lib.rs b/polkadot/node/core/pvf/execute-worker/src/lib.rs index 6980f913d0d7..0cfa5a786946 100644 --- a/polkadot/node/core/pvf/execute-worker/src/lib.rs +++ b/polkadot/node/core/pvf/execute-worker/src/lib.rs @@ -16,7 +16,7 @@ //! Contains the logic for executing PVFs. Used by the polkadot-execute-worker binary. -pub use polkadot_node_core_pvf_common::{executor_interface::execute_artifact, worker_dir}; +pub use polkadot_node_core_pvf_common::executor_interface::execute_artifact; // NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are // separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-execute-worker=trace`. @@ -31,64 +31,41 @@ use nix::{ }, unistd::{ForkResult, Pid}, }; -use os_pipe::{self, PipeReader, PipeWriter}; use parity_scale_codec::{Decode, Encode}; use polkadot_node_core_pvf_common::{ error::InternalValidationError, execute::{Handshake, JobError, JobResponse, JobResult, WorkerResponse}, + executor_interface::params_to_wasmtime_semantics, framed_recv_blocking, framed_send_blocking, worker::{ - cpu_time_monitor_loop, run_worker, stringify_panic_payload, + cpu_time_monitor_loop, pipe2_cloexec, run_worker, stringify_panic_payload, thread::{self, WaitOutcome}, - WorkerKind, + PipeFd, WorkerInfo, WorkerKind, }, + worker_dir, }; use polkadot_parachain_primitives::primitives::ValidationResult; -use polkadot_primitives::{executor_params::DEFAULT_NATIVE_STACK_MAX, ExecutorParams}; +use polkadot_primitives::ExecutorParams; use std::{ io::{self, Read}, - os::unix::net::UnixStream, + os::{ + fd::{AsRawFd, FromRawFd}, + unix::net::UnixStream, + }, path::PathBuf, process, sync::{mpsc::channel, Arc}, time::Duration, }; -// Wasmtime powers the Substrate Executor. 
It compiles the wasm bytecode into native code. -// That native code does not create any stacks and just reuses the stack of the thread that -// wasmtime was invoked from. -// -// Also, we configure the executor to provide the deterministic stack and that requires -// supplying the amount of the native stack space that wasm is allowed to use. This is -// realized by supplying the limit into `wasmtime::Config::max_wasm_stack`. -// -// There are quirks to that configuration knob: -// -// 1. It only limits the amount of stack space consumed by wasm but does not ensure nor check that -// the stack space is actually available. -// -// That means, if the calling thread has 1 MiB of stack space left and the wasm code consumes -// more, then the wasmtime limit will **not** trigger. Instead, the wasm code will hit the -// guard page and the Rust stack overflow handler will be triggered. That leads to an -// **abort**. -// -// 2. It cannot and does not limit the stack space consumed by Rust code. -// -// Meaning that if the wasm code leaves no stack space for Rust code, then the Rust code -// will abort and that will abort the process as well. -// -// Typically on Linux the main thread gets the stack size specified by the `ulimit` and -// typically it's configured to 8 MiB. Rust's spawned threads are 2 MiB. OTOH, the -// DEFAULT_NATIVE_STACK_MAX is set to 256 MiB. Not nearly enough. -// -// Hence we need to increase it. The simplest way to fix that is to spawn a thread with the desired -// stack limit. -// -// The reasoning why we pick this particular size is: -// -// The default Rust thread stack limit 2 MiB + 256 MiB wasm stack. -/// The stack size for the execute thread. -pub const EXECUTE_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024 + DEFAULT_NATIVE_STACK_MAX as usize; +/// The number of threads for the child process: +/// 1 - Main thread +/// 2 - Cpu monitor thread +/// 3 - Execute thread +/// +/// NOTE: The correctness of this value is enforced by a test. If the number of threads inside +/// the child process changes in the future, this value must be changed as well. +pub const EXECUTE_WORKER_THREAD_NUMBER: u32 = 3; /// Receives a handshake with information specific to the execute worker. fn recv_execute_handshake(stream: &mut UnixStream) -> io::Result { @@ -145,17 +122,20 @@ pub fn worker_entrypoint( worker_dir_path, node_version, worker_version, - |mut stream, worker_dir_path| { - let worker_pid = process::id(); - let artifact_path = worker_dir::execute_artifact(&worker_dir_path); + |mut stream, worker_info, security_status| { + let artifact_path = worker_dir::execute_artifact(&worker_info.worker_dir_path); let Handshake { executor_params } = recv_execute_handshake(&mut stream)?; + let executor_params: Arc = Arc::new(executor_params); + let execute_thread_stack_size = max_stack_size(&executor_params); + loop { let (params, execution_timeout) = recv_request(&mut stream)?; gum::debug!( target: LOG_TARGET, - %worker_pid, + ?worker_info, + ?security_status, "worker: validating artifact {}", artifact_path.display(), ); @@ -172,7 +152,7 @@ pub fn worker_entrypoint( }, }; - let (pipe_reader, pipe_writer) = os_pipe::pipe()?; + let (pipe_read_fd, pipe_write_fd) = pipe2_cloexec()?; let usage_before = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) { Ok(usage) => usage, @@ -182,44 +162,65 @@ pub fn worker_entrypoint( continue }, }; - - // SAFETY: new process is spawned within a single threaded process. This invariant - // is enforced by tests. 
- let response = match unsafe { nix::unistd::fork() } { - Err(errno) => internal_error_from_errno("fork", errno), - Ok(ForkResult::Child) => { - // Dropping the stream closes the underlying socket. We want to make sure - // that the sandboxed child can't get any kind of information from the - // outside world. The only IPC it should be able to do is sending its - // response over the pipe. - drop(stream); - // Drop the read end so we don't have too many FDs open. - drop(pipe_reader); - - handle_child_process( - pipe_writer, - compiled_artifact_blob, - executor_params, - params, + let stream_fd = stream.as_raw_fd(); + + let compiled_artifact_blob = Arc::new(compiled_artifact_blob); + let params = Arc::new(params); + + cfg_if::cfg_if! { + if #[cfg(target_os = "linux")] { + let result = if security_status.can_do_secure_clone { + handle_clone( + pipe_write_fd, + pipe_read_fd, + stream_fd, + &compiled_artifact_blob, + &executor_params, + ¶ms, + execution_timeout, + execute_thread_stack_size, + worker_info, + security_status.can_unshare_user_namespace_and_change_root, + usage_before, + )? + } else { + // Fall back to using fork. + handle_fork( + pipe_write_fd, + pipe_read_fd, + stream_fd, + &compiled_artifact_blob, + &executor_params, + ¶ms, + execution_timeout, + execute_thread_stack_size, + worker_info, + usage_before, + )? + }; + } else { + let result = handle_fork( + pipe_write_fd, + pipe_read_fd, + stream_fd, + &compiled_artifact_blob, + &executor_params, + ¶ms, execution_timeout, - ) - }, - Ok(ForkResult::Parent { child }) => { - // the read end will wait until all write ends have been closed, - // this drop is necessary to avoid deadlock - drop(pipe_writer); - - handle_parent_process( - pipe_reader, - child, - worker_pid, + execute_thread_stack_size, + worker_info, usage_before, - execution_timeout, - )? - }, - }; + )?; + } + } - send_response(&mut stream, response)?; + gum::trace!( + target: LOG_TARGET, + ?worker_info, + "worker: sending result to host: {:?}", + result + ); + send_response(&mut stream, result)?; } }, ); @@ -252,39 +253,122 @@ fn validate_using_artifact( JobResponse::Ok { result_descriptor } } +#[cfg(target_os = "linux")] +fn handle_clone( + pipe_write_fd: i32, + pipe_read_fd: i32, + stream_fd: i32, + compiled_artifact_blob: &Arc>, + executor_params: &Arc, + params: &Arc>, + execution_timeout: Duration, + execute_stack_size: usize, + worker_info: &WorkerInfo, + have_unshare_newuser: bool, + usage_before: Usage, +) -> io::Result { + use polkadot_node_core_pvf_common::worker::security; + + // SAFETY: new process is spawned within a single threaded process. This invariant + // is enforced by tests. 
Stack size being specified to ensure child doesn't overflow + match unsafe { + security::clone::clone_on_worker( + worker_info, + have_unshare_newuser, + Box::new(|| { + handle_child_process( + pipe_write_fd, + pipe_read_fd, + stream_fd, + Arc::clone(compiled_artifact_blob), + Arc::clone(executor_params), + Arc::clone(params), + execution_timeout, + execute_stack_size, + ) + }), + ) + } { + Ok(child) => handle_parent_process( + pipe_read_fd, + pipe_write_fd, + worker_info, + child, + usage_before, + execution_timeout, + ), + Err(security::clone::Error::Clone(errno)) => Ok(internal_error_from_errno("clone", errno)), + } +} + +fn handle_fork( + pipe_write_fd: i32, + pipe_read_fd: i32, + stream_fd: i32, + compiled_artifact_blob: &Arc>, + executor_params: &Arc, + params: &Arc>, + execution_timeout: Duration, + execute_worker_stack_size: usize, + worker_info: &WorkerInfo, + usage_before: Usage, +) -> io::Result { + // SAFETY: new process is spawned within a single threaded process. This invariant + // is enforced by tests. + match unsafe { nix::unistd::fork() } { + Ok(ForkResult::Child) => handle_child_process( + pipe_write_fd, + pipe_read_fd, + stream_fd, + Arc::clone(compiled_artifact_blob), + Arc::clone(executor_params), + Arc::clone(params), + execution_timeout, + execute_worker_stack_size, + ), + Ok(ForkResult::Parent { child }) => handle_parent_process( + pipe_read_fd, + pipe_write_fd, + worker_info, + child, + usage_before, + execution_timeout, + ), + Err(errno) => Ok(internal_error_from_errno("fork", errno)), + } +} + /// This is used to handle child process during pvf execute worker. -/// It execute the artifact and pipes back the response to the parent process -/// -/// # Arguments -/// -/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe. -/// -/// - `compiled_artifact_blob`: The artifact bytes from compiled by the prepare worker`. -/// -/// - `executor_params`: Deterministically serialized execution environment semantics. -/// -/// - `params`: Validation parameters. -/// -/// - `execution_timeout`: The timeout in `Duration`. +/// It executes the artifact and pipes back the response to the parent process. /// /// # Returns /// /// - pipe back `JobResponse` to the parent process. fn handle_child_process( - mut pipe_write: PipeWriter, - compiled_artifact_blob: Vec, - executor_params: ExecutorParams, - params: Vec, + pipe_write_fd: i32, + pipe_read_fd: i32, + stream_fd: i32, + compiled_artifact_blob: Arc>, + executor_params: Arc, + params: Arc>, execution_timeout: Duration, + execute_thread_stack_size: usize, ) -> ! { - // Terminate if the parent thread dies. Parent thread == worker process (it is single-threaded). - // - // RACE: the worker may die before we install the death signal. In practice this is unlikely, - // and most of the time the job process should terminate on its own when it completes. - #[cfg(target_os = "linux")] - nix::sys::prctl::set_pdeathsig(nix::sys::signal::Signal::SIGTERM).unwrap_or_else(|err| { - send_child_response(&mut pipe_write, Err(JobError::CouldNotSetPdeathsig(err.to_string()))) - }); + // SAFETY: this is an open and owned file descriptor at this point. + let mut pipe_write = unsafe { PipeFd::from_raw_fd(pipe_write_fd) }; + + // Drop the read end so we don't have too many FDs open. + if let Err(errno) = nix::unistd::close(pipe_read_fd) { + send_child_response(&mut pipe_write, job_error_from_errno("closing pipe", errno)); + } + + // Dropping the stream closes the underlying socket. 
We want to make sure + // that the sandboxed child can't get any kind of information from the + // outside world. The only IPC it should be able to do is sending its + // response over the pipe. + if let Err(errno) = nix::unistd::close(stream_fd) { + send_child_response(&mut pipe_write, job_error_from_errno("closing stream", errno)); + } gum::debug!( target: LOG_TARGET, @@ -308,13 +392,12 @@ fn handle_child_process( send_child_response(&mut pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string()))) }); - let executor_params_2 = executor_params.clone(); let execute_thread = thread::spawn_worker_thread_with_stack_size( "execute thread", - move || validate_using_artifact(&compiled_artifact_blob, &executor_params_2, ¶ms), + move || validate_using_artifact(&compiled_artifact_blob, &executor_params, ¶ms), Arc::clone(&condvar), WaitOutcome::Finished, - EXECUTE_THREAD_STACK_SIZE, + execute_thread_stack_size, ) .unwrap_or_else(|err| { send_child_response(&mut pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string()))) @@ -343,28 +426,69 @@ fn handle_child_process( send_child_response(&mut pipe_write, response); } -/// Waits for child process to finish and handle child response from pipe. +/// Returns stack size based on the number of threads. +/// The stack size is represented by 2MiB * number_of_threads + native stack; /// -/// # Arguments +/// # Background /// -/// - `pipe_read`: A `PipeReader` used to read data from the child process. +/// Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code. +/// That native code does not create any stacks and just reuses the stack of the thread that +/// wasmtime was invoked from. /// -/// - `child`: The child pid. +/// Also, we configure the executor to provide the deterministic stack and that requires +/// supplying the amount of the native stack space that wasm is allowed to use. This is +/// realized by supplying the limit into `wasmtime::Config::max_wasm_stack`. /// -/// - `usage_before`: Resource usage statistics before executing the child process. +/// There are quirks to that configuration knob: /// -/// - `timeout`: The maximum allowed time for the child process to finish, in `Duration`. +/// 1. It only limits the amount of stack space consumed by wasm but does not ensure nor check that +/// the stack space is actually available. +/// +/// That means, if the calling thread has 1 MiB of stack space left and the wasm code consumes +/// more, then the wasmtime limit will **not** trigger. Instead, the wasm code will hit the +/// guard page and the Rust stack overflow handler will be triggered. That leads to an +/// **abort**. +/// +/// 2. It cannot and does not limit the stack space consumed by Rust code. +/// +/// Meaning that if the wasm code leaves no stack space for Rust code, then the Rust code +/// will abort and that will abort the process as well. +/// +/// Typically on Linux the main thread gets the stack size specified by the `ulimit` and +/// typically it's configured to 8 MiB. Rust's spawned threads are 2 MiB. OTOH, the +/// DEFAULT_NATIVE_STACK_MAX is set to 256 MiB. Not nearly enough. +/// +/// Hence we need to increase it. The simplest way to fix that is to spawn an execute thread with +/// the desired stack limit. We must also make sure the job process has enough stack for *all* its +/// threads. This function can be used to get the stack size of either the execute thread or execute +/// job process. 
+fn max_stack_size(executor_params: &ExecutorParams) -> usize { + let (_sem, deterministic_stack_limit) = params_to_wasmtime_semantics(executor_params); + return (2 * 1024 * 1024 + deterministic_stack_limit.native_stack_max) as usize; +} + +/// Waits for child process to finish and handle child response from pipe. /// /// # Returns /// /// - The response, either `Ok` or some error state. fn handle_parent_process( - mut pipe_read: PipeReader, + pipe_read_fd: i32, + pipe_write_fd: i32, + worker_info: &WorkerInfo, job_pid: Pid, - worker_pid: u32, usage_before: Usage, timeout: Duration, ) -> io::Result { + // the read end will wait until all write ends have been closed, + // this drop is necessary to avoid deadlock + if let Err(errno) = nix::unistd::close(pipe_write_fd) { + return Ok(internal_error_from_errno("closing pipe write fd", errno)); + }; + + // SAFETY: pipe_read_fd is an open and owned file descriptor at this point. + let mut pipe_read = unsafe { PipeFd::from_raw_fd(pipe_read_fd) }; + // Read from the child. Don't decode unless the process exited normally, which we check later. let mut received_data = Vec::new(); pipe_read @@ -376,7 +500,7 @@ fn handle_parent_process( let status = nix::sys::wait::waitpid(job_pid, None); gum::trace!( target: LOG_TARGET, - %worker_pid, + ?worker_info, %job_pid, "execute worker received wait status from job: {:?}", status, @@ -396,7 +520,7 @@ fn handle_parent_process( if cpu_tv >= timeout { gum::warn!( target: LOG_TARGET, - %worker_pid, + ?worker_info, %job_pid, "execute job took {}ms cpu time, exceeded execute timeout {}ms", cpu_tv.as_millis(), @@ -429,7 +553,7 @@ fn handle_parent_process( Err(job_error) => { gum::warn!( target: LOG_TARGET, - %worker_pid, + ?worker_info, %job_pid, "execute job error: {}", job_error, @@ -490,14 +614,14 @@ fn recv_child_response(received_data: &mut io::BufReader<&[u8]>) -> io::Result ! { +/// - `response`: Child process response +fn send_child_response(pipe_write: &mut PipeFd, response: JobResult) -> ! 
{ framed_send_blocking(pipe_write, response.encode().as_slice()) .unwrap_or_else(|_| process::exit(libc::EXIT_FAILURE)); @@ -516,3 +640,7 @@ fn internal_error_from_errno(context: &'static str, errno: Errno) -> WorkerRespo io::Error::last_os_error() ))) } + +fn job_error_from_errno(context: &'static str, errno: Errno) -> JobResult { + Err(JobError::Kernel(format!("{}: {}: {}", context, errno, io::Error::last_os_error()))) +} diff --git a/polkadot/node/core/pvf/prepare-worker/Cargo.toml b/polkadot/node/core/pvf/prepare-worker/Cargo.toml index 2ef3dfc0e74b..7ed7d3295064 100644 --- a/polkadot/node/core/pvf/prepare-worker/Cargo.toml +++ b/polkadot/node/core/pvf/prepare-worker/Cargo.toml @@ -18,8 +18,7 @@ rayon = "1.5.1" tracking-allocator = { package = "staging-tracking-allocator", path = "../../../tracking-allocator" } tikv-jemalloc-ctl = { version = "0.5.0", optional = true } tikv-jemallocator = { version = "0.5.0", optional = true } -os_pipe = "1.1.4" -nix = { version = "0.27.1", features = ["process", "resource"] } +nix = { version = "0.27.1", features = ["process", "resource", "sched"] } parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] } diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs index f77eed871ec9..82a56107ef53 100644 --- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs +++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs @@ -18,8 +18,6 @@ mod memory_stats; -use polkadot_node_core_pvf_common::executor_interface::{prepare, prevalidate}; - // NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are // separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-prepare-worker=trace`. const LOG_TARGET: &str = "parachain::pvf-prepare-worker"; @@ -37,7 +35,11 @@ use nix::{ }, unistd::{ForkResult, Pid}, }; -use os_pipe::{self, PipeReader, PipeWriter}; +use polkadot_node_core_pvf_common::{ + executor_interface::{prepare, prevalidate}, + worker::{pipe2_cloexec, PipeFd, WorkerInfo}, +}; + use parity_scale_codec::{Decode, Encode}; use polkadot_node_core_pvf_common::{ error::{PrepareError, PrepareWorkerResult}, @@ -57,10 +59,10 @@ use std::{ fs, io::{self, Read}, os::{ - fd::{AsRawFd, RawFd}, + fd::{AsRawFd, FromRawFd, RawFd}, unix::net::UnixStream, }, - path::PathBuf, + path::{Path, PathBuf}, process, sync::{mpsc::channel, Arc}, time::Duration, @@ -76,6 +78,16 @@ static ALLOC: TrackingAllocator = #[global_allocator] static ALLOC: TrackingAllocator = TrackingAllocator(std::alloc::System); +/// The number of threads for the child process: +/// 1 - Main thread +/// 2 - Cpu monitor thread +/// 3 - Memory tracker thread +/// 4 - Prepare thread +/// +/// NOTE: The correctness of this value is enforced by a test. If the number of threads inside +/// the child process changes in the future, this value must be changed as well. +pub const PREPARE_WORKER_THREAD_NUMBER: u32 = 4; + /// Contains the bytes for a successfully compiled artifact. 
#[derive(Encode, Decode)] pub struct CompiledArtifact(Vec); @@ -200,15 +212,15 @@ pub fn worker_entrypoint( worker_dir_path, node_version, worker_version, - |mut stream, worker_dir_path| { - let worker_pid = process::id(); - let temp_artifact_dest = worker_dir::prepare_tmp_artifact(&worker_dir_path); + |mut stream, worker_info, security_status| { + let temp_artifact_dest = worker_dir::prepare_tmp_artifact(&worker_info.worker_dir_path); loop { let pvf = recv_request(&mut stream)?; gum::debug!( target: LOG_TARGET, - %worker_pid, + ?worker_info, + ?security_status, "worker: preparing artifact", ); @@ -216,7 +228,7 @@ pub fn worker_entrypoint( let prepare_job_kind = pvf.prep_kind(); let executor_params = pvf.executor_params(); - let (pipe_reader, pipe_writer) = os_pipe::pipe()?; + let (pipe_read_fd, pipe_write_fd) = pipe2_cloexec()?; let usage_before = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) { Ok(usage) => usage, @@ -227,46 +239,58 @@ pub fn worker_entrypoint( }, }; - // SAFETY: new process is spawned within a single threaded process. This invariant - // is enforced by tests. - let result = match unsafe { nix::unistd::fork() } { - Err(errno) => Err(error_from_errno("fork", errno)), - Ok(ForkResult::Child) => { - // Dropping the stream closes the underlying socket. We want to make sure - // that the sandboxed child can't get any kind of information from the - // outside world. The only IPC it should be able to do is sending its - // response over the pipe. - drop(stream); - // Drop the read end so we don't have too many FDs open. - drop(pipe_reader); - - handle_child_process( - pvf, - pipe_writer, + let stream_fd = stream.as_raw_fd(); + + cfg_if::cfg_if! { + if #[cfg(target_os = "linux")] { + let result = if security_status.can_do_secure_clone { + handle_clone( + &pvf, + pipe_write_fd, + pipe_read_fd, + stream_fd, + preparation_timeout, + prepare_job_kind, + &executor_params, + worker_info, + security_status.can_unshare_user_namespace_and_change_root, + &temp_artifact_dest, + usage_before, + ) + } else { + // Fall back to using fork. + handle_fork( + &pvf, + pipe_write_fd, + pipe_read_fd, + stream_fd, + preparation_timeout, + prepare_job_kind, + &executor_params, + worker_info, + &temp_artifact_dest, + usage_before, + ) + }; + } else { + let result = handle_fork( + &pvf, + pipe_write_fd, + pipe_read_fd, + stream_fd, preparation_timeout, prepare_job_kind, - executor_params, - ) - }, - Ok(ForkResult::Parent { child }) => { - // the read end will wait until all write ends have been closed, - // this drop is necessary to avoid deadlock - drop(pipe_writer); - - handle_parent_process( - pipe_reader, - worker_pid, - child, - temp_artifact_dest.clone(), + &executor_params, + worker_info, + &temp_artifact_dest, usage_before, - preparation_timeout, - ) - }, - }; + ); + } + } gum::trace!( target: LOG_TARGET, - %worker_pid, + ?worker_info, "worker: sending result to host: {:?}", result ); @@ -306,21 +330,94 @@ struct JobResponse { memory_stats: MemoryStats, } +#[cfg(target_os = "linux")] +fn handle_clone( + pvf: &PvfPrepData, + pipe_write_fd: i32, + pipe_read_fd: i32, + stream_fd: i32, + preparation_timeout: Duration, + prepare_job_kind: PrepareJobKind, + executor_params: &Arc, + worker_info: &WorkerInfo, + have_unshare_newuser: bool, + temp_artifact_dest: &Path, + usage_before: Usage, +) -> Result { + use polkadot_node_core_pvf_common::worker::security; + + // SAFETY: new process is spawned within a single threaded process. This invariant + // is enforced by tests. 
Stack size being specified to ensure child doesn't overflow + match unsafe { + security::clone::clone_on_worker( + worker_info, + have_unshare_newuser, + Box::new(|| { + handle_child_process( + pvf.clone(), + pipe_write_fd, + pipe_read_fd, + stream_fd, + preparation_timeout, + prepare_job_kind, + Arc::clone(&executor_params), + ) + }), + ) + } { + Ok(child) => handle_parent_process( + pipe_read_fd, + pipe_write_fd, + worker_info, + child, + temp_artifact_dest, + usage_before, + preparation_timeout, + ), + Err(security::clone::Error::Clone(errno)) => Err(error_from_errno("clone", errno)), + } +} + +fn handle_fork( + pvf: &PvfPrepData, + pipe_write_fd: i32, + pipe_read_fd: i32, + stream_fd: i32, + preparation_timeout: Duration, + prepare_job_kind: PrepareJobKind, + executor_params: &Arc, + worker_info: &WorkerInfo, + temp_artifact_dest: &Path, + usage_before: Usage, +) -> Result { + // SAFETY: new process is spawned within a single threaded process. This invariant + // is enforced by tests. + match unsafe { nix::unistd::fork() } { + Ok(ForkResult::Child) => handle_child_process( + pvf.clone(), + pipe_write_fd, + pipe_read_fd, + stream_fd, + preparation_timeout, + prepare_job_kind, + Arc::clone(executor_params), + ), + Ok(ForkResult::Parent { child }) => handle_parent_process( + pipe_read_fd, + pipe_write_fd, + worker_info, + child, + temp_artifact_dest, + usage_before, + preparation_timeout, + ), + Err(errno) => Err(error_from_errno("fork", errno)), + } +} + /// This is used to handle child process during pvf prepare worker. /// It prepares the artifact and tracks memory stats during preparation -/// and pipes back the response to the parent process -/// -/// # Arguments -/// -/// - `pvf`: `PvfPrepData` structure, containing data to prepare the artifact -/// -/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe. -/// -/// - `preparation_timeout`: The timeout in `Duration`. -/// -/// - `prepare_job_kind`: The kind of prepare job. -/// -/// - `executor_params`: Deterministically serialized execution environment semantics. +/// and pipes back the response to the parent process. /// /// # Returns /// @@ -329,19 +426,34 @@ struct JobResponse { /// - If success, pipe back `JobResponse`. fn handle_child_process( pvf: PvfPrepData, - mut pipe_write: PipeWriter, + pipe_write_fd: i32, + pipe_read_fd: i32, + stream_fd: i32, preparation_timeout: Duration, prepare_job_kind: PrepareJobKind, executor_params: Arc, ) -> ! { - // Terminate if the parent thread dies. Parent thread == worker process (it is single-threaded). - // - // RACE: the worker may die before we install the death signal. In practice this is unlikely, - // and most of the time the job process should terminate on its own when it completes. - #[cfg(target_os = "linux")] - nix::sys::prctl::set_pdeathsig(nix::sys::signal::Signal::SIGTERM).unwrap_or_else(|err| { - send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string()))) - }); + // SAFETY: pipe_writer is an open and owned file descriptor at this point. + let mut pipe_write = unsafe { PipeFd::from_raw_fd(pipe_write_fd) }; + + // Drop the read end so we don't have too many FDs open. + if let Err(errno) = nix::unistd::close(pipe_read_fd) { + send_child_response( + &mut pipe_write, + JobResult::Err(error_from_errno("closing pipe", errno)), + ); + } + + // Dropping the stream closes the underlying socket. We want to make sure + // that the sandboxed child can't get any kind of information from the + // outside world. 
The only IPC it should be able to do is sending its + // response over the pipe. + if let Err(errno) = nix::unistd::close(stream_fd) { + send_child_response( + &mut pipe_write, + JobResult::Err(error_from_errno("error closing stream", errno)), + ); + } let worker_job_pid = process::id(); gum::debug!( @@ -489,20 +601,6 @@ fn handle_child_process( /// Waits for child process to finish and handle child response from pipe. /// -/// # Arguments -/// -/// - `pipe_read`: A `PipeReader` used to read data from the child process. -/// -/// - `child`: The child pid. -/// -/// - `temp_artifact_dest`: The destination `PathBuf` to write the temporary artifact file. -/// -/// - `worker_pid`: The PID of the child process. -/// -/// - `usage_before`: Resource usage statistics before executing the child process. -/// -/// - `timeout`: The maximum allowed time for the child process to finish, in `Duration`. -/// /// # Returns /// /// - If the child send response without an error, this function returns `Ok(PrepareStats)` @@ -512,13 +610,23 @@ fn handle_child_process( /// /// - If the child process timeout, it returns `PrepareError::TimedOut`. fn handle_parent_process( - mut pipe_read: PipeReader, - worker_pid: u32, + pipe_read_fd: i32, + pipe_write_fd: i32, + worker_info: &WorkerInfo, job_pid: Pid, - temp_artifact_dest: PathBuf, + temp_artifact_dest: &Path, usage_before: Usage, timeout: Duration, ) -> Result { + // the read end will wait until all write ends have been closed, + // this drop is necessary to avoid deadlock + if let Err(errno) = nix::unistd::close(pipe_write_fd) { + return Err(error_from_errno("closing pipe write fd", errno)); + }; + + // SAFETY: this is an open and owned file descriptor at this point. + let mut pipe_read = unsafe { PipeFd::from_raw_fd(pipe_read_fd) }; + // Read from the child. Don't decode unless the process exited normally, which we check later. let mut received_data = Vec::new(); pipe_read @@ -528,7 +636,7 @@ fn handle_parent_process( let status = nix::sys::wait::waitpid(job_pid, None); gum::trace!( target: LOG_TARGET, - %worker_pid, + ?worker_info, %job_pid, "prepare worker received wait status from job: {:?}", status, @@ -546,7 +654,7 @@ fn handle_parent_process( if cpu_tv >= timeout { gum::warn!( target: LOG_TARGET, - %worker_pid, + ?worker_info, %job_pid, "prepare job took {}ms cpu time, exceeded prepare timeout {}ms", cpu_tv.as_millis(), @@ -581,13 +689,13 @@ fn handle_parent_process( // success. gum::debug!( target: LOG_TARGET, - %worker_pid, + ?worker_info, %job_pid, "worker: writing artifact to {}", temp_artifact_dest.display(), ); // Write to the temp file created by the host. - if let Err(err) = fs::write(&temp_artifact_dest, &artifact) { + if let Err(err) = fs::write(temp_artifact_dest, &artifact) { return Err(PrepareError::IoErr(err.to_string())) }; @@ -651,10 +759,10 @@ fn recv_child_response(received_data: &mut io::BufReader<&[u8]>) -> io::Result ! { +fn send_child_response(pipe_write: &mut PipeFd, response: JobResult) -> ! 
{ framed_send_blocking(pipe_write, response.encode().as_slice()) .unwrap_or_else(|_| process::exit(libc::EXIT_FAILURE)); diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index 2668ce41583d..b07b3da4c267 100644 --- a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -906,7 +906,7 @@ pub(crate) mod tests { let _ = pulse.next().await.unwrap(); let el = start.elapsed().as_millis(); - assert!(el > 50 && el < 150, "{}", el); + assert!(el > 50 && el < 150, "pulse duration: {}", el); } } diff --git a/polkadot/node/core/pvf/src/security.rs b/polkadot/node/core/pvf/src/security.rs index 9d0d4cf49afe..f62a232abf27 100644 --- a/polkadot/node/core/pvf/src/security.rs +++ b/polkadot/node/core/pvf/src/security.rs @@ -32,14 +32,20 @@ use std::{fmt, path::Path}; pub async fn check_security_status(config: &Config) -> Result { let Config { prepare_worker_program_path, secure_validator_mode, cache_path, .. } = config; - let (landlock, seccomp, change_root) = join!( + let (landlock, seccomp, change_root, secure_clone) = join!( check_landlock(prepare_worker_program_path), check_seccomp(prepare_worker_program_path), - check_can_unshare_user_namespace_and_change_root(prepare_worker_program_path, cache_path) + check_can_unshare_user_namespace_and_change_root(prepare_worker_program_path, cache_path), + check_can_do_secure_clone(prepare_worker_program_path), ); - let full_security_status = - FullSecurityStatus::new(*secure_validator_mode, landlock, seccomp, change_root); + let full_security_status = FullSecurityStatus::new( + *secure_validator_mode, + landlock, + seccomp, + change_root, + secure_clone, + ); let security_status = full_security_status.as_partial(); if full_security_status.err_occurred() { @@ -73,6 +79,7 @@ impl FullSecurityStatus { landlock: SecureModeResult, seccomp: SecureModeResult, change_root: SecureModeResult, + secure_clone: SecureModeResult, ) -> Self { Self { partial: SecurityStatus { @@ -80,8 +87,9 @@ impl FullSecurityStatus { can_enable_landlock: landlock.is_ok(), can_enable_seccomp: seccomp.is_ok(), can_unshare_user_namespace_and_change_root: change_root.is_ok(), + can_do_secure_clone: secure_clone.is_ok(), }, - errs: [landlock, seccomp, change_root] + errs: [landlock, seccomp, change_root, secure_clone] .into_iter() .filter_map(|result| result.err()) .collect(), @@ -120,9 +128,10 @@ type SecureModeResult = std::result::Result<(), SecureModeError>; /// Errors related to enabling Secure Validator Mode. #[derive(Debug)] enum SecureModeError { - CannotEnableLandlock(String), + CannotEnableLandlock { err: String, abi: u8 }, CannotEnableSeccomp(String), CannotUnshareUserNamespaceAndChangeRoot(String), + CannotDoSecureClone(String), } impl SecureModeError { @@ -132,12 +141,16 @@ impl SecureModeError { match self { // Landlock is present on relatively recent Linuxes. This is optional if the unshare // capability is present, providing FS sandboxing a different way. - CannotEnableLandlock(_) => security_status.can_unshare_user_namespace_and_change_root, + CannotEnableLandlock { .. } => + security_status.can_unshare_user_namespace_and_change_root, // seccomp should be present on all modern Linuxes unless it's been disabled. CannotEnableSeccomp(_) => false, // Should always be present on modern Linuxes. If not, Landlock also provides FS // sandboxing, so don't enforce this. 
CannotUnshareUserNamespaceAndChangeRoot(_) => security_status.can_enable_landlock, + // We have not determined the kernel requirements for this capability, and it's also not + // necessary for FS or networking restrictions. + CannotDoSecureClone(_) => true, } } } @@ -146,9 +159,10 @@ impl fmt::Display for SecureModeError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { use SecureModeError::*; match self { - CannotEnableLandlock(err) => write!(f, "Cannot enable landlock, a Linux 5.13+ kernel security feature: {err}"), + CannotEnableLandlock{err, abi} => write!(f, "Cannot enable landlock (ABI {abi}), a Linux 5.13+ kernel security feature: {err}"), CannotEnableSeccomp(err) => write!(f, "Cannot enable seccomp, a Linux-specific kernel security feature: {err}"), CannotUnshareUserNamespaceAndChangeRoot(err) => write!(f, "Cannot unshare user namespace and change root, which are Linux-specific kernel security features: {err}"), + CannotDoSecureClone(err) => write!(f, "Cannot call clone with all sandboxing flags, a Linux-specific kernel security feature: {err}"), } } } @@ -208,32 +222,11 @@ async fn check_can_unshare_user_namespace_and_change_root( .map_err(|err| SecureModeError::CannotUnshareUserNamespaceAndChangeRoot( format!("could not create a temporary directory in {:?}: {}", cache_path, err) ))?; - match tokio::process::Command::new(prepare_worker_program_path) - .arg("--check-can-unshare-user-namespace-and-change-root") - .arg(cache_dir_tempdir.path()) - .output() - .await - { - Ok(output) if output.status.success() => Ok(()), - Ok(output) => { - let stderr = std::str::from_utf8(&output.stderr) - .expect("child process writes a UTF-8 string to stderr; qed") - .trim(); - if stderr.is_empty() { - Err(SecureModeError::CannotUnshareUserNamespaceAndChangeRoot( - "not available".into() - )) - } else { - Err(SecureModeError::CannotUnshareUserNamespaceAndChangeRoot( - format!("not available: {}", stderr) - )) - } - }, - Err(err) => - Err(SecureModeError::CannotUnshareUserNamespaceAndChangeRoot( - format!("could not start child process: {}", err) - )), - } + spawn_process_for_security_check( + prepare_worker_program_path, + "--check-can-unshare-user-namespace-and-change-root", + &[cache_dir_tempdir.path()], + ).await.map_err(|err| SecureModeError::CannotUnshareUserNamespaceAndChangeRoot(err)) } else { Err(SecureModeError::CannotUnshareUserNamespaceAndChangeRoot( "only available on Linux".into() )) } } } @@ -253,37 +246,17 @@ async fn check_landlock( ) -> SecureModeResult { cfg_if::cfg_if!
{ if #[cfg(target_os = "linux")] { - match tokio::process::Command::new(prepare_worker_program_path) - .arg("--check-can-enable-landlock") - .output() - .await - { - Ok(output) if output.status.success() => Ok(()), - Ok(output) => { - let abi = - polkadot_node_core_pvf_common::worker::security::landlock::LANDLOCK_ABI as u8; - let stderr = std::str::from_utf8(&output.stderr) - .expect("child process writes a UTF-8 string to stderr; qed") - .trim(); - if stderr.is_empty() { - Err(SecureModeError::CannotEnableLandlock( - format!("landlock ABI {} not available", abi) - )) - } else { - Err(SecureModeError::CannotEnableLandlock( - format!("not available: {}", stderr) - )) - } - }, - Err(err) => - Err(SecureModeError::CannotEnableLandlock( - format!("could not start child process: {}", err) - )), - } + let abi = polkadot_node_core_pvf_common::worker::security::landlock::LANDLOCK_ABI as u8; + spawn_process_for_security_check( + prepare_worker_program_path, + "--check-can-enable-landlock", + std::iter::empty::<&str>(), + ).await.map_err(|err| SecureModeError::CannotEnableLandlock { err, abi }) } else { - Err(SecureModeError::CannotEnableLandlock( - "only available on Linux".into() - )) + Err(SecureModeError::CannotEnableLandlock { + err: "only available on Linux".into(), + abi: 0, + }) } } } @@ -301,31 +274,11 @@ async fn check_seccomp( if #[cfg(target_os = "linux")] { cfg_if::cfg_if! { if #[cfg(target_arch = "x86_64")] { - match tokio::process::Command::new(prepare_worker_program_path) - .arg("--check-can-enable-seccomp") - .output() - .await - { - Ok(output) if output.status.success() => Ok(()), - Ok(output) => { - let stderr = std::str::from_utf8(&output.stderr) - .expect("child process writes a UTF-8 string to stderr; qed") - .trim(); - if stderr.is_empty() { - Err(SecureModeError::CannotEnableSeccomp( - "not available".into() - )) - } else { - Err(SecureModeError::CannotEnableSeccomp( - format!("not available: {}", stderr) - )) - } - }, - Err(err) => - Err(SecureModeError::CannotEnableSeccomp( - format!("could not start child process: {}", err) - )), - } + spawn_process_for_security_check( + prepare_worker_program_path, + "--check-can-enable-seccomp", + std::iter::empty::<&str>(), + ).await.map_err(|err| SecureModeError::CannotEnableSeccomp(err)) } else { Err(SecureModeError::CannotEnableSeccomp( "only supported on CPUs from the x86_64 family (usually Intel or AMD)".into() @@ -348,24 +301,85 @@ async fn check_seccomp( } } +/// Check if we can call `clone` with all sandboxing flags, and return an error if not. +/// +/// We do this check by spawning a new process and trying to sandbox it. To get as close as possible +/// to running the check in a worker, we try it... in a worker. The expected return status is 0 on +/// success and -1 on failure. +async fn check_can_do_secure_clone( + #[cfg_attr(not(target_os = "linux"), allow(unused_variables))] + prepare_worker_program_path: &Path, +) -> SecureModeResult { + cfg_if::cfg_if! 
{ + if #[cfg(target_os = "linux")] { + spawn_process_for_security_check( + prepare_worker_program_path, + "--check-can-do-secure-clone", + std::iter::empty::<&str>(), + ).await.map_err(|err| SecureModeError::CannotDoSecureClone(err)) + } else { + Err(SecureModeError::CannotDoSecureClone( + "only available on Linux".into() + )) + } + } +} + +#[cfg(target_os = "linux")] +async fn spawn_process_for_security_check<I, S>( + prepare_worker_program_path: &Path, + check_arg: &'static str, + extra_args: I, +) -> Result<(), String> +where + I: IntoIterator<Item = S>, + S: AsRef<std::ffi::OsStr>, +{ + let mut command = tokio::process::Command::new(prepare_worker_program_path); + // Clear env vars. (In theory, running checks with different env vars could result in different + // outcomes of the checks.) + command.env_clear(); + // Add back any env vars we want to keep. + if let Ok(value) = std::env::var("RUST_LOG") { + command.env("RUST_LOG", value); + } + + match command.arg(check_arg).args(extra_args).output().await { + Ok(output) if output.status.success() => Ok(()), + Ok(output) => { + let stderr = std::str::from_utf8(&output.stderr) + .expect("child process writes a UTF-8 string to stderr; qed") + .trim(); + if stderr.is_empty() { + Err("not available".into()) + } else { + Err(format!("not available: {}", stderr)) + } + }, + Err(err) => Err(format!("could not start child process: {}", err)), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_secure_mode_error_optionality() { - let err = SecureModeError::CannotEnableLandlock(String::new()); + let err = SecureModeError::CannotEnableLandlock { err: String::new(), abi: 3 }; assert!(err.is_allowed_in_secure_mode(&SecurityStatus { secure_validator_mode: true, can_enable_landlock: false, can_enable_seccomp: false, - can_unshare_user_namespace_and_change_root: true + can_unshare_user_namespace_and_change_root: true, + can_do_secure_clone: true, })); assert!(!err.is_allowed_in_secure_mode(&SecurityStatus { secure_validator_mode: true, can_enable_landlock: false, can_enable_seccomp: true, - can_unshare_user_namespace_and_change_root: false + can_unshare_user_namespace_and_change_root: false, + can_do_secure_clone: false, })); let err = SecureModeError::CannotEnableSeccomp(String::new()); @@ -373,13 +387,15 @@ mod tests { secure_validator_mode: true, can_enable_landlock: false, can_enable_seccomp: false, - can_unshare_user_namespace_and_change_root: true + can_unshare_user_namespace_and_change_root: true, + can_do_secure_clone: true, })); assert!(!err.is_allowed_in_secure_mode(&SecurityStatus { secure_validator_mode: true, can_enable_landlock: false, can_enable_seccomp: true, - can_unshare_user_namespace_and_change_root: false + can_unshare_user_namespace_and_change_root: false, + can_do_secure_clone: false, })); let err = SecureModeError::CannotUnshareUserNamespaceAndChangeRoot(String::new()); @@ -387,13 +403,31 @@ mod tests { secure_validator_mode: true, can_enable_landlock: true, can_enable_seccomp: false, - can_unshare_user_namespace_and_change_root: false + can_unshare_user_namespace_and_change_root: false, + can_do_secure_clone: false, })); assert!(!err.is_allowed_in_secure_mode(&SecurityStatus { secure_validator_mode: true, can_enable_landlock: false, can_enable_seccomp: true, - can_unshare_user_namespace_and_change_root: false + can_unshare_user_namespace_and_change_root: false, + can_do_secure_clone: false, + })); + + let err = SecureModeError::CannotDoSecureClone(String::new()); + assert!(err.is_allowed_in_secure_mode(&SecurityStatus { + secure_validator_mode: true,
+ can_enable_landlock: true, + can_enable_seccomp: true, + can_unshare_user_namespace_and_change_root: true, + can_do_secure_clone: true, + })); + assert!(err.is_allowed_in_secure_mode(&SecurityStatus { + secure_validator_mode: false, + can_enable_landlock: false, + can_enable_seccomp: false, + can_unshare_user_namespace_and_change_root: false, + can_do_secure_clone: false, })); } } diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs index 15b341dc094c..bcc10749e746 100644 --- a/polkadot/node/core/pvf/tests/it/main.rs +++ b/polkadot/node/core/pvf/tests/it/main.rs @@ -457,10 +457,13 @@ async fn all_security_features_work() { assert_eq!( host.security_status().await, SecurityStatus { + // Disabled in tests to not enforce the presence of security features. This CI-only test + // is the only one that tests them. secure_validator_mode: false, can_enable_landlock, can_enable_seccomp: true, can_unshare_user_namespace_and_change_root: true, + can_do_secure_clone: true, } ); } diff --git a/polkadot/node/core/pvf/tests/it/process.rs b/polkadot/node/core/pvf/tests/it/process.rs index 3ea03339a839..e989eb874ba9 100644 --- a/polkadot/node/core/pvf/tests/it/process.rs +++ b/polkadot/node/core/pvf/tests/it/process.rs @@ -94,7 +94,7 @@ fn find_process_by_sid_and_name( found } -/// Sets up the test and makes sure everything gets cleaned up after. +/// Sets up the test. /// /// We run the runtime manually because `#[tokio::test]` doesn't work in `rusty_fork_test!`. fn test_wrapper(f: F) @@ -112,14 +112,6 @@ where // Pass a clone of the host so that it does not get dropped after. f(host.clone(), sid).await; - - // Sleep to give processes a chance to get cleaned up, preventing races in the next step. - tokio::time::sleep(Duration::from_millis(500)).await; - - // Make sure job processes got cleaned up. Pass `is_direct_child: false` to target the - // job processes. - assert!(find_process_by_sid_and_name(sid, PREPARE_PROCESS_NAME, false).is_none()); - assert!(find_process_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, false).is_none()); }); } @@ -127,7 +119,7 @@ where // then finding the child process that matches the session ID and expected process name and doing // something with that child. rusty_fork_test! { - // Everything succeeded. All created subprocesses for jobs should get cleaned up, to avoid memory leaks. + // Everything succeeds. #[test] fn successful_prepare_and_validate() { test_wrapper(|host, _sid| async move { @@ -331,7 +323,7 @@ rusty_fork_test! { // monitor, and memory tracking. assert_eq!( get_num_threads_by_sid_and_name(sid, PREPARE_PROCESS_NAME, false), - 4 + polkadot_node_core_pvf_prepare_worker::PREPARE_WORKER_THREAD_NUMBER as i64, ); // End the test. @@ -374,7 +366,7 @@ rusty_fork_test! { // time monitor. assert_eq!( get_num_threads_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, false), - 3 + polkadot_node_core_pvf_execute_worker::EXECUTE_WORKER_THREAD_NUMBER as i64, ); // End the test. diff --git a/prdoc/pr_2477-use-clone-instead-of-fork-on-pvf.prdoc b/prdoc/pr_2477-use-clone-instead-of-fork-on-pvf.prdoc new file mode 100644 index 000000000000..ac9a0a501b6c --- /dev/null +++ b/prdoc/pr_2477-use-clone-instead-of-fork-on-pvf.prdoc @@ -0,0 +1,22 @@ +title: "Use clone instead of fork on pvf" + +doc: + - audience: Node Operator + description: | + For validators: Adds a new, optional security capability. 
+ Most modern Linux machines should support it; otherwise, you will get a warning like: + "- Optional: Cannot call clone with all sandboxing flags, a Linux-specific kernel security feature: not available" + If you are already running in a secure environment such as a container, this may conflict with our security features; in that case, your only option may be to ignore the warning. + Otherwise, it is recommended to upgrade your Linux kernel. + +migrations: + db: [] + + runtime: [] + +crates: + - name: polkadot-node-core-pvf + - name: polkadot-node-core-pvf-prepare-worker + - name: polkadot-node-core-pvf-execute-worker + +host_functions: []
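Illustrative note (not part of the patch): the "secure clone" capability probed by this PR boils down to whether the kernel lets an unprivileged process call clone(2) with the full set of namespace flags. The sketch below shows roughly what such a probe can look like using the nix crate (assuming version 0.27 with the "sched" and "process" features); the function name, flag set, and stack size here are assumptions for illustration, not the worker code added by this PR.

use nix::{
    libc,
    sched::{clone, CloneFlags},
    sys::wait::waitpid,
    unistd::Pid,
};

// Try to clone a child into fresh namespaces; success suggests the kernel
// permits the kind of sandboxing flags the PVF workers use (assumed flag set).
fn probe_secure_clone() -> nix::Result<Pid> {
    let flags = CloneFlags::CLONE_NEWCGROUP |
        CloneFlags::CLONE_NEWIPC |
        CloneFlags::CLONE_NEWNET |
        CloneFlags::CLONE_NEWNS |
        CloneFlags::CLONE_NEWPID |
        CloneFlags::CLONE_NEWUSER |
        CloneFlags::CLONE_NEWUTS;

    // The child runs on a caller-provided stack; 2 MiB is an arbitrary choice for this sketch.
    let mut stack = vec![0u8; 2 * 1024 * 1024];
    // The child does nothing: we only care whether `clone` itself succeeds.
    let cb = Box::new(|| 0_isize);

    // SAFETY: the child returns immediately, so the small fixed stack is sufficient.
    let child = unsafe { clone(cb, &mut stack, flags, Some(libc::SIGCHLD)) }?;
    // Reap the child so the probe does not leave a zombie behind.
    waitpid(child, None)?;
    Ok(child)
}

If the kernel rejects the call (for example because unprivileged user namespaces are disabled), the host reports the "Cannot call clone with all sandboxing flags" warning quoted in the prdoc above.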