Skip to content

Commit

Permalink
Channel constructor requires an explict size.
Browse files Browse the repository at this point in the history
Move channel tests into its own file.
Implement 0-sized channels.
  • Loading branch information
amitmurthy committed Oct 7, 2016
1 parent 9f80eab commit 7a24ca1
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 79 deletions.
78 changes: 65 additions & 13 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,33 @@

abstract AbstractChannel

const DEF_CHANNEL_SZ=32

type Channel{T} <: AbstractChannel
cond_take::Condition # waiting for data to become available
cond_put::Condition # waiting for a writeable slot
state::Symbol

data::Array{T,1}
sz_max::Int # maximum size of channel
sz_max::UInt # maximum size of channel

# Used when sz_max == 0
takers::Array{Condition}

function Channel(sz)
sz_max = sz == typemax(Int) ? typemax(Int) - 1 : sz
new(Condition(), Condition(), :open, Array{T}(0), sz_max)
function Channel(sz::Float64)
if sz == Inf
Channel{T}(typemax(UInt))
else
Channel{T}(convert(UInt, sz))
end
end
function Channel(sz::Integer)
if sz < 0
throw(ArgumentError("Channel size must be either 0, a positive integer or Inf"))
end
new(Condition(), Condition(), :open, Array{T}(0), sz, Array{Condition}(0))
end
end

Channel(sz::Int = DEF_CHANNEL_SZ) = Channel{Any}(sz)
Channel(sz) = Channel{Any}(sz)

closed_exception() = InvalidStateException("Channel is closed.", :closed)

Expand Down Expand Up @@ -49,6 +59,10 @@ Appends an item `v` to the channel `c`. Blocks if the channel is full.
"""
function put!(c::Channel, v)
!isopen(c) && throw(closed_exception())
put!(c,v,Val{c.sz_max==0})
end

function put!(c::Channel, v, ::Type{Val{false}})
while length(c.data) == c.sz_max
wait(c.cond_put)
end
Expand All @@ -57,19 +71,34 @@ function put!(c::Channel, v)
v
end

# 0-sized channel
function put!(c::Channel, v, ::Type{Val{true}})
while length(c.takers) == 0
notify(c.cond_take, nothing, true, false) # Required to handle wait() on 0-sized channels
wait(c.cond_put)
end
cond_taker = shift!(c.takers)
notify(cond_taker, v, false, false)
v
end

push!(c::Channel, v) = put!(c, v)

function fetch(c::Channel)
fetch(c::Channel) = fetch(c, Val{c.sz_max==0})
function fetch(c::Channel, ::Type{Val{false}})
wait(c)
c.data[1]
end
fetch(c::Channel, ::Type{Val{true}}) = throw(ErrorException("`fetch` on a 0-sized Channel is not supported."))


"""
take!(c::Channel)
Removes and returns a value from a `Channel`. Blocks till data is available.
"""
function take!(c::Channel)
take!(c::Channel) = take!(c, Val{c.sz_max==0})
function take!(c::Channel, ::Type{Val{false}})
wait(c)
v = shift!(c.data)
notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!.
Expand All @@ -78,13 +107,37 @@ end

shift!(c::Channel) = take!(c)

# 0-size channel
function take!(c::Channel, ::Type{Val{true}})
!isopen(c) && throw(closed_exception())
cond_taker = Condition()
push!(c.takers, cond_taker)
notify(c.cond_put, nothing, false, false)
try
return wait(cond_taker)
catch e
if isa(e, InterruptException)
# remove self from the list of takers
filter!(x -> x != cond_taker, c.takers)
else
rethrow(e)
end
end
end

"""
isready(c::Channel)
Determine whether a `Channel` has a value stored to it.
For 0-sized channels returns true if there are tasks waiting
on a `put!`
`isready` on `Channel`s is non-blocking.
"""
isready(c::Channel) = n_avail(c) > 0
isready(c::Channel) = n_avail(c, Val{c.sz_max==0}) > 0
n_avail(c::Channel, ::Type{Val{false}}) = length(c.data)
n_avail(c::Channel, ::Type{Val{true}}) = n_waiters(c.cond_put)

function wait(c::Channel)
while !isready(c)
Expand All @@ -97,13 +150,12 @@ end
function notify_error(c::Channel, err)
notify_error(c.cond_take, err)
notify_error(c.cond_put, err)
foreach(x->notify_error(x, err), c.takers)
end

eltype{T}(::Type{Channel{T}}) = T

n_avail(c::Channel) = length(c.data)

show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(c.sz_max),sz_curr:$(n_avail(c)))")
show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(c.sz_max),sz_curr:$(n_avail(c, Val{c.sz_max==0})))")

start{T}(c::Channel{T}) = Ref{Nullable{T}}()
function done(c::Channel, state::Ref)
Expand Down
10 changes: 8 additions & 2 deletions base/docs/helpdb/Base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1437,10 +1437,16 @@ endof
Constructs a `Channel` that can hold a maximum of `sz` objects of type `T`. `put!` calls on
a full channel block till an object is removed with `take!`.
`Channel(0)` constructs a Channel without a backing store. Consequently a `put!` on a
0-sized channel will block till another task calls a `take!` on it. And vice-versa.
`isready` on a 0-sized channel returns true if there are any tasks blocked on a `put!`
`fetch` is unsupported on a 0-sized channel.
Other constructors:
- `Channel()` - equivalent to `Channel{Any}(32)`
- `Channel(sz::Int)` equivalent to `Channel{Any}(sz)`
- `Channel(Inf)` - equivalent to `Channel{Any}(typemax(UInt))`
- `Channel(sz)` equivalent to `Channel{Any}(sz)`
"""
Channel

