Skip to content

Commit

Permalink
After panic callback is added
Browse files Browse the repository at this point in the history
  • Loading branch information
vertexclique committed Oct 24, 2019
1 parent 97f9e9a commit a3aba3b
Show file tree
Hide file tree
Showing 14 changed files with 197 additions and 12 deletions.
2 changes: 2 additions & 0 deletions lightproc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ edition = "2018"

[dependencies]
crossbeam-utils = "0.6.6"
pin-utils = "0.1.0-alpha.4"

[dev-dependencies]
crossbeam = "0.7.1"
futures-preview = "0.3.0-alpha.17"
lazy_static = "1.3.0"
65 changes: 65 additions & 0 deletions lightproc/examples/proc_panic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use std::future::Future;
use std::sync::Arc;
use std::thread;

use crossbeam::channel::{unbounded, Sender};
use futures::{executor, FutureExt};
use lightproc::prelude::*;
use std::sync::atomic::AtomicUsize;
use lazy_static::lazy_static;
use std::panic::AssertUnwindSafe;
use lightproc::proc_ext::ProcFutureExt;
use lightproc::recoverable_handle::RecoverableHandle;


fn spawn_on_thread<F, R>(future: F) -> RecoverableHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
lazy_static! {
// A channel that holds scheduled tasks.
static ref QUEUE: Sender<LightProc> = {
let (sender, receiver) = unbounded::<LightProc>();

// Start the executor thread.
thread::spawn(move || {
for proc in receiver {
proc.run();
}
});

sender
};
}

let schedule = |t| QUEUE.send(t).unwrap();
let (proc, handle) = LightProc::recoverable(
future,
schedule,
ProcStack {
pid: AtomicUsize::new(1),
after_complete: Some(Arc::new(|| {
println!("After complete");
})),
before_start: Some(Arc::new(|| {
println!("Before start");
})),
after_panic: Some(Arc::new(|| {
println!("After panic");
}))
},
);

proc.schedule();

handle
}

fn main() {
let handle = spawn_on_thread(async {
panic!("Panic here!");
});

executor::block_on(handle);
}
1 change: 1 addition & 0 deletions lightproc/examples/proc_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ where
before_start: Some(Arc::new(|| {
println!("Before start");
})),
after_panic: None
},
);

Expand Down
30 changes: 30 additions & 0 deletions lightproc/src/catch_unwind.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use std::future::Future;
use std::panic::{UnwindSafe, catch_unwind, AssertUnwindSafe};
use pin_utils::unsafe_pinned;
use std::any::Any;
use std::task::{Context, Poll};
use std::pin::Pin;

#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CatchUnwind<Fut> where Fut: Future {
future: Fut,
}

impl<Fut> CatchUnwind<Fut> where Fut: Future + UnwindSafe {
unsafe_pinned!(future: Fut);

pub(super) fn new(future: Fut) -> CatchUnwind<Fut> {
CatchUnwind { future }
}
}

impl<Fut> Future for CatchUnwind<Fut>
where Fut: Future + UnwindSafe,
{
type Output = Result<Fut::Output, Box<dyn Any + Send>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
catch_unwind(AssertUnwindSafe(|| self.future().poll(cx)))?.map(Ok)
}
}
4 changes: 4 additions & 0 deletions lightproc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ pub mod proc_stack;
pub mod proc_vtable;
pub mod raw_proc;
pub mod state;
pub mod panic_helpers;
pub mod catch_unwind;
pub mod proc_ext;
pub mod recoverable_handle;

