Skip to content

Commit

Permalink
Merge pull request #30855 from NHDaly/Channel-constructors
Browse files Browse the repository at this point in the history
Add Channel{T}(f::Function) constructors.
  • Loading branch information
vchuravy authored Aug 6, 2019
2 parents 7f2922e + 64109f4 commit e891bca
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 3 deletions.
34 changes: 32 additions & 2 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Representation of a channel passing objects of type `T`.
abstract type AbstractChannel{T} end

"""
Channel{T}(sz::Int)
Channel{T}(sz::Int=0)
Constructs a `Channel` with an internal buffer that can hold a maximum of `sz` objects
of type `T`.
Expand All @@ -19,8 +19,12 @@ And vice-versa.
Other constructors:
* `Channel()`: default constructor, equivalent to `Channel{Any}(0)`
* `Channel(Inf)`: equivalent to `Channel{Any}(typemax(Int))`
* `Channel(sz)`: equivalent to `Channel{Any}(sz)`
!!! compat "Julia 1.3"
The default constructor `Channel()` and default `sz=0` were added in Julia 1.3.
"""
mutable struct Channel{T} <: AbstractChannel{T}
cond_take::Threads.Condition # waiting for data to become available
Expand All @@ -32,7 +36,7 @@ mutable struct Channel{T} <: AbstractChannel{T}
data::Vector{T}
sz_max::Int # maximum size of channel

function Channel{T}(sz::Integer) where T
function Channel{T}(sz::Integer = 0) where T
if sz < 0
throw(ArgumentError("Channel size must be either 0, a positive integer or Inf"))
end
Expand All @@ -48,6 +52,7 @@ function Channel{T}(sz::Float64) where T
return Channel{T}(sz)
end
Channel(sz) = Channel{Any}(sz)
Channel() = Channel{Any}(0)

# special constructors
"""
Expand Down Expand Up @@ -95,6 +100,25 @@ Hello
julia> istaskdone(taskref[])
true
```
!!! compat "Julia 1.3"
The following constructors were added in Julia 1.3.
Other constructors:
* `Channel{T}(func::Function, sz=0)`
* `Channel{T}(func::Function; csize=0, taskref=nothing)`
```jldoctest
julia> chnl = Channel{Char}(1) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(sz_max:1,sz_curr:1)
julia> String(collect(chnl))
"hello world"
```
"""
function Channel(func::Function; ctype=Any, csize=0, taskref=nothing)
chnl = Channel{ctype}(csize)
Expand All @@ -105,6 +129,12 @@ function Channel(func::Function; ctype=Any, csize=0, taskref=nothing)
isa(taskref, Ref{Task}) && (taskref[] = task)
return chnl
end
function Channel{T}(f::Function, sz=0) where T
return Channel(f, csize=sz, ctype=T)
end
function Channel{T}(f::Function; csize=0, taskref=nothing) where T
return Channel(f, csize=csize, ctype=T, taskref=taskref)
end


closed_exception() = InvalidStateException("Channel is closed.", :closed)
Expand Down
39 changes: 38 additions & 1 deletion test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ using Random
end

@testset "various constructors" begin
c = Channel()
@test eltype(c) == Any
@test c.sz_max == 0

c = Channel(1)
@test eltype(c) == Any
@test put!(c, 1) == 1
Expand All @@ -25,17 +29,50 @@ end
@test eltype(c) == Int
@test_throws MethodError put!(c, "Hello")

c = Channel{Int}()
@test eltype(c) == Int
@test c.sz_max == 0

c = Channel{Int}(Inf)
@test eltype(c) == Int
pvals = map(i->put!(c,i), 1:10^6)
tvals = Int[take!(c) for i in 1:10^6]
@test pvals == tvals

@test_throws MethodError Channel()
@test_throws ArgumentError Channel(-1)
@test_throws InexactError Channel(1.5)
end

@testset "Task constructors" begin
c = Channel(ctype=Float32, csize=2) do c; map(i->put!(c,i), 1:100); end
@test eltype(c) == Float32
@test c.sz_max == 2
@test isopen(c)
@test collect(c) == 1:100

c = Channel{Int}() do c; map(i->put!(c,i), 1:100); end
@test eltype(c) == Int
@test c.sz_max == 0
@test collect(c) == 1:100

c = Channel() do c; put!(c, 1); put!(c, "hi") end
@test c.sz_max == 0
@test collect(c) == [1, "hi"]

c = Channel{Int}(Inf) do c; put!(c,1); end
@test eltype(c) == Int
@test c.sz_max == typemax(Int)

taskref = Ref{Task}()
c = Channel{Int}(csize=0, taskref=taskref) do c; put!(c, 0); end
@test eltype(c) == Int
@test c.sz_max == 0
@test istaskstarted(taskref[])
@test !istaskdone(taskref[])
take!(c); wait(taskref[])
@test istaskdone(taskref[])
end

@testset "multiple concurrent put!/take! on a channel for different sizes" begin
function testcpt(sz)
c = Channel{Int}(sz)
Expand Down

0 comments on commit e891bca

Please sign in to comment.