From 61d635e8adab9c9d5e7b35577112061d00e00bc0 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Sun, 15 Apr 2018 12:29:22 -0700 Subject: [PATCH] Threadpool blocking (#317) This patch adds a `blocking` to `tokio-threadpool`. This function serves as a way to annotate sections of code that will perform blocking operations. This informs the thread pool that an additional thread needs to be spawned to replace the current thread, which will no longer be able to process the work queue. --- .travis.yml | 17 +- ci/tsan | 26 + tokio-threadpool/Cargo.toml | 3 + tokio-threadpool/benches/blocking.rs | 148 ++++++ tokio-threadpool/src/blocking.rs | 148 ++++++ tokio-threadpool/src/builder.rs | 96 +++- tokio-threadpool/src/config.rs | 16 +- tokio-threadpool/src/lib.rs | 6 +- tokio-threadpool/src/notifier.rs | 45 +- tokio-threadpool/src/park/default_park.rs | 27 +- tokio-threadpool/src/pool/backup.rs | 304 +++++++++++ tokio-threadpool/src/pool/backup_stack.rs | 185 +++++++ tokio-threadpool/src/pool/mod.rs | 270 ++++++++-- tokio-threadpool/src/shutdown.rs | 5 +- tokio-threadpool/src/task/blocking.rs | 499 ++++++++++++++++++ tokio-threadpool/src/task/blocking_state.rs | 89 ++++ tokio-threadpool/src/task/mod.rs | 47 +- tokio-threadpool/src/task/queue.rs | 2 +- tokio-threadpool/src/task/state.rs | 4 +- tokio-threadpool/src/worker/entry.rs | 8 +- tokio-threadpool/src/worker/mod.rs | 465 ++++++++++++---- .../src/{pool => worker}/stack.rs | 15 +- tokio-threadpool/src/worker/state.rs | 2 +- tokio-threadpool/tests/blocking.rs | 410 ++++++++++++++ tokio-threadpool/tests/hammer.rs | 107 ++++ tokio-threadpool/tests/threadpool.rs | 136 +---- 26 files changed, 2794 insertions(+), 286 deletions(-) create mode 100644 tokio-threadpool/benches/blocking.rs create mode 100644 tokio-threadpool/src/blocking.rs create mode 100644 tokio-threadpool/src/pool/backup.rs create mode 100644 tokio-threadpool/src/pool/backup_stack.rs create mode 100644 tokio-threadpool/src/task/blocking.rs create mode 100644 tokio-threadpool/src/task/blocking_state.rs rename tokio-threadpool/src/{pool => worker}/stack.rs (96%) create mode 100644 tokio-threadpool/tests/blocking.rs create mode 100644 tokio-threadpool/tests/hammer.rs diff --git a/.travis.yml b/.travis.yml index f51370edeb2..2c0fab09813 100644 --- a/.travis.yml +++ b/.travis.yml @@ -35,15 +35,28 @@ script: # Make sure the benchmarks compile cargo build --benches --all + export ASAN_OPTIONS="detect_odr_violation=0 detect_leaks=0" + export TSAN_OPTIONS="suppressions=`pwd`/ci/tsan" + + # === tokio-timer ==== + # Run address sanitizer - ASAN_OPTIONS="detect_odr_violation=0 detect_leaks=0" \ RUSTFLAGS="-Z sanitizer=address" \ cargo test -p tokio-timer --test hammer --target x86_64-unknown-linux-gnu # Run thread sanitizer - TSAN_OPTIONS="suppressions=`pwd`/ci/tsan" \ RUSTFLAGS="-Z sanitizer=thread" \ cargo test -p tokio-timer --test hammer --target x86_64-unknown-linux-gnu + + # === tokio-threadpool ==== + + # Run address sanitizer + RUSTFLAGS="-Z sanitizer=address" \ + cargo test -p tokio-threadpool --tests + + # Run thread sanitizer + RUSTFLAGS="-Z sanitizer=thread" \ + cargo test -p tokio-threadpool --tests fi - | set -e diff --git a/ci/tsan b/ci/tsan index 552e0accf4c..e81ae691407 100644 --- a/ci/tsan +++ b/ci/tsan @@ -4,3 +4,29 @@ # This causes many false positives. race:Arc*drop race:arc*Weak*drop + +# `std` mpsc is not used in any Tokio code base. This race is triggered by some +# rust runtime logic. +race:std*mpsc_queue + +# Probably more fences in std. +race:__call_tls_dtors + +# The crossbeam deque uses fences. +race:crossbeam_deque + +# This is excluded as this race shows up due to using the stealing features of +# the deque. Unfortunately, the implementation uses a fence, which makes tsan +# unhappy. +# +# TODO: It would be nice to not have to filter this out. +race:try_steal_task + +# This filters out an expected data race in the treiber stack implementation. +# Treiber stacks are inherently racy. The pop operation will attempt to access +# the "next" pointer on the node it is attempting to pop. However, at this +# point it has not gained ownership of the node and another thread might beat +# it and take ownership of the node first (touching the next pointer). The +# original pop operation will fail due to the ABA guard, but tsan still picks +# up the access on the next pointer. +race:Backup::next_sleeper diff --git a/tokio-threadpool/Cargo.toml b/tokio-threadpool/Cargo.toml index 0914bb920b5..f0e6fd42401 100644 --- a/tokio-threadpool/Cargo.toml +++ b/tokio-threadpool/Cargo.toml @@ -26,7 +26,10 @@ futures2 = { version = "0.1", path = "../futures2", optional = true } [dev-dependencies] tokio-timer = "0.1" env_logger = "0.4" + +# For comparison benchmarks futures-cpupool = "0.1.7" +threadpool = "1.7.1" [features] unstable-futures = [ diff --git a/tokio-threadpool/benches/blocking.rs b/tokio-threadpool/benches/blocking.rs new file mode 100644 index 00000000000..ea432c885f9 --- /dev/null +++ b/tokio-threadpool/benches/blocking.rs @@ -0,0 +1,148 @@ +#![feature(test)] +#![deny(warnings)] + +extern crate futures; +extern crate rand; +extern crate tokio_threadpool; +extern crate threadpool; +extern crate test; + +const ITER: usize = 1_000; + +mod blocking { + use super::*; + + use futures::future::*; + use tokio_threadpool::{Builder, blocking}; + + #[bench] + fn cpu_bound(b: &mut test::Bencher) { + let pool = Builder::new() + .pool_size(2) + .max_blocking(20) + .build(); + + b.iter(|| { + let count_down = Arc::new(CountDown::new(::ITER)); + + for _ in 0..::ITER { + let count_down = count_down.clone(); + + pool.spawn(lazy(move || { + poll_fn(|| { + blocking(|| { + perform_complex_computation() + }) + .map_err(|_| panic!()) + }) + .and_then(move |_| { + // Do something with the value + count_down.dec(); + Ok(()) + }) + })); + } + + count_down.wait(); + }) + } +} + +mod message_passing { + use super::*; + + use futures::future::*; + use futures::sync::oneshot; + use tokio_threadpool::Builder; + + #[bench] + fn cpu_bound(b: &mut test::Bencher) { + let pool = Builder::new() + .pool_size(2) + .max_blocking(20) + .build(); + + let blocking = threadpool::ThreadPool::new(20); + + b.iter(|| { + let count_down = Arc::new(CountDown::new(::ITER)); + + for _ in 0..::ITER { + let count_down = count_down.clone(); + let blocking = blocking.clone(); + + pool.spawn(lazy(move || { + // Create a channel to receive the return value. + let (tx, rx) = oneshot::channel(); + + // Spawn a task on the blocking thread pool to process the + // computation. + blocking.execute(move || { + let res = perform_complex_computation(); + tx.send(res).unwrap(); + }); + + rx.and_then(move |_| { + count_down.dec(); + Ok(()) + }).map_err(|_| panic!()) + })); + } + + count_down.wait(); + }) + } +} + +fn perform_complex_computation() -> usize { + use rand::*; + + // Simulate a CPU heavy computation + let mut rng = rand::thread_rng(); + rng.gen() +} + +// Util for waiting until the tasks complete + +use std::sync::*; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::*; + +struct CountDown { + rem: AtomicUsize, + mutex: Mutex<()>, + condvar: Condvar, +} + +impl CountDown { + fn new(rem: usize) -> Self { + CountDown { + rem: AtomicUsize::new(rem), + mutex: Mutex::new(()), + condvar: Condvar::new(), + } + } + + fn dec(&self) { + let prev = self.rem.fetch_sub(1, AcqRel); + + if prev != 1 { + return; + } + + let _lock = self.mutex.lock().unwrap(); + self.condvar.notify_all(); + } + + fn wait(&self) { + let mut lock = self.mutex.lock().unwrap(); + + loop { + if self.rem.load(Acquire) == 0 { + return; + } + + lock = self.condvar.wait(lock).unwrap(); + } + } +} diff --git a/tokio-threadpool/src/blocking.rs b/tokio-threadpool/src/blocking.rs new file mode 100644 index 00000000000..212dbe01c05 --- /dev/null +++ b/tokio-threadpool/src/blocking.rs @@ -0,0 +1,148 @@ +use worker::Worker; + +use futures::Poll; + +/// Error raised by `blocking`. +#[derive(Debug)] +pub struct BlockingError { + _p: (), +} + +/// Enter a blocking section of code. +/// +/// The `blocking` function annotates a section of code that performs a blocking +/// operation, either by issuing a blocking syscall or by performing a long +/// running CPU-bound computation. +/// +/// When the `blocking` function enters, it hands off the responsibility of +/// processing the current work queue to another thread. Then, it calls the +/// supplied closure. The closure is permitted to block indefinitely. +/// +/// If the maximum number of concurrent `blocking` calls has been reached, then +/// `NotReady` is returned and the task is notified once existing `blocking` +/// calls complete. The maximum value is specified when creating a thread pool +/// using [`Builder::max_blocking`][build] +/// +/// [build]: struct.Builder.html#method.max_blocking +/// +/// # Return +/// +/// When the blocking closure is executed, `Ok(T)` is returned, where `T` is the +/// closure's return value. +/// +/// If the thread pool has shutdown, `Err` is returned. +/// +/// If the number of concurrent `blocking` calls has reached the maximum, +/// `Ok(NotReady)` is returned and the current task is notified when a call to +/// `blocking` will succeed. +/// +/// If `blocking` is called from outside the context of a Tokio thread pool, +/// `Err` is returned. +/// +/// # Background +/// +/// By default, the Tokio thread pool expects that tasks will only run for short +/// periods at a time before yielding back to the thread pool. This is the basic +/// premise of cooperative multitasking. +/// +/// However, it is common to want to perform a blocking operation while +/// processing an asynchronous computation. Examples of blocking operation +/// include: +/// +/// * Performing synchronous file operations (reading and writing). +/// * Blocking on acquiring a mutex. +/// * Performing a CPU bound computation, like cryptographic encryption or +/// decryption. +/// +/// One option for dealing with blocking operations in an asynchronous context +/// is to use a thread pool dedicated to performing these operations. This not +/// ideal as it requires bidirectional message passing as well as a channel to +/// communicate which adds a level of buffering. +/// +/// Instead, `blocking` hands off the responsiblity of processing the work queue +/// to another thread. This hand off is light compared to a channel and does not +/// require buffering. +/// +/// # Examples +/// +/// Block on receiving a message from a `std` channel. This example is a little +/// silly as using the non-blocking channel from the `futures` crate would make +/// more sense. The blocking receive can be replaced with any blocking operation +/// that needs to be performed. +/// +/// ```rust +/// # extern crate futures; +/// # extern crate tokio_threadpool; +/// +/// use tokio_threadpool::{ThreadPool, blocking}; +/// +/// use futures::Future; +/// use futures::future::{lazy, poll_fn}; +/// +/// use std::sync::mpsc; +/// use std::thread; +/// use std::time::Duration; +/// +/// pub fn main() { +/// // This is a *blocking* channel +/// let (tx, rx) = mpsc::channel(); +/// +/// // Spawn a thread to send a message +/// thread::spawn(move || { +/// thread::sleep(Duration::from_millis(500)); +/// tx.send("hello").unwrap(); +/// }); +/// +/// let pool = ThreadPool::new(); +/// +/// pool.spawn(lazy(move || { +/// // Because `blocking` returns `Poll`, it is intended to be used +/// // from the context of a `Future` implementation. Since we don't +/// // have a complicated requirement, we can use `poll_fn` in this +/// // case. +/// poll_fn(move || { +/// blocking(|| { +/// let msg = rx.recv().unwrap(); +/// println!("message = {}", msg); +/// }).map_err(|_| panic!("the threadpool shut down")) +/// }) +/// })); +/// +/// // Wait for the task we just spawned to complete. +/// pool.shutdown_on_idle().wait().unwrap(); +/// } +/// ``` +pub fn blocking(f: F) -> Poll +where F: FnOnce() -> T, +{ + let res = Worker::with_current(|worker| { + let worker = match worker { + Some(worker) => worker, + None => { + return Err(BlockingError { _p: () }); + } + }; + + // Transition the worker state to blocking. This will exit the fn early + // with `NotRead` if the pool does not have enough capacity to enter + // blocking mode. + worker.transition_to_blocking() + }); + + // If the transition cannot happen, exit early + try_ready!(res); + + // Currently in blocking mode, so call the inner closure + let ret = f(); + + // Try to transition out of blocking mode. This is a fast path that takes + // back ownership of the worker if the worker handoff didn't complete yet. + Worker::with_current(|worker| { + // Worker must be set since it was above. + worker.unwrap() + .transition_from_blocking(); + }); + + // Return the result + Ok(ret.into()) +} diff --git a/tokio-threadpool/src/builder.rs b/tokio-threadpool/src/builder.rs index 9583206c255..0e390fd721c 100644 --- a/tokio-threadpool/src/builder.rs +++ b/tokio-threadpool/src/builder.rs @@ -2,7 +2,7 @@ use callback::Callback; use config::{Config, MAX_WORKERS}; use park::{BoxPark, BoxedPark, DefaultPark}; use sender::Sender; -use pool::Pool; +use pool::{Pool, MAX_BACKUP}; use thread_pool::ThreadPool; use worker::{self, Worker, WorkerId}; @@ -63,6 +63,10 @@ pub struct Builder { /// Number of workers to spawn pool_size: usize, + /// Maximum number of futures that can be in a blocking section + /// concurrently. + max_blocking: usize, + /// Generates the `Park` instances new_park: Box BoxPark>, } @@ -99,11 +103,14 @@ impl Builder { Builder { pool_size: num_cpus, + max_blocking: 100, config: Config { keep_alive: None, name_prefix: None, stack_size: None, around_worker: None, + after_start: None, + before_stop: None, }, new_park, } @@ -138,6 +145,37 @@ impl Builder { self } + /// Set the maximum number of concurrent blocking sections. + /// + /// When the maximum concurrent `blocking` calls is reached, any further + /// calls to `blocking` will return `NotReady` and the task is notified once + /// previously in-flight calls to `blocking` return. + /// + /// This must be a number between 1 and 32,768 though it is advised to keep + /// this value on the smaller side. + /// + /// The default value is 100. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::Builder; + /// + /// # pub fn main() { + /// // Create a thread pool with default configuration values + /// let thread_pool = Builder::new() + /// .max_blocking(200) + /// .build(); + /// # } + /// ``` + pub fn max_blocking(&mut self, val: usize) -> &mut Self { + assert!(val <= MAX_BACKUP, "max value is {}", MAX_BACKUP); + self.max_blocking = val; + self + } + /// Set the worker thread keep alive duration /// /// If set, a worker thread will wait for up to the specified duration for @@ -255,6 +293,61 @@ impl Builder { self } + /// Execute function `f` after each thread is started but before it starts + /// doing work. + /// + /// This is intended for bookkeeping and monitoring use cases. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::Builder; + /// + /// # pub fn main() { + /// // Create a thread pool with default configuration values + /// let thread_pool = Builder::new() + /// .after_start(|| { + /// println!("thread started"); + /// }) + /// .build(); + /// # } + /// ``` + pub fn after_start(&mut self, f: F) -> &mut Self + where F: Fn() + Send + Sync + 'static + { + self.config.after_start = Some(Arc::new(f)); + self + } + + /// Execute function `f` before each thread stops. + /// + /// This is intended for bookkeeping and monitoring use cases. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::Builder; + /// + /// # pub fn main() { + /// // Create a thread pool with default configuration values + /// let thread_pool = Builder::new() + /// .before_stop(|| { + /// println!("thread stopping"); + /// }) + /// .build(); + /// # } + /// ``` + pub fn before_stop(&mut self, f: F) -> &mut Self + where F: Fn() + Send + Sync + 'static + { + self.config.before_stop = Some(Arc::new(f)); + self + } + /// Customize the `park` instance used by each worker thread. /// /// The provided closure `f` is called once per worker and returns a `Park` @@ -331,6 +424,7 @@ impl Builder { let inner = Arc::new( Pool::new( workers.into_boxed_slice(), + self.max_blocking, self.config.clone())); // Wrap with `Sender` diff --git a/tokio-threadpool/src/config.rs b/tokio-threadpool/src/config.rs index 70fc20872d2..e51efc2ba40 100644 --- a/tokio-threadpool/src/config.rs +++ b/tokio-threadpool/src/config.rs @@ -1,18 +1,32 @@ use callback::Callback; +use std::fmt; +use std::sync::Arc; use std::time::Duration; /// Thread pool specific configuration values -#[derive(Debug, Clone)] +#[derive(Clone)] pub(crate) struct Config { pub keep_alive: Option, // Used to configure a worker thread pub name_prefix: Option, pub stack_size: Option, pub around_worker: Option, + pub after_start: Option>, + pub before_stop: Option>, } /// Max number of workers that can be part of a pool. This is the most that can /// fit in the scheduler state. Note, that this is the max number of **active** /// threads. There can be more standby threads. pub(crate) const MAX_WORKERS: usize = 1 << 15; + +impl fmt::Debug for Config { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Config") + .field("keep_alive", &self.keep_alive) + .field("name_prefix", &self.name_prefix) + .field("stack_size", &self.stack_size) + .finish() + } +} diff --git a/tokio-threadpool/src/lib.rs b/tokio-threadpool/src/lib.rs index 5c33b09db24..1fea677e943 100644 --- a/tokio-threadpool/src/lib.rs +++ b/tokio-threadpool/src/lib.rs @@ -4,8 +4,10 @@ #![deny(warnings, missing_docs, missing_debug_implementations)] extern crate tokio_executor; -extern crate futures; + extern crate crossbeam_deque as deque; +#[macro_use] +extern crate futures; extern crate num_cpus; extern crate rand; @@ -17,6 +19,7 @@ extern crate futures2; pub mod park; +mod blocking; mod builder; mod callback; mod config; @@ -31,6 +34,7 @@ mod task; mod thread_pool; mod worker; +pub use blocking::{blocking, BlockingError}; pub use builder::Builder; pub use sender::Sender; pub use shutdown::Shutdown; diff --git a/tokio-threadpool/src/notifier.rs b/tokio-threadpool/src/notifier.rs index 13190b4bba6..74c78cc8025 100644 --- a/tokio-threadpool/src/notifier.rs +++ b/tokio-threadpool/src/notifier.rs @@ -2,6 +2,7 @@ use pool::Pool; use task::Task; use std::mem; +use std::ops; use std::sync::{Arc, Weak}; use futures::executor::Notify; @@ -15,14 +16,22 @@ pub(crate) struct Notifier { pub inner: Weak, } +/// A guard that ensures that the inner value gets forgotten. +#[derive(Debug)] +struct Forget(Option); + impl Notify for Notifier { fn notify(&self, id: usize) { trace!("Notifier::notify; id=0x{:x}", id); unsafe { let ptr = id as *const Task; - let task = Arc::from_raw(ptr); + // We did not actually take ownership of the `Arc` in this function + // so we must ensure that the Arc is forgotten. + let task = Forget::new(Arc::from_raw(ptr)); + + // TODO: Unify this with Task::notify if task.schedule() { // TODO: Check if the pool is still running // @@ -33,9 +42,6 @@ impl Notify for Notifier { let _ = inner.submit(task, &inner); } } - - // We did not actually take ownership of the `Arc` in this function. - mem::forget(task); } } @@ -47,15 +53,12 @@ impl Notify for Notifier { // is to call `Arc::from_raw` which returns a strong ref. So, to // maintain the invariants, `t1` has to be forgotten. This prevents the // ref count from being decremented. - let t1 = unsafe { Arc::from_raw(ptr) }; - let t2 = t1.clone(); + let t1 = Forget::new(unsafe { Arc::from_raw(ptr) }); - mem::forget(t1); - - // t2 is forgotten so that the fn exits without decrementing the ref + // The clone is forgotten so that the fn exits without decrementing the ref // count. The caller of `clone_id` ensures that `drop_id` is called when // the ref count needs to be decremented. - mem::forget(t2); + let _ = Forget::new(t1.clone()); id } @@ -67,3 +70,25 @@ impl Notify for Notifier { } } } + +// ===== impl Forget ===== + +impl Forget { + fn new(t: T) -> Self { + Forget(Some(t)) + } +} + +impl ops::Deref for Forget { + type Target = T; + + fn deref(&self) -> &T { + self.0.as_ref().unwrap() + } +} + +impl Drop for Forget { + fn drop(&mut self) { + mem::forget(self.0.take()); + } +} diff --git a/tokio-threadpool/src/park/default_park.rs b/tokio-threadpool/src/park/default_park.rs index 57a0c9a853b..73e98b0e86c 100644 --- a/tokio-threadpool/src/park/default_park.rs +++ b/tokio-threadpool/src/park/default_park.rs @@ -53,6 +53,17 @@ impl DefaultPark { DefaultPark { inner } } + + /// Unpark the thread without having to clone the unpark handle. + /// + /// Named `notify` to avoid conflicting with the `unpark` fn. + pub(crate) fn notify(&self) { + self.inner.unpark(); + } + + pub(crate) fn park_sync(&self, duration: Option) { + self.inner.park(duration); + } } impl Park for DefaultPark { @@ -65,11 +76,13 @@ impl Park for DefaultPark { } fn park(&mut self) -> Result<(), Self::Error> { - self.inner.park(None) + self.inner.park(None); + Ok(()) } fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.inner.park(Some(duration)) + self.inner.park(Some(duration)); + Ok(()) } } @@ -83,11 +96,11 @@ impl Unpark for DefaultUnpark { impl Inner { /// Park the current thread for at most `dur`. - fn park(&self, timeout: Option) -> Result<(), ParkError> { + fn park(&self, timeout: Option) { // If currently notified, then we skip sleeping. This is checked outside // of the lock to avoid acquiring a mutex if not necessary. match self.state.compare_and_swap(NOTIFY, IDLE, SeqCst) { - NOTIFY => return Ok(()), + NOTIFY => return, IDLE => {}, _ => unreachable!(), } @@ -95,7 +108,7 @@ impl Inner { // If the duration is zero, then there is no need to actually block if let Some(ref dur) = timeout { if *dur == Duration::from_millis(0) { - return Ok(()); + return; } } @@ -109,7 +122,7 @@ impl Inner { // Notified before we could sleep, consume the notification and // exit self.state.store(IDLE, SeqCst); - return Ok(()); + return; } IDLE => {}, _ => unreachable!(), @@ -128,8 +141,6 @@ impl Inner { // except that I find it helpful to make it explicit where we want the // mutex to unlock. drop(m); - - Ok(()) } fn unpark(&self) { diff --git a/tokio-threadpool/src/pool/backup.rs b/tokio-threadpool/src/pool/backup.rs new file mode 100644 index 00000000000..4f0c295a539 --- /dev/null +++ b/tokio-threadpool/src/pool/backup.rs @@ -0,0 +1,304 @@ +use park::DefaultPark; +use worker::{WorkerId}; + +use std::cell::UnsafeCell; +use std::fmt; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{self, Acquire, AcqRel, Relaxed}; + +/// State associated with a thread in the thread pool. +/// +/// The pool manages a number of threads. Some of those threads are considered +/// "primary" threads and process the work queue. When a task being run on a +/// primary thread enters a blocking context, the responsibility of processing +/// the work queue must be handed off to another thread. This is done by first +/// checking for idle threads on the backup stack. If one is found, the worker +/// token (`WorkerId`) is handed off to that running thread. If none are found, +/// a new thread is spawned. +/// +/// This state manages the exchange. A thread that is idle, not assigned to a +/// work queue, sits around for a specified amount of time. When the worker +/// token is handed off, it is first stored in `handoff`. The backup thread is +/// then signaled. At this point, the backup thread wakes up from sleep and +/// reads `handoff`. At that point, it has been promoted to a primary thread and +/// will begin processing inbound work on the work queue. +/// +/// The name `Backup` isn't really great for what the type does, but I have not +/// come up with a better name... Maybe it should just be named `Thread`. +#[derive(Debug)] +pub(crate) struct Backup { + /// Worker ID that is being handed to this thread. + handoff: UnsafeCell>, + + /// Thread state. + /// + /// This tracks: + /// + /// * Is queued flag + /// * If the pool is shutting down. + /// * If the thread is running + state: AtomicUsize, + + /// Next entry in the treiber stack. + next_sleeper: UnsafeCell, + + /// Used to put the thread to sleep + park: DefaultPark, +} + +#[derive(Debug, Eq, PartialEq, Copy, Clone)] +pub(crate) struct BackupId(pub(crate) usize); + +#[derive(Debug)] +pub(crate) enum Handoff { + Worker(WorkerId), + Idle, + Terminated, +} + +/// Tracks thread state. +#[derive(Clone, Copy, Eq, PartialEq)] +struct State(usize); + +/// Set when the worker is pushed onto the scheduler's stack of sleeping +/// threads. +/// +/// This flag also serves as a "notification" bit. If another thread is +/// attempting to hand off a worker to the backup thread, then the pushed bit +/// will not be set when the thread tries to shutdown. +pub const PUSHED: usize = 0b001; + +/// Set when the thread is running +pub const RUNNING: usize = 0b010; + +/// Set when the thread pool has terminated +pub const TERMINATED: usize = 0b100; + +// ===== impl Backup ===== + +impl Backup { + pub fn new() -> Backup { + Backup { + handoff: UnsafeCell::new(None), + state: AtomicUsize::new(State::new().into()), + next_sleeper: UnsafeCell::new(BackupId(0)), + park: DefaultPark::new(), + } + } + + /// Called when the thread is starting + pub fn start(&self, worker_id: &WorkerId) { + debug_assert!({ + let state: State = self.state.load(Relaxed).into(); + + debug_assert!(!state.is_pushed()); + debug_assert!(state.is_running()); + debug_assert!(!state.is_terminated()); + + true + }); + + // The handoff value is equal to `worker_id` + debug_assert_eq!(unsafe { (*self.handoff.get()).as_ref() }, Some(worker_id)); + + unsafe { *self.handoff.get() = None; } + } + + pub fn is_running(&self) -> bool { + let state: State = self.state.load(Relaxed).into(); + state.is_running() + } + + /// Hands off the worker to a thread. + /// + /// Returns `true` if the thread needs to be spawned. + pub fn worker_handoff(&self, worker_id: WorkerId) -> bool { + unsafe { + // The backup worker should not already have been handoff a worker. + debug_assert!((*self.handoff.get()).is_none()); + + // Set the handoff + *self.handoff.get() = Some(worker_id); + } + + // This *probably* can just be `Release`... memory orderings, how do + // they work? + let prev = State::worker_handoff(&self.state); + debug_assert!(prev.is_pushed()); + + if prev.is_running() { + // Wakeup the backup thread + self.park.notify(); + false + } else { + true + } + } + + /// Terminate the worker + pub fn signal_stop(&self) { + let prev: State = self.state.fetch_xor(TERMINATED | PUSHED, AcqRel).into(); + + debug_assert!(!prev.is_terminated()); + debug_assert!(prev.is_pushed()); + + if prev.is_running() { + self.park.notify(); + } + } + + /// Release the worker + pub fn release(&self) { + let prev: State = self.state.fetch_xor(RUNNING, AcqRel).into(); + + debug_assert!(prev.is_running()); + } + + /// Wait for a worker handoff + pub fn wait_for_handoff(&self, sleep: bool) -> Handoff { + let mut state: State = self.state.load(Acquire).into(); + + // Run in a loop since there can be spurious wakeups + loop { + if !state.is_pushed() { + if state.is_terminated() { + return Handoff::Terminated; + } + + let worker_id = unsafe { + (*self.handoff.get()).take() + .expect("no worker handoff") + }; + + return Handoff::Worker(worker_id); + } + + if sleep { + // TODO: Park with a timeout + self.park.park_sync(None); + + // Reload the state + state = self.state.load(Acquire).into(); + debug_assert!(state.is_running()); + } else { + debug_assert!(state.is_running()); + + // Transition out of running + let mut next = state; + next.unset_running(); + + let actual = self.state.compare_and_swap( + state.into(), + next.into(), + AcqRel).into(); + + if actual == state { + debug_assert!(!next.is_running()); + + return Handoff::Idle; + } + + state = actual; + } + } + } + + pub fn is_pushed(&self) -> bool { + let state: State = self.state.load(Relaxed).into(); + state.is_pushed() + } + + pub fn set_pushed(&self, ordering: Ordering) { + let prev: State = self.state.fetch_or(PUSHED, ordering).into(); + debug_assert!(!prev.is_pushed()); + } + + #[inline] + pub fn next_sleeper(&self) -> BackupId { + unsafe { *self.next_sleeper.get() } + } + + #[inline] + pub fn set_next_sleeper(&self, val: BackupId) { + unsafe { *self.next_sleeper.get() = val; } + } +} + +// ===== impl State ===== + +impl State { + /// Returns a new, default, thread `State` + pub fn new() -> State { + State(0) + } + + /// Returns true if the thread entry is pushed in the sleeper stack + pub fn is_pushed(&self) -> bool { + self.0 & PUSHED == PUSHED + } + + pub fn set_pushed(&mut self) { + self.0 |= PUSHED; + } + + fn unset_pushed(&mut self) { + self.0 &= !PUSHED; + } + + pub fn is_running(&self) -> bool { + self.0 & RUNNING == RUNNING + } + + pub fn set_running(&mut self) { + self.0 |= RUNNING; + } + + pub fn unset_running(&mut self) { + self.0 &= !RUNNING; + } + + pub fn is_terminated(&self) -> bool { + self.0 & TERMINATED == TERMINATED + } + + fn worker_handoff(state: &AtomicUsize) -> State { + let mut curr: State = state.load(Acquire).into(); + + loop { + let mut next = curr; + next.set_running(); + next.unset_pushed(); + + let actual = state.compare_and_swap( + curr.into(), next.into(), AcqRel).into(); + + if actual == curr { + return curr; + } + + curr = actual; + } + } +} + +impl From for State { + fn from(src: usize) -> State { + State(src) + } +} + +impl From for usize { + fn from(src: State) -> usize { + src.0 + } +} + +impl fmt::Debug for State { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("backup::State") + .field("is_pushed", &self.is_pushed()) + .field("is_running", &self.is_running()) + .field("is_terminated", &self.is_terminated()) + .finish() + } +} diff --git a/tokio-threadpool/src/pool/backup_stack.rs b/tokio-threadpool/src/pool/backup_stack.rs new file mode 100644 index 00000000000..b2a54d64901 --- /dev/null +++ b/tokio-threadpool/src/pool/backup_stack.rs @@ -0,0 +1,185 @@ +use pool::{Backup, BackupId}; + +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{Acquire, AcqRel}; + +#[derive(Debug)] +pub(crate) struct BackupStack { + state: AtomicUsize, +} + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +struct State(usize); + +pub(crate) const MAX_BACKUP: usize = 1 << 15; + +/// Extracts the head of the backup stack from the state +const STACK_MASK: usize = ((1 << 16) - 1); + +/// Used to mark the stack as empty +pub(crate) const EMPTY: BackupId = BackupId(MAX_BACKUP); + +/// Used to mark the stack as terminated +pub(crate) const TERMINATED: BackupId = BackupId(EMPTY.0 + 1); + +/// How many bits the treiber ABA guard is offset by +const ABA_GUARD_SHIFT: usize = 16; + +#[cfg(target_pointer_width = "64")] +const ABA_GUARD_MASK: usize = (1 << (64 - ABA_GUARD_SHIFT)) - 1; + +#[cfg(target_pointer_width = "32")] +const ABA_GUARD_MASK: usize = (1 << (32 - ABA_GUARD_SHIFT)) - 1; + +// ===== impl BackupStack ===== + +impl BackupStack { + pub fn new() -> BackupStack { + let state = AtomicUsize::new(State::new().into()); + BackupStack { state } + } + + /// Push a backup thread onto the stack + /// + /// # Return + /// + /// Returns `Ok` on success. + /// + /// Returns `Err` if the pool has transitioned to the `TERMINATED` state. + /// Whene terminated, pushing new entries is no longer permitted. + pub fn push(&self, entries: &[Backup], id: BackupId) -> Result<(), ()> { + let mut state: State = self.state.load(Acquire).into(); + + entries[id.0].set_pushed(AcqRel); + + loop { + let mut next = state; + + let head = state.head(); + + if head == TERMINATED { + // The pool is terminated, cannot push the sleeper. + return Err(()); + } + + entries[id.0].set_next_sleeper(head); + next.set_head(id); + + let actual = self.state.compare_and_swap( + state.into(), next.into(), AcqRel).into(); + + if state == actual { + return Ok(()); + } + + state = actual; + } + } + + /// Pop a backup thread off the stack. + /// + /// If `terminate` is set and the stack is empty when this function is + /// called, the state of the stack is transitioned to "terminated". At this + /// point, no further entries can be pushed onto the stack. + /// + /// # Return + /// + /// * Returns the index of the popped worker and the worker's observed + /// state. + /// + /// * `Ok(None)` if the stack is empty. + /// * `Err(_)` is returned if the pool has been shutdown. + pub fn pop(&self, entries: &[Backup], terminate: bool) -> Result, ()> { + // Figure out the empty value + let terminal = match terminate { + true => TERMINATED, + false => EMPTY, + }; + + let mut state: State = self.state.load(Acquire).into(); + + loop { + let head = state.head(); + + if head == EMPTY { + let mut next = state; + next.set_head(terminal); + + if next == state { + debug_assert!(terminal == EMPTY); + return Ok(None); + } + + let actual = self.state.compare_and_swap( + state.into(), next.into(), AcqRel).into(); + + if actual != state { + state = actual; + continue; + } + + return Ok(None); + } else if head == TERMINATED { + return Err(()); + } + + debug_assert!(head.0 < MAX_BACKUP); + + let mut next = state; + + let next_head = entries[head.0].next_sleeper(); + + // TERMINATED can never be set as the "next pointer" on a worker. + debug_assert!(next_head != TERMINATED); + + if next_head == EMPTY { + next.set_head(terminal); + } else { + next.set_head(next_head); + } + + let actual = self.state.compare_and_swap( + state.into(), next.into(), AcqRel).into(); + + if actual == state { + debug_assert!(entries[head.0].is_pushed()); + return Ok(Some(head)); + } + + state = actual; + } + } +} + +// ===== impl State ===== + +impl State { + fn new() -> State { + State(EMPTY.0) + } + + fn head(&self) -> BackupId { + BackupId(self.0 & STACK_MASK) + } + + fn set_head(&mut self, val: BackupId) { + let val = val.0; + + // The ABA guard protects against the ABA problem w/ treiber stacks + let aba_guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK; + + self.0 = (aba_guard << ABA_GUARD_SHIFT) | val; + } +} + +impl From for State { + fn from(src: usize) -> Self { + State(src) + } +} + +impl From for usize { + fn from(src: State) -> Self { + src.0 + } +} diff --git a/tokio-threadpool/src/pool/mod.rs b/tokio-threadpool/src/pool/mod.rs index 91305c2bc46..2d9c152c341 100644 --- a/tokio-threadpool/src/pool/mod.rs +++ b/tokio-threadpool/src/pool/mod.rs @@ -1,24 +1,31 @@ +mod backup; +mod backup_stack; mod state; -mod stack; +pub(crate) use self::backup::{Backup, BackupId}; +pub(crate) use self::backup_stack::MAX_BACKUP; pub(crate) use self::state::{ State, Lifecycle, MAX_FUTURES, }; -use self::stack::SleepStack; + +use self::backup::Handoff; +use self::backup_stack::BackupStack; use config::Config; use shutdown_task::ShutdownTask; -use task::Task; +use task::{Task, Blocking}; use worker::{self, Worker, WorkerId}; +use futures::Poll; use futures::task::AtomicTask; use std::cell::UnsafeCell; use std::sync::atomic::Ordering::{Acquire, AcqRel, Relaxed}; use std::sync::atomic::AtomicUsize; use std::sync::Arc; +use std::thread; use rand::{Rng, SeedableRng, XorShiftRng}; @@ -29,9 +36,9 @@ pub(crate) struct Pool { pub state: AtomicUsize, // Stack tracking sleeping workers. - sleep_stack: SleepStack, + sleep_stack: worker::Stack, - // Number of workers who haven't reached the final state of shutdown + // Number of workers that haven't reached the final state of shutdown // // This is only used to know when to single `shutdown_task` once the // shutdown process has completed. @@ -40,11 +47,28 @@ pub(crate) struct Pool { // Used to generate a thread local RNG seed pub next_thread_id: AtomicUsize, - // Storage for workers + // Worker state + // + // A worker is a thread that is processing the work queue and polling + // futures. // - // This will *usually* be a small number + // This will *usually* be a small number. pub workers: Box<[worker::Entry]>, + // Backup thread state + // + // In order to efficiently support `blocking`, a pool of backup threads is + // needed. These backup threads are ready to take over a worker if the + // future being processed requires blocking. + backup: Box<[Backup]>, + + // Stack of sleeping backup threads + pub backup_stack: BackupStack, + + // State regarding coordinating blocking sections and tracking tasks that + // are pending blocking capacity. + blocking: Blocking, + // Task notified when the worker shuts down pub shutdown_task: ShutdownTask, @@ -52,17 +76,41 @@ pub(crate) struct Pool { pub config: Config, } +const TERMINATED: usize = 1; + impl Pool { /// Create a new `Pool` - pub fn new(workers: Box<[worker::Entry]>, config: Config) -> Pool { + pub fn new(workers: Box<[worker::Entry]>, max_blocking: usize, config: Config) -> Pool { let pool_size = workers.len(); + let total_size = max_blocking + pool_size; + + // Create the set of backup entries + // + // This is `backup + pool_size` because the core thread pool running the + // workers is spawned from backup as well. + let backup = (0..total_size).map(|_| { + Backup::new() + }).collect::>().into_boxed_slice(); + + let backup_stack = BackupStack::new(); + + for i in (0..backup.len()).rev() { + backup_stack.push(&backup, BackupId(i)) + .unwrap(); + } + + // Initialize the blocking state + let blocking = Blocking::new(max_blocking); let ret = Pool { state: AtomicUsize::new(State::new().into()), - sleep_stack: SleepStack::new(), - num_workers: AtomicUsize::new(pool_size), + sleep_stack: worker::Stack::new(), + num_workers: AtomicUsize::new(0), next_thread_id: AtomicUsize::new(0), workers, + backup, + backup_stack, + blocking, shutdown_task: ShutdownTask { task1: AtomicTask::new(), #[cfg(feature = "unstable-futures")] @@ -141,6 +189,10 @@ impl Pool { self.terminate_sleeping_workers(); } + pub fn is_shutdown(&self) -> bool { + self.num_workers.load(Acquire) == TERMINATED + } + /// Called by `Worker` as it tries to enter a sleeping state. Before it /// sleeps, it must push itself onto the sleep stack. This enables other /// threads to see it when signaling work. @@ -151,32 +203,67 @@ impl Pool { pub fn terminate_sleeping_workers(&self) { use worker::Lifecycle::Signaled; + // First, set the TERMINATED flag on `num_workers`. This signals that + // whichever thread transitions the count to zero must notify the + // shutdown task. + let prev = self.num_workers.fetch_or(TERMINATED, AcqRel); + let notify = prev == 0; + trace!(" -> shutting down workers"); // Wakeup all sleeping workers. They will wake up, see the state // transition, and terminate. while let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, true) { - trace!(" -> shutdown worker; idx={:?}; state={:?}", idx, worker_state); + self.workers[idx].signal_stop(worker_state); + } - if self.workers[idx].signal_stop(worker_state).is_err() { - // The worker is already in the shutdown state, immediately - // track that it has terminated as the worker will never work - // again. - self.worker_terminated(); - } + // Now terminate any backup threads + // + // The call to `pop` must be successful because shutting down the pool + // is coordinated and at this point, this is the only thread that will + // attempt to transition the backup stack to "terminated". + while let Ok(Some(backup_id)) = self.backup_stack.pop(&self.backup, true) { + self.backup[backup_id.0].signal_stop(); + } + + if notify { + self.shutdown_task.notify(); } } - pub fn worker_terminated(&self) { - let prev = self.num_workers.fetch_sub(1, AcqRel); + /// Track that a worker thread has started + /// + /// If `Err` is returned, then the thread is not permitted to started. + fn thread_started(&self) -> Result<(), ()> { + let mut curr = self.num_workers.load(Acquire); + + loop { + if curr & TERMINATED == TERMINATED { + return Err(()); + } + + let actual = self.num_workers.compare_and_swap( + curr, curr + 2, AcqRel); + + if curr == actual { + return Ok(()); + } - trace!("worker_terminated; num_workers={}", prev - 1); + curr = actual; + } + } - if 1 == prev { - trace!("notifying shutdown task"); + fn thread_stopped(&self) { + let prev = self.num_workers.fetch_sub(2, AcqRel); + + if prev == TERMINATED | 2 { self.shutdown_task.notify(); } } + pub fn poll_blocking_capacity(&self, task: &Arc) -> Poll<(), ::BlockingError> { + self.blocking.poll_blocking_capacity(task) + } + /// Submit a task to the scheduler. /// /// Called from either inside or outside of the scheduler. If currently on @@ -184,15 +271,19 @@ impl Pool { pub fn submit(&self, task: Arc, inner: &Arc) { Worker::with_current(|worker| { match worker { - Some(worker) => { - let idx = worker.id.idx; + // If the worker is in blocking mode, then even though the + // thread-local variable is set, the current thread does not + // have ownership of that worker entry. This is because the + // worker entry has already been handed off to another thread. + Some(worker) if !worker.is_blocking() => { + let idx = worker.id.0; trace!(" -> submit internal; idx={}", idx); worker.inner.workers[idx].submit_internal(task); worker.inner.signal_work(inner); } - None => { + _ => { self.submit_external(task, inner); } } @@ -203,7 +294,7 @@ impl Pool { /// /// Called from outside of the scheduler, this function is how new tasks /// enter the system. - fn submit_external(&self, task: Arc, inner: &Arc) { + pub fn submit_external(&self, task: Arc, inner: &Arc) { use worker::Lifecycle::Notified; // First try to get a handle to a sleeping worker. This ensures that @@ -234,12 +325,130 @@ impl Pool { let entry = &self.workers[idx]; if !entry.submit_external(task, state) { - self.spawn_worker(idx, inner); + self.spawn_thread(WorkerId::new(idx), inner); } } - fn spawn_worker(&self, idx: usize, inner: &Arc) { - Worker::spawn(WorkerId::new(idx), inner); + pub fn release_backup(&self, backup_id: BackupId) -> Result<(), ()> { + // First update the state, this cannot fail because the caller must have + // exclusive access to the backup token. + self.backup[backup_id.0].release(); + + // Push the backup entry back on the stack + self.backup_stack.push(&self.backup, backup_id) + } + + pub fn notify_blocking_task(&self, pool: &Arc) { + self.blocking.notify_task(&pool); + } + + /// Provision a thread to run a worker + pub fn spawn_thread(&self, id: WorkerId, inner: &Arc) { + let backup_id = match self.backup_stack.pop(&self.backup, false) { + Ok(Some(backup_id)) => backup_id, + Ok(None) => panic!("no thread available"), + Err(_) => { + debug!("failed to spawn worker thread due to the thread pool shutting down"); + return; + } + }; + + let need_spawn = self.backup[backup_id.0] + .worker_handoff(id.clone()); + + if !need_spawn { + return; + } + + if self.thread_started().is_err() { + // The pool is shutting down. + return; + } + + let mut th = thread::Builder::new(); + + if let Some(ref prefix) = inner.config.name_prefix { + th = th.name(format!("{}{}", prefix, backup_id.0)); + } + + if let Some(stack) = inner.config.stack_size { + th = th.stack_size(stack); + } + + let inner = inner.clone(); + + let res = th.spawn(move || { + if let Some(ref f) = inner.config.after_start { + f(); + } + + let mut worker_id = id; + + inner.backup[backup_id.0].start(&worker_id); + + loop { + // The backup token should be in the running state. + debug_assert!(inner.backup[backup_id.0].is_running()); + + // TODO: Avoid always cloning + let worker = Worker::new(worker_id, backup_id, inner.clone()); + + // Run the worker. If the worker transitioned to a "blocking" + // state, then `is_blocking` will be true. + if !worker.do_run() { + // The worker shutdown, so exit the thread. + break; + } + + // Push the thread back onto the backup stack. This makes it + // available for future handoffs. + // + // This **must** happen before notifying the task. + let res = inner.backup_stack + .push(&inner.backup, backup_id); + + if res.is_err() { + // The pool is being shutdown. + break; + } + + // The task switched the current thread to blocking mode. + // Now that the blocking task completed, any tasks + inner.notify_blocking_task(&inner); + + debug_assert!(inner.backup[backup_id.0].is_running()); + + // Wait for a handoff + let handoff = inner.backup[backup_id.0] + .wait_for_handoff(true); + + match handoff { + Handoff::Worker(id) => { + debug_assert!(inner.backup[backup_id.0].is_running()); + worker_id = id; + } + Handoff::Idle => { + // Worker is idle + break; + } + Handoff::Terminated => { + // TODO: When wait_for_handoff supports blocking with a + // timeout, this will have to be smarter + break; + } + } + } + + if let Some(ref f) = inner.config.before_stop { + f(); + } + + inner.thread_stopped(); + }); + + if let Err(e) = res { + warn!("failed to spawn worker thread; err={:?}", e); + } } /// If there are any other workers currently relaxing, signal them that work @@ -277,7 +486,7 @@ impl Pool { } Shutdown => { trace!("signal_work -- spawn; idx={}", idx); - Worker::spawn(WorkerId::new(idx), inner); + self.spawn_thread(WorkerId(idx), inner); } Running | Notified | Signaled => { // The workers are already active. No need to wake them up. @@ -286,7 +495,6 @@ impl Pool { } } - /// Generates a random number /// /// Uses a thread-local seeded XorShift. diff --git a/tokio-threadpool/src/shutdown.rs b/tokio-threadpool/src/shutdown.rs index 416cf1248b4..29f2a342f30 100644 --- a/tokio-threadpool/src/shutdown.rs +++ b/tokio-threadpool/src/shutdown.rs @@ -1,8 +1,6 @@ use pool::Pool; use sender::Sender; -use std::sync::atomic::Ordering::{Acquire}; - use futures::{Future, Poll, Async}; #[cfg(feature = "unstable-futures")] use futures2; @@ -35,11 +33,10 @@ impl Future for Shutdown { fn poll(&mut self) -> Poll<(), ()> { use futures::task; - trace!("Shutdown::poll"); self.inner().shutdown_task.task1.register_task(task::current()); - if 0 != self.inner().num_workers.load(Acquire) { + if !self.inner().is_shutdown() { return Ok(Async::NotReady); } diff --git a/tokio-threadpool/src/task/blocking.rs b/tokio-threadpool/src/task/blocking.rs new file mode 100644 index 00000000000..ff32b5ab3de --- /dev/null +++ b/tokio-threadpool/src/task/blocking.rs @@ -0,0 +1,499 @@ +use pool::Pool; +use task::{Task, BlockingState}; + +use futures::{Poll, Async}; + +use std::cell::UnsafeCell; +use std::fmt; +use std::ptr; +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{Acquire, Release, AcqRel, Relaxed}; +use std::thread; + +/// Manages the state around entering a blocking section and tasks that are +/// queued pending the ability to block. +/// +/// This is a hybrid counter and instrusive mpsc channel (like `Queue`). +#[derive(Debug)] +pub(crate) struct Blocking { + /// Queue head. + /// + /// This is either the current remaining capacity for blocking sections + /// **or** if the max has been reached, the head of a pending blocking + /// capacity channel of tasks. + /// + /// When this points to a task, it represents a strong reference, i.e. + /// `Arc`. + state: AtomicUsize, + + /// Tail pointer. This is `Arc` unless it points to `stub`. + tail: UnsafeCell<*mut Task>, + + /// Stub pointer, used as part of the intrusive mpsc channel algorithm + /// described by 1024cores. + stub: Box, + + /// The channel algorithm is MPSC. This means that, in order to pop tasks, + /// coordination is required. + /// + /// Since it doesn't matter *which* task pops & notifies the queued task, we + /// can avoid a full mutex and make the "lock" lock free. + /// + /// Instead, threads race to set the "entered" bit. When the transition is + /// successfully made, the thread has permission to pop tasks off of the + /// queue. If a thread loses the race, instead of waiting to pop a task, it + /// signals to the winning thread that it should pop an additional task. + lock: AtomicUsize, +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub(crate) enum CanBlock { + /// Blocking capacity has been allocated to this task. + /// + /// The capacity allocation is initially checked before a task is polled. If + /// capacity has been allocated, it is consumed and tracked as `Allocated`. + Allocated, + + /// Allocation capacity must be either available to the task when it is + /// polled or not available. This means that a task can only ask for + /// capacity once. This state is used to track a task that has not yet asked + /// for blocking capacity. When a task needs blocking capacity, if it is in + /// this state, it can immediately try to get an allocation. + CanRequest, + + /// The task has requested blocking capacity, but none is available. + NoCapacity, +} + +/// Decorates the `usize` value of `Blocking::state`, providing fns to +/// manipulate the state instead of requiring bit ops. +#[derive(Copy, Clone, Eq, PartialEq)] +struct State(usize); + +/// Flag differentiating between remaining capacity and task pointers. +/// +/// If we assume pointers are properly aligned, then the least significant bit +/// will always be zero. So, we use that bit to track if the value represents a +/// number. +const NUM_FLAG: usize = 1; + +/// When representing "numbers", the state has to be shifted this much (to get +/// rid of the flag bit). +const NUM_SHIFT: usize = 1; + +// ====== impl Blocking ===== +// +impl Blocking { + /// Create a new `Blocking`. + pub fn new(capacity: usize) -> Blocking { + assert!(capacity > 0, "blocking capacity must be greater than zero"); + + let stub = Box::new(Task::stub()); + let ptr = &*stub as *const _ as *mut _; + + // Allocations are aligned + debug_assert!(ptr as usize & NUM_FLAG == 0); + + // The initial state value. This starts at the max capacity. + let init = State::new(capacity); + + Blocking { + state: AtomicUsize::new(init.into()), + tail: UnsafeCell::new(ptr), + stub: stub, + lock: AtomicUsize::new(0), + } + } + + /// Atomically either acquire blocking capacity or queue the task to be + /// notified once capacity becomes available. + /// + /// The caller must ensure that `task` has not previously been queued to be + /// notified when capacity becomes available. + pub fn poll_blocking_capacity(&self, task: &Arc) -> Poll<(), ::BlockingError> { + // This requires atomically claiming blocking capacity and if none is + // available, queuing &task. + + // The task cannot be queued at this point. The caller must ensure this. + debug_assert!(!BlockingState::from(task.blocking.load(Acquire)).is_queued()); + + // Don't bump the ref count unless necessary. + let mut strong: Option<*const Task> = None; + + // Load the state + let mut curr: State = self.state.load(Acquire).into(); + + loop { + let mut next = curr; + + if !next.claim_capacity(&self.stub) { + debug_assert!(curr.ptr().is_some()); + + // Unable to claim capacity, so we must queue `task` onto the + // channel. + // + // This guard also serves to ensure that queuing work that is + // only needed to run once only gets run once. + if strong.is_none() { + // First, transition the task to a "queued" state. This + // prevents double queuing. + // + // This is also the only thread that can set the queued flag + // at this point. And, the goal is for this to only be + // visible when the task node is polled from the channel. + // The memory ordering is established by MPSC queue + // operation. + // + // Note that, if the task doesn't get queued (because the + // CAS fails and capacity is now available) then this flag + // must be unset. Again, there is no race because until the + // task is queued, no other thread can see it. + let prev = BlockingState::toggle_queued(&task.blocking, Relaxed); + debug_assert!(!prev.is_queued()); + + // Bump the ref count + strong = Some(Arc::into_raw(task.clone())); + + // Set the next pointer. This does not require an atomic + // operation as this node is not currently accessible to + // other threads via the queue. + task.next_blocking.store(ptr::null_mut(), Relaxed); + } + + let ptr = strong.unwrap(); + + // Update the head to point to the new node. We need to see the + // previous node in order to update the next pointer as well as + // release `task` to any other threads calling `push`. + next.set_ptr(ptr); + } + + debug_assert_ne!(curr.0, 0); + debug_assert_ne!(next.0, 0); + + let actual = self.state.compare_and_swap( + curr.into(), + next.into(), + AcqRel).into(); + + if curr == actual { + break; + } + + curr = actual; + } + + match curr.ptr() { + Some(prev) => { + let ptr = strong.unwrap(); + + // Finish pushing + unsafe { + (*prev).next_blocking + .store(ptr as *mut _, Release); + } + + // The node was queued to be notified once capacity is made + // available. + Ok(Async::NotReady) + } + None => { + debug_assert!(curr.remaining_capacity() > 0); + + // If `strong` is set, gotta undo a bunch of work + if let Some(ptr) = strong { + let _ = unsafe { Arc::from_raw(ptr) }; + + // Unset the queued flag. + let prev = BlockingState::toggle_queued(&task.blocking, Relaxed); + debug_assert!(prev.is_queued()); + } + + // Capacity has been obtained + Ok(().into()) + } + } + } + + unsafe fn push_stub(&self) { + let task: *mut Task = &*self.stub as *const _ as *mut _; + + // Set the next pointer. This does not require an atomic operation as + // this node is not accessible. The write will be flushed with the next + // operation + (*task).next_blocking.store(ptr::null_mut(), Relaxed); + + // Update the head to point to the new node. We need to see the previous + // node in order to update the next pointer as well as release `task` + // to any other threads calling `push`. + let prev = self.state.swap(task as usize, AcqRel); + + // The stub is only pushed when there are pending tasks. Because of + // this, the state must *always* be in pointer mode. + debug_assert!(State::from(prev).is_ptr()); + + let prev = prev as *const Task; + + // We don't want the *existing* pointer to be a stub. + debug_assert_ne!(prev, task); + + // Release `task` to the consume end. + (*prev).next_blocking.store(task, Release); + } + + pub fn notify_task(&self, pool: &Arc) { + let prev = self.lock.fetch_add(1, AcqRel); + + + if prev != 0 { + // Another thread has the lock and will be responsible for notifying + // pending tasks. + return; + } + + let mut dec = 1; + + loop { + let mut remaining_pops = dec; + while remaining_pops > 0 { + remaining_pops -= 1; + + let task = match self.pop(remaining_pops) { + Some(t) => t, + None => break, + }; + + Task::notify_blocking(task, pool); + } + + // Decrement the number of handled notifications + let actual = self.lock.fetch_sub(dec, AcqRel); + + if actual == dec { + break; + } + + // This can only be greater than expected as we are the only thread + // that is decrementing. + debug_assert!(actual > dec); + dec = actual - dec; + } + } + + /// Pop a task + /// + /// `rem` represents the remaining number of times the caller will pop. If + /// there are no more tasks to pop, `rem` is used to set the remaining + /// capacity. + fn pop(&self, rem: usize) -> Option> { + 'outer: + loop { + unsafe { + let mut tail = *self.tail.get(); + let mut next = (*tail).next_blocking.load(Acquire); + + let stub = &*self.stub as *const _ as *mut _; + + if tail == stub { + if next.is_null() { + // This loop is not part of the standard intrusive mpsc + // channel algorithm. This is where we atomically pop + // the last task and add `rem` to the remaining capacity. + // + // This modification to the pop algorithm works because, + // at this point, we have not done any work (only done + // reading). We have a *pretty* good idea that there is + // no concurrent pusher. + // + // The capacity is then atomically added by doing an + // AcqRel CAS on `state`. The `state` cell is the + // linchpin of the algorithm. + // + // By successfully CASing `head` w/ AcqRel, we ensure + // that, if any thread was racing and entered a push, we + // see that and abort pop, retrying as it is + // "inconsistent". + let mut curr: State = self.state.load(Acquire).into(); + + loop { + if curr.has_task(&self.stub) { + // Inconsistent state, yield the thread and try + // again. + thread::yield_now(); + continue 'outer; + } + + let mut after = curr; + + // +1 here because `rem` represents the number of + // pops that will come after the current one. + after.add_capacity(rem + 1, &self.stub); + + let actual: State = self.state.compare_and_swap( + curr.into(), + after.into(), + AcqRel).into(); + + if actual == curr { + // Successfully returned the remaining capacity + return None; + } + + curr = actual; + } + } + + *self.tail.get() = next; + tail = next; + next = (*next).next_blocking.load(Acquire); + } + + if !next.is_null() { + *self.tail.get() = next; + + // No ref_count inc is necessary here as this poll is paired + // with a `push` which "forgets" the handle. + return Some(Arc::from_raw(tail)); + } + + let state = self.state.load(Acquire); + + // This must always be a pointer + debug_assert!(State::from(state).is_ptr()); + + if state != tail as usize { + // Try aain + thread::yield_now(); + continue 'outer; + } + + self.push_stub(); + + next = (*tail).next_blocking.load(Acquire); + + if !next.is_null() { + *self.tail.get() = next; + + return Some(Arc::from_raw(tail)); + } + + thread::yield_now(); + // Try again + } + } + } +} + +// ====== impl State ===== + +impl State { + /// Return a new `State` representing the remaining capacity at the maximum + /// value. + fn new(capacity: usize) -> State { + State((capacity << NUM_SHIFT) | NUM_FLAG) + } + + fn remaining_capacity(&self) -> usize { + if !self.has_remaining_capacity() { + return 0; + } + + self.0 >> 1 + } + + fn has_remaining_capacity(&self) -> bool { + self.0 & NUM_FLAG == NUM_FLAG + } + + fn has_task(&self, stub: &Task) -> bool { + !(self.has_remaining_capacity() || self.is_stub(stub)) + } + + fn is_stub(&self, stub: &Task) -> bool { + self.0 == stub as *const _ as usize + } + + /// Try to claim blocking capacity. + /// + /// # Return + /// + /// Returns `true` if the capacity was claimed, `false` otherwise. If + /// `false` is returned, it can be assumed that `State` represents the head + /// pointer in the mpsc channel. + fn claim_capacity(&mut self, stub: &Task) -> bool { + if !self.has_remaining_capacity() { + return false; + } + + debug_assert!(self.0 != 1); + + self.0 -= 1 << NUM_SHIFT; + + if self.0 == NUM_FLAG { + // Set the state to the stub pointer. + self.0 = stub as *const _ as usize; + } + + true + } + + /// Add blockin capacity. + fn add_capacity(&mut self, capacity: usize, stub: &Task) -> bool { + debug_assert!(capacity > 0); + + if self.is_stub(stub) { + self.0 = (capacity << NUM_SHIFT) | NUM_FLAG; + true + } else if self.has_remaining_capacity() { + self.0 += capacity << NUM_SHIFT; + true + } else { + false + } + } + + fn is_ptr(&self) -> bool { + self.0 & NUM_FLAG == 0 + } + + fn ptr(&self) -> Option<*const Task> { + if self.is_ptr() { + Some(self.0 as *const Task) + } else { + None + } + } + + fn set_ptr(&mut self, ptr: *const Task) { + let ptr = ptr as usize; + debug_assert!(ptr & NUM_FLAG == 0); + self.0 = ptr + } +} + +impl From for State { + fn from(src: usize) -> State { + State(src) + } +} + +impl From for usize { + fn from(src: State) -> usize { + src.0 + } +} + +impl fmt::Debug for State { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let mut fmt = fmt.debug_struct("State"); + + if self.is_ptr() { + fmt.field("ptr", &self.0); + } else { + fmt.field("remaining", &self.remaining_capacity()); + } + + fmt.finish() + } +} diff --git a/tokio-threadpool/src/task/blocking_state.rs b/tokio-threadpool/src/task/blocking_state.rs new file mode 100644 index 00000000000..b41fc4868de --- /dev/null +++ b/tokio-threadpool/src/task/blocking_state.rs @@ -0,0 +1,89 @@ +use task::CanBlock; + +use std::fmt; +use std::sync::atomic::{AtomicUsize, Ordering}; + +/// State tracking task level state to support `blocking`. +/// +/// This tracks two separate flags. +/// +/// a) If the task is queued in the pending blocking channel. This prevents +/// double queuing (which would break the linked list). +/// +/// b) If the task has been allocated capacity to block. +#[derive(Eq, PartialEq)] +pub(crate) struct BlockingState(usize); + +const QUEUED: usize = 0b01; +const ALLOCATED: usize = 0b10; + +impl BlockingState { + /// Create a new, default, `BlockingState`. + pub fn new() -> BlockingState { + BlockingState(0) + } + + /// Returns `true` if the state represents the associated task being queued + /// in the pending blocking capacity channel + pub fn is_queued(&self) -> bool { + self.0 & QUEUED == QUEUED + } + + /// Toggle the queued flag + /// + /// Returns the state before the flag has been toggled. + pub fn toggle_queued(state: &AtomicUsize, ordering: Ordering) -> BlockingState { + state.fetch_xor(QUEUED, ordering).into() + } + + /// Returns `true` if the state represents the associated task having been + /// allocated capacity to block. + pub fn is_allocated(&self) -> bool { + self.0 & ALLOCATED == ALLOCATED + } + + /// Atomically consume the capacity allocation and return if the allocation + /// was present. + /// + /// If this returns `true`, then the task has the ability to block for the + /// duration of the `poll`. + pub fn consume_allocation(state: &AtomicUsize, ordering: Ordering) -> CanBlock { + let state: Self = state.fetch_and(!ALLOCATED, ordering).into(); + + if state.is_allocated() { + CanBlock::Allocated + } else if state.is_queued() { + CanBlock::NoCapacity + } else { + CanBlock::CanRequest + } + } + + pub fn notify_blocking(state: &AtomicUsize, ordering: Ordering) { + let prev: Self = state.fetch_xor(ALLOCATED | QUEUED, ordering).into(); + + debug_assert!(prev.is_queued()); + debug_assert!(!prev.is_allocated()); + } +} + +impl From for BlockingState { + fn from(src: usize) -> BlockingState { + BlockingState(src) + } +} + +impl From for usize { + fn from(src: BlockingState) -> usize { + src.0 + } +} + +impl fmt::Debug for BlockingState { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("BlockingState") + .field("is_queued", &self.is_queued()) + .field("is_allocated", &self.is_allocated()) + .finish() + } +} diff --git a/tokio-threadpool/src/task/mod.rs b/tokio-threadpool/src/task/mod.rs index 3361789a160..84309ffd67b 100644 --- a/tokio-threadpool/src/task/mod.rs +++ b/tokio-threadpool/src/task/mod.rs @@ -1,10 +1,15 @@ +mod blocking; +mod blocking_state; mod queue; mod state; +pub(crate) use self::blocking::{Blocking, CanBlock}; pub(crate) use self::queue::{Queue, Poll}; +use self::blocking_state::BlockingState; use self::state::State; use notifier::Notifier; +use pool::Pool; use sender::Sender; use futures::{self, Future, Async}; @@ -24,12 +29,18 @@ use futures2; /// This also behaves as a node in the inbound work queue and the blocking /// queue. pub(crate) struct Task { - /// Task state + /// Task lifecycle state state: AtomicUsize, + /// Task blocking related state + blocking: AtomicUsize, + /// Next pointer in the queue that submits tasks to a worker. next: AtomicPtr, + /// Next pointer in the queue of tasks pending blocking capacity. + next_blocking: AtomicPtr, + /// Store the future at the head of the struct /// /// The future is dropped immediately when it transitions to Complete @@ -69,7 +80,9 @@ impl Task { Task { state: AtomicUsize::new(State::new().into()), + blocking: AtomicUsize::new(BlockingState::new().into()), next: AtomicPtr::new(ptr::null_mut()), + next_blocking: AtomicPtr::new(ptr::null_mut()), future: UnsafeCell::new(Some(task_fut)), } } @@ -80,8 +93,10 @@ impl Task { where F: FnOnce(usize) -> futures2::task::Waker { let mut inner = Box::new(Task { - next: AtomicPtr::new(ptr::null_mut()), state: AtomicUsize::new(State::new().into()), + blocking: AtomicUsize::new(BlockingState::new().into()), + next: AtomicPtr::new(ptr::null_mut()), + next_blocking: AtomicPtr::new(ptr::null_mut()), future: None, }); @@ -99,8 +114,10 @@ impl Task { let task_fut = TaskFuture::Futures1(executor::spawn(future)); Task { - next: AtomicPtr::new(ptr::null_mut()), state: AtomicUsize::new(State::stub().into()), + blocking: AtomicUsize::new(BlockingState::new().into()), + next: AtomicPtr::new(ptr::null_mut()), + next_blocking: AtomicPtr::new(ptr::null_mut()), future: UnsafeCell::new(Some(task_fut)), } } @@ -115,8 +132,6 @@ impl Task { let actual: State = self.state.compare_and_swap( Scheduled.into(), Running.into(), AcqRel).into(); - trace!("running; state={:?}", actual); - match actual { Scheduled => {}, _ => panic!("unexpected task state; {:?}", actual), @@ -195,6 +210,19 @@ impl Task { } } + /// Notify the task + pub fn notify(me: Arc, pool: &Arc) { + if me.schedule(){ + let _ = pool.submit(me, pool); + } + } + + /// Notify the task it has been allocated blocking capacity + pub fn notify_blocking(me: Arc, pool: &Arc) { + BlockingState::notify_blocking(&me.blocking, AcqRel); + Task::notify(me, pool); + } + /// Transition the task state to scheduled. /// /// Returns `true` if the caller is permitted to schedule the task. @@ -227,6 +255,15 @@ impl Task { } } + /// Consumes any allocated capacity to block. + /// + /// Returns `true` if capacity was allocated, `false` otherwise. + pub fn consume_blocking_allocation(&self) -> CanBlock { + // This flag is the primary point of coordination. The queued flag + // happens "around" setting the blocking capacity. + BlockingState::consume_allocation(&self.blocking, AcqRel) + } + /// Drop the future /// /// This must only be called by the thread that successfully transitioned diff --git a/tokio-threadpool/src/task/queue.rs b/tokio-threadpool/src/task/queue.rs index d205b6a7c24..d91c564bc61 100644 --- a/tokio-threadpool/src/task/queue.rs +++ b/tokio-threadpool/src/task/queue.rs @@ -13,7 +13,7 @@ pub(crate) struct Queue { /// This is a strong reference to `Task` (i.e, `Arc`) head: AtomicPtr, - /// Tail pointer. This is `Arc`. + /// Tail pointer. This is `Arc` unless it points to `stub`. tail: UnsafeCell<*mut Task>, /// Stub pointer, used as part of the intrusive mpsc channel algorithm diff --git a/tokio-threadpool/src/task/state.rs b/tokio-threadpool/src/task/state.rs index 92a2fc8ec23..9023eec5fbb 100644 --- a/tokio-threadpool/src/task/state.rs +++ b/tokio-threadpool/src/task/state.rs @@ -17,6 +17,8 @@ pub(crate) enum State { Complete = 4, } +// ===== impl State ===== + impl State { /// Returns the initial task state. /// @@ -37,7 +39,7 @@ impl From for State { debug_assert!( src >= Idle as usize && - src <= Complete as usize); + src <= Complete as usize, "actual={}", src); unsafe { ::std::mem::transmute(src) } } diff --git a/tokio-threadpool/src/worker/entry.rs b/tokio-threadpool/src/worker/entry.rs index 005148d446e..99c28962fe6 100644 --- a/tokio-threadpool/src/worker/entry.rs +++ b/tokio-threadpool/src/worker/entry.rs @@ -137,7 +137,7 @@ impl WorkerEntry { /// Returns `Ok` when the worker was successfully signaled. /// /// Returns `Err` if the worker has already terminated. - pub fn signal_stop(&self, mut state: State) -> Result<(), ()> { + pub fn signal_stop(&self, mut state: State) { use worker::Lifecycle::*; // Transition the worker state to signaled @@ -146,7 +146,7 @@ impl WorkerEntry { match state.lifecycle() { Shutdown => { - return Err(()); + return; } Running | Sleeping => {} Notified | Signaled => { @@ -161,7 +161,7 @@ impl WorkerEntry { // b) The shutdown signal is stored as the head of the // sleep, stack which will prevent the worker from going to // sleep again. - return Ok(()); + return; } } @@ -179,8 +179,6 @@ impl WorkerEntry { // Wakeup the worker self.wakeup(); - - Ok(()) } /// Pop a task diff --git a/tokio-threadpool/src/worker/mod.rs b/tokio-threadpool/src/worker/mod.rs index 474a42c5ec5..d5a14662939 100644 --- a/tokio-threadpool/src/worker/mod.rs +++ b/tokio-threadpool/src/worker/mod.rs @@ -1,27 +1,30 @@ mod entry; +mod stack; mod state; pub(crate) use self::entry::{ WorkerEntry as Entry, }; +pub(crate) use self::stack::Stack; pub(crate) use self::state::{ State, Lifecycle, }; -use pool::{self, Pool}; +use pool::{self, Pool, BackupId}; use notifier::Notifier; use sender::Sender; -use task::Task; +use task::{self, Task, CanBlock}; use tokio_executor; +use futures::{Poll, Async}; + use std::cell::Cell; use std::marker::PhantomData; use std::rc::Rc; use std::sync::atomic::Ordering::{AcqRel, Acquire}; use std::sync::Arc; -use std::thread; use std::time::{Duration, Instant}; /// Thread worker @@ -36,6 +39,19 @@ pub struct Worker { // WorkerEntry index pub(crate) id: WorkerId, + // Backup thread ID assigned to processing this worker. + backup_id: BackupId, + + // Set to the task that is currently being polled by the worker. This is + // needed so that `blocking` blocks are able to interact with this task. + // + // This has to be a raw pointer to make it compile, but great care is taken + // when this is set. + current_task: CurrentTask, + + // Set when the thread is in blocking mode. + is_blocking: Cell, + // Set when the worker should finalize on drop should_finalize: Cell, @@ -43,65 +59,72 @@ pub struct Worker { _p: PhantomData>, } +/// Tracks the state related to the currently running task. +#[derive(Debug)] +struct CurrentTask { + /// This has to be a raw pointer to make it compile, but great care is taken + /// when this is set. + task: Cell>>, + + /// Tracks the blocking capacity allocation state. + can_block: Cell, +} + /// Identifiers a thread pool worker. /// /// This identifier is unique scoped by the thread pool. It is possible that /// different thread pool instances share worker identifier values. #[derive(Debug, Clone, Hash, Eq, PartialEq)] -pub struct WorkerId { - pub(crate) idx: usize, -} +pub struct WorkerId(pub(crate) usize); // Pointer to the current worker info thread_local!(static CURRENT_WORKER: Cell<*const Worker> = Cell::new(0 as *const _)); impl Worker { - pub(crate) fn spawn(id: WorkerId, inner: &Arc) { - trace!("spawning new worker thread; id={}", id.idx); - - let mut th = thread::Builder::new(); - - if let Some(ref prefix) = inner.config.name_prefix { - th = th.name(format!("{}{}", prefix, id.idx)); - } - - if let Some(stack) = inner.config.stack_size { - th = th.stack_size(stack); + pub(crate) fn new(id: WorkerId, backup_id: BackupId, inner: Arc) -> Worker { + Worker { + inner, + id, + backup_id, + current_task: CurrentTask::new(), + is_blocking: Cell::new(false), + should_finalize: Cell::new(false), + _p: PhantomData, } + } - let inner = inner.clone(); - - th.spawn(move || { - let worker = Worker { - inner, - id, - should_finalize: Cell::new(false), - _p: PhantomData, - }; - - // Make sure the ref to the worker does not move - let wref = &worker; - - // Create another worker... It's ok, this is just a new type around - // `Pool` that is expected to stay on the current thread. - CURRENT_WORKER.with(|c| { - c.set(wref as *const _); + pub(crate) fn is_blocking(&self) -> bool { + self.is_blocking.get() + } - let inner = wref.inner.clone(); - let mut sender = Sender { inner }; + /// Run the worker + /// + /// Returns `true` if the thread should keep running as a `backup` thread. + pub(crate) fn do_run(&self) -> bool { + // Create another worker... It's ok, this is just a new type around + // `Pool` that is expected to stay on the current thread. + CURRENT_WORKER.with(|c| { + c.set(self as *const _); + + let inner = self.inner.clone(); + let mut sender = Sender { inner }; + + // Enter an execution context + let mut enter = tokio_executor::enter().unwrap(); + + tokio_executor::with_default(&mut sender, &mut enter, |enter| { + if let Some(ref callback) = self.inner.config.around_worker { + callback.call(self, enter); + } else { + self.run(); + } + }); + }); - // Enter an execution context - let mut enter = tokio_executor::enter().unwrap(); + // Can't be in blocking mode and finalization mode + debug_assert!(!self.is_blocking.get() || !self.should_finalize.get()); - tokio_executor::with_default(&mut sender, &mut enter, |enter| { - if let Some(ref callback) = wref.inner.config.around_worker { - callback.call(wref, enter); - } else { - wref.run(); - } - }); - }); - }).unwrap(); + self.is_blocking.get() } pub(crate) fn with_current) -> R, R>(f: F) -> R { @@ -116,6 +139,67 @@ impl Worker { }) } + /// Transition the current worker to a blocking worker + pub(crate) fn transition_to_blocking(&self) -> Poll<(), ::BlockingError> { + use self::CanBlock::*; + + // If we get this far, then `current_task` has been set. + let task_ref = self.current_task.get_ref(); + + // First step is to acquire blocking capacity for the task. + match self.current_task.can_block() { + // Capacity to block has already been allocated to this task. + Allocated => {} + + // The task has already requested capacity to block, but there is + // none yet available. + NoCapacity => return Ok(Async::NotReady), + + // The task has yet to ask for capacity + CanRequest => { + // Atomically attempt to acquire blocking capacity, and if none + // is available, register the task to be notified once capacity + // becomes available. + match self.inner.poll_blocking_capacity(task_ref)? { + Async::Ready(()) => { + self.current_task.set_can_block(Allocated); + } + Async::NotReady => { + self.current_task.set_can_block(NoCapacity); + return Ok(Async::NotReady); + } + } + } + } + + // The task has been allocated blocking capacity. At this point, this is + // when the current thread transitions from a worker to a backup thread. + // To do so requires handing over the worker to another backup thread. + + if self.is_blocking.get() { + // The thread is already in blocking mode, so there is nothing else + // to do. Return `Ready` and allow the caller to block the thread. + return Ok(().into()); + } + + trace!("transition to blocking state"); + + // Transitioning to blocking requires handing over the worker state to + // another thread so that the work queue can continue to be processed. + + self.inner.spawn_thread(self.id.clone(), &self.inner); + + // Track that the thread has now fully entered the blocking state. + self.is_blocking.set(true); + + Ok(().into()) + } + + /// Transition from blocking + pub(crate) fn transition_from_blocking(&self) { + // TODO: Attempt to take ownership of the worker again. + } + /// Returns a reference to the worker's identifier. /// /// This identifier is unique scoped by the thread pool. It is possible that @@ -149,25 +233,19 @@ impl Worker { // Run the next available task if self.try_run_task(¬ify, &mut sender) { - if tick % LIGHT_SLEEP_INTERVAL == 0 { - self.sleep_light(); + if self.is_blocking.get() { + // Exit out of the run state + return; } - tick = tick.wrapping_add(1); - spin_cnt = 0; - - // As long as there is work, keep looping. - continue; - } - - // No work in this worker's queue, it is time to try stealing. - if self.try_steal_task(¬ify, &mut sender) { if tick % LIGHT_SLEEP_INTERVAL == 0 { self.sleep_light(); } tick = tick.wrapping_add(1); spin_cnt = 0; + + // As long as there is work, keep looping. continue; } @@ -190,9 +268,31 @@ impl Worker { // If there still isn't any work to do, shutdown the worker? } + // The pool is terminating. However, transitioning the pool state to + // terminated is the very first step of the finalization process. Other + // threads may not see this state and try to spawn a new thread. To + // ensure consistency, before the current thread shuts down, it must + // return the backup token to the stack. + // + // The returned result is ignored because `Err` represents the pool + // shutting down. We are currently aware of this fact. + let _ = self.inner.release_backup(self.backup_id); + self.should_finalize.set(true); } + /// Try to run a task + /// + /// Returns `true` if work was found. + #[inline] + fn try_run_task(&self, notify: &Arc, sender: &mut Sender) -> bool { + if self.try_run_owned_task(notify, sender) { + return true; + } + + self.try_steal_task(notify, sender) + } + /// Checks the worker's current state, updating it as needed. /// /// Returns `true` if the worker should run. @@ -200,6 +300,8 @@ impl Worker { fn check_run_state(&self, first: bool) -> bool { use self::Lifecycle::*; + debug_assert!(!self.is_blocking.get()); + let mut state: State = self.entry().state.load(Acquire).into(); loop { @@ -249,8 +351,7 @@ impl Worker { /// Runs the next task on this worker's queue. /// /// Returns `true` if work was found. - #[inline] - fn try_run_task(&self, notify: &Arc, sender: &mut Sender) -> bool { + fn try_run_owned_task(&self, notify: &Arc, sender: &mut Sender) -> bool { use deque::Steal::*; // Poll the internal queue for a task to run @@ -267,10 +368,11 @@ impl Worker { /// Tries to steal a task from another worker. /// /// Returns `true` if work was found - #[inline] fn try_steal_task(&self, notify: &Arc, sender: &mut Sender) -> bool { use deque::Steal::*; + debug_assert!(!self.is_blocking.get()); + let len = self.inner.workers.len(); let mut idx = self.inner.rand_usize() % len; let mut found_work = false; @@ -285,9 +387,12 @@ impl Worker { self.run_task(task, notify, sender); trace!("try_steal_task -- signal_work; self={}; from={}", - self.id.idx, idx); + self.id.0, idx); // Signal other workers that work is available + // + // TODO: Should this be called here or before + // `run_task`? self.inner.signal_work(&self.inner); return true; @@ -312,10 +417,28 @@ impl Worker { fn run_task(&self, task: Arc, notify: &Arc, sender: &mut Sender) { use task::Run::*; - match task.run(notify, sender) { + let run = self.run_task2(&task, notify, sender); + + // TODO: Try to claim back the worker state in case the backup thread + // did not start up fast enough. This is a performance optimization. + + match run { Idle => {} Schedule => { - self.entry().push_internal(task); + if self.is_blocking.get() { + // The future has been notified while it was running. + // However, the future also entered a blocking section, + // which released the worker state from this thread. + // + // This means that scheduling the future must be done from + // a point of view external to the worker set. + // + // We have to call `submit_external` instead of `submit` + // here because `self` is still set as the current worker. + self.inner.submit_external(task, &self.inner); + } else { + self.entry().push_internal(task); + } } Complete => { let mut state: pool::State = self.inner.state.load(Acquire).into(); @@ -351,6 +474,52 @@ impl Worker { } } + /// Actually run the task. This is where `Worker::current_task` is set. + /// + /// Great care is needed to ensure that `current_task` is unset in this + /// function. + fn run_task2(&self, + task: &Arc, + notify: &Arc, + sender: &mut Sender) + -> task::Run + { + struct Guard<'a> { + worker: &'a Worker, + allocated_at_run: bool + } + + impl<'a> Drop for Guard<'a> { + fn drop(&mut self) { + // A task is allocated at run when it was explicitly notified + // that the task has capacity to block. When this happens, that + // capacity is automatically allocated to the notified task. + // This capacity is "use it or lose it", so if the thread is not + // transitioned to blocking in this call, then another task has + // to be notified. + if self.allocated_at_run && !self.worker.is_blocking.get() { + self.worker.inner.notify_blocking_task(&self.worker.inner); + } + + self.worker.current_task.clear(); + } + } + + let can_block = task.consume_blocking_allocation(); + + // Set `current_task` + self.current_task.set(task, can_block); + + // Create the guard, this ensures that `current_task` is unset when the + // function returns, even if the return is caused by a panic. + let _g = Guard { + worker: self, + allocated_at_run: can_block == CanBlock::Allocated + }; + + task.run(notify, sender) + } + /// Drains all tasks on the extern queue and pushes them onto the internal /// queue. /// @@ -396,7 +565,11 @@ impl Worker { fn sleep(&self) -> bool { use self::Lifecycle::*; - trace!("Worker::sleep; worker={:?}", self); + // Putting a worker to sleep is a multipart operation. This is, in part, + // due to the fact that a worker can be notified without it being popped + // from the sleep stack. Extra care is needed to deal with this. + + trace!("Worker::sleep; worker={:?}", self.id); let mut state: State = self.entry().state.load(Acquire).into(); @@ -439,12 +612,12 @@ impl Worker { if !state.is_pushed() { debug_assert!(next.is_pushed()); - trace!(" sleeping -- push to stack; idx={}", self.id.idx); + trace!(" sleeping -- push to stack; idx={}", self.id.0); // We obtained permission to push the worker into the // sleeper queue. - if let Err(_) = self.inner.push_sleeper(self.id.idx) { - trace!(" sleeping -- push to stack failed; idx={}", self.id.idx); + if let Err(_) = self.inner.push_sleeper(self.id.0) { + trace!(" sleeping -- push to stack failed; idx={}", self.id.0); // The push failed due to the pool being terminated. // // This is true because the "work" being woken up for is @@ -459,7 +632,7 @@ impl Worker { state = actual; } - trace!(" -> starting to sleep; idx={}", self.id.idx); + trace!(" -> starting to sleep; idx={}", self.id.0); let sleep_until = self.inner.config.keep_alive .map(|dur| Instant::now() + dur); @@ -467,6 +640,7 @@ impl Worker { // The state has been transitioned to sleeping, we can now wait by // calling the parker. This is done in a loop as condvars can wakeup // spuriously. + 'sleep: loop { let mut drop_thread = false; @@ -495,14 +669,91 @@ impl Worker { } } - trace!(" -> wakeup; idx={}", self.id.idx); + trace!(" -> wakeup; idx={}", self.id.0); // Reload the state state = self.entry().state.load(Acquire).into(); + // If the worker has been notified, transition back to running. + match state.lifecycle() { + Sleeping => { + if !drop_thread { + // This goes back to the outer loop. + continue 'sleep; + } + } + Notified | Signaled => { + // Transition back to running + loop { + let mut next = state; + next.set_lifecycle(Running); + + let actual = self.entry().state.compare_and_swap( + state.into(), next.into(), AcqRel).into(); + + if actual == state { + return true; + } + + state = actual; + } + } + Shutdown | Running => { + // To get here, the block above transitioned the tate to + // `Sleeping`. No other thread can concurrently + // transition to `Shutdown` or `Running`. + unreachable!(); + } + } + + // The thread has reached the maximum permitted sleep duration. + // It is now going to begin to shutdown. + // + // Doing this requires first releasing the thread to the backup + // stack. Because the moment the worker state is transitioned to + // `Shutdown`, other threads **expect** the thread's backup + // entry to be available on the backup stack. + // + // However, it is possible that the worker is notified between + // us pushing the backup entry onto the backup stack and + // transitioning the worker to `Shutdown`. If this happens, the + // current thread lost the token to run the backup entry and has + // to shutdown no matter what. + // + // To deal with this, the worker is transitioned to another + // thread. This is a pretty rare condition. + // + // If pushing on the backup stack fails, then the pool is being + // terminated and the thread should just shutdown + let backup_push_err = self.inner.release_backup(self.backup_id).is_err(); + + if backup_push_err { + debug_assert!({ + let state: State = self.entry().state.load(Acquire).into(); + state.lifecycle() != Sleeping + }); + + self.should_finalize.set(true); + + return true; + } + loop { - match state.lifecycle() { - Sleeping => {} + let mut next = state; + next.set_lifecycle(Shutdown); + + let actual: State = self.entry().state.compare_and_swap( + state.into(), next.into(), AcqRel).into(); + + if actual == state { + // Transitioned to a shutdown state + return false; + } + + match actual.lifecycle() { + Sleeping => { + state = actual; + } Notified | Signaled => { // Transition back to running loop { @@ -513,7 +764,8 @@ impl Worker { state.into(), next.into(), AcqRel).into(); if actual == state { - return true; + self.inner.spawn_thread(self.id.clone(), &self.inner); + return false; } state = actual; @@ -526,27 +778,7 @@ impl Worker { unreachable!(); } } - - if !drop_thread { - // This goees back to the outer loop. - break; - } - - let mut next = state; - next.set_lifecycle(Shutdown); - - let actual = self.entry().state.compare_and_swap( - state.into(), next.into(), AcqRel).into(); - - if actual == state { - // Transitioned to a shutdown state - return false; - } - - state = actual; } - - // The worker hasn't been notified, go back to sleep } } @@ -562,13 +794,14 @@ impl Worker { } fn entry(&self) -> &Entry { - &self.inner.workers[self.id.idx] + debug_assert!(!self.is_blocking.get()); + &self.inner.workers[self.id.0] } } impl Drop for Worker { fn drop(&mut self) { - trace!("shutting down thread; idx={}", self.id.idx); + trace!("shutting down thread; idx={}", self.id.0); if self.should_finalize.get() { // Get all inbound work and push it onto the work queue. The work @@ -579,13 +812,51 @@ impl Drop for Worker { self.entry().drain_tasks(); // TODO: Drain the work queue... - self.inner.worker_terminated(); } } } +// ===== impl CurrentTask ===== + +impl CurrentTask { + /// Returns a default `CurrentTask` representing no task. + fn new() -> CurrentTask { + CurrentTask { + task: Cell::new(None), + can_block: Cell::new(CanBlock::CanRequest), + } + } + + /// Returns a reference to the task. + fn get_ref(&self) -> &Arc { + unsafe { &*self.task.get().unwrap() } + } + + fn can_block(&self) -> CanBlock { + self.can_block.get() + } + + fn set_can_block(&self, can_block: CanBlock) { + self.can_block.set(can_block); + } + + fn set(&self, task: &Arc, can_block: CanBlock) { + self.task.set(Some(task as *const _)); + self.can_block.set(can_block); + } + + /// Reset the `CurrentTask` to null state. + fn clear(&self) { + self.task.set(None); + self.can_block.set(CanBlock::CanRequest); + } +} + +// ===== impl WorkerId ===== + impl WorkerId { + /// Returns a `WorkerId` representing the worker entry at index `idx`. pub(crate) fn new(idx: usize) -> WorkerId { - WorkerId { idx } + WorkerId(idx) } } diff --git a/tokio-threadpool/src/pool/stack.rs b/tokio-threadpool/src/worker/stack.rs similarity index 96% rename from tokio-threadpool/src/pool/stack.rs rename to tokio-threadpool/src/worker/stack.rs index 60cb75ee088..9e0e7c9b40f 100644 --- a/tokio-threadpool/src/pool/stack.rs +++ b/tokio-threadpool/src/worker/stack.rs @@ -19,7 +19,7 @@ use std::sync::atomic::Ordering::{Acquire, AcqRel, Relaxed}; /// /// Treiber stack: https://en.wikipedia.org/wiki/Treiber_Stack #[derive(Debug)] -pub(crate) struct SleepStack { +pub(crate) struct Stack { state: AtomicUsize, } @@ -36,6 +36,8 @@ pub(crate) struct SleepStack { pub struct State(usize); /// Extracts the head of the worker stack from the scheduler state +/// +/// The 16 relates to the value of MAX_WORKERS const STACK_MASK: usize = ((1 << 16) - 1); /// Used to mark the stack as empty @@ -53,13 +55,13 @@ const ABA_GUARD_MASK: usize = (1 << (64 - ABA_GUARD_SHIFT)) - 1; #[cfg(target_pointer_width = "32")] const ABA_GUARD_MASK: usize = (1 << (32 - ABA_GUARD_SHIFT)) - 1; -// ===== impl SleepStack ===== +// ===== impl Stack ===== -impl SleepStack { - /// Create a new `SleepStack` representing the empty state. - pub fn new() -> SleepStack { +impl Stack { + /// Create a new `Stack` representing the empty state. + pub fn new() -> Stack { let state = AtomicUsize::new(State::new().into()); - SleepStack { state } + Stack { state } } /// Push a worker onto the stack @@ -99,7 +101,6 @@ impl SleepStack { } } - /// Pop a worker off the stack. /// /// If `terminate` is set and the stack is empty when this function is diff --git a/tokio-threadpool/src/worker/state.rs b/tokio-threadpool/src/worker/state.rs index a74517c6e95..cb98937840c 100644 --- a/tokio-threadpool/src/worker/state.rs +++ b/tokio-threadpool/src/worker/state.rs @@ -18,7 +18,7 @@ pub(crate) enum Lifecycle { /// The worker does not currently have an associated thread. Shutdown = 0 << LIFECYCLE_SHIFT, - /// The worker is currently processing its task. + /// The worker is doing work Running = 1 << LIFECYCLE_SHIFT, /// The worker is currently asleep in the condvar diff --git a/tokio-threadpool/tests/blocking.rs b/tokio-threadpool/tests/blocking.rs new file mode 100644 index 00000000000..e9de847d0e7 --- /dev/null +++ b/tokio-threadpool/tests/blocking.rs @@ -0,0 +1,410 @@ +extern crate tokio_threadpool; + +extern crate env_logger; +#[macro_use] +extern crate futures; +extern crate rand; + +use tokio_threadpool::*; + +use futures::*; +use futures::future::{lazy, poll_fn}; +use rand::*; + +use std::sync::*; +use std::sync::atomic::*; +use std::sync::atomic::Ordering::*; +use std::time::Duration; +use std::thread; + +#[test] +fn basic() { + let _ = ::env_logger::init(); + + let pool = Builder::new() + .pool_size(1) + .max_blocking(1) + .build(); + + let (tx1, rx1) = mpsc::channel(); + let (tx2, rx2) = mpsc::channel(); + + pool.spawn(lazy(move || { + let res = blocking(|| { + let v = rx1.recv().unwrap(); + tx2.send(v).unwrap(); + }).unwrap(); + + assert!(res.is_ready()); + Ok(().into()) + })); + + pool.spawn(lazy(move || { + tx1.send(()).unwrap(); + Ok(().into()) + })); + + rx2.recv().unwrap(); +} + +#[test] +fn notify_task_on_capacity() { + const BLOCKING: usize = 10; + + let pool = Builder::new() + .pool_size(1) + .max_blocking(1) + .build(); + + let rem = Arc::new(AtomicUsize::new(BLOCKING)); + let (tx, rx) = mpsc::channel(); + + for _ in 0..BLOCKING { + let rem = rem.clone(); + let tx = tx.clone(); + + pool.spawn(lazy(move || { + poll_fn(move || { + blocking(|| { + thread::sleep(Duration::from_millis(100)); + let prev = rem.fetch_sub(1, Relaxed); + + if prev == 1 { + tx.send(()).unwrap(); + } + }).map_err(|e| panic!("blocking err {:?}", e)) + }) + })); + } + + rx.recv().unwrap(); + + assert_eq!(0, rem.load(Relaxed)); +} + +#[test] +fn capacity_is_use_it_or_lose_it() { + use futures::*; + use futures::Async::*; + use futures::sync::oneshot; + use futures::task::Task; + + // TODO: Run w/ bigger pool size + + let pool = Builder::new() + .pool_size(1) + .max_blocking(1) + .build(); + + let (tx1, rx1) = mpsc::channel(); + let (tx2, rx2) = oneshot::channel(); + let (tx3, rx3) = mpsc::channel(); + let (tx4, rx4) = mpsc::channel(); + + // First, fill the blocking capacity + pool.spawn(lazy(move || { + poll_fn(move || { + blocking(|| { + rx1.recv().unwrap(); + }).map_err(|_| panic!()) + }) + })); + + pool.spawn(lazy(move || { + rx2 + .map_err(|_| panic!()) + .and_then(|task: Task| { + poll_fn(move || { + blocking(|| { + // Notify the other task + task.notify(); + + // Block until woken + rx3.recv().unwrap(); + }).map_err(|_| panic!()) + }) + }) + })); + + // Spawn a future that will try to block, get notified, then not actually + // use the blocking + let mut i = 0; + let mut tx2 = Some(tx2); + + pool.spawn(lazy(move || { + poll_fn(move || { + match i { + 0 => { + i = 1; + + let res = blocking(|| unreachable!()) + .map_err(|_| panic!()); + + assert!(res.unwrap().is_not_ready()); + + // Unblock the first blocker + tx1.send(()).unwrap(); + + return Ok(NotReady); + } + 1 => { + i = 2; + + // Skip blocking, and notify the second task that it should + // start blocking + let me = task::current(); + tx2.take().unwrap().send(me).unwrap(); + + return Ok(NotReady); + } + 2 => { + let res = blocking(|| unreachable!()) + .map_err(|_| panic!()); + + assert!(res.unwrap().is_not_ready()); + + // Unblock the first blocker + tx3.send(()).unwrap(); + tx4.send(()).unwrap(); + Ok(().into()) + } + _ => unreachable!(), + } + }) + })); + + rx4.recv().unwrap(); +} + +#[test] +fn blocking_thread_does_not_take_over_shutdown_worker_thread() { + let pool = Builder::new() + .pool_size(2) + .max_blocking(1) + .build(); + + let (enter_tx, enter_rx) = mpsc::channel(); + let (exit_tx, exit_rx) = mpsc::channel(); + let (try_tx, try_rx) = mpsc::channel(); + + let exited = Arc::new(AtomicBool::new(false)); + + { + let exited = exited.clone(); + + pool.spawn(lazy(move || { + poll_fn(move || { + blocking(|| { + enter_tx.send(()).unwrap(); + exit_rx.recv().unwrap(); + exited.store(true, Relaxed); + }).map_err(|_| panic!()) + }) + })); + } + + // Wait for the task to block + let _ = enter_rx.recv().unwrap(); + + // Spawn another task that attempts to block + pool.spawn(lazy(move || { + poll_fn(move || { + let res = blocking(|| { + + }).unwrap(); + + assert_eq!( + res.is_ready(), + exited.load(Relaxed)); + + try_tx.send(res.is_ready()).unwrap(); + + Ok(res) + }) + })); + + // Wait for the second task to try to block (and not be ready). + let res = try_rx.recv().unwrap(); + assert!(!res); + + // Unblock the first task + exit_tx.send(()).unwrap(); + + // Wait for the second task to successfully block. + let res = try_rx.recv().unwrap(); + assert!(res); + + drop(pool); +} + +#[test] +fn blockin_one_time_gets_capacity_for_multiple_blocks() { + const ITER: usize = 1; + const BLOCKING: usize = 2; + + for _ in 0..ITER { + let pool = Builder::new() + .pool_size(4) + .max_blocking(1) + .build(); + + let rem = Arc::new(AtomicUsize::new(BLOCKING)); + let (tx, rx) = mpsc::channel(); + + for _ in 0..BLOCKING { + let rem = rem.clone(); + let tx = tx.clone(); + + pool.spawn(lazy(move || { + poll_fn(move || { + // First block + let res = blocking(|| { + thread::sleep(Duration::from_millis(100)); + }).map_err(|e| panic!("blocking err {:?}", e)); + + try_ready!(res); + + let res = blocking(|| { + thread::sleep(Duration::from_millis(100)); + let prev = rem.fetch_sub(1, Relaxed); + + if prev == 1 { + tx.send(()).unwrap(); + } + }); + + assert!(res.unwrap().is_ready()); + + Ok(().into()) + }) + })); + } + + rx.recv().unwrap(); + + assert_eq!(0, rem.load(Relaxed)); + } +} + +#[test] +fn shutdown() { + const ITER: usize = 1_000; + const BLOCKING: usize = 10; + + for _ in 0..ITER { + let num_inc = Arc::new(AtomicUsize::new(0)); + let num_dec = Arc::new(AtomicUsize::new(0)); + let (tx, rx) = mpsc::channel(); + + let pool = { + let num_inc = num_inc.clone(); + let num_dec = num_dec.clone(); + + Builder::new() + .pool_size(1) + .max_blocking(BLOCKING) + .after_start(move || { num_inc.fetch_add(1, Relaxed); }) + .before_stop(move || { num_dec.fetch_add(1, Relaxed); }) + .build() + }; + + let barrier = Arc::new(Barrier::new(BLOCKING)); + + for _ in 0..BLOCKING { + let barrier = barrier.clone(); + let tx = tx.clone(); + + pool.spawn(lazy(move || { + let res = blocking(|| { + barrier.wait(); + Ok::<_, ()>(()) + }).unwrap(); + + tx.send(()).unwrap(); + + assert!(res.is_ready()); + Ok(().into()) + })); + } + + for _ in 0..BLOCKING { + rx.recv().unwrap(); + } + + // Shutdown + drop(pool); + + assert_eq!(11, num_inc.load(Relaxed)); + assert_eq!(11, num_dec.load(Relaxed)); + } +} + +#[derive(Debug, Copy, Clone)] +enum Sleep { + Skip, + Yield, + Rand, + Fixed(Duration), +} + +#[test] +fn hammer() { + use self::Sleep::*; + + const ITER: usize = 5; + + let combos = [ + (2, 4, 1_000, Skip), + (2, 4, 1_000, Yield), + (2, 4, 100, Rand), + (2, 4, 100, Fixed(Duration::from_millis(3))), + (2, 4, 100, Fixed(Duration::from_millis(12))), + ]; + + for &(size, max_blocking, n, sleep) in &combos { + for _ in 0..ITER { + let pool = Builder::new() + .pool_size(size) + .max_blocking(max_blocking) + .build(); + + let cnt_task = Arc::new(AtomicUsize::new(0)); + let cnt_block = Arc::new(AtomicUsize::new(0)); + + for _ in 0..n { + let cnt_task = cnt_task.clone(); + let cnt_block = cnt_block.clone(); + + pool.spawn(lazy(move || { + cnt_task.fetch_add(1, Relaxed); + + poll_fn(move || { + blocking(|| { + match sleep { + Skip => {} + Yield => { + thread::yield_now(); + } + Rand => { + let ms = thread_rng().gen_range(3, 12); + thread::sleep(Duration::from_millis(ms)); + } + Fixed(dur) => { + thread::sleep(dur); + } + } + + cnt_block.fetch_add(1, Relaxed); + }).map_err(|_| panic!()) + }) + })); + } + + // Wait for the work to complete + pool.shutdown_on_idle().wait().unwrap(); + + assert_eq!(n, cnt_task.load(Relaxed)); + assert_eq!(n, cnt_block.load(Relaxed)); + } + } +} diff --git a/tokio-threadpool/tests/hammer.rs b/tokio-threadpool/tests/hammer.rs new file mode 100644 index 00000000000..7d1e5152aed --- /dev/null +++ b/tokio-threadpool/tests/hammer.rs @@ -0,0 +1,107 @@ +extern crate futures; +extern crate tokio_threadpool; + +use tokio_threadpool::*; + +use futures::{Future, Stream, Sink, Poll}; + +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::*; + +#[test] +fn hammer() { + use futures::future; + use futures::sync::{oneshot, mpsc}; + + const N: usize = 1000; + const ITER: usize = 20; + + struct Counted { + cnt: Arc, + inner: T, + } + + impl Future for Counted { + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll { + self.inner.poll() + } + } + + impl Drop for Counted { + fn drop(&mut self) { + self.cnt.fetch_add(1, Relaxed); + } + } + + for _ in 0.. ITER { + let pool = Builder::new() + // .pool_size(30) + .build(); + + let cnt = Arc::new(AtomicUsize::new(0)); + + let (listen_tx, listen_rx) = mpsc::unbounded::>>(); + let mut listen_tx = listen_tx.wait(); + + pool.spawn({ + let c1 = cnt.clone(); + let c2 = cnt.clone(); + let pool = pool.sender().clone(); + let task = listen_rx + .map_err(|e| panic!("accept error = {:?}", e)) + .for_each(move |tx| { + let task = future::lazy(|| { + let (tx2, rx2) = oneshot::channel(); + + tx.send(tx2).unwrap(); + rx2 + }) + .map_err(|e| panic!("e={:?}", e)) + .and_then(|_| { + Ok(()) + }); + + pool.spawn(Counted { + inner: task, + cnt: c1.clone(), + }).unwrap(); + + Ok(()) + }); + + Counted { + inner: task, + cnt: c2, + } + }); + + for _ in 0..N { + let cnt = cnt.clone(); + let (tx, rx) = oneshot::channel(); + listen_tx.send(tx).unwrap(); + + pool.spawn({ + let task = rx + .map_err(|e| panic!("rx err={:?}", e)) + .and_then(|tx| { + tx.send(()).unwrap(); + Ok(()) + }); + + Counted { + inner: task, + cnt, + } + }); + } + + drop(listen_tx); + + pool.shutdown_on_idle().wait().unwrap(); + assert_eq!(N * 2 + 1, cnt.load(Relaxed)); + } +} diff --git a/tokio-threadpool/tests/threadpool.rs b/tokio-threadpool/tests/threadpool.rs index 3b704851d4f..bb05cb739c6 100644 --- a/tokio-threadpool/tests/threadpool.rs +++ b/tokio-threadpool/tests/threadpool.rs @@ -26,7 +26,7 @@ fn lazy(f: F) -> Box + Send> wher use std::cell::Cell; use std::sync::{mpsc, Arc}; -use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT}; +use std::sync::atomic::*; use std::sync::atomic::Ordering::Relaxed; use std::time::Duration; @@ -88,19 +88,25 @@ fn natural_shutdown_simple_futures() { let _ = ::env_logger::init(); for _ in 0..1_000 { - static NUM_INC: AtomicUsize = ATOMIC_USIZE_INIT; - static NUM_DEC: AtomicUsize = ATOMIC_USIZE_INIT; + let num_inc = Arc::new(AtomicUsize::new(0)); + let num_dec = Arc::new(AtomicUsize::new(0)); FOO.with(|f| { f.set(1); - let pool = Builder::new() - .around_worker(|w, _| { - NUM_INC.fetch_add(1, Relaxed); - w.run(); - NUM_DEC.fetch_add(1, Relaxed); - }) - .build(); + let pool = { + let num_inc = num_inc.clone(); + let num_dec = num_dec.clone(); + + Builder::new() + .around_worker(move |w, _| { + num_inc.fetch_add(1, Relaxed); + w.run(); + num_dec.fetch_add(1, Relaxed); + }) + .build() + }; + let mut tx = pool.sender().clone(); let a = { @@ -136,11 +142,11 @@ fn natural_shutdown_simple_futures() { await_shutdown(pool.shutdown()); // Assert that at least one thread started - let num_inc = NUM_INC.load(Relaxed); + let num_inc = num_inc.load(Relaxed); assert!(num_inc > 0); // Assert that all threads shutdown - let num_dec = NUM_DEC.load(Relaxed); + let num_dec = num_dec.load(Relaxed); assert_eq!(num_inc, num_dec); }); } @@ -255,6 +261,8 @@ fn drop_threadpool_drops_futures() { let b = num_dec.clone(); let pool = Builder::new() + .max_blocking(2) + .pool_size(20) .around_worker(move |w, _| { a.fetch_add(1, Relaxed); w.run(); @@ -471,7 +479,11 @@ fn busy_threadpool_is_not_idle() { #[cfg(feature = "unstable-futures")] use futures2::channel::oneshot; - let pool = ThreadPool::new(); + // let pool = ThreadPool::new(); + let pool = Builder::new() + .pool_size(4) + .max_blocking(2) + .build(); let mut tx = pool.sender().clone(); let (term_tx, term_rx) = oneshot::channel(); @@ -548,101 +560,3 @@ fn panic_in_task() { await_shutdown(pool.shutdown_on_idle()); } - -#[test] -#[cfg(not(feature = "unstable-futures"))] -fn hammer() { - use futures::future; - use futures::sync::{oneshot, mpsc}; - - const N: usize = 1000; - const ITER: usize = 20; - - struct Counted { - cnt: Arc, - inner: T, - } - - impl Future for Counted { - type Item = T::Item; - type Error = T::Error; - - fn poll(&mut self) -> Poll { - self.inner.poll() - } - } - - impl Drop for Counted { - fn drop(&mut self) { - self.cnt.fetch_add(1, Relaxed); - } - } - - for _ in 0.. ITER { - let pool = Builder::new() - // .pool_size(30) - .build(); - - let cnt = Arc::new(AtomicUsize::new(0)); - - let (listen_tx, listen_rx) = mpsc::unbounded::>>(); - let mut listen_tx = listen_tx.wait(); - - pool.spawn({ - let c1 = cnt.clone(); - let c2 = cnt.clone(); - let pool = pool.sender().clone(); - let task = listen_rx - .map_err(|e| panic!("accept error = {:?}", e)) - .for_each(move |tx| { - let task = future::lazy(|| { - let (tx2, rx2) = oneshot::channel(); - - tx.send(tx2).unwrap(); - rx2 - }) - .map_err(|e| panic!("e={:?}", e)) - .and_then(|_| { - Ok(()) - }); - - pool.spawn(Counted { - inner: task, - cnt: c1.clone(), - }).unwrap(); - - Ok(()) - }); - - Counted { - inner: task, - cnt: c2, - } - }); - - for _ in 0..N { - let cnt = cnt.clone(); - let (tx, rx) = oneshot::channel(); - listen_tx.send(tx).unwrap(); - - pool.spawn({ - let task = rx - .map_err(|e| panic!("rx err={:?}", e)) - .and_then(|tx| { - tx.send(()).unwrap(); - Ok(()) - }); - - Counted { - inner: task, - cnt, - } - }); - } - - drop(listen_tx); - - pool.shutdown_on_idle().wait().unwrap(); - assert_eq!(N * 2 + 1, cnt.load(Relaxed)); - } -}