Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize perfermance of fd_read/fd_write #8303

Merged
merged 1 commit into from
Apr 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 69 additions & 50 deletions crates/wasi/src/preview1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ use std::ops::{Deref, DerefMut};
use std::slice;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use system_interface::fs::FileIoExt;
use wasmtime::component::Resource;
use wiggle::tracing::instrument;
use wiggle::{GuestError, GuestPtr, GuestStrCow, GuestType};
Expand Down Expand Up @@ -276,6 +277,32 @@ impl BlockingMode {
}
}
}

fn block_write(
&self,
file: &super::filesystem::File,
bytes: GuestPtr<'_, [u8]>,
append: bool,
mut pos: u64,
) -> Result<usize> {
let buf = bytes.as_cow()?;
let mut buf = buf.deref();

let mut total: usize = 0;
while !buf.is_empty() {
let nwritten = if append {
file.file.append(buf)
} else {
file.file.write_at(buf, pos)
};
let nwritten = nwritten.or_else(|e| Err(StreamError::LastOperationFailed(e.into())))?;
pos = pos.checked_add(nwritten as u64).expect("add overflow");
(_, buf) = buf.split_at(nwritten);
total = total.checked_add(nwritten).expect("add overflow");
}

Ok(total)
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -1550,37 +1577,46 @@ impl wasi_snapshot_preview1::WasiSnapshotPreview1 for WasiP1Ctx {
) -> Result<types::Size, types::Error> {
let t = self.transact()?;
let desc = t.get_descriptor(fd)?;
let (buf, read) = match desc {
match desc {
Descriptor::File(File {
fd,
blocking_mode,
blocking_mode: _,
position,
..
}) => {
let fd = fd.borrowed();
let blocking_mode = *blocking_mode;
let position = position.clone();
drop(t);
let Some(buf) = first_non_empty_iovec(iovs)? else {
let pos = position.load(Ordering::Relaxed);
let file = self.table().get(&fd)?.file()?;
let Some(iov) = first_non_empty_iovec(iovs)? else {
return Ok(0);
};
let is_shared_memory = iov.is_shared_memory();
let bytes_read = if is_shared_memory {
let mut buf = vec![0; iov.len() as usize];
let bytes_read = file
.file
.read_at(&mut buf[..], pos)
.or_else(|e| Err(StreamError::LastOperationFailed(e.into())))?;
iov.copy_from_slice(&buf[0..bytes_read])?;
bytes_read
} else {
let mut buf = iov.as_slice_mut()?.expect("get none");
let buf = buf.deref_mut();
let bytes_read = file
.file
.read_at(buf, pos)
.or_else(|e| Err(StreamError::LastOperationFailed(e.into())))?;
bytes_read
};

let pos = position.load(Ordering::Relaxed);
let stream = self.read_via_stream(fd.borrowed(), pos).map_err(|e| {
e.try_into()
.context("failed to call `read-via-stream`")
.unwrap_or_else(types::Error::trap)
})?;
let read = blocking_mode
.read(self, stream.borrowed(), buf.len().try_into()?)
.await;
streams::HostInputStream::drop(self, stream).map_err(|e| types::Error::trap(e))?;
let read = read?;
let n = read.len().try_into()?;
let pos = pos.checked_add(n).ok_or(types::Errno::Overflow)?;
let pos = pos
.checked_add(bytes_read.try_into()?)
.ok_or(types::Errno::Overflow)?;
position.store(pos, Ordering::Relaxed);

(buf, read)
Ok(bytes_read.try_into()?)
}
Descriptor::Stdin { stream, .. } => {
let stream = stream.borrowed();
Expand All @@ -1591,17 +1627,16 @@ impl wasi_snapshot_preview1::WasiSnapshotPreview1 for WasiP1Ctx {
let read = BlockingMode::Blocking
.read(self, stream, buf.len().try_into()?)
.await?;
(buf, read)
if read.len() > buf.len().try_into()? {
return Err(types::Errno::Range.into());
}
let buf = buf.get_range(0..u32::try_from(read.len())?).unwrap();
buf.copy_from_slice(&read)?;
let n = read.len().try_into()?;
Ok(n)
}
_ => return Err(types::Errno::Badf.into()),
};
if read.len() > buf.len().try_into()? {
return Err(types::Errno::Range.into());
}
let buf = buf.get_range(0..u32::try_from(read.len())?).unwrap();
buf.copy_from_slice(&read)?;
let n = read.len().try_into()?;
Ok(n)
}

/// Read from a file descriptor, without using and updating the file descriptor's offset.
Expand Down Expand Up @@ -1670,42 +1705,26 @@ impl wasi_snapshot_preview1::WasiSnapshotPreview1 for WasiP1Ctx {
position,
}) => {
let fd = fd.borrowed();
let fd2 = fd.borrowed();
let blocking_mode = *blocking_mode;
let position = position.clone();
let pos = position.load(Ordering::Relaxed);
let append = *append;
drop(t);
let f = self.table().get(&fd)?.file()?;
let Some(buf) = first_non_empty_ciovec(ciovs)? else {
return Ok(0);
};
let (stream, pos) = if append {
let stream = self.append_via_stream(fd).map_err(|e| {
e.try_into()
.context("failed to call `append-via-stream`")
.unwrap_or_else(types::Error::trap)
})?;
(stream, 0)
} else {
let pos = position.load(Ordering::Relaxed);
let stream = self.write_via_stream(fd, pos).map_err(|e| {
e.try_into()
.context("failed to call `write-via-stream`")
.unwrap_or_else(types::Error::trap)
})?;
(stream, pos)
};
let n = blocking_mode.write(self, stream.borrowed(), buf).await;
streams::HostOutputStream::drop(self, stream).map_err(|e| types::Error::trap(e))?;
let n = n?;
let nwritten = blocking_mode.block_write(f, buf, append, pos)?;
if append {
let len = self.stat(fd2).await?;
let len = self.stat(fd).await?;
position.store(len.size, Ordering::Relaxed);
} else {
let pos = pos.checked_add(n as u64).ok_or(types::Errno::Overflow)?;
let pos = pos
.checked_add(nwritten as u64)
.ok_or(types::Errno::Overflow)?;
position.store(pos, Ordering::Relaxed);
}
let n = n.try_into()?;
Ok(n)
Ok(nwritten.try_into()?)
}
Descriptor::Stdout { stream, .. } | Descriptor::Stderr { stream, .. } => {
let stream = stream.borrowed();
Expand Down