diff --git a/crates/wasi/src/ctx.rs b/crates/wasi/src/ctx.rs index 4e9f3578e736..98233fa458e2 100644 --- a/crates/wasi/src/ctx.rs +++ b/crates/wasi/src/ctx.rs @@ -30,6 +30,7 @@ pub struct WasiCtxBuilder { wall_clock: Box, monotonic_clock: Box, allowed_network_uses: AllowedNetworkUses, + allow_blocking_current_thread: bool, built: bool, } @@ -80,6 +81,7 @@ impl WasiCtxBuilder { wall_clock: wall_clock(), monotonic_clock: monotonic_clock(), allowed_network_uses: AllowedNetworkUses::default(), + allow_blocking_current_thread: false, built: false, } } @@ -115,6 +117,37 @@ impl WasiCtxBuilder { self.inherit_stdin().inherit_stdout().inherit_stderr() } + /// Configures whether or not blocking operations made through this + /// `WasiCtx` are allowed to block the current thread. + /// + /// WASI is currently implemented on top of the Rust + /// [Tokio](https://tokio.rs/) library. While most WASI APIs are + /// non-blocking some are instead blocking from the perspective of + /// WebAssembly. For example opening a file is a blocking operation with + /// respect to WebAssembly but it's implemented as an asynchronous operation + /// on the host. This is currently done with Tokio's + /// [`spawn_blocking`](https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html). + /// + /// When WebAssembly is used in a synchronous context, for example when + /// [`Config::async_support`] is disabled, then this asynchronous operation + /// is quickly turned back into a synchronous operation with a `block_on` in + /// Rust. This switching back-and-forth between a blocking and a non-blocking + /// context can have overhead, and this option exists to help alleviate this + /// overhead. + /// + /// This option indicates that for WASI functions that are blocking from the + /// perspective of WebAssembly it's ok to block the native thread as well. 
+ /// This means that this back-and-forth between async and sync won't happen + /// and instead blocking operations are performed on-thread (such as opening + /// a file). This can improve the performance of WASI operations when async + /// support is disabled. + /// + /// [`Config::async_support`]: https://docs.rs/wasmtime/latest/wasmtime/struct.Config.html#method.async_support + pub fn allow_blocking_current_thread(&mut self, enable: bool) -> &mut Self { + self.allow_blocking_current_thread = enable; + self + } + pub fn envs(&mut self, env: &[(impl AsRef, impl AsRef)]) -> &mut Self { self.env.extend( env.iter() @@ -162,7 +195,13 @@ impl WasiCtxBuilder { open_mode |= OpenMode::WRITE; } self.preopens.push(( - Dir::new(dir, perms, file_perms, open_mode), + Dir::new( + dir, + perms, + file_perms, + open_mode, + self.allow_blocking_current_thread, + ), path.as_ref().to_owned(), )); self @@ -263,6 +302,7 @@ impl WasiCtxBuilder { wall_clock, monotonic_clock, allowed_network_uses, + allow_blocking_current_thread, built: _, } = mem::replace(self, Self::new()); self.built = true; @@ -281,6 +321,7 @@ impl WasiCtxBuilder { wall_clock, monotonic_clock, allowed_network_uses, + allow_blocking_current_thread, } } @@ -310,6 +351,7 @@ pub struct WasiCtx { pub(crate) stderr: Box, pub(crate) socket_addr_check: SocketAddrCheck, pub(crate) allowed_network_uses: AllowedNetworkUses, + pub(crate) allow_blocking_current_thread: bool, } pub struct AllowedNetworkUses { diff --git a/crates/wasi/src/filesystem.rs b/crates/wasi/src/filesystem.rs index 86bd9398d8b9..3dffa4603529 100644 --- a/crates/wasi/src/filesystem.rs +++ b/crates/wasi/src/filesystem.rs @@ -74,6 +74,7 @@ bitflags::bitflags! { } } +#[derive(Clone)] pub struct File { /// The operating system File this struct is mediating access to. /// @@ -92,14 +93,22 @@ pub struct File { /// doesn't presently provide a cross-platform equivelant of reading the /// oflags back out using fcntl. 
pub open_mode: OpenMode, + + allow_blocking_current_thread: bool, } impl File { - pub fn new(file: cap_std::fs::File, perms: FilePerms, open_mode: OpenMode) -> Self { + pub fn new( + file: cap_std::fs::File, + perms: FilePerms, + open_mode: OpenMode, + allow_blocking_current_thread: bool, + ) -> Self { Self { file: Arc::new(file), perms, open_mode, + allow_blocking_current_thread, } } @@ -110,11 +119,31 @@ impl File { F: FnOnce(&cap_std::fs::File) -> R + Send + 'static, R: Send + 'static, { - let f = self.file.clone(); - spawn_blocking(move || body(&f)).await + match self._spawn_blocking(body) { + SpawnBlocking::Done(result) => result, + SpawnBlocking::Spawned(task) => task.await, + } + } + + fn _spawn_blocking(&self, body: F) -> SpawnBlocking + where + F: FnOnce(&cap_std::fs::File) -> R + Send + 'static, + R: Send + 'static, + { + if self.allow_blocking_current_thread { + SpawnBlocking::Done(body(&self.file)) + } else { + let f = self.file.clone(); + SpawnBlocking::Spawned(spawn_blocking(move || body(&f))) + } } } +enum SpawnBlocking { + Done(T), + Spawned(AbortOnDropJoinHandle), +} + bitflags::bitflags! { #[derive(Copy, Clone, Debug, PartialEq, Eq)] pub struct DirPerms: usize { @@ -146,6 +175,8 @@ pub struct Dir { /// doesn't presently provide a cross-platform equivelant of reading the /// oflags back out using fcntl. 
pub open_mode: OpenMode, + + allow_blocking_current_thread: bool, } impl Dir { @@ -154,12 +185,14 @@ impl Dir { perms: DirPerms, file_perms: FilePerms, open_mode: OpenMode, + allow_blocking_current_thread: bool, ) -> Self { Dir { dir: Arc::new(dir), perms, file_perms, open_mode, + allow_blocking_current_thread, } } @@ -170,30 +203,38 @@ impl Dir { F: FnOnce(&cap_std::fs::Dir) -> R + Send + 'static, R: Send + 'static, { - let d = self.dir.clone(); - spawn_blocking(move || body(&d)).await + if self.allow_blocking_current_thread { + body(&self.dir) + } else { + let d = self.dir.clone(); + spawn_blocking(move || body(&d)).await + } } } pub struct FileInputStream { - file: Arc, + file: File, position: u64, } impl FileInputStream { - pub fn new(file: Arc, position: u64) -> Self { - Self { file, position } + pub fn new(file: &File, position: u64) -> Self { + Self { + file: file.clone(), + position, + } } pub async fn read(&mut self, size: usize) -> Result { use system_interface::fs::FileIoExt; - let f = Arc::clone(&self.file); let p = self.position; - let (r, mut buf) = spawn_blocking(move || { - let mut buf = BytesMut::zeroed(size); - let r = f.read_at(&mut buf, p); - (r, buf) - }) - .await; + let (r, mut buf) = self + .file + .spawn_blocking(move |f| { + let mut buf = BytesMut::zeroed(size); + let r = f.read_at(&mut buf, p); + (r, buf) + }) + .await; let n = read_result(r)?; buf.truncate(n); self.position += n as u64; @@ -222,7 +263,7 @@ pub(crate) enum FileOutputMode { } pub(crate) struct FileOutputStream { - file: Arc, + file: File, mode: FileOutputMode, state: OutputState, } @@ -238,16 +279,17 @@ enum OutputState { } impl FileOutputStream { - pub fn write_at(file: Arc, position: u64) -> Self { + pub fn write_at(file: &File, position: u64) -> Self { Self { - file, + file: file.clone(), mode: FileOutputMode::Position(position), state: OutputState::Ready, } } - pub fn append(file: Arc) -> Self { + + pub fn append(file: &File) -> Self { Self { - file, + file: 
file.clone(), mode: FileOutputMode::Append, state: OutputState::Ready, } @@ -275,33 +317,43 @@ impl HostOutputStream for FileOutputStream { return Ok(()); } - let f = Arc::clone(&self.file); let m = self.mode; - let task = spawn_blocking(move || match m { - FileOutputMode::Position(mut p) => { - let mut total = 0; - let mut buf = buf; - while !buf.is_empty() { - let nwritten = f.write_at(buf.as_ref(), p)?; - // afterwards buf contains [nwritten, len): - let _ = buf.split_to(nwritten); - p += nwritten as u64; - total += nwritten; + let result = self.file._spawn_blocking(move |f| { + match m { + FileOutputMode::Position(mut p) => { + let mut total = 0; + let mut buf = buf; + while !buf.is_empty() { + let nwritten = f.write_at(buf.as_ref(), p)?; + // afterwards buf contains [nwritten, len): + let _ = buf.split_to(nwritten); + p += nwritten as u64; + total += nwritten; + } + Ok(total) } - Ok(total) - } - FileOutputMode::Append => { - let mut total = 0; - let mut buf = buf; - while !buf.is_empty() { - let nwritten = f.append(buf.as_ref())?; - let _ = buf.split_to(nwritten); - total += nwritten; + FileOutputMode::Append => { + let mut total = 0; + let mut buf = buf; + while !buf.is_empty() { + let nwritten = f.append(buf.as_ref())?; + let _ = buf.split_to(nwritten); + total += nwritten; + } + Ok(total) } - Ok(total) } }); - self.state = OutputState::Waiting(task); + self.state = match result { + SpawnBlocking::Done(Ok(nwritten)) => { + if let FileOutputMode::Position(ref mut p) = &mut self.mode { + *p += nwritten as u64; + } + OutputState::Ready + } + SpawnBlocking::Done(Err(e)) => OutputState::Error(e), + SpawnBlocking::Spawned(task) => OutputState::Waiting(task), + }; Ok(()) } fn flush(&mut self) -> Result<(), StreamError> { diff --git a/crates/wasi/src/host/filesystem.rs b/crates/wasi/src/host/filesystem.rs index 6094bddb728a..5ad40071d18a 100644 --- a/crates/wasi/src/host/filesystem.rs +++ b/crates/wasi/src/host/filesystem.rs @@ -493,6 +493,7 @@ impl HostDescriptor 
for T { use system_interface::fs::{FdFlags, GetSetFdFlags}; use types::{DescriptorFlags, OpenFlags}; + let allow_blocking_current_thread = self.ctx().allow_blocking_current_thread; let table = self.table(); let d = table.get(&fd)?.dir()?; if !d.perms.contains(DirPerms::READ) { @@ -609,11 +610,15 @@ impl HostDescriptor for T { d.perms, d.file_perms, open_mode, + allow_blocking_current_thread, )))?), - OpenResult::File(file) => { - Ok(table.push(Descriptor::File(File::new(file, d.file_perms, open_mode)))?) - } + OpenResult::File(file) => Ok(table.push(Descriptor::File(File::new( + file, + d.file_perms, + open_mode, + allow_blocking_current_thread, + )))?), OpenResult::NotDir => Err(ErrorCode::NotDirectory.into()), } @@ -730,11 +735,9 @@ impl HostDescriptor for T { if !f.perms.contains(FilePerms::READ) { Err(types::ErrorCode::BadDescriptor)?; } - // Duplicate the file descriptor so that we get an indepenent lifetime. - let clone = std::sync::Arc::clone(&f.file); // Create a stream view for it. - let reader = FileInputStream::new(clone, offset); + let reader = FileInputStream::new(f, offset); // Insert the stream view into the table. Trap if the table is full. let index = self.table().push(InputStream::File(reader))?; @@ -754,11 +757,8 @@ impl HostDescriptor for T { Err(types::ErrorCode::BadDescriptor)?; } - // Duplicate the file descriptor so that we get an indepenent lifetime. - let clone = std::sync::Arc::clone(&f.file); - // Create a stream view for it. - let writer = FileOutputStream::write_at(clone, offset); + let writer = FileOutputStream::write_at(f, offset); let writer: OutputStream = Box::new(writer); // Insert the stream view into the table. Trap if the table is full. @@ -777,11 +777,9 @@ impl HostDescriptor for T { if !f.perms.contains(FilePerms::WRITE) { Err(types::ErrorCode::BadDescriptor)?; } - // Duplicate the file descriptor so that we get an indepenent lifetime. - let clone = std::sync::Arc::clone(&f.file); // Create a stream view for it. 
- let appender = FileOutputStream::append(clone); + let appender = FileOutputStream::append(f); let appender: OutputStream = Box::new(appender); // Insert the stream view into the table. Trap if the table is full. diff --git a/src/commands/run.rs b/src/commands/run.rs index 1413874ee535..2c1eed303542 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -207,15 +207,26 @@ impl RunCommand { } } + // Pre-emptively initialize and install a Tokio runtime ambiently in the + // environment when executing the module. Without this whenever a WASI + // call is made that needs to block on a future a Tokio runtime is + // configured and entered, and this appears to be slower than simply + // picking an existing runtime out of the environment and using that. + // The goal of this is to improve the performance of WASI-related + // operations that block in the CLI since the CLI doesn't use async to + // invoke WebAssembly. + let result = wasmtime_wasi::runtime::with_ambient_tokio_runtime(|| { + self.load_main_module(&mut store, &mut linker, &main, modules) + .with_context(|| { + format!( + "failed to run main module `{}`", + self.module_and_args[0].to_string_lossy() + ) + }) + }); + // Load the main wasm module. - match self - .load_main_module(&mut store, &mut linker, &main, modules) - .with_context(|| { - format!( - "failed to run main module `{}`", - self.module_and_args[0].to_string_lossy() - ) - }) { + match result { Ok(()) => (), Err(e) => { // Exit the process if Wasmtime understands the error; @@ -799,6 +810,12 @@ impl RunCommand { let mut builder = wasmtime_wasi::WasiCtxBuilder::new(); builder.inherit_stdio().args(&self.compute_argv()?); + // It's ok to block the current thread since we're the only thread in + // the program as the CLI. This helps improve the performance of some + // blocking operations in WASI, for example, by skipping the + // back-and-forth between sync and async. 
+ builder.allow_blocking_current_thread(true); + if self.run.common.wasi.inherit_env == Some(true) { for (k, v) in std::env::vars() { builder.env(&k, &v);