Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Oct 9, 2016
1 parent 6d7b960 commit 6d5d12e
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 63 deletions.
53 changes: 34 additions & 19 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type Channel{T} <: AbstractChannel
data::Array{T,1}
sz_max::UInt # maximum size of channel

# Used when sz_max == 0
# Used when sz_max == 0, i.e., an unbuffered channel.
takers::Array{Condition}

function Channel(sz::Float64)
Expand All @@ -32,6 +32,10 @@ Channel(sz) = Channel{Any}(sz)

closed_exception() = InvalidStateException("Channel is closed.", :closed)

const UNBUFFERED_CHANNEL = Type{Val{false}}
const BUFFERED_CHANNEL = Type{Val{true}}
isbuffered(c::Channel) = c.sz_max==0 ? Val{false} : Val{true}

"""
close(c::Channel)
Expand All @@ -56,13 +60,16 @@ end
put!(c::Channel, v)
Appends an item `v` to the channel `c`. Blocks if the channel is full.
For unbuffered channels, blocks until a `take!` is performed by a different
task.
"""
function put!(c::Channel, v)
!isopen(c) && throw(closed_exception())
put!(c,v,Val{c.sz_max==0})
put!(c,v,isbuffered(c))
end

function put!(c::Channel, v, ::Type{Val{false}})
function put!(c::Channel, v, ::BUFFERED_CHANNEL)
while length(c.data) == c.sz_max
wait(c.cond_put)
end
Expand All @@ -72,7 +79,7 @@ function put!(c::Channel, v, ::Type{Val{false}})
end

# 0-sized channel
function put!(c::Channel, v, ::Type{Val{true}})
function put!(c::Channel, v, ::UNBUFFERED_CHANNEL)
while length(c.takers) == 0
notify(c.cond_take, nothing, true, false) # Required to handle wait() on 0-sized channels
wait(c.cond_put)
Expand All @@ -84,21 +91,30 @@ end

push!(c::Channel, v) = put!(c, v)

fetch(c::Channel) = fetch(c, Val{c.sz_max==0})
function fetch(c::Channel, ::Type{Val{false}})
"""
fetch(c::Channel)
Waits for and gets the first available item from the channel. Does not
remove the item. `fetch` is unsupported on an unbuffered (0-size) channel.
"""
fetch(c::Channel) = fetch(c, isbuffered(c))
function fetch(c::Channel, ::BUFFERED_CHANNEL)
wait(c)
c.data[1]
end
fetch(c::Channel, ::Type{Val{true}}) = throw(ErrorException("`fetch` on a 0-sized Channel is not supported."))
fetch(c::Channel, ::UNBUFFERED_CHANNEL) = throw(ErrorException("`fetch` is not supported on an unbuffered Channel."))


"""
take!(c::Channel)
Removes and returns a value from a `Channel`. Blocks till data is available.
For unbuffered channels, blocks until a `put!` is performed by a different
task.
"""
take!(c::Channel) = take!(c, Val{c.sz_max==0})
function take!(c::Channel, ::Type{Val{false}})
take!(c::Channel) = take!(c, isbuffered(c))
function take!(c::Channel, ::BUFFERED_CHANNEL)
wait(c)
v = shift!(c.data)
notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!.
Expand All @@ -108,7 +124,7 @@ end
shift!(c::Channel) = take!(c)

# 0-size channel
function take!(c::Channel, ::Type{Val{true}})
function take!(c::Channel, ::UNBUFFERED_CHANNEL)
!isopen(c) && throw(closed_exception())
cond_taker = Condition()
push!(c.takers, cond_taker)
Expand All @@ -128,16 +144,15 @@ end
"""
isready(c::Channel)
Determine whether a `Channel` has a value stored to it.
For 0-sized channels returns true if there are tasks waiting
on a `put!`
Determine whether a `Channel` has a value stored to it. Returns
immediately, does not block.
`isready` on `Channel`s is non-blocking.
For unbuffered channels returns `true` if there are tasks waiting
on a `put!`.
"""
isready(c::Channel) = n_avail(c, Val{c.sz_max==0}) > 0
n_avail(c::Channel, ::Type{Val{false}}) = length(c.data)
n_avail(c::Channel, ::Type{Val{true}}) = n_waiters(c.cond_put)
isready(c::Channel) = n_avail(c, isbuffered(c)) > 0
n_avail(c::Channel, ::BUFFERED_CHANNEL) = length(c.data)
n_avail(c::Channel, ::UNBUFFERED_CHANNEL) = n_waiters(c.cond_put)

function wait(c::Channel)
while !isready(c)
Expand All @@ -155,7 +170,7 @@ end

eltype{T}(::Type{Channel{T}}) = T

show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(c.sz_max),sz_curr:$(n_avail(c, Val{c.sz_max==0})))")
show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(c.sz_max),sz_curr:$(n_avail(c, isbuffered(c))))")

