Skip to content

Commit

Permalink
threads: expand thread-safe region for I/O (#32309)
Browse files Browse the repository at this point in the history
This should hopefully cover most I/O operations which go through libuv to make them thread-safe.

There is no scaling here, just one big global lock, so expect
worse-than-single-threaded performance if doing I/O on multiple threads (as
compared to doing the same work on one thread). The intention is to handle
performance improvement incrementally later.

It also necessarily redesigns parts of the UDPSocket implementation
to properly handle concurrent (single-threaded) usage, as a necessary
part of making it handle parallel (thread-safe) usage.
  • Loading branch information
vtjnash authored and JeffBezanson committed Jun 21, 2019
1 parent dd56dbf commit 58bafe4
Show file tree
Hide file tree
Showing 25 changed files with 1,017 additions and 863 deletions.
158 changes: 93 additions & 65 deletions base/asyncevent.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ mutable struct AsyncCondition
handle::Ptr{Cvoid}
cond::ThreadSynchronizer
isopen::Bool
set::Bool

function AsyncCondition()
this = new(Libc.malloc(_sizeof_uv_async), ThreadSynchronizer(), true)
this = new(Libc.malloc(_sizeof_uv_async), ThreadSynchronizer(), true, false)
iolock_begin()
associate_julia_struct(this.handle, this)
finalizer(uvfinalize, this)
err = ccall(:uv_async_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
eventloop(), this, uv_jl_asynccb::Ptr{Cvoid})
if err != 0
Expand All @@ -28,6 +29,8 @@ mutable struct AsyncCondition
this.handle = C_NULL
throw(_UVError("uv_async_init", err))
end
finalizer(uvfinalize, this)
iolock_end()
return this
end
end
Expand All @@ -40,28 +43,10 @@ the async condition object itself.
"""
function AsyncCondition(cb::Function)
async = AsyncCondition()
waiter = Task(function()
lock(async.cond)
try
while isopen(async)
success = try
stream_wait(async, async.cond)
true
catch exc # ignore possible exception on close()
isa(exc, EOFError) || rethrow()
finally
unlock(async.cond)
end
success && cb(async)
lock(async.cond)
end
finally
unlock(async.cond)
@async while _trywait(async)
cb(async)
isopen(async) || return
end
end)
# must start the task right away so that it can wait for the AsyncCondition before
# we re-enter the event loop. this avoids a race condition. see issue #12719
yield(waiter)
return async
end

Expand All @@ -81,59 +66,108 @@ mutable struct Timer
handle::Ptr{Cvoid}
cond::ThreadSynchronizer
isopen::Bool
set::Bool

function Timer(timeout::Real; interval::Real = 0.0)
timeout 0 || throw(ArgumentError("timer cannot have negative timeout of $timeout seconds"))
interval 0 || throw(ArgumentError("timer cannot have negative repeat interval of $interval seconds"))
timeout = UInt64(round(timeout * 1000)) + 1
interval = UInt64(round(interval * 1000))
loop = eventloop()

this = new(Libc.malloc(_sizeof_uv_timer), ThreadSynchronizer(), true)
ccall(:jl_uv_update_timer_start, Cvoid,
(Ptr{Cvoid}, Any, Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64),
eventloop(), this, this.handle, uv_jl_timercb::Ptr{Cvoid},
UInt64(round(timeout * 1000)) + 1, UInt64(round(interval * 1000)))
this = new(Libc.malloc(_sizeof_uv_timer), ThreadSynchronizer(), true, false)
associate_julia_struct(this.handle, this)
iolock_begin()
err = ccall(:uv_timer_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), loop, this)
@assert err == 0
finalizer(uvfinalize, this)
ccall(:uv_update_time, Cvoid, (Ptr{Cvoid},), loop)
err = ccall(:uv_timer_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64),
this, uv_jl_timercb::Ptr{Cvoid}, timeout, interval)
@assert err == 0
iolock_end()
return this
end
end

unsafe_convert(::Type{Ptr{Cvoid}}, t::Timer) = t.handle
unsafe_convert(::Type{Ptr{Cvoid}}, async::AsyncCondition) = async.handle

function wait(t::Union{Timer, AsyncCondition})
lock(t.cond)
try
isopen(t) || throw(EOFError())
stream_wait(t, t.cond)
finally
unlock(t.cond)
function _trywait(t::Union{Timer, AsyncCondition})
set = t.set
if !set
t.handle == C_NULL && return false
iolock_begin()
set = t.set
if !set
preserve_handle(t)
lock(t.cond)
try
set = t.set
if !set
if t.handle != C_NULL
iolock_end()
set = wait(t.cond)
unlock(t.cond)
iolock_begin()
lock(t.cond)
end
end
finally
unlock(t.cond)
unpreserve_handle(t)
end
end
iolock_end()
end
t.set = false
return set
end

function wait(t::Union{Timer, AsyncCondition})
_trywait(t) || throw(EOFError())
nothing
end


isopen(t::Union{Timer, AsyncCondition}) = t.isopen

function close(t::Union{Timer, AsyncCondition})
iolock_begin()
if t.handle != C_NULL && isopen(t)
t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
end
iolock_end()
nothing
end

function uvfinalize(t::Union{Timer, AsyncCondition})
if t.handle != C_NULL
disassociate_julia_struct(t.handle) # not going to call the usual close hooks
close(t)
t.handle = C_NULL
iolock_begin()
lock(t.cond)
try
if t.handle != C_NULL
disassociate_julia_struct(t.handle) # not going to call the usual close hooks
if t.isopen
t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
end
t.handle = C_NULL
notify(t.cond, false)
end
finally
unlock(t.cond)
end
t.isopen = false
iolock_end()
nothing
end

function _uv_hook_close(t::Union{Timer, AsyncCondition})
lock(t.cond)
try
uvfinalize(t)
notify_error(t.cond, EOFError())
t.isopen = false
t.handle = C_NULL
notify(t.cond, t.set)
finally
unlock(t.cond)
end
Expand All @@ -144,7 +178,8 @@ function uv_asynccb(handle::Ptr{Cvoid})
async = @handle_as handle AsyncCondition
lock(async.cond)
try
notify(async.cond)
async.set = true
notify(async.cond, true)
finally
unlock(async.cond)
end
Expand All @@ -155,11 +190,12 @@ function uv_timercb(handle::Ptr{Cvoid})
t = @handle_as handle Timer
lock(t.cond)
try
t.set = true
if ccall(:uv_timer_get_repeat, UInt64, (Ptr{Cvoid},), t) == 0
# timer is stopped now
close(t)
end
notify(t.cond)
notify(t.cond, true)
finally
unlock(t.cond)
end
Expand Down Expand Up @@ -199,7 +235,7 @@ Here the first number is printed after a delay of two seconds, then the followin
julia> begin
i = 0
cb(timer) = (global i += 1; println(i))
t = Timer(cb, 2, interval = 0.2)
t = Timer(cb, 2, interval=0.2)
wait(t)
sleep(0.5)
close(t)
Expand All @@ -209,37 +245,28 @@ julia> begin
3
```
"""
function Timer(cb::Function, timeout::Real; interval::Real = 0.0)
t = Timer(timeout, interval = interval)
waiter = Task(function()
while isopen(t)
success = try
wait(t)
true
catch exc # ignore possible exception on close()
isa(exc, EOFError) || rethrow()
false
end
success && cb(t)
function Timer(cb::Function, timeout::Real; interval::Real=0.0)
timer = Timer(timeout, interval=interval)
@async while _trywait(timer)
cb(timer)
isopen(timer) || return
end
end)
# must start the task right away so that it can wait for the Timer before
# we re-enter the event loop. this avoids a race condition. see issue #12719
yield(waiter)
return t
return timer
end

"""
timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1)
Waits until `testcb` returns `true` or for `secs` seconds, whichever is earlier.
`testcb` is polled every `pollint` seconds.
Returns :ok, :timed_out, or :error
"""
function timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1)
pollint > 0 || throw(ArgumentError("cannot set pollint to $pollint seconds"))
start = time()
done = Channel(1)
timercb(aw) = begin
function timercb(aw)
try
if testcb()
put!(done, :ok)
Expand All @@ -251,14 +278,15 @@ function timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1)
finally
isready(done) && close(aw)
end
nothing
end

