Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

threads: expand thread-safe region for I/O #32309

Merged
merged 1 commit into from
Jun 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 93 additions & 65 deletions base/asyncevent.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ mutable struct AsyncCondition
handle::Ptr{Cvoid}
cond::ThreadSynchronizer
isopen::Bool
set::Bool

function AsyncCondition()
this = new(Libc.malloc(_sizeof_uv_async), ThreadSynchronizer(), true)
this = new(Libc.malloc(_sizeof_uv_async), ThreadSynchronizer(), true, false)
iolock_begin()
associate_julia_struct(this.handle, this)
finalizer(uvfinalize, this)
err = ccall(:uv_async_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
eventloop(), this, uv_jl_asynccb::Ptr{Cvoid})
if err != 0
Expand All @@ -28,6 +29,8 @@ mutable struct AsyncCondition
this.handle = C_NULL
throw(_UVError("uv_async_init", err))
end
finalizer(uvfinalize, this)
iolock_end()
return this
end
end
Expand All @@ -40,28 +43,10 @@ the async condition object itself.
"""
function AsyncCondition(cb::Function)
async = AsyncCondition()
waiter = Task(function()
lock(async.cond)
try
while isopen(async)
success = try
stream_wait(async, async.cond)
true
catch exc # ignore possible exception on close()
isa(exc, EOFError) || rethrow()
finally
unlock(async.cond)
end
success && cb(async)
lock(async.cond)
end
finally
unlock(async.cond)
@async while _trywait(async)
cb(async)
isopen(async) || return
end
end)
# must start the task right away so that it can wait for the AsyncCondition before
# we re-enter the event loop. this avoids a race condition. see issue #12719
yield(waiter)
return async
end

Expand All @@ -81,59 +66,108 @@ mutable struct Timer
handle::Ptr{Cvoid}
cond::ThreadSynchronizer
isopen::Bool
set::Bool

function Timer(timeout::Real; interval::Real = 0.0)
timeout ≥ 0 || throw(ArgumentError("timer cannot have negative timeout of $timeout seconds"))
interval ≥ 0 || throw(ArgumentError("timer cannot have negative repeat interval of $interval seconds"))
timeout = UInt64(round(timeout * 1000)) + 1
interval = UInt64(round(interval * 1000))
loop = eventloop()

this = new(Libc.malloc(_sizeof_uv_timer), ThreadSynchronizer(), true)
ccall(:jl_uv_update_timer_start, Cvoid,
(Ptr{Cvoid}, Any, Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64),
eventloop(), this, this.handle, uv_jl_timercb::Ptr{Cvoid},
UInt64(round(timeout * 1000)) + 1, UInt64(round(interval * 1000)))
this = new(Libc.malloc(_sizeof_uv_timer), ThreadSynchronizer(), true, false)
associate_julia_struct(this.handle, this)
iolock_begin()
err = ccall(:uv_timer_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), loop, this)
@assert err == 0
finalizer(uvfinalize, this)
ccall(:uv_update_time, Cvoid, (Ptr{Cvoid},), loop)
err = ccall(:uv_timer_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64),
this, uv_jl_timercb::Ptr{Cvoid}, timeout, interval)
@assert err == 0
iolock_end()
return this
end
end

unsafe_convert(::Type{Ptr{Cvoid}}, t::Timer) = t.handle
unsafe_convert(::Type{Ptr{Cvoid}}, async::AsyncCondition) = async.handle

function wait(t::Union{Timer, AsyncCondition})
lock(t.cond)
try
isopen(t) || throw(EOFError())
stream_wait(t, t.cond)
finally
unlock(t.cond)
function _trywait(t::Union{Timer, AsyncCondition})
set = t.set
if !set
t.handle == C_NULL && return false
iolock_begin()
set = t.set
if !set
preserve_handle(t)
lock(t.cond)
try
set = t.set
if !set
if t.handle != C_NULL
iolock_end()
set = wait(t.cond)
unlock(t.cond)
iolock_begin()
lock(t.cond)
end
end
finally
unlock(t.cond)
unpreserve_handle(t)
end
end
iolock_end()
end
t.set = false
return set
end

function wait(t::Union{Timer, AsyncCondition})
_trywait(t) || throw(EOFError())
nothing
end


isopen(t::Union{Timer, AsyncCondition}) = t.isopen

function close(t::Union{Timer, AsyncCondition})
iolock_begin()
if t.handle != C_NULL && isopen(t)
t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
end
iolock_end()
nothing
end

function uvfinalize(t::Union{Timer, AsyncCondition})
if t.handle != C_NULL
disassociate_julia_struct(t.handle) # not going to call the usual close hooks
close(t)
t.handle = C_NULL
iolock_begin()
lock(t.cond)
try
if t.handle != C_NULL
disassociate_julia_struct(t.handle) # not going to call the usual close hooks
if t.isopen
t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
end
t.handle = C_NULL
notify(t.cond, false)
end
finally
unlock(t.cond)
end
t.isopen = false
iolock_end()
nothing
end

function _uv_hook_close(t::Union{Timer, AsyncCondition})
lock(t.cond)
try
uvfinalize(t)
notify_error(t.cond, EOFError())
t.isopen = false
t.handle = C_NULL
notify(t.cond, t.set)
finally
unlock(t.cond)
end
Expand All @@ -144,7 +178,8 @@ function uv_asynccb(handle::Ptr{Cvoid})
async = @handle_as handle AsyncCondition
lock(async.cond)
try
notify(async.cond)
async.set = true
notify(async.cond, true)
finally
unlock(async.cond)
end
Expand All @@ -155,11 +190,12 @@ function uv_timercb(handle::Ptr{Cvoid})
t = @handle_as handle Timer
lock(t.cond)
try
t.set = true
if ccall(:uv_timer_get_repeat, UInt64, (Ptr{Cvoid},), t) == 0
# timer is stopped now
close(t)
end
notify(t.cond)
notify(t.cond, true)
finally
unlock(t.cond)
end
Expand Down Expand Up @@ -199,7 +235,7 @@ Here the first number is printed after a delay of two seconds, then the followin
julia> begin
i = 0
cb(timer) = (global i += 1; println(i))
t = Timer(cb, 2, interval = 0.2)
t = Timer(cb, 2, interval=0.2)
wait(t)
sleep(0.5)
close(t)
Expand All @@ -209,37 +245,28 @@ julia> begin
3
```
"""
function Timer(cb::Function, timeout::Real; interval::Real = 0.0)
t = Timer(timeout, interval = interval)
waiter = Task(function()
while isopen(t)
success = try
wait(t)
true
catch exc # ignore possible exception on close()
isa(exc, EOFError) || rethrow()
false
end
success && cb(t)
function Timer(cb::Function, timeout::Real; interval::Real=0.0)
timer = Timer(timeout, interval=interval)
@async while _trywait(timer)
cb(timer)
isopen(timer) || return
end
end)
# must start the task right away so that it can wait for the Timer before
# we re-enter the event loop. this avoids a race condition. see issue #12719
yield(waiter)
return t
return timer
end

