Skip to content

Commit

Permalink
Add Channel constructor keyword argument to @spawn new Task in para…
Browse files Browse the repository at this point in the history
…llel (#32872)

Add a parameter to the Channel constructor to allow the Tasks it creates
to be scheduled on multiple threads.

Examples:
```julia
    # Spawn a Task to handle each input request
    ch = Channel{String}(0, spawn=true) do ch
        handle_request(ch, request)
    end
```

Adds a manual check that users don't use the `spawn=` keyword argument
with the deprecated keyword arguments `ctype=` or `csize=`.
  • Loading branch information
NHDaly authored and JeffBezanson committed Aug 15, 2019
1 parent 1f564ce commit af1979b
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 10 deletions.
5 changes: 4 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ Multi-threading changes
([#32309], [#32174], [#31981], [#32421]).
* The global random number generator (`GLOBAL_RNG`) is now thread-safe (and thread-local) ([#32407]).
* New experimental `Threads.@spawn` macro that runs a task on any available thread ([#32600]).
* New `Channel(f::Function)` constructor param (`spawn=true`) to schedule the created Task on
any available thread, matching the behavior of `Threads.@spawn` ([#32872]).
* Simplified the `Channel` constructor, which is now easier to read and more idiomatic julia.
The old constructor (which used kwargs) is still available, but use is discouraged ([#30855], [#32818]).
The old constructor (which used keyword arguments) is still available, but use is discouraged.
([#30855], [#32818]).

Build system changes
--------------------
Expand Down
39 changes: 30 additions & 9 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Channel(sz=0) = Channel{Any}(sz)

# special constructors
"""
Channel{T=Any}(func::Function, size=0; taskref=nothing)
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false)
Create a new task from `func`, bind it to a new channel of type
`T` and size `size`, and schedule the task, all in a single call.
Expand All @@ -65,6 +65,9 @@ Create a new task from `func`, bind it to a new channel of type
If you need a reference to the created task, pass a `Ref{Task}` object via
the keyword argument `taskref`.
If `spawn = true`, the Task created for `func` may be scheduled on another thread
in parallel, equivalent to creating a task via [`Threads.@spawn`](@ref).
Return a `Channel`.
# Examples
Expand Down Expand Up @@ -105,11 +108,12 @@ true
```
!!! compat "Julia 1.3"
This constructor was added in Julia 1.3. Earlier versions of Julia used kwargs
to set `size` and `T`, but those constructors are deprecated.
The `spawn=` parameter was added in Julia 1.3. This constructor was added in Julia 1.3.
In earlier versions of Julia, Channel used keyword arguments to set `size` and `T`, but
those constructors are deprecated.
```jldoctest
julia> chnl = Channel{Char}(1) do ch
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
Expand All @@ -120,20 +124,37 @@ julia> String(collect(chnl))
"hello world"
```
"""
function Channel{T}(func::Function, size=0; taskref=nothing) where T
function Channel{T}(func::Function, size=0; taskref=nothing, spawn=false) where T
chnl = Channel{T}(size)
task = Task(() -> func(chnl))
task.sticky = !spawn
bind(chnl, task)
yield(task) # immediately start it

if spawn
schedule(task) # start it on (potentially) another thread
else
yield(task) # immediately start it, yielding the current thread
end
isa(taskref, Ref{Task}) && (taskref[] = task)
return chnl
end
Channel(func::Function, args...; kwargs...) = Channel{Any}(func, args...; kwargs...)

# This constructor is deprecated as of Julia v1.3, and should not be used.
function Channel(func::Function; ctype=Any, csize=0, taskref=nothing)
return Channel{ctype}(func, csize; taskref=taskref)
# (Note that this constructor also matches `Channel(::Function)` w/out any kwargs, which is
# of course not deprecated.)
# We use `nothing` default values to check which arguments were set in order to throw the
# deprecation warning if users try to use `spawn=` with `ctype=` or `csize=`.
function Channel(func::Function; ctype=nothing, csize=nothing, taskref=nothing, spawn=nothing)
# The spawn= keyword argument was added in Julia v1.3, and cannot be used with the
# deprecated keyword arguments `ctype=` or `csize=`.
if (ctype !== nothing || csize !== nothing) && spawn !== nothing
throw(ArgumentError("Cannot set `spawn=` in the deprecated constructor `Channel(f; ctype=Any, csize=0)`. Please use `Channel{T=Any}(f, size=0; taskref=nothing, spawn=false)` instead!"))
end
# Set the actual default values for the arguments.
ctype === nothing && (ctype = Any)
csize === nothing && (csize = 0)
spawn === nothing && (spawn = false)
return Channel{ctype}(func, csize; taskref=taskref, spawn=spawn)
end

closed_exception() = InvalidStateException("Channel is closed.", :closed)
Expand Down
7 changes: 7 additions & 0 deletions test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ end
@test isopen(c)
@test collect(c) == 1:100
end
@testset "Multithreaded task constructors" begin
taskref = Ref{Task}()
c = Channel(spawn=true, taskref=taskref) do c; put!(c, 0); end
# Test that the task is using the multithreaded scheduler
@test taskref[].sticky == false
@test collect(c) == [0]
end

@testset "multiple concurrent put!/take! on a channel for different sizes" begin
function testcpt(sz)
Expand Down

0 comments on commit af1979b

Please sign in to comment.