From fbfc811f7023b5f3237506e4ab2d82d65920daeb Mon Sep 17 00:00:00 2001 From: Andreas Noack Date: Sun, 27 Sep 2015 23:50:06 -0400 Subject: [PATCH] Make function argument the first in remotecallX and remote_do to ease the use of do block syntax. --- NEWS.md | 24 ++++++ base/client.jl | 2 +- base/deprecated.jl | 8 ++ base/docs/helpdb.jl | 8 +- base/loading.jl | 8 +- base/managers.jl | 2 +- base/multi.jl | 72 +++++++++--------- base/require.jl | 4 +- base/sharedarray.jl | 39 ++++++---- doc/stdlib/parallel.rst | 8 +- examples/clustermanager/simple/test_simple.jl | 2 +- examples/hpl.jl | 10 +-- test/examples.jl | 5 +- test/netload/memtest.jl | 2 +- test/parallel.jl | 74 +++++++++++++------ test/topology.jl | 10 ++- 16 files changed, 177 insertions(+), 101 deletions(-) diff --git a/NEWS.md b/NEWS.md index b989233f52dd7..ecf85b6cdba40 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,27 @@ +Julia v0.5.0 Release Notes +========================== + +New language features +--------------------- + +Language changes +---------------- + +Command line option changes +--------------------------- + +Compiler/Runtime improvements +----------------------------- + +Library improvements +-------------------- + +Deprecated or removed +--------------------- + + * The function `remotecall`, `remotecall_fetch`, and `remotecall_wait` now have the + the function argument as the first argument to allow for do-block syntax. [#13338] + Julia v0.4.0 Release Notes ========================== diff --git a/base/client.jl b/base/client.jl index 8303bbdb6a204..a83893ee32ccf 100644 --- a/base/client.jl +++ b/base/client.jl @@ -243,7 +243,7 @@ function process_options(opts::JLOptions, args::Vector{UTF8String}) # load file immediately on all processors if opts.load != C_NULL @sync for p in procs() - @async remotecall_fetch(p, include, bytestring(opts.load)) + @async remotecall_fetch(include, p, bytestring(opts.load)) end end # eval expression diff --git a/base/deprecated.jl b/base/deprecated.jl index f72e9ff61e46d..df4a5a144f831 100644 --- a/base/deprecated.jl +++ b/base/deprecated.jl @@ -834,3 +834,11 @@ end # 12839 const AsyncStream = IO deprecate(:AsyncStream) + +for f in (:remotecall, :remotecall_fetch, :remotecall_wait) + @eval begin + @deprecate ($f)(w::LocalProcess, f::Function, args...) ($f)(f, w::LocalProcess, args...) + @deprecate ($f)(w::Worker, f::Function, args...) ($f)(f, w::Worker, args...) + @deprecate ($f)(id::Integer, f::Function, args...) ($f)(f, id::Integer, args...) + end +end \ No newline at end of file diff --git a/base/docs/helpdb.jl b/base/docs/helpdb.jl index 23dbcac1673c6..9392455e1d9a7 100644 --- a/base/docs/helpdb.jl +++ b/base/docs/helpdb.jl @@ -3609,7 +3609,7 @@ Returns the index of the current worker into the `pids` vector, i.e., the list o indexpids doc""" - remotecall_wait(id, func, args...) + remotecall_wait(func, id, args...) Perform `wait(remotecall(...))` in one message. """ @@ -6259,7 +6259,7 @@ Search for the first occurrence of the given characters within the given string. search doc""" - remotecall_fetch(id, func, args...) + remotecall_fetch(func, id, args...) Perform `fetch(remotecall(...))` in one message. Any remote exceptions are captured in a `RemoteException` and thrown. """ @@ -7370,7 +7370,7 @@ Determine whether a `RemoteRef` has a value stored to it. Note that this functio If the argument `RemoteRef` is owned by a different node, this call will block to wait for the answer. It is recommended to wait for `r` in a separate task instead, or to use a local `RemoteRef` as a proxy: rr = RemoteRef() - @async put!(rr, remotecall_fetch(p, long_computation)) + @async put!(rr, remotecall_fetch(long_computation, p)) isready(rr) # will not block """ isready @@ -9490,7 +9490,7 @@ Read all available data on the stream, blocking the task only if no data is avai readavailable doc""" - remotecall(id, func, args...) + remotecall(func, id, args...) Call a function asynchronously on the given arguments on the specified process. Returns a `RemoteRef`. """ diff --git a/base/loading.jl b/base/loading.jl index fd6ab8755f89a..b5b05e23b9ef5 100644 --- a/base/loading.jl +++ b/base/loading.jl @@ -30,7 +30,7 @@ function find_in_node_path(name, srcpath, node::Int=1) if myid() == node find_in_path(name, srcpath) else - remotecall_fetch(node, find_in_path, name, srcpath) + remotecall_fetch(find_in_path, node, name, srcpath) end end @@ -67,7 +67,7 @@ function _require_from_serialized(node::Int, mod::Symbol, path_to_try::ByteStrin if node == myid() content = open(readbytes, path_to_try) else - content = remotecall_fetch(node, open, readbytes, path_to_try) + content = remotecall_fetch(open, node, readbytes, path_to_try) end restored = _include_from_serialized(content) if restored !== nothing @@ -83,7 +83,7 @@ function _require_from_serialized(node::Int, mod::Symbol, path_to_try::ByteStrin myid() == 1 && recompile_stale(mod, path_to_try) restored = ccall(:jl_restore_incremental, Any, (Ptr{UInt8},), path_to_try) else - content = remotecall_fetch(node, open, readbytes, path_to_try) + content = remotecall_fetch(open, node, readbytes, path_to_try) restored = _include_from_serialized(content) end # otherwise, continue search @@ -304,7 +304,7 @@ function include_from_node1(_path::AbstractString) result = Core.include(path) nprocs()>1 && sleep(0.005) else - result = include_string(remotecall_fetch(1, readall, path), path) + result = include_string(remotecall_fetch(readall, 1, path), path) end finally if prev === nothing diff --git a/base/managers.jl b/base/managers.jl index 5102eecb70097..4a5a8f9730c72 100644 --- a/base/managers.jl +++ b/base/managers.jl @@ -357,7 +357,7 @@ function connect_to_worker(host::AbstractString, bind_addr::AbstractString, port end function kill(manager::ClusterManager, pid::Int, config::WorkerConfig) - remote_do(pid, exit) # For TCP based transports this will result in a close of the socket + remote_do(exit, pid) # For TCP based transports this will result in a close of the socket # at our end, which will result in a cleanup of the worker. nothing end diff --git a/base/multi.jl b/base/multi.jl index 3378654457d59..5f977eccd0f03 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -185,14 +185,14 @@ function flush_gc_msgs(w::Worker) msgs = copy(w.add_msgs) if !isempty(msgs) empty!(w.add_msgs) - remote_do(w, add_clients, msgs) + remote_do(add_clients, w, msgs) end msgs = copy(w.del_msgs) if !isempty(msgs) empty!(w.del_msgs) #print("sending delete of $msgs\n") - remote_do(w, del_clients, msgs) + remote_do(del_clients, w, msgs) end end @@ -293,7 +293,7 @@ get_bind_addr(w::LocalProcess) = LPROC.bind_addr function get_bind_addr(w::Worker) if isnull(w.config.bind_addr) if w.id != myid() - w.config.bind_addr = remotecall_fetch(w.id, get_bind_addr, w.id) + w.config.bind_addr = remotecall_fetch(get_bind_addr, w.id, w.id) end end get(w.config.bind_addr) @@ -317,7 +317,7 @@ function procs(pid::Integer) Int[x.id for x in filter(w -> get_bind_addr(w) == ipatpid, PGRP.workers)] end else - remotecall_fetch(1, procs, pid) + remotecall_fetch(procs, 1, pid) end end @@ -425,7 +425,7 @@ function deregister_worker(pg, pid) if PGRP.topology != :all_to_all for rpid in workers() try - remote_do(rpid, deregister_worker, pid) + remote_do(deregister_worker, rpid, pid) catch end end @@ -494,10 +494,10 @@ function RemoteRef(pid::Integer=myid()) end function RemoteRef(f::Function, pid::Integer=myid()) - remotecall_fetch(pid, (f, rrid) -> begin + remotecall_fetch(pid, f, next_rrid_tuple()) do f, rrid rv=lookup_ref(rrid, f) RemoteRef{typeof(rv.c)}(myid(), rrid[1], rrid[2]) - end, f, next_rrid_tuple()) + end end hash(r::RemoteRef, h::UInt) = hash(r.whence, hash(r.id, h)) @@ -522,7 +522,7 @@ function isready(rr::RemoteRef, args...) if rr.where == myid() isready(lookup_ref(rid).c, args...) else - remotecall_fetch(rr.where, id->isready(lookup_ref(rid).c, args...), rid) + remotecall_fetch(id->isready(lookup_ref(rid).c, args...), rr.where, rid) end end @@ -698,28 +698,28 @@ function local_remotecall_thunk(f, args) # f(map(localize_ref,args)...) end -function remotecall(w::LocalProcess, f, args...) +function remotecall(f, w::LocalProcess, args...) rr = RemoteRef(w) schedule_call(rr2id(rr), local_remotecall_thunk(f,args)) rr end -function remotecall(w::Worker, f, args...) +function remotecall(f, w::Worker, args...) rr = RemoteRef(w) #println("$(myid()) asking for $rr") send_msg(w, CallMsg{:call}(f, args, rr2id(rr))) rr end -remotecall(id::Integer, f, args...) = remotecall(worker_from_id(id), f, args...) +remotecall(f, id::Integer, args...) = remotecall(f, worker_from_id(id), args...) # faster version of fetch(remotecall(...)) -function remotecall_fetch(w::LocalProcess, f, args...) +function remotecall_fetch(f, w::LocalProcess, args...) v=run_work_thunk(local_remotecall_thunk(f,args), false) isa(v, RemoteException) ? throw(v) : v end -function remotecall_fetch(w::Worker, f, args...) +function remotecall_fetch(f, w::Worker, args...) # can be weak, because the program will have no way to refer to the Ref # itself, it only gets the result. oid = next_rrid_tuple() @@ -731,13 +731,13 @@ function remotecall_fetch(w::Worker, f, args...) isa(v, RemoteException) ? throw(v) : v end -remotecall_fetch(id::Integer, f, args...) = - remotecall_fetch(worker_from_id(id), f, args...) +remotecall_fetch(f, id::Integer, args...) = + remotecall_fetch(f, worker_from_id(id), args...) # faster version of wait(remotecall(...)) -remotecall_wait(w::LocalProcess, f, args...) = wait(remotecall(w,f,args...)) +remotecall_wait(f, w::LocalProcess, args...) = wait(remotecall(f, w, args...)) -function remotecall_wait(w::Worker, f, args...) +function remotecall_wait(f, w::Worker, args...) prid = next_rrid_tuple() rv = lookup_ref(prid) rv.waitingfor = w.id @@ -748,10 +748,10 @@ function remotecall_wait(w::Worker, f, args...) rr end -remotecall_wait(id::Integer, f, args...) = - remotecall_wait(worker_from_id(id), f, args...) +remotecall_wait(f, id::Integer, args...) = + remotecall_wait(f, worker_from_id(id), args...) -function remote_do(w::LocalProcess, f, args...) +function remote_do(f, w::LocalProcess, args...) # the LocalProcess version just performs in local memory what a worker # does when it gets a :do message. # same for other messages on LocalProcess. @@ -760,12 +760,12 @@ function remote_do(w::LocalProcess, f, args...) nothing end -function remote_do(w::Worker, f, args...) +function remote_do(f, w::Worker, args...) send_msg(w, RemoteDoMsg(f, args)) nothing end -remote_do(id::Integer, f, args...) = remote_do(worker_from_id(id), f, args...) +remote_do(f, id::Integer, args...) = remote_do(f, worker_from_id(id), args...) # have the owner of rr call f on it function call_on_owner(f, rr::RemoteRef, args...) @@ -773,7 +773,7 @@ function call_on_owner(f, rr::RemoteRef, args...) if rr.where == myid() f(rid, args...) else - remotecall_fetch(rr.where, f, rid, args...) + remotecall_fetch(f, rr.where, rid, args...) end end @@ -818,7 +818,7 @@ function deliver_result(sock::IO, msg, oid, value) elseif wid == 1 exit(1) else - remote_do(1, rmprocs, wid) + remote_do(rmprocs, 1, wid) end end end @@ -1125,7 +1125,7 @@ function addprocs(manager::ClusterManager; kwargs...) # function returns to the caller. all_w = workers() for pid in all_w - remote_do(pid, set_valid_processes, all_w) + remote_do(set_valid_processes, pid, all_w) end sort!(launched_q) @@ -1169,7 +1169,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch exeflags = get(fromconfig.exeflags, ``) cmd = `$exename $exeflags` - new_addresses = remotecall_fetch(frompid, launch_additional, cnt, cmd) + new_addresses = remotecall_fetch(launch_additional, frompid, cnt, cmd) for address in new_addresses (bind_addr, port) = address @@ -1183,7 +1183,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch let wconfig=wconfig @async begin pid = create_worker(manager, wconfig) - remote_do(frompid, redirect_output_from_additional_worker, pid, port) + remote_do(redirect_output_from_additional_worker, frompid, pid, port) push!(launched_q, pid) end end @@ -1317,7 +1317,7 @@ let nextidx = 0 end end -spawnat(p, thunk) = sync_add(remotecall(p, thunk)) +spawnat(p, thunk) = sync_add(remotecall(thunk, p)) spawn_somewhere(thunk) = spawnat(chooseproc(thunk),thunk) @@ -1335,13 +1335,13 @@ macro fetch(expr) expr = localize_vars(:(()->($expr)), false) quote thunk = $(esc(expr)) - remotecall_fetch(chooseproc(thunk), thunk) + remotecall_fetch(thunk, chooseproc(thunk)) end end macro fetchfrom(p, expr) expr = localize_vars(:(()->($expr)), false) - :(remotecall_fetch($(esc(p)), $(esc(expr)))) + :(remotecall_fetch($(esc(expr)), $(esc(p)))) end macro everywhere(ex) @@ -1349,7 +1349,7 @@ macro everywhere(ex) sync_begin() thunk = ()->(eval(Main,$(Expr(:quote,ex))); nothing) for pid in workers() - async_run_thunk(()->remotecall_fetch(pid, thunk)) + async_run_thunk(()->remotecall_fetch(thunk, pid)) yield() # ensure that the remotecall_fetch has been started end @@ -1365,7 +1365,7 @@ end function pmap_static(f, lsts...) np = nprocs() n = length(lsts[1]) - Any[ remotecall(PGRP.workers[(i-1)%np+1].id, f, map(L->L[i], lsts)...) for i = 1:n ] + Any[ remotecall(f, PGRP.workers[(i-1)%np+1].id, map(L->L[i], lsts)...) for i = 1:n ] end pmap(f) = f() @@ -1427,7 +1427,7 @@ function pmap(f, lsts...; err_retry=true, err_stop=false, pids = workers()) (idx, fvals) = tasklet busy_workers[pididx] = true try - results[idx] = remotecall_fetch(wpid, f, fvals...) + results[idx] = remotecall_fetch(f, wpid, fvals...) catch ex if err_retry push!(retryqueue, (idx,fvals, ex)) @@ -1482,7 +1482,7 @@ function preduce(reducer, f, N::Int) w_exec = Task[] for (idx,pid) in enumerate(all_w) - t = Task(()->remotecall_fetch(pid, f, first(chunks[idx]), last(chunks[idx]))) + t = Task(()->remotecall_fetch(f, pid, first(chunks[idx]), last(chunks[idx]))) schedule(t) push!(w_exec, t) end @@ -1625,7 +1625,7 @@ end function check_same_host(pids) if myid() != 1 - return remotecall_fetch(1, check_same_host, pids) + return remotecall_fetch(check_same_host, 1, pids) else # We checkfirst if all test pids have been started using the local manager, # else we check for the same bind_to addr. This handles the special case @@ -1663,5 +1663,5 @@ function getindex(r::RemoteRef, args...) if r.where == myid() return getindex(fetch(r), args...) end - return remotecall_fetch(r.where, getindex, r, args...) + return remotecall_fetch(getindex, r.where, r, args...) end diff --git a/base/require.jl b/base/require.jl index d8dcff0f9e9cb..d852239225e28 100644 --- a/base/require.jl +++ b/base/require.jl @@ -23,7 +23,7 @@ function find_in_path(name::AbstractString) end find_in_node1_path(name) = myid()==1 ? - find_in_path(name) : remotecall_fetch(1, find_in_path, name) + find_in_path(name) : remotecall_fetch(find_in_path, 1, name) # Store list of files and their load time package_list = (ByteString=>Float64)[] @@ -94,7 +94,7 @@ function include_from_node1(path::AbstractString) result = Core.include(path) nprocs()>1 && sleep(0.005) else - result = include_string(remotecall_fetch(1, readall, path), path) + result = include_string(remotecall_fetch(readall, 1, path), path) end finally if prev == nothing diff --git a/base/sharedarray.jl b/base/sharedarray.jl index ab29e05e8b47b..24b1679bbe855 100644 --- a/base/sharedarray.jl +++ b/base/sharedarray.jl @@ -44,14 +44,17 @@ function SharedArray(T::Type, dims::NTuple; init=false, pids=Int[]) else # The shared array is created on a remote machine.... shmmem_create_pid = pids[1] - remotecall_fetch(pids[1], () -> begin shm_mmap_array(T, dims, shm_seg_name, JL_O_CREAT | JL_O_RDWR); nothing end) + remotecall_fetch(pids[1]) do + shm_mmap_array(T, dims, shm_seg_name, JL_O_CREAT | JL_O_RDWR) + nothing + end end func_mapshmem = () -> shm_mmap_array(T, dims, shm_seg_name, JL_O_RDWR) refs = Array(RemoteRef, length(pids)) for (i, p) in enumerate(pids) - refs[i] = remotecall(p, func_mapshmem) + refs[i] = remotecall(func_mapshmem, p) end # Wait till all the workers have mapped the segment @@ -64,7 +67,7 @@ function SharedArray(T::Type, dims::NTuple; init=false, pids=Int[]) if onlocalhost rc = shm_unlink(shm_seg_name) else - rc = remotecall_fetch(shmmem_create_pid, shm_unlink, shm_seg_name) + rc = remotecall_fetch(shm_unlink, shmmem_create_pid, shm_seg_name) end systemerror("Error unlinking shmem segment " * shm_seg_name, rc != 0) end @@ -85,14 +88,14 @@ function SharedArray(T::Type, dims::NTuple; init=false, pids=Int[]) if isa(init, Function) @sync begin for p in pids - @async remotecall_wait(p, init, S) + @async remotecall_wait(init, p, S) end end end finally if shm_seg_name != "" - remotecall_fetch(shmmem_create_pid, shm_unlink, shm_seg_name) + remotecall_fetch(shm_unlink, shmmem_create_pid, shm_seg_name) end end S @@ -108,7 +111,7 @@ function SharedArray{T,N}(filename::AbstractString, ::Type{T}, dims::NTuple{N,In pids, onlocalhost = shared_pids(pids) # If not supplied, determine the appropriate mode - have_file = onlocalhost ? isfile(filename) : remotecall_fetch(pids[1], isfile, filename) + have_file = onlocalhost ? isfile(filename) : remotecall_fetch(isfile, pids[1], filename) if mode == nothing mode = have_file ? "r+" : "w+" end @@ -127,14 +130,20 @@ function SharedArray{T,N}(filename::AbstractString, ::Type{T}, dims::NTuple{N,In local s if onlocalhost s = func_mmap(mode) - refs[1] = remotecall(pids[1], () -> func_mmap(workermode)) + refs[1] = remotecall(pids[1]) do + func_mmap(workermode) + end else - refs[1] = remotecall_wait(pids[1], () -> func_mmap(mode)) + refs[1] = remotecall_wait(pids[1]) do + func_mmap(mode) + end end # Populate the rest of the workers for i = 2:length(pids) - refs[i] = remotecall(pids[i], () -> func_mmap(workermode)) + refs[i] = remotecall(pids[i]) do + func_mmap(workermode) + end end # Wait till all the workers have mapped the segment @@ -157,7 +166,7 @@ function SharedArray{T,N}(filename::AbstractString, ::Type{T}, dims::NTuple{N,In if isa(init, Function) @sync begin for p in pids - @async remotecall_wait(p, init, S) + @async remotecall_wait(init, p, S) end end end @@ -175,7 +184,9 @@ function reshape{T,N}(a::SharedArray{T}, dims::NTuple{N,Int}) (length(a) != prod(dims)) && throw(DimensionMismatch("dimensions must be consistent with array size")) refs = Array(RemoteRef, length(a.pids)) for (i, p) in enumerate(a.pids) - refs[i] = remotecall(p, (r,d)->reshape(fetch(r),d), a.refs[i], dims) + refs[i] = remotecall(p, a.refs[i], dims) do r,d + reshape(fetch(r),d) + end end A = SharedArray{T,N}(dims, a.pids, refs, a.segname) @@ -288,7 +299,7 @@ function fill!(S::SharedArray, v) vT = convert(eltype(S), v) f = S->fill!(S.loc_subarr_1d, vT) @sync for p in procs(S) - @async remotecall_wait(p, f, S) + @async remotecall_wait(f, p, S) end return S end @@ -296,7 +307,7 @@ end function rand!{T}(S::SharedArray{T}) f = S->map!(x->rand(T), S.loc_subarr_1d) @sync for p in procs(S) - @async remotecall_wait(p, f, S) + @async remotecall_wait(f, p, S) end return S end @@ -304,7 +315,7 @@ end function randn!(S::SharedArray) f = S->map!(x->randn(), S.loc_subarr_1d) @sync for p in procs(S) - @async remotecall_wait(p, f, S) + @async remotecall_wait(f, p, S) end return S end diff --git a/doc/stdlib/parallel.rst b/doc/stdlib/parallel.rst index 6075736808aa8..8a9652ef42c11 100644 --- a/doc/stdlib/parallel.rst +++ b/doc/stdlib/parallel.rst @@ -264,7 +264,7 @@ General Parallel Computing Support If ``err_retry`` is ``true``, it retries a failed application of ``f`` on a different worker. If ``err_stop`` is ``true``, it takes precedence over the value of ``err_retry`` and ``pmap`` stops execution on the first error. -.. function:: remotecall(id, func, args...) +.. function:: remotecall(func, id, args...) .. Docstring generated from Julia source @@ -296,13 +296,13 @@ General Parallel Computing Support * ``RemoteRef``\ : Wait for and get the value of a remote reference. If the remote value is an exception, throws a ``RemoteException`` which captures the remote exception and backtrace. * ``Channel`` : Wait for and get the first available item from the channel. -.. function:: remotecall_wait(id, func, args...) +.. function:: remotecall_wait(func, id, args...) .. Docstring generated from Julia source Perform ``wait(remotecall(...))`` in one message. -.. function:: remotecall_fetch(id, func, args...) +.. function:: remotecall_fetch(func, id, args...) .. Docstring generated from Julia source @@ -343,7 +343,7 @@ General Parallel Computing Support .. code-block:: julia rr = RemoteRef() - @async put!(rr, remotecall_fetch(p, long_computation)) + @async put!(rr, remotecall_fetch(long_computation, p)) isready(rr) # will not block .. function:: close(Channel) diff --git a/examples/clustermanager/simple/test_simple.jl b/examples/clustermanager/simple/test_simple.jl index 42268d58514af..faca7e168fc98 100644 --- a/examples/clustermanager/simple/test_simple.jl +++ b/examples/clustermanager/simple/test_simple.jl @@ -5,7 +5,7 @@ include(cmanpath) npids = addprocs(UnixDomainCM(2)) assert(length(npids) == 2) -test_pids = [remotecall_fetch(x, myid) for x in npids] +test_pids = [remotecall_fetch(myid, x) for x in npids] assert(npids == test_pids) rmprocs(npids; waitfor=1.0) diff --git a/examples/hpl.jl b/examples/hpl.jl index 463da04d76c00..e67c085452716 100644 --- a/examples/hpl.jl +++ b/examples/hpl.jl @@ -251,15 +251,15 @@ function hpl_par2(A::Matrix, b::Vector) for i = 1:nB #println("C=$(convert(Array, C))") ##### ##panel factorization - panel_p = remotecall_fetch(C.pmap[i], panel_factor_par2, C, i, n) + panel_p = remotecall_fetch(panel_factor_par2, C.pmap[i], C, i, n) ## Apply permutation from pivoting for j = (i+1):nB - depend[i,j] = remotecall(C.pmap[j], permute, C, i, j, panel_p, n, false) + depend[i,j] = remotecall(permute, C.pmap[j], C, i, j, panel_p, n, false) end ## Special case for last column if i == nB - depend[nB,nB] = remotecall(C.pmap[nB], permute, C, i, nB+1, panel_p, n, true) + depend[nB,nB] = remotecall(permute, C.pmap[nB], C, i, nB+1, panel_p, n, true) end ##Trailing updates @@ -276,13 +276,13 @@ function hpl_par2(A::Matrix, b::Vector) for j=(i+1):nB dep = depend[i,j] - depend[j,i] = remotecall(C.pmap[j], trailing_update_par2, C, L_II, C_KI, i, j, n, false, dep) + depend[j,i] = remotecall(trailing_update_par2, C.pmap[j], C, L_II, C_KI, i, j, n, false, dep) end ## Special case for last column if i == nB dep = depend[nB,nB] - remotecall_fetch(C.pmap[nB], trailing_update_par2, C, L_II, C_KI, i, nB+1, n, true, dep) + remotecall_fetch(trailing_update_par2, C.pmap[nB], C, L_II, C_KI, i, nB+1, n, true, dep) else #enforce dependencies for nonspecial case for j=(i+1):nB diff --git a/test/examples.jl b/test/examples.jl index 0425fa15aace0..764d0e1d82666 100644 --- a/test/examples.jl +++ b/test/examples.jl @@ -55,7 +55,10 @@ include(dc_path) w_set=filter!(x->x != myid(), workers()) pid = length(w_set) > 0 ? w_set[1] : myid() -remotecall_fetch(pid, f->(include(f); nothing), dc_path) +remotecall_fetch(pid, dc_path) do f + include(f) + nothing +end dc=RemoteRef(()->DictChannel(), pid) @test typeof(dc) == RemoteRef{DictChannel} diff --git a/test/netload/memtest.jl b/test/netload/memtest.jl index aea2239a06f18..627b03ea2a215 100644 --- a/test/netload/memtest.jl +++ b/test/netload/memtest.jl @@ -54,7 +54,7 @@ end function mtest_remotecall_fetch() for i in 1:10^5 - remotecall_fetch(1, myid) + remotecall_fetch(myid, 1) end gc() end diff --git a/test/parallel.jl b/test/parallel.jl index e0a15ed78af98..8d5d14cfa9b1a 100644 --- a/test/parallel.jl +++ b/test/parallel.jl @@ -4,7 +4,9 @@ using Base.Test if nworkers() < 3 - remotecall_fetch(1, () -> addprocs(3 - nworkers())) + remotecall_fetch(1) do + addprocs(3 - nworkers()) + end end id_me = myid() @@ -47,7 +49,9 @@ end function check_pids_all(S::SharedArray) pidtested = falses(size(S)) for p in procs(S) - idxes_in_p = remotecall_fetch(p, D -> parentindexes(D.loc_subarr_1d)[1], S) + idxes_in_p = remotecall_fetch(p, S) do D + parentindexes(D.loc_subarr_1d)[1] + end @test all(sdata(S)[idxes_in_p] .== p) pidtested[idxes_in_p] = true end @@ -60,18 +64,26 @@ a = convert(Array, d) partsums = Array(Int, length(procs(d))) @sync begin for (i, p) in enumerate(procs(d)) - @async partsums[i] = remotecall_fetch(p, D->sum(D.loc_subarr_1d), d) + @async partsums[i] = remotecall_fetch(p, d) do D + sum(D.loc_subarr_1d) + end end end @test sum(a) == sum(partsums) d = Base.shmem_rand(dims) for p in procs(d) - idxes_in_p = remotecall_fetch(p, D -> parentindexes(D.loc_subarr_1d)[1], d) + idxes_in_p = remotecall_fetch(p, d) do D + parentindexes(D.loc_subarr_1d)[1] + end idxf = first(idxes_in_p) idxl = last(idxes_in_p) d[idxf] = Float64(idxf) - rv = remotecall_fetch(p, (D,idxf,idxl) -> begin assert(D[idxf] == Float64(idxf)); D[idxl] = Float64(idxl); D[idxl]; end, d,idxf,idxl) + rv = remotecall_fetch(p, d,idxf,idxl) do D,idxf,idxl + assert(D[idxf] == Float64(idxf)) + D[idxl] = Float64(idxl) + D[idxl] + end @test d[idxl] == rv end @@ -90,7 +102,9 @@ a = rand(dims) d = SharedArray(Int, dims; init = D->fill!(D.loc_subarr_1d, myid())) for p in procs(d) - idxes_in_p = remotecall_fetch(p, D -> parentindexes(D.loc_subarr_1d)[1], d) + idxes_in_p = remotecall_fetch(p, d) do D + parentindexes(D.loc_subarr_1d)[1] + end idxf = first(idxes_in_p) idxl = last(idxes_in_p) @test d[idxf] == p @@ -115,7 +129,9 @@ S = SharedArray(fn, Int, sz) @test length(procs(S)) > 1 @sync begin for p in procs(S) - @async remotecall_wait(p, D->fill!(D.loc_subarr_1d, myid()), S) + @async remotecall_wait(p, S) do D + fill!(D.loc_subarr_1d, myid()) + end end end check_pids_all(S) @@ -187,7 +203,7 @@ s = copy(sdata(d)) ds = deepcopy(d) @test ds == d pids_d = procs(d) -remotecall_fetch(pids_d[findfirst(id->(id != myid()), pids_d)], setindex!, d, 1.0, 1:10) +remotecall_fetch(setindex!, pids_d[findfirst(id->(id != myid()), pids_d)], d, 1.0, 1:10) @test ds != d @test s != d @@ -220,8 +236,8 @@ map!(x->1, d) @test d[1,:] == fill(2, 1, 10) # Boundary cases where length(S) <= length(pids) -@test 2.0 == remotecall_fetch(id_other, D->D[2], Base.shmem_fill(2.0, 2; pids=[id_me, id_other])) -@test 3.0 == remotecall_fetch(id_other, D->D[1], Base.shmem_fill(3.0, 1; pids=[id_me, id_other])) +@test 2.0 == remotecall_fetch(D->D[2], id_other, Base.shmem_fill(2.0, 2; pids=[id_me, id_other])) +@test 3.0 == remotecall_fetch(D->D[1], id_other, Base.shmem_fill(3.0, 1; pids=[id_me, id_other])) # Test @parallel load balancing - all processors should get either M or M+1 # iterations out of the loop range for some M. @@ -265,15 +281,15 @@ end # Testing buffered and unbuffered reads # This large array should write directly to the socket a = ones(10^6) -@test a == remotecall_fetch(id_other, (x)->x, a) +@test a == remotecall_fetch((x)->x, id_other, a) # Not a bitstype, should be buffered s = [randstring() for x in 1:10^5] -@test s == remotecall_fetch(id_other, (x)->x, s) +@test s == remotecall_fetch((x)->x, id_other, s) #large number of small requests num_small_requests = 10000 -@test fill(id_other, num_small_requests) == [remotecall_fetch(id_other, myid) for i in 1:num_small_requests] +@test fill(id_other, num_small_requests) == [remotecall_fetch(myid, id_other) for i in 1:num_small_requests] # test parallel sends of large arrays from multiple tasks to the same remote worker ntasks = 10 @@ -283,7 +299,7 @@ for rr in rr_list @async let rr=rr try for i in 1:10 - @test a == remotecall_fetch(id_other, (x)->x, a) + @test a == remotecall_fetch((x)->x, id_other, a) yield() end put!(rr, :OK) @@ -377,7 +393,7 @@ catch ex end try - remotecall_fetch(id_other, ()->throw(ErrorException("foobar"))) + remotecall_fetch(()->throw(ErrorException("foobar")), id_other) catch ex @test typeof(ex) == RemoteException @test typeof(ex.captured) == CapturedException @@ -396,7 +412,7 @@ DoFullTest = Bool(parse(Int,(get(ENV, "JULIA_TESTFULL", "0")))) if DoFullTest # pmap tests # needs at least 4 processors dedicated to the below tests - ppids = remotecall_fetch(1, ()->addprocs(4)) + ppids = remotecall_fetch(()->addprocs(4), 1) s = "abcdefghijklmnopqrstuvwxyz"; ups = uppercase(s); @test ups == bytestring(UInt8[UInt8(c) for c in pmap(x->uppercase(x), s)]) @@ -438,13 +454,15 @@ if DoFullTest println("Testing exception printing on remote worker from a `remote_do` call") println("Please ensure the remote error and backtrace is displayed on screen") - Base.remote_do(id_other, ()->throw(ErrorException("TESTING EXCEPTION ON REMOTE DO. PLEASE IGNORE"))) + Base.remote_do(id_other) do + throw(ErrorException("TESTING EXCEPTION ON REMOTE DO. PLEASE IGNORE")) + end sleep(0.5) # Give some time for the above error to be printed @unix_only begin function test_n_remove_pids(new_pids) for p in new_pids - w_in_remote = sort(remotecall_fetch(p, workers)) + w_in_remote = sort(remotecall_fetch(workers, p)) try @test intersect(new_pids, w_in_remote) == new_pids catch e @@ -456,7 +474,9 @@ if DoFullTest end end - @test :ok == remotecall_fetch(1, (p)->rmprocs(p; waitfor=5.0), new_pids) + @test :ok == remotecall_fetch(1, new_pids) do p + rmprocs(p; waitfor=5.0) + end end print("\n\nTesting SSHManager. A minimum of 4GB of RAM is recommended.\n") @@ -473,22 +493,30 @@ if DoFullTest end print("\nTesting SSH addprocs with $(length(hosts)) workers...\n") - new_pids = remotecall_fetch(1, (h, sf) -> addprocs(h; sshflags=sf), hosts, sshflags) + new_pids = remotecall_fetch(1, hosts, sshflags) do h, sf + addprocs(h; sshflags=sf) + end @test length(new_pids) == length(hosts) test_n_remove_pids(new_pids) print("\nMixed ssh addprocs with :auto\n") - new_pids = sort(remotecall_fetch(1, (h, sf) -> addprocs(h; sshflags=sf), ["localhost", ("127.0.0.1", :auto), "localhost"], sshflags)) + new_pids = sort(remotecall_fetch(1, ["localhost", ("127.0.0.1", :auto), "localhost"], sshflags) do h, sf + addprocs(h; sshflags=sf) + end) @test length(new_pids) == (2 + Sys.CPU_CORES) test_n_remove_pids(new_pids) print("\nMixed ssh addprocs with numeric counts\n") - new_pids = sort(remotecall_fetch(1, (h, sf) -> addprocs(h; sshflags=sf), [("localhost", 2), ("127.0.0.1", 2), "localhost"], sshflags)) + new_pids = sort(remotecall_fetch(1, [("localhost", 2), ("127.0.0.1", 2), "localhost"], sshflags) do h, sf + addprocs(h; sshflags=sf) + end) @test length(new_pids) == 5 test_n_remove_pids(new_pids) print("\nssh addprocs with tunnel\n") - new_pids = sort(remotecall_fetch(1, (h, sf) -> addprocs(h; tunnel=true, sshflags=sf), [("localhost", num_workers)], sshflags)) + new_pids = sort(remotecall_fetch(1, [("localhost", num_workers)], sshflags) do h, sf + addprocs(h; tunnel=true, sshflags=sf) + end) @test length(new_pids) == num_workers test_n_remove_pids(new_pids) diff --git a/test/topology.jl b/test/topology.jl index 70a5e9792060c..0c3015c33901c 100644 --- a/test/topology.jl +++ b/test/topology.jl @@ -2,7 +2,7 @@ include("testdefs.jl") addprocs(4; topology="master_slave") -@test_throws RemoteException remotecall_fetch(2, ()->remotecall_fetch(3,myid)) +@test_throws RemoteException remotecall_fetch(()->remotecall_fetch(3,myid), 2) function test_worker_counts() # check if the nprocs/nworkers/workers are the same on the remaining workers @@ -11,7 +11,9 @@ function test_worker_counts() ws=sort(workers()) for p in workers() - @test (true, true, true) == remotecall_fetch(p, (x,y,z)->(x==nprocs(), y==nworkers(), z==sort(workers())), np, nw, ws) + @test (true, true, true) == remotecall_fetch(p, np, nw, ws) for (x,y,z) + (x==nprocs(), y==nworkers(), z==sort(workers())) + end end end @@ -75,9 +77,9 @@ for p1 in workers() i1 = map_pid_ident[p1] i2 = map_pid_ident[p2] if (iseven(i1) && iseven(i2)) || (isodd(i1) && isodd(i2)) - @test p2 == remotecall_fetch(p1, p->remotecall_fetch(p,myid), p2) + @test p2 == remotecall_fetch(p->remotecall_fetch(p,myid), p1, p2) else - @test_throws RemoteException remotecall_fetch(p1, p->remotecall_fetch(p,myid), p2) + @test_throws RemoteException remotecall_fetch(p->remotecall_fetch(p,myid), p1, p2) end end end