"""
timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1)

Waits until `testcb` returns `true` or for `secs` seconds, whichever is earlier.
`testcb` is polled every `pollint` seconds.

Returns :ok, :timed_out, or :error
"""
function timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1)
pollint > 0 || throw(ArgumentError("cannot set pollint to $pollint seconds"))
start = time()
done = Channel(1)
timercb(aw) = begin
function timercb(aw)
try
if testcb()
put!(done, :ok)
Expand All @@ -251,14 +278,15 @@ function timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1)
finally
isready(done) && close(aw)
end
nothing
end

if !testcb()
t = Timer(timercb, pollint, interval = pollint)
ret = fetch(done)
ret = fetch(done)::Symbol
close(t)
else
ret = :ok
end
ret
return ret
end
3 changes: 3 additions & 0 deletions base/condition.jl
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ unlock(c::GenericCondition) = unlock(c.lock)
trylock(c::GenericCondition) = trylock(c.lock)
islocked(c::GenericCondition) = islocked(c.lock)

lock(f, c::GenericCondition) = lock(f, c.lock)
unlock(f, c::GenericCondition) = unlock(f, c.lock)

"""
wait([x])

Expand Down
2 changes: 0 additions & 2 deletions base/coreio.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ write(::DevNull, ::UInt8) = 1
unsafe_write(::DevNull, ::Ptr{UInt8}, n::UInt)::Int = n
close(::DevNull) = nothing
flush(::DevNull) = nothing
wait_connected(::DevNull) = nothing
wait_readnb(::DevNull) = wait()
wait_readbyte(::DevNull) = wait()
wait_close(::DevNull) = wait()
eof(::DevNull) = true

Expand Down
12 changes: 6 additions & 6 deletions base/file.jl
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ function mktempdir(parent=tempdir(); prefix=temp_prefix)
try
ret = ccall(:uv_fs_mkdtemp, Int32,
(Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}),
eventloop(), req, tpath, C_NULL)
C_NULL, req, tpath, C_NULL)
if ret < 0
ccall(:uv_fs_req_cleanup, Cvoid, (Ptr{Cvoid},), req)
uv_error("mktempdir", ret)
Expand Down Expand Up @@ -623,8 +623,8 @@ function readdir(path::AbstractString)
uv_readdir_req = zeros(UInt8, ccall(:jl_sizeof_uv_fs_t, Int32, ()))

# defined in sys.c, to call uv_fs_readdir, which sets errno on error.
err = ccall(:jl_uv_fs_scandir, Int32, (Ptr{Cvoid}, Ptr{UInt8}, Cstring, Cint, Ptr{Cvoid}),
eventloop(), uv_readdir_req, path, 0, C_NULL)
err = ccall(:uv_fs_scandir, Int32, (Ptr{Cvoid}, Ptr{UInt8}, Cstring, Cint, Ptr{Cvoid}),
C_NULL, uv_readdir_req, path, 0, C_NULL)
err < 0 && throw(SystemError("unable to read directory $path", -err))
#uv_error("unable to read directory $path", err)

Expand All @@ -636,7 +636,7 @@ function readdir(path::AbstractString)
end

# Clean up the request string
ccall(:jl_uv_fs_req_cleanup, Cvoid, (Ptr{UInt8},), uv_readdir_req)
ccall(:uv_fs_req_cleanup, Cvoid, (Ptr{UInt8},), uv_readdir_req)

return entries
end
Expand Down Expand Up @@ -808,9 +808,9 @@ Return the target location a symbolic link `path` points to.
function readlink(path::AbstractString)
req = Libc.malloc(_sizeof_uv_fs)
try
ret = ccall(:jl_uv_fs_readlink, Int32,
ret = ccall(:uv_fs_readlink, Int32,
(Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}),
eventloop(), req, path, C_NULL)
C_NULL, req, path, C_NULL)
if ret < 0
ccall(:uv_fs_req_cleanup, Cvoid, (Ptr{Cvoid},), req)
uv_error("readlink", ret)
Expand Down
14 changes: 7 additions & 7 deletions base/filesystem.jl
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ function open(path::AbstractString, flags::Integer, mode::Integer=0)
req = Libc.malloc(_sizeof_uv_fs)
local handle
try
ret = ccall(:jl_uv_fs_open, Int32,
ret = ccall(:uv_fs_open, Int32,
(Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Int32, Int32, Ptr{Cvoid}),
eventloop(), req, path, flags, mode, C_NULL)
handle = ccall(:jl_uv_fs_result, Cssize_t, (Ptr{Cvoid},), req)
C_NULL, req, path, flags, mode, C_NULL)
handle = ccall(:uv_fs_get_result, Cssize_t, (Ptr{Cvoid},), req)
ccall(:uv_fs_req_cleanup, Cvoid, (Ptr{Cvoid},), req)
uv_error("open", ret)
finally # conversion to Cstring could cause an exception
Expand Down Expand Up @@ -132,9 +132,9 @@ write(f::File, c::UInt8) = write(f, Ref{UInt8}(c))
function truncate(f::File, n::Integer)
check_open(f)
req = Libc.malloc(_sizeof_uv_fs)
err = ccall(:jl_uv_fs_ftruncate, Int32,
err = ccall(:uv_fs_ftruncate, Int32,
(Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Int64, Ptr{Cvoid}),
eventloop(), req, f.handle, n, C_NULL)
C_NULL, req, f.handle, n, C_NULL)
Libc.free(req)
uv_error("ftruncate", err)
return f
Expand All @@ -143,9 +143,9 @@ end
function futime(f::File, atime::Float64, mtime::Float64)
check_open(f)
req = Libc.malloc(_sizeof_uv_fs)
err = ccall(:jl_uv_fs_futime, Int32,
err = ccall(:uv_fs_futime, Int32,
(Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Float64, Float64, Ptr{Cvoid}),
eventloop(), req, f.handle, atime, mtime, C_NULL)
C_NULL, req, f.handle, atime, mtime, C_NULL)
Libc.free(req)
uv_error("futime", err)
return f
Expand Down
3 changes: 0 additions & 3 deletions base/io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ Close an I/O stream. Performs a [`flush`](@ref) first.
"""
function close end
function flush end
function wait_connected end
function wait_readnb end
function wait_readbyte end
function wait_close end
function bytesavailable end

Expand Down Expand Up @@ -260,7 +258,6 @@ iswritable(io::AbstractPipe) = iswritable(pipe_writer(io))
isopen(io::AbstractPipe) = isopen(pipe_writer(io)) || isopen(pipe_reader(io))
close(io::AbstractPipe) = (close(pipe_writer(io)); close(pipe_reader(io)))
wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io), nb)
wait_readbyte(io::AbstractPipe, byte::UInt8) = wait_readbyte(pipe_reader(io), byte)
wait_close(io::AbstractPipe) = (wait_close(pipe_writer(io)); wait_close(pipe_reader(io)))

"""
Expand Down
Loading