if !testcb()
t = Timer(timercb, pollint, interval = pollint)
ret = fetch(done)
ret = fetch(done)::Symbol
close(t)
else
ret = :ok
end
ret
return ret
end
3 changes: 3 additions & 0 deletions base/condition.jl
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ unlock(c::GenericCondition) = unlock(c.lock)
trylock(c::GenericCondition) = trylock(c.lock)
islocked(c::GenericCondition) = islocked(c.lock)

lock(f, c::GenericCondition) = lock(f, c.lock)
unlock(f, c::GenericCondition) = unlock(f, c.lock)

"""
wait([x])
Expand Down
2 changes: 0 additions & 2 deletions base/coreio.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ write(::DevNull, ::UInt8) = 1
unsafe_write(::DevNull, ::Ptr{UInt8}, n::UInt)::Int = n
close(::DevNull) = nothing
flush(::DevNull) = nothing
wait_connected(::DevNull) = nothing
wait_readnb(::DevNull) = wait()
wait_readbyte(::DevNull) = wait()
wait_close(::DevNull) = wait()
eof(::DevNull) = true

Expand Down
12 changes: 6 additions & 6 deletions base/file.jl
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ function mktempdir(parent=tempdir(); prefix=temp_prefix)
try
ret = ccall(:uv_fs_mkdtemp, Int32,
(Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}),
eventloop(), req, tpath, C_NULL)
C_NULL, req, tpath, C_NULL)
if ret < 0
ccall(:uv_fs_req_cleanup, Cvoid, (Ptr{Cvoid},), req)
uv_error("mktempdir", ret)
Expand Down Expand Up @@ -631,8 +631,8 @@ function readdir(path::AbstractString)
uv_readdir_req = zeros(UInt8, ccall(:jl_sizeof_uv_fs_t, Int32, ()))

# defined in sys.c, to call uv_fs_readdir, which sets errno on error.
err = ccall(:jl_uv_fs_scandir, Int32, (Ptr{Cvoid}, Ptr{UInt8}, Cstring, Cint, Ptr{Cvoid}),
eventloop(), uv_readdir_req, path, 0, C_NULL)
err = ccall(:uv_fs_scandir, Int32, (Ptr{Cvoid}, Ptr{UInt8}, Cstring, Cint, Ptr{Cvoid}),
C_NULL, uv_readdir_req, path, 0, C_NULL)
err < 0 && throw(SystemError("unable to read directory $path", -err))
#uv_error("unable to read directory $path", err)

Expand All @@ -644,7 +644,7 @@ function readdir(path::AbstractString)
end

# Clean up the request string
ccall(:jl_uv_fs_req_cleanup, Cvoid, (Ptr{UInt8},), uv_readdir_req)
ccall(:uv_fs_req_cleanup, Cvoid, (Ptr{UInt8},), uv_readdir_req)

return entries
end
Expand Down Expand Up @@ -816,9 +816,9 @@ Return the target location a symbolic link `path` points to.
function readlink(path::AbstractString)
req = Libc.malloc(_sizeof_uv_fs)
try
ret = ccall(:jl_uv_fs_readlink, Int32,
ret = ccall(:uv_fs_readlink, Int32,
(Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}),
eventloop(), req, path, C_NULL)
C_NULL, req, path, C_NULL)
if ret < 0
ccall(:uv_fs_req_cleanup, Cvoid, (Ptr{Cvoid},), req)
uv_error("readlink", ret)
Expand Down
14 changes: 7 additions & 7 deletions base/filesystem.jl
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ function open(path::AbstractString, flags::Integer, mode::Integer=0)
req = Libc.malloc(_sizeof_uv_fs)
local handle
try
ret = ccall(:jl_uv_fs_open, Int32,
ret = ccall(:uv_fs_open, Int32,
(Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Int32, Int32, Ptr{Cvoid}),
eventloop(), req, path, flags, mode, C_NULL)
handle = ccall(:jl_uv_fs_result, Cssize_t, (Ptr{Cvoid},), req)
C_NULL, req, path, flags, mode, C_NULL)
handle = ccall(:uv_fs_get_result, Cssize_t, (Ptr{Cvoid},), req)
ccall(:uv_fs_req_cleanup, Cvoid, (Ptr{Cvoid},), req)
uv_error("open", ret)
finally # conversion to Cstring could cause an exception
Expand Down Expand Up @@ -138,9 +138,9 @@ write(f::File, c::UInt8) = write(f, Ref{UInt8}(c))
function truncate(f::File, n::Integer)
check_open(f)
req = Libc.malloc(_sizeof_uv_fs)
err = ccall(:jl_uv_fs_ftruncate, Int32,
err = ccall(:uv_fs_ftruncate, Int32,
(Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Int64, Ptr{Cvoid}),
eventloop(), req, f.handle, n, C_NULL)
C_NULL, req, f.handle, n, C_NULL)
Libc.free(req)
uv_error("ftruncate", err)
return f
Expand All @@ -149,9 +149,9 @@ end
function futime(f::File, atime::Float64, mtime::Float64)
check_open(f)
req = Libc.malloc(_sizeof_uv_fs)
err = ccall(:jl_uv_fs_futime, Int32,
err = ccall(:uv_fs_futime, Int32,
(Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Float64, Float64, Ptr{Cvoid}),
eventloop(), req, f.handle, atime, mtime, C_NULL)
C_NULL, req, f.handle, atime, mtime, C_NULL)
Libc.free(req)
uv_error("futime", err)
return f
Expand Down
3 changes: 0 additions & 3 deletions base/io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ Close an I/O stream. Performs a [`flush`](@ref) first.
"""
function close end
function flush end
function wait_connected end
function wait_readnb end
function wait_readbyte end
function wait_close end
function bytesavailable end

Expand Down Expand Up @@ -260,7 +258,6 @@ iswritable(io::AbstractPipe) = iswritable(pipe_writer(io))
isopen(io::AbstractPipe) = isopen(pipe_writer(io)) || isopen(pipe_reader(io))
close(io::AbstractPipe) = (close(pipe_writer(io)); close(pipe_reader(io)))
wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io), nb)
wait_readbyte(io::AbstractPipe, byte::UInt8) = wait_readbyte(pipe_reader(io), byte)
wait_close(io::AbstractPipe) = (wait_close(pipe_writer(io)); wait_close(pipe_reader(io)))

"""
Expand Down
Loading

0 comments on commit 58bafe4

Please sign in to comment.