Skip to content

Commit

Permalink
Merge pull request #13338 from JuliaLang/anj/remote
Browse files Browse the repository at this point in the history
Make function argument the first in remotecallX and remote_do to ease the use  of do block syntax.
  • Loading branch information
andreasnoack committed Sep 29, 2015
2 parents f71e449 + fbfc811 commit 363585f
Show file tree
Hide file tree
Showing 16 changed files with 177 additions and 101 deletions.
24 changes: 24 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,27 @@
Julia v0.5.0 Release Notes
==========================

New language features
---------------------

Language changes
----------------

Command line option changes
---------------------------

Compiler/Runtime improvements
-----------------------------

Library improvements
--------------------

Deprecated or removed
---------------------

* The function `remotecall`, `remotecall_fetch`, and `remotecall_wait` now have the
the function argument as the first argument to allow for do-block syntax. [#13338]

Julia v0.4.0 Release Notes
==========================

Expand Down
2 changes: 1 addition & 1 deletion base/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ function process_options(opts::JLOptions, args::Vector{UTF8String})
# load file immediately on all processors
if opts.load != C_NULL
@sync for p in procs()
@async remotecall_fetch(p, include, bytestring(opts.load))
@async remotecall_fetch(include, p, bytestring(opts.load))
end
end
# eval expression
Expand Down
8 changes: 8 additions & 0 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -834,3 +834,11 @@ end
# 12839
const AsyncStream = IO
deprecate(:AsyncStream)

for f in (:remotecall, :remotecall_fetch, :remotecall_wait)
@eval begin
@deprecate ($f)(w::LocalProcess, f::Function, args...) ($f)(f, w::LocalProcess, args...)
@deprecate ($f)(w::Worker, f::Function, args...) ($f)(f, w::Worker, args...)
@deprecate ($f)(id::Integer, f::Function, args...) ($f)(f, id::Integer, args...)
end
end
8 changes: 4 additions & 4 deletions base/docs/helpdb.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3609,7 +3609,7 @@ Returns the index of the current worker into the `pids` vector, i.e., the list o
indexpids

doc"""
remotecall_wait(id, func, args...)
remotecall_wait(func, id, args...)
Perform `wait(remotecall(...))` in one message.
"""
Expand Down Expand Up @@ -6259,7 +6259,7 @@ Search for the first occurrence of the given characters within the given string.
search

doc"""
remotecall_fetch(id, func, args...)
remotecall_fetch(func, id, args...)
Perform `fetch(remotecall(...))` in one message. Any remote exceptions are captured in a `RemoteException` and thrown.
"""
Expand Down Expand Up @@ -7370,7 +7370,7 @@ Determine whether a `RemoteRef` has a value stored to it. Note that this functio
If the argument `RemoteRef` is owned by a different node, this call will block to wait for the answer. It is recommended to wait for `r` in a separate task instead, or to use a local `RemoteRef` as a proxy:
rr = RemoteRef()
@async put!(rr, remotecall_fetch(p, long_computation))
@async put!(rr, remotecall_fetch(long_computation, p))
isready(rr) # will not block
"""
isready
Expand Down Expand Up @@ -9490,7 +9490,7 @@ Read all available data on the stream, blocking the task only if no data is avai
readavailable

doc"""
remotecall(id, func, args...)
remotecall(func, id, args...)
Call a function asynchronously on the given arguments on the specified process. Returns a `RemoteRef`.
"""
Expand Down
8 changes: 4 additions & 4 deletions base/loading.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ function find_in_node_path(name, srcpath, node::Int=1)
if myid() == node
find_in_path(name, srcpath)
else
remotecall_fetch(node, find_in_path, name, srcpath)
remotecall_fetch(find_in_path, node, name, srcpath)
end
end

Expand Down Expand Up @@ -67,7 +67,7 @@ function _require_from_serialized(node::Int, mod::Symbol, path_to_try::ByteStrin
if node == myid()
content = open(readbytes, path_to_try)
else
content = remotecall_fetch(node, open, readbytes, path_to_try)
content = remotecall_fetch(open, node, readbytes, path_to_try)
end
restored = _include_from_serialized(content)
if restored !== nothing
Expand All @@ -83,7 +83,7 @@ function _require_from_serialized(node::Int, mod::Symbol, path_to_try::ByteStrin
myid() == 1 && recompile_stale(mod, path_to_try)
restored = ccall(:jl_restore_incremental, Any, (Ptr{UInt8},), path_to_try)
else
content = remotecall_fetch(node, open, readbytes, path_to_try)
content = remotecall_fetch(open, node, readbytes, path_to_try)
restored = _include_from_serialized(content)
end
# otherwise, continue search
Expand Down Expand Up @@ -304,7 +304,7 @@ function include_from_node1(_path::AbstractString)
result = Core.include(path)
nprocs()>1 && sleep(0.005)
else
result = include_string(remotecall_fetch(1, readall, path), path)
result = include_string(remotecall_fetch(readall, 1, path), path)
end
finally
if prev === nothing
Expand Down
2 changes: 1 addition & 1 deletion base/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ function connect_to_worker(host::AbstractString, bind_addr::AbstractString, port
end

function kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
remote_do(pid, exit) # For TCP based transports this will result in a close of the socket
remote_do(exit, pid) # For TCP based transports this will result in a close of the socket
# at our end, which will result in a cleanup of the worker.
nothing
end
Expand Down
72 changes: 36 additions & 36 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,14 @@ function flush_gc_msgs(w::Worker)
msgs = copy(w.add_msgs)
if !isempty(msgs)
empty!(w.add_msgs)
remote_do(w, add_clients, msgs)
remote_do(add_clients, w, msgs)
end

msgs = copy(w.del_msgs)
if !isempty(msgs)
empty!(w.del_msgs)
#print("sending delete of $msgs\n")
remote_do(w, del_clients, msgs)
remote_do(del_clients, w, msgs)
end
end

Expand Down Expand Up @@ -293,7 +293,7 @@ get_bind_addr(w::LocalProcess) = LPROC.bind_addr
function get_bind_addr(w::Worker)
if isnull(w.config.bind_addr)
if w.id != myid()
w.config.bind_addr = remotecall_fetch(w.id, get_bind_addr, w.id)
w.config.bind_addr = remotecall_fetch(get_bind_addr, w.id, w.id)
end
end
get(w.config.bind_addr)
Expand All @@ -317,7 +317,7 @@ function procs(pid::Integer)
Int[x.id for x in filter(w -> get_bind_addr(w) == ipatpid, PGRP.workers)]
end
else
remotecall_fetch(1, procs, pid)
remotecall_fetch(procs, 1, pid)
end
end

Expand Down Expand Up @@ -425,7 +425,7 @@ function deregister_worker(pg, pid)
if PGRP.topology != :all_to_all
for rpid in workers()
try
remote_do(rpid, deregister_worker, pid)
remote_do(deregister_worker, rpid, pid)
catch
end
end
Expand Down Expand Up @@ -494,10 +494,10 @@ function RemoteRef(pid::Integer=myid())
end

function RemoteRef(f::Function, pid::Integer=myid())
remotecall_fetch(pid, (f, rrid) -> begin
remotecall_fetch(pid, f, next_rrid_tuple()) do f, rrid
rv=lookup_ref(rrid, f)
RemoteRef{typeof(rv.c)}(myid(), rrid[1], rrid[2])
end, f, next_rrid_tuple())
end
end

hash(r::RemoteRef, h::UInt) = hash(r.whence, hash(r.id, h))
Expand All @@ -522,7 +522,7 @@ function isready(rr::RemoteRef, args...)
if rr.where == myid()
isready(lookup_ref(rid).c, args...)
else
remotecall_fetch(rr.where, id->isready(lookup_ref(rid).c, args...), rid)
remotecall_fetch(id->isready(lookup_ref(rid).c, args...), rr.where, rid)
end
end

Expand Down Expand Up @@ -698,28 +698,28 @@ function local_remotecall_thunk(f, args)
# f(map(localize_ref,args)...)
end

function remotecall(w::LocalProcess, f, args...)
function remotecall(f, w::LocalProcess, args...)
rr = RemoteRef(w)
schedule_call(rr2id(rr), local_remotecall_thunk(f,args))
rr
end

function remotecall(w::Worker, f, args...)
function remotecall(f, w::Worker, args...)
rr = RemoteRef(w)
#println("$(myid()) asking for $rr")
send_msg(w, CallMsg{:call}(f, args, rr2id(rr)))
rr
end

remotecall(id::Integer, f, args...) = remotecall(worker_from_id(id), f, args...)
remotecall(f, id::Integer, args...) = remotecall(f, worker_from_id(id), args...)

# faster version of fetch(remotecall(...))
function remotecall_fetch(w::LocalProcess, f, args...)
function remotecall_fetch(f, w::LocalProcess, args...)
v=run_work_thunk(local_remotecall_thunk(f,args), false)
isa(v, RemoteException) ? throw(v) : v
end

function remotecall_fetch(w::Worker, f, args...)
function remotecall_fetch(f, w::Worker, args...)
# can be weak, because the program will have no way to refer to the Ref
# itself, it only gets the result.
oid = next_rrid_tuple()
Expand All @@ -731,13 +731,13 @@ function remotecall_fetch(w::Worker, f, args...)
isa(v, RemoteException) ? throw(v) : v
end

remotecall_fetch(id::Integer, f, args...) =
remotecall_fetch(worker_from_id(id), f, args...)
remotecall_fetch(f, id::Integer, args...) =
remotecall_fetch(f, worker_from_id(id), args...)

# faster version of wait(remotecall(...))
remotecall_wait(w::LocalProcess, f, args...) = wait(remotecall(w,f,args...))
remotecall_wait(f, w::LocalProcess, args...) = wait(remotecall(f, w, args...))

function remotecall_wait(w::Worker, f, args...)
function remotecall_wait(f, w::Worker, args...)
prid = next_rrid_tuple()
rv = lookup_ref(prid)
rv.waitingfor = w.id
Expand All @@ -748,10 +748,10 @@ function remotecall_wait(w::Worker, f, args...)
rr
end

remotecall_wait(id::Integer, f, args...) =
remotecall_wait(worker_from_id(id), f, args...)
remotecall_wait(f, id::Integer, args...) =
remotecall_wait(f, worker_from_id(id), args...)

function remote_do(w::LocalProcess, f, args...)
function remote_do(f, w::LocalProcess, args...)
# the LocalProcess version just performs in local memory what a worker
# does when it gets a :do message.
# same for other messages on LocalProcess.
Expand All @@ -760,20 +760,20 @@ function remote_do(w::LocalProcess, f, args...)
nothing
end

function remote_do(w::Worker, f, args...)
function remote_do(f, w::Worker, args...)
send_msg(w, RemoteDoMsg(f, args))
nothing
end

remote_do(id::Integer, f, args...) = remote_do(worker_from_id(id), f, args...)
remote_do(f, id::Integer, args...) = remote_do(f, worker_from_id(id), args...)

# have the owner of rr call f on it
function call_on_owner(f, rr::RemoteRef, args...)
rid = rr2id(rr)
if rr.where == myid()
f(rid, args...)
else
remotecall_fetch(rr.where, f, rid, args...)
remotecall_fetch(f, rr.where, rid, args...)
end
end

Expand Down Expand Up @@ -818,7 +818,7 @@ function deliver_result(sock::IO, msg, oid, value)
elseif wid == 1
exit(1)
else
remote_do(1, rmprocs, wid)
remote_do(rmprocs, 1, wid)
end
end
end
Expand Down Expand Up @@ -1125,7 +1125,7 @@ function addprocs(manager::ClusterManager; kwargs...)
# function returns to the caller.
all_w = workers()
for pid in all_w
remote_do(pid, set_valid_processes, all_w)
remote_do(set_valid_processes, pid, all_w)
end

sort!(launched_q)
Expand Down Expand Up @@ -1169,7 +1169,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
exeflags = get(fromconfig.exeflags, ``)
cmd = `$exename $exeflags`

new_addresses = remotecall_fetch(frompid, launch_additional, cnt, cmd)
new_addresses = remotecall_fetch(launch_additional, frompid, cnt, cmd)
for address in new_addresses
(bind_addr, port) = address

Expand All @@ -1183,7 +1183,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
let wconfig=wconfig
@async begin
pid = create_worker(manager, wconfig)
remote_do(frompid, redirect_output_from_additional_worker, pid, port)
remote_do(redirect_output_from_additional_worker, frompid, pid, port)
push!(launched_q, pid)
end
end
Expand Down Expand Up @@ -1317,7 +1317,7 @@ let nextidx = 0
end
end

spawnat(p, thunk) = sync_add(remotecall(p, thunk))
spawnat(p, thunk) = sync_add(remotecall(thunk, p))

spawn_somewhere(thunk) = spawnat(chooseproc(thunk),thunk)

Expand All @@ -1335,21 +1335,21 @@ macro fetch(expr)
expr = localize_vars(:(()->($expr)), false)
quote
thunk = $(esc(expr))
remotecall_fetch(chooseproc(thunk), thunk)
remotecall_fetch(thunk, chooseproc(thunk))
end
end

macro fetchfrom(p, expr)
expr = localize_vars(:(()->($expr)), false)
:(remotecall_fetch($(esc(p)), $(esc(expr))))
:(remotecall_fetch($(esc(expr)), $(esc(p))))
end

macro everywhere(ex)
quote
sync_begin()
thunk = ()->(eval(Main,$(Expr(:quote,ex))); nothing)
for pid in workers()
async_run_thunk(()->remotecall_fetch(pid, thunk))
async_run_thunk(()->remotecall_fetch(thunk, pid))
yield() # ensure that the remotecall_fetch has been started
end

Expand All @@ -1365,7 +1365,7 @@ end
function pmap_static(f, lsts...)
np = nprocs()
n = length(lsts[1])
Any[ remotecall(PGRP.workers[(i-1)%np+1].id, f, map(L->L[i], lsts)...) for i = 1:n ]
Any[ remotecall(f, PGRP.workers[(i-1)%np+1].id, map(L->L[i], lsts)...) for i = 1:n ]
end

pmap(f) = f()
Expand Down Expand Up @@ -1427,7 +1427,7 @@ function pmap(f, lsts...; err_retry=true, err_stop=false, pids = workers())
(idx, fvals) = tasklet
busy_workers[pididx] = true
try
results[idx] = remotecall_fetch(wpid, f, fvals...)
results[idx] = remotecall_fetch(f, wpid, fvals...)
catch ex
if err_retry
push!(retryqueue, (idx,fvals, ex))
Expand Down Expand Up @@ -1482,7 +1482,7 @@ function preduce(reducer, f, N::Int)

w_exec = Task[]
for (idx,pid) in enumerate(all_w)
t = Task(()->remotecall_fetch(pid, f, first(chunks[idx]), last(chunks[idx])))
t = Task(()->remotecall_fetch(f, pid, first(chunks[idx]), last(chunks[idx])))
schedule(t)
push!(w_exec, t)
end
Expand Down Expand Up @@ -1625,7 +1625,7 @@ end

function check_same_host(pids)
if myid() != 1
return remotecall_fetch(1, check_same_host, pids)
return remotecall_fetch(check_same_host, 1, pids)
else
# We checkfirst if all test pids have been started using the local manager,
# else we check for the same bind_to addr. This handles the special case
Expand Down Expand Up @@ -1663,5 +1663,5 @@ function getindex(r::RemoteRef, args...)
if r.where == myid()
return getindex(fetch(r), args...)
end
return remotecall_fetch(r.where, getindex, r, args...)
return remotecall_fetch(getindex, r.where, r, args...)
end
Loading

0 comments on commit 363585f

Please sign in to comment.