
Threadpool blocking #317

Merged — 15 commits, Apr 15, 2018
Conversation

@carllerche (Member) commented Apr 13, 2018

This patch adds a `blocking` function to `tokio-threadpool`. This function
serves as a way to annotate sections of code that will perform blocking
operations. This informs the thread pool that an additional thread needs to be
spawned to replace the current thread, which will no longer be able to process
the work queue.

By default, the Tokio thread pool expects that tasks will only run for short
periods at a time before yielding back to the thread pool. This is the basic
premise of cooperative multitasking.

However, it is common to want to perform a blocking operation while
processing an asynchronous computation. Examples of blocking operations
include:

  • Performing synchronous file operations (reading and writing).
  • Blocking on acquiring a mutex.
  • Performing a CPU bound computation, like cryptographic encryption or
    decryption.

One option for dealing with blocking operations in an asynchronous context
is to use a thread pool dedicated to performing these operations. This is not
ideal, as it requires bidirectional message passing as well as a channel for
communication, which adds a level of buffering.

Instead, `blocking` hands off the responsibility of processing the work queue
to another thread. This hand off is light compared to a channel and does not
require buffering.

@@ -255,6 +289,22 @@ impl Builder {
self
}

/// TODO: Dox
Member:

?

@@ -99,11 +103,14 @@ impl Builder {

Builder {
pool_size: num_cpus,
max_blocking: 100,
Member:

is there a reason this isn't stored on the Config?

Member Author:

The Config object is passed into the pool. Values not in Config don't need to be stored and are just used to initialize state (in this case, populate the stack of sleeping threads).

let t2 = t1.clone();

mem::forget(t1);
let t1 = Forget::new(unsafe { Arc::from_raw(ptr) });

// t2 is forgotten so that the fn exits without decrementing the ref
Member:

This comment still refers to t2 but that variable has been removed.

use std::sync::atomic::Ordering::{self, Acquire, AcqRel, Relaxed};

#[derive(Debug)]
pub(crate) struct Backup {
@olix0r (Member), Apr 13, 2018:

What are Backup's responsibilities? Why is it called Backup?

Member Author:

Done... hopefully it makes sense.

Backup isn't a great name though...

Member:

thanks

@kpp mentioned this pull request Apr 14, 2018
@carllerche (Member Author):

Ok, I think this should be ready to be merged.

cc/ @olix0r and @seanmonstar

When calling `blocking` from outside of a threadpool, error instead of
panic.
@seanmonstar (Member):

I can look at the code a little later, but my first thought after reading the motivation is that it'd be much more convincing with measurements. Blah blah premature optimizations blah :)

I'd be worried about a few things here:

  • Does the existence of the implementation affect the performance of the runtime if I never use it?
  • What is the impact of moving a worker into blocking mode that had many other tasks that need to be stolen?

A comparison of this new feature set versus a typical thread pool with a channel would be useful. Better still, two comparisons: one with occasional blocking calls mixed with many smaller async tasks, and another that just slams blocking calls.

@carllerche (Member Author) commented Apr 14, 2018

but my first thought after reading the motivation is that it'd be much more convincing with measurements. Blah blah premature optimizations

I have not done any measurements personally, but I would argue from precedent. This is the strategy employed by Go, Erlang, and Java (ForkJoin) when dealing with the same situation. Intuition also matches up, as there is less work required in this strategy (no allocation for a queue, and less data needs to be moved).

Does the existence of the implementation affect the performance of the runtime if I never use it?

No, none of the code paths / logic impact normal usage. They all happen once you call blocking.

What is the impact of moving a worker into blocking mode that had many other tasks that need to be stolen?

I'm not sure I follow the question exactly, but there should be no impact. The added logic does not impact the ability to steal tasks from the worker. Also, once a worker enters blocking mode, its work queue is handed off to a thread standing by, so there is minimal lag in processing the work queue of the worker that just transitioned to blocking mode.

@carllerche (Member Author):

A comparison of this new feature set versus a typical thread pool with a channel would be useful. More so two, one with occasional blocking calls and many smaller async tasks, and another that just slams blocking calls.

I don't think benchmarks at this point would be super useful as I have not put a huge emphasis on tuning for max performance. Again, at this point I am heavily leaning on precedent and established best practice and getting the features implemented so that there is a solid base upon which tuning can be done.

@carllerche (Member Author):

Also, there is the fact that, w/ blocking, you can work w/ files using Read / Write, which means you are not forced to move all reads into a Vec<u8> (or Bytes) just to send it over a channel back to the primary task.

You could, of course, move greater amounts of logic from your task to the blocking thread pool, but then you start losing the advantages of the runtime.

I guess, this is all to say, I'm not sure what a fair comparison of this strategy vs. a channel would be.

PS: I have no idea why AppVeyor is unable to download Rust.

@seanmonstar (Member):

once a worker enters blocking mode, its work queue is handed off to thread standing by, so there is minimal lag in processing the work queue of the worker that just transitioned to blocking mode.

What I mean is if that thread had many async tasks to poll, entering blocking mode means message passing to steal the tasks, and those other workers may have already been loaded. Worse, if the blocking operation didn't actually block (say the file was in cache), then this worker needs to spend time stealing tasks back immediately. I just have a perhaps wrongly placed fear that usage of this could have more costs than saved, in some cases, and I'm scared of giving that power to a user. The fact that everywhere in Scala docs around async stuff has gigantic warnings to not block means users will mess it up anyways.

As for measurements, I know they aren't here yet, which is why I said seeing them would be more convincing. In many projects, especially where the highest performance is required, measurements are essential for proving fundamental changes.

I don't maintain Tokio, so of course I couldn't demand them. I would however require them before using them in hyper. 🤷

@carllerche (Member Author):

What I mean is if that thread had many async tasks to poll, entering blocking mode means message passing to steal the tasks, and those other workers may have already been loaded. Worse, if the blocking operation didn't actually block (say the file was in cache), then this worker needs to spend time stealing tasks back immediately. I just have a perhaps wrongly placed fear that usage of this could have more costs than saved, in some cases, and I'm scared of giving that power to a user.

Ah no, that isn't at all what happens. `blocking` is very light and does not impact any other worker threads (they keep processing their work queues).

  1. Sets a usize value in an existing cell.
  2. Notifies an idle thread (or spawns one if none are available).
  3. Performs the blocking work.
  4. Becomes an "idle" thread that can be notified by a blocking call in another thread.

The hand off process is just an atomic flag set to signal the hand off, plus a condvar signal if the idle thread is sleeping. At that point, the idle thread becomes the worker and continues processing the work queue from where the old thread left off.

The entire hand off process is allocation free and is not dependent on the number of pending tasks.

Re numbers: it would be trivial to show a comparison that significantly favors this strategy. I don't know if it would be worthwhile.

For example, repeatedly reading n bytes from a file (same file). With a blocking thread pool, the process would be:

  1. Send a message requesting the read, including an allocated oneshot channel for the response. This allocates a node for the queue.
  2. Read the bytes from the file into an allocated buffer.
  3. Send the buffer back through the allocated oneshot channel.

vs.

  1. Perform hand off described above.
  2. Immediately read into a stack allocated buffer.

If you think this is worthwhile, I can do it. Part of the win of `blocking` is simply that you can do reads/writes with `&mut [u8]`.

/// token (`WorkerId`) is handed off to that running thread. If none are found,
/// a new thread is spawned.
///
/// This state is manages the exchange. A thread that is idle, not assigned to a
Member:

"This state manages the exchange"

@olix0r (Member) left a comment:

generally lgtm! a few clarity comments left.

i've been running tests in a loop for most of the past 24 hours without any failures.


///
/// * Performing synchronous file operations (reading and writing).
/// * Blocking on acquiring a mutex.
/// * Performing a CPU bound computation, like cryptographic encryption or
///     decryption.

Member:

"CPU-bound"

/// Set the maximum number of concurrent blocking sections.
///
/// This must be a number between 1 and 32,768 though it is advised to keep
/// this value on the smaller side.
Member:

What happens when the number of blocking calls exceeds max_blocking? Should be called out here.


/// Execute function `f` before each thread stops.
///
/// This is intended for bookkeeping and monitoring use cases.
Member:

Presumably this cannot be guaranteed to run (whereas after_start is guaranteed to run before work can be scheduled?). Maybe worth mentioning explicitly.

Member Author:

If the thread exits, it should run (as long as the threadpool lib itself doesn't panic).

What sort of guarantee would you expect?

Member:

basically, i'm wondering how a process needs to manage the threadpool during process shutdown. Servers typically are waiting on some set of futures that complete once the process has received a shutdown signal. Once these futures complete, is there a way to shutdown the pool gracefully? If the pool is just dropped, are threads guaranteed to exit cleanly before the main thread exits?

@olix0r (Member) commented Apr 15, 2018

to confirm:

The hand off process is just an atomic flag set to signal the hand off and a condvar signal if the idle thread is sleeping. At this point, it becomes the old worker thread and continues processing the work queue from the point the old thread left off.

This means that timers/reactors/etc that have thread-local state are migrated as part of this, correct?

@carllerche (Member Author):

@seanmonstar I guess my question is what do you want to measure and what do you hope to learn?

@carllerche (Member Author):

@seanmonstar I committed a benchmark that tests blocking vs. using threadpool to offload.

running 2 tests
test blocking::cpu_bound        ... bench:   2,849,965 ns/iter (+/- 234,371)
test message_passing::cpu_bound ... bench:   5,638,165 ns/iter (+/- 379,287)

And nothing has been tuned on my end yet.

I would expect the improvements to be even more pronounced depending on the use case. A big part of the win of using `blocking` is that there is no need to message-pass arguments or return values.

@carllerche (Member Author):

@olix0r Yes, the timer, reactor, etc. (i.e., the `Park` implementation) is migrated over to the new thread. Threadpool requires `T: Park + Send`, so this is safe.

@carllerche carllerche merged commit 61d635e into tokio-rs:master Apr 15, 2018
@carllerche carllerche deleted the threadpool-blocking branch May 11, 2018 18:46
@jonhoo (Contributor) commented May 23, 2018

Should the blocking! macro also be re-exported through tokio::executor::thread_pool?

@carllerche (Member Author):

@jonhoo I'm actually thinking that tokio::executor should be deprecated in favor of accessing the sub crates directly. I'm not sure if there are many cases for that module to be used in end applications (vs. using the runtime directly).

@legokichi:

pub(crate) fn transition_from_blocking(&self) {
// TODO: Attempt to take ownership of the worker again.
}

When will this be implemented?

@carllerche (Member Author):

It is not implemented yet, but it is not critical to execution; it would be a perf improvement.
