Skip to content

Commit

Permalink
deprecate produce, consume and task iteration
Browse files Browse the repository at this point in the history
added tests, added channeled_tasks
  • Loading branch information
amitmurthy committed Jan 5, 2017
1 parent cbc6670 commit 250bcb8
Show file tree
Hide file tree
Showing 14 changed files with 431 additions and 268 deletions.
90 changes: 79 additions & 11 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Other constructors:
type Channel{T} <: AbstractChannel
cond_take::Condition # waiting for data to become available
cond_put::Condition # waiting for a writeable slot
state::Symbol
state::Tuple

data::Array{T,1}
sz_max::Int # maximum size of channel
Expand All @@ -39,7 +39,7 @@ type Channel{T} <: AbstractChannel
if sz < 0
throw(ArgumentError("Channel size must be either 0, a positive integer or Inf"))
end
new(Condition(), Condition(), :open, Array{T}(0), sz, Array{Condition}(0))
new(Condition(), Condition(), (:open,nothing), Array{T}(0), sz, Array{Condition}(0))
end

# deprecated empty constructor
Expand All @@ -60,6 +60,12 @@ closed_exception() = InvalidStateException("Channel is closed.", :closed)

isbuffered(c::Channel) = c.sz_max==0 ? false : true

function check_channel_state(c::Channel)
if !isopen(c)
isa(c.state[2], Exception) && throw(c.state[2])
throw(closed_exception())
end
end
"""
close(c::Channel)
Expand All @@ -69,11 +75,71 @@ Closes a channel. An exception is thrown by:
* [`take!`](@ref) and [`fetch`](@ref) on an empty, closed channel.
"""
function close(c::Channel)
c.state = :closed
c.state = (:closed, nothing)
notify_error(c::Channel, closed_exception())
nothing
end
isopen(c::Channel) = (c.state == :open)
isopen(c::Channel) = (c.state[1] == :open)

"""
bind(chnl::Channel, task::Task)
Associates the lifetime of `chnl` with a task.
Channel `chnl` is automatically closed when the task terminates.
Any uncaught exception in the task is propagated to all waiters on `chnl`.
The `chnl` object can be explicitly closed independent of task termination.
Terminating tasks have no effect on already closed Channel objects.
When a channel is bound to multiple tasks, the first task to terminate will
close the channel. When multiple channels are bound to the same task,
termination of the task will close all channels.
"""
function bind(c::Channel, task::Task)
ref = WeakRef(c)
register_taskdone_hook(task, tsk->close_chnl_on_taskdone(tsk, ref))
c
end

"""
channeled_tasks(n::Int, funcs...; ctypes=fill(Any,n), csizes=fill(0,n))
A convenience method to create `n` channels and bind them to tasks started
from the provided functions in a single call. Each `func` must accept `n` arguments
which are the created channels. Channel types and sizes may be specified via
keyword arguments `ctypes` and `csizes` respectively. If unspecified, all channels are
of type `Channel{Any}(0)`.
Returns a tuple, `(Array{Channel}, Array{Task})`, of the created channels and tasks.
"""
function channeled_tasks(n::Int, funcs...; ctypes=fill(Any,n), csizes=fill(0,n))
@assert length(csizes) == n
@assert length(ctypes) == n

chnls = map(i->Channel{ctypes[i]}(csizes[i]), 1:n)
tasks=Task[Task(()->f(chnls...)) for f in funcs]

# bind all tasks to all channels and schedule them
foreach(t -> foreach(c -> bind(c,t), chnls), tasks)
foreach(t->schedule(t), tasks)

yield() # Allow scheduled tasks to run

return (chnls, tasks)
end

function close_chnl_on_taskdone(t::Task, ref::WeakRef)
if ref.value !== nothing
c = ref.value
!isopen(c) && return
if istaskfailed(t)
c.state = (:closed, task_result(t))
notify_error(c, task_result(t))
else
close(c)
end
end
end

