Skip to content

Commit

Permalink
Merge pull request #554 from JuliaParallel/jps/dtaskfailedexception
Browse files Browse the repository at this point in the history
More `Thunk` to `DTask` renames
  • Loading branch information
jpsamaroo authored Jul 24, 2024
2 parents 52a97dd + 57e2209 commit 5b5f816
Show file tree
Hide file tree
Showing 19 changed files with 77 additions and 68 deletions.
8 changes: 4 additions & 4 deletions docs/src/api-dagger/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ addprocs!
rmprocs!
```

## Thunk Execution Environment Functions
## DTask Execution Environment Functions

These functions are used within the function called by a `Thunk`.
These functions are used within the function called by a `DTask`.

```@docs
in_thunk
thunk_processor
in_task
task_processor
```

### Dynamic Scheduler Control Functions
Expand Down
2 changes: 1 addition & 1 deletion docs/src/darray.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ julia> Dagger.chunks(DZ)
DTask (finished) DTask (finished)
julia> Dagger.chunks(fetch(DZ))
2×2 Matrix{Union{Thunk, Dagger.Chunk}}:
2×2 Matrix{Union{DTask, Dagger.Chunk}}:
Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(4, 8, 0x0000000000004e20), ThreadProc(4, 1), AnyScope(), true) … Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(2, 5, 0x0000000000004e20), ThreadProc(2, 1), AnyScope(), true)
Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(5, 5, 0x0000000000004e20), ThreadProc(5, 1), AnyScope(), true) Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(3, 3, 0x0000000000004e20), ThreadProc(3, 1), AnyScope(), true)
```
Expand Down
8 changes: 4 additions & 4 deletions docs/src/scopes.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ using VideoIO, Distributed

function get_handle()
handle = VideoIO.opencamera()
proc = Dagger.thunk_processor()
proc = Dagger.task_processor()
scope = Dagger.scope(worker=myid()) # constructs a `ProcessScope`
return Dagger.tochunk(handle, proc, scope)
end
Expand Down Expand Up @@ -78,7 +78,7 @@ function generate()
fill!(arr, 1)
Mmap.sync!(arr)
# Note: Dagger.scope() does not yet support node scopes
Dagger.tochunk(path, Dagger.thunk_processor(), NodeScope())
Dagger.tochunk(path, Dagger.task_processor(), NodeScope())
end

function consume(path)
Expand Down Expand Up @@ -120,7 +120,7 @@ function generate_secrets()
secrets = open("/shared/secret_results.txt", "r") do io
String(read(io))
end
Dagger.tochunk(secrets, Dagger.thunk_processor(), secrets_scope)
Dagger.tochunk(secrets, Dagger.task_processor(), secrets_scope)
end

summarize(secrets) = occursin("QA Pass", secrets)
Expand All @@ -144,7 +144,7 @@ constraints). For example:
ps2 = ProcessScope(2)
ps3 = ProcessScope(3)

generate(scope) = Dagger.tochunk(rand(64), Dagger.thunk_processor(), scope)
generate(scope) = Dagger.tochunk(rand(64), Dagger.task_processor(), scope)

d2 = Dagger.@spawn generate(ps2) # Run on process 2
d3 = Dagger.@spawn generate(ps3) # Run on process 3
Expand Down
4 changes: 2 additions & 2 deletions src/array/alloc.jl
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ function partition(p::AbstractBlocks, dom::ArrayDomain)
end

