Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Channel{T}(f::Function) constructors. #30855

Merged
merged 6 commits into from
Aug 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Representation of a channel passing objects of type `T`.
abstract type AbstractChannel{T} end

"""
Channel{T}(sz::Int)
Channel{T}(sz::Int=0)

Constructs a `Channel` with an internal buffer that can hold a maximum of `sz` objects
of type `T`.
Expand All @@ -19,8 +19,12 @@ And vice-versa.

Other constructors:

* `Channel()`: default constructor, equivalent to `Channel{Any}(0)`
* `Channel(Inf)`: equivalent to `Channel{Any}(typemax(Int))`
* `Channel(sz)`: equivalent to `Channel{Any}(sz)`

!!! compat "Julia 1.3"
The default constructor `Channel()` and default `sz=0` were added in Julia 1.3.
"""
mutable struct Channel{T} <: AbstractChannel{T}
cond_take::Threads.Condition # waiting for data to become available
Expand All @@ -32,7 +36,7 @@ mutable struct Channel{T} <: AbstractChannel{T}
data::Vector{T}
sz_max::Int # maximum size of channel

function Channel{T}(sz::Integer) where T
function Channel{T}(sz::Integer = 0) where T
if sz < 0
throw(ArgumentError("Channel size must be either 0, a positive integer or Inf"))
end
Expand All @@ -48,6 +52,7 @@ function Channel{T}(sz::Float64) where T
return Channel{T}(sz)
end
Channel(sz) = Channel{Any}(sz)
Channel() = Channel{Any}(0)

# special constructors
"""
Expand Down Expand Up @@ -95,6 +100,25 @@ Hello
julia> istaskdone(taskref[])
true
```

NHDaly marked this conversation as resolved.
Show resolved Hide resolved
!!! compat "Julia 1.3"
The following constructors were added in Julia 1.3.

Other constructors:
* `Channel{T}(func::Function, sz=0)`
* `Channel{T}(func::Function; csize=0, taskref=nothing)`

```jldoctest
julia> chnl = Channel{Char}(1) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(sz_max:1,sz_curr:1)

julia> String(collect(chnl))
"hello world"
```
"""
function Channel(func::Function; ctype=Any, csize=0, taskref=nothing)
chnl = Channel{ctype}(csize)
Expand All @@ -105,6 +129,12 @@ function Channel(func::Function; ctype=Any, csize=0, taskref=nothing)
isa(taskref, Ref{Task}) && (taskref[] = task)
return chnl
end
function Channel{T}(f::Function, sz=0) where T
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This causes a method overwrite warning. This signature can be Channel{T}(f::Function, sz).

Copy link
Member Author

@NHDaly NHDaly Aug 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, sorry! I opened #32818 to fix it!

Thanks, good catch! Sorry I didn't notice (the error comes up while building and it's easy to miss. I'll keep an eye out for that in the future!) :)

return Channel(f, csize=sz, ctype=T)
end
function Channel{T}(f::Function; csize=0, taskref=nothing) where T
return Channel(f, csize=csize, ctype=T, taskref=taskref)
end


closed_exception() = InvalidStateException("Channel is closed.", :closed)
Expand Down
39 changes: 38 additions & 1 deletion test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ using Random
end

@testset "various constructors" begin
c = Channel()
@test eltype(c) == Any
@test c.sz_max == 0

c = Channel(1)
@test eltype(c) == Any
@test put!(c, 1) == 1
Expand All @@ -25,17 +29,50 @@ end
@test eltype(c) == Int
@test_throws MethodError put!(c, "Hello")

c = Channel{Int}()
@test eltype(c) == Int
@test c.sz_max == 0

c = Channel{Int}(Inf)
@test eltype(c) == Int
pvals = map(i->put!(c,i), 1:10^6)
tvals = Int[take!(c) for i in 1:10^6]
@test pvals == tvals

@test_throws MethodError Channel()
@test_throws ArgumentError Channel(-1)
@test_throws InexactError Channel(1.5)
end

@testset "Task constructors" begin
c = Channel(ctype=Float32, csize=2) do c; map(i->put!(c,i), 1:100); end
@test eltype(c) == Float32
@test c.sz_max == 2
@test isopen(c)
@test collect(c) == 1:100

c = Channel{Int}() do c; map(i->put!(c,i), 1:100); end
@test eltype(c) == Int
@test c.sz_max == 0
@test collect(c) == 1:100

c = Channel() do c; put!(c, 1); put!(c, "hi") end
@test c.sz_max == 0
@test collect(c) == [1, "hi"]

c = Channel{Int}(Inf) do c; put!(c,1); end
@test eltype(c) == Int
@test c.sz_max == typemax(Int)

taskref = Ref{Task}()
c = Channel{Int}(csize=0, taskref=taskref) do c; put!(c, 0); end
@test eltype(c) == Int
@test c.sz_max == 0
@test istaskstarted(taskref[])
@test !istaskdone(taskref[])
take!(c); wait(taskref[])
@test istaskdone(taskref[])
end

@testset "multiple concurrent put!/take! on a channel for different sizes" begin
function testcpt(sz)
c = Channel{Int}(sz)
Expand Down