diff --git a/crates/test-programs/tests/wasi-preview1-host-in-preview2.rs b/crates/test-programs/tests/wasi-preview1-host-in-preview2.rs index 79f619a8a3eb..4086694d57d2 100644 --- a/crates/test-programs/tests/wasi-preview1-host-in-preview2.rs +++ b/crates/test-programs/tests/wasi-preview1-host-in-preview2.rs @@ -258,13 +258,10 @@ async fn path_symlink_trailing_slashes() { run("path_symlink_trailing_slashes", false).await.unwrap() } #[test_log::test(tokio::test(flavor = "multi_thread"))] -#[should_panic] async fn poll_oneoff_files() { run("poll_oneoff_files", false).await.unwrap() } #[test_log::test(tokio::test(flavor = "multi_thread"))] -// This is a known bug with the preview 2 implementation: -#[should_panic] async fn poll_oneoff_stdio() { run("poll_oneoff_stdio", true).await.unwrap() } diff --git a/crates/wasi/src/preview2/preview1.rs b/crates/wasi/src/preview2/preview1.rs index b035796cd8f3..f738e71a7444 100644 --- a/crates/wasi/src/preview2/preview1.rs +++ b/crates/wasi/src/preview2/preview1.rs @@ -9,12 +9,12 @@ use crate::preview2::bindings::io::streams; use crate::preview2::{bindings, IsATTY, TableError, WasiView}; use anyhow::{anyhow, bail, Context}; use std::borrow::Borrow; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashSet}; use std::mem::{self, size_of, size_of_val}; use std::ops::{Deref, DerefMut}; -use std::slice; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use std::{iter, slice}; use wasmtime::component::Resource; use wiggle::tracing::instrument; use wiggle::{GuestError, GuestPtr, GuestSlice, GuestSliceMut, GuestStrCow, GuestType}; @@ -82,15 +82,18 @@ impl BlockingMode { let (chunk, rest) = bytes.split_at(len); bytes = rest; - let borrow = Resource::new_borrow(output_stream.rep()); - Streams::blocking_write_and_flush(host, borrow, Vec::from(chunk)).await? + Streams::blocking_write_and_flush( + host, + output_stream.borrowed(), + Vec::from(chunk), + ) + .await? } Ok(total) } BlockingMode::NonBlocking => { - let borrow = Resource::new_borrow(output_stream.rep()); - let n = match Streams::check_write(host, borrow) { + let n = match Streams::check_write(host, output_stream.borrowed()) { Ok(n) => n, Err(e) if matches!(e.downcast_ref(), Some(streams::WriteError::Closed)) => 0, Err(e) => Err(e)?, @@ -101,8 +104,7 @@ impl BlockingMode { return Ok(0); } - let borrow = Resource::new_borrow(output_stream.rep()); - match Streams::write(host, borrow, bytes[..len].to_vec()) { + match Streams::write(host, output_stream.borrowed(), bytes[..len].to_vec()) { Ok(()) => {} Err(e) if matches!(e.downcast_ref(), Some(streams::WriteError::Closed)) => { return Ok(0) @@ -110,8 +112,7 @@ impl BlockingMode { Err(e) => Err(e)?, } - let borrow = Resource::new_borrow(output_stream.rep()); - match Streams::blocking_flush(host, borrow).await { + match Streams::blocking_flush(host, output_stream.borrowed()).await { Ok(()) => {} Err(e) if matches!(e.downcast_ref(), Some(streams::WriteError::Closed)) => { return Ok(0) @@ -127,9 +128,18 @@ impl BlockingMode { #[derive(Debug)] enum Descriptor { - Stdin { input_stream: u32, isatty: IsATTY }, - Stdout { output_stream: u32, isatty: IsATTY }, - Stderr { output_stream: u32, isatty: IsATTY }, + Stdin { + stream: Resource, + isatty: IsATTY, + }, + Stdout { + stream: Resource, + isatty: IsATTY, + }, + Stderr { + stream: Resource, + isatty: IsATTY, + }, PreopenDirectory((Resource, String)), File(File), } @@ -175,11 +185,10 @@ impl Descriptors { ) -> Result { let mut descriptors = Self::default(); descriptors.push(Descriptor::Stdin { - input_stream: host + stream: host .get_stdin() .context("failed to call `get-stdin`") - .map_err(types::Error::trap)? - .rep(), + .map_err(types::Error::trap)?, isatty: if let Some(term_in) = host .get_terminal_stdin() .context("failed to call `get-terminal-stdin`") @@ -194,11 +203,10 @@ impl Descriptors { }, })?; descriptors.push(Descriptor::Stdout { - output_stream: host + stream: host .get_stdout() .context("failed to call `get-stdout`") - .map_err(types::Error::trap)? - .rep(), + .map_err(types::Error::trap)?, isatty: if let Some(term_out) = host .get_terminal_stdout() .context("failed to call `get-terminal-stdout`") @@ -213,11 +221,10 @@ impl Descriptors { }, })?; descriptors.push(Descriptor::Stderr { - output_stream: host + stream: host .get_stderr() .context("failed to call `get-stderr`") - .map_err(types::Error::trap)? - .rep(), + .map_err(types::Error::trap)?, isatty: if let Some(term_out) = host .get_terminal_stderr() .context("failed to call `get-terminal-stderr`") @@ -1022,17 +1029,14 @@ impl< .remove(fd) .ok_or(types::Errno::Badf)?; match desc { - Descriptor::Stdin { input_stream, .. } => { - streams::HostInputStream::drop(self, Resource::new_own(input_stream)) - .context("failed to call `drop-input-stream`") - } - Descriptor::Stdout { output_stream, .. } | Descriptor::Stderr { output_stream, .. } => { - streams::HostOutputStream::drop(self, Resource::new_own(output_stream)) - .context("failed to call `drop-output-stream`") + Descriptor::Stdin { stream, .. } => streams::HostInputStream::drop(self, stream) + .context("failed to call `drop` on `input-stream`"), + Descriptor::Stdout { stream, .. } | Descriptor::Stderr { stream, .. } => { + streams::HostOutputStream::drop(self, stream) + .context("failed to call `drop` on `output-stream`") } Descriptor::File(File { fd, .. }) | Descriptor::PreopenDirectory((fd, _)) => { - filesystem::HostDescriptor::drop(self, fd) - .context("failed to call `drop-descriptor`") + filesystem::HostDescriptor::drop(self, fd).context("failed to call `drop`") } } .map_err(types::Error::trap) @@ -1349,8 +1353,8 @@ impl< (buf, read, state) } - Descriptor::Stdin { input_stream, .. } => { - let input = Resource::new_borrow(*input_stream); + Descriptor::Stdin { stream, .. } => { + let stream = stream.borrowed(); drop(t); let Some(buf) = first_non_empty_iovec(iovs)? else { return Ok(0); @@ -1358,7 +1362,7 @@ impl< let (read, state) = stream_res( streams::HostInputStream::blocking_read( self, - input, + stream, buf.len().try_into().unwrap_or(u64::MAX), ) .await, @@ -1437,53 +1441,56 @@ impl< ) -> Result { let t = self.transact()?; let desc = t.get_descriptor(fd)?; - match *desc { + match desc { Descriptor::File(File { - ref fd, + fd, blocking_mode, append, - ref position, + position, }) if t.view.table().get_resource(fd)?.is_file() => { let fd = fd.borrowed(); + let blocking_mode = *blocking_mode; let position = position.clone(); + let append = *append; drop(t); let Some(buf) = first_non_empty_ciovec(ciovs)? else { return Ok(0); }; let (stream, pos) = if append { - let stream = self.append_via_stream(fd.borrowed()).map_err(|e| { + let stream = self.append_via_stream(fd).map_err(|e| { e.try_into() .context("failed to call `append-via-stream`") .unwrap_or_else(types::Error::trap) })?; (stream, 0) } else { - let position = position.load(Ordering::Relaxed); - let stream = self - .write_via_stream(fd.borrowed(), position) - .map_err(|e| { - e.try_into() - .context("failed to call `write-via-stream`") - .unwrap_or_else(types::Error::trap) - })?; - (stream, position) + let pos = position.load(Ordering::Relaxed); + let stream = self.write_via_stream(fd, pos).map_err(|e| { + e.try_into() + .context("failed to call `write-via-stream`") + .unwrap_or_else(types::Error::trap) + })?; + (stream, pos) }; let n = blocking_mode.write(self, stream, &buf).await?; if !append { let pos = pos.checked_add(n as u64).ok_or(types::Errno::Overflow)?; position.store(pos, Ordering::Relaxed); } - Ok(n.try_into()?) + let n = n.try_into()?; + Ok(n) } - Descriptor::Stdout { output_stream, .. } | Descriptor::Stderr { output_stream, .. } => { + Descriptor::Stdout { stream, .. } | Descriptor::Stderr { stream, .. } => { + let stream = stream.borrowed(); drop(t); let Some(buf) = first_non_empty_ciovec(ciovs)? else { return Ok(0); }; - Ok(BlockingMode::Blocking - .write(self, Resource::new_borrow(output_stream), &buf) + let n = BlockingMode::Blocking + .write(self, stream, &buf) .await? - .try_into()?) + .try_into()?; + Ok(n) } _ => Err(types::Errno::Badf.into()), } @@ -1500,13 +1507,12 @@ impl< ) -> Result { let t = self.transact()?; let desc = t.get_descriptor(fd)?; - let n = match *desc { + let n = match desc { Descriptor::File(File { - ref fd, - blocking_mode, - .. + fd, blocking_mode, .. }) if t.view.table().get_resource(fd)?.is_file() => { let fd = fd.borrowed(); + let blocking_mode = *blocking_mode; drop(t); let Some(buf) = first_non_empty_ciovec(ciovs)? else { return Ok(0); @@ -1631,22 +1637,16 @@ impl< cookie: types::Dircookie, ) -> Result { let fd = self.get_dir_fd(fd)?; - let stream = self - .read_directory(Resource::new_borrow(fd.rep())) - .await - .map_err(|e| { - e.try_into() - .context("failed to call `read-directory`") - .unwrap_or_else(types::Error::trap) - })?; - let dir_metadata_hash = self - .metadata_hash(Resource::new_borrow(fd.rep())) - .await - .map_err(|e| { - e.try_into() - .context("failed to call `metadata-hash`") - .unwrap_or_else(types::Error::trap) - })?; + let stream = self.read_directory(fd.borrowed()).await.map_err(|e| { + e.try_into() + .context("failed to call `read-directory`") + .unwrap_or_else(types::Error::trap) + })?; + let dir_metadata_hash = self.metadata_hash(fd.borrowed()).await.map_err(|e| { + e.try_into() + .context("failed to call `metadata-hash`") + .unwrap_or_else(types::Error::trap) + })?; let cookie = cookie.try_into().map_err(|_| types::Errno::Overflow)?; let head = [ @@ -1684,11 +1684,7 @@ impl< .unwrap_or_else(types::Error::trap) })?; let metadata_hash = self - .metadata_hash_at( - Resource::new_borrow(fd.rep()), - filesystem::PathFlags::empty(), - name.clone(), - ) + .metadata_hash_at(fd.borrowed(), filesystem::PathFlags::empty(), name.clone()) .await .map_err(|e| { e.try_into() @@ -1754,12 +1750,13 @@ impl< ) -> Result<(), types::Error> { let dirfd = self.get_dir_fd(dirfd)?; let path = read_string(path)?; - let borrow = Resource::new_borrow(dirfd.rep()); - self.create_directory_at(borrow, path).await.map_err(|e| { - e.try_into() - .context("failed to call `create-directory-at`") - .unwrap_or_else(types::Error::trap) - }) + self.create_directory_at(dirfd.borrowed(), path) + .await + .map_err(|e| { + e.try_into() + .context("failed to call `create-directory-at`") + .unwrap_or_else(types::Error::trap) + }) } /// Return the attributes of a file or directory. @@ -1781,11 +1778,7 @@ impl< data_modification_timestamp, status_change_timestamp, } = self - .stat_at( - Resource::new_borrow(dirfd.rep()), - flags.into(), - path.clone(), - ) + .stat_at(dirfd.borrowed(), flags.into(), path.clone()) .await .map_err(|e| { e.try_into() @@ -2022,8 +2015,7 @@ impl< let dirfd = self.get_dir_fd(dirfd)?; let src_path = read_string(src_path)?; let dest_path = read_string(dest_path)?; - let borrow = Resource::new_borrow(dirfd.rep()); - self.symlink_at(borrow, src_path, dest_path) + self.symlink_at(dirfd.borrowed(), src_path, dest_path) .await .map_err(|e| { e.try_into() @@ -2040,15 +2032,15 @@ impl< ) -> Result<(), types::Error> { let dirfd = self.get_dir_fd(dirfd)?; let path = path.as_cow()?.to_string(); - let borrow = Resource::new_borrow(dirfd.rep()); - self.unlink_file_at(borrow, path).await.map_err(|e| { - e.try_into() - .context("failed to call `unlink-file-at`") - .unwrap_or_else(types::Error::trap) - }) + self.unlink_file_at(dirfd.borrowed(), path) + .await + .map_err(|e| { + e.try_into() + .context("failed to call `unlink-file-at`") + .unwrap_or_else(types::Error::trap) + }) } - #[allow(unused_variables)] #[instrument(skip(self))] async fn poll_oneoff<'a>( &mut self, @@ -2056,7 +2048,250 @@ impl< events: &GuestPtr<'a, types::Event>, nsubscriptions: types::Size, ) -> Result { - todo!("preview1 poll_oneoff is not implemented") + if nsubscriptions == 0 { + // Indefinite sleeping is not supported in preview1. + return Err(types::Errno::Inval.into()); + } + let subs = subs.as_array(nsubscriptions); + let events = events.as_array(nsubscriptions); + + let n = usize::try_from(nsubscriptions).unwrap_or(usize::MAX); + let mut pollables = Vec::with_capacity(n); + for sub in subs.iter() { + let sub = sub?.read()?; + let p = match sub.u { + types::SubscriptionU::Clock(types::SubscriptionClock { + id, + timeout, + flags, + .. + }) => { + let absolute = flags.contains(types::Subclockflags::SUBSCRIPTION_CLOCK_ABSTIME); + let (timeout, absolute) = match id { + types::Clockid::Monotonic => (timeout, absolute), + types::Clockid::Realtime if !absolute => (timeout, false), + types::Clockid::Realtime => { + let now = wall_clock::Host::now(self) + .context("failed to call `wall_clock::now`") + .map_err(types::Error::trap)?; + + // Convert `timeout` to `Datetime` format. + let seconds = timeout / 1_000_000_000; + let nanoseconds = timeout % 1_000_000_000; + + let timeout = if now.seconds < seconds + || now.seconds == seconds + && u64::from(now.nanoseconds) < nanoseconds + { + // `now` is less than `timeout`, which is expressable as u64, + // substract the nanosecond counts directly + now.seconds * 1_000_000_000 + u64::from(now.nanoseconds) - timeout + } else { + 0 + }; + (timeout, false) + } + _ => return Err(types::Errno::Inval.into()), + }; + monotonic_clock::Host::subscribe(self, timeout, absolute) + .context("failed to call `monotonic_clock::subscribe`") + .map_err(types::Error::trap)? + } + types::SubscriptionU::FdRead(types::SubscriptionFdReadwrite { + file_descriptor, + }) => { + let stream = { + let t = self.transact()?; + let desc = t.get_descriptor(file_descriptor)?; + match desc { + Descriptor::Stdin { stream, .. } => stream.borrowed(), + Descriptor::File(File { fd, position, .. }) + if t.view.table().get_resource(fd)?.is_file() => + { + let pos = position.load(Ordering::Relaxed); + let fd = fd.borrowed(); + drop(t); + self.read_via_stream(fd, pos).map_err(|e| { + e.try_into() + .context("failed to call `read-via-stream`") + .unwrap_or_else(types::Error::trap) + })? + } + // TODO: Support sockets + _ => return Err(types::Errno::Badf.into()), + } + }; + streams::HostInputStream::subscribe(self, stream) + .context("failed to call `subscribe` on `input-stream`") + .map_err(types::Error::trap)? + } + types::SubscriptionU::FdWrite(types::SubscriptionFdReadwrite { + file_descriptor, + }) => { + let stream = { + let t = self.transact()?; + let desc = t.get_descriptor(file_descriptor)?; + match desc { + Descriptor::Stdout { stream, .. } + | Descriptor::Stderr { stream, .. } => stream.borrowed(), + Descriptor::File(File { + fd, + position, + append, + .. + }) if t.view.table().get_resource(fd)?.is_file() => { + let fd = fd.borrowed(); + let position = position.clone(); + let append = *append; + drop(t); + if append { + self.append_via_stream(fd).map_err(|e| { + e.try_into() + .context("failed to call `append-via-stream`") + .unwrap_or_else(types::Error::trap) + })? + } else { + let pos = position.load(Ordering::Relaxed); + self.write_via_stream(fd, pos).map_err(|e| { + e.try_into() + .context("failed to call `write-via-stream`") + .unwrap_or_else(types::Error::trap) + })? + } + } + // TODO: Support sockets + _ => return Err(types::Errno::Badf.into()), + } + }; + streams::HostOutputStream::subscribe(self, stream) + .context("failed to call `subscribe` on `output-stream`") + .map_err(types::Error::trap)? + } + }; + pollables.push(p); + } + let ready: HashSet<_> = self + .poll_list(pollables) + .await + .context("failed to call `poll-oneoff`") + .map_err(types::Error::trap)? + .into_iter() + .collect(); + + let mut count: types::Size = 0; + for (sub, event) in iter::zip(0.., subs.iter().zip(events.iter())) + .filter_map(|(idx, (sub, event))| ready.contains(&idx).then_some((sub, event))) + { + let sub = sub?.read()?; + let event = event?; + let e = match sub.u { + types::SubscriptionU::Clock(..) => types::Event { + userdata: sub.userdata, + error: types::Errno::Success, + type_: types::Eventtype::Clock, + fd_readwrite: types::EventFdReadwrite { + flags: types::Eventrwflags::empty(), + nbytes: 0, + }, + }, + types::SubscriptionU::FdRead(types::SubscriptionFdReadwrite { + file_descriptor, + }) => { + let t = self.transact()?; + let desc = t.get_descriptor(file_descriptor)?; + match desc { + Descriptor::Stdin { .. } => types::Event { + userdata: sub.userdata, + error: types::Errno::Success, + type_: types::Eventtype::FdRead, + fd_readwrite: types::EventFdReadwrite { + flags: types::Eventrwflags::empty(), + nbytes: 1, + }, + }, + Descriptor::File(File { fd, position, .. }) + if t.view.table().get_resource(fd)?.is_file() => + { + let fd = fd.borrowed(); + let position = position.clone(); + drop(t); + match self + .stat(fd) + .await + .map_err(|e| e.try_into().context("failed to call `stat`")) + { + Ok(filesystem::DescriptorStat { size, .. }) => { + let pos = position.load(Ordering::Relaxed); + let nbytes = size.saturating_sub(pos); + types::Event { + userdata: sub.userdata, + error: types::Errno::Success, + type_: types::Eventtype::FdRead, + fd_readwrite: types::EventFdReadwrite { + flags: if nbytes == 0 { + types::Eventrwflags::FD_READWRITE_HANGUP + } else { + types::Eventrwflags::empty() + }, + nbytes: 1, + }, + } + } + Err(Ok(error)) => types::Event { + userdata: sub.userdata, + error, + type_: types::Eventtype::FdRead, + fd_readwrite: types::EventFdReadwrite { + flags: types::Eventrwflags::empty(), + nbytes: 1, + }, + }, + Err(Err(error)) => return Err(types::Error::trap(error)), + } + } + // TODO: Support sockets + _ => return Err(types::Errno::Badf.into()), + } + } + types::SubscriptionU::FdWrite(types::SubscriptionFdReadwrite { + file_descriptor, + }) => { + let t = self.transact()?; + let desc = t.get_descriptor(file_descriptor)?; + match desc { + Descriptor::Stdout { .. } | Descriptor::Stderr { .. } => types::Event { + userdata: sub.userdata, + error: types::Errno::Success, + type_: types::Eventtype::FdWrite, + fd_readwrite: types::EventFdReadwrite { + flags: types::Eventrwflags::empty(), + nbytes: 1, + }, + }, + Descriptor::File(File { fd, .. }) + if t.view.table().get_resource(fd)?.is_file() => + { + types::Event { + userdata: sub.userdata, + error: types::Errno::Success, + type_: types::Eventtype::FdWrite, + fd_readwrite: types::EventFdReadwrite { + flags: types::Eventrwflags::empty(), + nbytes: 1, + }, + } + } + // TODO: Support sockets + _ => return Err(types::Errno::Badf.into()), + } + } + }; + event.write(e)?; + count = count + .checked_add(1) + .ok_or_else(|| types::Error::from(types::Errno::Overflow))? + } + Ok(count) } #[instrument(skip(self))]