runtime: Remove last slab dependency #2917

Merged · 1 commit · Nov 5, 2020
59 changes: 45 additions & 14 deletions tokio/src/runtime/blocking/pool.rs
@@ -10,9 +10,7 @@ use crate::runtime::context;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};

use slab::Slab;

use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::time::Duration;

@@ -59,7 +57,18 @@ struct Shared {
num_notify: u32,
shutdown: bool,
shutdown_tx: Option<shutdown::Sender>,
worker_threads: Slab<thread::JoinHandle<()>>,
/// Prior to shutdown, we clean up JoinHandles by having each timed-out
/// thread join on the previous timed-out thread. This is not strictly
/// necessary but helps avoid Valgrind false positives, see
/// https://github.com/tokio-rs/tokio/commit/646fbae76535e397ef79dbcaacb945d4c829f666
/// for more information.
last_exiting_thread: Option<thread::JoinHandle<()>>,
/// This holds the JoinHandles for all running threads; on shutdown, the thread
/// calling shutdown handles joining on these.
worker_threads: HashMap<usize, thread::JoinHandle<()>>,
/// This is a counter used to iterate worker_threads in a consistent order (for loom's
/// benefit)
worker_thread_index: usize,
}

type Task = task::Notified<NoopSchedule>;
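
Note (not part of the diff): the hand-off described in the doc comments above can be sketched outside of Tokio. Below is a minimal, self-contained illustration under assumed names (`Registry`, `worker_exit` are invented here) of the same idea: each exiting worker removes its own `JoinHandle` from the map, parks it in `last_exiting_thread` for the next exiter (or shutdown) to pick up, and joins whatever handle the previous exiter left behind, performing the join only after releasing the lock.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Illustration only: a stand-in for the Shared struct in the diff above.
struct Registry {
    last_exiting_thread: Option<thread::JoinHandle<()>>,
    worker_threads: HashMap<usize, thread::JoinHandle<()>>,
}

// Called by a worker that is about to exit (e.g. after idling too long).
fn worker_exit(shared: &Arc<Mutex<Registry>>, my_id: usize) {
    // Under the lock: remove my own handle from the map and leave it in
    // `last_exiting_thread`, while taking whatever handle the previous
    // exiter left there.
    let prev = {
        let mut reg = shared.lock().unwrap();
        let my_handle = reg.worker_threads.remove(&my_id);
        std::mem::replace(&mut reg.last_exiting_thread, my_handle)
    };
    // Join outside the lock so other workers are never blocked on a join.
    if let Some(handle) = prev {
        let _ = handle.join();
    }
}

fn main() {
    let shared = Arc::new(Mutex::new(Registry {
        last_exiting_thread: None,
        worker_threads: HashMap::new(),
    }));

    // Spawn two workers that exit immediately and clean up after each other.
    for id in 0..2 {
        let shared_for_worker = Arc::clone(&shared);
        let handle = thread::spawn(move || worker_exit(&shared_for_worker, id));
        shared.lock().unwrap().worker_threads.insert(id, handle);
    }

    // Shutdown: take everything under the lock, then join outside it.
    let (last, remaining) = {
        let mut reg = shared.lock().unwrap();
        (
            reg.last_exiting_thread.take(),
            std::mem::take(&mut reg.worker_threads),
        )
    };
    if let Some(handle) = last {
        let _ = handle.join();
    }
    for (_id, handle) in remaining {
        let _ = handle.join();
    }
}
```

At any point at most one exited-but-unjoined handle is parked in `last_exiting_thread`, which is why shutdown only has to join that one handle plus whatever is still registered in the map.
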
@@ -106,7 +115,9 @@ impl BlockingPool {
num_notify: 0,
shutdown: false,
shutdown_tx: Some(shutdown_tx),
worker_threads: Slab::new(),
last_exiting_thread: None,
worker_threads: HashMap::new(),
worker_thread_index: 0,
}),
condvar: Condvar::new(),
thread_name: builder.thread_name.clone(),
@@ -138,12 +149,21 @@ impl BlockingPool {
shared.shutdown = true;
shared.shutdown_tx = None;
self.spawner.inner.condvar.notify_all();
let mut workers = std::mem::replace(&mut shared.worker_threads, Slab::new());

let last_exited_thread = std::mem::take(&mut shared.last_exiting_thread);
let workers = std::mem::replace(&mut shared.worker_threads, HashMap::new());

drop(shared);

if self.shutdown_rx.wait(timeout) {
for handle in workers.drain() {
let _ = last_exited_thread.map(|th| th.join());

// Loom requires that execution be deterministic, so sort by thread ID before joining.
// (HashMaps use a randomly-seeded hash function, so the order is nondeterministic)
let mut workers: Vec<(usize, thread::JoinHandle<()>)> = workers.into_iter().collect();
workers.sort_by_key(|(id, _)| *id);

for (_id, handle) in workers.into_iter() {
let _ = handle.join();
}
}
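
Note (not part of the diff): the sort above exists because `HashMap` iteration order is randomized per process (the default hasher is randomly seeded), while loom requires each modeled execution to be deterministic. A tiny stand-alone sketch of the same collect-and-sort idiom, with placeholder string values instead of `JoinHandle`s:

```rust
use std::collections::HashMap;

fn main() {
    // Placeholder values; in the pool these would be thread::JoinHandle<()>.
    let mut workers: HashMap<usize, &str> = HashMap::new();
    workers.insert(2, "worker-2");
    workers.insert(0, "worker-0");
    workers.insert(1, "worker-1");

    // HashMap iteration order changes from run to run, so collect the entries
    // and sort by the insertion counter to get a stable order before joining.
    let mut entries: Vec<(usize, &str)> = workers.into_iter().collect();
    entries.sort_by_key(|(id, _)| *id);

    for (id, name) in entries {
        println!("joining worker {id}: {name}"); // always 0, 1, 2
    }
}
```
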
@@ -205,11 +225,13 @@ impl Spawner {

if let Some(shutdown_tx) = shutdown_tx {
let mut shared = self.inner.shared.lock();
let entry = shared.worker_threads.vacant_entry();

let handle = self.spawn_thread(shutdown_tx, rt, entry.key());
let id = shared.worker_thread_index;
shared.worker_thread_index += 1;

entry.insert(handle);
let handle = self.spawn_thread(shutdown_tx, rt, id);

shared.worker_threads.insert(id, handle);
}

Ok(())
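
Note (not part of the diff): on the spawn side, the `Slab` `vacant_entry()`/`key()` pattern is replaced by a plain monotonically increasing counter used as the `HashMap` key. A minimal sketch of that pattern with invented names (`Pool`, `register`); because ids are never reused, sorting by key at shutdown recovers spawn order:

```rust
use std::collections::HashMap;

// Illustration only: a counter-keyed map standing in for the old Slab.
struct Pool {
    next_id: usize,
    workers: HashMap<usize, &'static str>,
}

impl Pool {
    fn register(&mut self, name: &'static str) -> usize {
        let id = self.next_id;
        self.next_id += 1;
        self.workers.insert(id, name);
        id
    }
}

fn main() {
    let mut pool = Pool { next_id: 0, workers: HashMap::new() };
    let a = pool.register("worker-a");
    let b = pool.register("worker-b");
    assert_eq!((a, b), (0, 1));
    assert_eq!(pool.workers.len(), 2);
}
```
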
@@ -219,7 +241,7 @@ impl Spawner {
&self,
shutdown_tx: shutdown::Sender,
rt: &Handle,
worker_id: usize,
id: usize,
) -> thread::JoinHandle<()> {
let mut builder = thread::Builder::new().name((self.inner.thread_name)());

@@ -233,20 +255,21 @@ impl Spawner {
.spawn(move || {
// Only the reference should be moved into the closure
let _enter = crate::runtime::context::enter(rt.clone());
rt.blocking_spawner.inner.run(worker_id);
rt.blocking_spawner.inner.run(id);
drop(shutdown_tx);
})
.unwrap()
}
}

impl Inner {
fn run(&self, worker_id: usize) {
fn run(&self, worker_thread_id: usize) {
if let Some(f) = &self.after_start {
f()
}

let mut shared = self.shared.lock();
let mut join_on_thread = None;

'main: loop {
// BUSY
@@ -277,7 +300,11 @@ impl Inner {
// Even if the condvar "timed out", if the pool is entering the
// shutdown phase, we want to perform the cleanup logic.
if !shared.shutdown && timeout_result.timed_out() {
shared.worker_threads.remove(worker_id);
// We'll join the prior timed-out thread's JoinHandle after dropping the lock.
// This isn't done when shutting down, because the thread calling shutdown will
// handle joining everything.
let my_handle = shared.worker_threads.remove(&worker_thread_id);
join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);

break 'main;
}
@@ -324,6 +351,10 @@ impl Inner {
if let Some(f) = &self.before_stop {
f()
}

if let Some(handle) = join_on_thread {
let _ = handle.join();
}
}
}
