Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RemoteRefs refer to any type of AbstractChannel #12385

Merged
merged 1 commit into from
Aug 3, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export
Markdown,

# Types
AbstractChannel,
AbstractMatrix,
AbstractSparseArray,
AbstractSparseMatrix,
Expand Down
127 changes: 48 additions & 79 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ function deregister_worker(pg, pid)

# throw exception to tasks waiting for this pid
for (id,rv) in tonotify
notify_error(rv.full, ProcessExitedException())
notify_error(rv.c, ProcessExitedException())
delete!(pg.refs, id)
end
end
Expand All @@ -476,45 +476,51 @@ type RemoteRef
finalizer(r, send_del_client)
r
end
end

REQ_ID::Int = 0
function RemoteRef(pid::Integer)
rr = RemoteRef(pid, myid(), REQ_ID)
REQ_ID += 1
rr
end
let REF_ID::Int = 1
global next_ref_id
next_ref_id() = (id = REF_ID; REF_ID += 1; id)

global next_rrid_tuple
next_rrid_tuple() = (myid(),next_ref_id())
end

RemoteRef(w::LocalProcess) = RemoteRef(w.id)
RemoteRef(w::Worker) = RemoteRef(w.id)
RemoteRef() = RemoteRef(myid())
RemoteRef(w::LocalProcess) = RemoteRef(w.id)
RemoteRef(w::Worker) = RemoteRef(w.id)
RemoteRef(pid::Integer=myid()) = RemoteRef(pid, myid(), next_ref_id())

global next_id
next_id() = (id=(myid(),REQ_ID); REQ_ID+=1; id)
function RemoteRef(f::Function, pid::Integer=myid())
remotecall_fetch(pid, f-> begin
rr = RemoteRef()
lookup_ref(rr2id(rr), f)
rr
end, f)
end

hash(r::RemoteRef, h::UInt) = hash(r.whence, hash(r.id, h))
==(r::RemoteRef, s::RemoteRef) = (r.whence==s.whence && r.id==s.id)

rr2id(r::RemoteRef) = (r.whence, r.id)

lookup_ref(id) = lookup_ref(PGRP, id)
function lookup_ref(pg, id)
lookup_ref(id, f=def_rv_channel) = lookup_ref(PGRP, id, f)
function lookup_ref(pg, id, f)
rv = get(pg.refs, id, false)
if rv === false
# first we've heard of this ref
rv = RemoteValue()
rv = RemoteValue(f)
pg.refs[id] = rv
push!(rv.clientset, id[1])
end
rv
end

function isready(rr::RemoteRef)
function isready(rr::RemoteRef, args...)
rid = rr2id(rr)
if rr.where == myid()
lookup_ref(rid).done
isready(lookup_ref(rid).c, args...)
else
remotecall_fetch(rr.where, id->lookup_ref(id).done, rid)
remotecall_fetch(rr.where, id->isready(lookup_ref(rid).c, args...), rid)
end
end

Expand Down Expand Up @@ -607,38 +613,16 @@ function deserialize(s::SerializationState, t::Type{RemoteRef})
end

# data stored by the owner of a RemoteRef
def_rv_channel() = Channel(1)
type RemoteValue
done::Bool
result
full::Condition # waiting for a value
empty::Condition # waiting for value to be removed
c::AbstractChannel
clientset::IntSet
waitingfor::Int # processor we need to hear from to fill this, or 0

RemoteValue() = new(false, nothing, Condition(), Condition(), IntSet(), 0)
RemoteValue(f::Function) = new(f(), IntSet(), 0)
end

function work_result(rv::RemoteValue)
v = rv.result
if isa(v,WeakRef)
v = v.value
end
v
end

function wait_full(rv::RemoteValue)
while !rv.done
wait(rv.full)
end
return work_result(rv)
end

function wait_empty(rv::RemoteValue)
while rv.done
wait(rv.empty)
end
return nothing
end
wait(rv::RemoteValue) = wait(rv.c)

