Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement opt-in for enabling WASI to block the current thread #8190

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion crates/wasi/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct WasiCtxBuilder {
wall_clock: Box<dyn HostWallClock + Send>,
monotonic_clock: Box<dyn HostMonotonicClock + Send>,
allowed_network_uses: AllowedNetworkUses,
allow_blocking_current_thread: bool,
built: bool,
}

Expand Down Expand Up @@ -80,6 +81,7 @@ impl WasiCtxBuilder {
wall_clock: wall_clock(),
monotonic_clock: monotonic_clock(),
allowed_network_uses: AllowedNetworkUses::default(),
allow_blocking_current_thread: false,
built: false,
}
}
Expand Down Expand Up @@ -115,6 +117,37 @@ impl WasiCtxBuilder {
self.inherit_stdin().inherit_stdout().inherit_stderr()
}

/// Configures whether or not blocking operations made through this
/// `WasiCtx` are allowed to block the current thread.
///
/// WASI is currently implemented on top of the Rust
/// [Tokio](https://tokio.rs/) library. While most WASI APIs are
/// non-blocking some are instead blocking from the perspective of
/// WebAssembly. For example opening a file is a blocking operation with
/// respect to WebAssembly but it's implemented as an asynchronous operation
/// on the host. This is currently done with Tokio's
/// [`spawn_blocking`](https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html).
///
/// When WebAssembly is used in a synchronous context, for example when
/// [`Config::async_support`] is disabled, then this asynchronous operation
/// is quickly turned back into a synchronous operation with a `block_on` in
/// Rust. This switching back-and-forth between a blocking a non-blocking
/// context can have overhead, and this option exists to help alleviate this
/// overhead.
///
/// This option indicates that for WASI functions that are blocking from the
/// perspective of WebAssembly it's ok to block the native thread as well.
/// This means that this back-and-forth between async and sync won't happen
/// and instead blocking operations are performed on-thread (such as opening
/// a file). This can improve the performance of WASI operations when async
/// support is disabled.
///
/// [`Config::async_support`]: https://docs.rs/wasmtime/latest/wasmtime/struct.Config.html#method.async_support
pub fn allow_blocking_current_thread(&mut self, enable: bool) -> &mut Self {
self.allow_blocking_current_thread = enable;
self
}