Expand Down
1 change: 1 addition & 0 deletions base/event.jl
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ notify1(c::Condition, arg=nothing) = notify(c, arg, all=false)
notify_error(c::Condition, err) = notify(c, err, error=true)
notify1_error(c::Condition, err) = notify(c, err, error=true, all=false)

n_waiters(c::Condition) = length(c.waitq)

# schedule an expression to run asynchronously, with minimal ceremony
"""
Expand Down
1 change: 1 addition & 0 deletions doc/stdlib/base.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1640,3 +1640,4 @@ Internals
.. Docstring generated from Julia source
Compile the given function ``f`` for the argument tuple (of types) ``args``\ , but do not execute it.

14 changes: 11 additions & 3 deletions doc/stdlib/parallel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,14 @@ Tasks
Constructs a ``Channel`` that can hold a maximum of ``sz`` objects of type ``T``\ . ``put!`` calls on a full channel block till an object is removed with ``take!``\ .

``Channel(0)`` constructs a Channel without a backing store. Consequently a ``put!`` on a 0-sized channel will block till another task calls a ``take!`` on it. And vice-versa.

``isready`` on a 0-sized channel returns true if there are any tasks blocked on a ``put!`` ``fetch`` is unsupported on a 0-sized channel.

Other constructors:

* ``Channel()`` - equivalent to ``Channel{Any}(32)``
* ``Channel(sz::Int)`` equivalent to ``Channel{Any}(sz)``
* ``Channel(Inf)`` - equivalent to ``Channel{Any}(typemax(UInt))``
* ``Channel(sz)`` equivalent to ``Channel{Any}(sz)``

General Parallel Computing Support
----------------------------------
Expand Down Expand Up @@ -384,7 +388,11 @@ General Parallel Computing Support

.. Docstring generated from Julia source
Determine whether a ``Channel`` has a value stored to it. ``isready`` on ``Channel``\ s is non-blocking.
Determine whether a ``Channel`` has a value stored to it.

For 0-sized channels returns true if there are tasks waiting on a ``put!``

``isready`` on ``Channel``\ s is non-blocking.

.. function:: isready(rr::RemoteChannel, args...)

Expand Down
88 changes: 88 additions & 0 deletions test/channels.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# This file is a part of Julia. License is MIT: http://julialang.org/license

# Test various constructors
c=Channel(1)
@test eltype(c) == Any
@test put!(c, 1) == 1
@test isready(c) == true
@test take!(c) == 1
@test isready(c) == false

@test eltype(Channel(1.0)) == Any

c=Channel{Int}(1)
@test eltype(c) == Int
@test_throws MethodError put!(c, "Hello")

c=Channel{Int}(Inf)
@test eltype(c) == Int
pvals = map(i->put!(c,i), 1:10^6)
tvals = Int[take!(c) for i in 1:10^6]
@test pvals == tvals

@test_throws MethodError Channel()
@test_throws ArgumentError Channel(-1)
@test_throws InexactError Channel(1.5)

# Test multiple concurrent put!/take! on a channel for different sizes
function testcpt(sz)
c = Channel{Int}(sz)
size = 0
inc() = size += 1
dec() = size -= 1
@sync for i = 1:10^4
@async (sleep(rand()); put!(c, i); inc())
@async (sleep(rand()); take!(c); dec())
end
@test size == 0
end
testcpt(0)
testcpt(1)
testcpt(32)
testcpt(Inf)

# Test multiple "for" loops waiting on the same channel which
# is closed after adding a few elements.
c=Channel(32)
results=[]
@sync begin
for i in 1:20
@async for i in c
push!(results, i)
end
end
sleep(1.0)
for i in 1:5
put!(c,i)
end
close(c)
end
@test sum(results) == 15

# Testing timedwait on multiple channels
@sync begin
rr1 = Channel(1)
rr2 = Channel(1)
rr3 = Channel(1)

callback() = all(map(isready, [rr1, rr2, rr3]))
# precompile functions which will be tested for execution time
@test !callback()
@test timedwait(callback, 0.0) === :timed_out

@async begin sleep(0.5); put!(rr1, :ok) end
@async begin sleep(1.0); put!(rr2, :ok) end
@async begin sleep(2.0); put!(rr3, :ok) end

tic()
timedwait(callback, 1.0)
et=toq()
# assuming that 0.5 seconds is a good enough buffer on a typical modern CPU
try
@test (et >= 1.0) && (et <= 1.5)
@test !isready(rr3)
catch
warn("timedwait tests delayed. et=$et, isready(rr3)=$(isready(rr3))")
end
@test isready(rr1)
end
3 changes: 2 additions & 1 deletion test/choosetests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ function choosetests(choices = [])
"markdown", "base64", "serialize", "misc", "threads",
"enums", "cmdlineargs", "i18n", "workspace", "libdl", "int",
"checked", "intset", "floatfuncs", "compile", "parallel", "inline",
"boundscheck", "error", "ambiguous", "cartesian", "asmvariant"
"boundscheck", "error", "ambiguous", "cartesian", "asmvariant",
"channels"
]

if Base.USE_GPL_LIBS
Expand Down
60 changes: 0 additions & 60 deletions test/parallel_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -512,66 +512,6 @@ workloads = Int[sum(ids .== i) for i in 2:nprocs()]
# @parallel reduction should work even with very short ranges
@test @parallel(+, for i=1:2; i; end) == 3

# Testing timedwait on multiple channels
@sync begin
rr1 = Channel()
rr2 = Channel()
rr3 = Channel()

callback() = all(map(isready, [rr1, rr2, rr3]))
# precompile functions which will be tested for execution time
@test !callback()
@test timedwait(callback, 0.0) === :timed_out

@async begin sleep(0.5); put!(rr1, :ok) end
@async begin sleep(1.0); put!(rr2, :ok) end
@async begin sleep(2.0); put!(rr3, :ok) end

tic()
timedwait(callback, 1.0)
et=toq()
# assuming that 0.5 seconds is a good enough buffer on a typical modern CPU
try
@test (et >= 1.0) && (et <= 1.5)
@test !isready(rr3)
catch
warn("timedwait tests delayed. et=$et, isready(rr3)=$(isready(rr3))")
end
@test isready(rr1)
end

# Test multiple concurrent put!/take! on a channel
function testcpt()
c = Channel()
size = 0
inc() = size += 1
dec() = size -= 1
@sync for i = 1:10^4
@async (sleep(rand()); put!(c, i); inc())
@async (sleep(rand()); take!(c); dec())
end
@test size == 0
end
testcpt()

# Test multiple "for" loops waiting on the same channel which
# is closed after adding a few elements.
c=Channel()
results=[]
@sync begin
for i in 1:20
@async for i in c
push!(results, i)
end
end
sleep(1.0)
for i in 1:5
put!(c,i)
end
close(c)
end
@test sum(results) == 15

@test_throws ArgumentError sleep(-1)
@test_throws ArgumentError timedwait(()->false, 0.1, pollint=-0.5)

Expand Down

0 comments on commit 7a24ca1

Please sign in to comment.