Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Replace async-std with tokio in PVF subsystem #6419

Merged
merged 23 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ec204fa
Replace async-std with tokio in PVF subsystem
mrcnski Dec 11, 2022
3abc804
Rework workers to use `select!` instead of a mutex
mrcnski Dec 13, 2022
5fcc67d
Remove unnecessary `fuse`
mrcnski Dec 13, 2022
1574561
Merge branch 'master' into m-cat/replace-async-std-pvf
mrcnski Dec 13, 2022
451fae0
Add explanation for `expect()`
mrcnski Dec 13, 2022
fc4c28b
Update node/core/pvf/src/worker_common.rs
mrcnski Dec 18, 2022
1dde78b
Update node/core/pvf/src/worker_common.rs
mrcnski Dec 18, 2022
da31a48
Address some review comments
mrcnski Dec 18, 2022
35a0c79
Merge remote-tracking branch 'origin/m-cat/replace-async-std-pvf' int…
mrcnski Dec 18, 2022
e1c2cf3
Shutdown tokio runtime
mrcnski Dec 18, 2022
077a123
Run cargo fmt
mrcnski Dec 19, 2022
e0d4b9e
Add a small note about retries
mrcnski Dec 19, 2022
2353747
Merge branch 'master' into m-cat/replace-async-std-pvf
mrcnski Dec 20, 2022
28d4062
Fix up merge
mrcnski Dec 20, 2022
3964aca
Rework `cpu_time_monitor_loop` to return when other thread finishes
mrcnski Dec 20, 2022
7057518
Add error string to PrepareError::IoErr variant
mrcnski Dec 20, 2022
e6ba098
Log when artifacts fail to prepare
mrcnski Dec 20, 2022
e094f80
Fix `cpu_time_monitor_loop`; fix test
mrcnski Dec 20, 2022
c09377a
Fix text
mrcnski Dec 20, 2022
05d1865
Fix a couple of potential minor data races.
mrcnski Dec 22, 2022
b0c2434
Merge branch 'master' into m-cat/replace-async-std-pvf
mrcnski Jan 5, 2023
5cc477c
Merge branch 'master' into m-cat/replace-async-std-pvf
mrcnski Jan 9, 2023
0f4ac06
Update Cargo.lock
mrcnski Jan 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 8 additions & 84 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ path = "bin/puppet_worker.rs"

[dependencies]
always-assert = "0.1"
async-std = { version = "1.11.0", features = ["attributes"] }
async-process = "1.3.0"
assert_matches = "1.4.0"
cpu-time = "1.0.0"
futures = "0.3.21"
Expand All @@ -21,6 +19,7 @@ gum = { package = "tracing-gum", path = "../../gum" }
pin-project = "1.0.9"
rand = "0.8.5"
tempfile = "3.3.0"
tokio = { version = "1.22.0", features = ["fs", "process"] }
rayon = "1.5.1"

parity-scale-codec = { version = "3.1.5", default-features = false, features = ["derive"] }
Expand Down
19 changes: 8 additions & 11 deletions node/core/pvf/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

use crate::{error::PrepareError, host::PrepareResultSender};
use always_assert::always;
use async_std::path::{Path, PathBuf};
use polkadot_parachain::primitives::ValidationCodeHash;
use std::{
collections::HashMap,
path::{Path, PathBuf},
time::{Duration, SystemTime},
};

