Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make function argument the first in remotecallX and remote_do to ease the use of do block syntax. #13338

Merged
merged 1 commit into from
Sep 29, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,27 @@
Julia v0.5.0 Release Notes
==========================

New language features
---------------------

Language changes
----------------

Command line option changes
---------------------------

Compiler/Runtime improvements
-----------------------------

Library improvements
--------------------

Deprecated or removed
---------------------

* The function `remotecall`, `remotecall_fetch`, and `remotecall_wait` now have the
the function argument as the first argument to allow for do-block syntax. [#13338]

Julia v0.4.0 Release Notes
==========================

Expand Down
2 changes: 1 addition & 1 deletion base/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ function process_options(opts::JLOptions, args::Vector{UTF8String})
# load file immediately on all processors
if opts.load != C_NULL
@sync for p in procs()
@async remotecall_fetch(p, include, bytestring(opts.load))
@async remotecall_fetch(include, p, bytestring(opts.load))
end
end
# eval expression
Expand Down
8 changes: 8 additions & 0 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -834,3 +834,11 @@ end
# 12839
const AsyncStream = IO
deprecate(:AsyncStream)

for f in (:remotecall, :remotecall_fetch, :remotecall_wait)
@eval begin
@deprecate ($f)(w::LocalProcess, f::Function, args...) ($f)(f, w::LocalProcess, args...)
@deprecate ($f)(w::Worker, f::Function, args...) ($f)(f, w::Worker, args...)
@deprecate ($f)(id::Integer, f::Function, args...) ($f)(f, id::Integer, args...)
end
end
8 changes: 4 additions & 4 deletions base/docs/helpdb.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3609,7 +3609,7 @@ Returns the index of the current worker into the `pids` vector, i.e., the list o
indexpids

doc"""
remotecall_wait(id, func, args...)
remotecall_wait(func, id, args...)

Perform `wait(remotecall(...))` in one message.
"""
Expand Down Expand Up @@ -6259,7 +6259,7 @@ Search for the first occurrence of the given characters within the given string.
search

doc"""
remotecall_fetch(id, func, args...)
remotecall_fetch(func, id, args...)

Perform `fetch(remotecall(...))` in one message. Any remote exceptions are captured in a `RemoteException` and thrown.
"""
Expand Down Expand Up @@ -7370,7 +7370,7 @@ Determine whether a `RemoteRef` has a value stored to it. Note that this functio
If the argument `RemoteRef` is owned by a different node, this call will block to wait for the answer. It is recommended to wait for `r` in a separate task instead, or to use a local `RemoteRef` as a proxy:

rr = RemoteRef()
@async put!(rr, remotecall_fetch(p, long_computation))
@async put!(rr, remotecall_fetch(long_computation, p))
isready(rr) # will not block
"""
isready
Expand Down Expand Up @@ -9490,7 +9490,7 @@ Read all available data on the stream, blocking the task only if no data is avai
readavailable

doc"""
remotecall(id, func, args...)
remotecall(func, id, args...)

Call a function asynchronously on the given arguments on the specified process. Returns a `RemoteRef`.
"""
Expand Down
8 changes: 4 additions & 4 deletions base/loading.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ function find_in_node_path(name, srcpath, node::Int=1)
if myid() == node
find_in_path(name, srcpath)
else
remotecall_fetch(node, find_in_path, name, srcpath)
remotecall_fetch(find_in_path, node, name, srcpath)
end
end

Expand Down Expand Up @@ -67,7 +67,7 @@ function _require_from_serialized(node::Int, mod::Symbol, path_to_try::ByteStrin
if node == myid()
content = open(readbytes, path_to_try)
else
content = remotecall_fetch(node, open, readbytes, path_to_try)
content = remotecall_fetch(open, node, readbytes, path_to_try)
end
restored = _include_from_serialized(content)
if restored !== nothing
Expand All @@ -83,7 +83,7 @@ function _require_from_serialized(node::Int, mod::Symbol, path_to_try::ByteStrin
myid() == 1 && recompile_stale(mod, path_to_try)
restored = ccall(:jl_restore_incremental, Any, (Ptr{UInt8},), path_to_try)
else
content = remotecall_fetch(node, open, readbytes, path_to_try)
content = remotecall_fetch(open, node, readbytes, path_to_try)
restored = _include_from_serialized(content)
end
# otherwise, continue search
Expand Down Expand Up @@ -304,7 +304,7 @@ function include_from_node1(_path::AbstractString)
result = Core.include(path)
nprocs()>1 && sleep(0.005)
else
result = include_string(remotecall_fetch(1, readall, path), path)
result = include_string(remotecall_fetch(readall, 1, path), path)
end
finally
if prev === nothing
Expand Down
2 changes: 1 addition & 1 deletion base/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ function connect_to_worker(host::AbstractString, bind_addr::AbstractString, port
end

function kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
remote_do(pid, exit) # For TCP based transports this will result in a close of the socket
remote_do(exit, pid) # For TCP based transports this will result in a close of the socket
# at our end, which will result in a cleanup of the worker.
nothing
end
Expand Down
72 changes: 36 additions & 36 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,14 @@ function flush_gc_msgs(w::Worker)
msgs = copy(w.add_msgs)
if !isempty(msgs)
empty!(w.add_msgs)
remote_do(w, add_clients, msgs)
remote_do(add_clients, w, msgs)
end

msgs = copy(w.del_msgs)
if !isempty(msgs)
empty!(w.del_msgs)
#print("sending delete of $msgs\n")
remote_do(w, del_clients, msgs)
remote_do(del_clients, w, msgs)
end
end

Expand Down Expand Up @@ -293,7 +293,7 @@ get_bind_addr(w::LocalProcess) = LPROC.bind_addr
function get_bind_addr(w::Worker)
if isnull(w.config.bind_addr)
if w.id != myid()
w.config.bind_addr = remotecall_fetch(w.id, get_bind_addr, w.id)
w.config.bind_addr = remotecall_fetch(get_bind_addr, w.id, w.id)
end
end
get(w.config.bind_addr)
Expand All @@ -317,7 +317,7 @@ function procs(pid::Integer)
Int[x.id for x in filter(w -> get_bind_addr(w) == ipatpid, PGRP.workers)]
end
else
remotecall_fetch(1, procs, pid)
remotecall_fetch(procs, 1, pid)
end
end

Expand Down Expand Up @@ -425,7 +425,7 @@ function deregister_worker(pg, pid)
if PGRP.topology != :all_to_all
for rpid in workers()
try
remote_do(rpid, deregister_worker, pid)
remote_do(deregister_worker, rpid, pid)
catch
end
end
Expand Down Expand Up @@ -494,10 +494,10 @@ function RemoteRef(pid::Integer=myid())
end

function RemoteRef(f::Function, pid::Integer=myid())
remotecall_fetch(pid, (f, rrid) -> begin
remotecall_fetch(pid, f, next_rrid_tuple()) do f, rrid
rv=lookup_ref(rrid, f)
RemoteRef{typeof(rv.c)}(myid(), rrid[1], rrid[2])
end, f, next_rrid_tuple())
end
end

hash(r::RemoteRef, h::UInt) = hash(r.whence, hash(r.id, h))
Expand All @@ -522,7 +522,7 @@ function isready(rr::RemoteRef, args...)
if rr.where == myid()
isready(lookup_ref(rid).c, args...)
else
remotecall_fetch(rr.where, id->isready(lookup_ref(rid).c, args...), rid)
remotecall_fetch(id->isready(lookup_ref(rid).c, args...), rr.where, rid)
end
end

Expand Down Expand Up @@ -698,28 +698,28 @@ function local_remotecall_thunk(f, args)
# f(map(localize_ref,args)...)
end

function remotecall(w::LocalProcess, f, args...)
function remotecall(f, w::LocalProcess, args...)
rr = RemoteRef(w)
schedule_call(rr2id(rr), local_remotecall_thunk(f,args))
rr
end

function remotecall(w::Worker, f, args...)
function remotecall(f, w::Worker, args...)
rr = RemoteRef(w)
#println("$(myid()) asking for $rr")
send_msg(w, CallMsg{:call}(f, args, rr2id(rr)))
rr
end

remotecall(id::Integer, f, args...) = remotecall(worker_from_id(id), f, args...)
remotecall(f, id::Integer, args...) = remotecall(f, worker_from_id(id), args...)

# faster version of fetch(remotecall(...))
function remotecall_fetch(w::LocalProcess, f, args...)
function remotecall_fetch(f, w::LocalProcess, args...)
v=run_work_thunk(local_remotecall_thunk(f,args), false)
isa(v, RemoteException) ? throw(v) : v
end

function remotecall_fetch(w::Worker, f, args...)
function remotecall_fetch(f, w::Worker, args...)
# can be weak, because the program will have no way to refer to the Ref
# itself, it only gets the result.
oid = next_rrid_tuple()
Expand All @@ -731,13 +731,13 @@ function remotecall_fetch(w::Worker, f, args...)
isa(v, RemoteException) ? throw(v) : v
end

remotecall_fetch(id::Integer, f, args...) =
remotecall_fetch(worker_from_id(id), f, args...)
remotecall_fetch(f, id::Integer, args...) =
remotecall_fetch(f, worker_from_id(id), args...)

# faster version of wait(remotecall(...))
remotecall_wait(w::LocalProcess, f, args...) = wait(remotecall(w,f,args...))
remotecall_wait(f, w::LocalProcess, args...) = wait(remotecall(f, w, args...))

function remotecall_wait(w::Worker, f, args...)
function remotecall_wait(f, w::Worker, args...)
prid = next_rrid_tuple()
rv = lookup_ref(prid)
rv.waitingfor = w.id
Expand All @@ -748,10 +748,10 @@ function remotecall_wait(w::Worker, f, args...)
rr
end

remotecall_wait(id::Integer, f, args...) =
remotecall_wait(worker_from_id(id), f, args...)
remotecall_wait(f, id::Integer, args...) =
remotecall_wait(f, worker_from_id(id), args...)

function remote_do(w::LocalProcess, f, args...)
function remote_do(f, w::LocalProcess, args...)
# the LocalProcess version just performs in local memory what a worker
# does when it gets a :do message.
# same for other messages on LocalProcess.
Expand All @@ -760,20 +760,20 @@ function remote_do(w::LocalProcess, f, args...)
nothing
end

function remote_do(w::Worker, f, args...)
function remote_do(f, w::Worker, args...)
send_msg(w, RemoteDoMsg(f, args))
nothing
end

remote_do(id::Integer, f, args...) = remote_do(worker_from_id(id), f, args...)
remote_do(f, id::Integer, args...) = remote_do(f, worker_from_id(id), args...)

# have the owner of rr call f on it
function call_on_owner(f, rr::RemoteRef, args...)
rid = rr2id(rr)
if rr.where == myid()
f(rid, args...)
else
remotecall_fetch(rr.where, f, rid, args...)
remotecall_fetch(f, rr.where, rid, args...)
end
end

Expand Down Expand Up @@ -818,7 +818,7 @@ function deliver_result(sock::IO, msg, oid, value)
elseif wid == 1
exit(1)
else
remote_do(1, rmprocs, wid)
remote_do(rmprocs, 1, wid)
end
end
end
Expand Down Expand Up @@ -1125,7 +1125,7 @@ function addprocs(manager::ClusterManager; kwargs...)
# function returns to the caller.
all_w = workers()
for pid in all_w
remote_do(pid, set_valid_processes, all_w)
remote_do(set_valid_processes, pid, all_w)
end

sort!(launched_q)
Expand Down Expand Up @@ -1169,7 +1169,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
exeflags = get(fromconfig.exeflags, ``)
cmd = `$exename $exeflags`

new_addresses = remotecall_fetch(frompid, launch_additional, cnt, cmd)
new_addresses = remotecall_fetch(launch_additional, frompid, cnt, cmd)
for address in new_addresses
(bind_addr, port) = address

Expand All @@ -1183,7 +1183,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
let wconfig=wconfig
@async begin
pid = create_worker(manager, wconfig)
remote_do(frompid, redirect_output_from_additional_worker, pid, port)
remote_do(redirect_output_from_additional_worker, frompid, pid, port)
push!(launched_q, pid)
end
end
Expand Down Expand Up @@ -1317,7 +1317,7 @@ let nextidx = 0
end
end

spawnat(p, thunk) = sync_add(remotecall(p, thunk))
spawnat(p, thunk) = sync_add(remotecall(thunk, p))

spawn_somewhere(thunk) = spawnat(chooseproc(thunk),thunk)

Expand All @@ -1335,21 +1335,21 @@ macro fetch(expr)
expr = localize_vars(:(()->($expr)), false)
quote
thunk = $(esc(expr))
remotecall_fetch(chooseproc(thunk), thunk)
remotecall_fetch(thunk, chooseproc(thunk))
end
end

macro fetchfrom(p, expr)
expr = localize_vars(:(()->($expr)), false)
:(remotecall_fetch($(esc(p)), $(esc(expr))))
:(remotecall_fetch($(esc(expr)), $(esc(p))))
end

macro everywhere(ex)
quote
sync_begin()
thunk = ()->(eval(Main,$(Expr(:quote,ex))); nothing)
for pid in workers()
async_run_thunk(()->remotecall_fetch(pid, thunk))
async_run_thunk(()->remotecall_fetch(thunk, pid))
yield() # ensure that the remotecall_fetch has been started
end

Expand All @@ -1365,7 +1365,7 @@ end
function pmap_static(f, lsts...)
np = nprocs()
n = length(lsts[1])
Any[ remotecall(PGRP.workers[(i-1)%np+1].id, f, map(L->L[i], lsts)...) for i = 1:n ]
Any[ remotecall(f, PGRP.workers[(i-1)%np+1].id, map(L->L[i], lsts)...) for i = 1:n ]
end

pmap(f) = f()
Expand Down Expand Up @@ -1427,7 +1427,7 @@ function pmap(f, lsts...; err_retry=true, err_stop=false, pids = workers())
(idx, fvals) = tasklet
busy_workers[pididx] = true
try
results[idx] = remotecall_fetch(wpid, f, fvals...)
results[idx] = remotecall_fetch(f, wpid, fvals...)
catch ex
if err_retry
push!(retryqueue, (idx,fvals, ex))
Expand Down Expand Up @@ -1482,7 +1482,7 @@ function preduce(reducer, f, N::Int)

w_exec = Task[]
for (idx,pid) in enumerate(all_w)
t = Task(()->remotecall_fetch(pid, f, first(chunks[idx]), last(chunks[idx])))
t = Task(()->remotecall_fetch(f, pid, first(chunks[idx]), last(chunks[idx])))
schedule(t)
push!(w_exec, t)
end
Expand Down Expand Up @@ -1625,7 +1625,7 @@ end

function check_same_host(pids)
if myid() != 1
return remotecall_fetch(1, check_same_host, pids)
return remotecall_fetch(check_same_host, 1, pids)
else
# We checkfirst if all test pids have been started using the local manager,
# else we check for the same bind_to addr. This handles the special case
Expand Down Expand Up @@ -1663,5 +1663,5 @@ function getindex(r::RemoteRef, args...)
if r.where == myid()
return getindex(fetch(r), args...)
end
return remotecall_fetch(r.where, getindex, r, args...)
return remotecall_fetch(getindex, r.where, r, args...)
end
Loading