function allocate_array(f, T, idx, sz)
new_f = allocate_array_func(thunk_processor(), f)
new_f = allocate_array_func(task_processor(), f)
return new_f(idx, T, sz)
end
function allocate_array(f, T, sz)
new_f = allocate_array_func(thunk_processor(), f)
new_f = allocate_array_func(task_processor(), f)
return new_f(T, sz)
end
allocate_array_func(::Processor, f) = f
Expand Down
6 changes: 3 additions & 3 deletions src/array/cholesky.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
LinearAlgebra.cholcopy(A::DArray{T,2}) where T = copy(A)
function potrf_checked!(uplo, A, info_arr)
_A, info = move(thunk_processor(), LAPACK.potrf!)(uplo, A)
_A, info = move(task_processor(), LAPACK.potrf!)(uplo, A)
if info != 0
fill!(info_arr, info)
throw(PosDefException(info))
Expand Down Expand Up @@ -41,7 +41,7 @@ function LinearAlgebra._chol!(A::DArray{T,2}, ::Type{UpperTriangular}) where T
end
end
catch err
err isa ThunkFailedException || rethrow()
err isa DTaskFailedException || rethrow()
err = Dagger.Sch.unwrap_nested_exception(err.ex)
err isa PosDefException || rethrow()
end
Expand Down Expand Up @@ -82,7 +82,7 @@ function LinearAlgebra._chol!(A::DArray{T,2}, ::Type{LowerTriangular}) where T
end
end
catch err
err isa ThunkFailedException || rethrow()
err isa DTaskFailedException || rethrow()
err = Dagger.Sch.unwrap_nested_exception(err.ex)
err isa PosDefException || rethrow()
end
Expand Down
2 changes: 2 additions & 0 deletions src/dtask.jl
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export DTask

"A future holding the result of a `Thunk`."
struct ThunkFuture
future::Future
Expand Down
4 changes: 2 additions & 2 deletions src/sch/Sch.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import Random: randperm
import Base: @invokelatest

import ..Dagger
import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, ThunkFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, LockedObject
import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, get_processors, get_parent, execute!, rmprocs!, thunk_processor, constrain, cputhreadtime
import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, DTaskFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, LockedObject
import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, get_processors, get_parent, execute!, rmprocs!, task_processor, constrain, cputhreadtime
import ..Dagger: @dagdebug, @safe_lock_spin1
import DataStructures: PriorityQueue, enqueue!, dequeue_pair!, peek

Expand Down
2 changes: 1 addition & 1 deletion src/sch/dynamic.jl
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ end
function Base.fetch(h::SchedulerHandle, id::ThunkID)
future = ThunkFuture(Future(1))
exec!(_register_future!, h, future, id, true)
fetch(future; proc=thunk_processor())
fetch(future; proc=task_processor())
end
"""
Waits on a thunk to complete, and fetches its result. If `check` is set to
Expand Down
4 changes: 2 additions & 2 deletions src/sch/eager.jl
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ Allows a thunk to safely wait on another thunk by temporarily reducing its
effective occupancy to 0, which allows a newly-spawned task to run.
"""
function thunk_yield(f)
if Dagger.in_thunk()
if Dagger.in_task()
h = sch_handle()
tls = Dagger.get_tls()
proc = Dagger.thunk_processor()
proc = Dagger.task_processor()
proc_istate = proc_states(tls.sch_uid) do states
states[proc].state
end
Expand Down
6 changes: 5 additions & 1 deletion src/sch/util.jl
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,11 @@ end
"Marks `thunk` and all dependent thunks as failed."
function set_failed!(state, origin, thunk=origin)
filter!(x->x!==thunk, state.ready)
state.cache[thunk] = ThunkFailedException(thunk, origin, state.cache[origin])
ex = state.cache[origin]
if ex isa RemoteException
ex = ex.captured
end
state.cache[thunk] = DTaskFailedException(thunk, origin, ex)
state.errored[thunk] = true
finish_failed!(state, thunk, origin)
end
Expand Down
2 changes: 1 addition & 1 deletion src/submission.jl
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ end

# Local -> Remote
function eager_submit!(ntasks, uid, future, finalizer_ref, f, args, options)
if Dagger.in_thunk()
if Dagger.in_task()
h = Dagger.sch_handle()
return exec!(eager_submit_internal!, h, ntasks, uid, future, finalizer_ref, f, args, options, true)
elseif myid() != 1
Expand Down
16 changes: 9 additions & 7 deletions src/task-tls.jl
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
# In-Thunk Helpers

"""
thunk_processor()
task_processor()
Get the current processor executing the current thunk.
Get the current processor executing the current Dagger task.
"""
thunk_processor() = task_local_storage(:_dagger_processor)::Processor
task_processor() = task_local_storage(:_dagger_processor)::Processor
@deprecate thunk_processor() task_processor()

"""
in_thunk()
in_task()
Returns `true` if currently in a [`Thunk`](@ref) process, else `false`.
Returns `true` if currently executing in a [`DTask`](@ref), else `false`.
"""
in_thunk() = haskey(task_local_storage(), :_dagger_sch_uid)
in_task() = haskey(task_local_storage(), :_dagger_sch_uid)
@deprecate in_thunk() in_task()

