Skip to content

Commit

Permalink
PVF: ensure job processes are cleaned up, add tests (#2643)
Browse files Browse the repository at this point in the history
Fixes a potential memory leak.

`PR_SET_PDEATHSIG` is used to terminate child processes when the parent dies.
Note that this is subject to a race: the parent may die before the death
signal is installed. There seems to be a raceless
alternative [here](https://stackoverflow.com/a/42498370/6085242), but
the concern is small enough that the extra complexity doesn't seem
worth it. I left more details in the code comment.
  • Loading branch information
mrcnski authored Dec 29, 2023
1 parent 45f4d9a commit 8bf5a1c
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 80 deletions.
2 changes: 2 additions & 0 deletions polkadot/node/core/pvf/common/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,6 @@ pub enum JobError {
CouldNotSpawnThread(String),
#[error("An error occurred in the CPU time monitor thread: {0}")]
CpuTimeMonitorThread(String),
#[error("Could not set pdeathsig: {0}")]
CouldNotSetPdeathsig(String),
}
9 changes: 9 additions & 0 deletions polkadot/node/core/pvf/execute-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,15 @@ fn handle_child_process(
params: Vec<u8>,
execution_timeout: Duration,
) -> ! {
// Terminate if the parent thread dies. Parent thread == worker process (it is single-threaded).
//
// RACE: the worker may die before we install the death signal. In practice this is unlikely,
// and most of the time the job process should terminate on its own when it completes.
#[cfg(target_os = "linux")]
nix::sys::prctl::set_pdeathsig(nix::sys::signal::Signal::SIGTERM).unwrap_or_else(|err| {
send_child_response(&mut pipe_write, Err(JobError::CouldNotSetPdeathsig(err.to_string())))
});

gum::debug!(
target: LOG_TARGET,
worker_job_pid = %process::id(),
Expand Down
9 changes: 9 additions & 0 deletions polkadot/node/core/pvf/prepare-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,15 @@ fn handle_child_process(
prepare_job_kind: PrepareJobKind,
executor_params: Arc<ExecutorParams>,
) -> ! {
// Terminate if the parent thread dies. Parent thread == worker process (it is single-threaded).
//
// RACE: the worker may die before we install the death signal. In practice this is unlikely,
// and most of the time the job process should terminate on its own when it completes.
#[cfg(target_os = "linux")]
nix::sys::prctl::set_pdeathsig(nix::sys::signal::Signal::SIGTERM).unwrap_or_else(|err| {
send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string())))
});

let worker_job_pid = process::id();
gum::debug!(
target: LOG_TARGET,
Expand Down
6 changes: 3 additions & 3 deletions polkadot/node/core/pvf/src/worker_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ pub async fn spawn_with_program_path(
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path_clone,
?extra_args_clone,
?worker_dir_clone,
program_path = ?program_path_clone,
extra_args = ?extra_args_clone,
worker_dir = ?worker_dir_clone,
"error spawning worker: {}",
err,
);
Expand Down
155 changes: 78 additions & 77 deletions polkadot/node/core/pvf/tests/it/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
//! spawned by the host) and job processes (spawned by the workers to securely perform PVF jobs).

use super::TestHost;
use adder::{hash_state, BlockData, HeadData};
use assert_matches::assert_matches;
use parity_scale_codec::Encode;
use polkadot_node_core_pvf::{
InvalidCandidate, PossiblyInvalidError, PrepareError, ValidationError,
};
use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams};
use polkadot_parachain_primitives::primitives::{
BlockData as GenericBlockData, HeadData as GenericHeadData, ValidationParams,
};
use procfs::process;
use rusty_fork::rusty_fork_test;
use std::time::Duration;
use std::{future::Future, sync::Arc, time::Duration};

const PREPARE_PROCESS_NAME: &'static str = "polkadot-prepare-worker";
const EXECUTE_PROCESS_NAME: &'static str = "polkadot-execute-worker";
Expand All @@ -39,19 +43,21 @@ fn send_signal_by_sid_and_name(
is_direct_child: bool,
signal: i32,
) {
let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child);
let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child)
.expect("Should have found the expected process");
assert_eq!(unsafe { libc::kill(process.pid(), signal) }, 0);
}
fn get_num_threads_by_sid_and_name(sid: i32, exe_name: &'static str, is_direct_child: bool) -> i64 {
let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child);
let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child)
.expect("Should have found the expected process");
process.stat().unwrap().num_threads
}

