Skip to content

Commit

Permalink
Try to alleviate some tokio overhead in the CLI
Browse files Browse the repository at this point in the history
  • Loading branch information
alexcrichton committed Mar 19, 2024
1 parent afaf1c7 commit 427ec06
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 50 deletions.
13 changes: 12 additions & 1 deletion crates/wasi/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct WasiCtxBuilder {
wall_clock: Box<dyn HostWallClock + Send>,
monotonic_clock: Box<dyn HostMonotonicClock + Send>,
allowed_network_uses: AllowedNetworkUses,
allow_blocking_current_thread: bool,
built: bool,
}

Expand Down Expand Up @@ -80,6 +81,7 @@ impl WasiCtxBuilder {
wall_clock: wall_clock(),
monotonic_clock: monotonic_clock(),
allowed_network_uses: AllowedNetworkUses::default(),
allow_blocking_current_thread: false,
built: false,
}
}
Expand Down Expand Up @@ -162,7 +164,13 @@ impl WasiCtxBuilder {
open_mode |= OpenMode::WRITE;
}
self.preopens.push((
Dir::new(dir, perms, file_perms, open_mode),
Dir::new(
dir,
perms,
file_perms,
open_mode,
self.allow_blocking_current_thread,
),
path.as_ref().to_owned(),
));
self
Expand Down Expand Up @@ -263,6 +271,7 @@ impl WasiCtxBuilder {
wall_clock,
monotonic_clock,
allowed_network_uses,
allow_blocking_current_thread,
built: _,
} = mem::replace(self, Self::new());
self.built = true;
Expand All @@ -281,6 +290,7 @@ impl WasiCtxBuilder {
wall_clock,
monotonic_clock,
allowed_network_uses,
allow_blocking_current_thread,
}
}

Expand Down Expand Up @@ -310,6 +320,7 @@ pub struct WasiCtx {
pub(crate) stderr: Box<dyn StdoutStream>,
pub(crate) socket_addr_check: SocketAddrCheck,
pub(crate) allowed_network_uses: AllowedNetworkUses,
pub(crate) allow_blocking_current_thread: bool,
}

