diff --git a/docs/src/reference/collective.md b/docs/src/reference/collective.md
index c89e6f880..dfd082720 100644
--- a/docs/src/reference/collective.md
+++ b/docs/src/reference/collective.md
@@ -55,8 +55,10 @@ MPI.Neighbor_alltoallv!
 ```@docs
 MPI.Reduce!
 MPI.Reduce
+MPI.IReduce!
 MPI.Allreduce!
 MPI.Allreduce
+MPI.IAllreduce!
 MPI.Scan!
 MPI.Scan
 MPI.Exscan!
diff --git a/src/collective.jl b/src/collective.jl
index 9366fbf26..af951ea08 100644
--- a/src/collective.jl
+++ b/src/collective.jl
@@ -716,6 +716,57 @@ function Reduce(object::T, op, root::Integer, comm::Comm) where {T}
     end
 end
 
+## IReduce
+
+"""
+    IReduce!(sendbuf, recvbuf, op, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0)
+    IReduce!(sendrecvbuf, op, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0)
+
+Starts a nonblocking elementwise reduction using the operator `op` on the buffer `sendbuf` and
+stores the result in `recvbuf` on the process of rank `root`.
+
+On non-root processes `recvbuf` is ignored, and can be `nothing`.
+
+To perform the reduction in place, provide a single buffer `sendrecvbuf`.
+
+Returns the [`AbstractRequest`](@ref) object for the nonblocking reduction.
+
+# See also
+- [`Reduce!`](@ref) the equivalent blocking operation.
+- [`IAllreduce!`](@ref) to send reduction to all ranks.
+- [`Op`](@ref) for details on reduction operators.
+
+# External links
+$(_doc_external("MPI_Ireduce"))
+"""
+IReduce!(sendrecvbuf, op, comm::Comm, req::AbstractRequest=Request(); root::Integer=Cint(0)) =
+    IReduce!(sendrecvbuf, op, root, comm, req)
+IReduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request(); root::Integer=Cint(0)) =
+    IReduce!(sendbuf, recvbuf, op, root, comm, req)
+
+function IReduce!(rbuf::RBuffer, op::Union{Op,MPI_Op}, root::Integer, comm::Comm, req::AbstractRequest=Request())
+    # int MPI_Ireduce(const void* sendbuf, void* recvbuf, int count,
+    #                 MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm,
+    #                 MPI_Request* req)
+    API.MPI_Ireduce(rbuf.senddata, rbuf.recvdata, rbuf.count, rbuf.datatype, op, root, comm, req)
+    setbuffer!(req, rbuf)
+    return req
+end
+
+IReduce!(rbuf::RBuffer, op, root::Integer, comm::Comm, req::AbstractRequest=Request()) =
+    IReduce!(rbuf, Op(op, eltype(rbuf)), root, comm, req)
+IReduce!(sendbuf, recvbuf, op, root::Integer, comm::Comm, req::AbstractRequest=Request()) =
+    IReduce!(RBuffer(sendbuf, recvbuf), op, root, comm, req)
+
+# inplace
+function IReduce!(buf, op, root::Integer, comm::Comm, req::AbstractRequest=Request())
+    if Comm_rank(comm) == root
+        IReduce!(IN_PLACE, buf, op, root, comm, req)
+    else
+        IReduce!(buf, nothing, op, root, comm, req)
+    end
+end
+
 ## Allreduce
 
 # mutating
@@ -775,6 +826,45 @@ Allreduce(sendbuf::AbstractArray, op, comm::Comm) =
 Allreduce(obj::T, op, comm::Comm) where {T} =
     Allreduce!(Ref(obj), Ref{T}(), op, comm)[]
 
+## IAllreduce
+
+"""
+    IAllreduce!(sendbuf, recvbuf, op, comm::Comm[, req::AbstractRequest = Request()])
+    IAllreduce!(sendrecvbuf, op, comm::Comm[, req::AbstractRequest = Request()])
+
+Starts a nonblocking elementwise reduction using the operator `op` on the buffer `sendbuf`, storing
+the result in the `recvbuf` of all processes in the group.
+
+If only one `sendrecvbuf` buffer is provided, then the operation is performed in-place.
+
+Returns the [`AbstractRequest`](@ref) object for the nonblocking reduction.
+
+# See also
+- [`Allreduce!`](@ref) the equivalent blocking operation.
+- [`IReduce!`](@ref) to send reduction to a single rank.
+- [`Op`](@ref) for details on reduction operators.
+
+# External links
+$(_doc_external("MPI_Iallreduce"))
+"""
+function IAllreduce!(rbuf::RBuffer, op::Union{Op, MPI_Op}, comm::Comm, req::AbstractRequest=Request())
+    @assert isnull(req)
+    # int MPI_Iallreduce(const void* sendbuf, void* recvbuf, int count,
+    #                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
+    #                    MPI_Request* req)
+    API.MPI_Iallreduce(rbuf.senddata, rbuf.recvdata, rbuf.count, rbuf.datatype, op, comm, req)
+    setbuffer!(req, rbuf)
+    return req
+end
+IAllreduce!(rbuf::RBuffer, op, comm::Comm, req::AbstractRequest=Request()) =
+    IAllreduce!(rbuf, Op(op, eltype(rbuf)), comm, req)
+IAllreduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request()) =
+    IAllreduce!(RBuffer(sendbuf, recvbuf), op, comm, req)
+
+# inplace
+IAllreduce!(rbuf, op, comm::Comm, req::AbstractRequest=Request()) =
+    IAllreduce!(IN_PLACE, rbuf, op, comm, req)
+
 ## Scan
 
 # mutating
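For reference, here is a minimal usage sketch of the two entry points documented above. It is not part of the diff; on GPU backends the buffers would additionally need to be synchronized before starting the operation, as the tests further below do.

```julia
using MPI

MPI.Init()
comm = MPI.COMM_WORLD
root = 0

# Rooted nonblocking reduction: recvbuf is only needed on the root rank;
# the other ranks may pass `nothing`.
send = fill(MPI.Comm_rank(comm), 4)
recv = MPI.Comm_rank(comm) == root ? similar(send) : nothing
req = MPI.IReduce!(send, recv, +, comm; root=root)
MPI.Wait(req)    # on the root, recv now holds the elementwise sum over all ranks

# Nonblocking allreduce, in-place variant: every rank ends up with the reduced values.
buf = ones(Int, 4)
req = MPI.IAllreduce!(buf, +, comm)
MPI.Wait(req)    # buf == fill(MPI.Comm_size(comm), 4) on every rank
```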
diff --git a/test/mpi_support_test.jl b/test/mpi_support_test.jl
new file mode 100644
index 000000000..0c6547cc5
--- /dev/null
+++ b/test/mpi_support_test.jl
@@ -0,0 +1,28 @@
+include("common.jl")
+
+MPI.Init()
+
+# These MPI calls may exercise unsupported features (e.g. on GPU backends), and will raise SIGSEGV
+# (or a similar signal) when called, which cannot be handled in Julia in a portable way.
+
+op = ARGS[1]
+if op == "IAllreduce"
+    # IAllreduce is unsupported for CUDA with OpenMPI + UCX
+    # See https://docs.open-mpi.org/en/main/tuning-apps/networking/cuda.html#which-mpi-apis-do-not-work-with-cuda-aware-ucx
+    send_arr = ArrayType(zeros(Int, 1))
+    recv_arr = ArrayType{Int}(undef, 1)
+    synchronize()
+    req = MPI.IAllreduce!(send_arr, recv_arr, +, MPI.COMM_WORLD)
+    MPI.Wait(req)
+
+elseif op == "IReduce"
+    # IReduce is likewise unsupported for CUDA with OpenMPI + UCX (see the link above)
+    send_arr = ArrayType(zeros(Int, 1))
+    recv_arr = ArrayType{Int}(undef, 1)
+    synchronize()
+    req = MPI.IReduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=0)
+    MPI.Wait(req)
+
+else
+    error("unknown test: $op")
+end
diff --git a/test/runtests.jl b/test/runtests.jl
index 2a2fdd7db..81b125351 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -71,6 +71,19 @@ if Sys.isunix()
     include("mpiexecjl.jl")
 end
 
+function is_mpi_operation_supported(mpi_op, n=nprocs)
+    test_file = joinpath(@__DIR__, "mpi_support_test.jl")
+    cmd = `$(mpiexec()) -n $n $(Base.julia_cmd()) --startup-file=no $test_file $mpi_op`
+    supported = success(run(ignorestatus(cmd)))
+    !supported && @warn "$mpi_op is unsupported with $backend_name"
+    return supported
+end
+
+if ArrayType != Array # we expect that only GPU backends can have unsupported features
+    ENV["JULIA_MPI_TEST_IALLREDUCE"] = is_mpi_operation_supported("IAllreduce")
+    ENV["JULIA_MPI_TEST_IREDUCE"] = is_mpi_operation_supported("IReduce")
+end
+
 excludefiles = split(get(ENV,"JULIA_MPI_TEST_EXCLUDE",""),',')
 
 testdir = @__DIR__
diff --git a/test/test_allreduce.jl b/test/test_allreduce.jl
index 6f23a2e45..c4eac32b1 100644
--- a/test/test_allreduce.jl
+++ b/test/test_allreduce.jl
@@ -13,6 +13,9 @@ else
     operators = [MPI.SUM, +, (x,y) -> 2x+y-x]
 end
 
+iallreduce_supported = get(ENV, "JULIA_MPI_TEST_IALLREDUCE", "true") == "true"
+
+
 for T = [Int]
     for dims = [1, 2, 3]
         send_arr = ArrayType(zeros(T, Tuple(3 for i in 1:dims)))
@@ -43,6 +46,23 @@ for T = [Int]
             vals = MPI.Allreduce(send_arr, op, MPI.COMM_WORLD)
             @test vals isa ArrayType{T}
             @test vals == comm_size .* send_arr
+
+            # Nonblocking
+            recv_arr = ArrayType{T}(undef, size(send_arr))
+            if iallreduce_supported
+                req = MPI.IAllreduce!(send_arr, recv_arr, op, MPI.COMM_WORLD)
+                MPI.Wait(req)
+                @test recv_arr == comm_size .* send_arr
+            end
+
+            # Nonblocking (IN_PLACE)
+            recv_arr = copy(send_arr)
+            synchronize()
+            if iallreduce_supported
+                req = MPI.IAllreduce!(recv_arr, op, MPI.COMM_WORLD)
+                MPI.Wait(req)
+                @test recv_arr == comm_size .* send_arr
+            end
         end
     end
 end
diff --git a/test/test_reduce.jl b/test/test_reduce.jl
index 6ad5d24cf..9dfd76ec7 100644
--- a/test/test_reduce.jl
+++ b/test/test_reduce.jl
@@ -9,6 +9,8 @@ const can_do_closures =
     Sys.ARCH !== :aarch64 &&
     !startswith(string(Sys.ARCH), "arm")
 
+ireduce_supported = get(ENV, "JULIA_MPI_TEST_IREDUCE", "true") == "true"
+
 using DoubleFloats
 
 MPI.Init()
@@ -116,6 +118,26 @@ for T = [Int]
             @test recv_arr isa ArrayType{T}
             @test recv_arr == sz .* view(send_arr, 2:3)
         end
+
+        # Nonblocking
+        recv_arr = ArrayType{T}(undef, size(send_arr))
+        if ireduce_supported
+            req = MPI.IReduce!(send_arr, recv_arr, op, MPI.COMM_WORLD; root=root)
+            MPI.Wait(req)
+            if isroot
+                @test recv_arr == sz .* send_arr
+            end
+        end
+
+        # Nonblocking (IN_PLACE)
+        recv_arr = copy(send_arr)
+        if ireduce_supported
+            req = MPI.IReduce!(recv_arr, op, MPI.COMM_WORLD; root=root)
+            MPI.Wait(req)
+            if isroot
+                @test recv_arr == sz .* send_arr
+            end
+        end
     end
 end
 
@@ -131,6 +153,15 @@ else
     @test result === nothing
 end
 
+recv_arr = isroot ? zeros(eltype(send_arr), size(send_arr)) : nothing
+if ireduce_supported
+    req = MPI.IReduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=root)
+    MPI.Wait(req)
+    if rank == root
+        @test recv_arr ≈ [Double64(sz*i)/10 for i = 1:10] rtol=sz*eps(Double64)
+    end
+end
+
 MPI.Barrier( MPI.COMM_WORLD )
 
 GC.gc()
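The tests above pair each start call directly with `MPI.Wait`. The point of the nonblocking variants is that independent work can be overlapped with the collective before waiting; below is a small sketch of that pattern (hypothetical usage, not part of the diff), polling the request with the existing `MPI.Test`.

```julia
using MPI

MPI.Init()
comm = MPI.COMM_WORLD

send = fill(1.0, 1024)
recv = similar(send)

req = MPI.IAllreduce!(send, recv, +, comm)

# Overlap: do local work that does not write to `send` or `recv`
# while the reduction progresses.
acc = sum(abs2, rand(1024))   # placeholder for real computation

# Poll until the collective completes; MPI.Wait(req) would block instead.
while !MPI.Test(req)
    yield()
end

@assert recv == MPI.Comm_size(comm) .* send
```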