pub fn envs(&mut self, env: &[(impl AsRef<str>, impl AsRef<str>)]) -> &mut Self {
self.env.extend(
env.iter()
Expand Down Expand Up @@ -162,7 +195,13 @@ impl WasiCtxBuilder {
open_mode |= OpenMode::WRITE;
}
self.preopens.push((
Dir::new(dir, perms, file_perms, open_mode),
Dir::new(
dir,
perms,
file_perms,
open_mode,
self.allow_blocking_current_thread,
),
path.as_ref().to_owned(),
));
self
Expand Down Expand Up @@ -263,6 +302,7 @@ impl WasiCtxBuilder {
wall_clock,
monotonic_clock,
allowed_network_uses,
allow_blocking_current_thread,
built: _,
} = mem::replace(self, Self::new());
self.built = true;
Expand All @@ -281,6 +321,7 @@ impl WasiCtxBuilder {
wall_clock,
monotonic_clock,
allowed_network_uses,
allow_blocking_current_thread,
}
}

Expand Down Expand Up @@ -310,6 +351,7 @@ pub struct WasiCtx {
pub(crate) stderr: Box<dyn StdoutStream>,
pub(crate) socket_addr_check: SocketAddrCheck,
pub(crate) allowed_network_uses: AllowedNetworkUses,
pub(crate) allow_blocking_current_thread: bool,
}

pub struct AllowedNetworkUses {
Expand Down
136 changes: 94 additions & 42 deletions crates/wasi/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ bitflags::bitflags! {
}
}

#[derive(Clone)]
pub struct File {
/// The operating system File this struct is mediating access to.
///
Expand All @@ -92,14 +93,22 @@ pub struct File {
/// doesn't presently provide a cross-platform equivelant of reading the
/// oflags back out using fcntl.
pub open_mode: OpenMode,

allow_blocking_current_thread: bool,
}

impl File {
pub fn new(file: cap_std::fs::File, perms: FilePerms, open_mode: OpenMode) -> Self {
pub fn new(
file: cap_std::fs::File,
perms: FilePerms,
open_mode: OpenMode,
allow_blocking_current_thread: bool,
) -> Self {
Self {
file: Arc::new(file),
perms,
open_mode,
allow_blocking_current_thread,
}
}

Expand All @@ -110,11 +119,31 @@ impl File {
F: FnOnce(&cap_std::fs::File) -> R + Send + 'static,
R: Send + 'static,
{
let f = self.file.clone();
spawn_blocking(move || body(&f)).await
match self._spawn_blocking(body) {
SpawnBlocking::Done(result) => result,
SpawnBlocking::Spawned(task) => task.await,
}
}

fn _spawn_blocking<F, R>(&self, body: F) -> SpawnBlocking<R>
where
F: FnOnce(&cap_std::fs::File) -> R + Send + 'static,
R: Send + 'static,
{
if self.allow_blocking_current_thread {
SpawnBlocking::Done(body(&self.file))
} else {
let f = self.file.clone();
SpawnBlocking::Spawned(spawn_blocking(move || body(&f)))
}
}
}

enum SpawnBlocking<T> {
Done(T),
Spawned(AbortOnDropJoinHandle<T>),
}

bitflags::bitflags! {
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct DirPerms: usize {
Expand Down Expand Up @@ -146,6 +175,8 @@ pub struct Dir {
/// doesn't presently provide a cross-platform equivelant of reading the
/// oflags back out using fcntl.
pub open_mode: OpenMode,

allow_blocking_current_thread: bool,
}

impl Dir {
Expand All @@ -154,12 +185,14 @@ impl Dir {
perms: DirPerms,
file_perms: FilePerms,
open_mode: OpenMode,
allow_blocking_current_thread: bool,
) -> Self {
Dir {
dir: Arc::new(dir),
perms,
file_perms,
open_mode,
allow_blocking_current_thread,
}
}

Expand All @@ -170,30 +203,38 @@ impl Dir {
F: FnOnce(&cap_std::fs::Dir) -> R + Send + 'static,
R: Send + 'static,
{
let d = self.dir.clone();
spawn_blocking(move || body(&d)).await
if self.allow_blocking_current_thread {
body(&self.dir)
} else {
let d = self.dir.clone();
spawn_blocking(move || body(&d)).await
}
}
}

pub struct FileInputStream {
file: Arc<cap_std::fs::File>,
file: File,
position: u64,
}
impl FileInputStream {
pub fn new(file: Arc<cap_std::fs::File>, position: u64) -> Self {
Self { file, position }
pub fn new(file: &File, position: u64) -> Self {
Self {
file: file.clone(),
position,
}
}

pub async fn read(&mut self, size: usize) -> Result<Bytes, StreamError> {
use system_interface::fs::FileIoExt;
let f = Arc::clone(&self.file);
let p = self.position;
let (r, mut buf) = spawn_blocking(move || {
let mut buf = BytesMut::zeroed(size);
let r = f.read_at(&mut buf, p);
(r, buf)
})
.await;
let (r, mut buf) = self
.file
.spawn_blocking(move |f| {
let mut buf = BytesMut::zeroed(size);
let r = f.read_at(&mut buf, p);
(r, buf)
})
.await;
let n = read_result(r)?;
buf.truncate(n);
self.position += n as u64;
Expand Down Expand Up @@ -222,7 +263,7 @@ pub(crate) enum FileOutputMode {
}

pub(crate) struct FileOutputStream {
file: Arc<cap_std::fs::File>,
file: File,
mode: FileOutputMode,
state: OutputState,
}
Expand All @@ -238,16 +279,17 @@ enum OutputState {
}

impl FileOutputStream {
pub fn write_at(file: Arc<cap_std::fs::File>, position: u64) -> Self {
pub fn write_at(file: &File, position: u64) -> Self {
Self {
file,
file: file.clone(),
mode: FileOutputMode::Position(position),
state: OutputState::Ready,
}
}
pub fn append(file: Arc<cap_std::fs::File>) -> Self {

pub fn append(file: &File) -> Self {
Self {
file,
file: file.clone(),
mode: FileOutputMode::Append,
state: OutputState::Ready,
}
Expand Down Expand Up @@ -275,33 +317,43 @@ impl HostOutputStream for FileOutputStream {
return Ok(());
}

let f = Arc::clone(&self.file);
let m = self.mode;
let task = spawn_blocking(move || match m {
FileOutputMode::Position(mut p) => {
let mut total = 0;
let mut buf = buf;
while !buf.is_empty() {
let nwritten = f.write_at(buf.as_ref(), p)?;
// afterwards buf contains [nwritten, len):
let _ = buf.split_to(nwritten);
p += nwritten as u64;
total += nwritten;
let result = self.file._spawn_blocking(move |f| {
match m {
FileOutputMode::Position(mut p) => {
let mut total = 0;
let mut buf = buf;
while !buf.is_empty() {
let nwritten = f.write_at(buf.as_ref(), p)?;
// afterwards buf contains [nwritten, len):
let _ = buf.split_to(nwritten);
p += nwritten as u64;
total += nwritten;
}
Ok(total)
}
Ok(total)
}
FileOutputMode::Append => {
let mut total = 0;
let mut buf = buf;
while !buf.is_empty() {
let nwritten = f.append(buf.as_ref())?;
let _ = buf.split_to(nwritten);
total += nwritten;
FileOutputMode::Append => {
let mut total = 0;
let mut buf = buf;
while !buf.is_empty() {
let nwritten = f.append(buf.as_ref())?;
let _ = buf.split_to(nwritten);
total += nwritten;
}
Ok(total)
}
Ok(total)
}
});
self.state = OutputState::Waiting(task);
self.state = match result {
SpawnBlocking::Done(Ok(nwritten)) => {
if let FileOutputMode::Position(ref mut p) = &mut self.mode {
*p += nwritten as u64;
}
OutputState::Ready
}
SpawnBlocking::Done(Err(e)) => OutputState::Error(e),
SpawnBlocking::Spawned(task) => OutputState::Waiting(task),
};
Ok(())
}
fn flush(&mut self) -> Result<(), StreamError> {
Expand Down
24 changes: 11 additions & 13 deletions crates/wasi/src/host/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ impl<T: WasiView> HostDescriptor for T {
use system_interface::fs::{FdFlags, GetSetFdFlags};
use types::{DescriptorFlags, OpenFlags};

let allow_blocking_current_thread = self.ctx().allow_blocking_current_thread;
let table = self.table();
let d = table.get(&fd)?.dir()?;
if !d.perms.contains(DirPerms::READ) {
Expand Down Expand Up @@ -609,11 +610,15 @@ impl<T: WasiView> HostDescriptor for T {
d.perms,
d.file_perms,
open_mode,
allow_blocking_current_thread,
)))?),

OpenResult::File(file) => {
Ok(table.push(Descriptor::File(File::new(file, d.file_perms, open_mode)))?)
}
OpenResult::File(file) => Ok(table.push(Descriptor::File(File::new(
file,
d.file_perms,
open_mode,
allow_blocking_current_thread,
)))?),

OpenResult::NotDir => Err(ErrorCode::NotDirectory.into()),
}
Expand Down Expand Up @@ -730,11 +735,9 @@ impl<T: WasiView> HostDescriptor for T {
if !f.perms.contains(FilePerms::READ) {
Err(types::ErrorCode::BadDescriptor)?;
}
// Duplicate the file descriptor so that we get an indepenent lifetime.
let clone = std::sync::Arc::clone(&f.file);

// Create a stream view for it.
let reader = FileInputStream::new(clone, offset);
let reader = FileInputStream::new(f, offset);

// Insert the stream view into the table. Trap if the table is full.
let index = self.table().push(InputStream::File(reader))?;
Expand All @@ -754,11 +757,8 @@ impl<T: WasiView> HostDescriptor for T {
Err(types::ErrorCode::BadDescriptor)?;
}

// Duplicate the file descriptor so that we get an indepenent lifetime.
let clone = std::sync::Arc::clone(&f.file);

// Create a stream view for it.
let writer = FileOutputStream::write_at(clone, offset);
let writer = FileOutputStream::write_at(f, offset);
let writer: OutputStream = Box::new(writer);

// Insert the stream view into the table. Trap if the table is full.
Expand All @@ -777,11 +777,9 @@ impl<T: WasiView> HostDescriptor for T {
if !f.perms.contains(FilePerms::WRITE) {
Err(types::ErrorCode::BadDescriptor)?;
}
// Duplicate the file descriptor so that we get an indepenent lifetime.
let clone = std::sync::Arc::clone(&f.file);

// Create a stream view for it.
let appender = FileOutputStream::append(clone);
let appender = FileOutputStream::append(f);
let appender: OutputStream = Box::new(appender);

// Insert the stream view into the table. Trap if the table is full.
Expand Down
Loading