Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Channel constructor keyword argument to @spawn new Task in parallel #32872

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ Multi-threading changes
([#32309], [#32174], [#31981], [#32421]).
* The global random number generator (`GLOBAL_RNG`) is now thread-safe (and thread-local) ([#32407]).
* New experimental `Threads.@spawn` macro that runs a task on any available thread ([#32600]).
* New `Channel(f::Function)` constructor param (`spawn=true`) to schedule the created Task on
any available thread, matching the behavior of `Threads.@spawn` ([#32872]).
* Simplified the `Channel` constructor, which is now easier to read and more idiomatic julia.
The old constructor (which used kwargs) is still available, but use is discouraged ([#30855], [#32818]).
The old constructor (which used keyword arguments) is still available, but use is discouraged.
([#30855], [#32818]).

Build system changes
--------------------
Expand Down
39 changes: 30 additions & 9 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Channel(sz=0) = Channel{Any}(sz)

# special constructors
"""
Channel{T=Any}(func::Function, size=0; taskref=nothing)
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false)

Create a new task from `func`, bind it to a new channel of type
`T` and size `size`, and schedule the task, all in a single call.
Expand All @@ -65,6 +65,9 @@ Create a new task from `func`, bind it to a new channel of type
If you need a reference to the created task, pass a `Ref{Task}` object via
the keyword argument `taskref`.

If `spawn = true`, the Task created for `func` may be scheduled on another thread
in parallel, equivalent to creating a task via [`Threads.@spawn`](@ref).

Return a `Channel`.

# Examples
Expand Down Expand Up @@ -105,11 +108,12 @@ true
```

!!! compat "Julia 1.3"
This constructor was added in Julia 1.3. Earlier versions of Julia used kwargs
to set `size` and `T`, but those constructors are deprecated.
The `spawn=` parameter was added in Julia 1.3. This constructor was added in Julia 1.3.
NHDaly marked this conversation as resolved.
Show resolved Hide resolved
In earlier versions of Julia, Channel used keyword arguments to set `size` and `T`, but
those constructors are deprecated.

```jldoctest
julia> chnl = Channel{Char}(1) do ch
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
Expand All @@ -120,20 +124,37 @@ julia> String(collect(chnl))
"hello world"
```
"""
function Channel{T}(func::Function, size=0; taskref=nothing) where T
function Channel{T}(func::Function, size=0; taskref=nothing, spawn=false) where T
NHDaly marked this conversation as resolved.
Show resolved Hide resolved
chnl = Channel{T}(size)
task = Task(() -> func(chnl))
task.sticky = !spawn
bind(chnl, task)
yield(task) # immediately start it

if spawn
schedule(task) # start it on (potentially) another thread
else
yield(task) # immediately start it, yielding the current thread
end
isa(taskref, Ref{Task}) && (taskref[] = task)
return chnl
end
Channel(func::Function, args...; kwargs...) = Channel{Any}(func, args...; kwargs...)

# This constructor is deprecated as of Julia v1.3, and should not be used.
function Channel(func::Function; ctype=Any, csize=0, taskref=nothing)
return Channel{ctype}(func, csize; taskref=taskref)
# (Note that this constructor also matches `Channel(::Function)` w/out any kwargs, which is
# of course not deprecated.)
# We use `nothing` default values to check which arguments were set in order to throw the
# deprecation warning if users try to use `spawn=` with `ctype=` or `csize=`.
function Channel(func::Function; ctype=nothing, csize=nothing, taskref=nothing, spawn=nothing)
# The spawn= keyword argument was added in Julia v1.3, and cannot be used with the
# deprecated keyword arguments `ctype=` or `csize=`.
if (ctype !== nothing || csize !== nothing) && spawn !== nothing
throw(ArgumentError("Cannot set `spawn=` in the deprecated constructor `Channel(f; ctype=Any, csize=0)`. Please use `Channel{T=Any}(f, size=0; taskref=nothing, spawn=false)` instead!"))
end
# Set the actual default values for the arguments.
ctype === nothing && (ctype = Any)
csize === nothing && (csize = 0)
spawn === nothing && (spawn = false)
return Channel{ctype}(func, csize; taskref=taskref, spawn=spawn)
end

closed_exception() = InvalidStateException("Channel is closed.", :closed)
Expand Down
7 changes: 7 additions & 0 deletions test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ end
@test isopen(c)
@test collect(c) == 1:100
end
@testset "Multithreaded task constructors" begin
taskref = Ref{Task}()
c = Channel(spawn=true, taskref=taskref) do c; put!(c, 0); end
# Test that the task is using the multithreaded scheduler
@test taskref[].sticky == false
@test collect(c) == [0]
end
NHDaly marked this conversation as resolved.
Show resolved Hide resolved

@testset "multiple concurrent put!/take! on a channel for different sizes" begin
function testcpt(sz)
Expand Down