Expand Down Expand Up @@ -136,8 +136,8 @@ impl Artifacts {
pub async fn new(cache_path: &Path) -> Self {
// Make sure that the cache path directory and all its parents are created.
// First delete the entire cache. Nodes are long-running so this should populate shortly.
let _ = async_std::fs::remove_dir_all(cache_path).await;
let _ = async_std::fs::create_dir_all(cache_path).await;
let _ = tokio::fs::remove_dir_all(cache_path).await;
let _ = tokio::fs::create_dir_all(cache_path).await;

Self { artifacts: HashMap::new() }
}
Expand Down Expand Up @@ -214,9 +214,8 @@ impl Artifacts {
#[cfg(test)]
mod tests {
use super::{ArtifactId, Artifacts};
use async_std::path::Path;
use sp_core::H256;
use std::str::FromStr;
use std::{path::Path, str::FromStr};

#[test]
fn from_file_name() {
Expand Down Expand Up @@ -252,11 +251,9 @@ mod tests {
);
}

#[test]
fn artifacts_removes_cache_on_startup() {
let fake_cache_path = async_std::task::block_on(async move {
crate::worker_common::tmpfile("test-cache").await.unwrap()
});
#[tokio::test]
async fn artifacts_removes_cache_on_startup() {
let fake_cache_path = crate::worker_common::tmpfile("test-cache").await.unwrap();
let fake_artifact_path = {
let mut p = fake_cache_path.clone();
p.push("wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234");
Expand All @@ -271,7 +268,7 @@ mod tests {
// this should remove it and re-create.

let p = &fake_cache_path;
async_std::task::block_on(async { Artifacts::new(p).await });
Artifacts::new(p).await;

assert_eq!(std::fs::read_dir(&fake_cache_path).unwrap().count(), 0);

Expand Down
3 changes: 1 addition & 2 deletions node/core/pvf/src/execute/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ use crate::{
worker_common::{IdleWorker, WorkerHandle},
InvalidCandidate, ValidationError, LOG_TARGET,
};
use async_std::path::PathBuf;
use futures::{
channel::mpsc,
future::BoxFuture,
stream::{FuturesUnordered, StreamExt as _},
Future, FutureExt,
};
use slotmap::HopSlotMap;
use std::{collections::VecDeque, fmt, time::Duration};
use std::{collections::VecDeque, fmt, path::PathBuf, time::Duration};

slotmap::new_key_type! { struct Worker; }

Expand Down
86 changes: 41 additions & 45 deletions node/core/pvf/src/execute/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,20 @@ use crate::{
},
LOG_TARGET,
};
use async_std::{
io,
os::unix::net::UnixStream,
path::{Path, PathBuf},
task,
};
use cpu_time::ProcessTime;
use futures::FutureExt;
use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode};
use polkadot_parachain::primitives::ValidationResult;
use std::{
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::Duration,
};
use tokio::{io, net::UnixStream, select};

/// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
///
Expand Down Expand Up @@ -235,10 +230,10 @@ impl Response {
/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host.
pub fn worker_entrypoint(socket_path: &str) {
worker_event_loop("execute", socket_path, |mut stream| async move {
let executor = Executor::new().map_err(|e| {
worker_event_loop("execute", socket_path, |rt_handle, mut stream| async move {
let executor = Arc::new(Executor::new().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
})?;
})?);

loop {
let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
Expand All @@ -249,52 +244,53 @@ pub fn worker_entrypoint(socket_path: &str) {
artifact_path.display(),
);

// Create a lock flag. We set it when either thread finishes.
let lock = Arc::new(AtomicBool::new(false));
// Flag used only to signal to the cpu time monitor thread that it can finish.
let finished_flag = Arc::new(AtomicBool::new(false));
let cpu_time_start = ProcessTime::now();

// Spawn a new thread that runs the CPU time monitor. Continuously wakes up from
// sleeping and then either sleeps for the remaining CPU time, or kills the process if
// we exceed the CPU timeout.
let (stream_2, cpu_time_start_2, execution_timeout_2, lock_2) =
(stream.clone(), cpu_time_start, execution_timeout, lock.clone());
let handle =
thread::Builder::new().name("CPU time monitor".into()).spawn(move || {
task::block_on(async {
cpu_time_monitor_loop(
JobKind::Execute,
stream_2,
cpu_time_start_2,
execution_timeout_2,
lock_2,
)
.await;
})
})?;

let response =
validate_using_artifact(&artifact_path, &params, &executor, cpu_time_start).await;
// Spawn a new thread that runs the CPU time monitor.
let finished_flag_2 = finished_flag.clone();
let thread_fut = rt_handle
.spawn_blocking(move || {
cpu_time_monitor_loop(
JobKind::Execute,
cpu_time_start,
execution_timeout,
finished_flag_2,
);
})
.fuse();
let executor_2 = executor.clone();
let execute_fut = rt_handle
.spawn_blocking(move || {
validate_using_artifact(&artifact_path, &params, executor_2, cpu_time_start)
})
.fuse();

let lock_result =
lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed);
if lock_result.is_err() {
// The other thread is still sending an error response over the socket. Wait on it
// and return.
let _ = handle.join();
// Monitor thread detected timeout and likely already terminated the process,
// nothing to do.
continue
}
let response = select! {
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
// If this future is not selected, the join handle is dropped and the thread will
// finish in the background.
join_res = thread_fut => {
match join_res {
Ok(()) => Response::TimedOut,
Err(e) => Response::InternalError(format!("{}", e)),
}
},
execute_res = execute_fut => {
finished_flag.store(true, Ordering::Relaxed);
execute_res.unwrap_or_else(|e| Response::InternalError(format!("{}", e)))
},
};

send_response(&mut stream, response).await?;
}
});
}

async fn validate_using_artifact(
fn validate_using_artifact(
artifact_path: &Path,
params: &[u8],
executor: &Executor,
executor: Arc<Executor>,
cpu_time_start: ProcessTime,
) -> Response {
let descriptor_bytes = match unsafe {
Expand Down
Loading