Threadpool blocking (#317)
This patch adds a `blocking` function to `tokio-threadpool`. This function serves
as a way to annotate sections of code that will perform blocking
operations. This informs the thread pool that an additional thread needs
to be spawned to replace the current thread, which will no longer be
able to process the work queue.
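
A minimal usage sketch of the new API, adapted from the doc example added in `tokio-threadpool/src/blocking.rs` below (the `sleep` here merely stands in for any real blocking call):

```rust
extern crate futures;
extern crate tokio_threadpool;

use futures::future::{lazy, poll_fn};
use futures::Future;
use tokio_threadpool::{blocking, ThreadPool};

use std::thread;
use std::time::Duration;

fn main() {
    let pool = ThreadPool::new();

    pool.spawn(lazy(|| {
        // `blocking` returns a `Poll`, so wrap it in `poll_fn` to turn the
        // annotated section into a future the pool can drive.
        poll_fn(|| {
            blocking(|| {
                // Stand-in for a real blocking operation.
                thread::sleep(Duration::from_millis(100));
            })
            .map_err(|_| panic!("`blocking` called outside the thread pool"))
        })
    }));

    pool.shutdown_on_idle().wait().unwrap();
}
```

While the closure sleeps, the pool hands the worker's queue off to another thread, so other spawned futures keep making progress.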
carllerche authored Apr 15, 2018
1 parent 372400e commit 61d635e
Showing 26 changed files with 2,794 additions and 286 deletions.
17 changes: 15 additions & 2 deletions .travis.yml
@@ -35,15 +35,28 @@ script:
# Make sure the benchmarks compile
cargo build --benches --all
export ASAN_OPTIONS="detect_odr_violation=0 detect_leaks=0"
export TSAN_OPTIONS="suppressions=`pwd`/ci/tsan"
# === tokio-timer ====
# Run address sanitizer
ASAN_OPTIONS="detect_odr_violation=0 detect_leaks=0" \
RUSTFLAGS="-Z sanitizer=address" \
cargo test -p tokio-timer --test hammer --target x86_64-unknown-linux-gnu
# Run thread sanitizer
TSAN_OPTIONS="suppressions=`pwd`/ci/tsan" \
RUSTFLAGS="-Z sanitizer=thread" \
cargo test -p tokio-timer --test hammer --target x86_64-unknown-linux-gnu
# === tokio-threadpool ====
# Run address sanitizer
RUSTFLAGS="-Z sanitizer=address" \
cargo test -p tokio-threadpool --tests
# Run thread sanitizer
RUSTFLAGS="-Z sanitizer=thread" \
cargo test -p tokio-threadpool --tests
fi
- |
set -e
26 changes: 26 additions & 0 deletions ci/tsan
@@ -4,3 +4,29 @@
# This causes many false positives.
race:Arc*drop
race:arc*Weak*drop

# The `std` mpsc channel is not used anywhere in the Tokio code base. This race
# is triggered by some Rust runtime logic.
race:std*mpsc_queue

# Probably more fences in std.
race:__call_tls_dtors

# The crossbeam deque uses fences.
race:crossbeam_deque

# This race is excluded because it shows up when using the stealing features of
# the deque. Unfortunately, the implementation uses a fence, which makes tsan
# unhappy.
#
# TODO: It would be nice to not have to filter this out.
race:try_steal_task

# This filters out an expected data race in the treiber stack implementation.
# Treiber stacks are inherently racy. The pop operation will attempt to access
# the "next" pointer on the node it is attempting to pop. However, at this
# point it has not gained ownership of the node and another thread might beat
# it and take ownership of the node first (touching the next pointer). The
# original pop operation will fail due to the ABA guard, but tsan still picks
# up the access on the next pointer.
race:Backup::next_sleeper
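
To illustrate the suppression above (`race:Backup::next_sleeper`), here is a stripped-down Treiber-stack `pop`. This is a sketch, not the tokio-threadpool implementation (the real code also carries an ABA guard), but it shows where the benign read of the `next` pointer comes from:

```rust
use std::sync::atomic::{AtomicPtr, Ordering};

struct Node {
    // Plain, non-atomic pointer: reading it before the CAS wins is the
    // access tsan reports.
    next: *mut Node,
}

struct Stack {
    head: AtomicPtr<Node>,
}

impl Stack {
    /// Pop the top node; the caller owns the returned pointer.
    fn pop(&self) -> Option<*mut Node> {
        loop {
            let head = self.head.load(Ordering::Acquire);
            if head.is_null() {
                return None;
            }

            // Racy read: this thread has not yet claimed `head`, so another
            // thread may pop it concurrently and touch `next`. If that
            // happens, the CAS below fails and the value read here is simply
            // discarded, but tsan still flags the access.
            let next = unsafe { (*head).next };

            if self
                .head
                .compare_exchange(head, next, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return Some(head);
            }
        }
    }
}
```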
3 changes: 3 additions & 0 deletions tokio-threadpool/Cargo.toml
@@ -26,7 +26,10 @@ futures2 = { version = "0.1", path = "../futures2", optional = true }
[dev-dependencies]
tokio-timer = "0.1"
env_logger = "0.4"

# For comparison benchmarks
futures-cpupool = "0.1.7"
threadpool = "1.7.1"

[features]
unstable-futures = [
148 changes: 148 additions & 0 deletions tokio-threadpool/benches/blocking.rs
@@ -0,0 +1,148 @@
#![feature(test)]
#![deny(warnings)]

extern crate futures;
extern crate rand;
extern crate tokio_threadpool;
extern crate threadpool;
extern crate test;

const ITER: usize = 1_000;

mod blocking {
use super::*;

use futures::future::*;
use tokio_threadpool::{Builder, blocking};

#[bench]
fn cpu_bound(b: &mut test::Bencher) {
let pool = Builder::new()
.pool_size(2)
.max_blocking(20)
.build();

b.iter(|| {
let count_down = Arc::new(CountDown::new(::ITER));

for _ in 0..::ITER {
let count_down = count_down.clone();

pool.spawn(lazy(move || {
poll_fn(|| {
blocking(|| {
perform_complex_computation()
})
.map_err(|_| panic!())
})
.and_then(move |_| {
// Do something with the value
count_down.dec();
Ok(())
})
}));
}

count_down.wait();
})
}
}

mod message_passing {
use super::*;

use futures::future::*;
use futures::sync::oneshot;
use tokio_threadpool::Builder;

#[bench]
fn cpu_bound(b: &mut test::Bencher) {
let pool = Builder::new()
.pool_size(2)
.max_blocking(20)
.build();

let blocking = threadpool::ThreadPool::new(20);

b.iter(|| {
let count_down = Arc::new(CountDown::new(::ITER));

for _ in 0..::ITER {
let count_down = count_down.clone();
let blocking = blocking.clone();

pool.spawn(lazy(move || {
// Create a channel to receive the return value.
let (tx, rx) = oneshot::channel();

// Spawn a task on the blocking thread pool to process the
// computation.
blocking.execute(move || {
let res = perform_complex_computation();
tx.send(res).unwrap();
});

rx.and_then(move |_| {
count_down.dec();
Ok(())
}).map_err(|_| panic!())
}));
}

count_down.wait();
})
}
}

fn perform_complex_computation() -> usize {
use rand::*;

// Simulate a CPU heavy computation
let mut rng = rand::thread_rng();
rng.gen()
}

// Util for waiting until the tasks complete

use std::sync::*;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::*;

struct CountDown {
rem: AtomicUsize,
mutex: Mutex<()>,
condvar: Condvar,
}

impl CountDown {
fn new(rem: usize) -> Self {
CountDown {
rem: AtomicUsize::new(rem),
mutex: Mutex::new(()),
condvar: Condvar::new(),
}
}

fn dec(&self) {
let prev = self.rem.fetch_sub(1, AcqRel);

if prev != 1 {
return;
}

let _lock = self.mutex.lock().unwrap();
self.condvar.notify_all();
}

fn wait(&self) {
let mut lock = self.mutex.lock().unwrap();

loop {
if self.rem.load(Acquire) == 0 {
return;
}

lock = self.condvar.wait(lock).unwrap();
}
}
}
148 changes: 148 additions & 0 deletions tokio-threadpool/src/blocking.rs
@@ -0,0 +1,148 @@
use worker::Worker;

use futures::Poll;

/// Error raised by `blocking`.
#[derive(Debug)]
pub struct BlockingError {
_p: (),
}

/// Enter a blocking section of code.
///
/// The `blocking` function annotates a section of code that performs a blocking
/// operation, either by issuing a blocking syscall or by performing a
/// long-running CPU-bound computation.
///
/// When the `blocking` function enters, it hands off the responsibility of
/// processing the current work queue to another thread. Then, it calls the
/// supplied closure. The closure is permitted to block indefinitely.
///
/// If the maximum number of concurrent `blocking` calls has been reached, then
/// `NotReady` is returned and the task is notified once existing `blocking`
/// calls complete. The maximum value is specified when creating a thread pool
/// using [`Builder::max_blocking`][build].
///
/// [build]: struct.Builder.html#method.max_blocking
///
/// # Return
///
/// When the blocking closure is executed, `Ok(T)` is returned, where `T` is the
/// closure's return value.
///
/// If the thread pool has shut down, `Err` is returned.
///
/// If the number of concurrent `blocking` calls has reached the maximum,
/// `Ok(NotReady)` is returned and the current task is notified when a call to
/// `blocking` will succeed.
///
/// If `blocking` is called from outside the context of a Tokio thread pool,
/// `Err` is returned.
///
/// # Background
///
/// By default, the Tokio thread pool expects that tasks will only run for short
/// periods at a time before yielding back to the thread pool. This is the basic
/// premise of cooperative multitasking.
///
/// However, it is common to want to perform a blocking operation while
/// processing an asynchronous computation. Examples of blocking operations
/// include:
///
/// * Performing synchronous file operations (reading and writing).
/// * Blocking on acquiring a mutex.
/// * Performing a CPU-bound computation, like cryptographic encryption or
/// decryption.
///
/// One option for dealing with blocking operations in an asynchronous context
/// is to use a thread pool dedicated to performing these operations. This is
/// not ideal, as it requires bidirectional message passing as well as a
/// channel to communicate, which adds a level of buffering.
///
/// Instead, `blocking` hands off the responsibility of processing the work queue
/// to another thread. This hand off is light compared to a channel and does not
/// require buffering.
///
/// # Examples
///
/// Block on receiving a message from a `std` channel. This example is a little
/// silly as using the non-blocking channel from the `futures` crate would make
/// more sense. The blocking receive can be replaced with any blocking operation
/// that needs to be performed.
///
/// ```rust
/// # extern crate futures;
/// # extern crate tokio_threadpool;
///
/// use tokio_threadpool::{ThreadPool, blocking};
///
/// use futures::Future;
/// use futures::future::{lazy, poll_fn};
///
/// use std::sync::mpsc;
/// use std::thread;
/// use std::time::Duration;
///
/// pub fn main() {
/// // This is a *blocking* channel
/// let (tx, rx) = mpsc::channel();
///
/// // Spawn a thread to send a message
/// thread::spawn(move || {
/// thread::sleep(Duration::from_millis(500));
/// tx.send("hello").unwrap();
/// });
///
/// let pool = ThreadPool::new();
///
/// pool.spawn(lazy(move || {
/// // Because `blocking` returns `Poll`, it is intended to be used
/// // from the context of a `Future` implementation. Since we don't
/// // have a complicated requirement, we can use `poll_fn` in this
/// // case.
/// poll_fn(move || {
/// blocking(|| {
/// let msg = rx.recv().unwrap();
/// println!("message = {}", msg);
/// }).map_err(|_| panic!("the threadpool shut down"))
/// })
/// }));
///
/// // Wait for the task we just spawned to complete.
/// pool.shutdown_on_idle().wait().unwrap();
/// }
/// ```
pub fn blocking<F, T>(f: F) -> Poll<T, BlockingError>
where F: FnOnce() -> T,
{
let res = Worker::with_current(|worker| {
let worker = match worker {
Some(worker) => worker,
None => {
return Err(BlockingError { _p: () });
}
};

// Transition the worker state to blocking. This will exit the fn early
// with `NotReady` if the pool does not have enough capacity to enter
// blocking mode.
worker.transition_to_blocking()
});

// If the transition cannot happen, exit early
try_ready!(res);

// Currently in blocking mode, so call the inner closure
let ret = f();

// Try to transition out of blocking mode. This is a fast path that takes
// back ownership of the worker if the worker handoff didn't complete yet.
Worker::with_current(|worker| {
// The worker must be set since it was set above.
worker.unwrap()
.transition_from_blocking();
});

// Return the result
Ok(ret.into())
}
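
As a follow-on to the doc example above, here is a sketch of calling `blocking` from a hand-written `Future::poll`. The `ReadToString` future is hypothetical and not part of this patch; it just makes the `Ok(NotReady)` case described in the Return section explicit:

```rust
extern crate futures;
extern crate tokio_threadpool;

use futures::{Async, Future, Poll};
use tokio_threadpool::blocking;

use std::fs;
use std::io;
use std::path::PathBuf;

/// Hypothetical future that reads a file to a `String` on the thread pool.
struct ReadToString {
    path: PathBuf,
}

impl Future for ReadToString {
    type Item = String;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<String, io::Error> {
        match blocking(|| fs::read_to_string(&self.path)) {
            // The blocking closure ran; unwrap its own `io::Result`.
            Ok(Async::Ready(Ok(contents))) => Ok(Async::Ready(contents)),
            Ok(Async::Ready(Err(e))) => Err(e),
            // The pool is at its `max_blocking` limit; the task is notified
            // once capacity frees up, so just propagate `NotReady`.
            Ok(Async::NotReady) => Ok(Async::NotReady),
            // Not running on a tokio-threadpool worker, or the pool shut down.
            Err(_) => Err(io::Error::new(io::ErrorKind::Other, "blocking error")),
        }
    }
}
```

Spawning `ReadToString` on the pool then behaves like any other future, with the blocking read counted against `max_blocking`.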