start{T}(c::Channel{T}) = Ref{Nullable{T}}()
function done(c::Channel, state::Ref)
Expand Down
11 changes: 4 additions & 7 deletions base/docs/helpdb/Base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1434,14 +1434,11 @@ endof
"""
Channel{T}(sz::Int)
Constructs a `Channel` that can hold a maximum of `sz` objects of type `T`. `put!` calls on
a full channel block till an object is removed with `take!`.
Constructs a `Channel` with an internal buffer that can hold a maximum of `sz` objects
of type `T`. `put!` calls on a full channel block till an object is removed with `take!`.
`Channel(0)` constructs a Channel without a backing store. Consequently a `put!` on a
0-sized channel will block till another task calls a `take!` on it. And vice-versa.
`isready` on a 0-sized channel returns true if there are any tasks blocked on a `put!`
`fetch` is unsupported on a 0-sized channel.
`Channel(0)` constructs an unbuffered channel. `put!` blocks till a matching `take!` is called.
And vice-versa.
Other constructors:
Expand Down
1 change: 0 additions & 1 deletion base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,6 @@ Waits and fetches a value from `x` depending on the type of `x`. Does not remove
is an exception, throws a `RemoteException` which captures the remote exception and backtrace.
* `RemoteChannel`: Wait for and get the value of a remote reference. Exceptions raised are
same as for a `Future` .
* `Channel` : Wait for and get the first available item from the channel.
"""
fetch(x::ANY) = x

Expand Down
77 changes: 41 additions & 36 deletions doc/stdlib/parallel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,54 @@ Tasks

.. Docstring generated from Julia source
Constructs a ``Channel`` that can hold a maximum of ``sz`` objects of type ``T``\ . ``put!`` calls on a full channel block till an object is removed with ``take!``\ .
Constructs a ``Channel`` with an internal buffer that can hold a maximum of ``sz`` objects of type ``T``\ . ``put!`` calls on a full channel block till an object is removed with ``take!``\ .

``Channel(0)`` constructs a Channel without a backing store. Consequently a ``put!`` on a 0-sized channel will block till another task calls a ``take!`` on it. And vice-versa.

``isready`` on a 0-sized channel returns true if there are any tasks blocked on a ``put!`` ``fetch`` is unsupported on a 0-sized channel.
``Channel(0)`` constructs an unbuffered channel. ``put!`` blocks till a matching ``take!`` is called. And vice-versa.

Other constructors:

* ``Channel(Inf)`` - equivalent to ``Channel{Any}(typemax(UInt))``
* ``Channel(sz)`` equivalent to ``Channel{Any}(sz)``

.. function:: put!(c::Channel, v)

.. Docstring generated from Julia source
Appends an item ``v`` to the channel ``c``\ . Blocks if the channel is full.

For unbuffered channels, blocks until a ``take!`` is performed by a different task.

.. function:: take!(c::Channel)

.. Docstring generated from Julia source
Removes and returns a value from a ``Channel``\ . Blocks till data is available.

For unbuffered channels, blocks until a ``put!`` is performed by a different task.

.. function:: isready(c::Channel)

.. Docstring generated from Julia source
Determine whether a ``Channel`` has a value stored to it. Returns immediately, does not block.

For unbuffered channels returns ``true`` if there are tasks waiting on a ``put!``\ .

.. function:: fetch(c::Channel)

.. Docstring generated from Julia source
Waits for and gets the first available item from the channel. Does not remove the item. ``fetch`` is unsupported on an unbuffered (0-size) channel.

.. function:: close(c::Channel)

.. Docstring generated from Julia source
Closes a channel. An exception is thrown by:

* ``put!`` on a closed channel.
* ``take!`` and ``fetch`` on an empty, closed channel.

General Parallel Computing Support
----------------------------------

Expand Down Expand Up @@ -340,7 +377,6 @@ General Parallel Computing Support

* ``Future``\ : Wait for and get the value of a Future. The fetched value is cached locally. Further calls to ``fetch`` on the same reference return the cached value. If the remote value is an exception, throws a ``RemoteException`` which captures the remote exception and backtrace.
* ``RemoteChannel``\ : Wait for and get the value of a remote reference. Exceptions raised are same as for a ``Future`` .
* ``Channel`` : Wait for and get the first available item from the channel.

.. function:: remotecall_wait(f, id::Integer, args...; kwargs...)

Expand All @@ -366,34 +402,12 @@ General Parallel Computing Support
Store a value to a ``Future`` ``rr``\ . ``Future``\ s are write-once remote references. A ``put!`` on an already set ``Future`` throws an ``Exception``\ . All asynchronous remote calls return ``Future``\ s and set the value to the return value of the call upon completion.

.. function:: put!(c::Channel, v)

.. Docstring generated from Julia source
Appends an item ``v`` to the channel ``c``\ . Blocks if the channel is full.

.. function:: take!(rr::RemoteChannel, args...)

.. Docstring generated from Julia source
Fetch value(s) from a remote channel, removing the value(s) in the processs.

.. function:: take!(c::Channel)

.. Docstring generated from Julia source
Removes and returns a value from a ``Channel``\ . Blocks till data is available.

.. function:: isready(c::Channel)

.. Docstring generated from Julia source
Determine whether a ``Channel`` has a value stored to it.

For 0-sized channels returns true if there are tasks waiting on a ``put!``

``isready`` on ``Channel``\ s is non-blocking.

.. function:: isready(rr::RemoteChannel, args...)

.. Docstring generated from Julia source
Expand All @@ -414,15 +428,6 @@ General Parallel Computing Support
@async put!(c, remotecall_fetch(long_computation, p))
isready(c) # will not block
.. function:: close(c::Channel)

.. Docstring generated from Julia source
Closes a channel. An exception is thrown by:

* ``put!`` on a closed channel.
* ``take!`` and ``fetch`` on an empty, closed channel.

.. function:: WorkerPool(workers)

.. Docstring generated from Julia source
Expand Down

0 comments on commit 6d5d12e

Please sign in to comment.