pub struct AllowedNetworkUses {
Expand Down
126 changes: 91 additions & 35 deletions crates/wasi/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,22 @@ pub struct File {
/// doesn't presently provide a cross-platform equivelant of reading the
/// oflags back out using fcntl.
pub open_mode: OpenMode,

allow_blocking_current_thread: bool,
}

impl File {
pub fn new(file: cap_std::fs::File, perms: FilePerms, open_mode: OpenMode) -> Self {
pub fn new(
file: cap_std::fs::File,
perms: FilePerms,
open_mode: OpenMode,
allow_blocking_current_thread: bool,
) -> Self {
Self {
file: Arc::new(file),
perms,
open_mode,
allow_blocking_current_thread,
}
}

Expand All @@ -110,8 +118,12 @@ impl File {
F: FnOnce(&cap_std::fs::File) -> R + Send + 'static,
R: Send + 'static,
{
let f = self.file.clone();
spawn_blocking(move || body(&f)).await
if self.allow_blocking_current_thread {
body(&self.file)
} else {
let f = self.file.clone();
spawn_blocking(move || body(&f)).await
}
}
}

Expand Down Expand Up @@ -146,6 +158,8 @@ pub struct Dir {
/// doesn't presently provide a cross-platform equivelant of reading the
/// oflags back out using fcntl.
pub open_mode: OpenMode,

allow_blocking_current_thread: bool,
}

impl Dir {
Expand All @@ -154,12 +168,14 @@ impl Dir {
perms: DirPerms,
file_perms: FilePerms,
open_mode: OpenMode,
allow_blocking_current_thread: bool,
) -> Self {
Dir {
dir: Arc::new(dir),
perms,
file_perms,
open_mode,
allow_blocking_current_thread,
}
}

Expand All @@ -170,30 +186,47 @@ impl Dir {
F: FnOnce(&cap_std::fs::Dir) -> R + Send + 'static,
R: Send + 'static,
{
let d = self.dir.clone();
spawn_blocking(move || body(&d)).await
if self.allow_blocking_current_thread {
body(&self.dir)
} else {
let d = self.dir.clone();
spawn_blocking(move || body(&d)).await
}
}
}

pub struct FileInputStream {
file: Arc<cap_std::fs::File>,
position: u64,
allow_blocking_current_thread: bool,
}
impl FileInputStream {
pub fn new(file: Arc<cap_std::fs::File>, position: u64) -> Self {
Self { file, position }
pub fn new(
file: Arc<cap_std::fs::File>,
position: u64,
allow_blocking_current_thread: bool,
) -> Self {
Self {
file,
position,
allow_blocking_current_thread,
}
}

pub async fn read(&mut self, size: usize) -> Result<Bytes, StreamError> {
use system_interface::fs::FileIoExt;
let f = Arc::clone(&self.file);
let p = self.position;
let (r, mut buf) = spawn_blocking(move || {
let run = move |f: &cap_std::fs::File| {
let mut buf = BytesMut::zeroed(size);
let r = f.read_at(&mut buf, p);
(r, buf)
})
.await;
};
let (r, mut buf) = if self.allow_blocking_current_thread {
run(&self.file)
} else {
let f = Arc::clone(&self.file);
spawn_blocking(move || run(&f)).await
};
let n = read_result(r)?;
buf.truncate(n);
self.position += n as u64;
Expand Down Expand Up @@ -225,6 +258,7 @@ pub(crate) struct FileOutputStream {
file: Arc<cap_std::fs::File>,
mode: FileOutputMode,
state: OutputState,
allow_blocking_current_thread: bool,
}

enum OutputState {
Expand All @@ -238,18 +272,25 @@ enum OutputState {
}

impl FileOutputStream {
pub fn write_at(file: Arc<cap_std::fs::File>, position: u64) -> Self {
pub fn write_at(
file: Arc<cap_std::fs::File>,
position: u64,
allow_blocking_current_thread: bool,
) -> Self {
Self {
file,
mode: FileOutputMode::Position(position),
state: OutputState::Ready,
allow_blocking_current_thread,
}
}
pub fn append(file: Arc<cap_std::fs::File>) -> Self {

pub fn append(file: Arc<cap_std::fs::File>, allow_blocking_current_thread: bool) -> Self {
Self {
file,
mode: FileOutputMode::Append,
state: OutputState::Ready,
allow_blocking_current_thread,
}
}
}
Expand All @@ -275,33 +316,48 @@ impl HostOutputStream for FileOutputStream {
return Ok(());
}

let f = Arc::clone(&self.file);
let m = self.mode;
let task = spawn_blocking(move || match m {
FileOutputMode::Position(mut p) => {
let mut total = 0;
let mut buf = buf;
while !buf.is_empty() {
let nwritten = f.write_at(buf.as_ref(), p)?;
// afterwards buf contains [nwritten, len):
let _ = buf.split_to(nwritten);
p += nwritten as u64;
total += nwritten;
let run = move |f: &cap_std::fs::File| {
match m {
FileOutputMode::Position(mut p) => {
let mut total = 0;
let mut buf = buf;
while !buf.is_empty() {
let nwritten = f.write_at(buf.as_ref(), p)?;
// afterwards buf contains [nwritten, len):
let _ = buf.split_to(nwritten);
p += nwritten as u64;
total += nwritten;
}
Ok(total)
}
FileOutputMode::Append => {
let mut total = 0;
let mut buf = buf;
while !buf.is_empty() {
let nwritten = f.append(buf.as_ref())?;
let _ = buf.split_to(nwritten);
total += nwritten;
}
Ok(total)
}
Ok(total)
}
FileOutputMode::Append => {
let mut total = 0;
let mut buf = buf;
while !buf.is_empty() {
let nwritten = f.append(buf.as_ref())?;
let _ = buf.split_to(nwritten);
total += nwritten;
};
self.state = if self.allow_blocking_current_thread {
match run(&self.file) {
Ok(nwritten) => {
if let FileOutputMode::Position(ref mut p) = &mut self.mode {
*p += nwritten as u64;
}
OutputState::Ready
}
Ok(total)
Err(e) => OutputState::Error(e),
}
});
self.state = OutputState::Waiting(task);
} else {
let f = Arc::clone(&self.file);
let task = spawn_blocking(move || run(&f));
OutputState::Waiting(task)
};
Ok(())
}
fn flush(&mut self) -> Result<(), StreamError> {
Expand Down
18 changes: 12 additions & 6 deletions crates/wasi/src/host/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ impl<T: WasiView> HostDescriptor for T {
use system_interface::fs::{FdFlags, GetSetFdFlags};
use types::{DescriptorFlags, OpenFlags};

let allow_blocking_current_thread = self.ctx().allow_blocking_current_thread;
let table = self.table();
let d = table.get(&fd)?.dir()?;
if !d.perms.contains(DirPerms::READ) {
Expand Down Expand Up @@ -609,11 +610,15 @@ impl<T: WasiView> HostDescriptor for T {
d.perms,
d.file_perms,
open_mode,
allow_blocking_current_thread,
)))?),

OpenResult::File(file) => {
Ok(table.push(Descriptor::File(File::new(file, d.file_perms, open_mode)))?)
}
OpenResult::File(file) => Ok(table.push(Descriptor::File(File::new(
file,
d.file_perms,
open_mode,
allow_blocking_current_thread,
)))?),

OpenResult::NotDir => Err(ErrorCode::NotDirectory.into()),
}
Expand Down Expand Up @@ -734,7 +739,7 @@ impl<T: WasiView> HostDescriptor for T {
let clone = std::sync::Arc::clone(&f.file);

// Create a stream view for it.
let reader = FileInputStream::new(clone, offset);
let reader = FileInputStream::new(clone, offset, self.ctx().allow_blocking_current_thread);

// Insert the stream view into the table. Trap if the table is full.
let index = self.table().push(InputStream::File(reader))?;
Expand All @@ -758,7 +763,8 @@ impl<T: WasiView> HostDescriptor for T {
let clone = std::sync::Arc::clone(&f.file);

// Create a stream view for it.
let writer = FileOutputStream::write_at(clone, offset);
let writer =
FileOutputStream::write_at(clone, offset, self.ctx().allow_blocking_current_thread);
let writer: OutputStream = Box::new(writer);

// Insert the stream view into the table. Trap if the table is full.
Expand All @@ -781,7 +787,7 @@ impl<T: WasiView> HostDescriptor for T {
let clone = std::sync::Arc::clone(&f.file);

// Create a stream view for it.
let appender = FileOutputStream::append(clone);
let appender = FileOutputStream::append(clone, self.ctx().allow_blocking_current_thread);
let appender: OutputStream = Box::new(appender);

// Insert the stream view into the table. Trap if the table is full.
Expand Down
19 changes: 11 additions & 8 deletions src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,18 @@ impl RunCommand {
}
}

let result = wasmtime_wasi::runtime::with_ambient_tokio_runtime(|| {
self.load_main_module(&mut store, &mut linker, &main, modules)
.with_context(|| {
format!(
"failed to run main module `{}`",
self.module_and_args[0].to_string_lossy()
)
})
});

// Load the main wasm module.
match self
.load_main_module(&mut store, &mut linker, &main, modules)
.with_context(|| {
format!(
"failed to run main module `{}`",
self.module_and_args[0].to_string_lossy()
)
}) {
match result {
Ok(()) => (),
Err(e) => {
// Exit the process if Wasmtime understands the error;
Expand Down

0 comments on commit 427ec06

Please sign in to comment.