From b071fe21f01a583ee24ca8820fae4ef5df94d1a8 Mon Sep 17 00:00:00 2001 From: "Marcin S." Date: Wed, 6 Dec 2023 16:55:10 +0100 Subject: [PATCH 1/4] PVF: Add simple test to ensure job processes are cleaned up --- polkadot/node/core/pvf/tests/it/process.rs | 61 ++++++++++++++++++---- 1 file changed, 50 insertions(+), 11 deletions(-) diff --git a/polkadot/node/core/pvf/tests/it/process.rs b/polkadot/node/core/pvf/tests/it/process.rs index b742acb15d02..1e4d460cf47b 100644 --- a/polkadot/node/core/pvf/tests/it/process.rs +++ b/polkadot/node/core/pvf/tests/it/process.rs @@ -18,11 +18,15 @@ //! spawned by the host) and job processes (spawned by the workers to securely perform PVF jobs). use super::TestHost; +use adder::{hash_state, BlockData, HeadData}; use assert_matches::assert_matches; +use parity_scale_codec::Encode; use polkadot_node_core_pvf::{ InvalidCandidate, PossiblyInvalidError, PrepareError, ValidationError, }; -use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams}; +use polkadot_parachain_primitives::primitives::{ + BlockData as GenericBlockData, HeadData as GenericHeadData, ValidationParams, +}; use procfs::process; use rusty_fork::rusty_fork_test; use std::time::Duration; @@ -39,11 +43,13 @@ fn send_signal_by_sid_and_name( is_direct_child: bool, signal: i32, ) { - let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child); + let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child) + .expect("Should have found the expected process"); assert_eq!(unsafe { libc::kill(process.pid(), signal) }, 0); } fn get_num_threads_by_sid_and_name(sid: i32, exe_name: &'static str, is_direct_child: bool) -> i64 { - let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child); + let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child) + .expect("Should have found the expected process"); process.stat().unwrap().num_threads } @@ -51,7 +57,7 @@ fn find_process_by_sid_and_name( sid: i32, exe_name: &'static str, is_direct_child: bool, -) -> process::Process { +) -> Option { let all_processes: Vec = process::all_processes() .expect("Can't read /proc") .filter_map(|p| match p { @@ -85,13 +91,46 @@ fn find_process_by_sid_and_name( } found = Some(process); } - found.expect("Should have found the expected process") + found } // Run these tests in their own processes with rusty-fork. They work by each creating a new session, -// then doing something with the child process that matches the session ID and expected process -// name. +// then finding the child process that matches the session ID and expected process name and doing +// something with that child. rusty_fork_test! { + // All created subprocesses for jobs should get cleaned up, to avoid memory leaks. + #[test] + fn job_processes_get_cleaned_up() { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let host = TestHost::new().await; + + // Create a new session and get the session ID. + let sid = unsafe { libc::setsid() }; + assert!(sid > 0); + + let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; + let block_data = BlockData { state: 0, add: 512 }; + host + .validate_candidate( + adder::wasm_binary_unwrap(), + ValidationParams { + parent_head: GenericHeadData(parent_head.encode()), + block_data: GenericBlockData(block_data.encode()), + relay_parent_number: 1, + relay_parent_storage_root: Default::default(), + }, + Default::default(), + ) + .await + .unwrap(); + + // Pass `is_direct_child: false` to target the job processes. + assert!(find_process_by_sid_and_name(sid, PREPARE_PROCESS_NAME, false).is_none()); + assert!(find_process_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, false).is_none()); + }) + } + // What happens when the prepare worker (not the job) times out? #[test] fn prepare_worker_timeout() { @@ -137,7 +176,7 @@ rusty_fork_test! { host.validate_candidate( binary, ValidationParams { - block_data: BlockData(Vec::new()), + block_data: GenericBlockData(Vec::new()), parent_head: Default::default(), relay_parent_number: 1, relay_parent_storage_root: Default::default(), @@ -203,7 +242,7 @@ rusty_fork_test! { host.validate_candidate( binary, ValidationParams { - block_data: BlockData(Vec::new()), + block_data: GenericBlockData(Vec::new()), parent_head: Default::default(), relay_parent_number: 1, relay_parent_storage_root: Default::default(), @@ -273,7 +312,7 @@ rusty_fork_test! { host.validate_candidate( binary, ValidationParams { - block_data: BlockData(Vec::new()), + block_data: GenericBlockData(Vec::new()), parent_head: Default::default(), relay_parent_number: 1, relay_parent_storage_root: Default::default(), @@ -355,7 +394,7 @@ rusty_fork_test! { host.validate_candidate( binary, ValidationParams { - block_data: BlockData(Vec::new()), + block_data: GenericBlockData(Vec::new()), parent_head: Default::default(), relay_parent_number: 1, relay_parent_storage_root: Default::default(), From 719ed44a6b70ca5ee25d923728415e4e1d27f028 Mon Sep 17 00:00:00 2001 From: "Marcin S." Date: Wed, 6 Dec 2023 20:04:47 +0100 Subject: [PATCH 2/4] Refactor process tests (they all check for job cleanup now) --- polkadot/node/core/pvf/tests/it/process.rs | 122 +++++++-------------- 1 file changed, 42 insertions(+), 80 deletions(-) diff --git a/polkadot/node/core/pvf/tests/it/process.rs b/polkadot/node/core/pvf/tests/it/process.rs index 1e4d460cf47b..3ea03339a839 100644 --- a/polkadot/node/core/pvf/tests/it/process.rs +++ b/polkadot/node/core/pvf/tests/it/process.rs @@ -29,7 +29,7 @@ use polkadot_parachain_primitives::primitives::{ }; use procfs::process; use rusty_fork::rusty_fork_test; -use std::time::Duration; +use std::{future::Future, sync::Arc, time::Duration}; const PREPARE_PROCESS_NAME: &'static str = "polkadot-prepare-worker"; const EXECUTE_PROCESS_NAME: &'static str = "polkadot-execute-worker"; @@ -74,7 +74,7 @@ fn find_process_by_sid_and_name( let mut found = None; for process in all_processes { - let stat = process.stat().unwrap(); + let stat = process.stat().expect("/proc existed above. Potential race occurred"); if stat.session != sid || !process.exe().unwrap().to_str().unwrap().contains(exe_name) { continue @@ -94,21 +94,43 @@ fn find_process_by_sid_and_name( found } +/// Sets up the test and makes sure everything gets cleaned up after. +/// +/// We run the runtime manually because `#[tokio::test]` doesn't work in `rusty_fork_test!`. +fn test_wrapper(f: F) +where + F: FnOnce(Arc, i32) -> Fut, + Fut: Future, +{ + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let host = Arc::new(TestHost::new().await); + + // Create a new session and get the session ID. + let sid = unsafe { libc::setsid() }; + assert!(sid > 0); + + // Pass a clone of the host so that it does not get dropped after. + f(host.clone(), sid).await; + + // Sleep to give processes a chance to get cleaned up, preventing races in the next step. + tokio::time::sleep(Duration::from_millis(500)).await; + + // Make sure job processes got cleaned up. Pass `is_direct_child: false` to target the + // job processes. + assert!(find_process_by_sid_and_name(sid, PREPARE_PROCESS_NAME, false).is_none()); + assert!(find_process_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, false).is_none()); + }); +} + // Run these tests in their own processes with rusty-fork. They work by each creating a new session, // then finding the child process that matches the session ID and expected process name and doing // something with that child. rusty_fork_test! { - // All created subprocesses for jobs should get cleaned up, to avoid memory leaks. + // Everything succeeded. All created subprocesses for jobs should get cleaned up, to avoid memory leaks. #[test] - fn job_processes_get_cleaned_up() { - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - let host = TestHost::new().await; - - // Create a new session and get the session ID. - let sid = unsafe { libc::setsid() }; - assert!(sid > 0); - + fn successful_prepare_and_validate() { + test_wrapper(|host, _sid| async move { let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; let block_data = BlockData { state: 0, add: 512 }; host @@ -124,24 +146,13 @@ rusty_fork_test! { ) .await .unwrap(); - - // Pass `is_direct_child: false` to target the job processes. - assert!(find_process_by_sid_and_name(sid, PREPARE_PROCESS_NAME, false).is_none()); - assert!(find_process_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, false).is_none()); }) } // What happens when the prepare worker (not the job) times out? #[test] fn prepare_worker_timeout() { - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - let host = TestHost::new().await; - - // Create a new session and get the session ID. - let sid = unsafe { libc::setsid() }; - assert!(sid > 0); - + test_wrapper(|host, sid| async move { let (result, _) = futures::join!( // Choose a job that would normally take the entire timeout. host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()), @@ -159,14 +170,7 @@ rusty_fork_test! { // What happens when the execute worker (not the job) times out? #[test] fn execute_worker_timeout() { - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - let host = TestHost::new().await; - - // Create a new session and get the session ID. - let sid = unsafe { libc::setsid() }; - assert!(sid > 0); - + test_wrapper(|host, sid| async move { // Prepare the artifact ahead of time. let binary = halt::wasm_binary_unwrap(); host.precheck_pvf(binary, Default::default()).await.unwrap(); @@ -200,14 +204,7 @@ rusty_fork_test! { // What happens when the prepare worker dies in the middle of a job? #[test] fn prepare_worker_killed_during_job() { - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - let host = TestHost::new().await; - - // Create a new session and get the session ID. - let sid = unsafe { libc::setsid() }; - assert!(sid > 0); - + test_wrapper(|host, sid| async move { let (result, _) = futures::join!( // Choose a job that would normally take the entire timeout. host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()), @@ -225,14 +222,7 @@ rusty_fork_test! { // What happens when the execute worker dies in the middle of a job? #[test] fn execute_worker_killed_during_job() { - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - let host = TestHost::new().await; - - // Create a new session and get the session ID. - let sid = unsafe { libc::setsid() }; - assert!(sid > 0); - + test_wrapper(|host, sid| async move { // Prepare the artifact ahead of time. let binary = halt::wasm_binary_unwrap(); host.precheck_pvf(binary, Default::default()).await.unwrap(); @@ -266,14 +256,7 @@ rusty_fork_test! { // What happens when the forked prepare job dies in the middle of its job? #[test] fn forked_prepare_job_killed_during_job() { - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - let host = TestHost::new().await; - - // Create a new session and get the session ID. - let sid = unsafe { libc::setsid() }; - assert!(sid > 0); - + test_wrapper(|host, sid| async move { let (result, _) = futures::join!( // Choose a job that would normally take the entire timeout. host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()), @@ -295,14 +278,7 @@ rusty_fork_test! { // What happens when the forked execute job dies in the middle of its job? #[test] fn forked_execute_job_killed_during_job() { - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - let host = TestHost::new().await; - - // Create a new session and get the session ID. - let sid = unsafe { libc::setsid() }; - assert!(sid > 0); - + test_wrapper(|host, sid| async move { // Prepare the artifact ahead of time. let binary = halt::wasm_binary_unwrap(); host.precheck_pvf(binary, Default::default()).await.unwrap(); @@ -340,14 +316,7 @@ rusty_fork_test! { // See `run_worker` for why we need this invariant. #[test] fn ensure_prepare_processes_have_correct_num_threads() { - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - let host = TestHost::new().await; - - // Create a new session and get the session ID. - let sid = unsafe { libc::setsid() }; - assert!(sid > 0); - + test_wrapper(|host, sid| async move { let _ = futures::join!( // Choose a job that would normally take the entire timeout. host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()), @@ -377,14 +346,7 @@ rusty_fork_test! { // See `run_worker` for why we need this invariant. #[test] fn ensure_execute_processes_have_correct_num_threads() { - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - let host = TestHost::new().await; - - // Create a new session and get the session ID. - let sid = unsafe { libc::setsid() }; - assert!(sid > 0); - + test_wrapper(|host, sid| async move { // Prepare the artifact ahead of time. let binary = halt::wasm_binary_unwrap(); host.precheck_pvf(binary, Default::default()).await.unwrap(); From d2e7cb4fd5d5fe42162cedec11d6c41d0e474cee Mon Sep 17 00:00:00 2001 From: "Marcin S." Date: Wed, 6 Dec 2023 20:05:15 +0100 Subject: [PATCH 3/4] Use PR_SET_PDEATHSIG to make sure job processes get cleaned up --- polkadot/node/core/pvf/common/src/execute.rs | 2 ++ polkadot/node/core/pvf/execute-worker/src/lib.rs | 9 +++++++++ polkadot/node/core/pvf/prepare-worker/src/lib.rs | 9 +++++++++ 3 files changed, 20 insertions(+) diff --git a/polkadot/node/core/pvf/common/src/execute.rs b/polkadot/node/core/pvf/common/src/execute.rs index aa1c1c539682..5ba5b443e6a1 100644 --- a/polkadot/node/core/pvf/common/src/execute.rs +++ b/polkadot/node/core/pvf/common/src/execute.rs @@ -96,4 +96,6 @@ pub enum JobError { CouldNotSpawnThread(String), #[error("An error occurred in the CPU time monitor thread: {0}")] CpuTimeMonitorThread(String), + #[error("Could not set pdeathsig: {0}")] + CouldNotSetPdeathsig(String), } diff --git a/polkadot/node/core/pvf/execute-worker/src/lib.rs b/polkadot/node/core/pvf/execute-worker/src/lib.rs index b33a9d5069df..cff6e0ac13ab 100644 --- a/polkadot/node/core/pvf/execute-worker/src/lib.rs +++ b/polkadot/node/core/pvf/execute-worker/src/lib.rs @@ -277,6 +277,15 @@ fn handle_child_process( params: Vec, execution_timeout: Duration, ) -> ! { + // Terminate if the parent thread dies. Parent thread == worker process (it is single-threaded). + // + // RACE: the worker may die before we install the death signal. In practice this is unlikely, + // and most of the time the job process should terminate on its own when it completes. + #[cfg(target_os = "linux")] + nix::sys::prctl::set_pdeathsig(nix::sys::signal::Signal::SIGTERM).unwrap_or_else(|err| { + send_child_response(&mut pipe_write, Err(JobError::CouldNotSetPdeathsig(err.to_string()))) + }); + gum::debug!( target: LOG_TARGET, worker_job_pid = %process::id(), diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs index af5ac8c59749..f77eed871ec9 100644 --- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs +++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs @@ -334,6 +334,15 @@ fn handle_child_process( prepare_job_kind: PrepareJobKind, executor_params: Arc, ) -> ! { + // Terminate if the parent thread dies. Parent thread == worker process (it is single-threaded). + // + // RACE: the worker may die before we install the death signal. In practice this is unlikely, + // and most of the time the job process should terminate on its own when it completes. + #[cfg(target_os = "linux")] + nix::sys::prctl::set_pdeathsig(nix::sys::signal::Signal::SIGTERM).unwrap_or_else(|err| { + send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string()))) + }); + let worker_job_pid = process::id(); gum::debug!( target: LOG_TARGET, From 61c4871f4298ef9f3bc78609e48029ebf73a6735 Mon Sep 17 00:00:00 2001 From: "Marcin S." Date: Thu, 14 Dec 2023 17:00:54 +0100 Subject: [PATCH 4/4] minor: Improve log --- polkadot/node/core/pvf/src/worker_interface.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/polkadot/node/core/pvf/src/worker_interface.rs b/polkadot/node/core/pvf/src/worker_interface.rs index c68ff92b06eb..456c20bd27b2 100644 --- a/polkadot/node/core/pvf/src/worker_interface.rs +++ b/polkadot/node/core/pvf/src/worker_interface.rs @@ -105,9 +105,9 @@ pub async fn spawn_with_program_path( gum::warn!( target: LOG_TARGET, %debug_id, - ?program_path_clone, - ?extra_args_clone, - ?worker_dir_clone, + program_path = ?program_path_clone, + extra_args = ?extra_args_clone, + worker_dir = ?worker_dir_clone, "error spawning worker: {}", err, );