Skip to content

Commit

Permalink
[Distributed] Worker local race condition between put! and fetch for …
Browse files Browse the repository at this point in the history
…Futures (#42339)

* add local_lock to Future, use it in fetch and put!
* add corrections to the remote/clientref logic
* add memory ordering guarantees
* serialize a (unlocked) copy of the future to avoid problems with the lock

Co-authored-by: Jameson Nash <vtjnash@gmail.com>
Co-authored-by: Takafumi Arakaki <aka.tkf@gmail.com>
  • Loading branch information
3 people authored Dec 3, 2021
1 parent 24afe90 commit 4d17719
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 28 deletions.
121 changes: 93 additions & 28 deletions src/remotecall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ mutable struct Future <: AbstractRemoteRef
where::Int
whence::Int
id::Int
v::Union{Some{Any}, Nothing}
lock::ReentrantLock
@atomic v::Union{Some{Any}, Nothing}

Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing) =
(r = new(w,rrid.whence,rrid.id,v); return test_existing_ref(r))
(r = new(w,rrid.whence,rrid.id,ReentrantLock(),v); return test_existing_ref(r))

Future(t::NTuple{4, Any}) = new(t[1],t[2],t[3],t[4]) # Useful for creating dummy, zeroed-out instances
Future(t::NTuple{4, Any}) = new(t[1],t[2],t[3],ReentrantLock(),t[4]) # Useful for creating dummy, zeroed-out instances
end

"""
Expand Down Expand Up @@ -69,10 +70,17 @@ function test_existing_ref(r::AbstractRemoteRef)
found = getkey(client_refs, r, nothing)
if found !== nothing
@assert r.where > 0
if isa(r, Future) && found.v === nothing && r.v !== nothing
# we have recd the value from another source, probably a deserialized ref, send a del_client message
send_del_client(r)
found.v = r.v
if isa(r, Future)
# this is only for copying the reference from Future to RemoteRef (just created)
fv_cache = @atomic :acquire found.v
rv_cache = @atomic :monotonic r.v
if fv_cache === nothing && rv_cache !== nothing
# we have recd the value from another source, probably a deserialized ref, send a del_client message
send_del_client(r)
@lock found.lock begin
@atomicreplace found.v nothing => rv_cache
end
end
end
return found::typeof(r)
end
Expand All @@ -91,8 +99,9 @@ function finalize_ref(r::AbstractRemoteRef)
send_del_client_no_lock(r)
else
# send_del_client only if the reference has not been set
r.v === nothing && send_del_client_no_lock(r)
r.v = nothing
v_cache = @atomic :monotonic r.v
v_cache === nothing && send_del_client_no_lock(r)
@atomic :monotonic r.v = nothing
end
r.where = 0
finally
Expand Down Expand Up @@ -201,7 +210,8 @@ isready(f) # will not block
```
"""
function isready(rr::Future)
rr.v === nothing || return true
v_cache = @atomic rr.v
v_cache === nothing || return true

rid = remoteref_id(rr)
return if rr.where == myid()
Expand Down Expand Up @@ -354,26 +364,33 @@ end

channel_type(rr::RemoteChannel{T}) where {T} = T

serialize(s::ClusterSerializer, f::Future) = serialize(s, f, f.v === nothing)
serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true)
function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient)
if addclient
function serialize(s::ClusterSerializer, f::Future)
v_cache = @atomic f.v
if v_cache === nothing
p = worker_id_from_socket(s.io)
(p !== rr.where) && send_add_client(rr, p)
(p !== f.where) && send_add_client(f, p)
end
fc = Future((f.where, f.whence, f.id, v_cache)) # copy to be used for serialization (contains a reset lock)
invoke(serialize, Tuple{ClusterSerializer, Any}, s, fc)
end

function serialize(s::ClusterSerializer, rr::RemoteChannel)
p = worker_id_from_socket(s.io)
(p !== rr.where) && send_add_client(rr, p)
invoke(serialize, Tuple{ClusterSerializer, Any}, s, rr)
end

function deserialize(s::ClusterSerializer, t::Type{<:Future})
f = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t)
f2 = Future(f.where, RRID(f.whence, f.id), f.v) # ctor adds to client_refs table
fc = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t) # deserialized copy
f2 = Future(fc.where, RRID(fc.whence, fc.id), fc.v) # ctor adds to client_refs table

