Skip to content

Commit

Permalink
Changed signature of the Supervisor::restart method
Browse files Browse the repository at this point in the history
  • Loading branch information
Relrin committed Jan 21, 2020
1 parent c7b64b7 commit 4c79d7a
Showing 1 changed file with 9 additions and 49 deletions.
58 changes: 9 additions & 49 deletions bastion/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use fxhash::FxHashMap;
use lightproc::prelude::*;
use log::Level;
use std::cmp::{Eq, PartialEq};
use std::ops::RangeFrom;
use std::ops::{Range, RangeFrom};
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
Expand Down Expand Up @@ -252,7 +252,7 @@ impl Supervisor {
self.pre_start_msgs.clear();
self.pre_start_msgs.shrink_to_fit();

self.restart(0..).await;
self.restart(0..self.order.len()).await;

debug!(
"Supervisor({}): Removing {} stopped elements.",
Expand Down Expand Up @@ -754,10 +754,10 @@ impl Supervisor {
self
}

async fn restart(&mut self, range: RangeFrom<usize>) {
async fn restart(&mut self, range: Range<usize>) {
debug!("Supervisor({}): Restarting range: {:?}", self.id(), range);
// TODO: stop or kill?
self.kill(range.clone()).await;
self.kill(range.start..).await;

let restart_strategy = self.restart_strategy.clone();
let supervisor_id = &self.id().clone();
Expand Down Expand Up @@ -964,53 +964,13 @@ impl Supervisor {
);
match self.strategy {
SupervisionStrategy::OneForOne => {
let (order, launched, _) = self.launched.remove(&id).ok_or(())?;
// TODO: add a "waiting" list and poll from it instead of awaiting
// FIXME: panics?
let supervised = launched.await.unwrap();
supervised.callbacks().before_restart();

self.bcast.unregister(supervised.id());

let parent = Parent::supervisor(self.as_ref());
let bcast =
Broadcast::new(parent, supervised.elem().clone().with_id(BastionId::new()));
let id = bcast.id().clone();
debug!(
"Supervisor({}): Resetting Supervised({}) to Supervised({}).",
self.id(),
supervised.id(),
bcast.id()
);
// FIXME: panics?
let supervised = supervised.reset(bcast).await.unwrap();
supervised.callbacks().after_restart();

self.bcast.register(supervised.bcast());
if self.started {
let msg = BastionMessage::start();
let env =
Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone());
self.bcast.send_child(&id, env);
}

debug!(
"Supervisor({}): Launching Supervised({}).",
self.id(),
supervised.id()
);
let (start, _, _) = self.launched.get(&id).ok_or(())?;
let start = *start;

let restart_count = match self.launched.get(&id.clone()) {
Some((_, _, count)) => *count + 1,
None => 0,
};
let launched = supervised.launch();
self.launched
.insert(id.clone(), (order, launched, restart_count));
self.order[order] = id;
self.restart(start..start+1).await;
}
SupervisionStrategy::OneForAll => {
self.restart(0..).await;
self.restart(0..self.order.len()).await;

// TODO: should be empty
self.stopped.shrink_to_fit();
Expand All @@ -1020,7 +980,7 @@ impl Supervisor {
let (start, _, _) = self.launched.get(&id).ok_or(())?;
let start = *start;

self.restart(start..).await;
self.restart(start..self.order.len()).await;
}
}

Expand Down

0 comments on commit 4c79d7a

Please sign in to comment.