From 6acf497236c715a5a1b6775cb0eefce7f98b12fe Mon Sep 17 00:00:00 2001 From: Jeff Bezanson Date: Wed, 7 Aug 2019 01:14:13 -0400 Subject: [PATCH] add `FailedTaskException` to propagate backtrace of failed task in `wait` --- NEWS.md | 2 + base/exports.jl | 1 + base/task.jl | 81 +++++++++++++++++---- stdlib/Distributed/src/Distributed.jl | 4 +- stdlib/Distributed/src/clusterserialize.jl | 2 +- stdlib/Distributed/src/macros.jl | 2 +- stdlib/Distributed/test/distributed_exec.jl | 14 ++-- stdlib/Serialization/test/runtests.jl | 4 +- test/channels.jl | 4 +- test/exceptions.jl | 4 +- test/worlds.jl | 3 +- 11 files changed, 89 insertions(+), 32 deletions(-) diff --git a/NEWS.md b/NEWS.md index 98f669ea08eeb..b6172e79763b7 100644 --- a/NEWS.md +++ b/NEWS.md @@ -37,6 +37,8 @@ New library functions Standard library changes ------------------------ +* When `wait` (or `@sync` etc.) is called on a failing `Task`, the exception is propagated as a + `FailedTaskException` wrapping the task ([#32814]). * `Regex` can now be multiplied (`*`) and exponentiated (`^`), like strings ([#23422]). * `Cmd` interpolation (``` `$(x::Cmd) a b c` ``` where) now propagates `x`'s process flags (environment, flags, working directory, etc) if `x` is the first interpolant and errors diff --git a/base/exports.jl b/base/exports.jl index 22b2cca2546e7..d41a3e4a168f6 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -119,6 +119,7 @@ export # Exceptions CapturedException, CompositeException, + FailedTaskException, DimensionMismatch, EOFError, InvalidStateException, diff --git a/base/task.jl b/base/task.jl index 88055d851943e..81550d3a69f14 100644 --- a/base/task.jl +++ b/base/task.jl @@ -56,6 +56,31 @@ function showerror(io::IO, ex::CompositeException) end end +""" + FailedTaskException + +This exception is thrown by a `wait(t)` call when task `t` fails. +`FailedTaskException` wraps the failed task `t`. +""" +struct FailedTaskException <: Exception + task::Task +end + +function showerror(io::IO, ex::FailedTaskException) + stacks = [] + while isa(ex.task.exception, FailedTaskException) + pushfirst!(stacks, ex.task.backtrace) + ex = ex.task.exception + end + println(io, "FailedTaskException:") + showerror(io, ex.task.exception, ex.task.backtrace) + if !isempty(stacks) + for bt in stacks + show_backtrace(io, bt) + end + end +end + function show(io::IO, t::Task) print(io, "Task ($(t.state)) @0x$(string(convert(UInt, pointer_from_objref(t)), base = 16, pad = Sys.WORD_SIZE>>2))") end @@ -204,9 +229,8 @@ function task_local_storage(body::Function, key, val) end end -# NOTE: you can only wait for scheduled tasks -function wait(t::Task) - t === current_task() && error("deadlock detected: cannot wait on current task") +# just wait for a task to be done, no error propagation +function _wait(t::Task) if !istaskdone(t) lock(t.donenotify) try @@ -217,8 +241,14 @@ function wait(t::Task) unlock(t.donenotify) end end + nothing +end + +function wait(t::Task) + t === current_task() && error("deadlock detected: cannot wait on current task") + _wait(t) if istaskfailed(t) - throw(t.exception) + throw(FailedTaskException(t)) end nothing end @@ -240,22 +270,32 @@ end ## lexically-scoped waiting for multiple items function sync_end(refs) - c_ex = CompositeException() + local c_ex + defined = false for r in refs - try - wait(r) - catch - if !isa(r, Task) || (isa(r, Task) && !istaskfailed(r)) - rethrow() + if isa(r, Task) + _wait(r) + if istaskfailed(r) + if !defined + defined = true + c_ex = CompositeException() + end + push!(c_ex, FailedTaskException(r)) end - finally - if isa(r, Task) && istaskfailed(r) - push!(c_ex, CapturedException(task_result(r), r.backtrace)) + else + try + wait(r) + catch e + if !defined + defined = true + c_ex = CompositeException() + end + push!(c_ex, e) end end end - if !isempty(c_ex) + if defined throw(c_ex) end nothing @@ -301,6 +341,17 @@ macro async(expr) end end +# add a wait-able object to the sync pool +macro sync_add(expr) + var = esc(sync_varname) + quote + local ref = $(esc(expr)) + if $(Expr(:isdefined, var)) + push!($var, ref) + end + ref + end +end function register_taskdone_hook(t::Task, hook) tls = get_task_tls(t) @@ -324,7 +375,7 @@ function task_done_hook(t::Task) try if !isempty(donenotify.waitq) handled = true - notify(donenotify, result, true, err) + notify(donenotify) end finally unlock(donenotify) diff --git a/stdlib/Distributed/src/Distributed.jl b/stdlib/Distributed/src/Distributed.jl index c1d6cc5d5ffff..504d21f97406f 100644 --- a/stdlib/Distributed/src/Distributed.jl +++ b/stdlib/Distributed/src/Distributed.jl @@ -10,7 +10,7 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length, hash, ==, kill, close, isopen, showerror # imports for use -using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, +using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, @sync_add, VERSION_STRING, binding_module, atexit, julia_exename, julia_cmd, AsyncGenerator, acquire, release, invokelatest, shell_escape_posixly, uv_error, something, notnothing, isbuffered @@ -74,7 +74,7 @@ function _require_callback(mod::Base.PkgId) # broadcast top-level (e.g. from Main) import/using from node 1 (only) @sync for p in procs() p == 1 && continue - @async remotecall_wait(p) do + @sync_add remotecall(p) do Base.require(mod) nothing end diff --git a/stdlib/Distributed/src/clusterserialize.jl b/stdlib/Distributed/src/clusterserialize.jl index e21ac32dc39d6..6bca816687af3 100644 --- a/stdlib/Distributed/src/clusterserialize.jl +++ b/stdlib/Distributed/src/clusterserialize.jl @@ -243,7 +243,7 @@ An exception is raised if a global constant is requested to be cleared. """ function clear!(syms, pids=workers(); mod=Main) @sync for p in pids - @async remotecall_wait(clear_impl!, p, syms, mod) + @sync_add remotecall(clear_impl!, p, syms, mod) end end clear!(sym::Symbol, pid::Int; mod=Main) = clear!([sym], [pid]; mod=mod) diff --git a/stdlib/Distributed/src/macros.jl b/stdlib/Distributed/src/macros.jl index e3b40c209a176..d15332a110a53 100644 --- a/stdlib/Distributed/src/macros.jl +++ b/stdlib/Distributed/src/macros.jl @@ -220,7 +220,7 @@ function remotecall_eval(m::Module, procs, ex) if pid == myid() run_locally += 1 else - @async remotecall_wait(Core.eval, pid, m, ex) + @sync_add remotecall(Core.eval, pid, m, ex) end end yield() # ensure that the remotecall_fetch have had a chance to start diff --git a/stdlib/Distributed/test/distributed_exec.jl b/stdlib/Distributed/test/distributed_exec.jl index b76a9414cd558..aa8ebba8dd837 100644 --- a/stdlib/Distributed/test/distributed_exec.jl +++ b/stdlib/Distributed/test/distributed_exec.jl @@ -420,11 +420,11 @@ try catch ex @test typeof(ex) == CompositeException @test length(ex) == 5 - @test typeof(ex.exceptions[1]) == CapturedException - @test typeof(ex.exceptions[1].ex) == ErrorException + @test typeof(ex.exceptions[1]) == FailedTaskException + @test typeof(ex.exceptions[1].task.exception) == ErrorException # test start, next, and done for (i, i_ex) in enumerate(ex) - @test i == parse(Int, i_ex.ex.msg) + @test i == parse(Int, i_ex.task.exception.msg) end # test showerror err_str = sprint(showerror, ex) @@ -738,7 +738,7 @@ end # full-test let t = @task 42 schedule(t, ErrorException(""), error=true) - @test_throws ErrorException Base.wait(t) + @test_throws FailedTaskException Base.wait(t) end # issue #8207 @@ -964,13 +964,15 @@ let (p, p2) = filter!(p -> p != myid(), procs()) if procs isa Int ex = Any[excpt] else - ex = Any[ (ex::CapturedException).ex for ex in (excpt::CompositeException).exceptions ] + ex = (excpt::CompositeException).exceptions end for (p, ex) in zip(procs, ex) local p if procs isa Int || p != myid() @test (ex::RemoteException).pid == p ex = ((ex::RemoteException).captured::CapturedException).ex + else + ex = (ex::FailedTaskException).task.exception end @test (ex::ErrorException).msg == msg end @@ -1165,7 +1167,7 @@ for (addp_testf, expected_errstr, env) in testruns close(stdout_in) @test isempty(fetch(stdout_txt)) @test isa(ex, CompositeException) - @test ex.exceptions[1].ex.msg == expected_errstr + @test ex.exceptions[1].task.exception.msg == expected_errstr end end diff --git a/stdlib/Serialization/test/runtests.jl b/stdlib/Serialization/test/runtests.jl index dc44c897f8153..119bc85218971 100644 --- a/stdlib/Serialization/test/runtests.jl +++ b/stdlib/Serialization/test/runtests.jl @@ -361,12 +361,12 @@ end struct MyErrorTypeTest <: Exception end create_serialization_stream() do s # user-defined type array t = Task(()->throw(MyErrorTypeTest())) - @test_throws MyErrorTypeTest Base.wait(schedule(t)) + @test_throws FailedTaskException Base.wait(schedule(t)) + @test isa(t.exception, MyErrorTypeTest) serialize(s, t) seek(s, 0) r = deserialize(s) @test r.state == :failed - @test isa(t.exception, MyErrorTypeTest) end # corner case: undefined inside immutable struct diff --git a/test/channels.jl b/test/channels.jl index 29a0ab52631da..ed4030b5b528b 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -323,12 +323,12 @@ end ct = current_task() testerr = ErrorException("expected") @async Base.throwto(t, testerr) - @test try + @test (try Base.wait(t) false catch ex ex - end === testerr + end).task.exception === testerr end @testset "Timer / AsyncCondition triggering and race #12719" begin diff --git a/test/exceptions.jl b/test/exceptions.jl index ee7f7e1b8849a..e47862c8312d1 100644 --- a/test/exceptions.jl +++ b/test/exceptions.jl @@ -274,12 +274,12 @@ end @test catch_stack(t, include_bt=false) == [ErrorException("A"), ErrorException("B")] # Exception stacks for tasks which never get the chance to start t = @task nothing - @test try + @test (try @async Base.throwto(t, ErrorException("expected")) wait(t) catch e e - end == ErrorException("expected") + end).task.exception == ErrorException("expected") @test length(catch_stack(t)) == 1 @test length(catch_stack(t)[1][2]) > 0 # backtrace is nonempty # Exception stacks should not be accessed on concurrently running tasks diff --git a/test/worlds.jl b/test/worlds.jl index a1bda0ff9a4e5..33065f7198b7c 100644 --- a/test/worlds.jl +++ b/test/worlds.jl @@ -139,9 +139,10 @@ h265() = true loc_h265 = "$(@__FILE__):$(@__LINE__() - 1)" @test h265() @test_throws MethodError put_n_take!(h265, ()) -@test_throws MethodError fetch(t265) +@test_throws FailedTaskException fetch(t265) @test istaskdone(t265) let ex = t265.exception + @test ex isa MethodError @test ex.f == h265 @test ex.args == () @test ex.world == wc265