Skip to content

Commit

Permalink
add TaskFailedException to propagate backtrace of failed task in `w…
Browse files Browse the repository at this point in the history
…ait` (#32814)
  • Loading branch information
JeffBezanson authored Aug 13, 2019
1 parent a56a6e7 commit 44d54fe
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 10 deletions.
4 changes: 2 additions & 2 deletions src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length,
hash, ==, kill, close, isopen, showerror

# imports for use
using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes,
using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, @sync_add,
VERSION_STRING, binding_module, atexit, julia_exename,
julia_cmd, AsyncGenerator, acquire, release, invokelatest,
shell_escape_posixly, uv_error, something, notnothing, isbuffered
Expand Down Expand Up @@ -74,7 +74,7 @@ function _require_callback(mod::Base.PkgId)
# broadcast top-level (e.g. from Main) import/using from node 1 (only)
@sync for p in procs()
p == 1 && continue
@async remotecall_wait(p) do
@sync_add remotecall(p) do
Base.require(mod)
nothing
end
Expand Down
2 changes: 1 addition & 1 deletion src/clusterserialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ An exception is raised if a global constant is requested to be cleared.
"""
function clear!(syms, pids=workers(); mod=Main)
@sync for p in pids
@async remotecall_wait(clear_impl!, p, syms, mod)
@sync_add remotecall(clear_impl!, p, syms, mod)
end
end
clear!(sym::Symbol, pid::Int; mod=Main) = clear!([sym], [pid]; mod=mod)
Expand Down
2 changes: 1 addition & 1 deletion src/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ function remotecall_eval(m::Module, procs, ex)
if pid == myid()
run_locally += 1
else
@async remotecall_wait(Core.eval, pid, m, ex)
@sync_add remotecall(Core.eval, pid, m, ex)
end
end
yield() # ensure that the remotecall_fetch have had a chance to start
Expand Down
14 changes: 8 additions & 6 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -420,11 +420,11 @@ try
catch ex
@test typeof(ex) == CompositeException
@test length(ex) == 5
@test typeof(ex.exceptions[1]) == CapturedException
@test typeof(ex.exceptions[1].ex) == ErrorException
@test typeof(ex.exceptions[1]) == TaskFailedException
@test typeof(ex.exceptions[1].task.exception) == ErrorException
# test start, next, and done
for (i, i_ex) in enumerate(ex)
@test i == parse(Int, i_ex.ex.msg)
@test i == parse(Int, i_ex.task.exception.msg)
end
# test showerror
err_str = sprint(showerror, ex)
Expand Down Expand Up @@ -738,7 +738,7 @@ end # full-test

let t = @task 42
schedule(t, ErrorException(""), error=true)
@test_throws ErrorException Base.wait(t)
@test_throws TaskFailedException(t) Base.wait(t)
end

# issue #8207
Expand Down Expand Up @@ -964,13 +964,15 @@ let (p, p2) = filter!(p -> p != myid(), procs())
if procs isa Int
ex = Any[excpt]
else
ex = Any[ (ex::CapturedException).ex for ex in (excpt::CompositeException).exceptions ]
ex = (excpt::CompositeException).exceptions
end
for (p, ex) in zip(procs, ex)
local p
if procs isa Int || p != myid()
@test (ex::RemoteException).pid == p
ex = ((ex::RemoteException).captured::CapturedException).ex
else
ex = (ex::TaskFailedException).task.exception
end
@test (ex::ErrorException).msg == msg
end
Expand Down Expand Up @@ -1165,7 +1167,7 @@ for (addp_testf, expected_errstr, env) in testruns
close(stdout_in)
@test isempty(fetch(stdout_txt))
@test isa(ex, CompositeException)
@test ex.exceptions[1].ex.msg == expected_errstr
@test ex.exceptions[1].task.exception.msg == expected_errstr
end
end

Expand Down

0 comments on commit 44d54fe

Please sign in to comment.