# 1) send_add_client() is not executed when the ref is being serialized
# to where it exists, hence do it here.
# 2) If we have received a 'fetch'ed Future or if the Future ctor found an
# already 'fetch'ed instance in client_refs (Issue #25847), we should not
# track it in the backing RemoteValue store.
if f2.where == myid() && f2.v === nothing
f2v_cache = @atomic f2.v
if f2.where == myid() && f2v_cache === nothing
add_client(remoteref_id(f2), myid())
end
f2
Expand Down Expand Up @@ -567,7 +584,7 @@ end
Wait for a value to become available for the specified [`Future`](@ref).
"""
wait(r::Future) = (r.v !== nothing && return r; call_on_owner(wait_ref, r, myid()); r)
wait(r::Future) = (v_cache = @atomic r.v; v_cache !== nothing && return r; call_on_owner(wait_ref, r, myid()); r)

"""
wait(r::RemoteChannel, args...)
Expand All @@ -584,11 +601,41 @@ Further calls to `fetch` on the same reference return the cached value. If the r
is an exception, throws a [`RemoteException`](@ref) which captures the remote exception and backtrace.
"""
function fetch(r::Future)
r.v !== nothing && return something(r.v)
v = call_on_owner(fetch_ref, r)
r.v = Some(v)
v_cache = @atomic r.v
v_cache !== nothing && return something(v_cache)

if r.where == myid()
rv, v_cache = @lock r.lock begin
v_cache = @atomic :monotonic r.v
rv = v_cache === nothing ? lookup_ref(remoteref_id(r)) : nothing
rv, v_cache
end

if v_cache !== nothing
return something(v_cache)
else
v_local = fetch(rv.c)
end
else
v_local = call_on_owner(fetch_ref, r)
end

v_cache = @atomic r.v

if v_cache === nothing # call_on_owner case
v_old, status = @lock r.lock begin
@atomicreplace r.v nothing => Some(v_local)
end
# status == true - when value obtained through call_on_owner
# status == false - any other situation: atomicreplace fails, because by the time the lock is obtained cache will be populated
# why? local put! performs caching and putting into channel under r.lock

# for local put! use the cached value, for call_on_owner cases just take the v_local as it was just cached in r.v
v_cache = status ? v_local : v_old
end

send_del_client(r)
v
something(v_cache)
end

fetch_ref(rid, args...) = fetch(lookup_ref(rid).c, args...)
Expand All @@ -612,12 +659,30 @@ A `put!` on an already set `Future` throws an `Exception`.
All asynchronous remote calls return `Future`s and set the
value to the return value of the call upon completion.
"""
function put!(rr::Future, v)
rr.v !== nothing && error("Future can be set only once")
call_on_owner(put_future, rr, v, myid())
rr.v = Some(v)
rr
function put!(r::Future, v)
if r.where == myid()
rid = remoteref_id(r)
rv = lookup_ref(rid)
isready(rv) && error("Future can be set only once")
@lock r.lock begin
put!(rv, v) # this notifies the tasks waiting on the channel in fetch
set_future_cache(r, v) # set the cache before leaving the lock, so that the notified tasks already see it cached
end
del_client(rid, myid())
else
@lock r.lock begin # same idea as above if there were any local tasks fetching on this Future
call_on_owner(put_future, r, v, myid())
set_future_cache(r, v)
end
end
r
end

function set_future_cache(r::Future, v)
_, ok = @atomicreplace r.v nothing => Some(v)
ok || error("internal consistency error detected for Future")
end

function put_future(rid, v, caller)
rv = lookup_ref(rid)
isready(rv) && error("Future can be set only once")
Expand Down
3 changes: 3 additions & 0 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ function test_regular_io_ser(ref::Distributed.AbstractRemoteRef)
v = getfield(ref2, fld)
if isa(v, Number)
@test v === zero(typeof(v))
elseif fld == :lock
@test v isa ReentrantLock
@test !islocked(v)
elseif v !== nothing
error(string("Add test for field ", fld))
end
Expand Down

0 comments on commit 4d17719

Please sign in to comment.