Skip to content

Commit

Permalink
Implemented fault recovery for the system and basic message passing f…
Browse files Browse the repository at this point in the history
…or it and the supervisors
  • Loading branch information
r3v2d0g committed Oct 9, 2019
1 parent 291bde6 commit a552a03
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 47 deletions.
2 changes: 1 addition & 1 deletion src/bastion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ impl Bastion {
pub fn supervisor() -> Supervisor {
Supervisor::new()
}
}
}
28 changes: 26 additions & 2 deletions src/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use uuid::Uuid;

type Sender = UnboundedSender<BastionMessage>;
type Receiver = UnboundedReceiver<BastionMessage>;
pub(super) type Sender = UnboundedSender<BastionMessage>;
pub(super) type Receiver = UnboundedReceiver<BastionMessage>;

pub(super) struct Broadcast {
id: Uuid,
Expand Down Expand Up @@ -62,6 +62,30 @@ impl Broadcast {
&self.id
}

pub(super) fn sender(&self) -> &Sender {
&self.sender
}

pub(super) fn poison_pill_child(&mut self, id: &Uuid) {
self.send_child(id, BastionMessage::PoisonPill);
self.remove_child(id);
}

pub(super) fn poison_pill_children(&mut self) {
self.send_children(BastionMessage::PoisonPill);
self.clear_children();
}

pub(super) fn dead(&mut self) {
self.poison_pill_children();
self.send_parent(BastionMessage::dead(self.id.clone()));
}

pub(super) fn faulted(&mut self) {
self.poison_pill_children();
self.send_parent(BastionMessage::faulted(self.id.clone()));
}

pub(super) fn new_child(&mut self, id: Uuid) -> Self {
let child = Broadcast::with_parent(id, self.sender.clone());
self.children.insert(child.id.clone(), child.sender.clone());
Expand Down
43 changes: 25 additions & 18 deletions src/children.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ pub trait Message: Shell + Debug {
fn as_any(&self) -> &dyn Any;
}
impl<T> Message for T
where
T: Shell + Debug,
where
T: Shell + Debug,
{
fn as_any(&self) -> &dyn Any {
self
Expand Down Expand Up @@ -74,13 +74,10 @@ impl Children {

match msg {
BastionMessage::PoisonPill | BastionMessage::Dead { .. } | BastionMessage::Faulted { .. } => {
self.bcast.send_children(BastionMessage::poison_pill());
self.bcast.clear_children();

if msg.is_faulted() {
self.bcast.send_parent(BastionMessage::faulted(id));
self.bcast.faulted();
} else {
self.bcast.send_parent(BastionMessage::dead(id));
self.bcast.dead();
}

return self;
Expand All @@ -89,8 +86,11 @@ impl Children {
BastionMessage::Message(_) => unimplemented!(),
}
}
// FIXME: "return self" or "send_parent(Faulted)"?
Poll::Ready(None) => unimplemented!(),
Poll::Ready(None) => {
self.bcast.faulted();

return self;
}
Poll::Pending => pending!(),
}
}
Expand All @@ -99,11 +99,14 @@ impl Children {
pub(super) fn launch(mut self) -> JoinHandle<Self> {
for _ in 0..self.redundancy {
let id = Uuid::new_v4();
let bcast = self.bcast.new_child(id);
let bcast = self.bcast.new_child(id.clone());

let thunk = objekt::clone_box(&*self.thunk);
let ctx = BastionContext { };
let msg = objekt::clone_box(&*self.msg);

let parent = self.bcast.sender().clone();
let ctx = BastionContext::new(id, parent);

let exec = thunk(ctx, msg)
.catch_unwind();

Expand All @@ -122,8 +125,8 @@ impl Child {
let id = self.bcast.id().clone();

match res {
Ok(Ok(())) => self.bcast.send_parent(BastionMessage::dead(id)),
Ok(Err(())) | Err(_) => self.bcast.send_parent(BastionMessage::faulted(id)),
Ok(Ok(())) => self.bcast.dead(),
Ok(Err(())) | Err(_) => self.bcast.faulted(),
}

return;
Expand All @@ -135,20 +138,24 @@ impl Child {

match msg {
BastionMessage::PoisonPill => {
self.bcast.send_parent(BastionMessage::dead(id));
self.bcast.dead();

return;
}
BastionMessage::Dead { .. } | BastionMessage::Faulted { .. } => {
// FIXME: shouldn't happen; send_parent(Faulted)?
unimplemented!()
self.bcast.faulted();

return;
}
// FIXME
BastionMessage::Message(_) => unimplemented!(),
}
}
// FIXME: "return" or "send_parent(Faulted)"?
Poll::Ready(None) => unimplemented!(),
Poll::Ready(None) => {
self.bcast.faulted();

return;
}
Poll::Pending => pending!(),
}
}
Expand Down
18 changes: 17 additions & 1 deletion src/context.rs
Original file line number Diff line number Diff line change
@@ -1 +1,17 @@
pub struct BastionContext {}
use crate::broadcast::{BastionMessage, Broadcast, Sender};
use uuid::Uuid;

pub struct BastionContext {
id: Uuid,
parent: Sender,
}

impl BastionContext {
pub(super) fn new(id: Uuid, parent: Sender) -> Self {
BastionContext { id, parent }
}

pub fn id(&self) -> &Uuid {
&self.id
}
}
33 changes: 14 additions & 19 deletions src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,10 @@ impl Supervisor {

async fn kill_children(&mut self, range: RangeFrom<usize>) {
if range.start == 0 {
self.bcast.send_children(BastionMessage::poison_pill());
self.bcast.clear_children();
self.bcast.poison_pill_children();
} else {
for (_, id) in self.order.range(range.clone()) {
self.bcast.send_child(id, BastionMessage::poison_pill());
self.bcast.remove_child(id);
self.bcast.poison_pill_child(id);
}
}

Expand Down Expand Up @@ -154,12 +152,7 @@ impl Supervisor {
Poll::Ready(Some(msg)) => {
match msg {
BastionMessage::PoisonPill => {
let id = self.bcast.id().clone();

self.bcast.send_children(BastionMessage::poison_pill());
self.bcast.clear_children();

self.bcast.send_parent(BastionMessage::dead(id));
self.bcast.dead();

return self;
}
Expand All @@ -172,21 +165,23 @@ impl Supervisor {
}
BastionMessage::Faulted { id } => {
if self.recover(id).await.is_err() {
self.bcast.send_children(BastionMessage::poison_pill());
self.bcast.clear_children();

let id = self.bcast.id().clone();
self.bcast.send_parent(BastionMessage::faulted(id));
self.bcast.faulted();

return self;
}
}
// FIXME
BastionMessage::Message(_) => unimplemented!(),
BastionMessage::Message(_) => {
// TODO: send to parent too?

self.bcast.send_children(msg);
}
}
}
// FIXME: "return self" or "send_parent(Faulted)"?
Poll::Ready(None) => unimplemented!(),
Poll::Ready(None) => {
self.bcast.faulted();

return self;
}
Poll::Pending => pending!(),
}
}
Expand Down
23 changes: 17 additions & 6 deletions src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,23 @@ impl System {
match msg {
// FIXME
BastionMessage::PoisonPill => unimplemented!(),
// FIXME
BastionMessage::Dead { .. } => unimplemented!(),
// FIXME
BastionMessage::Faulted { .. } => unimplemented!(),
// FIXME
BastionMessage::Message(_) => unimplemented!(),
BastionMessage::Dead { id } => {
self.supervisors.remove(&id);
self.bcast.remove_child(&id);

self.dead.insert(id);
}
BastionMessage::Faulted { id } => {
// TODO: add a "faulted" list and poll from it instead of awaiting

if let Some(supervisor) = self.supervisors.remove(&id) {
// FIXME: set a limit?
self.launch_supervisor(supervisor.await);
}
}
BastionMessage::Message(_) => {
self.bcast.send_children(msg);
}
}
}
// FIXME
Expand Down

0 comments on commit a552a03

Please sign in to comment.