type InvalidStateException <: Exception
msg::AbstractString
Expand All @@ -89,8 +155,10 @@ For unbuffered channels, blocks until a [`take!`](@ref) is performed by a differ
task.
"""
function put!(c::Channel, v)
!isopen(c) && throw(closed_exception())
check_channel_state(c)
isbuffered(c) ? put_buffered(c,v) : put_unbuffered(c,v)
yield()
v
end

function put_buffered(c::Channel, v)
Expand Down Expand Up @@ -148,7 +216,7 @@ shift!(c::Channel) = take!(c)

# 0-size channel
function take_unbuffered(c::Channel)
!isopen(c) && throw(closed_exception())
check_channel_state(c)
cond_taker = Condition()
push!(c.takers, cond_taker)
notify(c.cond_put, nothing, false, false)
Expand Down Expand Up @@ -178,7 +246,7 @@ n_avail(c::Channel) = isbuffered(c) ? length(c.data) : n_waiters(c.cond_put)

function wait(c::Channel)
while !isready(c)
!isopen(c) && throw(closed_exception())
check_channel_state(c)
wait(c.cond_take)
end
nothing
Expand All @@ -194,14 +262,14 @@ eltype{T}(::Type{Channel{T}}) = T

show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(c.sz_max),sz_curr:$(n_avail(c)))")

type ChannelState{T}
type ChannelIterState{T}
hasval::Bool
val::T
ChannelState(x) = new(x)
ChannelIterState(x) = new(x)
end

start{T}(c::Channel{T}) = ChannelState{T}(false)
function done(c::Channel, state::ChannelState)
start{T}(c::Channel{T}) = ChannelIterState{T}(false)
function done(c::Channel, state::ChannelIterState)
try
# we are waiting either for more data or channel to be closed
state.hasval && return false
Expand Down
96 changes: 96 additions & 0 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1483,4 +1483,100 @@ end
# Calling promote_op is likely a bad idea, so deprecate its convenience wrapper promote_eltype_op
@deprecate promote_eltype_op(op, As...) promote_op(op, map(eltype, As)...)


## produce, consume, and task iteration
# NOTE: When removing produce/consume, also remove field Task.consumers and related code in
# task.jl and event.jl

function produce(v)
depwarn("produce is now deprecated. Use Channels for inter-task communication.", :produce)

ct = current_task()
local empty, t, q
while true
q = ct.consumers
if isa(q,Task)
t = q
ct.consumers = nothing
empty = true
break
elseif isa(q,Condition) && !isempty(q.waitq)
t = shift!(q.waitq)
empty = isempty(q.waitq)
break
end
wait()
end

t.state == :runnable || throw(AssertionError("producer.consumer.state == :runnable"))
if empty
schedule_and_wait(t, v)
while true
# wait until there are more consumers
q = ct.consumers
if isa(q,Task)
return q.result
elseif isa(q,Condition) && !isempty(q.waitq)
return q.waitq[1].result
end
wait()
end
else
schedule(t, v)
# make sure `t` runs before us. otherwise, the producer might
# finish before `t` runs again, causing it to see the producer
# as done, causing done(::Task, _) to miss the value `v`.
# see issue #7727
yield()
return q.waitq[1].result
end
end
produce(v...) = produce(v)

function consume(P::Task, values...)
depwarn("consume is now deprecated. Use Channels for inter-task communication.", :consume)

if istaskdone(P)
return wait(P)
end

ct = current_task()
ct.result = length(values)==1 ? values[1] : values

#### un-optimized version
#if P.consumers === nothing
# P.consumers = Condition()
#end
#push!(P.consumers.waitq, ct)
# optimized version that avoids the queue for 1 consumer
if P.consumers === nothing || (isa(P.consumers,Condition)&&isempty(P.consumers.waitq))
P.consumers = ct
else
if isa(P.consumers, Task)
t = P.consumers
P.consumers = Condition()
push!(P.consumers.waitq, t)
end
push!(P.consumers.waitq, ct)
end

P.state == :runnable ? schedule_and_wait(P) : wait() # don't attempt to queue it twice
end

function start(t::Task)
depwarn(string("Task iteration is now deprecated.",
" Use Channels for inter-task communication. ",
" A for-loop on a Channel object is terminated by calling `close` on the object."), :taskfor)
nothing
end
function done(t::Task, val)
t.result = consume(t)
istaskdone(t)
end
next(t::Task, val) = (t.result, nothing)
iteratorsize(::Type{Task}) = SizeUnknown()
iteratoreltype(::Type{Task}) = EltypeUnknown()

isempty(::Task) = error("isempty not defined for Tasks")

# End deprecations scheduled for 0.6
16 changes: 0 additions & 16 deletions base/docs/helpdb/Base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -534,14 +534,6 @@ Show every part of the representation of a value.
"""
dump

"""
consume(task, values...)
Receive the next value passed to `produce` by the specified task. Additional arguments may
be passed, to be returned from the last `produce` call in the producer.
"""
consume

"""
cummax(A, [dim])
Expand Down Expand Up @@ -1482,14 +1474,6 @@ the topmost backend that does not throw a `MethodError`).
"""
pushdisplay

"""
produce(value)
Send the given value to the last `consume` call, switching to the consumer task. If the next
`consume` call passes any values, they are returned by `produce`.
"""
produce

"""
StackOverflowError()
Expand Down
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,7 @@ export
sizeof,

# tasks and conditions
channeled_tasks,
Condition,
consume,
current_task,
Expand Down
18 changes: 11 additions & 7 deletions base/file.jl
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,10 @@ function walkdir(root; topdown=true, follow_symlinks=false, onerror=throw)
catch err
isa(err, SystemError) || throw(err)
onerror(err)
#Need to return an empty task to skip the current root folder
return Task(()->())
# Need to return an empty closed channel to skip the current root folder
chnl = Channel(0)
close(chnl)
return chnl
end
dirs = Array{eltype(content)}(0)
files = Array{eltype(content)}(0)
Expand All @@ -467,23 +469,25 @@ function walkdir(root; topdown=true, follow_symlinks=false, onerror=throw)
end
end

function _it()
function _it(chnl)
if topdown
produce(root, dirs, files)
put!(chnl, (root, dirs, files))
end
for dir in dirs
path = joinpath(root,dir)
if follow_symlinks || !islink(path)
for (root_l, dirs_l, files_l) in walkdir(path, topdown=topdown, follow_symlinks=follow_symlinks, onerror=onerror)
produce(root_l, dirs_l, files_l)
put!(chnl, (root_l, dirs_l, files_l))
end
end
end
if !topdown
produce(root, dirs, files)
put!(chnl, (root, dirs, files))
end
end
Task(_it)

chnls, _ = channeled_tasks(1, _it)
return chnls[1]
end

function unlink(p::AbstractString)
Expand Down
Loading

0 comments on commit 250bcb8

Please sign in to comment.