Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(driver,polling): remove is_blocking #169

Merged
merged 1 commit into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 34 additions & 22 deletions compio-driver/src/poll/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,13 @@ pub(crate) use crate::unix::RawOp;

/// Abstraction of operations.
pub trait OpCode {
/// Determines that the operation is really non-blocking defined by POSIX.
/// If not, the driver will try to operate it in another thread.
fn is_nonblocking(&self) -> bool {
true
}

/// Perform the operation before submit, and return [`Decision`] to
/// indicate whether submitting the operation to polling is required.
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision>;

/// Perform the operation after received corresponding
/// event.
/// event. If this operation is blocking, the return value should be
/// [`Poll::Ready`].
fn on_event(self: Pin<&mut Self>, event: &Event) -> Poll<io::Result<usize>>;
}

Expand All @@ -48,6 +43,8 @@ pub enum Decision {
Completed(usize),
/// Async operation, needs to submit
Wait(WaitArg),
/// Blocking operation, needs to be spawned in another thread
Blocking(Event),
}

impl Decision {
Expand All @@ -65,6 +62,21 @@ impl Decision {
pub fn wait_writable(fd: RawFd) -> Self {
Self::wait_for(fd, Interest::Writable)
}

/// Decide to spawn a blocking task with a dummy event.
pub fn blocking_dummy() -> Self {
Self::Blocking(Event::none(0))
}

/// Decide to spawn a blocking task with a readable event.
pub fn blocking_readable(fd: RawFd) -> Self {
Self::Blocking(Event::readable(fd as _))
}

/// Decide to spawn a blocking task with a writable event.
pub fn blocking_writable(fd: RawFd) -> Self {
Self::Blocking(Event::writable(fd as _))
}
}

/// Meta of polling operations.
Expand Down Expand Up @@ -211,24 +223,25 @@ impl Driver {
Poll::Ready(Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)))
} else {
let op_pin = op.as_pin();
if op_pin.is_nonblocking() {
match op_pin.pre_submit() {
Ok(Decision::Wait(arg)) => {
self.submit(user_data, arg)?;
match op_pin.pre_submit() {
Ok(Decision::Wait(arg)) => {
self.submit(user_data, arg)?;
Poll::Pending
}
Ok(Decision::Completed(res)) => Poll::Ready(Ok(res)),
Ok(Decision::Blocking(event)) => {
if self.push_blocking(user_data, op, event) {
Poll::Pending
} else {
Poll::Ready(Err(io::Error::from_raw_os_error(libc::EBUSY)))
}
Ok(Decision::Completed(res)) => Poll::Ready(Ok(res)),
Err(err) => Poll::Ready(Err(err)),
}
} else if self.push_blocking(user_data, op) {
Poll::Pending
} else {
Poll::Ready(Err(io::Error::from_raw_os_error(libc::EBUSY)))
Err(err) => Poll::Ready(Err(err)),
}
}
}

fn push_blocking(&mut self, user_data: usize, op: &mut RawOp) -> bool {
fn push_blocking(&mut self, user_data: usize, op: &mut RawOp, event: Event) -> bool {
// Safety: the RawOp is not released before the operation returns.
struct SendWrapper<T>(T);
unsafe impl<T> Send for SendWrapper<T> {}
Expand All @@ -242,10 +255,9 @@ impl Driver {
let mut op = op;
let op = unsafe { op.0.as_mut() };
let op_pin = op.as_pin();
let res = match op_pin.pre_submit() {
Ok(Decision::Wait(_)) => unreachable!("this operation is not non-blocking"),
Ok(Decision::Completed(res)) => Ok(res),
Err(err) => Err(err),
let res = match op_pin.on_event(&event) {
Poll::Pending => unreachable!("this operation is not non-blocking"),
Poll::Ready(res) => res,
};
completed.push(Entry::new(user_data, res));
poll.notify().ok();
Expand Down
166 changes: 56 additions & 110 deletions compio-driver/src/poll/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,58 +19,33 @@ use crate::op::*;
pub use crate::unix::op::*;

impl OpCode for OpenFile {
fn is_nonblocking(&self) -> bool {
false
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
Ok(Decision::blocking_dummy())
}

fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
Ok(Decision::Completed(syscall!(open(
fn on_event(self: Pin<&mut Self>, _: &Event) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(syscall!(open(
self.path.as_ptr(),
self.flags,
self.mode as libc::c_int
))? as _))
}

fn on_event(self: Pin<&mut Self>, _: &Event) -> Poll<io::Result<usize>> {
unreachable!("OpenFile operation should not be submitted to polling")
}
}

impl OpCode for CloseFile {
fn is_nonblocking(&self) -> bool {
false
}

fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
Ok(Decision::Completed(syscall!(libc::close(self.fd))? as _))
Ok(Decision::blocking_dummy())
}

fn on_event(self: Pin<&mut Self>, _: &Event) -> Poll<io::Result<usize>> {
unreachable!("CloseFile operation should not be submitted to polling")
}
}

impl<T: IoBufMut> ReadAt<T> {
unsafe fn call(&mut self) -> libc::ssize_t {
let fd = self.fd;
let slice = self.buffer.as_mut_slice();
pread(
fd,
slice.as_mut_ptr() as _,
slice.len() as _,
self.offset as _,
)
Poll::Ready(Ok(syscall!(libc::close(self.fd))? as _))
}
}

impl<T: IoBufMut> OpCode for ReadAt<T> {
fn is_nonblocking(&self) -> bool {
cfg!(not(any(target_os = "linux", target_os = "android")))
}

fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
if cfg!(any(target_os = "linux", target_os = "android")) {
Ok(Decision::Completed(syscall!(self.call())? as _))
Ok(Decision::blocking_readable(self.fd))
} else {
Ok(Decision::wait_readable(self.fd))
}
Expand All @@ -79,30 +54,23 @@ impl<T: IoBufMut> OpCode for ReadAt<T> {
fn on_event(mut self: Pin<&mut Self>, event: &Event) -> Poll<io::Result<usize>> {
debug_assert!(event.readable);

syscall!(break self.call())
}
}

impl<T: IoVectoredBufMut> ReadVectoredAt<T> {
unsafe fn call(&mut self) -> libc::ssize_t {
self.slices = unsafe { self.buffer.as_io_slices_mut() };
preadv(
self.fd,
self.slices.as_ptr() as _,
self.slices.len() as _,
self.offset as _,
let fd = self.fd;
let slice = self.buffer.as_mut_slice();
syscall!(
break pread(
fd,
slice.as_mut_ptr() as _,
slice.len() as _,
self.offset as _,
)
)
}
}

impl<T: IoVectoredBufMut> OpCode for ReadVectoredAt<T> {
fn is_nonblocking(&self) -> bool {
cfg!(not(any(target_os = "linux", target_os = "android")))
}

fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
if cfg!(any(target_os = "linux", target_os = "android")) {
Ok(Decision::Completed(syscall!(self.call())? as _))
Ok(Decision::blocking_readable(self.fd))
} else {
Ok(Decision::wait_readable(self.fd))
}
Expand All @@ -111,30 +79,22 @@ impl<T: IoVectoredBufMut> OpCode for ReadVectoredAt<T> {
fn on_event(mut self: Pin<&mut Self>, event: &Event) -> Poll<io::Result<usize>> {
debug_assert!(event.readable);

syscall!(break self.call())
}
}

impl<T: IoBuf> WriteAt<T> {
unsafe fn call(&self) -> libc::ssize_t {
let slice = self.buffer.as_slice();
pwrite(
self.fd,
slice.as_ptr() as _,
slice.len() as _,
self.offset as _,
self.slices = unsafe { self.buffer.as_io_slices_mut() };
syscall!(
break preadv(
self.fd,
self.slices.as_ptr() as _,
self.slices.len() as _,
self.offset as _,
)
)
}
}

impl<T: IoBuf> OpCode for WriteAt<T> {
fn is_nonblocking(&self) -> bool {
cfg!(not(any(target_os = "linux", target_os = "android")))
}

fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
if cfg!(any(target_os = "linux", target_os = "android")) {
Ok(Decision::Completed(syscall!(self.call())? as _))
Ok(Decision::blocking_writable(self.fd))
} else {
Ok(Decision::wait_writable(self.fd))
}
Expand All @@ -143,30 +103,22 @@ impl<T: IoBuf> OpCode for WriteAt<T> {
fn on_event(self: Pin<&mut Self>, event: &Event) -> Poll<io::Result<usize>> {
debug_assert!(event.writable);

syscall!(break self.call())
}
}

impl<T: IoVectoredBuf> WriteVectoredAt<T> {
unsafe fn call(&mut self) -> libc::ssize_t {
self.slices = unsafe { self.buffer.as_io_slices() };
pwritev(
self.fd,
self.slices.as_ptr() as _,
self.slices.len() as _,
self.offset as _,
let slice = self.buffer.as_slice();
syscall!(
break pwrite(
self.fd,
slice.as_ptr() as _,
slice.len() as _,
self.offset as _,
)
)
}
}

impl<T: IoVectoredBuf> OpCode for WriteVectoredAt<T> {
fn is_nonblocking(&self) -> bool {
cfg!(not(any(target_os = "linux", target_os = "android")))
}

fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
if cfg!(any(target_os = "linux", target_os = "android")) {
Ok(Decision::Completed(syscall!(self.call())? as _))
Ok(Decision::blocking_writable(self.fd))
} else {
Ok(Decision::wait_writable(self.fd))
}
Expand All @@ -175,16 +127,24 @@ impl<T: IoVectoredBuf> OpCode for WriteVectoredAt<T> {
fn on_event(mut self: Pin<&mut Self>, event: &Event) -> Poll<io::Result<usize>> {
debug_assert!(event.writable);

syscall!(break self.call())
self.slices = unsafe { self.buffer.as_io_slices() };
syscall!(
break pwritev(
self.fd,
self.slices.as_ptr() as _,
self.slices.len() as _,
self.offset as _,
)
)
}
}

impl OpCode for Sync {
fn is_nonblocking(&self) -> bool {
false
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
Ok(Decision::blocking_dummy())
}

fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
fn on_event(self: Pin<&mut Self>, _: &Event) -> Poll<io::Result<usize>> {
#[cfg(any(
target_os = "android",
target_os = "freebsd",
Expand All @@ -194,7 +154,7 @@ impl OpCode for Sync {
target_os = "netbsd"
))]
{
Ok(Decision::Completed(syscall!(if self.datasync {
Poll::Ready(Ok(syscall!(if self.datasync {
libc::fdatasync(self.fd)
} else {
libc::fsync(self.fd)
Expand All @@ -209,42 +169,28 @@ impl OpCode for Sync {
target_os = "netbsd"
)))]
{
Ok(Decision::Completed(syscall!(libc::fsync(self.fd))? as _))
Poll::Ready(Ok(syscall!(libc::fsync(self.fd))? as _))
}
}

fn on_event(self: Pin<&mut Self>, _: &Event) -> Poll<io::Result<usize>> {
unreachable!("Sync operation should not be submitted to polling")
}
}

impl OpCode for ShutdownSocket {
fn is_nonblocking(&self) -> bool {
false
}

fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
Ok(Decision::Completed(
syscall!(libc::shutdown(self.fd, self.how()))? as _,
))
Ok(Decision::blocking_dummy())
}

fn on_event(self: Pin<&mut Self>, _: &Event) -> Poll<io::Result<usize>> {
unreachable!("CreateSocket operation should not be submitted to polling")
Poll::Ready(Ok(syscall!(libc::shutdown(self.fd, self.how()))? as _))
}
}

impl OpCode for CloseSocket {
fn is_nonblocking(&self) -> bool {
false
}

fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
Ok(Decision::Completed(syscall!(libc::close(self.fd))? as _))
Ok(Decision::blocking_dummy())
}

fn on_event(self: Pin<&mut Self>, _: &Event) -> Poll<io::Result<usize>> {
unreachable!("CloseSocket operation should not be submitted to polling")
Poll::Ready(Ok(syscall!(libc::close(self.fd))? as _))
}
}

Expand Down