Implement linear backoff and fix possible datarace
Closes #24

Signed-off-by: Aron Heinecke <aron.heinecke@t-online.de>
0xpr03 committed Oct 4, 2020
1 parent d0d3160 commit da8ead3
Showing 9 changed files with 172 additions and 23 deletions.
7 changes: 7 additions & 0 deletions config/template.toml
@@ -61,6 +61,13 @@ snapshot_console_on_crash = true
snapshot_console_on_manual_stop = false
# Save last console out on user invoked service kill, default false
snapshot_console_on_manual_kill = false
# backoff in ms on restart
retry_backoff_ms = 1000
# whether to back off exponentially, based on retry_backoff_ms
retry_exponential_backoff = true
# maximum number of delayed restarts before giving up
retry_max = 10


[[services]]
id = 1
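Despite the template key above, the backoff this commit implements is linear: the delay is retry_backoff_ms multiplied by the current retry counter, and retry_max caps how many delayed restarts are attempted before the service is parked in ServiceMaxRetries. A minimal sketch of the resulting schedule (hypothetical helper, not code from this commit):

use std::time::Duration;

// Mirrors `retry_backoff_ms` and `backoff_counter` from src/handler/service.rs.
fn backoff_delay(step_ms: u64, attempt: u64) -> Duration {
    Duration::from_millis(step_ms * attempt)
}

fn main() {
    // With retry_backoff_ms = 1000 the delayed restarts wait 1s, 2s, 3s, ...
    for attempt in 1..=3 {
        println!("restart #{} after {:?}", attempt, backoff_delay(1000, attempt));
    }
}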
5 changes: 5 additions & 0 deletions examples/crash_backoff.rs
@@ -0,0 +1,5 @@
use std::io::{Error, ErrorKind, Result};

fn main() -> Result<()> {
Err(Error::new(ErrorKind::Other, "oh no!"))
}
4 changes: 4 additions & 0 deletions frontend/src/lib/Api.js
@@ -14,6 +14,8 @@ export const ServiceState = {
Crashed: "Crashed",
Stopping: "Stopping",
Killed: "Killed",
EndedBackoff: "EndedBackoff",
CrashedBackoff: "CrashedBackoff",
};

export const ConsoleType = {
@@ -175,6 +177,7 @@ export class Permissions {
}

export class Log {
static ServiceMaxRetries = "ServiceMaxRetries";
static SystemStart = "SystemStartup";
static KilledCmd = "ServiceCmdKilled";
static Killed = "ServiceKilled";
@@ -213,6 +216,7 @@ export function formatLog(entry) {
switch (Object.keys(entry.action)[0]) {
case Log.StartFailure: return "Startup failure: "+entry.action[Log.StartFailure];
case Log.Crash: return "Service crashed, signal "+entry.action[Log.Crash];
case Log.ServiceMaxRetries: return "Maximum start retries reached: "+entry.action[Log.ServiceMaxRetries];
case Log.Input: return "Console input by "+entry.invoker.name+": "+entry.action[Log.Input];
}
}
7 changes: 6 additions & 1 deletion frontend/src/views/service.js
@@ -193,7 +193,8 @@ export default class Service extends React.Component {
render () {
const running = this.state.state === ServiceState.Running;
const stopping = this.state.state === ServiceState.Stopping;
const backoff = this.state.state === ServiceState.EndedBackoff || this.state.state === ServiceState.CrashedBackoff;
const stopped = !running && !stopping && !backoff;
const perms = this.state.permissions;
const perm_console = Permissions.hasFlag(perms, Permissions.OUTPUT) || Permissions.hasFlag(perms, Permissions.STDIN_ALL);
const perm_log = Permissions.hasFlag(perms, Permissions.LOG);
@@ -231,6 +232,10 @@
<Col><Button onClick={() => this.stopService()}
disabled={!Permissions.hasFlag(perms, Permissions.STOP)} variant="danger">Stop</Button></Col>
}
{backoff &&
<Col><Button onClick={() => this.stopService()}
disabled={!Permissions.hasFlag(perms, Permissions.STOP)} variant="danger">Abort Backoff</Button></Col>
}
{(running || stopping) &&
<Col><Button onClick={() => this.killService()}
disabled={!Permissions.hasFlag(perms, Permissions.KILL)} variant="danger">Kill</Button></Col>
1 change: 1 addition & 0 deletions src/db/models.rs
@@ -200,6 +200,7 @@ impl NewLogEntry {
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub enum LogAction {
SystemStartup,
ServiceMaxRetries(usize),
ServiceCmdKilled,
ServiceKilled,
ServiceCmdStop,
2 changes: 2 additions & 0 deletions src/handler/error.rs
@@ -129,6 +129,8 @@ pub enum ControllerError {
DBError(db::Error),
#[fail(display = "Service has no soft-stop parameter")]
NoSoftStop,
#[fail(display = "Service has no backoff handle!")]
NoBackoffHandle,
}

impl From<db::Error> for ControllerError {
2 changes: 1 addition & 1 deletion src/handler/messages.rs
@@ -193,7 +193,7 @@ pub mod unchecked {
#[rtype(result = "Result<(), ControllerError>")]
pub struct StartService {
pub id: SID,
/// Invoker to use for logging & to differentiate between user-invoked and internal (re)starts
pub user: Option<UID>,
}

159 changes: 138 additions & 21 deletions src/handler/service.rs
@@ -15,7 +15,7 @@ use failure::Fallible;
use futures::stream::StreamExt;
use metrohash::MetroHashMap;
use serde::Serialize;
use std::{env::current_dir, time::Duration};
use std::ffi::OsString;
use std::path::Path;
use strip_ansi_escapes as ansi_esc;
@@ -105,7 +105,7 @@ impl Handler<StartService> for ServiceController {
return Err(ControllerError::ServiceRunning);
}
trace!("starting..");
if let Err(e) = instance.run(ctx.address(), msg.user.is_some()) {
return Err(ControllerError::StartupIOError(e));
}
Self::log(
@@ -165,8 +165,12 @@ impl Handler<KillService> for ServiceController {
None,
);
return Ok(());
} else if service.state.in_backoff() {
// handle kill during backoff, abort backoff restart
service.stop_backoff()
} else {
Err(ControllerError::NoServiceHandle)
}
} else {
Err(ControllerError::InvalidInstance(msg.id))
}
@@ -178,7 +182,10 @@

fn handle(&mut self, msg: StopService, _ctx: &mut Context<Self>) -> Self::Result {
if let Some(service) = self.services.get_mut(&msg.id) {
if !service.running.load(Ordering::Acquire) {
// service in backoff: stop the delayed backoff restart, if applicable
if service.state.in_backoff() {
return service.stop_backoff();
}
return Err(ControllerError::ServiceStopped);
}
let stdin = match service.stdin.as_mut() {
@@ -229,9 +236,18 @@ impl Handler<ServiceStateChanged> for ServiceController {
snapshot = instance.model.snapshot_console_on_manual_kill;
LogAction::ServiceKilled
}
State::ServiceMaxRetries => {
LogAction::ServiceMaxRetries(instance.backoff_counter)
}
State::Stopping => {
unreachable!("unreachable: service-stopping-state in state update!")
}
State::EndedBackoff => {
unreachable!("unreachable: service-ended-backoff-state in state update!")
}
State::CrashedBackoff => {
unreachable!("unreachable: service-crashed-backoff-state in state update!")
}
};

let log_data = match snapshot {
@@ -250,15 +266,57 @@
instance.model.restart && state == State::Crashed
};

trace!("restart: {}",restart);
if restart {
ctx.address().do_send(StartService {
id: instance.model.id,
user: None,
});
instance.backoff_counter += 1;
// if no max retry limit and no backoff time is set, restart instantly
if instance.model.retry_max.is_none() && instance.model.retry_backoff_ms.is_none() {
info!("No backoff limit/time configured, restarting \"{}\" instantly.",instance.model.name);
ctx.address().do_send(StartService {
id: instance.model.id,
user: None,
});
} else if instance.can_backoff() {
let backoff_time = instance.get_backoff_time();
let id = instance.model.id.clone();
let name = instance.model.name.clone();
let flag = instance.backoff_kill_flag.clone();
let addr = ctx.address();
let (fut, aborter) = future::abortable(async move {
tokio::time::delay_for(backoff_time).await;
if flag.load(Ordering::Acquire) {
return;
}
trace!("Restarting from backoff");
if let Err(e) = addr.try_send(StartService {
id,
user: None,
}) {
warn!("Unable to send restart message from backoff for {} {}", name, e);
}
});
let id = instance.model.id.clone();
spawn(fut.map(move |v| {
if let Err(e) = v {
error!("Backoff error instance {}: {}", id, e);
}
}));
instance.backoff_kill_handle = Some(aborter);
} else {
trace!("Reached max retries!");
instance.state.set_state(State::ServiceMaxRetries);
ctx.address().do_send(ServiceStateChanged {
id: instance.model.id,
running: false,
});
// TODO: log max retries
}
} else {
// cleanup
instance.kill_handle = None;
instance.stdin = None;
// reset backoff
instance.reset_backoff(true);
}
}
}
@@ -460,16 +518,25 @@ struct Instance {
stdin: Option<tokio::sync::mpsc::Sender<String>>,
start_time: Option<u64>,
end_time: Option<u64>,
last_backoff: Option<u64>,
backoff_counter: usize,
/// Handle to kill backoff timer to abort a delayed restart
backoff_kill_handle: Option<future::AbortHandle>,
/// Flag checked by the delayed future, avoiding a race between the abort handle and the future being polled at delay end
backoff_kill_flag: Arc<AtomicBool>,
}

#[derive(PartialEq, Serialize)]
pub enum State {
Stopped = 0,
Running = 1,
Ended = 2,
EndedBackoff = 3,
Crashed = 4,
CrashedBackoff = 5,
Stopping = 6,
Killed = 7,
ServiceMaxRetries = 8,
}

// derived from https://gist.github.com/polypus74/eabc7bb00873e6b90abe230f9e632989
@@ -492,6 +559,12 @@ impl StateFlag {
pub fn set_state(&self, state: State) {
self.inner.store(state as usize, Ordering::SeqCst)
}
/// Returns true if the state is currently in backoff wait,
/// i.e. waiting for a restart delay to elapse
pub fn in_backoff(&self) -> bool {
let state = self.get_state();
state == State::CrashedBackoff || state == State::EndedBackoff
}
}

impl From<usize> for State {
@@ -501,10 +574,13 @@ impl From<usize> for State {
0 => Stopped,
1 => Running,
2 => Ended,
3 => EndedBackoff,
4 => Crashed,
5 => CrashedBackoff,
6 => Stopping,
7 => Killed,
8 => ServiceMaxRetries,
_ => unreachable!("Invalid service state: {}", val),
}
}
}
@@ -539,8 +615,8 @@ impl Instance {
msg
}
/// Run instance, outer catch function to log startup errors to tty
fn run(&mut self, addr: Addr<ServiceController>, user_initiated: bool) -> Result<(), ::std::io::Error> {
let res = self.run_internal(addr, user_initiated);
if let Err(e) = &res {
let mut buffer_w = self.tty.write().expect("Can't write buffer!");
buffer_w.push_back(ConsoleType::State(
@@ -588,13 +664,14 @@
}

/// real service starter
fn run_internal(&mut self, addr: Addr<ServiceController>, user_initiated: bool) -> Result<(), ::std::io::Error> {
if self.model.enabled
&& !self
.running
.compare_and_swap(false, true, Ordering::AcqRel)
{
trace!("Starting {}", self.model.name);
self.reset_backoff(user_initiated);
trace!("Starting {}, through user: {}", self.model.name,user_initiated);
{
let mut buffer_w = self.tty.write().expect("Can't write buffer!");
buffer_w.push_back(ConsoleType::State(
@@ -774,6 +851,38 @@
}
Ok(())
}
/// Reset backoff; also resets the retry counter if `reset_counter` is set
fn reset_backoff(&mut self, reset_counter: bool) {
trace!("Resetting backoff, reset counter: {}", reset_counter);
self.last_backoff = None;
self.backoff_kill_flag.store(false, Ordering::Release);
if reset_counter {
self.backoff_counter = 0;
}
}
fn can_backoff(&self) -> bool {
trace!("Backoff retries: {}/{:?}",self.backoff_counter,self.model.retry_max);
self.model.retry_max.map_or(true, |v|self.backoff_counter < v)
}
fn get_backoff_time(&self) -> Duration {
trace!("get_backoff_time");
// linear backoff: the configured step (default 10s) times the retry counter
if let Some(v) = self.model.retry_backoff_ms {
Duration::from_millis(v * (self.backoff_counter as u64))
} else {
Duration::from_millis(10_000 * (self.backoff_counter as u64))
}
}
/// Stop a pending backoff restart from executing
fn stop_backoff(&mut self) -> Result<(), ControllerError> {
trace!("stop_backoff");
if let Some(handle) = self.backoff_kill_handle.take() {
self.backoff_kill_flag.store(true, Ordering::Release);
handle.abort();
Ok(())
} else {
Err(ControllerError::NoBackoffHandle)
}
}
}

impl From<Service> for Instance {
@@ -788,6 +897,14 @@ impl From<Service> for Instance {
stdin: None,
start_time: None,
end_time: None,
backoff_counter: 0,
last_backoff: None,
backoff_kill_handle: None,
backoff_kill_flag: Arc::new(AtomicBool::new(false)),
// TODO:
// add kill-switch for delayed future, https://docs.rs/tokio/0.2.22/tokio/time/fn.delay_for.html
// to allow delayed backoff future that restarts, but can also be cancelled on manual interaction
// need to also add some kind of additional state flag to show the user a running backoff
}
}
}
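The "possible datarace" from the commit title is addressed twice in this file: run_internal now swaps the running flag with Ordering::AcqRel, so a successful compare_and_swap also publishes with release semantics, and the delayed restart pairs the AbortHandle with backoff_kill_flag, because abort() can lose the race against a delay that has already fired. A reduced, runnable sketch of that second pattern (tokio 0.2-era delay_for as used by this codebase; names are illustrative):

use futures::future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let kill_flag = Arc::new(AtomicBool::new(false));
    let flag = kill_flag.clone();
    let (fut, aborter) = future::abortable(async move {
        tokio::time::delay_for(Duration::from_millis(50)).await;
        // Re-check after the delay: abort() may arrive too late to cancel
        // the future, but the flag still suppresses the restart.
        if flag.load(Ordering::Acquire) {
            return;
        }
        println!("restarting service after backoff");
    });
    let task = tokio::spawn(fut);
    // Manual stop: set the flag first, then abort the pending future.
    kill_flag.store(true, Ordering::Release);
    aborter.abort();
    let _ = task.await;
}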
8 changes: 8 additions & 0 deletions src/settings.rs
@@ -76,6 +76,10 @@ pub struct Service {
pub snapshot_console_on_manual_stop: bool,
#[serde(default)]
pub snapshot_console_on_manual_kill: bool,
#[serde(default)]
pub retry_max: Option<usize>,
#[serde(default)]
pub retry_backoff_ms: Option<u64>,
}
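Both new fields are optional: with #[serde(default)] an absent key simply deserializes to None; None for retry_max means no retry limit, and when both keys are unset the service is restarted instantly. A small sketch of that behaviour (standalone struct, assuming the toml crate):

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct RetryConfig {
    #[serde(default)]
    retry_max: Option<usize>,
    #[serde(default)]
    retry_backoff_ms: Option<u64>,
}

fn main() {
    // Only one key present: the other falls back to None.
    let cfg: RetryConfig = toml::from_str("retry_backoff_ms = 1000").unwrap();
    assert_eq!(cfg.retry_backoff_ms, Some(1000));
    assert_eq!(cfg.retry_max, None);
}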

impl Settings {
@@ -166,6 +170,8 @@ mod tests {
snapshot_console_on_manual_kill: true,
id: 0,
restart: true,
retry_backoff_ms: Some(0),
retry_max: Some(0)
},
Service {
name: "some service2".to_owned(),
@@ -183,6 +189,8 @@
args: vec!["asd".to_owned(), "def".to_owned()],
id: 1,
restart: true,
retry_backoff_ms: Some(0),
retry_max: Some(0)
},
],
};