## core messages: do, call, fetch, wait, ref, put! ##
type RemoteException <: Exception
Expand Down Expand Up @@ -670,7 +654,7 @@ function run_work_thunk(rv::RemoteValue, thunk)
end

function schedule_call(rid, thunk)
rv = RemoteValue()
rv = RemoteValue(def_rv_channel)
(PGRP::ProcessGroup).refs[rid] = rv
push!(rv.clientset, rid[1])
schedule(@task(run_work_thunk(rv,thunk)))
Expand Down Expand Up @@ -736,11 +720,11 @@ end
function remotecall_fetch(w::Worker, f, args...)
# can be weak, because the program will have no way to refer to the Ref
# itself, it only gets the result.
oid = next_id()
oid = next_rrid_tuple()
rv = lookup_ref(oid)
rv.waitingfor = w.id
send_msg(w, CallMsg{:call_fetch}(f, args, oid))
v = wait_full(rv)
v = take!(rv)
delete!(PGRP.refs, oid)
isa(v, RemoteException) ? throw(v) : v
end
Expand All @@ -752,12 +736,12 @@ remotecall_fetch(id::Integer, f, args...) =
remotecall_wait(w::LocalProcess, f, args...) = wait(remotecall(w,f,args...))

function remotecall_wait(w::Worker, f, args...)
prid = next_id()
prid = next_rrid_tuple()
rv = lookup_ref(prid)
rv.waitingfor = w.id
rr = RemoteRef(w)
send_msg(w, CallWaitMsg(f, args, rr2id(rr), prid))
wait_full(rv)
wait(rv)
delete!(PGRP.refs, prid)
rr
end
Expand Down Expand Up @@ -791,36 +775,25 @@ function call_on_owner(f, rr::RemoteRef, args...)
end
end

wait_ref(rid) = (wait_full(lookup_ref(rid)); nothing)
wait(r::RemoteRef) = (call_on_owner(wait_ref, r); r)
wait_ref(rid, args...) = (wait(lookup_ref(rid).c, args...); nothing)
wait(r::RemoteRef, args...) = (call_on_owner(wait_ref, r, args...); r)

fetch_ref(rid) = wait_full(lookup_ref(rid))
fetch(r::RemoteRef) = call_on_owner(fetch_ref, r)
fetch_ref(rid, args...) = fetch(lookup_ref(rid).c, args...)
fetch(r::RemoteRef, args...) = call_on_owner(fetch_ref, r, args...)
fetch(x::ANY) = x

# storing a value to a RemoteRef
function put!(rv::RemoteValue, val::ANY)
wait_empty(rv)
rv.result = val
rv.done = true
notify_full(rv)
rv
end
put!(rv::RemoteValue, args...) = put!(rv.c, args...)
put_ref(rid, args...) = put!(lookup_ref(rid), args...)
put!(rr::RemoteRef, args...) = (call_on_owner(put_ref, rr, args...); rr)

put_ref(rid, v) = put!(lookup_ref(rid), v)
put!(rr::RemoteRef, val::ANY) = (call_on_owner(put_ref, rr, val); rr)
take!(rv::RemoteValue, args...) = take!(rv.c, args...)
take_ref(rid, args...) = take!(lookup_ref(rid), args...)
take!(rr::RemoteRef, args...) = call_on_owner(take_ref, rr, args...)

function take!(rv::RemoteValue)
wait_full(rv)
val = rv.result
rv.done = false
rv.result = nothing
notify_empty(rv)
val
end
close_ref(rid) = (close(lookup_ref(rid).c); nothing)
close(rr::RemoteRef) = call_on_owner(close_ref, rr)

take_ref(rid) = take!(lookup_ref(rid))
take!(rr::RemoteRef) = call_on_owner(take_ref, rr)

function deliver_result(sock::IO, msg, oid, value)
#print("$(myid()) sending result $oid\n")
Expand Down Expand Up @@ -848,10 +821,6 @@ function deliver_result(sock::IO, msg, oid, value)
end
end

