Skip to content

Commit

Permalink
stateful ProcStack
Browse files Browse the repository at this point in the history
  • Loading branch information
onsails committed Jan 5, 2020
1 parent 99480f0 commit f4267b8
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 45 deletions.
7 changes: 5 additions & 2 deletions bastion-executor/examples/run.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use bastion_executor::prelude::*;
use lightproc::proc_stack::ProcStack;
use lightproc::proc_stack::{EmptyProcState, ProcStack};

fn main() {
run(
async {
println!("Example execution");
panic!("fault");
},
ProcStack::default().with_after_panic(|| println!("after panic")),
ProcStack::default().with_after_panic(|s: EmptyProcState| {
println!("after panic");
s
}),
);
}
7 changes: 5 additions & 2 deletions bastion-executor/examples/spawn_async.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use bastion_executor::prelude::*;
use lightproc::proc_stack::ProcStack;
use lightproc::proc_stack::{EmptyProcState, ProcStack};

fn main() {
let pid = 1;
let stack = ProcStack::default()
.with_pid(pid)
.with_after_panic(move || println!("after panic {}", pid.clone()));
.with_after_panic(move |s: EmptyProcState| {
println!("after panic {}", pid.clone());
s
});

let handle = spawn(
async {
Expand Down
3 changes: 2 additions & 1 deletion bastion/src/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl Child {
let sender = self.bcast.sender().clone();

// FIXME: with_pid
ProcStack::default().with_after_panic(move || {
ProcStack::default().with_after_panic(move |state: EmptyProcState| {
// FIXME: clones
let id = id.clone();
warn!("Child({}): Panicked.", id);
Expand All @@ -85,6 +85,7 @@ impl Child {
let env = Envelope::new(msg, path.clone(), sender.clone());
// TODO: handle errors
parent.send(env).ok();
state
})
}

Expand Down
9 changes: 6 additions & 3 deletions lightproc/examples/proc_panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@ where
schedule,
ProcStack::default()
.with_pid(1)
.with_before_start(|| {
.with_before_start(|s: EmptyProcState| {
println!("Before start");
s
})
.with_after_complete(|| {
.with_after_complete(|s: EmptyProcState| {
println!("After complete");
s
})
.with_after_panic(|| {
.with_after_panic(|s: EmptyProcState| {
println!("After panic");
s
}),
);

Expand Down
6 changes: 4 additions & 2 deletions lightproc/examples/proc_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ where
schedule,
ProcStack::default()
.with_pid(1)
.with_before_start(|| {
.with_before_start(|s: EmptyProcState| {
println!("Before start");
s
})
.with_after_complete(|| {
.with_after_complete(|s: EmptyProcState| {
println!("After complete");
s
}),
);

Expand Down
9 changes: 6 additions & 3 deletions lightproc/src/lightproc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
//! // ... process stack with a lifecycle callback
//! let proc_stack =
//! ProcStack::default()
//! .with_after_panic(|| {
//! .with_after_panic(|s: EmptyProcState| {
//! println!("After panic started!");
//! s
//! });
//!
//! // ... creating a recoverable process
Expand Down Expand Up @@ -76,8 +77,9 @@ impl LightProc {
/// # // ... process stack with a lifecycle callback
/// # let proc_stack =
/// # ProcStack::default()
/// # .with_after_panic(|| {
/// # .with_after_panic(|s: EmptyProcState| {
/// # println!("After panic started!");
/// # s
/// # });
/// #
/// // ... creating a recoverable process
Expand Down Expand Up @@ -120,8 +122,9 @@ impl LightProc {
/// # // ... process stack with a lifecycle callback
/// # let proc_stack =
/// # ProcStack::default()
/// # .with_after_panic(|| {
/// # .with_after_panic(|s: EmptyProcState| {
/// # println!("After panic started!");
/// # s
/// # });
/// #
/// // ... creating a standard process
Expand Down
6 changes: 3 additions & 3 deletions lightproc/src/proc_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ impl<R> ProcHandle<R> {
}

/// Returns a reference to the stack stored inside the proc.
pub fn stack(&self) -> &ProcStack {
pub fn stack(&self) -> &mut ProcStack {
let offset = ProcData::offset_stack();
let ptr = self.raw_proc.as_ptr();

unsafe {
let raw = (ptr as *mut u8).add(offset) as *const ProcStack;
&*raw
let raw = (ptr as *mut u8).add(offset) as *mut ProcStack;
&mut *raw
}
}
}
Expand Down
115 changes: 93 additions & 22 deletions lightproc/src/proc_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
//!
//! If we want to make an analogy, stack abstraction is similar to actor lifecycle abstractions
//! in frameworks like Akka, but tailored version for Rust environment.
use std::alloc;
use std::fmt::{self, Debug, Formatter};
use std::mem;
use std::ptr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

Expand All @@ -14,12 +17,12 @@ use std::sync::Arc;
/// # Example
///
/// ```rust
/// use lightproc::proc_stack::ProcStack;
/// use lightproc::proc_stack::{ProcStack, EmptyProcState};
///
/// ProcStack::default()
/// .with_before_start(|| { println!("Before start"); })
/// .with_after_complete(|| { println!("After complete"); })
/// .with_after_panic(|| { println!("After panic"); });
/// .with_before_start(|s: EmptyProcState| { println!("Before start"); s })
/// .with_after_complete(|s: EmptyProcState| { println!("After complete"); s })
/// .with_after_panic(|s: EmptyProcState| { println!("After panic"); s });
/// ```
#[derive(Default)]
pub struct ProcStack {
Expand All @@ -28,24 +31,26 @@ pub struct ProcStack {
/// Can be used to identify specific processes during any executor, reactor implementations.
pub pid: AtomicUsize,

pub(crate) state: RawState,

/// Before start callback
///
/// This callback is called before we start to inner future of the process
pub(crate) before_start: Option<Arc<dyn Fn() + Send + Sync>>,
pub(crate) before_start: Option<Arc<dyn Fn(&RawState) -> RawState + Send + Sync>>,

/// After complete callback
///
/// This callback is called after future resolved to it's output.
/// Mind that, even panic occurs this callback will get executed.
///
/// Eventually all panics are coming from an Error output.
pub(crate) after_complete: Option<Arc<dyn Fn() + Send + Sync>>,
pub(crate) after_complete: Option<Arc<dyn Fn(&RawState) -> RawState + Send + Sync>>,

/// After panic callback
///
/// This callback is only called when a panic has been occurred.
/// Mind that [ProcHandle](proc_handle/struct.ProcHandle.html) is not using this
pub(crate) after_panic: Option<Arc<dyn Fn() + Send + Sync>>,
pub(crate) after_panic: Option<Arc<dyn Fn(&RawState) -> RawState + Send + Sync>>,
}

impl ProcStack {
Expand All @@ -62,51 +67,56 @@ impl ProcStack {
self
}

pub fn with_state<S>(mut self, state: Box<S>) -> Self {
self.state = RawState::from(state);
self
}

/// Adds a callback that will be executed before polling inner future to the stack
///
/// ```rust
/// use lightproc::proc_stack::ProcStack;
/// use lightproc::proc_stack::{ProcStack, EmptyProcState};
///
/// ProcStack::default()
/// .with_before_start(|| { println!("Before start") });
/// .with_before_start(|s: EmptyProcState| { println!("Before start"); s });
/// ```
pub fn with_before_start<T>(mut self, callback: T) -> Self
pub fn with_before_start<C, S>(mut self, callback: C) -> Self
where
T: Fn() + Send + Sync + 'static,
C: Fn(Box<S>) -> Box<S> + Send + Sync + 'static,
{
self.before_start = Some(Arc::new(callback));
self.before_start = Some(Self::wrap_callback(callback));
self
}

/// Adds a callback that will be executed after inner future resolves to an output to the stack
///
/// ```rust
/// use lightproc::proc_stack::ProcStack;
/// use lightproc::proc_stack::{ProcStack, EmptyProcState};
///
/// ProcStack::default()
/// .with_after_complete(|| { println!("After complete") });
/// .with_after_complete(|s: EmptyProcState| { println!("After complete"); s });
/// ```
pub fn with_after_complete<T>(mut self, callback: T) -> Self
pub fn with_after_complete<C, S>(mut self, callback: C) -> Self
where
T: Fn() + Send + Sync + 'static,
C: Fn(Box<S>) -> Box<S> + Send + Sync + 'static,
{
self.after_complete = Some(Arc::new(callback));
self.after_complete = Some(Self::wrap_callback(callback));
self
}

/// Adds a callback that will be executed after inner future panics to the stack
///
/// ```rust
/// use lightproc::proc_stack::ProcStack;
/// use lightproc::proc_stack::{ProcStack, EmptyProcState};
///
/// ProcStack::default()
/// .with_after_panic(|| { println!("After panic") });
/// .with_after_panic(|s: EmptyProcState| { println!("After panic"); s });
/// ```
pub fn with_after_panic<T>(mut self, callback: T) -> Self
pub fn with_after_panic<C, S>(mut self, callback: C) -> Self
where
T: Fn() + Send + Sync + 'static,
C: Fn(Box<S>) -> Box<S> + Send + Sync + 'static,
{
self.after_panic = Some(Arc::new(callback));
self.after_panic = Some(Self::wrap_callback(callback));
self
}

Expand All @@ -122,6 +132,66 @@ impl ProcStack {
pub fn get_pid(&self) -> usize {
self.pid.load(Ordering::Acquire)
}

fn wrap_callback<C, S>(callback: C) -> Arc<dyn Fn(&RawState) -> RawState + Send + Sync>
where
C: Fn(Box<S>) -> Box<S> + Send + Sync + 'static,
{
Arc::new(move |raw_state: &RawState| {
let state = raw_state.downcast::<S>();
let state = callback(state);
RawState::from(state)
})
}
}

pub type EmptyProcState = Box<Empty>;
pub struct Empty;

pub(crate) struct RawState {
ptr: *mut u8,
size: usize,
align: usize,
}

impl RawState {
fn downcast<T>(&self) -> Box<T> {
unsafe { Box::from_raw(self.ptr as *mut T) }
}
}

impl Default for RawState {
fn default() -> Self {
RawState::from(Box::new(Empty))
}
}

impl<T> From<Box<T>> for RawState {
fn from(val: Box<T>) -> Self {
let size = mem::size_of_val(val.as_ref());
let align = mem::align_of_val(&val);

Self {
ptr: Box::into_raw(val) as *mut u8,
size,
align,
}
}
}

impl Clone for RawState {
fn clone(&self) -> Self {
let layout = alloc::Layout::from_size_align(self.size, self.align).unwrap();
let dst = unsafe { alloc::alloc(layout) };
unsafe {
ptr::copy(self.ptr, dst, self.size);
}
RawState {
ptr: dst,
size: self.size,
align: self.align,
}
}
}

impl Debug for ProcStack {
Expand All @@ -136,6 +206,7 @@ impl Clone for ProcStack {
fn clone(&self) -> Self {
ProcStack {
pid: AtomicUsize::new(self.pid.load(Ordering::Acquire)),
state: self.state.clone(),
before_start: self.before_start.clone(),
after_complete: self.after_complete.clone(),
after_panic: self.after_panic.clone(),
Expand Down
5 changes: 3 additions & 2 deletions lightproc/src/raw_proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ where
let guard = Guard(raw);

if let Some(before_start_cb) = &(*raw.stack).before_start {
(*before_start_cb.clone())();
(*raw.stack).state = (*before_start_cb.clone())(&(*raw.stack).state);
}

let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
Expand Down Expand Up @@ -441,7 +441,8 @@ where
}

if let Some(after_complete_cb) = &(*raw.stack).after_complete {
(*after_complete_cb.clone())();
(*raw.stack).state =
(*after_complete_cb.clone())(&(*raw.stack).state);
}

// Drop the proc reference.
Expand Down
2 changes: 1 addition & 1 deletion lightproc/src/recoverable_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl<R> Future for RecoverableHandle<R> {
Poll::Ready(Some(Ok(val))) => Poll::Ready(Some(val)),
Poll::Ready(Some(Err(_))) => {
if let Some(after_panic_cb) = self.0.stack().after_panic.clone() {
(*after_panic_cb.clone())();
self.0.stack().state = (*after_panic_cb.clone())(&self.0.stack().state);
}

Poll::Ready(None)
Expand Down
11 changes: 7 additions & 4 deletions lightproc/tests/stack.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use lightproc::proc_stack::ProcStack;
use lightproc::proc_stack::{EmptyProcState, ProcStack};

#[test]
fn stack_copy() {
let stack = ProcStack::default().with_pid(12).with_after_panic(|| {
println!("After panic!");
});
let stack = ProcStack::default()
.with_pid(12)
.with_after_panic(|s: EmptyProcState| {
println!("After panic!");
s
});

let stack2 = stack.clone();

Expand Down

0 comments on commit f4267b8

Please sign in to comment.