Skip to content

Commit

Permalink
Cleanup background task without abort
Browse files Browse the repository at this point in the history
  • Loading branch information
smklein committed Nov 11, 2022
1 parent dbbb13e commit 282baee
Showing 1 changed file with 38 additions and 11 deletions.
49 changes: 38 additions & 11 deletions sled-agent/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use uuid::Uuid;
Expand Down Expand Up @@ -252,6 +253,13 @@ impl<'t> SmfHelper<'t> {
}
}

struct Task {
// A signal for the initializer task to terminate
exit_tx: oneshot::Sender<()>,
// A task repeatedly trying to initialize the zone
initializer: JoinHandle<()>,
}

// Describes the Switch Zone state.
enum SwitchZone {
// The switch zone is not currently running.
Expand All @@ -261,8 +269,8 @@ enum SwitchZone {
Initializing {
// The request for the zone
request: ServiceZoneRequest,
// A task repeatedly trying to initialize the zone
initializer: JoinHandle<()>,
// A background task
worker: Option<Task>,
},
// The Zone is currently running.
Running(RunningZone),
Expand Down Expand Up @@ -834,14 +842,18 @@ impl ServiceManager {
let log = &self.inner.log;
let mut switch_zone = self.inner.switch_zone.lock().await;

match (&*switch_zone, request) {
match (&mut *switch_zone, request) {
(SwitchZone::Disabled, Some(request)) => {
info!(log, "Enabling switch zone (new)");
let mgr = self.clone();
let (exit_tx, exit_rx) = oneshot::channel();
*switch_zone = SwitchZone::Initializing {
request,
initializer: tokio::task::spawn(async move {
mgr.initialize_switch_zone().await
worker: Some(Task {
exit_tx,
initializer: tokio::task::spawn(async move {
mgr.initialize_switch_zone(exit_rx).await
}),
}),
};
}
Expand All @@ -854,9 +866,19 @@ impl ServiceManager {
(SwitchZone::Disabled, None) => {
info!(log, "Disabling switch zone (already complete)");
}
(SwitchZone::Initializing { initializer, .. }, None) => {
(SwitchZone::Initializing { worker, .. }, None) => {
info!(log, "Disabling switch zone (was initializing)");
initializer.abort();
let worker = worker
.take()
.expect("Initializing without background task");
// If this succeeds, we told the background task to exit
// successfully. If it fails, the background task already
// exited.
let _ = worker.exit_tx.send(());
worker
.initializer
.await
.expect("Switch initializer task panicked");
*switch_zone = SwitchZone::Disabled;
}
(SwitchZone::Running(_), None) => {
Expand All @@ -866,7 +888,7 @@ impl ServiceManager {
}
}

async fn initialize_switch_zone(&self) {
async fn initialize_switch_zone(&self, mut exit_rx: oneshot::Receiver<()>) {
loop {
{
let mut switch_zone = self.inner.switch_zone.lock().await;
Expand All @@ -889,9 +911,14 @@ impl ServiceManager {
}
}

// Poll for the device every second - this timeout is somewhat
// arbitrary, but we probably don't want to use backoff here.
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
tokio::select! {
// If we've been told to stop trying, bail.
_ = &mut exit_rx => return,

// Poll for the device every second - this timeout is somewhat
// arbitrary, but we probably don't want to use backoff here.
_ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => (),
};
}
}
}
Expand Down

0 comments on commit 282baee

Please sign in to comment.