Skip to content

Commit

Permalink
add TaskFailedException to propagate backtrace of failed task in `w…
Browse files Browse the repository at this point in the history
…ait`
  • Loading branch information
JeffBezanson committed Aug 8, 2019
1 parent 314e355 commit 79bf746
Show file tree
Hide file tree
Showing 13 changed files with 110 additions and 36 deletions.
4 changes: 4 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ New library functions
Standard library changes
------------------------

* When `wait` (or `@sync`, or `fetch`) is called on a failing `Task`, the exception is propagated as a
`TaskFailedException` wrapping the task.
This makes it possible to see the location of the original failure inside the task (as well as the
location of the `wait` call, as before) ([#32814]).
* `Regex` can now be multiplied (`*`) and exponentiated (`^`), like strings ([#23422]).
* `Cmd` interpolation (``` `$(x::Cmd) a b c` ``` where) now propagates `x`'s process flags
(environment, flags, working directory, etc) if `x` is the first interpolant and errors
Expand Down
4 changes: 2 additions & 2 deletions base/condition.jl
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ Block the current task until some event occurs, depending on the type of the arg
* [`Condition`](@ref): Wait for [`notify`](@ref) on a condition.
* `Process`: Wait for a process or process chain to exit. The `exitcode` field of a process
can be used to determine success or failure.
* [`Task`](@ref): Wait for a `Task` to finish. If the task fails with an exception, the
exception is propagated (re-thrown in the task that called `wait`).
* [`Task`](@ref): Wait for a `Task` to finish. If the task fails with an exception, a
`TaskFailedException` (which wraps the failed task) is thrown.
* [`RawFD`](@ref): Wait for changes on a file descriptor (see the `FileWatching` package).
If no argument is passed, the task blocks for an undefined period. A task can only be
Expand Down
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export
KeyError,
MissingException,
ProcessFailedException,
TaskFailedException,
SystemError,
StringIndexError,

Expand Down
84 changes: 67 additions & 17 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,31 @@ function showerror(io::IO, ex::CompositeException)
end
end

"""
TaskFailedException
This exception is thrown by a `wait(t)` call when task `t` fails.
`TaskFailedException` wraps the failed task `t`.
"""
struct TaskFailedException <: Exception
task::Task
end

function showerror(io::IO, ex::TaskFailedException)
stacks = []
while isa(ex.task.exception, TaskFailedException)
pushfirst!(stacks, ex.task.backtrace)
ex = ex.task.exception
end
println(io, "TaskFailedException:")
showerror(io, ex.task.exception, ex.task.backtrace)
if !isempty(stacks)
for bt in stacks
show_backtrace(io, bt)
end
end
end

function show(io::IO, t::Task)
print(io, "Task ($(t.state)) @0x$(string(convert(UInt, pointer_from_objref(t)), base = 16, pad = Sys.WORD_SIZE>>2))")
end
Expand Down Expand Up @@ -204,9 +229,8 @@ function task_local_storage(body::Function, key, val)
end
end

# NOTE: you can only wait for scheduled tasks
function wait(t::Task)
t === current_task() && error("deadlock detected: cannot wait on current task")
# just wait for a task to be done, no error propagation
function _wait(t::Task)
if !istaskdone(t)
lock(t.donenotify)
try
Expand All @@ -217,8 +241,14 @@ function wait(t::Task)
unlock(t.donenotify)
end
end
nothing
end

function wait(t::Task)
t === current_task() && error("deadlock detected: cannot wait on current task")
_wait(t)
if istaskfailed(t)
throw(t.exception)
throw(TaskFailedException(t))
end
nothing
end
Expand All @@ -228,8 +258,9 @@ fetch(@nospecialize x) = x
"""
fetch(t::Task)
Wait for a Task to finish, then return its result value. If the task fails with an
exception, the exception is propagated (re-thrown in the task that called fetch).
Wait for a Task to finish, then return its result value.
If the task fails with an exception, a `TaskFailedException` (which wraps the failed task)
is thrown.
"""
function fetch(t::Task)
wait(t)
Expand All @@ -240,22 +271,32 @@ end
## lexically-scoped waiting for multiple items

function sync_end(refs)
c_ex = CompositeException()
local c_ex
defined = false
for r in refs
try
wait(r)
catch
if !isa(r, Task) || (isa(r, Task) && !istaskfailed(r))
rethrow()
if isa(r, Task)
_wait(r)
if istaskfailed(r)
if !defined
defined = true
c_ex = CompositeException()
end
push!(c_ex, TaskFailedException(r))
end
finally
if isa(r, Task) && istaskfailed(r)
push!(c_ex, CapturedException(task_result(r), r.backtrace))
else
try
wait(r)
catch e
if !defined
defined = true
c_ex = CompositeException()
end
push!(c_ex, e)
end
end
end

if !isempty(c_ex)
if defined
throw(c_ex)
end
nothing
Expand Down Expand Up @@ -301,6 +342,15 @@ macro async(expr)
end
end

# add a wait-able object to the sync pool
macro sync_add(expr)
var = esc(sync_varname)
quote
local ref = $(esc(expr))
push!($var, ref)
ref
end
end

function register_taskdone_hook(t::Task, hook)
tls = get_task_tls(t)
Expand All @@ -324,7 +374,7 @@ function task_done_hook(t::Task)
try
if !isempty(donenotify.waitq)
handled = true
notify(donenotify, result, true, err)
notify(donenotify)
end
finally
unlock(donenotify)
Expand Down
4 changes: 2 additions & 2 deletions stdlib/Distributed/src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length,
hash, ==, kill, close, isopen, showerror

# imports for use
using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes,
using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, @sync_add,
VERSION_STRING, binding_module, atexit, julia_exename,
julia_cmd, AsyncGenerator, acquire, release, invokelatest,
shell_escape_posixly, uv_error, something, notnothing, isbuffered
Expand Down Expand Up @@ -74,7 +74,7 @@ function _require_callback(mod::Base.PkgId)
# broadcast top-level (e.g. from Main) import/using from node 1 (only)
@sync for p in procs()
p == 1 && continue
@async remotecall_wait(p) do
@sync_add remotecall(p) do
Base.require(mod)
nothing
end
Expand Down
2 changes: 1 addition & 1 deletion stdlib/Distributed/src/clusterserialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ An exception is raised if a global constant is requested to be cleared.
"""
function clear!(syms, pids=workers(); mod=Main)
@sync for p in pids
@async remotecall_wait(clear_impl!, p, syms, mod)
@sync_add remotecall(clear_impl!, p, syms, mod)
end
end
clear!(sym::Symbol, pid::Int; mod=Main) = clear!([sym], [pid]; mod=mod)
Expand Down
2 changes: 1 addition & 1 deletion stdlib/Distributed/src/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ function remotecall_eval(m::Module, procs, ex)
if pid == myid()
run_locally += 1
else
@async remotecall_wait(Core.eval, pid, m, ex)
@sync_add remotecall(Core.eval, pid, m, ex)
end
end
yield() # ensure that the remotecall_fetch have had a chance to start
Expand Down
14 changes: 8 additions & 6 deletions stdlib/Distributed/test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -420,11 +420,11 @@ try
catch ex
@test typeof(ex) == CompositeException
@test length(ex) == 5
@test typeof(ex.exceptions[1]) == CapturedException
@test typeof(ex.exceptions[1].ex) == ErrorException
@test typeof(ex.exceptions[1]) == TaskFailedException
@test typeof(ex.exceptions[1].task.exception) == ErrorException
# test start, next, and done
for (i, i_ex) in enumerate(ex)
@test i == parse(Int, i_ex.ex.msg)
@test i == parse(Int, i_ex.task.exception.msg)
end
# test showerror
err_str = sprint(showerror, ex)
Expand Down Expand Up @@ -738,7 +738,7 @@ end # full-test

let t = @task 42
schedule(t, ErrorException(""), error=true)
@test_throws ErrorException Base.wait(t)
@test_throws TaskFailedException(t) Base.wait(t)
end

# issue #8207
Expand Down Expand Up @@ -964,13 +964,15 @@ let (p, p2) = filter!(p -> p != myid(), procs())
if procs isa Int
ex = Any[excpt]
else
ex = Any[ (ex::CapturedException).ex for ex in (excpt::CompositeException).exceptions ]
ex = (excpt::CompositeException).exceptions
end
for (p, ex) in zip(procs, ex)
local p
if procs isa Int || p != myid()
@test (ex::RemoteException).pid == p
ex = ((ex::RemoteException).captured::CapturedException).ex
else
ex = (ex::TaskFailedException).task.exception
end
@test (ex::ErrorException).msg == msg
end
Expand Down Expand Up @@ -1165,7 +1167,7 @@ for (addp_testf, expected_errstr, env) in testruns
close(stdout_in)
@test isempty(fetch(stdout_txt))
@test isa(ex, CompositeException)
@test ex.exceptions[1].ex.msg == expected_errstr
@test ex.exceptions[1].task.exception.msg == expected_errstr
end
end

Expand Down
4 changes: 2 additions & 2 deletions stdlib/Serialization/test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,12 @@ end
struct MyErrorTypeTest <: Exception end
create_serialization_stream() do s # user-defined type array
t = Task(()->throw(MyErrorTypeTest()))
@test_throws MyErrorTypeTest Base.wait(schedule(t))
@test_throws TaskFailedException(t) Base.wait(schedule(t))
@test isa(t.exception, MyErrorTypeTest)
serialize(s, t)
seek(s, 0)
r = deserialize(s)
@test r.state == :failed
@test isa(t.exception, MyErrorTypeTest)
end

# corner case: undefined inside immutable struct
Expand Down
4 changes: 2 additions & 2 deletions test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -323,12 +323,12 @@ end
ct = current_task()
testerr = ErrorException("expected")
@async Base.throwto(t, testerr)
@test try
@test (try
Base.wait(t)
false
catch ex
ex
end === testerr
end).task.exception === testerr
end

@testset "Timer / AsyncCondition triggering and race #12719" begin
Expand Down
16 changes: 16 additions & 0 deletions test/errorshow.jl
Original file line number Diff line number Diff line change
Expand Up @@ -562,3 +562,19 @@ let buf = IOBuffer()
Base.show_method_candidates(buf, Base.MethodError(sin, Tuple{NoMethodsDefinedHere}))
@test length(take!(buf)) !== 0
end

# pr #32814
let t1 = @async(error(1)),
t2 = @async(wait(t1))
local e
try
wait(t2)
catch e_
e = e_
end
buf = IOBuffer()
showerror(buf, e)
s = String(take!(buf))
@test length(findall("Stacktrace:", s)) == 2
@test occursin("[1] error(::Int", s)
end
4 changes: 2 additions & 2 deletions test/exceptions.jl
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,12 @@ end
@test catch_stack(t, include_bt=false) == [ErrorException("A"), ErrorException("B")]
# Exception stacks for tasks which never get the chance to start
t = @task nothing
@test try
@test (try
@async Base.throwto(t, ErrorException("expected"))
wait(t)
catch e
e
end == ErrorException("expected")
end).task.exception == ErrorException("expected")
@test length(catch_stack(t)) == 1
@test length(catch_stack(t)[1][2]) > 0 # backtrace is nonempty
# Exception stacks should not be accessed on concurrently running tasks
Expand Down
3 changes: 2 additions & 1 deletion test/worlds.jl
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,10 @@ h265() = true
loc_h265 = "$(@__FILE__):$(@__LINE__() - 1)"
@test h265()
@test_throws MethodError put_n_take!(h265, ())
@test_throws MethodError fetch(t265)
@test_throws TaskFailedException(t265) fetch(t265)
@test istaskdone(t265)
let ex = t265.exception
@test ex isa MethodError
@test ex.f == h265
@test ex.args == ()
@test ex.world == wc265
Expand Down

0 comments on commit 79bf746

Please sign in to comment.