# notify waiters that a certain job has finished or RemoteRef has been emptied
notify_full( rv::RemoteValue) = notify(rv.full, work_result(rv))
notify_empty(rv::RemoteValue) = notify(rv.empty)

## message event handlers ##
process_messages(r_stream::TCPSocket, w_stream::TCPSocket) = @schedule process_tcp_streams(r_stream, w_stream)

Expand Down Expand Up @@ -926,7 +895,7 @@ end
function handle_msg(msg::CallWaitMsg, r_stream, w_stream)
@schedule begin
rv = schedule_call(msg.response_oid, ()->msg.f(msg.args...))
deliver_result(w_stream, :call_wait, msg.notify_oid, wait_full(rv))
deliver_result(w_stream, :call_wait, msg.notify_oid, wait(rv))
end
end

Expand Down Expand Up @@ -1234,7 +1203,7 @@ function create_worker(manager, wconfig)
finalizer(w, (w)->if myid() == 1 manage(w.manager, w.id, w.config, :finalize) end)

# set when the new worker has finshed connections with all other workers
ntfy_oid = next_id()
ntfy_oid = next_rrid_tuple()
rr_ntfy_join = lookup_ref(ntfy_oid)
rr_ntfy_join.waitingfor = myid()

Expand Down Expand Up @@ -1286,7 +1255,7 @@ function create_worker(manager, wconfig)
send_msg_now(w, JoinPGRPMsg(w.id, all_locs, isa(w.manager, LocalManager), ntfy_oid, PGRP.topology))

@schedule manage(w.manager, w.id, w.config, :register)
wait_full(rr_ntfy_join)
wait(rr_ntfy_join)
delete!(PGRP.refs, ntfy_oid)