"""
get_tls()
Expand All @@ -22,7 +24,7 @@ Gets all Dagger TLS variables as a `NamedTuple`.
get_tls() = (
sch_uid=task_local_storage(:_dagger_sch_uid),
sch_handle=task_local_storage(:_dagger_sch_handle),
processor=thunk_processor(),
processor=task_processor(),
task_spec=task_local_storage(:_dagger_task_spec),
)

Expand Down
27 changes: 14 additions & 13 deletions src/thunk.jl
Original file line number Diff line number Diff line change
Expand Up @@ -220,22 +220,23 @@ function Base.convert(::Type{ThunkSummary}, t::WeakThunk)
return t
end

struct ThunkFailedException{E<:Exception} <: Exception
struct DTaskFailedException{E<:Exception} <: Exception
thunk::ThunkSummary
origin::ThunkSummary
ex::E
end
ThunkFailedException(thunk, origin, ex::E) where E =
ThunkFailedException{E}(convert(ThunkSummary, thunk),
DTaskFailedException(thunk, origin, ex::E) where E =
DTaskFailedException{E}(convert(ThunkSummary, thunk),
convert(ThunkSummary, origin),
ex)
function Base.showerror(io::IO, ex::ThunkFailedException)
@deprecate ThunkFailedException DTaskFailedException
function Base.showerror(io::IO, ex::DTaskFailedException)
t = ex.thunk

# Find root-cause thunk
last_tfex = ex
failed_tasks = Union{ThunkSummary,Nothing}[]
while last_tfex.ex isa ThunkFailedException
while last_tfex.ex isa DTaskFailedException
push!(failed_tasks, last_tfex.thunk)
last_tfex = last_tfex.ex
end
Expand All @@ -246,7 +247,7 @@ function Base.showerror(io::IO, ex::ThunkFailedException)
Tinputs = Any[]
for (_, input) in t.inputs
if istask(input)
push!(Tinputs, "Thunk(id=$(input.id))")
push!(Tinputs, "DTask(id=$(input.id))")
else
push!(Tinputs, input)
end
Expand All @@ -256,28 +257,28 @@ function Base.showerror(io::IO, ex::ThunkFailedException)
else
"$(t.f)($(length(Tinputs)) inputs...)"
end
return "Thunk(id=$(t.id), $t_sig)"
return "DTask(id=$(t.id), $t_sig)"
end
t_str = thunk_string(t)
o_str = thunk_string(o)
println(io, "ThunkFailedException:")
println(io, " Root Exception Type: $(typeof(root_ex))")
println(io, "DTaskFailedException:")
println(io, " Root Exception Type: $(typeof(Sch.unwrap_nested_exception(root_ex)))")
println(io, " Root Exception:")
Base.showerror(io, root_ex); println(io)
if t.id !== o.id
println(io, " Root Thunk: $o_str")
println(io, " Root Task: $o_str")
if length(failed_tasks) <= 4
for i in failed_tasks
i_str = thunk_string(i)
println(io, " Inner Thunk: $i_str")
println(io, " Inner Task: $i_str")
end
else
println(io, " ...")
println(io, " $(length(failed_tasks)) Inner Thunks...")
println(io, " $(length(failed_tasks)) Inner Tasks...")
println(io, " ...")
end
end
print(io, " This Thunk: $t_str")
print(io, " This Task: $t_str")
end

"""
Expand Down
2 changes: 1 addition & 1 deletion test/datadeps.jl
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ function test_datadeps(;args_chunks::Bool,

# Scope
exec_procs = fetch.(Dagger.spawn_datadeps(;aliasing) do
[Dagger.@spawn Dagger.thunk_processor() for i in 1:10]
[Dagger.@spawn Dagger.task_processor() for i in 1:10]
end)
unique!(exec_procs)
scope = Dagger.get_options(:scope)
Expand Down
2 changes: 1 addition & 1 deletion test/mutation.jl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ end
x = Dagger.@mutable worker=w Ref{Int}()
@test fetch(Dagger.@spawn mutable_update!(x)) == w
wo_scope = Dagger.ProcessScope(wo)
@test_throws_unwrap Dagger.ThunkFailedException fetch(Dagger.@spawn scope=wo_scope mutable_update!(x))
@test_throws_unwrap Dagger.DTaskFailedException fetch(Dagger.@spawn scope=wo_scope mutable_update!(x))
end
end # @testset "@mutable"

Expand Down
6 changes: 3 additions & 3 deletions test/processors.jl
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ end
end
@testset "Processor exhaustion" begin
opts = ThunkOptions(proclist=[OptOutProc])
@test_throws_unwrap Dagger.ThunkFailedException ex isa Dagger.Sch.SchedulingException ex.reason="No processors available, try widening scope" collect(delayed(sum; options=opts)([1,2,3]))
@test_throws_unwrap Dagger.DTaskFailedException ex isa Dagger.Sch.SchedulingException ex.reason="No processors available, try widening scope" collect(delayed(sum; options=opts)([1,2,3]))
opts = ThunkOptions(proclist=(proc)->false)
@test_throws_unwrap Dagger.ThunkFailedException ex isa Dagger.Sch.SchedulingException ex.reason="No processors available, try widening scope" collect(delayed(sum; options=opts)([1,2,3]))
@test_throws_unwrap Dagger.DTaskFailedException ex isa Dagger.Sch.SchedulingException ex.reason="No processors available, try widening scope" collect(delayed(sum; options=opts)([1,2,3]))
opts = ThunkOptions(proclist=nothing)
@test collect(delayed(sum; options=opts)([1,2,3])) == 6
end
Expand Down Expand Up @@ -89,7 +89,7 @@ end

@testset "Processor TLS accessor" begin
@everywhere function mythunk(x)
typeof(Dagger.thunk_processor())
typeof(Dagger.task_processor())
end
@test collect(delayed(mythunk)(1)) === ThreadProc
end
Expand Down
2 changes: 1 addition & 1 deletion test/scheduler.jl
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ end
@testset "allow errors" begin
opts = ThunkOptions(;allow_errors=true)
a = delayed(error; options=opts)("Test")
@test_throws_unwrap Dagger.ThunkFailedException collect(a)
@test_throws_unwrap Dagger.DTaskFailedException collect(a)
end
end

Expand Down
12 changes: 6 additions & 6 deletions test/scopes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@

# Different nodes
for (ch1, ch2) in [(ns1_ch, ns2_ch), (ns2_ch, ns1_ch)]
@test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
@test_throws_unwrap Dagger.DTaskFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
end
end
@testset "Process Scope" begin
Expand All @@ -75,19 +75,19 @@

# Different process
for (ch1, ch2) in [(ps1_ch, ps2_ch), (ps2_ch, ps1_ch)]
@test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
@test_throws_unwrap Dagger.DTaskFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
end

# Same process and node
@test fetch(Dagger.@spawn process_scope_test(ps1_ch, ns1_ch)) == wid1

# Different process and node
for (ch1, ch2) in [(ps1_ch, ns2_ch), (ns2_ch, ps1_ch)]
@test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
@test_throws_unwrap Dagger.DTaskFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
end
end
@testset "Exact Scope" begin
@everywhere exact_scope_test(ch...) = Dagger.thunk_processor()
@everywhere exact_scope_test(ch...) = Dagger.task_processor()
@test es1.parent.wid == wid1
@test es1.parent.parent.uuid == Dagger.system_uuid(wid1)
@test es2.parent.wid == wid2
Expand All @@ -104,14 +104,14 @@

# Different process, different processor
for (ch1, ch2) in [(es1_ch, es2_ch), (es2_ch, es1_ch)]
@test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
@test_throws_unwrap Dagger.DTaskFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
end

# Same process, different processor
es1_2 = ExactScope(Dagger.ThreadProc(wid1, 2))
es1_2_ch = Dagger.tochunk(nothing, OSProc(), es1_2)
for (ch1, ch2) in [(es1_ch, es1_2_ch), (es1_2_ch, es1_ch)]
@test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
@test_throws_unwrap Dagger.DTaskFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2)
end
end
@testset "Union Scope" begin
Expand Down
Loading

0 comments on commit 5b5f816

Please sign in to comment.