Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

more fixes to I/O and threading #31733

Merged
merged 1 commit into from
Apr 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 48 additions & 20 deletions base/asyncevent.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ Use [`isopen`](@ref) to check whether it is still active.
"""
mutable struct AsyncCondition
handle::Ptr{Cvoid}
cond::Condition
cond::ThreadSynchronizer
isopen::Bool

function AsyncCondition()
this = new(Libc.malloc(_sizeof_uv_async), Condition(), true)
this = new(Libc.malloc(_sizeof_uv_async), ThreadSynchronizer(), true)
associate_julia_struct(this.handle, this)
finalizer(uvfinalize, this)
err = ccall(:uv_async_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
Expand All @@ -41,14 +41,22 @@ the async condition object itself.
function AsyncCondition(cb::Function)
async = AsyncCondition()
waiter = Task(function()
while isopen(async)
success = try
wait(async)
true
catch exc # ignore possible exception on close()
isa(exc, EOFError) || rethrow()
lock(async.cond)
try
while isopen(async)
success = try
stream_wait(async, async.cond)
true
catch exc # ignore possible exception on close()
isa(exc, EOFError) || rethrow()
finally
unlock(async.cond)
end
success && cb(async)
lock(async.cond)
end
success && cb(async)
finally
unlock(async.cond)
end
end)
# must start the task right away so that it can wait for the AsyncCondition before
Expand All @@ -71,14 +79,14 @@ to check whether a timer is still active.
"""
mutable struct Timer
handle::Ptr{Cvoid}
cond::Condition
cond::ThreadSynchronizer
isopen::Bool

function Timer(timeout::Real; interval::Real = 0.0)
timeout ≥ 0 || throw(ArgumentError("timer cannot have negative timeout of $timeout seconds"))
interval ≥ 0 || throw(ArgumentError("timer cannot have negative repeat interval of $interval seconds"))

this = new(Libc.malloc(_sizeof_uv_timer), Condition(), true)
this = new(Libc.malloc(_sizeof_uv_timer), ThreadSynchronizer(), true)
err = ccall(:uv_timer_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), this)
if err != 0
#TODO: this codepath is currently not tested
Expand All @@ -102,8 +110,13 @@ unsafe_convert(::Type{Ptr{Cvoid}}, t::Timer) = t.handle
unsafe_convert(::Type{Ptr{Cvoid}}, async::AsyncCondition) = async.handle

function wait(t::Union{Timer, AsyncCondition})
isopen(t) || throw(EOFError())
stream_wait(t, t.cond)
lock(t.cond)
try
isopen(t) || throw(EOFError())
stream_wait(t, t.cond)
finally
unlock(t.cond)
end
end

isopen(t::Union{Timer, AsyncCondition}) = t.isopen
Expand All @@ -128,24 +141,39 @@ function uvfinalize(t::Union{Timer, AsyncCondition})
end

function _uv_hook_close(t::Union{Timer, AsyncCondition})
uvfinalize(t)
notify_error(t.cond, EOFError())
lock(t.cond)
try
uvfinalize(t)
notify_error(t.cond, EOFError())
finally
unlock(t.cond)
end
nothing
end

function uv_asynccb(handle::Ptr{Cvoid})
async = @handle_as handle AsyncCondition
notify(async.cond)
lock(async.cond)
try
notify(async.cond)
finally
unlock(async.cond)
end
nothing
end

function uv_timercb(handle::Ptr{Cvoid})
t = @handle_as handle Timer
if ccall(:uv_timer_get_repeat, UInt64, (Ptr{Cvoid},), t) == 0
# timer is stopped now
close(t)
lock(t.cond)
try
if ccall(:uv_timer_get_repeat, UInt64, (Ptr{Cvoid},), t) == 0
# timer is stopped now
close(t)
end
notify(t.cond)
finally
unlock(t.cond)
end
notify(t.cond)
nothing
end

Expand Down
12 changes: 9 additions & 3 deletions base/condition.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## thread/task locking abstraction

@noinline function concurrency_violation()
# can be useful for debugging
#try; error(); catch; ccall(:jlbacktrace, Cvoid, ()); end
error("concurrency violation detected")
end

