From 55e166d61c635e8574636183b611e6bb1fe1ddd6 Mon Sep 17 00:00:00 2001 From: Bryan Donlan Date: Thu, 22 Oct 2020 20:46:03 +0000 Subject: [PATCH] runtime: Remove last slab dependency This removes the last slab dependency by replacing the current slab-based JoinHandle tracking with one based on HashMap instead. --- tokio/src/runtime/blocking/pool.rs | 59 +++++++++++++++++++++++------- 1 file changed, 45 insertions(+), 14 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index d0f2c1c8d14..a00db8e83bb 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -10,9 +10,7 @@ use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle}; -use slab::Slab; - -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::fmt; use std::time::Duration; @@ -59,7 +57,18 @@ struct Shared { num_notify: u32, shutdown: bool, shutdown_tx: Option, - worker_threads: Slab>, + /// Prior to shutdown, we clean up JoinHandles by having each timed-out + /// thread join on the previous timed-out thread. This is not strictly + /// necessary but helps avoid Valgrind false positives, see + /// https://github.com/tokio-rs/tokio/commit/646fbae76535e397ef79dbcaacb945d4c829f666 + /// for more information. + last_exiting_thread: Option>, + /// This holds the JoinHandles for all running threads; on shutdown, the thread + /// calling shutdown handles joining on these. + worker_threads: HashMap>, + /// This is a counter used to iterate worker_threads in a consistent order (for loom's + /// benefit) + worker_thread_index: usize, } type Task = task::Notified; @@ -106,7 +115,9 @@ impl BlockingPool { num_notify: 0, shutdown: false, shutdown_tx: Some(shutdown_tx), - worker_threads: Slab::new(), + last_exiting_thread: None, + worker_threads: HashMap::new(), + worker_thread_index: 0, }), condvar: Condvar::new(), thread_name: builder.thread_name.clone(), @@ -138,12 +149,21 @@ impl BlockingPool { shared.shutdown = true; shared.shutdown_tx = None; self.spawner.inner.condvar.notify_all(); - let mut workers = std::mem::replace(&mut shared.worker_threads, Slab::new()); + + let last_exited_thread = std::mem::take(&mut shared.last_exiting_thread); + let workers = std::mem::replace(&mut shared.worker_threads, HashMap::new()); drop(shared); if self.shutdown_rx.wait(timeout) { - for handle in workers.drain() { + let _ = last_exited_thread.map(|th| th.join()); + + // Loom requires that execution be deterministic, so sort by thread ID before joining. + // (HashMaps use a randomly-seeded hash function, so the order is nondeterministic) + let mut workers: Vec<(usize, thread::JoinHandle<()>)> = workers.into_iter().collect(); + workers.sort_by_key(|(id, _)| *id); + + for (_id, handle) in workers.into_iter() { let _ = handle.join(); } } @@ -205,11 +225,13 @@ impl Spawner { if let Some(shutdown_tx) = shutdown_tx { let mut shared = self.inner.shared.lock(); - let entry = shared.worker_threads.vacant_entry(); - let handle = self.spawn_thread(shutdown_tx, rt, entry.key()); + let id = shared.worker_thread_index; + shared.worker_thread_index += 1; - entry.insert(handle); + let handle = self.spawn_thread(shutdown_tx, rt, id); + + shared.worker_threads.insert(id, handle); } Ok(()) @@ -219,7 +241,7 @@ impl Spawner { &self, shutdown_tx: shutdown::Sender, rt: &Handle, - worker_id: usize, + id: usize, ) -> thread::JoinHandle<()> { let mut builder = thread::Builder::new().name((self.inner.thread_name)()); @@ -233,7 +255,7 @@ impl Spawner { .spawn(move || { // Only the reference should be moved into the closure let _enter = crate::runtime::context::enter(rt.clone()); - rt.blocking_spawner.inner.run(worker_id); + rt.blocking_spawner.inner.run(id); drop(shutdown_tx); }) .unwrap() @@ -241,12 +263,13 @@ impl Spawner { } impl Inner { - fn run(&self, worker_id: usize) { + fn run(&self, worker_thread_id: usize) { if let Some(f) = &self.after_start { f() } let mut shared = self.shared.lock(); + let mut join_on_thread = None; 'main: loop { // BUSY @@ -277,7 +300,11 @@ impl Inner { // Even if the condvar "timed out", if the pool is entering the // shutdown phase, we want to perform the cleanup logic. if !shared.shutdown && timeout_result.timed_out() { - shared.worker_threads.remove(worker_id); + // We'll join the prior timed-out thread's JoinHandle after dropping the lock. + // This isn't done when shutting down, because the thread calling shutdown will + // handle joining everything. + let my_handle = shared.worker_threads.remove(&worker_thread_id); + join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle); break 'main; } @@ -324,6 +351,10 @@ impl Inner { if let Some(f) = &self.before_stop { f() } + + if let Some(handle) = join_on_thread { + let _ = handle.join(); + } } }