diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index 051a569156810..29c7e521d67a8 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -26,12 +26,13 @@ mutable struct Future <: AbstractRemoteRef where::Int whence::Int id::Int - v::Union{Some{Any}, Nothing} + lock::ReentrantLock + @atomic v::Union{Some{Any}, Nothing} Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing) = - (r = new(w,rrid.whence,rrid.id,v); return test_existing_ref(r)) + (r = new(w,rrid.whence,rrid.id,ReentrantLock(),v); return test_existing_ref(r)) - Future(t::NTuple{4, Any}) = new(t[1],t[2],t[3],t[4]) # Useful for creating dummy, zeroed-out instances + Future(t::NTuple{4, Any}) = new(t[1],t[2],t[3],ReentrantLock(),t[4]) # Useful for creating dummy, zeroed-out instances end """ @@ -69,10 +70,17 @@ function test_existing_ref(r::AbstractRemoteRef) found = getkey(client_refs, r, nothing) if found !== nothing @assert r.where > 0 - if isa(r, Future) && found.v === nothing && r.v !== nothing - # we have recd the value from another source, probably a deserialized ref, send a del_client message - send_del_client(r) - found.v = r.v + if isa(r, Future) + # this is only for copying the reference from Future to RemoteRef (just created) + fv_cache = @atomic :acquire found.v + rv_cache = @atomic :monotonic r.v + if fv_cache === nothing && rv_cache !== nothing + # we have recd the value from another source, probably a deserialized ref, send a del_client message + send_del_client(r) + @lock found.lock begin + @atomicreplace found.v nothing => rv_cache + end + end end return found::typeof(r) end @@ -91,8 +99,9 @@ function finalize_ref(r::AbstractRemoteRef) send_del_client_no_lock(r) else # send_del_client only if the reference has not been set - r.v === nothing && send_del_client_no_lock(r) - r.v = nothing + v_cache = @atomic :monotonic r.v + v_cache === nothing && send_del_client_no_lock(r) + @atomic :monotonic r.v = nothing end r.where = 0 finally @@ -201,7 +210,8 @@ isready(f) # will not block ``` """ function isready(rr::Future) - rr.v === nothing || return true + v_cache = @atomic rr.v + v_cache === nothing || return true rid = remoteref_id(rr) return if rr.where == myid() @@ -354,26 +364,33 @@ end channel_type(rr::RemoteChannel{T}) where {T} = T -serialize(s::ClusterSerializer, f::Future) = serialize(s, f, f.v === nothing) -serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true) -function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient) - if addclient +function serialize(s::ClusterSerializer, f::Future) + v_cache = @atomic f.v + if v_cache === nothing p = worker_id_from_socket(s.io) - (p !== rr.where) && send_add_client(rr, p) + (p !== f.where) && send_add_client(f, p) end + fc = Future((f.where, f.whence, f.id, v_cache)) # copy to be used for serialization (contains a reset lock) + invoke(serialize, Tuple{ClusterSerializer, Any}, s, fc) +end + +function serialize(s::ClusterSerializer, rr::RemoteChannel) + p = worker_id_from_socket(s.io) + (p !== rr.where) && send_add_client(rr, p) invoke(serialize, Tuple{ClusterSerializer, Any}, s, rr) end function deserialize(s::ClusterSerializer, t::Type{<:Future}) - f = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t) - f2 = Future(f.where, RRID(f.whence, f.id), f.v) # ctor adds to client_refs table + fc = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t) # deserialized copy + f2 = Future(fc.where, RRID(fc.whence, fc.id), fc.v) # ctor adds to client_refs table # 1) send_add_client() is not executed when the ref is being serialized # to where it exists, hence do it here. # 2) If we have received a 'fetch'ed Future or if the Future ctor found an # already 'fetch'ed instance in client_refs (Issue #25847), we should not # track it in the backing RemoteValue store. - if f2.where == myid() && f2.v === nothing + f2v_cache = @atomic f2.v + if f2.where == myid() && f2v_cache === nothing add_client(remoteref_id(f2), myid()) end f2 @@ -570,7 +587,7 @@ end Wait for a value to become available for the specified [`Future`](@ref). """ -wait(r::Future) = (r.v !== nothing && return r; call_on_owner(wait_ref, r, myid()); r) +wait(r::Future) = (v_cache = @atomic r.v; v_cache !== nothing && return r; call_on_owner(wait_ref, r, myid()); r) """ wait(r::RemoteChannel, args...) @@ -587,11 +604,41 @@ Further calls to `fetch` on the same reference return the cached value. If the r is an exception, throws a [`RemoteException`](@ref) which captures the remote exception and backtrace. """ function fetch(r::Future) - r.v !== nothing && return something(r.v) - v = call_on_owner(fetch_ref, r) - r.v = Some(v) + v_cache = @atomic r.v + v_cache !== nothing && return something(v_cache) + + if r.where == myid() + rv, v_cache = @lock r.lock begin + v_cache = @atomic :monotonic r.v + rv = v_cache === nothing ? lookup_ref(remoteref_id(r)) : nothing + rv, v_cache + end + + if v_cache !== nothing + return something(v_cache) + else + v_local = fetch(rv.c) + end + else + v_local = call_on_owner(fetch_ref, r) + end + + v_cache = @atomic r.v + + if v_cache === nothing # call_on_owner case + v_old, status = @lock r.lock begin + @atomicreplace r.v nothing => Some(v_local) + end + # status == true - when value obtained through call_on_owner + # status == false - any other situation: atomicreplace fails, because by the time the lock is obtained cache will be populated + # why? local put! performs caching and putting into channel under r.lock + + # for local put! use the cached value, for call_on_owner cases just take the v_local as it was just cached in r.v + v_cache = status ? v_local : v_old + end + send_del_client(r) - v + something(v_cache) end fetch_ref(rid, args...) = fetch(lookup_ref(rid).c, args...) @@ -615,12 +662,30 @@ A `put!` on an already set `Future` throws an `Exception`. All asynchronous remote calls return `Future`s and set the value to the return value of the call upon completion. """ -function put!(rr::Future, v) - rr.v !== nothing && error("Future can be set only once") - call_on_owner(put_future, rr, v, myid()) - rr.v = Some(v) - rr +function put!(r::Future, v) + if r.where == myid() + rid = remoteref_id(r) + rv = lookup_ref(rid) + isready(rv) && error("Future can be set only once") + @lock r.lock begin + put!(rv, v) # this notifies the tasks waiting on the channel in fetch + set_future_cache(r, v) # set the cache before leaving the lock, so that the notified tasks already see it cached + end + del_client(rid, myid()) + else + @lock r.lock begin # same idea as above if there were any local tasks fetching on this Future + call_on_owner(put_future, r, v, myid()) + set_future_cache(r, v) + end + end + r end + +function set_future_cache(r::Future, v) + _, ok = @atomicreplace r.v nothing => Some(v) + ok || error("internal consistency error detected for Future") +end + function put_future(rid, v, caller) rv = lookup_ref(rid) isready(rv) && error("Future can be set only once") diff --git a/stdlib/Distributed/test/distributed_exec.jl b/stdlib/Distributed/test/distributed_exec.jl index ea301b5f80651..46a36e2eab040 100644 --- a/stdlib/Distributed/test/distributed_exec.jl +++ b/stdlib/Distributed/test/distributed_exec.jl @@ -350,6 +350,9 @@ function test_regular_io_ser(ref::Distributed.AbstractRemoteRef) v = getfield(ref2, fld) if isa(v, Number) @test v === zero(typeof(v)) + elseif fld == :lock + @test v isa ReentrantLock + @test !islocked(v) elseif v !== nothing error(string("Add test for field ", fld)) end