Add ThreadPool::broadcast #492
Conversation
I like this overall. In terms of what we are committing to publicly, I suppose it's this:
You have the ability to schedule a closure to run exactly once on each thread. The priority is sort of undefined (as is typical for Rayon).
That doesn't seem like too much of a commitment. The main question is whether that behavior is well-defined if the size of the threadpool were to ever become dynamic, but I am more and more dubious of such a thing ever happening, and certainly not without some form of opt-in.
I wonder if it'd be useful to give the closure the number of threads as well? I guess that is readily accessible from the pool, so that's why we don't, right?
One thing to strengthen this is that racing broadcasts will be consistently ordered. That is, given simultaneous broadcasts A and B, if one thread sees A before B, they all will, and vice versa. So for example, if both are setting the same TLS, every thread will set them in the same order. It doesn't guarantee that every A will complete before any B starts. We don't have to promise this, but I think it's powerful if we do.
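For concreteness, here's a minimal sketch of that guarantee using the `broadcast` function this PR adds (assuming the final API from the summary below; `LAST` is a made-up thread-local for illustration):

```rust
use std::cell::Cell;
use std::thread;

thread_local! {
    // Made-up per-worker record of which broadcast ran last on this thread.
    static LAST: Cell<u8> = Cell::new(0);
}

fn main() {
    // Issue broadcasts A and B from two racing threads.
    let a = thread::spawn(|| rayon::broadcast(|_ctx| LAST.with(|l| l.set(b'A'))));
    let b = thread::spawn(|| rayon::broadcast(|_ctx| LAST.with(|l| l.set(b'B'))));
    a.join().unwrap();
    b.join().unwrap();
    // The consistent-ordering promise: every worker applied A and B in the
    // same relative order, so LAST now holds the same value on every worker
    // -- though which value that is depends on the race.
}
```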
I'm also skeptical of this happening. Internally, we could at least make sure that threads are never removed while they have a broadcast waiting, but I guess new threads would just be left out.
Yeah, it's easy to get, although that could be racy if there are dynamic threads. We could supply that value to indicate the number of times the broadcast will be called -- the number we're actually queuing -- independent of any new threads that may pop up.
So actually, I think this is a case where it would make sense for us to supply a context type.
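To illustrate the idea (purely hypothetical shape; the actual type this PR later adds is `BroadcastContext`, shown in the API summary further down):

```rust
// Hypothetical sketch only: snapshot the count at queue time, so the closure
// sees how many copies were actually queued, independent of any threads that
// might be added later.
pub struct BroadcastInfo {
    pub index: usize,      // which queued copy this is (0..num_queued)
    pub num_queued: usize, // how many copies of the closure were queued
}
```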
550: add bridge from Iterator to ParallelIterator r=cuviper a=QuietMisdreavus

Half of #46

This started getting reviewed in QuietMisdreavus/polyester#6, but i decided to move my work to Rayon proper. This PR adds a new trait, `AsParallel`, an implementation on `Iterator + Send`, and an iterator adapter `IterParallel` that implements `ParallelIterator` with a similar "cache items as you go" methodology as Polyester. I introduced a new trait because `ParallelIterator` was implemented on `Range`, which is itself an `Iterator`.

The basic idea is that you would start with a quick sequential `Iterator`, call `.as_parallel()` on it, and be able to use `ParallelIterator` adapters after that point, to do more expensive processing in multiple threads.

The design of `IterParallel` is like this:

* `IterParallel` defers background work to `IterParallelProducer`, which implements `UnindexedProducer`.
* `IterParallelProducer` will split as many times as there are threads in the current pool. (I've been told that #492 is a better way to organize this, but until that's in, this is how i wrote it. `>_>`)
* When folding items, `IterParallelProducer` keeps a `Stealer` from `crossbeam-deque` (added as a dependency, but using the same version as `rayon-core`) to access a deque of items that have already been loaded from the iterator.
* If the `Stealer` is empty, a worker will attempt to lock the Mutex to access the source `Iterator` and the `Deque`.
* If the Mutex is already locked, it will call `yield_now`. The implementation in polyester used a `synchronoise::SignalEvent` but i've been told that worker threads should not block. In lieu of #548, a regular spin-loop was chosen instead. (A sketch of this loop follows below.)
* If the Mutex is available, the worker will load a number of items from the iterator (currently (number of threads * number of threads * 2)) before closing the Mutex and continuing.
* (If the Mutex is poisoned, the worker will just... stop. Is there a recommended approach here? `>_>`)

This design is effectively a first brush, has [the same caveats as polyester](https://docs.rs/polyester/0.1.0/polyester/trait.Polyester.html#implementation-note), probably needs some extra features in rayon-core, and needs some higher-level docs before i'm willing to let it go. However, i'm putting it here because it was not in the right place when i talked to @cuviper about it last time.

Co-authored-by: QuietMisdreavus <grey@quietmisdreavus.net>
Co-authored-by: Niko Matsakis <niko@alum.mit.edu>
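A minimal sketch of the "lock or yield" loop described above (not the PR's actual code, which uses a `crossbeam-deque` `Stealer`; here a `Mutex<VecDeque>` stands in for the shared buffer):

```rust
use std::collections::VecDeque;
use std::sync::{Mutex, TryLockError};
use std::thread::yield_now;

/// Either refill the shared buffer from the source iterator, or spin with
/// `yield_now` while another worker holds the lock.
fn refill_or_yield<I: Iterator>(
    source: &Mutex<I>,
    buf: &Mutex<VecDeque<I::Item>>,
    batch: usize,
) {
    loop {
        match source.try_lock() {
            Ok(mut iter) => {
                // We hold the lock: pull a batch of items into the buffer.
                let mut buf = buf.lock().unwrap();
                for _ in 0..batch {
                    match iter.next() {
                        Some(item) => buf.push_back(item),
                        None => break,
                    }
                }
                return;
            }
            // Someone else is filling the buffer; spin politely and retry.
            Err(TryLockError::WouldBlock) => yield_now(),
            // If the mutex is poisoned, just stop (the open question above).
            Err(TryLockError::Poisoned(_)) => return,
        }
    }
}

fn main() {
    let source = Mutex::new(0..100);
    let buf = Mutex::new(VecDeque::new());
    refill_or_yield(&source, &buf, 8);
    assert_eq!(buf.lock().unwrap().len(), 8);
}
```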
Force-pushed from fd4cad2 to 4e05307.
I've rebased and added a context type. Let me know what you think!
I'd like to use this in rustc to collect thread-local data (complements my …).
@Zoxc I've rebased this again, if you'd like to try it out with rustc.
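A hedged sketch of that use case, using the `ThreadPool::broadcast` method from this PR (`THREAD_DATA` is a stand-in for whatever per-thread state rustc would actually keep):

```rust
use std::cell::Cell;

thread_local! {
    // Stand-in for some per-worker state accumulated during earlier work.
    static THREAD_DATA: Cell<u64> = Cell::new(0);
}

/// Runs once on every worker in the pool, collecting one value per thread.
fn collect_thread_data(pool: &rayon::ThreadPool) -> Vec<u64> {
    pool.broadcast(|_ctx| THREAD_DATA.with(|d| d.get()))
}

fn main() {
    let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    let data = collect_thread_data(&pool);
    println!("{:?}", data);
}
```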
There are two variations on this which may be useful too. I wonder if there's room for some generic code here.
Those variations make sense to me. They could all use the same injection queues, at least.
Force-pushed from 6ec01f6 to b6f72f4.
I rebased again. The last thing I'm thinking about is a specification of when a broadcast will run. Currently, it runs when the local deque is empty, before looking for jobs elsewhere. I can think of multiple options, ranging from high to low "priority".
I'm not certain that the current choice is the best. These could all be supported if we kept distinct queues, and perhaps an enum argument on the broadcast methods to indicate the user's choice, but maybe that's overkill.
It sounds like something that could be tuned, but I agree that it sounds like overkill, because people won't know what to pick to get it to Just Work. You could support an enum, or not, but the most important thing is that it's as fast as possible both when it's the only thing executing on the thread pool and when the thread pool is juggling lots and lots of work. Basically, even if you do have an enum for user choice, please have (and recommend) an API where the user does not have to choose, so that they can benefit from a tried and tested compromise.

"As fast as possible" may mean many things: broadcasts completing before other work, or a compromise between timely execution and not ruining the efficiency of other tasks running on the thread pool. I'd personally advocate for the latter as a sane default.
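Purely for illustration, the enum floated above (which never landed in this PR) might have looked something like this:

```rust
// Hypothetical only: a user-selectable broadcast priority, with the PR's
// current behavior as the recommended default.
pub enum BroadcastPriority {
    /// Run even before the thread's remaining local work (highest).
    Preempt,
    /// Run once the local deque is empty, before stealing (the PR's default).
    BeforeStealing,
    /// Run only when there is no other work anywhere (lowest).
    Idle,
}
```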
A broadcast runs the closure on every thread in the pool, then collects the results. It's scheduled somewhat like a very soft interrupt -- it won't preempt a thread's local work, but will run before it goes to steal from any other threads. This can be used when you want to precisely split your work per-thread, or to set or retrieve some thread-local data in the pool, e.g. rayon-rs#483.
Here's the current API summary:

```rust
pub fn broadcast<OP, R>(op: OP) -> Vec<R>
where
    OP: Fn(BroadcastContext<'_>) -> R + Sync,
    R: Send;

pub fn spawn_broadcast<OP>(op: OP)
where
    OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static;

pub struct BroadcastContext<'a> { .. }

impl<'a> BroadcastContext<'a> {
    pub fn index(&self) -> usize;
    pub fn num_threads(&self) -> usize;
}

impl<'a> fmt::Debug for BroadcastContext<'a> { .. }

impl<'scope> Scope<'scope> {
    pub fn spawn_broadcast<BODY>(&self, body: BODY)
    where
        BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope;
}

impl<'scope> ScopeFifo<'scope> {
    pub fn spawn_broadcast<BODY>(&self, body: BODY)
    where
        BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope;
}

impl ThreadPool {
    pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R>
    where
        OP: Fn(BroadcastContext<'_>) -> R + Sync,
        R: Send;

    pub fn spawn_broadcast<OP>(&self, op: OP)
    where
        OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static;
}
```

I think that's pretty safe, and the current "priority" (before remote work-stealing) still feels like a reasonable default.
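For reference, a small usage sketch of the API as summarized above (behavior per the description: the closure runs once on every worker, and `broadcast` collects the results):

```rust
fn main() {
    // Blocking broadcast: runs once on every worker, collecting one result each.
    let ids: Vec<usize> = rayon::broadcast(|ctx| {
        // `index()` identifies the worker; `num_threads()` is the pool size.
        assert!(ctx.index() < ctx.num_threads());
        ctx.index()
    });
    println!("got results from {} threads: {:?}", ids.len(), ids);

    // Fire-and-forget variant: no results, and the closure must be 'static.
    rayon::spawn_broadcast(|ctx| println!("hello from thread {}", ctx.index()));
}
```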
bors r+