diff --git a/lightproc/src/catch_unwind.rs b/lightproc/src/catch_unwind.rs index 00e4de50..05c550ad 100644 --- a/lightproc/src/catch_unwind.rs +++ b/lightproc/src/catch_unwind.rs @@ -6,30 +6,29 @@ use std::pin::Pin; use std::task::{Context, Poll}; #[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct CatchUnwind +pub(crate) struct CatchUnwind where - Fut: Future, + F: Future, { - future: Fut, + future: F, } -impl CatchUnwind +impl CatchUnwind where - Fut: Future + UnwindSafe, + F: Future + UnwindSafe, { - unsafe_pinned!(future: Fut); + unsafe_pinned!(future: F); - pub(super) fn new(future: Fut) -> CatchUnwind { + pub(crate) fn new(future: F) -> CatchUnwind { CatchUnwind { future } } } -impl Future for CatchUnwind +impl Future for CatchUnwind where - Fut: Future + UnwindSafe, + F: Future + UnwindSafe, { - type Output = Result>; + type Output = Result>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { catch_unwind(AssertUnwindSafe(|| self.future().poll(cx)))?.map(Ok) diff --git a/lightproc/src/lib.rs b/lightproc/src/lib.rs index 1c902c95..46d3d493 100644 --- a/lightproc/src/lib.rs +++ b/lightproc/src/lib.rs @@ -1,21 +1,19 @@ +mod catch_unwind; mod layout_helpers; mod proc_data; +mod proc_ext; mod proc_layout; mod proc_vtable; mod raw_proc; mod state; -pub mod catch_unwind; pub mod lightproc; -pub mod proc_ext; pub mod proc_handle; pub mod proc_stack; pub mod recoverable_handle; pub mod prelude { - pub use crate::catch_unwind::*; pub use crate::lightproc::*; - pub use crate::proc_ext::*; pub use crate::proc_handle::*; pub use crate::proc_stack::*; pub use crate::recoverable_handle::*; diff --git a/lightproc/src/lightproc.rs b/lightproc/src/lightproc.rs index 9693b34e..b5feb2ff 100644 --- a/lightproc/src/lightproc.rs +++ b/lightproc/src/lightproc.rs @@ -1,16 +1,15 @@ -use std::fmt; -use std::future::Future; -use std::marker::PhantomData; -use std::mem; -use std::ptr::NonNull; - use crate::proc_data::ProcData; use crate::proc_ext::ProcFutureExt; use crate::proc_handle::ProcHandle; use crate::proc_stack::*; use crate::raw_proc::RawProc; use crate::recoverable_handle::RecoverableHandle; +use std::fmt::{self, Debug, Formatter}; +use std::future::Future; +use std::marker::PhantomData; +use std::mem; use std::panic::AssertUnwindSafe; +use std::ptr::NonNull; pub struct LightProc { /// A pointer to the heap-allocated proc. @@ -91,6 +90,18 @@ impl LightProc { } } +impl Debug for LightProc { + fn fmt(&self, fmt: &mut Formatter<'_>) -> fmt::Result { + let ptr = self.raw_proc.as_ptr(); + let pdata = ptr as *const ProcData; + + fmt.debug_struct("LightProc") + .field("pdata", unsafe { &(*pdata) }) + .field("stack", self.stack()) + .finish() + } +} + impl Drop for LightProc { fn drop(&mut self) { let ptr = self.raw_proc.as_ptr(); @@ -108,15 +119,3 @@ impl Drop for LightProc { } } } - -impl fmt::Debug for LightProc { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let ptr = self.raw_proc.as_ptr(); - let pdata = ptr as *const ProcData; - - f.debug_struct("LightProc") - .field("pdata", unsafe { &(*pdata) }) - .field("stack", self.stack()) - .finish() - } -} diff --git a/lightproc/src/proc_data.rs b/lightproc/src/proc_data.rs index 42bdde1b..b47222f1 100644 --- a/lightproc/src/proc_data.rs +++ b/lightproc/src/proc_data.rs @@ -1,16 +1,14 @@ +use crate::layout_helpers::extend; +use crate::proc_stack::ProcStack; +use crate::proc_vtable::ProcVTable; +use crate::state::*; +use crossbeam_utils::Backoff; use std::alloc::Layout; use std::cell::Cell; -use std::fmt; +use std::fmt::{self, Debug, Formatter}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::Waker; -use crossbeam_utils::Backoff; - -use crate::layout_helpers::extend; -use crate::proc_stack::*; -use crate::proc_vtable::ProcVTable; -use crate::state::*; - /// The pdata of a proc. /// /// This pdata is stored right at the beginning of every heap-allocated proc. @@ -131,23 +129,24 @@ impl ProcData { #[inline] pub(crate) fn offset_stack() -> usize { let layout_pdata = Layout::new::(); - let layout_t = Layout::new::(); - let (_, offset_t) = extend(layout_pdata, layout_t); - offset_t + let layout_stack = Layout::new::(); + let (_, offset_stack) = extend(layout_pdata, layout_stack); + offset_stack } } -impl fmt::Debug for ProcData { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +impl Debug for ProcData { + fn fmt(&self, fmt: &mut Formatter<'_>) -> fmt::Result { let state = self.state.load(Ordering::SeqCst); - f.debug_struct("ProcData") + fmt.debug_struct("ProcData") .field("scheduled", &(state & SCHEDULED != 0)) .field("running", &(state & RUNNING != 0)) .field("completed", &(state & COMPLETED != 0)) .field("closed", &(state & CLOSED != 0)) - .field("awaiter", &(state & AWAITER != 0)) .field("handle", &(state & HANDLE != 0)) + .field("awaiter", &(state & AWAITER != 0)) + .field("locked", &(state & LOCKED != 0)) .field("ref_count", &(state / REFERENCE)) .finish() } diff --git a/lightproc/src/proc_ext.rs b/lightproc/src/proc_ext.rs index 4fb5c319..a3baa12d 100644 --- a/lightproc/src/proc_ext.rs +++ b/lightproc/src/proc_ext.rs @@ -2,7 +2,7 @@ use crate::catch_unwind::CatchUnwind; use std::future::Future; use std::panic::UnwindSafe; -pub trait ProcFutureExt: Future { +pub(crate) trait ProcFutureExt: Future { fn catch_unwind(self) -> CatchUnwind where Self: Sized + UnwindSafe, diff --git a/lightproc/src/proc_handle.rs b/lightproc/src/proc_handle.rs index 2327185a..f87c6e8f 100644 --- a/lightproc/src/proc_handle.rs +++ b/lightproc/src/proc_handle.rs @@ -1,4 +1,7 @@ -use std::fmt; +use crate::proc_data::ProcData; +use crate::proc_stack::ProcStack; +use crate::state::*; +use std::fmt::{self, Debug, Formatter}; use std::future::Future; use std::marker::{PhantomData, Unpin}; use std::pin::Pin; @@ -6,10 +9,6 @@ use std::ptr::NonNull; use std::sync::atomic::Ordering; use std::task::{Context, Poll}; -use crate::proc_data::ProcData; -use crate::proc_stack::*; -use crate::state::*; - /// A handle that awaits the result of a proc. /// /// This type is a future that resolves to an `Option` where: @@ -94,6 +93,85 @@ impl ProcHandle { } } +impl Future for ProcHandle { + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let ptr = self.raw_proc.as_ptr(); + let pdata = ptr as *const ProcData; + + unsafe { + let mut state = (*pdata).state.load(Ordering::Acquire); + + loop { + // If the proc has been closed, notify the awaiter and return `None`. + if state & CLOSED != 0 { + // Even though the awaiter is most likely the current proc, it could also be + // another proc. + (*pdata).notify_unless(cx.waker()); + return Poll::Ready(None); + } + + // If the proc is not completed, register the current proc. + if state & COMPLETED == 0 { + // Replace the waker with one associated with the current proc. We need a + // safeguard against panics because dropping the previous waker can panic. + (*pdata).swap_awaiter(Some(cx.waker().clone())); + + // Reload the state after registering. It is possible that the proc became + // completed or closed just before registration so we need to check for that. + state = (*pdata).state.load(Ordering::Acquire); + + // If the proc has been closed, notify the awaiter and return `None`. + if state & CLOSED != 0 { + // Even though the awaiter is most likely the current proc, it could also + // be another proc. + (*pdata).notify_unless(cx.waker()); + return Poll::Ready(None); + } + + // If the proc is still not completed, we're blocked on it. + if state & COMPLETED == 0 { + return Poll::Pending; + } + } + + // Since the proc is now completed, mark it as closed in order to grab its output. + match (*pdata).state.compare_exchange( + state, + state | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Notify the awaiter. Even though the awaiter is most likely the current + // proc, it could also be another proc. + if state & AWAITER != 0 { + (*pdata).notify_unless(cx.waker()); + } + + // Take the output from the proc. + let output = ((*pdata).vtable.get_output)(ptr) as *mut R; + return Poll::Ready(Some(output.read())); + } + Err(s) => state = s, + } + } + } + } +} + +impl Debug for ProcHandle { + fn fmt(&self, fmt: &mut Formatter<'_>) -> fmt::Result { + let ptr = self.raw_proc.as_ptr(); + let pdata = ptr as *const ProcData; + + fmt.debug_struct("ProcHandle") + .field("pdata", unsafe { &(*pdata) }) + .finish() + } +} + impl Drop for ProcHandle { fn drop(&mut self) { let ptr = self.raw_proc.as_ptr(); @@ -173,82 +251,3 @@ impl Drop for ProcHandle { drop(output); } } - -impl Future for ProcHandle { - type Output = Option; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let ptr = self.raw_proc.as_ptr(); - let pdata = ptr as *const ProcData; - - unsafe { - let mut state = (*pdata).state.load(Ordering::Acquire); - - loop { - // If the proc has been closed, notify the awaiter and return `None`. - if state & CLOSED != 0 { - // Even though the awaiter is most likely the current proc, it could also be - // another proc. - (*pdata).notify_unless(cx.waker()); - return Poll::Ready(None); - } - - // If the proc is not completed, register the current proc. - if state & COMPLETED == 0 { - // Replace the waker with one associated with the current proc. We need a - // safeguard against panics because dropping the previous waker can panic. - (*pdata).swap_awaiter(Some(cx.waker().clone())); - - // Reload the state after registering. It is possible that the proc became - // completed or closed just before registration so we need to check for that. - state = (*pdata).state.load(Ordering::Acquire); - - // If the proc has been closed, notify the awaiter and return `None`. - if state & CLOSED != 0 { - // Even though the awaiter is most likely the current proc, it could also - // be another proc. - (*pdata).notify_unless(cx.waker()); - return Poll::Ready(None); - } - - // If the proc is still not completed, we're blocked on it. - if state & COMPLETED == 0 { - return Poll::Pending; - } - } - - // Since the proc is now completed, mark it as closed in order to grab its output. - match (*pdata).state.compare_exchange( - state, - state | CLOSED, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // Notify the awaiter. Even though the awaiter is most likely the current - // proc, it could also be another proc. - if state & AWAITER != 0 { - (*pdata).notify_unless(cx.waker()); - } - - // Take the output from the proc. - let output = ((*pdata).vtable.get_output)(ptr) as *mut R; - return Poll::Ready(Some(output.read())); - } - Err(s) => state = s, - } - } - } - } -} - -impl fmt::Debug for ProcHandle { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let ptr = self.raw_proc.as_ptr(); - let pdata = ptr as *const ProcData; - - f.debug_struct("ProcHandle") - .field("pdata", unsafe { &(*pdata) }) - .finish() - } -} diff --git a/lightproc/src/proc_layout.rs b/lightproc/src/proc_layout.rs index df719784..3355305e 100644 --- a/lightproc/src/proc_layout.rs +++ b/lightproc/src/proc_layout.rs @@ -1,19 +1,19 @@ use std::alloc::Layout; #[derive(Clone, Copy)] -pub(crate) struct TaskLayout { +pub(crate) struct ProcLayout { /// Memory layout of the whole proc. pub(crate) layout: Layout, /// Offset into the proc at which the stack is stored. - pub(crate) offset_t: usize, + pub(crate) offset_stack: usize, /// Offset into the proc at which the schedule function is stored. - pub(crate) offset_s: usize, + pub(crate) offset_schedule: usize, /// Offset into the proc at which the future is stored. - pub(crate) offset_f: usize, + pub(crate) offset_future: usize, /// Offset into the proc at which the output is stored. - pub(crate) offset_r: usize, + pub(crate) offset_output: usize, } diff --git a/lightproc/src/proc_stack.rs b/lightproc/src/proc_stack.rs index c1e02fb9..aa1cf31c 100644 --- a/lightproc/src/proc_stack.rs +++ b/lightproc/src/proc_stack.rs @@ -1,5 +1,4 @@ -use std::fmt; -use std::fmt::{Error, Formatter}; +use std::fmt::{self, Debug, Formatter}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -7,13 +6,14 @@ use std::sync::Arc; pub struct ProcStack { pub pid: AtomicUsize, - // Before action callbacks - pub before_start: Option>, + // Before action callback + pub(crate) before_start: Option>, - // After action callbacks - pub after_complete: Option>, + // After action callback + pub(crate) after_complete: Option>, - pub after_panic: Option>, + // After panic callback + pub(crate) after_panic: Option>, } impl ProcStack { @@ -47,9 +47,9 @@ impl ProcStack { } } -impl fmt::Debug for ProcStack { - fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { - f.debug_struct("ProcStack") +impl Debug for ProcStack { + fn fmt(&self, fmt: &mut Formatter<'_>) -> fmt::Result { + fmt.debug_struct("ProcStack") .field("pid", &self.pid.load(Ordering::SeqCst)) .finish() } diff --git a/lightproc/src/raw_proc.rs b/lightproc/src/raw_proc.rs index 8364cf85..fb4b81f2 100644 --- a/lightproc/src/raw_proc.rs +++ b/lightproc/src/raw_proc.rs @@ -1,23 +1,21 @@ +use crate::catch_unwind::CatchUnwind; +use crate::layout_helpers::extend; +use crate::lightproc::LightProc; +use crate::proc_data::ProcData; +use crate::proc_layout::ProcLayout; +use crate::proc_stack::ProcStack; +use crate::proc_vtable::ProcVTable; +use crate::state::*; use std::alloc::{self, Layout}; use std::cell::Cell; use std::future::Future; use std::mem::{self, ManuallyDrop}; +use std::panic::AssertUnwindSafe; use std::pin::Pin; use std::ptr::NonNull; use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; -use crate::catch_unwind::CatchUnwind; -use crate::layout_helpers::extend; -use crate::lightproc::LightProc; -use crate::proc_data::ProcData; -use crate::proc_layout::TaskLayout; -use crate::proc_stack::*; -use crate::proc_vtable::ProcVTable; -use crate::state::*; - -use std::panic::AssertUnwindSafe; - /// Raw pointers to the fields of a proc. pub(crate) struct RawProc { pub(crate) pdata: *const ProcData, @@ -27,19 +25,12 @@ pub(crate) struct RawProc { pub(crate) output: *mut R, } -impl Copy for RawProc {} - -impl Clone for RawProc { - fn clone(&self) -> Self { - Self { - pdata: self.pdata, - schedule: self.schedule, - stack: self.stack, - future: self.future, - output: self.output, - } - } -} +/// A guard that closes the proc if polling its future panics. +struct Guard(RawProc) +where + F: Future + Send + 'static, + R: Send + 'static, + S: Fn(LightProc) + Send + Sync + 'static; impl RawProc where @@ -105,40 +96,40 @@ where unsafe { Self { pdata: p as *const ProcData, - stack: p.add(proc_layout.offset_t) as *mut ProcStack, - schedule: p.add(proc_layout.offset_s) as *const S, - future: p.add(proc_layout.offset_f) as *mut F, - output: p.add(proc_layout.offset_r) as *mut R, + stack: p.add(proc_layout.offset_stack) as *mut ProcStack, + schedule: p.add(proc_layout.offset_schedule) as *const S, + future: p.add(proc_layout.offset_future) as *mut F, + output: p.add(proc_layout.offset_output) as *mut R, } } } /// Returns the memory layout for a proc. #[inline] - fn proc_layout() -> TaskLayout { + fn proc_layout() -> ProcLayout { let layout_pdata = Layout::new::(); - let layout_t = Layout::new::(); - let layout_s = Layout::new::(); - let layout_f = Layout::new::>>(); - let layout_r = Layout::new::(); + let layout_stack = Layout::new::(); + let layout_schedule = Layout::new::(); + let layout_future = Layout::new::>>(); + let layout_output = Layout::new::(); - let size_union = layout_f.size().max(layout_r.size()); - let align_union = layout_f.align().max(layout_r.align()); + let size_union = layout_future.size().max(layout_output.size()); + let align_union = layout_future.align().max(layout_output.align()); let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) }; let layout = layout_pdata; - let (layout, offset_t) = extend(layout, layout_t); - let (layout, offset_s) = extend(layout, layout_s); + let (layout, offset_stack) = extend(layout, layout_stack); + let (layout, offset_schedule) = extend(layout, layout_schedule); let (layout, offset_union) = extend(layout, layout_union); - let offset_f = offset_union; - let offset_r = offset_union; + let offset_future = offset_union; + let offset_output = offset_union; - TaskLayout { + ProcLayout { layout, - offset_t, - offset_s, - offset_f, - offset_r, + offset_stack, + offset_schedule, + offset_future, + offset_output, } } @@ -505,67 +496,74 @@ where } } } + } +} - /// A guard that closes the proc if polling its future panics. - struct Guard(RawProc) - where - F: Future + Send + 'static, - R: Send + 'static, - S: Fn(LightProc) + Send + Sync + 'static; - - impl Drop for Guard - where - F: Future + Send + 'static, - R: Send + 'static, - S: Fn(LightProc) + Send + Sync + 'static, - { - fn drop(&mut self) { - let raw = self.0; - let ptr = raw.pdata as *const (); - - unsafe { - let mut state = (*raw.pdata).state.load(Ordering::Acquire); - - loop { - // If the proc was closed while running, then unschedule it, drop its - // future, and drop the proc reference. - if state & CLOSED != 0 { - // We still need to unschedule the proc because it is possible it was - // woken while running. - (*raw.pdata).state.fetch_and(!SCHEDULED, Ordering::AcqRel); - - // The thread that closed the proc didn't drop the future because it - // was running so now it's our responsibility to do so. - RawProc::::drop_future(ptr); +impl Copy for RawProc {} - // Drop the proc reference. - RawProc::::decrement(ptr); - break; - } +impl Clone for RawProc { + fn clone(&self) -> Self { + Self { + pdata: self.pdata, + schedule: self.schedule, + stack: self.stack, + future: self.future, + output: self.output, + } + } +} + +impl Drop for Guard +where + F: Future + Send + 'static, + R: Send + 'static, + S: Fn(LightProc) + Send + Sync + 'static, +{ + fn drop(&mut self) { + let raw = self.0; + let ptr = raw.pdata as *const (); - // Mark the proc as not running, not scheduled, and closed. - match (*raw.pdata).state.compare_exchange_weak( - state, - (state & !RUNNING & !SCHEDULED) | CLOSED, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(state) => { - // Drop the future because the proc is now closed. - RawProc::::drop_future(ptr); - - // Notify the awaiter that the proc has been closed. - if state & AWAITER != 0 { - (*raw.pdata).notify(); - } + unsafe { + let mut state = (*raw.pdata).state.load(Ordering::Acquire); + + loop { + // If the proc was closed while running, then unschedule it, drop its + // future, and drop the proc reference. + if state & CLOSED != 0 { + // We still need to unschedule the proc because it is possible it was + // woken while running. + (*raw.pdata).state.fetch_and(!SCHEDULED, Ordering::AcqRel); + + // The thread that closed the proc didn't drop the future because it + // was running so now it's our responsibility to do so. + RawProc::::drop_future(ptr); + + // Drop the proc reference. + RawProc::::decrement(ptr); + break; + } - // Drop the proc reference. - RawProc::::decrement(ptr); - break; - } - Err(s) => state = s, + // Mark the proc as not running, not scheduled, and closed. + match (*raw.pdata).state.compare_exchange_weak( + state, + (state & !RUNNING & !SCHEDULED) | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(state) => { + // Drop the future because the proc is now closed. + RawProc::::drop_future(ptr); + + // Notify the awaiter that the proc has been closed. + if state & AWAITER != 0 { + (*raw.pdata).notify(); } + + // Drop the proc reference. + RawProc::::decrement(ptr); + break; } + Err(s) => state = s, } } } diff --git a/lightproc/src/recoverable_handle.rs b/lightproc/src/recoverable_handle.rs index d6a9ca6f..41352b12 100644 --- a/lightproc/src/recoverable_handle.rs +++ b/lightproc/src/recoverable_handle.rs @@ -1,11 +1,28 @@ use crate::proc_handle::ProcHandle; +use crate::proc_stack::ProcStack; use std::future::Future; use std::panic::resume_unwind; use std::pin::Pin; use std::task::{Context, Poll}; use std::thread; -pub struct RecoverableHandle(pub ProcHandle>); +pub struct RecoverableHandle(pub(crate) ProcHandle>); + +impl RecoverableHandle { + /// Cancels the proc. + /// + /// If the proc has already completed, calling this method will have no effect. + /// + /// When a proc is cancelled, its future cannot be polled again and will be dropped instead. + pub fn cancel(&self) { + self.0.cancel() + } + + /// Returns a reference to the stack stored inside the proc. + pub fn stack(&self) -> &ProcStack { + self.0.stack() + } +} impl Future for RecoverableHandle { type Output = Option; diff --git a/lightproc/src/state.rs b/lightproc/src/state.rs index 41696297..a09e11d1 100644 --- a/lightproc/src/state.rs +++ b/lightproc/src/state.rs @@ -1,7 +1,7 @@ /// Set if the proc is scheduled for running. /// /// A proc is considered to be scheduled whenever its `LightProc` reference exists. It is in scheduled -/// state at the moment of creation and when it gets unapused either by its `ProcHandle` or woken +/// state at the moment of creation and when it gets unpaused either by its `ProcHandle` or woken /// by a `Waker`. /// /// This flag can't be set when the proc is completed. However, it can be set while the proc is