Skip to content

Commit

Permalink
Merge pull request #1121 from rust-lang/deadlock-begone
Browse files Browse the repository at this point in the history
  • Loading branch information
shepmaster authored Nov 19, 2024
2 parents 0f564e6 + 13631b4 commit 5cefccc
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 12 deletions.
4 changes: 3 additions & 1 deletion compiler/base/orchestrator/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2848,8 +2848,10 @@ fn spawn_io_queue(stdin: ChildStdin, stdout: ChildStdout, token: CancellationTok
let stdin = SyncIoBridge::new(stdin);
let mut stdin = BufWriter::new(stdin);

let handle = tokio::runtime::Handle::current();

loop {
let coordinator_msg = futures::executor::block_on(async {
let coordinator_msg = handle.block_on(async {
select! {
() = token.cancelled() => None,
msg = rx.recv() => msg,
Expand Down
6 changes: 4 additions & 2 deletions compiler/base/orchestrator/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,8 @@ impl ProcessState {
let statistics_task = tokio::task::spawn_blocking({
let child_id = child.id();
let worker_msg_tx = worker_msg_tx.clone();
move || stream_command_statistics(child_id, worker_msg_tx)
let handle = tokio::runtime::Handle::current();
move || stream_command_statistics(child_id, worker_msg_tx, handle)
});

let task_set = stream_stdio(worker_msg_tx.clone(), stdin_rx, stdin, stdout, stderr);
Expand Down Expand Up @@ -943,6 +944,7 @@ mod stats {
fn stream_command_statistics(
child_id: Option<u32>,
worker_msg_tx: MultiplexingSender,
handle: tokio::runtime::Handle,
) -> Result<(), CommandStatisticsError> {
use command_statistics_error::*;
use stats::*;
Expand All @@ -959,7 +961,7 @@ fn stream_command_statistics(
let process = Process::new(process_id).context(InvalidProcessSnafu { process_id })?;

while let Some(stats) = process.stats() {
let sent = futures::executor::block_on(worker_msg_tx.send_ok(stats));
let sent = handle.block_on(worker_msg_tx.send_ok(stats));
if sent.is_err() {
// No one listening anymore
break;
Expand Down
12 changes: 8 additions & 4 deletions ui/src/request_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ impl Handle {
let g = self
.attempt_start_request(category, payload)
.await
.map(|id| EndGuardInner(id, How::Abandoned, self));
.map(|id| EndGuardInner(id, How::Abandoned, Some(self)));
EndGuard(g)
}
}
Expand All @@ -238,12 +238,16 @@ impl EndGuard {
}
}

struct EndGuardInner(Id, How, Handle);
struct EndGuardInner(Id, How, Option<Handle>);

impl Drop for EndGuardInner {
fn drop(&mut self) {
let Self(id, how, ref handle) = *self;
futures::executor::block_on(handle.attempt_end_request(id, how))
let Self(id, how, ref mut handle) = *self;
if let Ok(h) = tokio::runtime::Handle::try_current() {
if let Some(handle) = handle.take() {
h.spawn(async move { handle.attempt_end_request(id, how).await });
}
}
}
}

Expand Down
13 changes: 8 additions & 5 deletions ui/src/server_axum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use std::{
sync::{Arc, LazyLock},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
use tokio::sync::Mutex;
use tokio::{select, sync::Mutex};
use tower_http::{
cors::{self, CorsLayer},
request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer},
Expand Down Expand Up @@ -71,7 +71,7 @@ pub(crate) async fn serve(config: Config) {
let factory = Factory(Arc::new(config.coordinator_factory()));

let request_db = config.request_database();
let (_db_task, db_handle) = request_db.spawn();
let (db_task, db_handle) = request_db.spawn();

let root_files = static_file_service(config.root_path(), MAX_AGE_ONE_DAY);
let asset_files = static_file_service(config.asset_path(), MAX_AGE_ONE_YEAR);
Expand Down Expand Up @@ -170,9 +170,12 @@ pub(crate) async fn serve(config: Config) {
.await
.unwrap();

axum::serve(listener, app.into_make_service())
.await
.unwrap();
let server = axum::serve(listener, app.into_make_service());

select! {
v = server => v.unwrap(),
v = db_task => v.unwrap(),
}
}

fn get_or_post<T: 'static>(handler: impl Handler<T, ()> + Copy) -> MethodRouter {
Expand Down

0 comments on commit 5cefccc

Please sign in to comment.