fn find_process_by_sid_and_name(
sid: i32,
exe_name: &'static str,
is_direct_child: bool,
) -> process::Process {
) -> Option<process::Process> {
let all_processes: Vec<process::Process> = process::all_processes()
.expect("Can't read /proc")
.filter_map(|p| match p {
Expand All @@ -68,7 +74,7 @@ fn find_process_by_sid_and_name(

let mut found = None;
for process in all_processes {
let stat = process.stat().unwrap();
let stat = process.stat().expect("/proc existed above. Potential race occurred");

if stat.session != sid || !process.exe().unwrap().to_str().unwrap().contains(exe_name) {
continue
Expand All @@ -85,24 +91,68 @@ fn find_process_by_sid_and_name(
}
found = Some(process);
}
found.expect("Should have found the expected process")
found
}

/// Sets up the test and makes sure everything gets cleaned up after.
///
/// Creates a fresh tokio runtime, a `TestHost`, and a new process session (so that all worker and
/// job processes spawned by the host can be found by session ID), runs the provided test closure,
/// then asserts that no job processes are left running in that session.
///
/// We run the runtime manually because `#[tokio::test]` doesn't work in `rusty_fork_test!`.
///
/// # Parameters
///
/// - `f`: the test body. Receives a shared handle to the test host and the session ID (`sid`)
///   of the newly-created session.
///
/// # Panics
///
/// Panics if the runtime can't be created, if `setsid` fails, or if any prepare/execute job
/// process in the session is still alive after the test body completes (i.e. a leaked process).
fn test_wrapper<F, Fut>(f: F)
where
	F: FnOnce(Arc<TestHost>, i32) -> Fut,
	Fut: Future<Output = ()>,
{
	let rt = tokio::runtime::Runtime::new().unwrap();
	rt.block_on(async {
		// NOTE(review): the host is created *before* `setsid`; this assumes workers are only
		// spawned lazily on first job, after the session exists — TODO confirm.
		let host = Arc::new(TestHost::new().await);

		// Create a new session and get the session ID.
		// SAFETY: `setsid` has no memory-safety preconditions; failure is reported via the
		// return value, which we assert on below.
		let sid = unsafe { libc::setsid() };
		assert!(sid > 0);

		// Pass a clone of the host so that it does not get dropped after.
		f(host.clone(), sid).await;

		// Sleep to give processes a chance to get cleaned up, preventing races in the next step.
		tokio::time::sleep(Duration::from_millis(500)).await;

		// Make sure job processes got cleaned up. Pass `is_direct_child: false` to target the
		// job processes.
		assert!(find_process_by_sid_and_name(sid, PREPARE_PROCESS_NAME, false).is_none());
		assert!(find_process_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, false).is_none());
	});
}

// Run these tests in their own processes with rusty-fork. They work by each creating a new session,
// then doing something with the child process that matches the session ID and expected process
// name.
// then finding the child process that matches the session ID and expected process name and doing
// something with that child.
rusty_fork_test! {
// Everything succeeded. All created subprocesses for jobs should get cleaned up, to avoid memory leaks.
#[test]
fn successful_prepare_and_validate() {
test_wrapper(|host, _sid| async move {
let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) };
let block_data = BlockData { state: 0, add: 512 };
host
.validate_candidate(
adder::wasm_binary_unwrap(),
ValidationParams {
parent_head: GenericHeadData(parent_head.encode()),
block_data: GenericBlockData(block_data.encode()),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
Default::default(),
)
.await
.unwrap();
})
}

// What happens when the prepare worker (not the job) times out?
#[test]
fn prepare_worker_timeout() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;

// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);

test_wrapper(|host, sid| async move {
let (result, _) = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
Expand All @@ -120,14 +170,7 @@ rusty_fork_test! {
// What happens when the execute worker (not the job) times out?
#[test]
fn execute_worker_timeout() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;

// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);

test_wrapper(|host, sid| async move {
// Prepare the artifact ahead of time.
let binary = halt::wasm_binary_unwrap();
host.precheck_pvf(binary, Default::default()).await.unwrap();
Expand All @@ -137,7 +180,7 @@ rusty_fork_test! {
host.validate_candidate(
binary,
ValidationParams {
block_data: BlockData(Vec::new()),
block_data: GenericBlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
Expand All @@ -161,14 +204,7 @@ rusty_fork_test! {
// What happens when the prepare worker dies in the middle of a job?
#[test]
fn prepare_worker_killed_during_job() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;

// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);

test_wrapper(|host, sid| async move {
let (result, _) = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
Expand All @@ -186,14 +222,7 @@ rusty_fork_test! {
// What happens when the execute worker dies in the middle of a job?
#[test]
fn execute_worker_killed_during_job() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;

// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);

test_wrapper(|host, sid| async move {
// Prepare the artifact ahead of time.
let binary = halt::wasm_binary_unwrap();
host.precheck_pvf(binary, Default::default()).await.unwrap();
Expand All @@ -203,7 +232,7 @@ rusty_fork_test! {
host.validate_candidate(
binary,
ValidationParams {
block_data: BlockData(Vec::new()),
block_data: GenericBlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
Expand All @@ -227,14 +256,7 @@ rusty_fork_test! {
// What happens when the forked prepare job dies in the middle of its job?
#[test]
fn forked_prepare_job_killed_during_job() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;

// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);

test_wrapper(|host, sid| async move {
let (result, _) = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
Expand All @@ -256,14 +278,7 @@ rusty_fork_test! {
// What happens when the forked execute job dies in the middle of its job?
#[test]
fn forked_execute_job_killed_during_job() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;

// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);

test_wrapper(|host, sid| async move {
// Prepare the artifact ahead of time.
let binary = halt::wasm_binary_unwrap();
host.precheck_pvf(binary, Default::default()).await.unwrap();
Expand All @@ -273,7 +288,7 @@ rusty_fork_test! {
host.validate_candidate(
binary,
ValidationParams {
block_data: BlockData(Vec::new()),
block_data: GenericBlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
Expand Down Expand Up @@ -301,14 +316,7 @@ rusty_fork_test! {
// See `run_worker` for why we need this invariant.
#[test]
fn ensure_prepare_processes_have_correct_num_threads() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;

// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);

test_wrapper(|host, sid| async move {
let _ = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
Expand Down Expand Up @@ -338,14 +346,7 @@ rusty_fork_test! {
// See `run_worker` for why we need this invariant.
#[test]
fn ensure_execute_processes_have_correct_num_threads() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;

// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);

test_wrapper(|host, sid| async move {
// Prepare the artifact ahead of time.
let binary = halt::wasm_binary_unwrap();
host.precheck_pvf(binary, Default::default()).await.unwrap();
Expand All @@ -355,7 +356,7 @@ rusty_fork_test! {
host.validate_candidate(
binary,
ValidationParams {
block_data: BlockData(Vec::new()),
block_data: GenericBlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
Expand Down

0 comments on commit 8bf5a1c

Please sign in to comment.