"""
AbstractLock

Expand All @@ -18,10 +24,10 @@ unlockall(l::AbstractLock) = unlock(l) # internal function for implementing `wai
relockall(l::AbstractLock, token::Nothing) = lock(l) # internal function for implementing `wait`
assert_havelock(l::AbstractLock) = assert_havelock(l, Threads.threadid())
assert_havelock(l::AbstractLock, tid::Integer) =
(islocked(l) && tid == Threads.threadid()) ? nothing : error("concurrency violation detected")
(islocked(l) && tid == Threads.threadid()) ? nothing : concurrency_violation()
assert_havelock(l::AbstractLock, tid::Task) =
(islocked(l) && tid === current_task()) ? nothing : error("concurrency violation detected")
assert_havelock(l::AbstractLock, tid::Nothing) = error("concurrency violation detected")
(islocked(l) && tid === current_task()) ? nothing : concurrency_violation()
assert_havelock(l::AbstractLock, tid::Nothing) = concurrency_violation()

"""
AlwaysLockedST
Expand Down
2 changes: 1 addition & 1 deletion base/lock.jl
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ function relockall(rl::ReentrantLock, n::Int)
lock(rl)
n1 = rl.reentrancy_cnt
rl.reentrancy_cnt = n
n1 == 1 || error("concurrency violation detected")
n1 == 1 || concurrency_violation()
return
end

Expand Down
6 changes: 3 additions & 3 deletions base/locks-mt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ function mutex_destroy(x::Mutex)
end

function lock(m::Mutex)
m.ownertid == threadid() && error("concurrency violation detected") # deadlock
m.ownertid == threadid() && concurrency_violation() # deadlock
# Temporary solution before we have gc transition support in codegen.
# This could mess up gc state when we add codegen support.
gc_state = ccall(:jl_gc_safe_enter, Int8, ())
Expand All @@ -115,7 +115,7 @@ function lock(m::Mutex)
end

function trylock(m::Mutex)
m.ownertid == threadid() && error("concurrency violation detected") # deadlock
m.ownertid == threadid() && concurrency_violation() # deadlock
r = ccall(:uv_mutex_trylock, Cint, (Ptr{Cvoid},), m)
if r == 0
m.ownertid = threadid()
Expand All @@ -124,7 +124,7 @@ function trylock(m::Mutex)
end

function unlock(m::Mutex)
m.ownertid == threadid() || error("concurrency violation detected")
m.ownertid == threadid() || concurrency_violation()
m.ownertid = 0
ccall(:uv_mutex_unlock, Cvoid, (Ptr{Cvoid},), m)
return
Expand Down
92 changes: 50 additions & 42 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,18 @@ mutable struct PipeEndpoint <: LibuvStream
handle::Ptr{Cvoid}
status::Int
buffer::IOBuffer
readnotify::Condition
connectnotify::Condition
cond::ThreadSynchronizer
closenotify::ThreadSynchronizer
sendbuf::Union{IOBuffer, Nothing}
lock::ReentrantLock
throttle::Int
function PipeEndpoint(handle::Ptr{Cvoid}, status)
lock = Threads.SpinLock()
p = new(handle,
status,
PipeBuffer(),
Condition(),
Condition(),
ThreadSynchronizer(),
ThreadSynchronizer(lock),
ThreadSynchronizer(lock),
nothing,
ReentrantLock(),
DEFAULT_READ_BUFFER_SZ)
Expand Down Expand Up @@ -164,19 +163,20 @@ mutable struct TTY <: LibuvStream
handle::Ptr{Cvoid}
status::Int
buffer::IOBuffer
readnotify::Condition
cond::ThreadSynchronizer
closenotify::ThreadSynchronizer
sendbuf::Union{IOBuffer, Nothing}
lock::ReentrantLock
throttle::Int
@static if Sys.iswindows(); ispty::Bool; end
function TTY(handle::Ptr{Cvoid}, status)
lock = Threads.SpinLock()
tty = new(
handle,
status,
PipeBuffer(),
Condition(),
ThreadSynchronizer(),
ThreadSynchronizer(lock),
ThreadSynchronizer(lock),
nothing,
ReentrantLock(),
DEFAULT_READ_BUFFER_SZ)
Expand Down Expand Up @@ -326,9 +326,14 @@ end

function wait_connected(x::Union{LibuvStream, LibuvServer})
check_open(x)
while x.status == StatusConnecting
stream_wait(x, x.connectnotify)
check_open(x)
lock(x.cond)
try
while x.status == StatusConnecting
stream_wait(x, x.cond)
check_open(x)
end
finally
unlock(x.cond)
end
end