w.id
Expand Down
7 changes: 0 additions & 7 deletions base/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,6 @@ precompile(Base.normpath, (ASCIIString,))
precompile(Base.normpath, (UTF8String, UTF8String))
precompile(Base.normpath, (UTF8String,))
precompile(Base.notify, (Condition, Any))
precompile(Base.notify_empty, (Base.RemoteValue,))
precompile(Base.notify_full, (Base.RemoteValue,))
precompile(Base.open, (ASCIIString, ASCIIString))
precompile(Base.parse_input_line, (ASCIIString,))
precompile(Base.parse, (Type{Int}, ASCIIString, Int))
Expand Down Expand Up @@ -426,9 +424,6 @@ precompile(Base.uvfinalize, (Base.TTY,))
precompile(Base.vcat, (Base.LineEdit.Prompt,))
precompile(Base.wait, ())
precompile(Base.wait, (RemoteRef,))
precompile(Base.wait_empty, (Base.RemoteValue,))
precompile(Base.wait_full, (Base.RemoteValue,))
precompile(Base.work_result, (Base.RemoteValue,))
precompile(Base.write, (Base.Terminals.TTYTerminal, ASCIIString))
precompile(Base.write, (Base.Terminals.TerminalBuffer, ASCIIString))
precompile(Base.write, (IOBuffer, Vector{UInt8}))
Expand All @@ -446,7 +441,6 @@ precompile(Base.Sort.sort!, (Array{Any,1},))
precompile(Base.Sort.sort!, (Array{VersionNumber, 1}, Int, Int, Base.Sort.InsertionSortAlg, Base.Order.ForwardOrdering))
precompile(Base.info, (ASCIIString,))
precompile(Base.isempty, (Array{Void, 1},))
precompile(Base.setindex!, (Dict{Any, Any}, Base.RemoteValue, (Int, Int)))
precompile(Base.setindex!, (Dict{ByteString, VersionNumber}, VersionNumber, ASCIIString))
precompile(Base.spawn, (Cmd, (Base.TTY, Base.TTY, Base.TTY), Bool, Bool))
precompile(Base.spawn, (Cmd,))
Expand All @@ -470,7 +464,6 @@ precompile(Base.LineEdit.init_state, (Base.Terminals.TTYTerminal, Base.LineEdit.
precompile(Base.setindex!, (Base.Dict{Any, Any}, Base.LineEdit.PrefixSearchState, Base.LineEdit.PrefixHistoryPrompt{Base.REPL.REPLHistoryProvider}))
precompile(Base.take_ref, (Tuple{Int64, Int64},))
precompile(Base.get, (Base.Dict{Any, Any}, Tuple{Int64, Int64}, Bool))
precompile(Base.setindex!, (Base.Dict{Any, Any}, Base.RemoteValue, Tuple{Int64, Int64}))
precompile(Base.LineEdit.refresh_multi_line, (Array{Any, 1}, Base.Terminals.TerminalBuffer, Base.Terminals.TTYTerminal, Base.IOBuffer, Base.LineEdit.InputAreaState, Base.LineEdit.PromptState))
precompile(Base.schedule, (Array{Any, 1}, Task, Void))
precompile(Base.LineEdit.match_input, (Function, Base.LineEdit.MIState, Base.Terminals.TTYTerminal, Array{Char, 1}, Base.Dict{Char, Any}))
Expand Down
24 changes: 24 additions & 0 deletions doc/manual/parallel-computing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,30 @@ variable takes on all values added to the channel. An empty, closed channel
causes the ``for`` loop to terminate.


RemoteRefs and AbstractChannels
-------------------------------

A ``RemoteRef`` is a proxy for an implementation of an ``AbstractChannel``

A concrete implementation of an ``AbstractChannel`` (like ``Channel``), is required
to implement ``put!``, ``take!``, ``fetch``, ``isready`` and ``wait``. The remote object
referred to by a ``RemoteRef()`` or ``RemoteRef(pid)`` is stored in a ``Channel{Any}(1)``,
i.e., a channel of size 1 capable of holding objects of ``Any`` type.

Methods ``put!``, ``take!``, ``fetch``, ``isready`` and ``wait`` on a ``RemoteRef`` are proxied onto
the backing store on the remote process.

The constructor ``RemoteRef(f::Function, pid)`` allows us to construct references to channels holding
more than one value of a specific type. ``f()`` is a function executed on ``pid`` and it must return
an ``AbstractChannel``.

For example, ``RemoteRef(()->Channel{Int}(10), pid)``, will return a reference to a channel of type ``Int``
and size 10.

``RemoteRef`` can thus be used to refer to user implemented ``AbstractChannel`` objects. A simple
example of this is provided in ``examples/dictchannel.jl`` which uses a dictionary as its remote store.


Shared Arrays
-------------

Expand Down
57 changes: 57 additions & 0 deletions examples/dictchannel.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# This file is a part of Julia. License is MIT: http://julialang.org/license

import Base: put!, wait, isready, take!, fetch

type DictChannel <: AbstractChannel
d::Dict
cond_take::Condition # waiting for data to become available
DictChannel() = new(Dict(), Condition())
end

function put!(D::DictChannel, k, v)
D.d[k] = v
notify(D.cond_take)
D
end

function take!(D::DictChannel, k)
v=fetch(D,k)
delete!(D.d, k)
v
end

isready(D::DictChannel) = length(D.d) > 1
isready(D::DictChannel, k) = haskey(D.d,k)
function fetch(D::DictChannel, k)
wait(D,k)
D.d[k]
end

function wait(D::DictChannel, k)
while !isready(D, k)
wait(D.cond_take)
end
end

# Usage:

# RemoteRef to a DictChannel on worker pid
# dc_ref=RemoteRef(()->DictChannel(), pid)

# Test if there is any data
# isready(dc_ref)

# add
# put!(dc_ref, 1, 2)

# Test if key 1 exists
# isready(dc_ref, 1)

# fetch key 1
# fetch(dc_ref, 1)

# fetch and remove key 1
# take!(dc_ref, 1)

# wait for key 3 to be added
# wait(dc_ref, 3)
Loading