Skip to content

Commit

Permalink
Updated the visibility of ProcStack's fields, renamed TaskLayout into…
Browse files Browse the repository at this point in the history
… ProcLayout, added the "cancel" and "stack" methods to RecoverableHandle and made its inner ProcHandle private, made ProcFutureExt and CatchUnwind private and fixed a few typos and formatting issues.
  • Loading branch information
r3v2d0g committed Nov 1, 2019
1 parent 0fdc0a9 commit ca5df91
Show file tree
Hide file tree
Showing 11 changed files with 258 additions and 249 deletions.
21 changes: 10 additions & 11 deletions lightproc/src/catch_unwind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,29 @@ use std::pin::Pin;
use std::task::{Context, Poll};

#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CatchUnwind<Fut>
pub(crate) struct CatchUnwind<F>
where
Fut: Future,
F: Future,
{
future: Fut,
future: F,
}

impl<Fut> CatchUnwind<Fut>
impl<F> CatchUnwind<F>
where
Fut: Future + UnwindSafe,
F: Future + UnwindSafe,
{
unsafe_pinned!(future: Fut);
unsafe_pinned!(future: F);

pub(super) fn new(future: Fut) -> CatchUnwind<Fut> {
pub(crate) fn new(future: F) -> CatchUnwind<F> {
CatchUnwind { future }
}
}

impl<Fut> Future for CatchUnwind<Fut>
impl<F> Future for CatchUnwind<F>
where
Fut: Future + UnwindSafe,
F: Future + UnwindSafe,
{
type Output = Result<Fut::Output, Box<dyn Any + Send>>;
type Output = Result<F::Output, Box<dyn Any + Send>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
catch_unwind(AssertUnwindSafe(|| self.future().poll(cx)))?.map(Ok)
Expand Down
6 changes: 2 additions & 4 deletions lightproc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
mod catch_unwind;
mod layout_helpers;
mod proc_data;
mod proc_ext;
mod proc_layout;
mod proc_vtable;
mod raw_proc;
mod state;

pub mod catch_unwind;
pub mod lightproc;
pub mod proc_ext;
pub mod proc_handle;
pub mod proc_stack;
pub mod recoverable_handle;

pub mod prelude {
pub use crate::catch_unwind::*;
pub use crate::lightproc::*;
pub use crate::proc_ext::*;
pub use crate::proc_handle::*;
pub use crate::proc_stack::*;
pub use crate::recoverable_handle::*;
Expand Down
35 changes: 17 additions & 18 deletions lightproc/src/lightproc.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::mem;
use std::ptr::NonNull;

use crate::proc_data::ProcData;
use crate::proc_ext::ProcFutureExt;
use crate::proc_handle::ProcHandle;
use crate::proc_stack::*;
use crate::raw_proc::RawProc;
use crate::recoverable_handle::RecoverableHandle;
use std::fmt::{self, Debug, Formatter};
use std::future::Future;
use std::marker::PhantomData;
use std::mem;
use std::panic::AssertUnwindSafe;
use std::ptr::NonNull;

pub struct LightProc {
/// A pointer to the heap-allocated proc.
Expand Down Expand Up @@ -91,6 +90,18 @@ impl LightProc {
}
}

impl Debug for LightProc {
fn fmt(&self, fmt: &mut Formatter<'_>) -> fmt::Result {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;

fmt.debug_struct("LightProc")
.field("pdata", unsafe { &(*pdata) })
.field("stack", self.stack())
.finish()
}
}

impl Drop for LightProc {
fn drop(&mut self) {
let ptr = self.raw_proc.as_ptr();
Expand All @@ -108,15 +119,3 @@ impl Drop for LightProc {
}
}
}

impl fmt::Debug for LightProc {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;

f.debug_struct("LightProc")
.field("pdata", unsafe { &(*pdata) })
.field("stack", self.stack())
.finish()
}
}
29 changes: 14 additions & 15 deletions lightproc/src/proc_data.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use crate::layout_helpers::extend;
use crate::proc_stack::ProcStack;
use crate::proc_vtable::ProcVTable;
use crate::state::*;
use crossbeam_utils::Backoff;
use std::alloc::Layout;
use std::cell::Cell;
use std::fmt;
use std::fmt::{self, Debug, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Waker;

use crossbeam_utils::Backoff;

use crate::layout_helpers::extend;
use crate::proc_stack::*;
use crate::proc_vtable::ProcVTable;
use crate::state::*;

/// The pdata of a proc.
///
/// This pdata is stored right at the beginning of every heap-allocated proc.
Expand Down Expand Up @@ -131,23 +129,24 @@ impl ProcData {
#[inline]
pub(crate) fn offset_stack() -> usize {
let layout_pdata = Layout::new::<ProcData>();
let layout_t = Layout::new::<ProcStack>();
let (_, offset_t) = extend(layout_pdata, layout_t);
offset_t
let layout_stack = Layout::new::<ProcStack>();
let (_, offset_stack) = extend(layout_pdata, layout_stack);
offset_stack
}
}

impl fmt::Debug for ProcData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
impl Debug for ProcData {
fn fmt(&self, fmt: &mut Formatter<'_>) -> fmt::Result {
let state = self.state.load(Ordering::SeqCst);

f.debug_struct("ProcData")
fmt.debug_struct("ProcData")
.field("scheduled", &(state & SCHEDULED != 0))
.field("running", &(state & RUNNING != 0))
.field("completed", &(state & COMPLETED != 0))
.field("closed", &(state & CLOSED != 0))
.field("awaiter", &(state & AWAITER != 0))
.field("handle", &(state & HANDLE != 0))
.field("awaiter", &(state & AWAITER != 0))
.field("locked", &(state & LOCKED != 0))
.field("ref_count", &(state / REFERENCE))
.finish()
}
Expand Down
2 changes: 1 addition & 1 deletion lightproc/src/proc_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::catch_unwind::CatchUnwind;
use std::future::Future;
use std::panic::UnwindSafe;

pub trait ProcFutureExt: Future {
pub(crate) trait ProcFutureExt: Future {
fn catch_unwind(self) -> CatchUnwind<Self>
where
Self: Sized + UnwindSafe,
Expand Down
167 changes: 83 additions & 84 deletions lightproc/src/proc_handle.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use std::fmt;
use crate::proc_data::ProcData;
use crate::proc_stack::ProcStack;
use crate::state::*;
use std::fmt::{self, Debug, Formatter};
use std::future::Future;
use std::marker::{PhantomData, Unpin};
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering;
use std::task::{Context, Poll};

use crate::proc_data::ProcData;
use crate::proc_stack::*;
use crate::state::*;

/// A handle that awaits the result of a proc.
///
/// This type is a future that resolves to an `Option<R>` where:
Expand Down Expand Up @@ -94,6 +93,85 @@ impl<R> ProcHandle<R> {
}
}

impl<R> Future for ProcHandle<R> {
type Output = Option<R>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;

unsafe {
let mut state = (*pdata).state.load(Ordering::Acquire);

loop {
// If the proc has been closed, notify the awaiter and return `None`.
if state & CLOSED != 0 {
// Even though the awaiter is most likely the current proc, it could also be
// another proc.
(*pdata).notify_unless(cx.waker());
return Poll::Ready(None);
}

// If the proc is not completed, register the current proc.
if state & COMPLETED == 0 {
// Replace the waker with one associated with the current proc. We need a
// safeguard against panics because dropping the previous waker can panic.
(*pdata).swap_awaiter(Some(cx.waker().clone()));

// Reload the state after registering. It is possible that the proc became
// completed or closed just before registration so we need to check for that.
state = (*pdata).state.load(Ordering::Acquire);

// If the proc has been closed, notify the awaiter and return `None`.
if state & CLOSED != 0 {
// Even though the awaiter is most likely the current proc, it could also
// be another proc.
(*pdata).notify_unless(cx.waker());
return Poll::Ready(None);
}

// If the proc is still not completed, we're blocked on it.
if state & COMPLETED == 0 {
return Poll::Pending;
}
}

// Since the proc is now completed, mark it as closed in order to grab its output.
match (*pdata).state.compare_exchange(
state,
state | CLOSED,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Notify the awaiter. Even though the awaiter is most likely the current
// proc, it could also be another proc.
if state & AWAITER != 0 {
(*pdata).notify_unless(cx.waker());
}

// Take the output from the proc.
let output = ((*pdata).vtable.get_output)(ptr) as *mut R;
return Poll::Ready(Some(output.read()));
}
Err(s) => state = s,
}
}
}
}
}

impl<R> Debug for ProcHandle<R> {
fn fmt(&self, fmt: &mut Formatter<'_>) -> fmt::Result {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;

fmt.debug_struct("ProcHandle")
.field("pdata", unsafe { &(*pdata) })
.finish()
}
}

impl<R> Drop for ProcHandle<R> {
fn drop(&mut self) {
let ptr = self.raw_proc.as_ptr();
Expand Down Expand Up @@ -173,82 +251,3 @@ impl<R> Drop for ProcHandle<R> {
drop(output);
}
}

impl<R> Future for ProcHandle<R> {
type Output = Option<R>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;

unsafe {
let mut state = (*pdata).state.load(Ordering::Acquire);

loop {
// If the proc has been closed, notify the awaiter and return `None`.
if state & CLOSED != 0 {
// Even though the awaiter is most likely the current proc, it could also be
// another proc.
(*pdata).notify_unless(cx.waker());
return Poll::Ready(None);
}

// If the proc is not completed, register the current proc.
if state & COMPLETED == 0 {
// Replace the waker with one associated with the current proc. We need a
// safeguard against panics because dropping the previous waker can panic.
(*pdata).swap_awaiter(Some(cx.waker().clone()));

// Reload the state after registering. It is possible that the proc became
// completed or closed just before registration so we need to check for that.
state = (*pdata).state.load(Ordering::Acquire);

// If the proc has been closed, notify the awaiter and return `None`.
if state & CLOSED != 0 {
// Even though the awaiter is most likely the current proc, it could also
// be another proc.
(*pdata).notify_unless(cx.waker());
return Poll::Ready(None);
}

// If the proc is still not completed, we're blocked on it.
if state & COMPLETED == 0 {
return Poll::Pending;
}
}

// Since the proc is now completed, mark it as closed in order to grab its output.
match (*pdata).state.compare_exchange(
state,
state | CLOSED,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Notify the awaiter. Even though the awaiter is most likely the current
// proc, it could also be another proc.
if state & AWAITER != 0 {
(*pdata).notify_unless(cx.waker());
}

// Take the output from the proc.
let output = ((*pdata).vtable.get_output)(ptr) as *mut R;
return Poll::Ready(Some(output.read()));
}
Err(s) => state = s,
}
}
}
}
}

impl<R> fmt::Debug for ProcHandle<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;

f.debug_struct("ProcHandle")
.field("pdata", unsafe { &(*pdata) })
.finish()
}
}
10 changes: 5 additions & 5 deletions lightproc/src/proc_layout.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use std::alloc::Layout;

#[derive(Clone, Copy)]
pub(crate) struct TaskLayout {
pub(crate) struct ProcLayout {
/// Memory layout of the whole proc.
pub(crate) layout: Layout,

/// Offset into the proc at which the stack is stored.
pub(crate) offset_t: usize,
pub(crate) offset_stack: usize,

/// Offset into the proc at which the schedule function is stored.
pub(crate) offset_s: usize,
pub(crate) offset_schedule: usize,

/// Offset into the proc at which the future is stored.
pub(crate) offset_f: usize,
pub(crate) offset_future: usize,

/// Offset into the proc at which the output is stored.
pub(crate) offset_r: usize,
pub(crate) offset_output: usize,
}
Loading

0 comments on commit ca5df91

Please sign in to comment.