pub mod prelude {
pub use crate::lightproc::*;
Expand Down
22 changes: 19 additions & 3 deletions lightproc/src/lightproc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt;
use std::{fmt, thread};
use std::future::Future;
use std::marker::PhantomData;
use std::mem;
Expand All @@ -7,7 +7,11 @@ use std::ptr::NonNull;
use crate::proc_data::ProcData;
use crate::proc_handle::ProcHandle;
use crate::proc_stack::*;
use crate::raw_proc::RawProc;
use crate::raw_proc::{RawProc, ProcFuture};
use std::panic::AssertUnwindSafe;
use crate::proc_ext::ProcFutureExt;
use crate::catch_unwind::CatchUnwind;
use crate::recoverable_handle::RecoverableHandle;

pub struct LightProc {
/// A pointer to the heap-allocated task.
Expand All @@ -18,13 +22,25 @@ unsafe impl Send for LightProc {}
unsafe impl Sync for LightProc {}

impl LightProc {
pub fn recoverable<F, R, S>(future: F, schedule: S, stack: ProcStack) -> (LightProc, RecoverableHandle<R>)
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
S: Fn(LightProc) + Send + Sync + 'static,
{
let recovery_future = AssertUnwindSafe(future).catch_unwind();
let (proc, handle) = Self::build(recovery_future, schedule, stack);
(proc, RecoverableHandle(handle))
}


pub fn build<F, R, S>(future: F, schedule: S, stack: ProcStack) -> (LightProc, ProcHandle<R>)
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
S: Fn(LightProc) + Send + Sync + 'static,
{
let raw_task = RawProc::<F, R, S>::allocate(stack, future, schedule);
let raw_task = RawProc::allocate(stack, future, schedule);
let task = LightProc { raw_proc: raw_task };
let handle = ProcHandle {
raw_proc: raw_task,
Expand Down
17 changes: 17 additions & 0 deletions lightproc/src/panic_helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use std::mem;

#[inline]
pub(crate) fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T {
struct Bomb;

impl Drop for Bomb {
fn drop(&mut self) {
std::process::abort();
}
}

let bomb = Bomb;
let t = f();
mem::forget(bomb);
t
}
4 changes: 2 additions & 2 deletions lightproc/src/proc_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crossbeam_utils::Backoff;

use crate::layout_helpers::extend;
use crate::proc_stack::*;
use crate::proc_vtable::TaskVTable;
use crate::proc_vtable::ProcVTable;
use crate::state::*;

/// The pdata of a task.
Expand All @@ -29,7 +29,7 @@ pub(crate) struct ProcData {
///
/// In addition to the actual waker virtual table, it also contains pointers to several other
/// methods necessary for bookkeeping the heap-allocated task.
pub(crate) vtable: &'static TaskVTable,
pub(crate) vtable: &'static ProcVTable,
}

impl ProcData {
Expand Down
13 changes: 13 additions & 0 deletions lightproc/src/proc_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use std::future::Future;
use std::panic::UnwindSafe;
use crate::catch_unwind::CatchUnwind;

pub trait ProcFutureExt: Future {
fn catch_unwind(self) -> CatchUnwind<Self>
where Self: Sized + UnwindSafe
{
CatchUnwind::new(self)
}
}

impl<T: ?Sized> ProcFutureExt for T where T: Future {}
4 changes: 3 additions & 1 deletion lightproc/src/proc_handle.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt;
use std::{fmt, mem};
use std::future::Future;
use std::marker::{PhantomData, Unpin};
use std::pin::Pin;
Expand All @@ -9,6 +9,7 @@ use std::task::{Context, Poll};
use crate::proc_data::ProcData;
use crate::proc_stack::*;
use crate::state::*;
use std::any::Any;

/// A handle that awaits the result of a task.
///
Expand Down Expand Up @@ -242,6 +243,7 @@ impl<R> Future for ProcHandle<R> {
}
}


impl<R> fmt::Debug for ProcHandle<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ptr = self.raw_proc.as_ptr();
Expand Down
2 changes: 2 additions & 0 deletions lightproc/src/proc_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ pub struct ProcStack {

// After action callbacks
pub after_complete: Option<Arc<dyn Fn() + Send + Sync>>,

pub after_panic: Option<Arc<dyn Fn() + Send + Sync>>,
}

impl fmt::Debug for ProcStack {
Expand Down
2 changes: 1 addition & 1 deletion lightproc/src/proc_vtable.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::task::RawWakerVTable;

/// The vtable for a task.
pub(crate) struct TaskVTable {
pub(crate) struct ProcVTable {
/// The raw waker vtable.
pub(crate) raw_waker: RawWakerVTable,

Expand Down
16 changes: 11 additions & 5 deletions lightproc/src/raw_proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

use crate::layout_helpers::extend;
use crate::lightproc::LightProc;
use crate::lightproc::{LightProc};
use crate::proc_data::ProcData;
use crate::proc_layout::TaskLayout;
use crate::proc_stack::*;
use crate::proc_vtable::TaskVTable;
use crate::proc_vtable::ProcVTable;
use crate::state::*;
use std::panic::AssertUnwindSafe;
use crate::catch_unwind::CatchUnwind;
use std::any::Any;

pub type ProcFuture<F> = CatchUnwind<AssertUnwindSafe<F>>;

/// Raw pointers to the fields of a task.
pub(crate) struct RawProc<F, R, S> {
Expand All @@ -26,7 +31,8 @@ pub(crate) struct RawProc<F, R, S> {

impl<F, R, S> Copy for RawProc<F, R, S> {}

impl<F, R, S> Clone for RawProc<F, R, S> {
impl<F, R, S> Clone for RawProc<F, R, S>
{
fn clone(&self) -> Self {
Self {
pdata: self.pdata,
Expand Down Expand Up @@ -64,7 +70,7 @@ where
(raw.pdata as *mut ProcData).write(ProcData {
state: AtomicUsize::new(SCHEDULED | HANDLE | REFERENCE),
awaiter: Cell::new(None),
vtable: &TaskVTable {
vtable: &ProcVTable {
raw_waker: RawWakerVTable::new(
Self::clone_waker,
Self::wake,
Expand Down Expand Up @@ -116,7 +122,7 @@ where
let layout_pdata = Layout::new::<ProcData>();
let layout_t = Layout::new::<ProcStack>();
let layout_s = Layout::new::<S>();
let layout_f = Layout::new::<F>();
let layout_f = Layout::new::<ProcFuture<F>>();
let layout_r = Layout::new::<R>();

let size_union = layout_f.size().max(layout_r.size());
Expand Down
27 changes: 27 additions & 0 deletions lightproc/src/recoverable_handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::proc_handle::ProcHandle;
use std::thread;
use std::future::Future;
use std::task::{Context, Poll};
use std::pin::Pin;
use std::panic::resume_unwind;

pub struct RecoverableHandle<R>(pub ProcHandle<thread::Result<R>>);

impl<R> Future for RecoverableHandle<R> {
type Output = Option<R>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(val))) => Poll::Ready(Some(val)),
Poll::Ready(Some(Err(err))) => {
if let Some(after_panic_cb) = self.0.stack().after_panic.clone() {
(*after_panic_cb.clone())();
}

resume_unwind(err)
},
}
}
}

0 comments on commit a3aba3b

Please sign in to comment.