Skip to content

Commit

Permalink
Fixed formatting and updated Children to forward messages to its own …
Browse files Browse the repository at this point in the history
…children
  • Loading branch information
r3v2d0g committed Oct 17, 2019
1 parent b17bef2 commit be4b1bc
Show file tree
Hide file tree
Showing 7 changed files with 313 additions and 300 deletions.
12 changes: 6 additions & 6 deletions bastion/src/bastion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ use futures::channel::mpsc::UnboundedSender;
use lazy_static::lazy_static;

lazy_static! {
pub(super) static ref SYSTEM: UnboundedSender<Supervisor> = System::start();
pub(super) static ref REGISTRY: Registry = Registry::new();
pub(super) static ref SYSTEM: UnboundedSender<Supervisor> = System::start();
pub(super) static ref REGISTRY: Registry = Registry::new();
}

pub struct Bastion {
// TODO: ...
// TODO: ...
}

impl Bastion {
pub fn supervisor() -> Supervisor {
Supervisor::new()
}
pub fn supervisor() -> Supervisor {
Supervisor::new()
}
}
14 changes: 5 additions & 9 deletions bastion/src/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,14 @@ pub(super) struct Broadcast {
#[derive(Debug)]
pub(super) enum BastionMessage {
PoisonPill,
Dead {
id: BastionId,
},
Faulted {
id: BastionId,
},
Dead { id: BastionId },
Faulted { id: BastionId },
Message(Box<dyn Message>),
}

impl Broadcast {
pub(super) fn new() -> Self {
let id = BastionId::new();
let id = BastionId::new();
let parent = None;
let (sender, recver) = mpsc::unbounded();
let children = FxHashMap::default();
Expand Down Expand Up @@ -84,7 +80,7 @@ impl Broadcast {
}

pub(super) fn faulted(&mut self) {
self.poison_pill_children();
self.poison_pill_children();
self.send_parent(BastionMessage::faulted(self.id.clone()));
}

Expand Down Expand Up @@ -204,4 +200,4 @@ impl Clone for BastionMessage {
BastionMessage::Message(msg) => BastionMessage::msg(objekt::clone_box(&**msg)),
}
}
}
}
40 changes: 19 additions & 21 deletions bastion/src/children.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,22 @@ impl Children {

loop {
match poll!(&mut self.bcast.next()) {
Poll::Ready(Some(msg)) => {
match msg {
BastionMessage::PoisonPill | BastionMessage::Dead { .. } | BastionMessage::Faulted { .. } => {
REGISTRY.remove_children(&self);

if msg.is_faulted() {
self.bcast.faulted();
} else {
self.bcast.dead();
}

return self;
Poll::Ready(Some(msg)) => match msg {
BastionMessage::PoisonPill
| BastionMessage::Dead { .. }
| BastionMessage::Faulted { .. } => {
REGISTRY.remove_children(&self);

if msg.is_faulted() {
self.bcast.faulted();
} else {
self.bcast.dead();
}
// FIXME
BastionMessage::Message(_) => unimplemented!(),

return self;
}
}
BastionMessage::Message(_) => self.bcast.send_children(msg),
},
Poll::Ready(None) => {
REGISTRY.remove_children(&self);

Expand All @@ -122,7 +121,7 @@ impl Children {
}
}

pub(super) fn launch(mut self) -> JoinHandle<Self> {
pub(super) fn launch(mut self) -> JoinHandle<Self> {
for _ in 0..self.redundancy {
let bcast = self.bcast.new_child();
let id = bcast.id().clone();
Expand All @@ -136,8 +135,7 @@ impl Children {
let parent = self.bcast.sender().clone();
let ctx = BastionContext::new(id, parent, state.clone());

let exec = AssertUnwindSafe(thunk(ctx, msg).0)
.catch_unwind();
let exec = AssertUnwindSafe(thunk(ctx, msg).0).catch_unwind();

let child = Child { exec, bcast, state };
runtime::spawn(child.run());
Expand Down Expand Up @@ -180,22 +178,22 @@ impl Child {
match msg {
BastionMessage::PoisonPill => return self.dead(),
// FIXME
BastionMessage::Dead { .. } => unimplemented!(),
BastionMessage::Dead { .. } => unimplemented!(),
// FIXME
BastionMessage::Faulted { .. } => unimplemented!(),
BastionMessage::Message(msg) => {
state.push_msg(msg);

continue;
},
}
}
}
Poll::Ready(None) => return self.faulted(),
Poll::Pending => (),
}

if let Poll::Ready(res) = poll!(&mut self.exec) {
match res {
match res {
Ok(Ok(())) => return self.dead(),
Ok(Err(())) | Err(_) => return self.faulted(),
}
Expand Down
88 changes: 45 additions & 43 deletions bastion/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,71 +10,73 @@ use uuid::Uuid;
pub struct BastionId(Uuid);

pub struct BastionContext {
id: BastionId,
parent: Sender,
state: Qutex<ContextState>,
id: BastionId,
parent: Sender,
state: Qutex<ContextState>,
}

pub(super) struct ContextState {
msgs: VecDeque<Box<dyn Message>>,
msgs: VecDeque<Box<dyn Message>>,
}

impl BastionId {
pub(super) fn new() -> Self {
let uuid = Uuid::new_v4();
pub(super) fn new() -> Self {
let uuid = Uuid::new_v4();

BastionId(uuid)
}
BastionId(uuid)
}
}

impl BastionContext {
pub(super) fn new(id: BastionId, parent: Sender, state: Qutex<ContextState>) -> Self {
BastionContext { id, parent, state }
}
pub(super) fn new(id: BastionId, parent: Sender, state: Qutex<ContextState>) -> Self {
BastionContext { id, parent, state }
}

pub fn id(&self) -> &BastionId {
&self.id
}
pub fn id(&self) -> &BastionId {
&self.id
}

pub fn send_msg(&self, id: &BastionId, msg: Box<dyn Message>) -> Result<(), Box<dyn Message>> {
let msg = BastionMessage::msg(msg);
pub fn send_msg(&self, id: &BastionId, msg: Box<dyn Message>) -> Result<(), Box<dyn Message>> {
let msg = BastionMessage::msg(msg);

// TODO: Err(Error)
REGISTRY.send_child(id, msg).map_err(|msg| msg.into_msg().unwrap())
}
// TODO: Err(Error)
REGISTRY
.send_child(id, msg)
.map_err(|msg| msg.into_msg().unwrap())
}

// TODO: Err(Error)
pub async fn recv(&self) -> Result<Box<dyn Message>, ()> {
loop {
// TODO: Err(Error)
let mut state = self.state.clone().lock_async().await.unwrap();
// TODO: Err(Error)
pub async fn recv(&self) -> Result<Box<dyn Message>, ()> {
loop {
// TODO: Err(Error)
let mut state = self.state.clone().lock_async().await.unwrap();

if let Some(msg) = state.msgs.pop_front() {
return Ok(msg);
}
if let Some(msg) = state.msgs.pop_front() {
return Ok(msg);
}

Guard::unlock(state);
Guard::unlock(state);

pending!();
}
}
pending!();
}
}

pub async fn try_recv(&self) -> Option<Box<dyn Message>> {
// TODO: Err(Error)
let mut state = self.state.clone().lock_async().await.ok()?;
pub async fn try_recv(&self) -> Option<Box<dyn Message>> {
// TODO: Err(Error)
let mut state = self.state.clone().lock_async().await.ok()?;

state.msgs.pop_front()
}
state.msgs.pop_front()
}
}

impl ContextState {
pub(super) fn new() -> Self {
let msgs = VecDeque::new();
pub(super) fn new() -> Self {
let msgs = VecDeque::new();

ContextState { msgs }
}
ContextState { msgs }
}

pub(super) fn push_msg(&mut self, msg: Box<dyn Message>) {
self.msgs.push_back(msg)
}
pub(super) fn push_msg(&mut self, msg: Box<dyn Message>) {
self.msgs.push_back(msg)
}
}
Loading

0 comments on commit be4b1bc

Please sign in to comment.