Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deprecate produce, consume and task iteration #19841

Merged
merged 1 commit into from
Jan 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ Library improvements

* New `iszero(x)` function to quickly check whether `x` is zero (or is all zeros, for an array) ([#19950]).

* `notify` now returns a count of tasks woken up ([#19841]).

Compiler/Runtime improvements
-----------------------------

Expand All @@ -187,6 +189,9 @@ Deprecated or removed
functions (`airyai`, `airybi`, `airyaiprime`, `airybiprimex`, `airyaix`, `airybix`,
`airyaiprimex`, `airybiprimex`) ([#18050]).

* `produce`, `consume` and iteration over a Task object has been deprecated in favor of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have been deprecated

using Channels for inter-task communication ([#19841]).

Julia v0.5.0 Release Notes
==========================

Expand Down
194 changes: 183 additions & 11 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Channel{T} <: AbstractChannel
cond_take::Condition # waiting for data to become available
cond_put::Condition # waiting for a writeable slot
state::Symbol
excp::Nullable{Exception} # Exception to be thrown when state != :open

data::Array{T,1}
sz_max::Int # maximum size of channel
Expand All @@ -39,7 +40,7 @@ type Channel{T} <: AbstractChannel
if sz < 0
throw(ArgumentError("Channel size must be either 0, a positive integer or Inf"))
end
new(Condition(), Condition(), :open, Array{T}(0), sz, Array{Condition}(0))
new(Condition(), Condition(), :open, Nullable{Exception}(), Array{T}(0), sz, Array{Condition}(0))
end

# deprecated empty constructor
Expand All @@ -53,13 +54,82 @@ end

Channel(sz) = Channel{Any}(sz)

# special constructors
"""
Channel(func::Function; ctype=Any, csize=0, taskref=nothing)
Creates a new task from `func`, binds it to a new channel of type
`ctype` and size `csize`, schedules the task, all in a single call.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and schedules the task

`func` must accept the bound channel as its only argument.
If you need a reference to the created task, pass a `Ref{Task}` object via
keyword argument `taskref`.
Returns a Channel.
```jldoctest
julia> chnl = Channel(c->foreach(i->put!(c,i), 1:4));
julia> @show typeof(chnl);
Copy link
Contributor

@tkelman tkelman Jan 12, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@show is redundant here, just output without a semicolon

typeof(chnl) = Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4
```
An example of referencing the created task:
```jldoctest
julia> taskref = Ref{Task}();
julia> chnl = Channel(c->(@show take!(c)); taskref=taskref);
julia> task = taskref[];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a really awkward API

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think of it is pass-by-reference in C. Most times since the channel is bound to the task, the user will not need the task object. But if required, can be retrieved.

The alternative would be to export a new function which would return a tuple of (Channel, Task), which in the common case would require a [1] to retrieve the Channel. A little inconvenient.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Julia isn't C, I can't think of any Julia APIs that do this. A separate function for this would make more sense, might not even need to be exported if it isn't used very often. Is there any other way of identifying whether or not a Channel has a task bound to it? Something like being able to ask for boundtasks(c::Channel) might be useful if possible

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, no Julia APIs do this yet, but it may catch on.

If a need is felt for boundtasks(c::Channel), it will need additional housekeeping in the Channel object. Not currently done (task local storage in the bound task is used to record a WeakRef to the channel).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but it may catch on.

I seriously hope not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would Channel(t::Task) work? we really shouldn't be trying to imitate varargout here when there's already another clearer way of accomplishing the same result

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, since Task(f) expects a 0-arg function and Channel(f) expects a 1-arg function which is the created Channel itself. As this replaces task-iteration with a bound channel iteration, it is important that the task get early access to the channel and any early errors raised in the task are propagated to the channel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calling bind separately is sufficient though?

Copy link
Contributor Author

@amitmurthy amitmurthy Jan 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but you typically need to ensure that the Task is bound before it is scheduled to handle early errors in the task. For example,

c = Channel()
t = @schedule (foo(); while(some_condition); put!(c,data); end)
bind(c,t)
for x in c
  bar(x)
end 

can lead to the for-loop missing any exception thrown by foo. Actually, it is currently not an issue since @chedule does not yield, but it is safe to assume that this behavior can change in the future.

A safer way is

c = Channel()
t = Task(()->(foo(); while(some_condition); put!(c,data); end))
bind(c,t)
schedule(t)
for x in c
  bar(x)
end 

With the special Channel constructor, the above code block is equivalent to

for x in Channel(c->(foo(); while(some_condition); put!(c,data); end))
  bar(x)
end

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not saying the latter isn't useful, but if needing to use the task afterwards is uncommon I think the former pattern is clearer and more idiomatic than passing a reference argument.

julia> @show istaskdone(task);
Copy link
Contributor

@tkelman tkelman Jan 12, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@show is redundant here, just output without a semicolon

istaskdone(task) = false
julia> put!(chnl, "Hello");
take!(c) = "Hello"
julia> @show istaskdone(task);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@show is redundant here, just output without a semicolon

istaskdone(task) = true
```
"""
function Channel(func::Function; ctype=Any, csize=0, taskref=nothing)
chnl = Channel{ctype}(csize)
task = Task(()->func(chnl))
bind(chnl,task)
schedule(task)
yield()

isa(taskref, Ref{Task}) && (taskref.x = task)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't taskref[] = task a more recommended way of writing this, in case the representation of Ref ever changes?

return chnl
end



# deprecated empty constructor
Channel() = Channel{Any}()

closed_exception() = InvalidStateException("Channel is closed.", :closed)

isbuffered(c::Channel) = c.sz_max==0 ? false : true

function check_channel_state(c::Channel)
if !isopen(c)
!isnull(c.excp) && throw(get(c.excp))
throw(closed_exception())
end
end
"""
close(c::Channel)
Expand All @@ -70,11 +140,110 @@ Closes a channel. An exception is thrown by:
"""
function close(c::Channel)
c.state = :closed
notify_error(c::Channel, closed_exception())
c.excp = Nullable{}(closed_exception())
notify_error(c)
nothing
end
isopen(c::Channel) = (c.state == :open)

"""
bind(chnl::Channel, task::Task)
Associates the lifetime of `chnl` with a task.
Channel `chnl` is automatically closed when the task terminates.
Any uncaught exception in the task is propagated to all waiters on `chnl`.
The `chnl` object can be explicitly closed independent of task termination.
Terminating tasks have no effect on already closed Channel objects.
When a channel is bound to multiple tasks, the first task to terminate will
close the channel. When multiple channels are bound to the same task,
termination of the task will close all channels.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all of the bound channels, not absolutely all channels that might exist

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is obvious it refers to the "multiple channels" mentioned before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it is not obvious, it sounds confusing since it's missing an article or qualifier.

```jldoctest
julia> c = Channel(0);
julia> task = @schedule foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
@show i
end;
i = 1
i = 2
i = 3
i = 4
julia> @show isopen(c);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@show is redundant here, just output without a semicolon

isopen(c) = false
```
```jldoctest
julia> c = Channel(0);
julia> task = @schedule (put!(c,1);error("foo"));
julia> bind(c,task);
julia> take!(c);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not suppressing the output would be worthwhile here

julia> put!(c,1);
ERROR: foo
Stacktrace:
[1] check_channel_state(::Channel{Any}) at ./channels.jl:129
[2] put!(::Channel{Any}, ::Int64) at ./channels.jl:247
```
"""
function bind(c::Channel, task::Task)
ref = WeakRef(c)
register_taskdone_hook(task, tsk->close_chnl_on_taskdone(tsk, ref))
c
end

"""
channeled_tasks(n::Int, funcs...; ctypes=fill(Any,n), csizes=fill(0,n))
A convenience method to create `n` channels and bind them to tasks started
from the provided functions in a single call. Each `func` must accept `n` arguments
which are the created channels. Channel types and sizes may be specified via
keyword arguments `ctypes` and `csizes` respectively. If unspecified, all channels are
of type `Channel{Any}(0)`.
Returns a tuple, `(Array{Channel}, Array{Task})`, of the created channels and tasks.
"""
function channeled_tasks(n::Int, funcs...; ctypes=fill(Any,n), csizes=fill(0,n))
@assert length(csizes) == n
@assert length(ctypes) == n
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if users are ever going to call this, then these are user errors and should get better error messages instead of being asserts

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exported yet. I found it useful and is used in one of the tests. Have left it in for now. The interface will probably be modified a bit before this is exposed.


chnls = map(i->Channel{ctypes[i]}(csizes[i]), 1:n)
tasks=Task[Task(()->f(chnls...)) for f in funcs]

# bind all tasks to all channels and schedule them
foreach(t -> foreach(c -> bind(c,t), chnls), tasks)
foreach(t->schedule(t), tasks)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t->schedule(t) is redundant, this is just schedule


yield() # Allow scheduled tasks to run

return (chnls, tasks)
end

function close_chnl_on_taskdone(t::Task, ref::WeakRef)
if ref.value !== nothing
c = ref.value
!isopen(c) && return
if istaskfailed(t)
c.state = :closed
c.excp = Nullable{Exception}(task_result(t))
notify_error(c)
else
close(c)
end
end
end

type InvalidStateException <: Exception
msg::AbstractString
state::Symbol
Expand All @@ -89,7 +258,7 @@ For unbuffered channels, blocks until a [`take!`](@ref) is performed by a differ
task.
"""
function put!(c::Channel, v)
!isopen(c) && throw(closed_exception())
check_channel_state(c)
isbuffered(c) ? put_buffered(c,v) : put_unbuffered(c,v)
end

Expand All @@ -98,7 +267,9 @@ function put_buffered(c::Channel, v)
wait(c.cond_put)
end
push!(c.data, v)
notify(c.cond_take, nothing, true, false) # notify all, since some of the waiters may be on a "fetch" call.

# notify all, since some of the waiters may be on a "fetch" call.
notify(c.cond_take, nothing, true, false)
v
end

Expand All @@ -108,7 +279,7 @@ function put_unbuffered(c::Channel, v)
wait(c.cond_put)
end
cond_taker = shift!(c.takers)
notify(cond_taker, v, false, false)
notify(cond_taker, v, false, false) > 0 && yield()
v
end

Expand Down Expand Up @@ -148,7 +319,7 @@ shift!(c::Channel) = take!(c)

# 0-size channel
function take_unbuffered(c::Channel)
!isopen(c) && throw(closed_exception())
check_channel_state(c)
cond_taker = Condition()
push!(c.takers, cond_taker)
notify(c.cond_put, nothing, false, false)
Expand Down Expand Up @@ -178,7 +349,7 @@ n_avail(c::Channel) = isbuffered(c) ? length(c.data) : n_waiters(c.cond_put)

function wait(c::Channel)
while !isready(c)
!isopen(c) && throw(closed_exception())
check_channel_state(c)
wait(c.cond_take)
end
nothing
Expand All @@ -189,19 +360,20 @@ function notify_error(c::Channel, err)
notify_error(c.cond_put, err)
foreach(x->notify_error(x, err), c.takers)
end
notify_error(c::Channel) = notify_error(c, get(c.excp))

eltype{T}(::Type{Channel{T}}) = T

show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(c.sz_max),sz_curr:$(n_avail(c)))")

type ChannelState{T}
type ChannelIterState{T}
hasval::Bool
val::T
ChannelState(x) = new(x)
ChannelIterState(x) = new(x)
end

start{T}(c::Channel{T}) = ChannelState{T}(false)
function done(c::Channel, state::ChannelState)
start{T}(c::Channel{T}) = ChannelIterState{T}(false)
function done(c::Channel, state::ChannelIterState)
try
# we are waiting either for more data or channel to be closed
state.hasval && return false
Expand Down
Loading