Skip to content

Commit

Permalink
Updated Supervisor to use vectors instead of trees and cleaned up a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
r3v2d0g committed Oct 9, 2019
1 parent a552a03 commit ff14ea3
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 33 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ maintenance = { status = "actively-developed" }
#ctrlc = "3.1.3"
#ego-tree = "0.6.0"
#env_logger = "0.6.1"
#futures = "0.1.28"
futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] }
fxhash = "0.2"
lazy_static = "1.4"
Expand Down
6 changes: 0 additions & 6 deletions src/children.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ impl Children {
loop {
match poll!(&mut self.bcast.next()) {
Poll::Ready(Some(msg)) => {
let id = self.bcast.id().clone();

match msg {
BastionMessage::PoisonPill | BastionMessage::Dead { .. } | BastionMessage::Faulted { .. } => {
if msg.is_faulted() {
Expand Down Expand Up @@ -122,8 +120,6 @@ impl Child {
async fn run(mut self) {
loop {
if let Poll::Ready(res) = poll!(&mut self.exec) {
let id = self.bcast.id().clone();

match res {
Ok(Ok(())) => self.bcast.dead(),
Ok(Err(())) | Err(_) => self.bcast.faulted(),
Expand All @@ -134,8 +130,6 @@ impl Child {

match poll!(&mut self.bcast.next()) {
Poll::Ready(Some(msg)) => {
let id = self.bcast.id().clone();

match msg {
BastionMessage::PoisonPill => {
self.bcast.dead();
Expand Down
2 changes: 1 addition & 1 deletion src/context.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::broadcast::{BastionMessage, Broadcast, Sender};
use crate::broadcast::Sender;
use uuid::Uuid;

pub struct BastionContext {
Expand Down
36 changes: 11 additions & 25 deletions src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use futures::{pending, poll};
use futures::prelude::*;
use fxhash::{FxHashMap, FxHashSet};
use runtime::task::JoinHandle;
use std::collections::BTreeMap;
use std::ops::RangeFrom;
use std::task::Poll;
use uuid::Uuid;
Expand All @@ -14,8 +13,9 @@ use uuid::Uuid;
pub struct Supervisor {
bcast: Broadcast,
children: Vec<Children>,
order: BTreeMap<usize, Uuid>, // FIXME
launched: FxHashMap<Uuid, (usize, JoinHandle<Children>)>, // FIXME
// FIXME: contains dead Children
order: Vec<Uuid>,
launched: FxHashMap<Uuid, (usize, JoinHandle<Children>)>,
dead: FxHashSet<Uuid>,
strategy: SupervisionStrategy,
}
Expand All @@ -32,7 +32,7 @@ impl Supervisor {
let bcast = Broadcast::new(id);

let children = Vec::new();
let order = BTreeMap::new();
let order = Vec::new();
let launched = FxHashMap::default();
let dead = FxHashSet::default();
let strategy = SupervisionStrategy::default();
Expand Down Expand Up @@ -76,44 +76,31 @@ impl Supervisor {
}

pub(super) fn launch_children(&mut self) {
let start = if let Some(order) = self.order.keys().next_back() {
*order + 1
} else {
0
};

for (order, children) in self.children.drain(..).enumerate() {
for children in self.children.drain(..) {
let id = children.id().clone();
let order = start + order;

self.order.insert(order, id.clone());
self.launched.insert(id, (order, children.launch()));
self.launched.insert(id.clone(), (self.order.len(), children.launch()));
self.order.push(id);
}
}

async fn kill_children(&mut self, range: RangeFrom<usize>) {
if range.start == 0 {
self.bcast.poison_pill_children();
} else {
for (_, id) in self.order.range(range.clone()) {
// FIXME: panics
for id in self.order.get(range.clone()).unwrap() {
self.bcast.poison_pill_child(id);
}
}

let mut removed = Vec::new();
let mut children = Vec::new();
for (order, id) in self.order.range(range) {
for id in self.order.drain(range) {
// FIXME: Err if None?
if let Some((_, launched)) = self.launched.remove(id) {
if let Some((_, launched)) = self.launched.remove(&id) {
// FIXME: join?
children.push(launched.await);
}

removed.push(*order);
}

for order in removed {
self.order.remove(&order);
}

// FIXME: might remove children
Expand Down Expand Up @@ -160,7 +147,6 @@ impl Supervisor {
self.launched.remove(&id);
self.bcast.remove_child(&id);

// FIXME: remove from order?
self.dead.insert(id);
}
BastionMessage::Faulted { id } => {
Expand Down

0 comments on commit ff14ea3

Please sign in to comment.