Expand All @@ -339,16 +344,18 @@ function wait_readbyte(x::LibuvStream, c::UInt8)
return
end
preserve_handle(x)
lock(x.cond)
try
while isopen(x) && !occursin(c, x.buffer)
start_reading(x) # ensure we are reading
wait(x.readnotify)
wait(x.cond)
end
finally
if isempty(x.readnotify)
if isempty(x.cond)
stop_reading(x) # stop reading iff there are currently no other read clients of the stream
end
unpreserve_handle(x)
unlock(x.cond)
end
nothing
end
Expand All @@ -361,20 +368,22 @@ function wait_readnb(x::LibuvStream, nb::Int)
end
oldthrottle = x.throttle
preserve_handle(x)
lock(x.cond)
try
while isopen(x) && bytesavailable(x.buffer) < nb
x.throttle = max(nb, x.throttle)
start_reading(x) # ensure we are reading
wait(x.readnotify)
wait(x.cond)
end
finally
if isempty(x.readnotify)
if isempty(x.cond)
stop_reading(x) # stop reading iff there are currently no other read clients of the stream
end
if oldthrottle <= x.throttle <= nb
x.throttle = oldthrottle
end
unpreserve_handle(x)
unlock(x.cond)
end
nothing
end
Expand Down Expand Up @@ -552,34 +561,34 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
stream_unknown_type = @handle_as handle LibuvStream
nrequested = ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf)
function readcb_specialized(stream::LibuvStream, nread::Int, nrequested::UInt)
if nread < 0
if nread == UV_ENOBUFS && nrequested == 0
# remind the client that stream.buffer is full
notify(stream.readnotify)
elseif nread == UV_EOF
if isa(stream, TTY)
stream.status = StatusEOF # libuv called uv_stop_reading already
notify(stream.readnotify)
lock(stream.closenotify)
try
lock(stream.cond)
try
if nread < 0
if nread == UV_ENOBUFS && nrequested == 0
# remind the client that stream.buffer is full
notify(stream.cond)
elseif nread == UV_EOF
if isa(stream, TTY)
stream.status = StatusEOF # libuv called uv_stop_reading already
notify(stream.cond)
notify(stream.closenotify)
finally
unlock(stream.closenotify)
elseif stream.status != StatusClosing
# begin shutdown of the stream
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
elseif stream.status != StatusClosing
# begin shutdown of the stream
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
else
# This is a fatal connection error. Shutdown requests as per the usual
# close function won't work and libuv will fail with an assertion failure
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream)
notify_error(stream.cond, _UVError("read", nread))
end
else
# This is a fatal connection error. Shutdown requests as per the usual
# close function won't work and libuv will fail with an assertion failure
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream)
notify_error(stream.readnotify, _UVError("read", nread))
notify_filled(stream.buffer, nread)
notify(stream.cond)
end
else
notify_filled(stream.buffer, nread)
notify(stream.readnotify)
finally
unlock(stream.cond)
end

# Stop background reading when
Expand Down Expand Up @@ -613,11 +622,10 @@ function _uv_hook_close(uv::Union{LibuvStream, LibuvServer})
uv.status = StatusClosed
# notify any listeners that exist on this libuv stream type
notify(uv.closenotify)
notify(uv.cond)
finally
unlock(uv.closenotify)
end
isdefined(uv, :readnotify) && notify(uv.readnotify)
isdefined(uv, :connectnotify) && notify(uv.connectnotify)
nothing
end

Expand Down Expand Up @@ -797,7 +805,7 @@ function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int)
return bytesavailable(newbuf)
finally
s.buffer = sbuf
if !isempty(s.readnotify)
if !isempty(s.cond)
start_reading(s) # resume reading iff there are currently other read clients of the stream
end
end
Expand Down Expand Up @@ -833,7 +841,7 @@ function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt)
nb == bytesavailable(newbuf) || throw(EOFError())
finally
s.buffer = sbuf
if !isempty(s.readnotify)
if !isempty(s.cond)
start_reading(s) # resume reading iff there are currently other read clients of the stream
end
end
Expand Down
2 changes: 2 additions & 0 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ NOINLINE uintptr_t gc_get_stack_ptr(void)
#ifdef JULIA_ENABLE_THREADING
static void jl_gc_wait_for_the_world(void)
{
if (jl_n_threads > 1)
jl_wake_libuv();
Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yuyichao shouldn't libuv be in a gc-safe region?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we can spend time there without hitting other Julia runtime then yes. In which case we also need to make sure the transition back is emitted for all the callbacks.

for (int i = 0;i < jl_n_threads;i++) {
jl_ptls_t ptls2 = jl_all_tls_states[i];
// FIXME: The acquire load pairs with the release stores
Expand Down
Loading