Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PVF: ensure job processes are cleaned up, add tests #2643

Merged
merged 6 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions polkadot/node/core/pvf/common/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,6 @@ pub enum JobError {
CouldNotSpawnThread(String),
#[error("An error occurred in the CPU time monitor thread: {0}")]
CpuTimeMonitorThread(String),
#[error("Could not set pdeathsig: {0}")]
CouldNotSetPdeathsig(String),
}
9 changes: 9 additions & 0 deletions polkadot/node/core/pvf/execute-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,15 @@ fn handle_child_process(
params: Vec<u8>,
execution_timeout: Duration,
) -> ! {
// Terminate if the parent thread dies. Parent thread == worker process (it is single-threaded).
//
// RACE: the worker may die before we install the death signal. In practice this is unlikely,
// and most of the time the job process should terminate on its own when it completes.
#[cfg(target_os = "linux")]
nix::sys::prctl::set_pdeathsig(nix::sys::signal::Signal::SIGTERM).unwrap_or_else(|err| {
send_child_response(&mut pipe_write, Err(JobError::CouldNotSetPdeathsig(err.to_string())))
});

gum::debug!(
target: LOG_TARGET,
worker_job_pid = %process::id(),
Expand Down
9 changes: 9 additions & 0 deletions polkadot/node/core/pvf/prepare-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,15 @@ fn handle_child_process(
prepare_job_kind: PrepareJobKind,
executor_params: Arc<ExecutorParams>,
) -> ! {
// Terminate if the parent thread dies. Parent thread == worker process (it is single-threaded).
//
// RACE: the worker may die before we install the death signal. In practice this is unlikely,
// and most of the time the job process should terminate on its own when it completes.
#[cfg(target_os = "linux")]
nix::sys::prctl::set_pdeathsig(nix::sys::signal::Signal::SIGTERM).unwrap_or_else(|err| {
send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string())))
});

let worker_job_pid = process::id();
gum::debug!(
target: LOG_TARGET,
Expand Down
6 changes: 3 additions & 3 deletions polkadot/node/core/pvf/src/worker_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ pub async fn spawn_with_program_path(
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path_clone,
?extra_args_clone,
?worker_dir_clone,
program_path = ?program_path_clone,
extra_args = ?extra_args_clone,
worker_dir = ?worker_dir_clone,
"error spawning worker: {}",
err,
);
Expand Down
155 changes: 78 additions & 77 deletions polkadot/node/core/pvf/tests/it/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
//! spawned by the host) and job processes (spawned by the workers to securely perform PVF jobs).

use super::TestHost;
use adder::{hash_state, BlockData, HeadData};
use assert_matches::assert_matches;
use parity_scale_codec::Encode;
use polkadot_node_core_pvf::{
InvalidCandidate, PossiblyInvalidError, PrepareError, ValidationError,
};
use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams};
use polkadot_parachain_primitives::primitives::{
BlockData as GenericBlockData, HeadData as GenericHeadData, ValidationParams,
};
use procfs::process;
use rusty_fork::rusty_fork_test;
use std::time::Duration;
use std::{future::Future, sync::Arc, time::Duration};

const PREPARE_PROCESS_NAME: &'static str = "polkadot-prepare-worker";
const EXECUTE_PROCESS_NAME: &'static str = "polkadot-execute-worker";
Expand All @@ -39,19 +43,21 @@ fn send_signal_by_sid_and_name(
is_direct_child: bool,
signal: i32,
) {
let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child);
let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child)
.expect("Should have found the expected process");
assert_eq!(unsafe { libc::kill(process.pid(), signal) }, 0);
}
fn get_num_threads_by_sid_and_name(sid: i32, exe_name: &'static str, is_direct_child: bool) -> i64 {
let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child);
let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child)
.expect("Should have found the expected process");
process.stat().unwrap().num_threads
}

