Add IReduce! and IAllreduce! #827

Open · wants to merge 4 commits into base: `master`
2 changes: 2 additions & 0 deletions docs/src/reference/collective.md
@@ -55,8 +55,10 @@ MPI.Neighbor_alltoallv!
```@docs
MPI.Reduce!
MPI.Reduce
MPI.IReduce!
MPI.Allreduce!
MPI.Allreduce
MPI.IAllreduce!
MPI.Scan!
MPI.Scan
MPI.Exscan!
90 changes: 90 additions & 0 deletions src/collective.jl
@@ -716,6 +716,57 @@ function Reduce(object::T, op, root::Integer, comm::Comm) where {T}
end
end

## IReduce

"""
IReduce!(sendbuf, recvbuf, op, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0)
IReduce!(sendrecvbuf, op, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0)

Starts a nonblocking elementwise reduction using the operator `op` on the buffer `sendbuf` and
stores the result in `recvbuf` on the process of rank `root`.

On non-root processes `recvbuf` is ignored, and can be `nothing`.

To perform the reduction in place, provide a single buffer `sendrecvbuf`.

Returns the [`AbstractRequest`](@ref) object for the nonblocking reduction. The result in
`recvbuf` is only valid on `root` after the request has been completed (e.g. with `MPI.Wait`).

# See also
- [`Reduce!`](@ref) the equivalent blocking operation.
- [`IAllreduce!`](@ref) to send the reduction result to all ranks.
- [`Op`](@ref) for details on reduction operators.

# External links
$(_doc_external("MPI_Ireduce"))
"""
IReduce!(sendrecvbuf, op, comm::Comm, req::AbstractRequest=Request(); root::Integer=Cint(0)) =
IReduce!(sendrecvbuf, op, root, comm, req)
IReduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request(); root::Integer=Cint(0)) =
IReduce!(sendbuf, recvbuf, op, root, comm, req)

function IReduce!(rbuf::RBuffer, op::Union{Op,MPI_Op}, root::Integer, comm::Comm, req::AbstractRequest=Request())
# int MPI_Ireduce(const void* sendbuf, void* recvbuf, int count,
# MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm,
# MPI_Request* req)
API.MPI_Ireduce(rbuf.senddata, rbuf.recvdata, rbuf.count, rbuf.datatype, op, root, comm, req)
setbuffer!(req, rbuf)
return req
end

IReduce!(rbuf::RBuffer, op, root::Integer, comm::Comm, req::AbstractRequest=Request()) =
IReduce!(rbuf, Op(op, eltype(rbuf)), root, comm, req)
IReduce!(sendbuf, recvbuf, op, root::Integer, comm::Comm, req::AbstractRequest=Request()) =
IReduce!(RBuffer(sendbuf, recvbuf), op, root, comm, req)

# inplace
function IReduce!(buf, op, root::Integer, comm::Comm, req::AbstractRequest=Request())
if Comm_rank(comm) == root
IReduce!(IN_PLACE, buf, op, root, comm, req)
else
IReduce!(buf, nothing, op, root, comm, req)
end
end
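
For illustration only (not part of this diff), a minimal usage sketch of the new `IReduce!`; it
assumes an initialized MPI environment, and the buffer sizes and values are arbitrary:

```julia
using MPI
MPI.Init()

comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
root = 0

send_arr = fill(rank, 4)                            # each rank contributes its own rank id
recv_arr = rank == root ? zeros(Int, 4) : nothing   # only root needs a receive buffer

# Start the nonblocking reduction, then wait on the returned request.
req = MPI.IReduce!(send_arr, recv_arr, +, comm; root=root)
MPI.Wait(req)

rank == root && println(recv_arr)   # sum of 0:(nranks-1) in every element
```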

## Allreduce

# mutating
@@ -775,6 +826,45 @@ Allreduce(sendbuf::AbstractArray, op, comm::Comm) =
Allreduce(obj::T, op, comm::Comm) where {T} =
Allreduce!(Ref(obj), Ref{T}(), op, comm)[]

## IAllreduce

"""
IAllreduce!(sendbuf, recvbuf, op, comm::Comm[, req::AbstractRequest = Request()])
IAllreduce!(sendrecvbuf, op, comm::Comm[, req::AbstractRequest = Request()])

Starts a nonblocking elementwise reduction using the operator `op` on the buffer `sendbuf`, storing
the result in the `recvbuf` of all processes in the group.

If only one `sendrecvbuf` buffer is provided, then the operation is performed in-place.

Returns the [`AbstractRequest`](@ref) object for the nonblocking reduction. The result in
`recvbuf` is only valid after the request has been completed (e.g. with `MPI.Wait`).

# See also
- [`Allreduce!`](@ref) the equivalent blocking operation.
- [`IReduce!`](@ref) to send the reduction result to a single rank.
- [`Op`](@ref) for details on reduction operators.

# External links
$(_doc_external("MPI_Iallreduce"))
"""
function IAllreduce!(rbuf::RBuffer, op::Union{Op, MPI_Op}, comm::Comm, req::AbstractRequest=Request())
@assert isnull(req)
# int MPI_Iallreduce(const void* sendbuf, void* recvbuf, int count,
# MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
# MPI_Request* req)
API.MPI_Iallreduce(rbuf.senddata, rbuf.recvdata, rbuf.count, rbuf.datatype, op, comm, req)
setbuffer!(req, rbuf)
return req
end
IAllreduce!(rbuf::RBuffer, op, comm::Comm, req::AbstractRequest=Request()) =
IAllreduce!(rbuf, Op(op, eltype(rbuf)), comm, req)
IAllreduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request()) =
IAllreduce!(RBuffer(sendbuf, recvbuf), op, comm, req)

# inplace
IAllreduce!(rbuf, op, comm::Comm, req::AbstractRequest=Request()) =
IAllreduce!(IN_PLACE, rbuf, op, comm, req)
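
Similarly, a hedged usage sketch of `IAllreduce!` in both its two-buffer and in-place forms
(again illustrative only, assuming `MPI.Init()` has already been called):

```julia
comm = MPI.COMM_WORLD
nranks = MPI.Comm_size(comm)

# Separate send/receive buffers: every rank receives the elementwise sum.
send_arr = ones(Int, 3)
recv_arr = similar(send_arr)
req = MPI.IAllreduce!(send_arr, recv_arr, +, comm)
MPI.Wait(req)
@assert recv_arr == fill(nranks, 3)

# In-place variant: a single buffer serves as both input and output.
buf = ones(Int, 3)
req = MPI.IAllreduce!(buf, +, comm)
MPI.Wait(req)
@assert buf == fill(nranks, 3)
```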

## Scan

# mutating
28 changes: 28 additions & 0 deletions test/mpi_support_test.jl
@@ -0,0 +1,28 @@
include("common.jl")

MPI.Init()

# These MPI calls may be unsupported (e.g. by GPU backends); when that is the case, calling them
# raises SIGSEGV (or a similar signal), which cannot be handled from Julia in a portable way.

op = ARGS[1]
if op == "IAllreduce"
# IAllreduce is unsupported for CUDA with OpenMPI + UCX
# See https://docs.open-mpi.org/en/main/tuning-apps/networking/cuda.html#which-mpi-apis-do-not-work-with-cuda-aware-ucx
send_arr = ArrayType(zeros(Int, 1))
recv_arr = ArrayType{Int}(undef, 1)
synchronize()
req = MPI.IAllreduce!(send_arr, recv_arr, +, MPI.COMM_WORLD)
MPI.Wait(req)

elseif op == "IReduce"
    # Like IAllreduce, IReduce is unsupported for CUDA with OpenMPI + UCX (see the link above)
send_arr = ArrayType(zeros(Int, 1))
recv_arr = ArrayType{Int}(undef, 1)
synchronize()
req = MPI.IReduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=0)
MPI.Wait(req)

else
error("unknown test: $op")
end
13 changes: 13 additions & 0 deletions test/runtests.jl
@@ -71,6 +71,19 @@ if Sys.isunix()
include("mpiexecjl.jl")
end

function is_mpi_operation_supported(mpi_op, n=nprocs)
test_file = joinpath(@__DIR__, "mpi_support_test.jl")
cmd = `$(mpiexec()) -n $n $(Base.julia_cmd()) --startup-file=no $test_file $mpi_op`
supported = success(run(ignorestatus(cmd)))
!supported && @warn "$mpi_op is unsupported with $backend_name"
return supported
end

if ArrayType != Array # we expect that only GPU backends can have unsupported features
ENV["JULIA_MPI_TEST_IALLREDUCE"] = is_mpi_operation_supported("IAllreduce")
ENV["JULIA_MPI_TEST_IREDUCE"] = is_mpi_operation_supported("IReduce")
end

excludefiles = split(get(ENV,"JULIA_MPI_TEST_EXCLUDE",""),',')

testdir = @__DIR__
20 changes: 20 additions & 0 deletions test/test_allreduce.jl
@@ -13,6 +13,9 @@ else
operators = [MPI.SUM, +, (x,y) -> 2x+y-x]
end

iallreduce_supported = get(ENV, "JULIA_MPI_TEST_IALLREDUCE", "true") == "true"


for T = [Int]
for dims = [1, 2, 3]
send_arr = ArrayType(zeros(T, Tuple(3 for i in 1:dims)))
@@ -43,6 +46,23 @@ for T = [Int]
vals = MPI.Allreduce(send_arr, op, MPI.COMM_WORLD)
@test vals isa ArrayType{T}
@test vals == comm_size .* send_arr

# Nonblocking
recv_arr = ArrayType{T}(undef, size(send_arr))
if iallreduce_supported
req = MPI.IAllreduce!(send_arr, recv_arr, op, MPI.COMM_WORLD)
MPI.Wait(req)
@test recv_arr == comm_size .* send_arr
end

# Nonblocking (IN_PLACE)
recv_arr = copy(send_arr)
synchronize()
if iallreduce_supported
req = MPI.IAllreduce!(recv_arr, op, MPI.COMM_WORLD)
MPI.Wait(req)
@test recv_arr == comm_size .* send_arr
end
end
end
end
31 changes: 31 additions & 0 deletions test/test_reduce.jl
@@ -9,6 +9,8 @@ const can_do_closures =
Sys.ARCH !== :aarch64 &&
!startswith(string(Sys.ARCH), "arm")

ireduce_supported = get(ENV, "JULIA_MPI_TEST_IREDUCE", "true") == "true"

using DoubleFloats

MPI.Init()
@@ -116,6 +118,26 @@ for T = [Int]
@test recv_arr isa ArrayType{T}
@test recv_arr == sz .* view(send_arr, 2:3)
end

# Nonblocking
recv_arr = ArrayType{T}(undef, size(send_arr))
if ireduce_supported
req = MPI.IReduce!(send_arr, recv_arr, op, MPI.COMM_WORLD; root=root)
MPI.Wait(req)
if isroot
@test recv_arr == sz .* send_arr
end
end

# Nonblocking (IN_PLACE)
recv_arr = copy(send_arr)
if ireduce_supported
req = MPI.IReduce!(recv_arr, op, MPI.COMM_WORLD; root=root)
MPI.Wait(req)
if isroot
@test recv_arr == sz .* send_arr
end
end
end
end
end
@@ -131,6 +153,15 @@ else
@test result === nothing
end

recv_arr = isroot ? zeros(eltype(send_arr), size(send_arr)) : nothing
if ireduce_supported
req = MPI.IReduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=root)
MPI.Wait(req)
if rank == root
@test recv_arr ≈ [Double64(sz*i)/10 for i = 1:10] rtol=sz*eps(Double64)
end
end

MPI.Barrier( MPI.COMM_WORLD )

GC.gc()