Add some thread pool docs #421

Merged 3 commits on Jun 15, 2018
110 changes: 110 additions & 0 deletions tokio-threadpool/src/lib.rs
@@ -3,6 +3,116 @@
#![doc(html_root_url = "https://docs.rs/tokio-threadpool/0.1.4")]
#![deny(warnings, missing_docs, missing_debug_implementations)]

// The Tokio thread pool is designed to schedule futures in Tokio-based
// applications. The thread pool structure manages two sets of threads:
//
// * Worker threads.
// * Backup threads.
//
// Worker threads are used to schedule futures using a work-stealing strategy.
// Backup threads, on the other hand, are intended only to support the
// `blocking` API. Threads will transition between the two sets.
//
// The advantage of the work-stealing strategy is minimal cross-thread
// coordination. The thread pool attempts to make as much progress as possible
// without communicating across threads.
//
// # Crate layout
//
// The primary type, `Pool`, holds the majority of a thread pool's state,
// including the state for each worker. Each worker's state is maintained in an
// instance of `worker::Entry`.
//
// `Worker` contains the logic that runs on each worker thread. It holds an
// `Arc` to `Pool` and is able to access its state from `Pool`.
//
// `Task` is a harness around an individual future. It manages polling and
// scheduling that future.
//
// # Worker overview
//
// Each worker has two queues: a deque and an mpsc channel. The deque is the
// primary queue for tasks that are scheduled to run on the worker thread. Tasks
// can only be pushed onto the deque by the worker, but other workers may
// "steal" from that deque. The mpsc channel is used to submit futures while
// external to the pool.
//
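A minimal sketch of the two per-worker queues described above, using hypothetical names (`WorkerQueues`, `WorkerHandle`, `Task`) and today's `crossbeam_deque` API, which may differ from the version this crate actually pins:

use crossbeam_deque::{Stealer, Worker as LocalQueue};
use std::sync::mpsc::{Receiver, Sender};

type Task = Box<dyn FnOnce() + Send>;

// State owned by the worker thread itself.
struct WorkerQueues {
    // Primary queue; only this worker pushes onto it directly.
    deque: LocalQueue<Task>,
    // Receiving side of the mpsc channel used for external submissions.
    inbox: Receiver<Task>,
}

// Handles visible to other workers and to code outside the pool.
struct WorkerHandle {
    // Used to submit a task to this worker from outside the pool.
    sender: Sender<Task>,
    // Used by other workers to "steal" tasks from this worker's deque.
    stealer: Stealer<Task>,
}
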
// As long as the thread pool has not been shut down, a worker will run in a
// loop. Each iteration, it consumes all tasks on its mpsc channel and pushes
// them onto the deque. It then pops tasks off of the deque and executes them.

Review comment (Member): this sentence makes me wonder: what happens when the thread pool shuts down?
//
// If a worker has no work, i.e., both queues are empty, it attempts to steal.
// To do this, it randomly scans other workers' deques and tries to pop a task.
// If it finds no work to steal, the thread goes to sleep.
//
// When the worker detects that the pool has been shut down, it exits the loop,
// cleans up its state, and shuts the thread down.
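The loop just described might look roughly like the following std-only sketch; it is illustrative, not the crate's code: real stealing uses the crossbeam deque, sleeping goes through the sleep stack rather than a timed park, and `run_worker`/`Task` are invented names.

use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::time::Duration;

type Task = Box<dyn FnOnce() + Send>;

fn run_worker(inbox: Receiver<Task>, shutdown: Arc<AtomicBool>) {
    let mut deque: VecDeque<Task> = VecDeque::new();

    while !shutdown.load(Ordering::SeqCst) {
        // 1) Drain externally submitted tasks into the local deque.
        while let Ok(task) = inbox.try_recv() {
            deque.push_back(task);
        }

        // 2) Pop and execute local tasks.
        if let Some(task) = deque.pop_front() {
            task();
            continue;
        }

        // 3) No local work: the real worker scans other workers' deques and
        //    tries to steal here; if that fails, it goes to sleep.
        std::thread::park_timeout(Duration::from_millis(1));
    }

    // 4) Shutdown detected: clean up and let the thread exit.
}
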
//
// # Thread pool initialization
//
// By default, no threads are spawned on creation. Instead, when new futures are
// spawned, the pool first checks if there are enough active worker threads. If
// not, a new worker thread is spawned.
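A sketch of that check, assuming a hypothetical `ensure_worker` helper and a bare atomic count of live workers (the real pool packs this into its state word along with other data):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn ensure_worker(live: &Arc<AtomicUsize>, max_workers: usize) {
    // Try to reserve a new worker slot; give up if the pool is already full.
    let mut current = live.load(Ordering::SeqCst);
    loop {
        if current >= max_workers {
            return;
        }
        match live.compare_exchange(current, current + 1, Ordering::SeqCst, Ordering::SeqCst) {
            Ok(_) => break,                  // we won the race; spawn a thread
            Err(actual) => current = actual, // lost the race; re-check the count
        }
    }

    let live = Arc::clone(live);
    thread::spawn(move || {
        // ... run the worker loop here ...
        live.fetch_sub(1, Ordering::SeqCst);
    });
}
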
//
// # Spawning futures
//
// The spawning behavior depends on whether a future was spawned from within a
// worker thread or from an external handle.
//
// When spawning a future from outside the thread pool, the current
// strategy is to randomly pick a worker to submit the task to. The task is then
// pushed onto that worker's mpsc channel.
//
// When spawning a future while on a worker thread, the task is pushed onto the
// back of the current worker's deque.
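A rough sketch of those two paths, assuming some hypothetical plumbing: a thread-local marker that is set only on worker threads, a plain `Vec` of senders, and the `rand` crate for picking a worker (the real crate routes through its own handle types and RNG):

use std::cell::RefCell;
use std::collections::VecDeque;
use std::sync::mpsc::Sender;

type Task = Box<dyn FnOnce() + Send>;

thread_local! {
    // `Some` only on worker threads: the worker's own local deque.
    static LOCAL_DEQUE: RefCell<Option<VecDeque<Task>>> = RefCell::new(None);
}

fn spawn_task(task: Task, workers: &[Sender<Task>]) {
    LOCAL_DEQUE.with(|local| match local.borrow_mut().as_mut() {
        // On a worker thread: push onto the back of its own deque.
        Some(deque) => deque.push_back(task),
        // External to the pool: pick a worker at random and push the task
        // onto that worker's mpsc channel.
        None => {
            let idx = rand::random::<usize>() % workers.len();
            let _ = workers[idx].send(task);
        }
    });
}
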
//
// # Sleeping workers
//
// Sleeping workers are tracked using a Treiber stack [1]. This results in the
// thread that most recently went to sleep getting woken up first. When the pool
// is not under load, this helps threads shut down faster.
//
// Sleeping is done through `tokio_executor::Park` implementations. This allows
// the user of the thread pool to customize what a thread does while it sleeps,
// which is how timers and other functionality are injected into the thread
// pool.
//
// [1]: https://en.wikipedia.org/wiki/Treiber_Stack
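A sketch of such a stack over worker indices. The head word packs a generation counter next to the index of the most recent sleeper so that a pop cannot succeed against a stale `next` value (the ABA problem); the constants, layout, and names are illustrative, not the crate's:

use std::sync::atomic::{AtomicUsize, Ordering};

const EMPTY: usize = 0xFFFF;     // sentinel index: no sleeping workers
const IDX_MASK: usize = 0xFFFF;  // low 16 bits hold the top-of-stack index
const GEN_ONE: usize = 1 << 16;  // remaining bits hold a generation counter

struct SleepStack {
    head: AtomicUsize,
    // `next[i]` is the worker that went to sleep just before worker `i`.
    next: Vec<AtomicUsize>,
}

impl SleepStack {
    fn new(workers: usize) -> SleepStack {
        SleepStack {
            head: AtomicUsize::new(EMPTY),
            next: (0..workers).map(|_| AtomicUsize::new(EMPTY)).collect(),
        }
    }

    // Called by a worker right before it goes to sleep.
    fn push(&self, idx: usize) {
        let mut head = self.head.load(Ordering::Acquire);
        loop {
            self.next[idx].store(head & IDX_MASK, Ordering::Relaxed);
            let new = (head & !IDX_MASK) | idx;
            match self.head.compare_exchange(head, new, Ordering::Release, Ordering::Acquire) {
                Ok(_) => return,
                Err(actual) => head = actual,
            }
        }
    }

    // Pops the most recent sleeper (LIFO), returning its index.
    fn pop(&self) -> Option<usize> {
        let mut head = self.head.load(Ordering::Acquire);
        loop {
            let idx = head & IDX_MASK;
            if idx == EMPTY {
                return None;
            }
            let next = self.next[idx].load(Ordering::Relaxed);
            // Bump the generation so a concurrent pop/push of the same worker
            // cannot let this CAS succeed with a stale `next` value.
            let new = (head & !IDX_MASK).wrapping_add(GEN_ONE) | next;
            match self.head.compare_exchange(head, new, Ordering::AcqRel, Ordering::Acquire) {
                Ok(_) => return Some(idx),
                Err(actual) => head = actual,
            }
        }
    }
}
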
//
// # Notifying workers
//
// When there is work to be done, workers must be notified. However, notifying a
// worker requires cross-thread coordination. Ideally, a worker would only be
// notified when it is sleeping, but there is no way to know if a worker is
// sleeping without cross-thread communication.
//
// The two cases when a worker might need to be notified are:
//
// 1) A task is externally submitted to a worker via the mpsc channel.
// 2) A worker has a backlog of work and needs other workers to steal from it.
//
// In the first case, the worker is currently always notified. However, the
// notification could be avoided when the mpsc channel contains two or more
// tasks *after* the task is submitted: in that case we can assume that the
// worker has already been notified by an earlier submission.
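A sketch of that possible optimization, assuming a hypothetical `Inbox` wrapper that pairs the channel with an atomic depth counter (std's mpsc is used purely for illustration; the real channel and notification path are different):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::Sender;

type Task = Box<dyn FnOnce() + Send>;

struct Inbox {
    queue: Sender<Task>,
    len: AtomicUsize,
}

impl Inbox {
    // Called from outside the worker to submit a task.
    fn submit(&self, task: Task, notify_worker: impl Fn()) {
        self.queue.send(task).expect("worker has shut down");
        // `fetch_add` returns the *previous* depth: if it was non-zero, an
        // earlier submission already notified the worker, so skip it.
        if self.len.fetch_add(1, Ordering::AcqRel) == 0 {
            notify_worker();
        }
    }

    // Called by the worker for every task it drains from the channel.
    fn drained_one(&self) {
        self.len.fetch_sub(1, Ordering::AcqRel);
    }
}
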
//
// The second case is trickier. Currently, whenever a worker spawns a new future
// (pushing it onto its deque) and when it pops a future from its mpsc, it tries
// to notify a sleeping worker to wake up and start stealing. This generates a
// lot of notifications, and it **might** be possible to reduce them.
//
// Also, whenever a worker is woken up via a signal and does find work, it will,
// in turn, try to wake up another worker.
//
// # `blocking`
//
// The strategy for handling blocking closures is to hand off the worker to a
// new thread. This implies handing off the `deque` and `mpsc`. Once this is
// done, the new thread continues to process the work queue and the original
// thread is able to block. Once the blocking closure completes, the thread has
// no additional work and is inserted into the backup pool. This makes it
// available to other workers that encounter a `blocking` call.
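A very rough sketch of that hand-off, with invented names (`WorkerState`, `run_worker`); the real implementation hands the actual deque/mpsc pair to a backup thread rather than spawning a fresh one, and parks itself on the backup stack afterwards:

use std::collections::VecDeque;
use std::sync::mpsc::Receiver;
use std::thread;

type Task = Box<dyn FnOnce() + Send>;

struct WorkerState {
    deque: VecDeque<Task>,
    inbox: Receiver<Task>,
}

fn blocking<F, R>(worker: WorkerState, f: F) -> R
where
    F: FnOnce() -> R,
{
    // 1) Hand the worker identity (deque + mpsc receiver) to another thread
    //    so the worker keeps making progress while we block.
    thread::spawn(move || run_worker(worker));

    // 2) This thread is now free to block without starving the pool.
    let result = f();

    // 3) The real pool would now push this thread onto the backup stack so a
    //    later `blocking` call can reuse it.
    result
}

fn run_worker(mut worker: WorkerState) {
    // Simplified worker loop: drain the channel, then execute local tasks.
    while let Ok(task) = worker.inbox.try_recv() {
        worker.deque.push_back(task);
    }
    while let Some(task) = worker.deque.pop_front() {
        task();
    }
}
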

extern crate tokio_executor;

extern crate crossbeam_deque as deque;
10 changes: 8 additions & 2 deletions tokio-threadpool/src/pool/mod.rs
@@ -29,10 +29,16 @@ use std::thread;

use rand::{Rng, SeedableRng, XorShiftRng};

// TODO: Rename this
#[derive(Debug)]
pub(crate) struct Pool {
// ThreadPool state
// Tracks the state of the thread pool (running, shutting down, ...).
//
// While workers check this field as a hint to detect shutdown, it is
// **not** used as a primary point of coordination for workers. The sleep
// stack is used as the primary point of coordination for workers.
//
// The value of this atomic is deserialized into a `pool::State` instance.
// See comments for that type.
pub state: AtomicUsize,

// Stack tracking sleeping workers.
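As a hedged illustration of the "atomic value deserialized into a `State`" pattern mentioned in the comment above, here is a sketch that packs a lifecycle tag and a counter into a single word; the bit layout and names are invented, not the crate's actual encoding:

use std::sync::atomic::{AtomicUsize, Ordering};

const LIFECYCLE_MASK: usize = 0b11; // low two bits: lifecycle tag

#[derive(Copy, Clone, Debug, PartialEq)]
enum Lifecycle {
    Running = 0,
    ShutdownNow = 1,
}

#[derive(Copy, Clone, Debug)]
struct State {
    lifecycle: Lifecycle,
    num_futures: usize,
}

impl From<usize> for State {
    fn from(word: usize) -> State {
        State {
            lifecycle: if (word & LIFECYCLE_MASK) == 0 {
                Lifecycle::Running
            } else {
                Lifecycle::ShutdownNow
            },
            num_futures: word >> 2,
        }
    }
}

impl From<State> for usize {
    fn from(state: State) -> usize {
        (state.num_futures << 2) | state.lifecycle as usize
    }
}

// Workers read the packed word and decode it into a `State` to inspect it.
fn load_state(state: &AtomicUsize) -> State {
    State::from(state.load(Ordering::Acquire))
}
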
3 changes: 3 additions & 0 deletions tokio-threadpool/src/worker/entry.rs
@@ -16,6 +16,9 @@ use deque;
// operations are thread-safe vs. which ones require ownership of the worker.
pub(crate) struct WorkerEntry {
// Worker state. This is mutated when notifying the worker.
//
// The `usize` value is deserialized to a `worker::State` instance. See
// comments on that type.
pub state: AtomicUsize,

// Next entry in the parked Treiber stack
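Similarly, a sketch of what "mutated when notifying the worker" can look like: a CAS loop moves a packed state word from sleeping to notified so that exactly one notifier ends up unparking the thread (the values and names are invented, not the crate's):

use std::sync::atomic::{AtomicUsize, Ordering};

const RUNNING: usize = 0;
const SLEEPING: usize = 1;
const NOTIFIED: usize = 2;

// Returns true if the caller is the one that should unpark the worker thread.
fn notify(state: &AtomicUsize) -> bool {
    let mut current = state.load(Ordering::Acquire);
    loop {
        let next = match current {
            SLEEPING => NOTIFIED, // the worker is asleep; we get to wake it
            _ => return false,    // already running or already notified
        };
        match state.compare_exchange(current, next, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) => return true,
            Err(actual) => current = actual,
        }
    }
}
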
28 changes: 26 additions & 2 deletions tokio-threadpool/src/worker/mod.rs
@@ -336,8 +336,29 @@ impl Worker {
state = actual;
}

// If this is the first iteration of the worker loop, then the state can
// be signaled.
// `first` is set to true the first time this function is called after
// the thread has started.
//
// This check is to handle the scenario where a worker gets signaled
// while it is already happily running. The `is_signaled` state is
// intended to wake up a worker that has been previously sleeping, in effect
// increasing the number of active workers. If this is the first time
// `check_run_state` is called, then being in a signaled state is
// normal and the thread was started to handle it. However, if this is
// **not** the first time the fn was called, then the number of active
// workers has not been increased by the signal, so `signal_work` has to
// be called again to try to wake up another worker.
//
// For example, suppose the thread pool is configured to allow 4 workers.
// Worker 1 is processing tasks from its `deque`. Worker 2 receives its
// first task. Worker 2 will pick a random worker to signal. It does
// this by popping off the sleep stack, but there is no guarantee that
// workers on the sleep stack are actually sleeping. It is possible that
// Worker 1 gets signaled.
//
// Without this check, in the above case, no additional workers will get
// started, which results in the thread pool permanently being at 2
// workers even though it should reach 4.
if !first && state.is_signaled() {
trace!("Worker::check_run_state; delegate signal");
// This worker is not ready to be signaled, so delegate the signal
@@ -537,6 +558,9 @@ impl Worker {
match task {
Empty => {
if found_work {
// TODO: Why is this called on every iteration? Would it
// not be better to only signal when work was found
// after waking up?
trace!("found work while draining; signal_work");
self.inner.signal_work(&self.inner);
}