Skip to content

Commit

Permalink
windsock: fix rare cloud shotover shutdown race condition (#1369)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Nov 17, 2023
1 parent d9a6091 commit 27f0e24
Showing 1 changed file with 22 additions and 11 deletions.
33 changes: 22 additions & 11 deletions shotover-proxy/benches/windsock/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ impl Ec2InstanceWithShotover {
.push_file_from_bytes(topology.as_bytes(), Path::new("topology.yaml"))
.await;

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel();
tokio::task::spawn(async move {
let mut receiver = self
.instance
Expand All @@ -300,24 +301,29 @@ RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topolo
if let Level::Error = event.level {
tracing::error!("shotover error:\n {event}");
}
if tx.send(event).is_err() {
if event_tx.send(event).is_err() {
return
}
}
Some(Err(err)) => panic!("shotover-bin failed: {err:?}"),
None => return,
}
},
_ = tx.closed() => {
_ = shutdown_rx.recv() => {
// shutdown_tx is dropped, instructing us to shutdown
// we MUST drop self before dropping event_tx to ensure that the Arc<Ec2InstanceWithShotover> clone is dropped before the task indicates that it has terminated.
// Otherwise we may hit a race condition and fail the assertion that there is only one Arc<Ec2InstanceWithShotover> clone alive.
std::mem::drop(self);
std::mem::drop(event_tx);
return;
},
};
}
}
});

// wait for shotover to startup
loop {
let event = rx
let event = event_rx
.recv()
.await
.expect("Shotover shutdown before indicating that it had started");
Expand All @@ -328,23 +334,28 @@ RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topolo
break;
}
}
RunningShotover { rx }

RunningShotover {
shutdown_tx,
event_rx,
}
}
}

pub struct RunningShotover {
rx: tokio::sync::mpsc::UnboundedReceiver<Event>,
shutdown_tx: tokio::sync::mpsc::UnboundedSender<()>,
event_rx: tokio::sync::mpsc::UnboundedReceiver<Event>,
}

impl RunningShotover {
pub async fn shutdown(mut self) {
while let Ok(event) = self.rx.try_recv() {
// dropping shutdown_tx instructs the task to shutdown causing shotover to be terminated
std::mem::drop(self.shutdown_tx);

while let Some(event) = self.event_rx.recv().await {
if let Level::Warn | Level::Error = event.level {
panic!("Received error/warn event from shotover:\n {event}")
}
}

// dropping rx instructs the task to shutdown causing shotover to be terminated
std::mem::drop(self.rx)
}
}

0 comments on commit 27f0e24

Please sign in to comment.