fn find_process_by_sid_and_name(
sid: i32,
exe_name: &'static str,
is_direct_child: bool,
) -> process::Process {
) -> Option<process::Process> {
let all_processes: Vec<process::Process> = process::all_processes()
.expect("Can't read /proc")
.filter_map(|p| match p {
Expand All @@ -68,7 +74,7 @@ fn find_process_by_sid_and_name(

let mut found = None;
for process in all_processes {
let stat = process.stat().unwrap();
let stat = process.stat().expect("/proc existed above. Potential race occurred");

if stat.session != sid || !process.exe().unwrap().to_str().unwrap().contains(exe_name) {
continue
Expand All @@ -85,24 +91,68 @@ fn find_process_by_sid_and_name(
}
found = Some(process);
}
found.expect("Should have found the expected process")
found
}

/// Sets up the test and makes sure everything gets cleaned up after.
///
/// We run the runtime manually because `#[tokio::test]` doesn't work in `rusty_fork_test!`.
///
/// The closure `f` receives the shared [`TestHost`] and the session ID (`sid`) of the fresh
/// session this wrapper creates; the sid lets tests locate the worker/job processes spawned
/// during the run via `find_process_by_sid_and_name`. After `f` completes, the wrapper asserts
/// that no prepare/execute job processes remain in the session.
fn test_wrapper<F, Fut>(f: F)
where
F: FnOnce(Arc<TestHost>, i32) -> Fut,
Fut: Future<Output = ()>,
{
// Build the runtime by hand instead of relying on the `#[tokio::test]` attribute macro
// (see the doc comment above for why).
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = Arc::new(TestHost::new().await);

// Create a new session and get the session ID.
// `setsid()` returns the new sid on success, or -1 on failure — hence the sign check.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);

// Pass a clone of the host so that it does not get dropped after.
f(host.clone(), sid).await;

// Sleep to give processes a chance to get cleaned up, preventing races in the next step.
tokio::time::sleep(Duration::from_millis(500)).await;

// Make sure job processes got cleaned up. Pass `is_direct_child: false` to target the
// job processes.
assert!(find_process_by_sid_and_name(sid, PREPARE_PROCESS_NAME, false).is_none());
assert!(find_process_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, false).is_none());
});
}

// Run these tests in their own processes with rusty-fork. They work by each creating a new session,
// then doing something with the child process that matches the session ID and expected process
// name.
// then finding the child process that matches the session ID and expected process name and doing
// something with that child.
rusty_fork_test! {
// Everything succeeded. All created subprocesses for jobs should get cleaned up, to avoid memory leaks.
//
// Happy-path test: prepares and executes the `adder` test parachain PVF end-to-end, then relies
// on `test_wrapper`'s post-conditions to assert that no job processes were left behind.
#[test]
fn successful_prepare_and_validate() {
test_wrapper(|host, _sid| async move {
// Genesis-like parent head: block number 0 whose post-state is the hash of state 0.
let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) };
let block_data = BlockData { state: 0, add: 512 };
// Validate a single candidate. The adder-specific head/block data are SCALE-encoded and
// wrapped in the generic primitives the validation API expects.
host
.validate_candidate(
adder::wasm_binary_unwrap(),
ValidationParams {
parent_head: GenericHeadData(parent_head.encode()),
block_data: GenericBlockData(block_data.encode()),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
Default::default(),
)
.await
.unwrap();
})
}

// What happens when the prepare worker (not the job) times out?
#[test]
fn prepare_worker_timeout() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;

// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);

test_wrapper(|host, sid| async move {
let (result, _) = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
Expand All @@ -120,14 +170,7 @@ rusty_fork_test! {
// What happens when the execute worker (not the job) times out?
#[test]
fn execute_worker_timeout() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;

// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);

test_wrapper(|host, sid| async move {
// Prepare the artifact ahead of time.
let binary = halt::wasm_binary_unwrap();
host.precheck_pvf(binary, Default::default()).await.unwrap();
Expand All @@ -137,7 +180,7 @@ rusty_fork_test! {
host.validate_candidate(
binary,
ValidationParams {
block_data: BlockData(Vec::new()),
block_data: GenericBlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
Expand All @@ -161,14 +204,7 @@ rusty_fork_test! {
// What happens when the prepare worker dies in the middle of a job?
#[test]
fn prepare_worker_killed_during_job() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;

// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);

test_wrapper(|host, sid| async move {
let (result, _) = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
Expand All @@ -186,14 +222,7 @@ rusty_fork_test! {
// What happens when the execute worker dies in the middle of a job?
#[test]
fn execute_worker_killed_during_job() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;

// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);

test_wrapper(|host, sid| async move {
// Prepare the artifact ahead of time.
let binary = halt::wasm_binary_unwrap();
host.precheck_pvf(binary, Default::default()).await.unwrap();
Expand All @@ -203,7 +232,7 @@ rusty_fork_test! {
host.validate_candidate(
binary,
ValidationParams {
block_data: BlockData(Vec::new()),
block_data: GenericBlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
Expand All @@ -227,14 +256,7 @@ rusty_fork_test! {
// What happens when the forked prepare job dies in the middle of its job?
#[test]
fn forked_prepare_job_killed_during_job() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;

// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);

test_wrapper(|host, sid| async move {
let (result, _) = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
Expand All @@ -256,14 +278,7 @@ rusty_fork_test! {
// What happens when the forked execute job dies in the middle of its job?
#[test]
fn forked_execute_job_killed_during_job() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;

// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);

test_wrapper(|host, sid| async move {
// Prepare the artifact ahead of time.
let binary = halt::wasm_binary_unwrap();
host.precheck_pvf(binary, Default::default()).await.unwrap();
Expand All @@ -273,7 +288,7 @@ rusty_fork_test! {
host.validate_candidate(
binary,
ValidationParams {
block_data: BlockData(Vec::new()),
block_data: GenericBlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
Expand Down Expand Up @@ -301,14 +316,7 @@ rusty_fork_test! {
// See `run_worker` for why we need this invariant.
#[test]
fn ensure_prepare_processes_have_correct_num_threads() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;

// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);

test_wrapper(|host, sid| async move {
let _ = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
Expand Down Expand Up @@ -338,14 +346,7 @@ rusty_fork_test! {
// See `run_worker` for why we need this invariant.
#[test]
fn ensure_execute_processes_have_correct_num_threads() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;

// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);

test_wrapper(|host, sid| async move {
// Prepare the artifact ahead of time.
let binary = halt::wasm_binary_unwrap();
host.precheck_pvf(binary, Default::default()).await.unwrap();
Expand All @@ -355,7 +356,7 @@ rusty_fork_test! {
host.validate_candidate(
binary,
ValidationParams {
block_data: BlockData(Vec::new()),
block_data: GenericBlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
Expand Down