[Distributed] Worker local race condition between put! and fetch for Futures #42339

Merged: 29 commits, Dec 3, 2021
Changes shown are from 20 commits.

Commits
9a9821d
add local_lock to Future, use it in fetch and put!
krynju Sep 21, 2021
81b3075
add corrections to the remote/clientref logic
krynju Sep 22, 2021
656546d
add additional check and ordering
krynju Sep 28, 2021
8d8712f
add some comments
krynju Sep 29, 2021
7c0dff0
fix whitespace
krynju Sep 29, 2021
64922da
add atomic and remove del_client from put
krynju Sep 29, 2021
389088b
add check in lock
krynju Sep 30, 2021
7635442
add cleanup
krynju Oct 1, 2021
50c4cd2
fix a return mistake
krynju Oct 1, 2021
92188ba
add one more ordering adjustment
krynju Oct 2, 2021
306b6e4
fix accidentaly removed line
krynju Oct 2, 2021
07bbd7c
fix unnecessary changes
krynju Oct 2, 2021
38b6419
add @atomic back to loads + acquire/release
krynju Oct 9, 2021
c5045cf
fix whitespace
krynju Oct 9, 2021
0040080
review things applied (mostly)
krynju Oct 21, 2021
87d30f6
add some comments
krynju Oct 22, 2021
4b634d3
fix comment
krynju Nov 3, 2021
2c78ed9
Apply suggestions from code review
krynju Nov 5, 2021
c93f743
fix the @lock issues
krynju Nov 6, 2021
c59408e
small adjustments
krynju Nov 6, 2021
4b8d7da
serialize adjustments
krynju Nov 17, 2021
3b68488
Update stdlib/Distributed/src/remotecall.jl
krynju Nov 17, 2021
a115a32
revert lock on serialize
krynju Nov 18, 2021
83ecafa
change assert to error, seems like the isready check doesn't check pr…
krynju Nov 18, 2021
67da4d5
Update stdlib/Distributed/src/remotecall.jl
krynju Nov 20, 2021
59d910a
Revert "Update stdlib/Distributed/src/remotecall.jl"
krynju Nov 20, 2021
4d73d33
serialize a copy of the future
krynju Nov 21, 2021
19fd304
future copy serialization
krynju Nov 21, 2021
b9eaef6
Merge branch 'JuliaLang:master' into kr/local-future-thread-safety
krynju Dec 2, 2021
107 changes: 84 additions & 23 deletions stdlib/Distributed/src/remotecall.jl
@@ -26,12 +26,13 @@ mutable struct Future <: AbstractRemoteRef
where::Int
whence::Int
id::Int
v::Union{Some{Any}, Nothing}
lock::ReentrantLock
@atomic v::Union{Some{Any}, Nothing}

Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing) =
(r = new(w,rrid.whence,rrid.id,v); return test_existing_ref(r))
(r = new(w,rrid.whence,rrid.id,ReentrantLock(),v); return test_existing_ref(r))

Future(t::NTuple{4, Any}) = new(t[1],t[2],t[3],t[4]) # Useful for creating dummy, zeroed-out instances
Future(t::NTuple{4, Any}) = new(t[1],t[2],t[3],ReentrantLock(),t[4]) # Useful for creating dummy, zeroed-out instances
end

"""
@@ -69,10 +70,15 @@ function test_existing_ref(r::AbstractRemoteRef)
found = getkey(client_refs, r, nothing)
if found !== nothing
@assert r.where > 0
if isa(r, Future) && found.v === nothing && r.v !== nothing
# we have recd the value from another source, probably a deserialized ref, send a del_client message
send_del_client(r)
found.v = r.v
if isa(r, Future)
# this is only for copying the reference from Future to RemoteRef (just created)
fv_cache = @atomic :acquire found.v
rv_cache = @atomic :monotonic r.v
if fv_cache === nothing && rv_cache !== nothing
# we have recd the value from another source, probably a deserialized ref, send a del_client message
send_del_client(r)
@atomic :release found.v = rv_cache
Member: If multiple tasks are deserializing the same ref, only one of them should "win" here and update found. We probably need to do this with :sequentially_consistent ordering as well, since that seems likely to be our intended memory model for put!/take! on AbstractRemoteRefs.

Contributor Author (krynju): Addressed it with an @atomicreplace; it seems more fitting since it is sequentially consistent and also keeps our rule of only ever setting the cache once (see the sketch after this hunk).

end
end
return found::typeof(r)
end
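For reference, a minimal sketch of the winner-takes-the-cache pattern that @atomicreplace gives (separate from the diff; CachedRef is a simplified stand-in for Future):

mutable struct CachedRef
    @atomic v::Union{Some{Any}, Nothing}
end

# Only the writer that still observes `nothing` succeeds; @atomicreplace is
# sequentially consistent by default, matching the intended memory model.
function publish!(r::CachedRef, value)
    _, ok = @atomicreplace r.v nothing => Some(value)
    return ok
end

r = CachedRef(nothing)
@assert publish!(r, 1)                 # first writer wins
@assert !publish!(r, 2)                # later writers find the cache already set
@assert something(@atomic r.v) == 1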
@@ -91,8 +97,9 @@ function finalize_ref(r::AbstractRemoteRef)
send_del_client_no_lock(r)
else
# send_del_client only if the reference has not been set
r.v === nothing && send_del_client_no_lock(r)
r.v = nothing
v_cache = @atomic :monotonic r.v
v_cache === nothing && send_del_client_no_lock(r)
@atomic :monotonic r.v = nothing
end
r.where = 0
finally
@@ -201,7 +208,8 @@ isready(f) # will not block
```
"""
function isready(rr::Future)
rr.v === nothing || return true
v_cache = @atomic rr.v
v_cache === nothing || return true

rid = remoteref_id(rr)
return if rr.where == myid()
@@ -351,7 +359,10 @@ end

channel_type(rr::RemoteChannel{T}) where {T} = T

serialize(s::ClusterSerializer, f::Future) = serialize(s, f, f.v === nothing)
function serialize(s::ClusterSerializer, f::Future)
v_cache = @atomic f.v
Member: It seems a bit aggressive to make serialize/deserialize full atomic barriers (instead of their defaults). @tkf, any thoughts?

Contributor Author (krynju): FYI, this is just one leftover future.v usage that wasn't wrapped in @atomic and didn't error before in tests or usage.

Member (tkf, Nov 7, 2021): Hmm... Can serialize(s::ClusterSerializer, f::Future) call serialize(::AbstractSerializer, ::typeof(something(f.v)))? If so, I guess we need to make it as strong as fetch, since the author of MyType can override serialize(::AbstractSerializer, ::MyType) and it can contain arbitrary code.

Member: Looking closer, the issue here is that we need a lock around f.v for the duration of this function.

Contributor Author (krynju): Put a lock on f.lock around the full serialize function.

serialize(s, f, v_cache === nothing)
end
serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true)
function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient)
if addclient
Member: We need to change the Future serializer to exclude the new fields:

Suggested change (current code first, then the suggestion):
function serialize(s::ClusterSerializer, f::Future)
v_cache = @atomic f.v
serialize(s, f, v_cache === nothing)
end
serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true)
function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient)
if addclient
function serialize(s::ClusterSerializer, f::Future)
serialize_type(s, typeof(f))
serialize(s, f.where)
serialize(s, remoteref_id(f))
value = @atomic f.v
if value === nothing
p = worker_id_from_socket(s.io)
(p !== rr.where) && send_add_client(rr, p)
end
serialize(s, value)
end
function serialize(s::ClusterSerializer, f::RemoteChannel)
p = worker_id_from_socket(s.io)
(p !== rr.where) && send_add_client(rr, p)
invoke(serialize, Tuple{AbstractSerializer, Any}, s, f)
end
serialize(s::AbstractSerializer, ::AbstractLock) = error("Locks cannot be serialized")

and corresponding deserialize methods

function deserialize(s::ClusterSerializer, T::Type{<:Future})
    where = deserialize(s, f.where)::Int
    rid = deserialize(s, remoteref_id(f))::RRID
    value = deserialize(s, f.value)
    # 1) send_add_client() is not executed when the ref is being serialized
    #    to where it exists, hence do it here.
    # 2) If we have received a 'fetch'ed Future or if the Future ctor found an
    #    already 'fetch'ed instance in client_refs (Issue #25847), we should not
    #    track it in the backing RemoteValue store.
    if where == myid() && value === nothing
        add_client(rid, where)
    end
    return T(where, rid, value) # ctor adds to client_refs table
end
function deserialize(s::ClusterSerializer, T::Type{<:RemoteChannel})
    rr = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t)::T # deserialize a placeholder object
    rid = remoteref_id(rr)
    if rr.where == myid()
        # send_add_client() is not executed when the ref is being
        # serialized to where it exists
        add_client(rid, myid())
    end
    # call ctor to make sure this rr gets added to the client_refs table
    return T(rr.where, rid)
end

Contributor Author (krynju): I've been trying to make it work, but without any success. If I serialize the fields manually as proposed, at some point the code expects a Future structure and I'm not really sure how to handle that.

Anyway, thinking about the issue, as of right now:

  1. Serialization takes the full Future, including the new lock, and serializes it.
  2. Deserialization takes the serialized data and constructs a new Future from it, but I'm unsure whether it uses the serialized lock data or creates a new one (the constructors only ever create a new one, apart from the default).

I made serialization serialize a copy of the Future with a new, unused lock, but I'm not sure it's necessary. It worked fine before any serialization change anyway.

Contributor Author (krynju, Dec 3, 2021): @vtjnash I rebased and it's finally passing CI. What do you think of the above changes? I couldn't get serialization of only the "safe" fields working, so I just added a full copy in the serializer (so that serialization always works on a copy with an unlocked lock). I'm not sure it's necessary, though, as serializing a locked lock doesn't matter in the end: during deserialization a new lock is constructed. Or is there some reason not to serialize a locked lock?

Member: I am not sure why you were trying to use invoke, since there is no hope of that approach working, while the code I had written just above should be roughly correct. Anyway, this approach now seems suitable enough, so I have merged it.
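For illustration, roughly what the "serialize a copy" approach might look like, reusing the NTuple constructor and the three-argument serialize helper from the diff (a sketch, not necessarily the exact merged code):

function serialize(s::ClusterSerializer, f::Future)
    v_cache = @atomic f.v
    # Detached copy via the NTuple constructor: it carries a fresh, unlocked
    # ReentrantLock and is not registered in client_refs.
    f_copy = Future((f.where, f.whence, f.id, v_cache))
    serialize(s, f_copy, v_cache === nothing)
end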

@@ -363,14 +374,16 @@ end

function deserialize(s::ClusterSerializer, t::Type{<:Future})
f = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t)
f2 = Future(f.where, RRID(f.whence, f.id), f.v) # ctor adds to client_refs table
fv_cache = @atomic f.v
f2 = Future(f.where, RRID(f.whence, f.id), fv_cache) # ctor adds to client_refs table

# 1) send_add_client() is not executed when the ref is being serialized
# to where it exists, hence do it here.
# 2) If we have received a 'fetch'ed Future or if the Future ctor found an
# already 'fetch'ed instance in client_refs (Issue #25847), we should not
# track it in the backing RemoteValue store.
if f2.where == myid() && f2.v === nothing
f2v_cache = @atomic f2.v
if f2.where == myid() && f2v_cache === nothing
add_client(remoteref_id(f2), myid())
end
f2
Expand Down Expand Up @@ -564,7 +577,7 @@ end

Wait for a value to become available for the specified [`Future`](@ref).
"""
wait(r::Future) = (r.v !== nothing && return r; call_on_owner(wait_ref, r, myid()); r)
wait(r::Future) = (v_cache = @atomic r.v; v_cache !== nothing && return r; call_on_owner(wait_ref, r, myid()); r)

"""
wait(r::RemoteChannel, args...)
@@ -581,11 +594,41 @@ Further calls to `fetch` on the same reference return the cached value. If the r
is an exception, throws a [`RemoteException`](@ref) which captures the remote exception and backtrace.
"""
function fetch(r::Future)
r.v !== nothing && return something(r.v)
v = call_on_owner(fetch_ref, r)
r.v = Some(v)
v_cache = @atomic r.v
v_cache !== nothing && return something(v_cache)

Member: Can we lock r::Future here?

Contributor Author (krynju): I didn't add a lock here since it's supposed to be a quick cache check.

Member: I meant here exactly, as in after the quick lookup.

For (my own) reference on performance:

julia> mutable struct AtomicCounter
           @atomic x::Int
       end

julia> const ac = AtomicCounter(0);

julia> const r = Some(0);

julia> const r2 = Ref(0);

# measure operation overhead
julia> @btime (r2[] = something(r))
  1.696 ns (0 allocations: 0 bytes)

# measure atomic load overhead
julia> @btime @atomic ac.x
  1.696 ns (0 allocations: 0 bytes)

julia> @btime @atomicreplace ac.x 0 => 0
  8.372 ns (0 allocations: 0 bytes)

julia> @btime @atomicreplace :monotonic ac.x 0 => 0
  8.372 ns (0 allocations: 0 bytes)

julia> const lk2 = Base.ThreadSynchronizer();

julia> @btime (lock(lk2); unlock(lk2))
  20.072 ns (0 allocations: 0 bytes)

julia> @btime (Base.iolock_begin(); Base.iolock_end())
  18.390 ns (0 allocations: 0 bytes)

julia> Base.iolock_begin(); @btime (Base.iolock_begin(); Base.iolock_end()); Base.iolock_end()
  12.467 ns (0 allocations: 0 bytes)

julia> const lk3 = Libc.malloc(40)
Ptr{Nothing} @0x0000555e99ac70c0

julia> ccall(:uv_mutex_init, Cint, (Ptr{Cvoid},), lk3)
0

julia> @btime (ccall(:uv_mutex_lock, Cint, (Ptr{Cvoid},), lk3); ccall(:uv_mutex_unlock, Cint, (Ptr{Cvoid},), lk3))
  20.407 ns (0 allocations: 0 bytes)

julia> ccall(:uv_mutex_init_recursive, Cint, (Ptr{Cvoid},), lk3)
0

julia> @btime (ccall(:uv_mutex_lock, Cint, (Ptr{Cvoid},), lk3); ccall(:uv_mutex_unlock, Cint, (Ptr{Cvoid},), lk3))
  23.412 ns (0 allocations: 0 bytes)
0

julia> ccall(:uv_mutex_lock, Cint, (Ptr{Cvoid},), lk3)
0

julia> @btime (ccall(:uv_mutex_lock, Cint, (Ptr{Cvoid},), lk3); ccall(:uv_mutex_unlock, Cint, (Ptr{Cvoid},), lk3))
  12.392 ns (0 allocations: 0 bytes)
0

julia> const lk = ReentrantLock()

julia> @btime (lock(lk); unlock(lk))
  56.520 ns (0 allocations: 0 bytes)

julia> lock(lk)

julia> @btime (lock(lk); unlock(lk))
  12.405 ns (0 allocations: 0 bytes)

(Note: that last lock, lk, needs to be optimized to be on par with the other locks, but we didn't previously have the ability to express atomic operations in Julia.)

Contributor Author (krynju): If I put the whole thing in a lock(r.lock), then a local put! will never be able to enter the lock, because the fetch will be stuck on fetch(rv.c). And in put!, if I put into the channel outside the lock, I don't have control over who caches the local value first.

I'm not sure what else I could improve here. The local cache is always set exactly once. The paths from fetch(rv.c) and call_on_owner will rightfully try to cache the value and only fail if it was already cached, or if put! happened locally.

Member: Okay, yeah, I see how it is complicated by the fact that we might be the owner, so we are waiting for someone else to set the value before we can return it here. But it seems like if this fails in the remote case, it is because there was a thread-synchronization error, which could happen because this lock was not being held across here.

Contributor Author (krynju): The call_on_owner case won't fail due to this lock, because it fetches on a remote channel; this fetch function isn't used in the call_on_owner scenario. Because of that, this looks safe to me as it is right now, but I haven't dug that deep into the remote scenarios.

if r.where == myid()
rv, v_cache = @lock r.lock begin
v_cache = @atomic :monotonic r.v
rv = v_cache === nothing ? lookup_ref(remoteref_id(r)) : nothing
rv, v_cache
end

if v_cache !== nothing
return something(v_cache)
else
v_local = fetch(rv.c)
end
else
v_local = call_on_owner(fetch_ref, r)
Contributor Author (krynju): This could probably be locked, so that call_on_owner only gets called once, but I'm not sure if there are any hidden implications of doing that.

end

v_cache = @atomic r.v

if v_cache === nothing # call_on_owner case
v_old, status = @lock r.lock begin
@atomicreplace r.v nothing => Some(v_local)
end
# status == true - when value obtained through call_on_owner
# status == false - any other situation: atomicreplace fails, because by the time the lock is obtained cache will be populated
# why? local put! performs caching and putting into channel under r.lock

# for local put! use the cached value, for call_on_owner cases just take the v_local as it was just cached in r.v
v_cache = status ? v_local : v_old
Contributor Author (krynju): @vtjnash Just not sure here. When a fetch gets the value from call_on_owner at line 573, we cache the value in r.v; should fetch then return the v_local obtained from call_on_owner, or should I load r.v and return that cached value? Right now it's just returning v_local.

Member: I am not sure here. Since fetch(rv.c) succeeded, it seems like that set the value for everyone, but now we might decide to copy it locally too. In that case we must have strictly maintained that rv.v === r.v, since r.v is just a locally cached copy of the same pointer as rv.v (which holds the canonical copy). Is that right?

Contributor Author (krynju): Yes, that's correct. The rv.v === r.v invariant is properly maintained right now, I think (see the short illustration after this hunk).

end

send_del_client(r)
v
something(v_cache)
end

fetch_ref(rid, args...) = fetch(lookup_ref(rid).c, args...)
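A tiny illustration of the invariant discussed in the thread above (plain Julia values, no Distributed involved): the local cache holds the same wrapper object as the canonical value, not a copy, so returning either is equivalent.

canonical = Some([1, 2, 3])   # stands in for the canonical value held by the owner
cached    = canonical         # r.v caches the same wrapper, not a copy

@assert cached === canonical
@assert something(cached) === something(canonical)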
@@ -609,12 +652,30 @@ A `put!` on an already set `Future` throws an `Exception`.
All asynchronous remote calls return `Future`s and set the
value to the return value of the call upon completion.
"""
function put!(rr::Future, v)
rr.v !== nothing && error("Future can be set only once")
call_on_owner(put_future, rr, v, myid())
rr.v = Some(v)
rr
function put!(r::Future, v)
if r.where == myid()
rid = remoteref_id(r)
rv = lookup_ref(rid)
isready(rv) && error("Future can be set only once")
@lock r.lock begin
put!(rv, v) # this notifies the tasks waiting on the channel in fetch
set_future_cache(r, v) # set the cache before leaving the lock, so that the notified tasks already see it cached
end
del_client(rid, myid())
else
@lock r.lock begin # same idea as above if there were any local tasks fetching on this Future
call_on_owner(put_future, r, v, myid())
set_future_cache(r, v)
end
end
r
end

function set_future_cache(r::Future, v)
_, ok = @atomicreplace r.v nothing => Some(v)
@assert ok "internal consistency error detected for Future"
end

function put_future(rid, v, caller)
rv = lookup_ref(rid)
isready(rv) && error("Future can be set only once")
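For context, a small usage sketch of the put!/fetch interplay the changes above make safe on a single worker (illustrative only; the original race needs multiple threads, which a plain @async demo does not exercise):

using Distributed

f = Future()          # owned by the local worker

t = @async fetch(f)   # blocks on the backing channel until a value arrives
put!(f, 42)           # puts into the channel and caches into f.v under f.lock

@assert fetch(t) == 42
@assert fetch(f) == 42   # later fetches are served from the local cache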
3 changes: 3 additions & 0 deletions stdlib/Distributed/test/distributed_exec.jl
@@ -325,6 +325,9 @@ function test_regular_io_ser(ref::Distributed.AbstractRemoteRef)
v = getfield(ref2, fld)
if isa(v, Number)
@test v === zero(typeof(v))
elseif fld == :lock
@test v isa ReentrantLock
@test !islocked(v)
elseif v !== nothing
error(string("Add test for field ", fld))
end