Add MPI integration to datadeps #586

Draft · wants to merge 45 commits into base: master
Commits (45)
bfe698a
Backup commit
fda-tome Aug 27, 2024
f14b505
DArray: Finished distribute and collect for MPI
fda-tome Sep 30, 2024
4437e22
Changes to accommodate new Python compatibility interface
fda-tome Oct 1, 2024
bc1d9e5
Update for benchmarks
fda-tome Aug 30, 2024
f425e08
Collect and Distribute changes
fda-tome Sep 17, 2024
360a229
fixups on prints and processors
fda-tome Sep 18, 2024
26bbac2
DArray: Finished distribute and collect for MPI
fda-tome Sep 30, 2024
4fad091
dagdebug: Add JULIA_DAGGER_DEBUG config variable
jpsamaroo Oct 2, 2024
d6960ce
datadeps: Use short_name to sort procs
jpsamaroo Oct 2, 2024
d50a3fc
fixup! Backup commit
jpsamaroo Oct 2, 2024
fbfe255
chunks/datadeps: Support manual type, pass proc+space to tochunk
jpsamaroo Oct 2, 2024
6c63216
datadeps: Don't skip copy on non-writedep
jpsamaroo Oct 2, 2024
c2d9edc
Minor changes to tasks
fda-tome Oct 3, 2024
4a861ee
Assigning MPI_UID to distribute
fda-tome Oct 4, 2024
fbac34f
MPI: Change recv_yield arg order
jpsamaroo Oct 22, 2024
530fbf0
chunks: tochunk checks against input Chunk proc/space
jpsamaroo Oct 22, 2024
4c3f44d
MPI: Add compat entry
jpsamaroo Oct 22, 2024
c1cb77a
datadeps: Simplify remotecall_endpoint logic
jpsamaroo Oct 22, 2024
1cc4af2
datadeps/MPI: Add uniformity checks
jpsamaroo Oct 22, 2024
72177fb
MPI: Always bcast result type/space in execute!
jpsamaroo Oct 22, 2024
36f9c25
mpi: Make check_uniform optional
jpsamaroo Oct 30, 2024
3e91fea
MPI: Check status of non-blocking calls
jpsamaroo Nov 18, 2024
baa69da
DaggerMPI: rebase commit 2
fda-tome Jan 10, 2025
7237f35
DArray: Finished distribute and collect for MPI
fda-tome Sep 30, 2024
0d5eab3
Changes to accommodate new Python compatibility interface
fda-tome Oct 1, 2024
db22446
Update for benchmarks
fda-tome Aug 30, 2024
d1579bd
Collect and Distribute changes
fda-tome Sep 17, 2024
54d86bc
fixups on prints and processors
fda-tome Sep 18, 2024
32186c1
DArray: Finished distribute and collect for MPI
fda-tome Sep 30, 2024
fade77e
datadeps: Use short_name to sort procs
jpsamaroo Oct 2, 2024
65b5970
fixup! Backup commit
jpsamaroo Oct 2, 2024
46dea0f
chunks/datadeps: Support manual type, pass proc+space to tochunk
jpsamaroo Oct 2, 2024
b3fbb55
datadeps: Don't skip copy on non-writedep
jpsamaroo Oct 2, 2024
74af569
Minor changes to tasks
fda-tome Oct 3, 2024
29c7f64
Assigning MPI_UID to distribute
fda-tome Oct 4, 2024
9862ace
MPI: Change recv_yield arg order
jpsamaroo Oct 22, 2024
9fb1cd3
chunks: tochunk checks against input Chunk proc/space
jpsamaroo Oct 22, 2024
5ce961c
MPI: Add compat entry
jpsamaroo Oct 22, 2024
a933afb
datadeps: Simplify remotecall_endpoint logic
jpsamaroo Oct 22, 2024
49bdda9
datadeps/MPI: Add uniformity checks
jpsamaroo Oct 22, 2024
03934fc
MPI: Always bcast result type/space in execute!
jpsamaroo Oct 22, 2024
05c14db
mpi: Make check_uniform optional
jpsamaroo Oct 30, 2024
50a2dfd
DaggerMPI: pull commit
fda-tome Jan 10, 2025
b4e2611
DaggerMPI: final PR commit
fda-tome Jan 10, 2025
fc5e15f
DaggerMPI: changes to occupancy for execute and copy
fda-tome Jan 13, 2025
4 changes: 4 additions & 0 deletions Project.toml
@@ -4,11 +4,14 @@ version = "0.18.14"

[deps]
Adapt = "79e6a3ab-5dfb-504d-930d-738a2a938a0e"
ArgParse = "c7e460c6-2fb9-53a9-8c5b-16f535851c63"
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
DistributedNext = "fab6aee4-877b-4bac-a744-3eca44acbb6f"
Graphs = "86223c79-3864-5bf0-83f7-82e725a168b6"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195"
MPIPreferences = "3da0fdf6-3ccc-4f1b-acd9-58baa6c99267"
MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"
MemPool = "f9f48841-c794-520a-933b-121f7ba6ed94"
OnlineStats = "a15396b6-48d5-5d58-9928-6d29437db91e"
@@ -54,6 +57,7 @@ Distributions = "0.25"
GraphViz = "0.2"
Graphs = "1"
JSON3 = "1"
MPI = "0.20.22"
MacroTools = "0.5"
MemPool = "0.4.11"
OnlineStats = "1"
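This PR adds MPI and MPIPreferences as hard dependencies. For readers trying the branch, a minimal setup sketch using the standard MPIPreferences API (not part of this diff) for pointing Julia at a system MPI installation:

```julia
# Select a system-provided MPI library via MPIPreferences.
# This writes LocalPreferences.toml; restart Julia afterwards.
using MPIPreferences
MPIPreferences.use_system_binary()
```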
6 changes: 4 additions & 2 deletions src/Dagger.jl
@@ -10,9 +10,9 @@ import MemPool: DRef, FileRef, poolget, poolset
import Base: collect, reduce

import LinearAlgebra
import LinearAlgebra: Adjoint, BLAS, Diagonal, Bidiagonal, Tridiagonal, LAPACK, LowerTriangular, PosDefException, Transpose, UpperTriangular, UnitLowerTriangular, UnitUpperTriangular, diagind, ishermitian, issymmetric
import Random
import Random: AbstractRNG
import LinearAlgebra: Adjoint, BLAS, Diagonal, Bidiagonal, Tridiagonal, LAPACK, LowerTriangular, PosDefException, Transpose, UpperTriangular, UnitLowerTriangular, UnitUpperTriangular, diagind, ishermitian, issymmetric, chkstride1

import UUIDs: UUID, uuid4

@@ -64,8 +64,8 @@ include("utils/scopes.jl")
include("queue.jl")
include("thunk.jl")
include("submission.jl")
include("chunks.jl")
include("memory-spaces.jl")
include("chunks.jl")

# Task scheduling
include("compute.jl")
@@ -125,6 +125,8 @@ function set_distributed_package!(value)
@set_preferences!("distributed-package" => value)
@info "Dagger.jl preference has been set, restart your Julia session for this change to take effect!"
end
# MPI
include("mpi.jl")

# Precompilation
import PrecompileTools: @compile_workload
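The new `include("mpi.jl")` hooks the MPI backend into the main module, alongside the existing `set_distributed_package!` preference switch shown above. A usage sketch for that switch; the accepted string values are an assumption based on the Distributed/DistributedNext split visible in Project.toml:

```julia
using Dagger

# Persist the preferred distributed-memory package; takes effect
# only after a Julia session restart, as the @info message notes.
Dagger.set_distributed_package!("DistributedNext")  # or "Distributed"
```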
7 changes: 5 additions & 2 deletions src/array/darray.jl
@@ -492,9 +492,12 @@ function auto_blocks(dims::Dims{N}) where N
end
auto_blocks(A::AbstractArray{T,N}) where {T,N} = auto_blocks(size(A))

distribute(A::AbstractArray) = distribute(A, AutoBlocks())
distribute(A::AbstractArray{T,N}, dist::Blocks{N}) where {T,N} =
distribute(A::AbstractArray{T,N}, dist::Blocks{N}, ::DistributedAcceleration) where {T,N} =
_to_darray(Distribute(dist, A))

distribute(A::AbstractArray{T,N}, dist::Blocks{N}) where {T,N} =
distribute(A::AbstractArray{T,N}, dist, current_acceleration())
distribute(A::AbstractArray) = distribute(A, AutoBlocks())
distribute(A::AbstractArray, ::AutoBlocks) = distribute(A, auto_blocks(A))
function distribute(x::AbstractArray{T,N}, n::NTuple{N}) where {T,N}
p = map((d, dn)->ceil(Int, d / dn), size(x), n)
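With the acceleration-aware dispatch above, `distribute` now routes through `current_acceleration()` by default. A short usage sketch of the public entry points, which keep their pre-PR call shapes:

```julia
using Dagger

A  = rand(64, 64)
DA = distribute(A)                  # AutoBlocks() picks a partitioning
DB = distribute(A, Blocks(16, 16))  # explicit 16x16 blocking
B  = collect(DB)                    # gather back into a dense Array
@assert A == B
```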
100 changes: 65 additions & 35 deletions src/chunks.jl
@@ -26,33 +26,6 @@ domain(x::Any) = UnitDomain()

###### Chunk ######

"""
Chunk

A reference to a piece of data located on a remote worker. `Chunk`s are
typically created with `Dagger.tochunk(data)`, and the data can then be
accessed from any worker with `collect(::Chunk)`. `Chunk`s are
serialization-safe, and use distributed refcounting (provided by
`MemPool.DRef`) to ensure that the data referenced by a `Chunk` won't be GC'd,
as long as a reference exists on some worker.

Each `Chunk` is associated with a given `Dagger.Processor`, which is (in a
sense) the processor that "owns" or contains the data. Calling
`collect(::Chunk)` will perform data movement and conversions defined by that
processor to safely serialize the data to the calling worker.

## Constructors
See [`tochunk`](@ref).
"""
mutable struct Chunk{T, H, P<:Processor, S<:AbstractScope}
chunktype::Type{T}
domain
handle::H
processor::P
scope::S
persist::Bool
end

domain(c::Chunk) = c.domain
chunktype(c::Chunk) = c.chunktype
persist!(t::Chunk) = (t.persist=true; t)
@@ -77,7 +50,7 @@ function collect(ctx::Context, chunk::Chunk; options=nothing)
elseif chunk.handle isa FileRef
return poolget(chunk.handle)
else
return move(chunk.processor, OSProc(), chunk.handle)
return move(chunk.processor, default_processor(), chunk.handle)
end
end
collect(ctx::Context, ref::DRef; options=nothing) =
@@ -262,20 +235,75 @@ will be inspected to determine if it's safe to serialize; if so, the default
MemPool storage device will be used; if not, then a `MemPool.CPURAMDevice` will
be used.

`type` can be specified manually to force the type to be `Chunk{type}`.

All other kwargs are passed directly to `MemPool.poolset`.
"""
function tochunk(x::X, proc::P=OSProc(), scope::S=AnyScope(); persist=false, cache=false, device=nothing, kwargs...) where {X,P,S}

tochunk(x::X, proc::P, space::M; kwargs...) where {X,P<:Processor,M<:MemorySpace} =
tochunk(x, proc, space, AnyScope(); kwargs...)
function tochunk(x::X, proc::P, space::M, scope::S; persist=false, cache=false, device=nothing, type=X, kwargs...) where {X,P<:Processor,S,M<:MemorySpace}
if x isa Chunk
check_proc_space(x, proc, space)
return x
end
if device === nothing
device = if Sch.walk_storage_safe(x)
MemPool.GLOBAL_DEVICE[]
else
MemPool.CPURAMDevice()
end
end
ref = poolset(x; device, kwargs...)
Chunk{X,typeof(ref),P,S}(X, domain(x), ref, proc, scope, persist)
ref = tochunk_pset(x, space; device, kwargs...)
return Chunk{type,typeof(ref),P,S,typeof(space)}(type, domain(x), ref, proc, scope, space, persist)
end
tochunk(x::Union{Chunk, Thunk}, proc=nothing, scope=nothing; kwargs...) = x

function tochunk(x::X, proc::P, scope::S; persist=false, cache=false, device=nothing, type=X, kwargs...) where {X,P<:Processor,S}
if device === nothing
device = if Sch.walk_storage_safe(x)
MemPool.GLOBAL_DEVICE[]
else
MemPool.CPURAMDevice()
end
end
if x isa Chunk
check_proc_space(x, proc, x.space)
return x
end
space = default_memory_space(current_acceleration(), x)
ref = tochunk_pset(x, space; device, kwargs...)
return Chunk{type,typeof(ref),P,S,typeof(space)}(type, domain(x), ref, proc, scope, space, persist)
end
function tochunk(x::X, space::M, scope::S; persist=false, cache=false, device=nothing, type=X, kwargs...) where {X,M<:MemorySpace,S}
if device === nothing
device = if Sch.walk_storage_safe(x)
MemPool.GLOBAL_DEVICE[]
else
MemPool.CPURAMDevice()
end
end
if x isa Chunk
check_proc_space(x, x.processor, space)
return x
end
proc = default_processor(current_acceleration(), x)
ref = tochunk_pset(x, space; device, kwargs...)
return Chunk{type,typeof(ref),typeof(proc),S,M}(type, domain(x), ref, proc, scope, space, persist)
end
tochunk(x, procOrSpace; kwargs...) = tochunk(x, procOrSpace, AnyScope(); kwargs...)
tochunk(x; kwargs...) = tochunk(x, default_memory_space(current_acceleration(), x), AnyScope(); kwargs...)

check_proc_space(x, proc, space) = nothing
function check_proc_space(x::Chunk, proc, space)
if x.space !== space
throw(ArgumentError("Memory space mismatch: Chunk=$(x.space) != Requested=$space"))
end
end
function check_proc_space(x::Thunk, proc, space)
# FIXME: Validate
end

tochunk_pset(x, space::MemorySpace; device=nothing, kwargs...) = poolset(x; device, kwargs...)

function savechunk(data, dir, f)
sz = open(joinpath(dir, f), "w") do io
@@ -292,10 +320,12 @@ struct WeakChunk
wid::Int
id::Int
x::WeakRef
function WeakChunk(c::Chunk)
return new(c.handle.owner, c.handle.id, WeakRef(c))
end
end

function WeakChunk(c::Chunk)
return WeakChunk(c.handle.owner, c.handle.id, WeakRef(c))
end

unwrap_weak(c::WeakChunk) = c.x.value
function unwrap_weak_checked(c::WeakChunk)
cw = unwrap_weak(c)
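To summarize the expanded `tochunk` surface: callers can now pin a `Chunk` to a processor, a memory space, or both, and the missing half is filled in from `current_acceleration()`. A call-shape sketch, where the explicit processor/space values are illustrative assumptions:

```julia
using Dagger

x = rand(8)
proc  = Dagger.OSProc()
space = Dagger.default_memory_space(Dagger.current_acceleration(), x)

c1 = Dagger.tochunk(x)               # infer both processor and memory space
c2 = Dagger.tochunk(x, proc, space)  # pin both explicitly (new in this PR)
collect(c2)                          # materialize the data locally
```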