-
-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add Threads.foreach
for convenient multithreaded Channel consumption
#34543
Conversation
threaded_foreach
for convenient multithreaded Channel consumption
It'd be really nice if we had |
base/channels.jl
Outdated
If `async` is `false`, this function will `wait` for all internally spawned tasks | ||
to complete before returning. | ||
""" | ||
function threaded_foreach(f, channel::Channel; ntasks=Threads.nthreads(), async=true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to use the name Threads.foreach
, like we have Iterators.filter
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ref: @vchuravy suggested to use C++ -like policy-based interface #34185 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to use the name Threads.foreach, like we have Iterators.filter?
That would be nice.
One tricky detail (but not an actual problem) is that writing code that uses Threads.@spawn
from within Threads
is pretty wonky right now because the macro depends on functions that aren't defined until later (in tasks.jl
) and Channel
s also aren't defined until later. Clearly can churn things around to be defined in the right order but might involve some bikeshedding to untangle the dependencies
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I didn't know that. I guess one way to work around the problem is to just declare the "stub" function like this?
module Threads
function foreach end
...
end
# later
function Threads.foreach(f, channel::Channel)
...
end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed the above change, since I like Threads.foreach
and the stub approach is way less hassle/churn than actually shuffling code :p
Folks should, of course, feel free to continue bikeshedding the name.
base/channels.jl
Outdated
tasks = map(1:ntasks) do _ | ||
Threads.@spawn begin | ||
for item in channel | ||
wait(Threads.@spawn f(item)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a bit too tangent, but FYI I noticed you can get a bit more speedup JuliaFolds/Transducers.jl#123 (comment) if you collect the elements into a buffer Vector
and then iterate over it. You may want to improve latency instead of throughput so I guess it's not always a better choice. But I wonder if it makes sense to add a "buffer size" option to it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This indeed sounds like a useful option! I'd probably want to fiddle around with it in a follow-up PR, though.
threaded_foreach
for convenient multithreaded Channel consumptionThreads.foreach
for convenient multithreaded Channel consumption
Added some extremely basic tests - let me know if folks have other testing ideas and I'll pop 'em in. |
In this vein, I feel like I often want shorthand It might be nice to be able to write e.g. |
It looks like |
What this API should do for error handling? If the user function function Threads.foreach(f, channel::Channel; ntasks = Threads.nthreads())
@sync for _ in 1:ntasks
Threads.@spawn for x in channel
@sync Threads.@spawn try
f(x)
catch
close(channel)
rethrow()
end
end
end
end One question is if it is OK to close the given If we are to include generic non- function Threads.foreach(f, itr; ntasks = Threads.nthreads())
closing = Threads.Atomic{Bool}(false)
@sync begin
if itr isa Channel
# Remove this branch if we are not closing given channel
# `itr` on error.
channel = itr
else
channel = Channel{eltype(itr)}()
# Not using `Channel{}(f)` to avoid reporting error twice:
@async try
for x in itr
try
put!(channel, x)
catch
closing[] && return
closing[] = true
close(channel)
rethrow()
end
end
finally
close(channel)
end
end
for _ in 1:ntasks
Threads.@spawn for x in channel
@sync Threads.@spawn try
f(x)
catch
closing[] = true
close(channel)
rethrow()
end
end
end
end
end (I think we can avoid using |
Yeah, that'd probably be the right thing to do.
Eh, this would go against my expectation as a caller - is this generally desirable for a reason I might be missing? It makes sense to me if a use case treated "dropped" elements (i.e. elements removed from the channel but not fully consumed due to the error) as a corruption or something like that, but in that case, the caller should probably just wrap this in a try-catch and |
Oh, you probably mean to prevent other threads from taking objects before we get a chance to kill them? Hmm. Yeah, it would be ideal if there were a different mechanism for that IMO though I obviously haven't thought about this much 😅 |
It was a genuine question as I don't know which one is better.
Ah, I think this is a good point. It's easy to mimic close-on-error behavior on top of non-closing |
Bump. @JeffBezanson thoughts on this PR (especially the error-handling discussion above?) |
Re-bump. It would be really nice to be able to use this. |
(I'm thinking to add this overload to |
Go for it! 😁 |
@JeffBezanson / @vtjnash if either of you do get some time to looks at this, IMO it's essentially mergeable as-is pending approval. The only remaining decision point AFAICT (discussed above) is whether we should/need to add some exception handling, i.e. to prevent threads from taking objects from the channel before we get a chance to kill them in the event of an error. I'm not sure we do - seems like the caller can handle a good chunk of that if they need to - but would love to hear your thoughts, especially if you have any ideas that would make exception handling here more ergonomic |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM other than the minor point about the test.
Assuming no further feedback, it seems like |
Co-authored-by: Takafumi Arakaki <aka.tkf@gmail.com>
Co-authored-by: Alex Arslan <ararslan@comcast.net>
Co-authored-by: Takafumi Arakaki <aka.tkf@gmail.com>
Co-authored-by: Takafumi Arakaki <aka.tkf@gmail.com>
Co-authored-by: Valentin Churavy <vchuravy@users.noreply.github.com>
…sary type instabilities
…de a spot in the load order for Threads overloads
52efaa5
to
dcf47e1
Compare
JuliaLang#34543) Co-authored-by: Takafumi Arakaki <aka.tkf@gmail.com> Co-authored-by: Alex Arslan <ararslan@comcast.net> Co-authored-by: Valentin Churavy <vchuravy@users.noreply.github.com>
This is a spiritual successor to #34185, but is a bit more minimal. The motivation came about when I realized #34185 was a wonky solution to a pattern that had started cropping up for me: I wanted a multithreaded
foreach(f, channel)
where I could communicate multiple results perf
call byput!
ing them individually to a closed-overChannel
(so not a direct map). While #34185 could be hackily employed for this, I thought it was overkill so I ripped this out instead. Once I started using it, I realized I liked it a bit more.Unlike #34185, it's only defined on
Channel
s, not arbitrary iterators. It could be extended to arbitrary iterators if we wanted via #34185's locking approach; I just thought it'd be better to keep things simple from the get-go. I could also loosen the dispatch constraint toAbstractChannel
and just document the expectation of threadsafe iteration.cc @tkf, who made a wonderful Transducers implementation of #34185