From cfd2a143856a0b7cf8dd7533d245dc238b192e6e Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Wed, 5 Jun 2019 17:19:45 -0400 Subject: [PATCH] threads: expand thread-safe region for I/O This should hopefully cover most I/O operations which go through libuv to make them thread-safe. There is no scaling here, just one big global lock, so expect worse-than-single-threaded performance if doing I/O on multiple threads (as compared to doing the same work on one thread). The intention is to handle performance improvement incrementally later. It also necessarily redesigns parts of the UDPSocket implementation to properly handle concurrent (single-threaded) usage, as a necessary part of making it handle parallel (thread-safe) usage. --- base/asyncevent.jl | 158 ++++---- base/condition.jl | 3 + base/coreio.jl | 2 - base/file.jl | 12 +- base/filesystem.jl | 14 +- base/io.jl | 3 - base/libuv.jl | 33 +- base/process.jl | 45 ++- base/stream.jl | 482 ++++++++++++++---------- base/sysinfo.jl | 6 +- doc/src/devdocs/locks.md | 32 +- src/jl_uv.c | 321 ++++------------ src/julia.h | 5 - src/julia_internal.h | 2 - src/sys.c | 2 - stdlib/Distributed/src/Distributed.jl | 4 +- stdlib/Distributed/src/managers.jl | 13 +- stdlib/FileWatching/src/FileWatching.jl | 93 +++-- stdlib/Sockets/src/PipeServer.jl | 19 +- stdlib/Sockets/src/Sockets.jl | 363 +++++++++++------- stdlib/Sockets/src/addrinfo.jl | 31 +- stdlib/Sockets/test/runtests.jl | 86 +++-- test/channels.jl | 141 +++---- test/read.jl | 4 +- test/spawn.jl | 6 +- 25 files changed, 1017 insertions(+), 863 deletions(-) diff --git a/base/asyncevent.jl b/base/asyncevent.jl index 2299993bd2ea0..625eff3bd288a 100644 --- a/base/asyncevent.jl +++ b/base/asyncevent.jl @@ -15,11 +15,12 @@ mutable struct AsyncCondition handle::Ptr{Cvoid} cond::ThreadSynchronizer isopen::Bool + set::Bool function AsyncCondition() - this = new(Libc.malloc(_sizeof_uv_async), ThreadSynchronizer(), true) + this = new(Libc.malloc(_sizeof_uv_async), ThreadSynchronizer(), true, false) + iolock_begin() associate_julia_struct(this.handle, this) - finalizer(uvfinalize, this) err = ccall(:uv_async_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), this, uv_jl_asynccb::Ptr{Cvoid}) if err != 0 @@ -28,6 +29,8 @@ mutable struct AsyncCondition this.handle = C_NULL throw(_UVError("uv_async_init", err)) end + finalizer(uvfinalize, this) + iolock_end() return this end end @@ -40,28 +43,10 @@ the async condition object itself. """ function AsyncCondition(cb::Function) async = AsyncCondition() - waiter = Task(function() - lock(async.cond) - try - while isopen(async) - success = try - stream_wait(async, async.cond) - true - catch exc # ignore possible exception on close() - isa(exc, EOFError) || rethrow() - finally - unlock(async.cond) - end - success && cb(async) - lock(async.cond) - end - finally - unlock(async.cond) + @async while _trywait(async) + cb(async) + isopen(async) || return end - end) - # must start the task right away so that it can wait for the AsyncCondition before - # we re-enter the event loop. this avoids a race condition. see issue #12719 - yield(waiter) return async end @@ -81,17 +66,26 @@ mutable struct Timer handle::Ptr{Cvoid} cond::ThreadSynchronizer isopen::Bool + set::Bool function Timer(timeout::Real; interval::Real = 0.0) timeout ≥ 0 || throw(ArgumentError("timer cannot have negative timeout of $timeout seconds")) interval ≥ 0 || throw(ArgumentError("timer cannot have negative repeat interval of $interval seconds")) + timeout = UInt64(round(timeout * 1000)) + 1 + interval = UInt64(round(interval * 1000)) + loop = eventloop() - this = new(Libc.malloc(_sizeof_uv_timer), ThreadSynchronizer(), true) - ccall(:jl_uv_update_timer_start, Cvoid, - (Ptr{Cvoid}, Any, Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64), - eventloop(), this, this.handle, uv_jl_timercb::Ptr{Cvoid}, - UInt64(round(timeout * 1000)) + 1, UInt64(round(interval * 1000))) + this = new(Libc.malloc(_sizeof_uv_timer), ThreadSynchronizer(), true, false) + associate_julia_struct(this.handle, this) + iolock_begin() + err = ccall(:uv_timer_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), loop, this) + @assert err == 0 finalizer(uvfinalize, this) + ccall(:uv_update_time, Cvoid, (Ptr{Cvoid},), loop) + err = ccall(:uv_timer_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64), + this, uv_jl_timercb::Ptr{Cvoid}, timeout, interval) + @assert err == 0 + iolock_end() return this end end @@ -99,41 +93,81 @@ end unsafe_convert(::Type{Ptr{Cvoid}}, t::Timer) = t.handle unsafe_convert(::Type{Ptr{Cvoid}}, async::AsyncCondition) = async.handle -function wait(t::Union{Timer, AsyncCondition}) - lock(t.cond) - try - isopen(t) || throw(EOFError()) - stream_wait(t, t.cond) - finally - unlock(t.cond) +function _trywait(t::Union{Timer, AsyncCondition}) + set = t.set + if !set + t.handle == C_NULL && return false + iolock_begin() + set = t.set + if !set + preserve_handle(t) + lock(t.cond) + try + set = t.set + if !set + if t.handle != C_NULL + iolock_end() + set = wait(t.cond) + unlock(t.cond) + iolock_begin() + lock(t.cond) + end + end + finally + unlock(t.cond) + unpreserve_handle(t) + end + end + iolock_end() end + t.set = false + return set +end + +function wait(t::Union{Timer, AsyncCondition}) + _trywait(t) || throw(EOFError()) + nothing end + isopen(t::Union{Timer, AsyncCondition}) = t.isopen function close(t::Union{Timer, AsyncCondition}) + iolock_begin() if t.handle != C_NULL && isopen(t) t.isopen = false ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t) end + iolock_end() nothing end function uvfinalize(t::Union{Timer, AsyncCondition}) - if t.handle != C_NULL - disassociate_julia_struct(t.handle) # not going to call the usual close hooks - close(t) - t.handle = C_NULL + iolock_begin() + lock(t.cond) + try + if t.handle != C_NULL + disassociate_julia_struct(t.handle) # not going to call the usual close hooks + if t.isopen + t.isopen = false + ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t) + end + t.handle = C_NULL + notify(t.cond, false) + end + finally + unlock(t.cond) end - t.isopen = false + iolock_end() nothing end function _uv_hook_close(t::Union{Timer, AsyncCondition}) lock(t.cond) try - uvfinalize(t) - notify_error(t.cond, EOFError()) + t.isopen = false + t.handle = C_NULL + notify(t.cond, t.set) finally unlock(t.cond) end @@ -144,7 +178,8 @@ function uv_asynccb(handle::Ptr{Cvoid}) async = @handle_as handle AsyncCondition lock(async.cond) try - notify(async.cond) + async.set = true + notify(async.cond, true) finally unlock(async.cond) end @@ -155,11 +190,12 @@ function uv_timercb(handle::Ptr{Cvoid}) t = @handle_as handle Timer lock(t.cond) try + t.set = true if ccall(:uv_timer_get_repeat, UInt64, (Ptr{Cvoid},), t) == 0 # timer is stopped now close(t) end - notify(t.cond) + notify(t.cond, true) finally unlock(t.cond) end @@ -199,7 +235,7 @@ Here the first number is printed after a delay of two seconds, then the followin julia> begin i = 0 cb(timer) = (global i += 1; println(i)) - t = Timer(cb, 2, interval = 0.2) + t = Timer(cb, 2, interval=0.2) wait(t) sleep(0.5) close(t) @@ -209,24 +245,13 @@ julia> begin 3 ``` """ -function Timer(cb::Function, timeout::Real; interval::Real = 0.0) - t = Timer(timeout, interval = interval) - waiter = Task(function() - while isopen(t) - success = try - wait(t) - true - catch exc # ignore possible exception on close() - isa(exc, EOFError) || rethrow() - false - end - success && cb(t) +function Timer(cb::Function, timeout::Real; interval::Real=0.0) + timer = Timer(timeout, interval=interval) + @async while _trywait(timer) + cb(timer) + isopen(timer) || return end - end) - # must start the task right away so that it can wait for the Timer before - # we re-enter the event loop. this avoids a race condition. see issue #12719 - yield(waiter) - return t + return timer end """ @@ -234,12 +259,14 @@ end Waits until `testcb` returns `true` or for `secs` seconds, whichever is earlier. `testcb` is polled every `pollint` seconds. + +Returns :ok, :timed_out, or :error """ function timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1) pollint > 0 || throw(ArgumentError("cannot set pollint to $pollint seconds")) start = time() done = Channel(1) - timercb(aw) = begin + function timercb(aw) try if testcb() put!(done, :ok) @@ -251,14 +278,15 @@ function timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1) finally isready(done) && close(aw) end + nothing end if !testcb() t = Timer(timercb, pollint, interval = pollint) - ret = fetch(done) + ret = fetch(done)::Symbol close(t) else ret = :ok end - ret + return ret end diff --git a/base/condition.jl b/base/condition.jl index 3afc538d53386..eb136637dc1d5 100644 --- a/base/condition.jl +++ b/base/condition.jl @@ -76,6 +76,9 @@ unlock(c::GenericCondition) = unlock(c.lock) trylock(c::GenericCondition) = trylock(c.lock) islocked(c::GenericCondition) = islocked(c.lock) +lock(f, c::GenericCondition) = lock(f, c.lock) +unlock(f, c::GenericCondition) = unlock(f, c.lock) + """ wait([x]) diff --git a/base/coreio.jl b/base/coreio.jl index d2be652731e43..2796c53e759f5 100644 --- a/base/coreio.jl +++ b/base/coreio.jl @@ -17,9 +17,7 @@ write(::DevNull, ::UInt8) = 1 unsafe_write(::DevNull, ::Ptr{UInt8}, n::UInt)::Int = n close(::DevNull) = nothing flush(::DevNull) = nothing -wait_connected(::DevNull) = nothing wait_readnb(::DevNull) = wait() -wait_readbyte(::DevNull) = wait() wait_close(::DevNull) = wait() eof(::DevNull) = true diff --git a/base/file.jl b/base/file.jl index b7f98c7323774..d553b17c2d081 100644 --- a/base/file.jl +++ b/base/file.jl @@ -537,7 +537,7 @@ function mktempdir(parent=tempdir(); prefix=temp_prefix) try ret = ccall(:uv_fs_mkdtemp, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}), - eventloop(), req, tpath, C_NULL) + C_NULL, req, tpath, C_NULL) if ret < 0 ccall(:uv_fs_req_cleanup, Cvoid, (Ptr{Cvoid},), req) uv_error("mktempdir", ret) @@ -623,8 +623,8 @@ function readdir(path::AbstractString) uv_readdir_req = zeros(UInt8, ccall(:jl_sizeof_uv_fs_t, Int32, ())) # defined in sys.c, to call uv_fs_readdir, which sets errno on error. - err = ccall(:jl_uv_fs_scandir, Int32, (Ptr{Cvoid}, Ptr{UInt8}, Cstring, Cint, Ptr{Cvoid}), - eventloop(), uv_readdir_req, path, 0, C_NULL) + err = ccall(:uv_fs_scandir, Int32, (Ptr{Cvoid}, Ptr{UInt8}, Cstring, Cint, Ptr{Cvoid}), + C_NULL, uv_readdir_req, path, 0, C_NULL) err < 0 && throw(SystemError("unable to read directory $path", -err)) #uv_error("unable to read directory $path", err) @@ -636,7 +636,7 @@ function readdir(path::AbstractString) end # Clean up the request string - ccall(:jl_uv_fs_req_cleanup, Cvoid, (Ptr{UInt8},), uv_readdir_req) + ccall(:uv_fs_req_cleanup, Cvoid, (Ptr{UInt8},), uv_readdir_req) return entries end @@ -808,9 +808,9 @@ Return the target location a symbolic link `path` points to. function readlink(path::AbstractString) req = Libc.malloc(_sizeof_uv_fs) try - ret = ccall(:jl_uv_fs_readlink, Int32, + ret = ccall(:uv_fs_readlink, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}), - eventloop(), req, path, C_NULL) + C_NULL, req, path, C_NULL) if ret < 0 ccall(:uv_fs_req_cleanup, Cvoid, (Ptr{Cvoid},), req) uv_error("readlink", ret) diff --git a/base/filesystem.jl b/base/filesystem.jl index 6b61272cc893b..0bf4c3b1145cf 100644 --- a/base/filesystem.jl +++ b/base/filesystem.jl @@ -73,10 +73,10 @@ function open(path::AbstractString, flags::Integer, mode::Integer=0) req = Libc.malloc(_sizeof_uv_fs) local handle try - ret = ccall(:jl_uv_fs_open, Int32, + ret = ccall(:uv_fs_open, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Int32, Int32, Ptr{Cvoid}), - eventloop(), req, path, flags, mode, C_NULL) - handle = ccall(:jl_uv_fs_result, Cssize_t, (Ptr{Cvoid},), req) + C_NULL, req, path, flags, mode, C_NULL) + handle = ccall(:uv_fs_get_result, Cssize_t, (Ptr{Cvoid},), req) ccall(:uv_fs_req_cleanup, Cvoid, (Ptr{Cvoid},), req) uv_error("open", ret) finally # conversion to Cstring could cause an exception @@ -132,9 +132,9 @@ write(f::File, c::UInt8) = write(f, Ref{UInt8}(c)) function truncate(f::File, n::Integer) check_open(f) req = Libc.malloc(_sizeof_uv_fs) - err = ccall(:jl_uv_fs_ftruncate, Int32, + err = ccall(:uv_fs_ftruncate, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Int64, Ptr{Cvoid}), - eventloop(), req, f.handle, n, C_NULL) + C_NULL, req, f.handle, n, C_NULL) Libc.free(req) uv_error("ftruncate", err) return f @@ -143,9 +143,9 @@ end function futime(f::File, atime::Float64, mtime::Float64) check_open(f) req = Libc.malloc(_sizeof_uv_fs) - err = ccall(:jl_uv_fs_futime, Int32, + err = ccall(:uv_fs_futime, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Float64, Float64, Ptr{Cvoid}), - eventloop(), req, f.handle, atime, mtime, C_NULL) + C_NULL, req, f.handle, atime, mtime, C_NULL) Libc.free(req) uv_error("futime", err) return f diff --git a/base/io.jl b/base/io.jl index e14caf4ac323b..047862ec227b3 100644 --- a/base/io.jl +++ b/base/io.jl @@ -61,9 +61,7 @@ Close an I/O stream. Performs a [`flush`](@ref) first. """ function close end function flush end -function wait_connected end function wait_readnb end -function wait_readbyte end function wait_close end function bytesavailable end @@ -260,7 +258,6 @@ iswritable(io::AbstractPipe) = iswritable(pipe_writer(io)) isopen(io::AbstractPipe) = isopen(pipe_writer(io)) || isopen(pipe_reader(io)) close(io::AbstractPipe) = (close(pipe_writer(io)); close(pipe_reader(io))) wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io), nb) -wait_readbyte(io::AbstractPipe, byte::UInt8) = wait_readbyte(pipe_reader(io), byte) wait_close(io::AbstractPipe) = (wait_close(pipe_writer(io)); wait_close(pipe_reader(io))) """ diff --git a/base/libuv.jl b/base/libuv.jl index 760d97e91eed6..5320e1ef6d8e9 100644 --- a/base/libuv.jl +++ b/base/libuv.jl @@ -9,31 +9,31 @@ function uv_sizeof_handle(handle) if !(UV_UNKNOWN_HANDLE < handle < UV_HANDLE_TYPE_MAX) throw(DomainError(handle)) end - ccall(:uv_handle_size,Csize_t,(Int32,),handle) + return ccall(:uv_handle_size, Csize_t, (Int32,), handle) end function uv_sizeof_req(req) if !(UV_UNKNOWN_REQ < req < UV_REQ_TYPE_MAX) throw(DomainError(req)) end - ccall(:uv_req_size,Csize_t,(Int32,),req) + return ccall(:uv_req_size, Csize_t, (Int32,), req) end for h in uv_handle_types -@eval const $(Symbol("_sizeof_",lowercase(string(h)))) = uv_sizeof_handle($h) +@eval const $(Symbol("_sizeof_", lowercase(string(h)))) = uv_sizeof_handle($h) end for r in uv_req_types -@eval const $(Symbol("_sizeof_",lowercase(string(r)))) = uv_sizeof_req($r) +@eval const $(Symbol("_sizeof_", lowercase(string(r)))) = uv_sizeof_req($r) end -uv_handle_data(handle) = ccall(:jl_uv_handle_data,Ptr{Cvoid},(Ptr{Cvoid},),handle) -uv_req_data(handle) = ccall(:jl_uv_req_data,Ptr{Cvoid},(Ptr{Cvoid},),handle) -uv_req_set_data(req,data) = ccall(:jl_uv_req_set_data,Cvoid,(Ptr{Cvoid},Any),req,data) -uv_req_set_data(req,data::Ptr{Cvoid}) = ccall(:jl_uv_req_set_data,Cvoid,(Ptr{Cvoid},Ptr{Cvoid}),req,data) +uv_handle_data(handle) = ccall(:jl_uv_handle_data, Ptr{Cvoid}, (Ptr{Cvoid},), handle) +uv_req_data(handle) = ccall(:jl_uv_req_data, Ptr{Cvoid}, (Ptr{Cvoid},), handle) +uv_req_set_data(req, data) = ccall(:jl_uv_req_set_data, Cvoid, (Ptr{Cvoid}, Any), req, data) +uv_req_set_data(req, data::Ptr{Cvoid}) = ccall(:jl_uv_req_set_data, Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), req, data) macro handle_as(hand, typ) - quote - data = uv_handle_data($(esc(hand))) + return quote + local data = uv_handle_data($(esc(hand))) data == C_NULL && return unsafe_pointer_to_objref(data)::($(esc(typ))) end @@ -45,6 +45,9 @@ disassociate_julia_struct(uv) = disassociate_julia_struct(uv.handle) disassociate_julia_struct(handle::Ptr{Cvoid}) = handle != C_NULL && ccall(:jl_uv_disassociate_julia_struct, Cvoid, (Ptr{Cvoid},), handle) +iolock_begin() = ccall(:jl_iolock_begin, Cvoid, ()) +iolock_end() = ccall(:jl_iolock_end, Cvoid, ()) + # A dict of all libuv handles that are being waited on somewhere in the system # and should thus not be garbage collected const uvhandles = IdDict() @@ -83,16 +86,15 @@ function _UVError(pfx::AbstractString, code::Integer) IOError(string(pfx, ": ", struverror(code), " (", uverrorname(code), ")"), code) end -struverror(err::Int32) = unsafe_string(ccall(:uv_strerror,Cstring,(Int32,),err)) -uverrorname(err::Int32) = unsafe_string(ccall(:uv_err_name,Cstring,(Int32,),err)) +struverror(err::Int32) = unsafe_string(ccall(:uv_strerror, Cstring, (Int32,), err)) +uverrorname(err::Int32) = unsafe_string(ccall(:uv_err_name, Cstring, (Int32,), err)) -uv_error(prefix::Symbol, c::Integer) = uv_error(string(prefix),c) -uv_error(prefix::AbstractString, c::Integer) = c < 0 ? throw(_UVError(prefix,c)) : nothing +uv_error(prefix::Symbol, c::Integer) = uv_error(string(prefix), c) +uv_error(prefix::AbstractString, c::Integer) = c < 0 ? throw(_UVError(prefix, c)) : nothing ## event loop ## eventloop() = uv_eventloop::Ptr{Cvoid} -#mkNewEventLoop() = ccall(:jl_new_event_loop,Ptr{Cvoid},()) # this would probably be fine, but is nowhere supported function process_events() return ccall(:jl_process_events, Int32, (Ptr{Cvoid},), eventloop()) @@ -117,6 +119,7 @@ function reinit_stdio() global stdin = init_stdio(ccall(:jl_stdin_stream, Ptr{Cvoid}, ())) global stdout = init_stdio(ccall(:jl_stdout_stream, Ptr{Cvoid}, ())) global stderr = init_stdio(ccall(:jl_stderr_stream, Ptr{Cvoid}, ())) + nothing end """ diff --git a/base/process.jl b/base/process.jl index 721838229024a..f7797297e8fe8 100644 --- a/base/process.jl +++ b/base/process.jl @@ -319,13 +319,12 @@ mutable struct Process <: AbstractPipe err::IO exitcode::Int64 termsignal::Int32 - exitnotify::Condition - closenotify::Condition + exitnotify::ThreadSynchronizer function Process(cmd::Cmd, handle::Ptr{Cvoid}) this = new(cmd, handle, devnull, devnull, devnull, typemin(fieldtype(Process, :exitcode)), typemin(fieldtype(Process, :termsignal)), - Condition(), Condition()) + ThreadSynchronizer()) finalizer(uvfinalize, this) return this end @@ -366,14 +365,18 @@ function uv_return_spawn(p::Ptr{Cvoid}, exit_status::Int64, termsignal::Int32) proc.termsignal = termsignal ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), proc.handle) proc.handle = C_NULL - notify(proc.exitnotify) + lock(proc.exitnotify) + try + notify(proc.exitnotify) + finally + unlock(proc.exitnotify) + end nothing end # called when the libuv handle is destroyed function _uv_hook_close(proc::Process) proc.handle = C_NULL - notify(proc.closenotify) nothing end @@ -855,6 +858,7 @@ error if killing the process failed for other reasons (e.g. insufficient permissions). """ function kill(p::Process, signum::Integer) + iolock_begin() if process_running(p) @assert p.handle != C_NULL err = ccall(:uv_process_kill, Int32, (Ptr{Cvoid}, Int32), p.handle, signum) @@ -862,6 +866,8 @@ function kill(p::Process, signum::Integer) throw(_UVError("kill", err)) end end + iolock_end() + nothing end kill(ps::Vector{Process}) = foreach(kill, ps) kill(ps::ProcessChain) = foreach(kill, ps.processes) @@ -876,18 +882,17 @@ Get the child process ID, if it still exists. This function requires at least Julia 1.1. """ function Libc.getpid(p::Process) + # TODO: due to threading, this method is no longer synchronized with the user application + iolock_begin() ppid = Int32(0) if p.handle != C_NULL ppid = ccall(:jl_uv_process_pid, Int32, (Ptr{Cvoid},), p.handle) end + iolock_end() ppid <= 0 && throw(_UVError("getpid", UV_ESRCH)) return ppid end -function _contains_newline(bufptr::Ptr{Cvoid}, len::Int32) - return (ccall(:memchr, Ptr{Cvoid}, (Ptr{Cvoid},Int32,Csize_t), bufptr, '\n', len) != C_NULL) -end - ## process status ## """ @@ -988,8 +993,26 @@ macro cmd(str) return :(cmd_gen($(esc(shell_parse(str, special=shell_special)[1])))) end -wait(x::Process) = if !process_exited(x); stream_wait(x, x.exitnotify); end -wait(x::ProcessChain) = for p in x.processes; wait(p); end +function wait(x::Process) + process_exited(x) && return + iolock_begin() + if !process_exited(x) + preserve_handle(x) + lock(x.exitnotify) + iolock_end() + try + wait(x.exitnotify) + finally + unlock(x.exitnotify) + unpreserve_handle(x) + end + else + iolock_end() + end + nothing +end + +wait(x::ProcessChain) = foreach(wait, x.processes) show(io::IO, p::Process) = print(io, "Process(", p.cmd, ", ", process_status(p), ")") diff --git a/base/stream.jl b/base/stream.jl index 900439718708c..d7ff4c8cc0b7f 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -41,24 +41,15 @@ abstract type LibuvStream <: IO end # Redirectable = Union{IO, FileRedirect, Libc.RawFD} (not exported) -function stream_wait(x, c...) # for x::LibuvObject - preserve_handle(x) - try - return wait(c...) - finally - unpreserve_handle(x) - end -end - bytesavailable(s::LibuvStream) = bytesavailable(s.buffer) function eof(s::LibuvStream) if isopen(s) # fast path - bytesavailable(s) > 0 && return false + bytesavailable(s) <= 0 || return false else return bytesavailable(s) <= 0 end - wait_readnb(s,1) + wait_readnb(s, 1) return !isopen(s) && bytesavailable(s) <= 0 end @@ -119,17 +110,16 @@ mutable struct PipeEndpoint <: LibuvStream status::Int buffer::IOBuffer cond::ThreadSynchronizer - closenotify::ThreadSynchronizer + readerror::Any sendbuf::Union{IOBuffer, Nothing} - lock::ReentrantLock + lock::ReentrantLock # advisory lock throttle::Int function PipeEndpoint(handle::Ptr{Cvoid}, status) - lock = Threads.SpinLock() p = new(handle, status, PipeBuffer(), - ThreadSynchronizer(lock), - ThreadSynchronizer(lock), + ThreadSynchronizer(), + nothing, nothing, ReentrantLock(), DEFAULT_READ_BUFFER_SZ) @@ -141,17 +131,21 @@ end function PipeEndpoint() pipe = PipeEndpoint(Libc.malloc(_sizeof_uv_named_pipe), StatusUninit) + iolock_begin() err = ccall(:uv_pipe_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cint), eventloop(), pipe.handle, 0) uv_error("failed to create pipe endpoint", err) pipe.status = StatusInit + iolock_end() return pipe end function PipeEndpoint(fd::OS_HANDLE) pipe = PipeEndpoint() + iolock_begin() err = ccall(:uv_pipe_open, Int32, (Ptr{Cvoid}, OS_HANDLE), pipe.handle, fd) uv_error("pipe_open", err) pipe.status = StatusOpen + iolock_end() return pipe end if OS_HANDLE != RawFD @@ -164,19 +158,18 @@ mutable struct TTY <: LibuvStream status::Int buffer::IOBuffer cond::ThreadSynchronizer - closenotify::ThreadSynchronizer + readerror::Any sendbuf::Union{IOBuffer, Nothing} - lock::ReentrantLock + lock::ReentrantLock # advisory lock throttle::Int @static if Sys.iswindows(); ispty::Bool; end function TTY(handle::Ptr{Cvoid}, status) - lock = Threads.SpinLock() tty = new( handle, status, PipeBuffer(), - ThreadSynchronizer(lock), - ThreadSynchronizer(lock), + ThreadSynchronizer(), + nothing, nothing, ReentrantLock(), DEFAULT_READ_BUFFER_SZ) @@ -191,10 +184,12 @@ end function TTY(fd::OS_HANDLE) tty = TTY(Libc.malloc(_sizeof_uv_tty), StatusUninit) + iolock_begin() err = ccall(:uv_tty_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Int32), eventloop(), tty.handle, fd, 0) uv_error("TTY", err) tty.status = StatusOpen + iolock_end() return tty end if OS_HANDLE != RawFD @@ -230,7 +225,9 @@ rawhandle(stream::LibuvStream) = stream.handle unsafe_convert(::Type{Ptr{Cvoid}}, s::Union{LibuvStream, LibuvServer}) = s.handle function init_stdio(handle::Ptr{Cvoid}) + iolock_begin() t = ccall(:jl_uv_handle_type, Int32, (Ptr{Cvoid},), handle) + local io if t == UV_FILE fd = ccall(:jl_uv_file_handle, OS_HANDLE, (Ptr{Cvoid},), handle) # TODO: Replace ios.c file with libuv fs? @@ -240,17 +237,19 @@ function init_stdio(handle::Ptr{Cvoid}) fd = ccall(:_open_osfhandle, RawFD, (WindowsRawSocket, Int32), fd, 0) end # TODO: Get fdio to work natively with file descriptors instead of integers - return fdio(cconvert(Cint, fd)) + io = fdio(cconvert(Cint, fd)) elseif t == UV_TTY - return TTY(handle, StatusOpen) + io = TTY(handle, StatusOpen) elseif t == UV_TCP Sockets = require(PkgId(UUID((0x6462fe0b_24de_5631, 0x8697_dd941f90decc)), "Sockets")) - return Sockets.TCPSocket(handle, StatusOpen) + io = Sockets.TCPSocket(handle, StatusOpen) elseif t == UV_NAMED_PIPE - return PipeEndpoint(handle, StatusOpen) + io = PipeEndpoint(handle, StatusOpen) else throw(ArgumentError("invalid stdio type: $t")) end + iolock_end() + return io end """ @@ -266,35 +265,38 @@ of the original handle. other part of the system. """ function open(h::OS_HANDLE) + iolock_begin() t = ccall(:uv_guess_handle, Cint, (OS_HANDLE,), h) + local io if t == UV_FILE @static if Sys.iswindows() # TODO: Get ios.c to understand native handles h = ccall(:_open_osfhandle, RawFD, (WindowsRawSocket, Int32), h, 0) end # TODO: Get fdio to work natively with file descriptors instead of integers - return fdio(cconvert(Cint, h)) + io = fdio(cconvert(Cint, h)) elseif t == UV_TTY - return TTY(h) + io = TTY(h) elseif t == UV_TCP Sockets = require(PkgId(UUID((0x6462fe0b_24de_5631, 0x8697_dd941f90decc)), "Sockets")) - return Sockets.TCPSocket(h) + io = Sockets.TCPSocket(h) elseif t == UV_NAMED_PIPE - pipe = PipeEndpoint(h) + io = PipeEndpoint(h) @static if Sys.iswindows() - if ccall(:jl_ispty, Cint, (Ptr{Cvoid},), pipe.handle) != 0 + if ccall(:jl_ispty, Cint, (Ptr{Cvoid},), io.handle) != 0 # replace the Julia `PipeEndpoint` type with a `TTY` type, # if we detect that this is a cygwin pty object - pipe_handle, pipe_status = pipe.handle, pipe.status - pipe.status = StatusClosed - pipe.handle = C_NULL - return TTY(pipe_handle, pipe_status) + pipe_handle, pipe_status = io.handle, pipe.status + io.status = StatusClosed + io.handle = C_NULL + io = TTY(pipe_handle, pipe_status) end end - return pipe else throw(ArgumentError("invalid stdio type: $t")) end + iolock_end() + return io end if OS_HANDLE != RawFD @@ -324,46 +326,10 @@ function check_open(x::Union{LibuvStream, LibuvServer}) end end -function wait_connected(x::Union{LibuvStream, LibuvServer}) - check_open(x) - lock(x.cond) - try - while x.status == StatusConnecting - stream_wait(x, x.cond) - check_open(x) - end - finally - unlock(x.cond) - end -end - -function wait_readbyte(x::LibuvStream, c::UInt8) - if isopen(x) # fast path - occursin(c, x.buffer) && return - else - return - end - preserve_handle(x) - lock(x.cond) - try - while isopen(x) && !occursin(c, x.buffer) - start_reading(x) # ensure we are reading - wait(x.cond) - end - finally - if isempty(x.cond) - stop_reading(x) # stop reading iff there are currently no other read clients of the stream - end - unpreserve_handle(x) - unlock(x.cond) - end - nothing -end - function wait_readnb(x::LibuvStream, nb::Int) - if isopen(x) # fast path - bytesavailable(x.buffer) >= nb && return - else + iolock_begin() + if !isopen(x) || bytesavailable(x.buffer) >= nb # fast path + iolock_end() return end oldthrottle = x.throttle @@ -371,9 +337,14 @@ function wait_readnb(x::LibuvStream, nb::Int) lock(x.cond) try while isopen(x) && bytesavailable(x.buffer) < nb + x.readerror === nothing || throw(x.readerror) x.throttle = max(nb, x.throttle) start_reading(x) # ensure we are reading + iolock_end() wait(x.cond) + unlock(x.cond) + iolock_begin() + lock(x.cond) end finally if isempty(x.cond) @@ -385,45 +356,45 @@ function wait_readnb(x::LibuvStream, nb::Int) unpreserve_handle(x) unlock(x.cond) end + iolock_end() nothing end function wait_close(x::Union{LibuvStream, LibuvServer}) - lock(x.closenotify) + preserve_handle(x) + lock(x.cond) try - if isopen(x) - stream_wait(x, x.closenotify) + while isopen(x) + wait(x.cond) end finally - unlock(x.closenotify) + unlock(x.cond) + unpreserve_handle(x) end nothing end function close(stream::Union{LibuvStream, LibuvServer}) + iolock_begin() + should_wait = false if stream.status == StatusInit ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle) - return nothing - end - lock(stream.closenotify) - try - if isopen(stream) || stream.status == StatusEOF - should_wait = uv_handle_data(stream) != C_NULL - if stream.status != StatusClosing - ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle) - stream.status = StatusClosing - end - if should_wait - stream_wait(stream, stream.closenotify) - end + stream.status = StatusClosing + elseif isopen(stream) || stream.status == StatusEOF + should_wait = uv_handle_data(stream) != C_NULL + if stream.status != StatusClosing + ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle) + stream.status = StatusClosing end - finally - unlock(stream.closenotify) end - return nothing + iolock_end() + should_wait && wait_close(stream) + nothing end function uvfinalize(uv::Union{LibuvStream, LibuvServer}) + uv.handle == C_NULL && return + iolock_begin() if uv.handle != C_NULL disassociate_julia_struct(uv.handle) # not going to call the usual close hooks if uv.status != StatusUninit @@ -434,6 +405,7 @@ function uvfinalize(uv::Union{LibuvStream, LibuvServer}) uv.status = StatusClosed uv.handle = C_NULL end + iolock_end() nothing end @@ -489,9 +461,11 @@ function displaysize(io::TTY) s1 = Ref{Int32}(0) s2 = Ref{Int32}(0) + iolock_begin() Base.uv_error("size (TTY)", ccall(:uv_tty_get_winsize, Int32, (Ptr{Cvoid}, Ptr{Int32}, Ptr{Int32}), io, s1, s2) != 0) + iolock_end() w, h = s1[], s2[] h > 0 || (h = default_size[1]) w > 0 || (w = default_size[2]) @@ -522,6 +496,7 @@ function notify_filled(buffer::IOBuffer, nread::Int) else buffer.ptr += nread end + nothing end function alloc_buf_hook(stream::LibuvStream, size::UInt) @@ -568,20 +543,22 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}) # remind the client that stream.buffer is full notify(stream.cond) elseif nread == UV_EOF + stream.readerror = EOFError() if isa(stream, TTY) stream.status = StatusEOF # libuv called uv_stop_reading already notify(stream.cond) - notify(stream.closenotify) elseif stream.status != StatusClosing # begin shutdown of the stream ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle) stream.status = StatusClosing end else + stream.readerror = _UVError("read", nread) # This is a fatal connection error. Shutdown requests as per the usual # close function won't work and libuv will fail with an assertion failure ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream) - notify_error(stream.cond, _UVError("read", nread)) + stream.status = StatusClosing + notify(stream.cond) end else notify_filled(stream.buffer, nread) @@ -600,7 +577,7 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}) ((bytesavailable(stream.buffer) >= stream.throttle) || (bytesavailable(stream.buffer) >= stream.buffer.maxsize))) # save cycles by stopping kernel notifications from arriving - ccall(:jl_uv_read_stop, Cint, (Ptr{Cvoid},), stream) + ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream) stream.status = StatusOpen end nothing @@ -609,22 +586,24 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}) end function reseteof(x::TTY) + iolock_begin() if x.status == StatusEOF x.status = StatusOpen + x.readerror isa EOFError && (x.readerror = nothing) end + iolock_end() nothing end function _uv_hook_close(uv::Union{LibuvStream, LibuvServer}) - lock(uv.closenotify) + lock(uv.cond) try uv.handle = C_NULL uv.status = StatusClosed # notify any listeners that exist on this libuv stream type - notify(uv.closenotify) notify(uv.cond) finally - unlock(uv.closenotify) + unlock(uv.cond) end nothing end @@ -678,12 +657,14 @@ show(io::IO, stream::Pipe) = print(io, ## Functions for PipeEndpoint and PipeServer ## function open_pipe!(p::PipeEndpoint, handle::OS_HANDLE) + iolock_begin() if p.status != StatusInit error("pipe is already in use or has been closed") end err = ccall(:uv_pipe_open, Int32, (Ptr{Cvoid}, OS_HANDLE), p.handle, handle) uv_error("pipe_open", err) p.status = StatusOpen + iolock_end() return p end @@ -698,13 +679,11 @@ function link_pipe!(read_end::PipeEndpoint, reader_supports_async::Bool, close_pipe_sync(rd) rethrow() end - read_end.status = StatusOpen open_pipe!(write_end, wr) catch close_pipe_sync(wr) rethrow() end - write_end.status = StatusOpen nothing end @@ -736,6 +715,7 @@ end # flow control function start_reading(stream::LibuvStream) + iolock_begin() if stream.status == StatusOpen if !isreadable(stream) error("tried to read a stream that is not readable") @@ -743,17 +723,18 @@ function start_reading(stream::LibuvStream) # libuv may call the alloc callback immediately # for a TTY on Windows, so ensure the status is set first stream.status = StatusActive - ret = ccall(:jl_uv_read_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), + ret = ccall(:uv_read_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), stream, uv_jl_alloc_buf::Ptr{Cvoid}, uv_jl_readcb::Ptr{Cvoid}) - return ret elseif stream.status == StatusPaused stream.status = StatusActive - return Int32(0) + ret = Int32(0) elseif stream.status == StatusActive - return Int32(0) + ret = Int32(0) else - return Int32(-1) + ret = Int32(-1) end + iolock_end() + return ret end if Sys.iswindows() @@ -763,17 +744,21 @@ if Sys.iswindows() # and a ReadFile call blocking on one thread # causes all other operations on that stream to lockup function stop_reading(stream::LibuvStream) + iolock_begin() if stream.status == StatusActive stream.status = StatusOpen - ccall(:jl_uv_read_stop, Cint, (Ptr{Cvoid},), stream) + ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream) end + iolock_end() nothing end else function stop_reading(stream::LibuvStream) + iolock_begin() if stream.status == StatusActive stream.status = StatusPaused end + iolock_end() nothing end end @@ -782,53 +767,72 @@ end readbytes!(s::LibuvStream, a::Vector{UInt8}, nb = length(a)) = readbytes!(s, a, Int(nb)) function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int) + iolock_begin() sbuf = s.buffer @assert sbuf.seekable == false @assert sbuf.maxsize >= nb + local nread if bytesavailable(sbuf) >= nb - return readbytes!(sbuf, a, nb) + nread = readbytes!(sbuf, a, nb) + iolock_end() + return nread end if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer - wait_readnb(s, nb) - return readbytes!(sbuf, a, nb) - else - try - stop_reading(s) # Just playing it safe, since we are going to switch buffers. - newbuf = PipeBuffer(a, maxsize = nb) - newbuf.size = 0 # reset the write pointer to the beginning - s.buffer = newbuf - write(newbuf, sbuf) - wait_readnb(s, Int(nb)) - compact(newbuf) - return bytesavailable(newbuf) - finally - s.buffer = sbuf - if !isempty(s.cond) - start_reading(s) # resume reading iff there are currently other read clients of the stream - end + while isopen(s) && bytesavailable(sbuf) < nb + iolock_end() + wait_readnb(s, nb) + iolock_begin() + end + nread = readbytes!(sbuf, a, nb) + iolock_end() + return nread + end + + nread = try + stop_reading(s) # Just playing it safe, since we are going to switch buffers. + newbuf = PipeBuffer(a, maxsize = nb) + newbuf.size = 0 # reset the write pointer to the beginning + s.buffer = newbuf + write(newbuf, sbuf) + iolock_end() + wait_readnb(s, Int(nb)) + iolock_begin() + compact(newbuf) + bytesavailable(newbuf) + finally + s.buffer = sbuf + if !isempty(s.cond) + start_reading(s) # resume reading iff there are currently other read clients of the stream end end - @assert false # unreachable + iolock_end() + return nread end function read(stream::LibuvStream) wait_readnb(stream, typemax(Int)) - return take!(stream.buffer) + iolock_begin() + bytes = take!(stream.buffer) + iolock_end() + return bytes end function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt) + iolock_begin() sbuf = s.buffer @assert sbuf.seekable == false @assert sbuf.maxsize >= nb if bytesavailable(sbuf) >= nb - return unsafe_read(sbuf, p, nb) - end - - if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer - wait_readnb(s, Int(nb)) + unsafe_read(sbuf, p, nb) + elseif nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer + while isopen(s) && bytesavailable(sbuf) < nb + iolock_end() + wait_readnb(s, Int(nb)) + iolock_begin() + end unsafe_read(sbuf, p, nb) else try @@ -837,7 +841,9 @@ function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt) newbuf.size = 0 # reset the write pointer to the beginning s.buffer = newbuf write(newbuf, sbuf) + iolock_end() wait_readnb(s, Int(nb)) + iolock_begin() nb == bytesavailable(newbuf) || throw(EOFError()) finally s.buffer = sbuf @@ -846,42 +852,80 @@ function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt) end end end + iolock_end() nothing end function read(this::LibuvStream, ::Type{UInt8}) - wait_readnb(this, 1) - buf = this.buffer - @assert buf.seekable == false - return read(buf, UInt8) + iolock_begin() + sbuf = this.buffer + @assert sbuf.seekable == false + while isopen(this) && bytesavailable(sbuf) < 1 + iolock_end() + wait_readnb(this, 1) + iolock_begin() + end + c = read(sbuf, UInt8) + iolock_end() + return c end function readavailable(this::LibuvStream) wait_readnb(this, 1) + iolock_begin() buf = this.buffer @assert buf.seekable == false - return take!(buf) + bytes = take!(buf) + iolock_end() + return bytes end -function readuntil(this::LibuvStream, c::UInt8; keep::Bool=false) - wait_readbyte(this, c) - buf = this.buffer +function readuntil(x::LibuvStream, c::UInt8; keep::Bool=false) + iolock_begin() + buf = x.buffer @assert buf.seekable == false - return readuntil(buf, c, keep=keep) + if isopen(x) && !occursin(c, buf) # fast path + preserve_handle(x) + lock(x.cond) + try + while isopen(x) && !occursin(c, x.buffer) + x.readerror === nothing || throw(x.readerror) + start_reading(x) # ensure we are reading + iolock_end() + wait(x.cond) + unlock(x.cond) + iolock_begin() + lock(x.cond) + end + finally + if isempty(x.cond) + stop_reading(x) # stop reading iff there are currently no other read clients of the stream + end + unlock(x.cond) + unpreserve_handle(x) + end + end + bytes = readuntil(buf, c, keep=keep) + iolock_end() + return bytes end uv_write(s::LibuvStream, p::Vector{UInt8}) = uv_write(s, pointer(p), UInt(sizeof(p))) +# caller must have acquired the iolock function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt) + uvw = uv_write_async(s, p, n) ct = current_task() - uvw = uv_write_async(s, p, n, ct) preserve_handle(ct) - try + uv_req_set_data(uvw, ct) + iolock_end() + status = try # wait for the last chunk to complete (or error) # assume that any errors would be sticky, # (so we don't need to monitor the error status of the intermediate writes) - wait() + wait()::Cint finally + iolock_begin() if uv_req_data(uvw) != C_NULL # uvw is still alive, # so make sure we won't get spurious notifications later @@ -890,20 +934,24 @@ function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt) # done with uvw Libc.free(uvw) end + iolock_end() unpreserve_handle(ct) end + if status < 0 + throw(_UVError("write", status)) + end return Int(n) end # helper function for uv_write that returns the uv_write_t struct for the write -# rather than waiting on it -function uv_write_async(s::LibuvStream, p::Ptr{UInt8}, n::UInt, reqdata) +# rather than waiting on it, caller must hold the iolock +function uv_write_async(s::LibuvStream, p::Ptr{UInt8}, n::UInt) check_open(s) while true uvw = Libc.malloc(_sizeof_uv_write) - uv_req_set_data(uvw, reqdata) + uv_req_set_data(uvw, C_NULL) # in case we get interrupted before arriving at the wait call nwrite = min(n, MAX_OS_WRITE) # split up the write into chunks the OS can handle. - # TODO: use writev, when that is added to uv-win + # TODO: use writev instead of a loop err = ccall(:jl_uv_write, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, UInt, Ptr{Cvoid}, Ptr{Cvoid}), @@ -927,30 +975,32 @@ end # - large isbits arrays are unbuffered and written directly function unsafe_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt) - if s.sendbuf === nothing - return uv_write(s, p, UInt(n)) - end - - buf = s.sendbuf - totb = bytesavailable(buf) + n - if totb < buf.maxsize - nb = unsafe_write(buf, p, n) - else - flush(s) - if n > buf.maxsize - nb = uv_write(s, p, n) - else + while true + # try to add to the send buffer + iolock_begin() + buf = s.sendbuf + buf === nothing && break + totb = bytesavailable(buf) + n + if totb < buf.maxsize nb = unsafe_write(buf, p, n) + iolock_end() + return nb end + bytesavailable(buf) == 0 && break + # perform flush(s) + arr = take!(buf) + uv_write(s, arr) end - return nb + # perform the output to the kernel + return uv_write(s, p, n) end function flush(s::LibuvStream) + iolock_begin() buf = s.sendbuf if buf !== nothing if bytesavailable(buf) > 0 - arr = take!(buf) # Array of UInt8s + arr = take!(buf) uv_write(s, arr) return end @@ -959,16 +1009,25 @@ function flush(s::LibuvStream) return end -buffer_writes(s::LibuvStream, bufsize) = (s.sendbuf=PipeBuffer(bufsize); s) +function buffer_writes(s::LibuvStream, bufsize) + sendbuf = PipeBuffer(bufsize) + iolock_begin() + s.sendbuf = sendbuf + iolock_end() + return s +end ## low-level calls to libuv ## function write(s::LibuvStream, b::UInt8) buf = s.sendbuf if buf !== nothing + iolock_begin() if bytesavailable(buf) + 1 < buf.maxsize + iolock_end() return write(buf, b) end + iolock_end() end return write(s, Ref{UInt8}(b)) end @@ -978,12 +1037,7 @@ function uv_writecb_task(req::Ptr{Cvoid}, status::Cint) if d != C_NULL uv_req_set_data(req, C_NULL) # let the Task know we got the writecb t = unsafe_pointer_to_objref(d)::Task - if status < 0 - err = _UVError("write", status) - schedule(t, err, error=true) - else - schedule(t) - end + schedule(t, status) else # no owner for this req, safe to just free it Libc.free(req) @@ -1139,61 +1193,99 @@ end # BufferStream's are non-OS streams, backed by a regular IOBuffer mutable struct BufferStream <: LibuvStream buffer::IOBuffer - r_c::Condition - close_c::Condition + cond::Threads.Condition is_open::Bool buffer_writes::Bool - lock::ReentrantLock + lock::ReentrantLock # advisory lock - BufferStream() = new(PipeBuffer(), Condition(), Condition(), true, false, ReentrantLock()) + BufferStream() = new(PipeBuffer(), Threads.Condition(), true, false, ReentrantLock()) end isopen(s::BufferStream) = s.is_open + function close(s::BufferStream) - s.is_open = false - notify(s.r_c) - notify(s.close_c) - nothing + lock(s.cond) do + s.is_open = false + notify(s.cond) + nothing + end end uvfinalize(s::BufferStream) = nothing -read(s::BufferStream, ::Type{UInt8}) = (wait_readnb(s, 1); read(s.buffer, UInt8)) -unsafe_read(s::BufferStream, a::Ptr{UInt8}, nb::UInt) = (wait_readnb(s, Int(nb)); unsafe_read(s.buffer, a, nb)) +function read(s::BufferStream, ::Type{UInt8}) + nread = lock(s.cond) do + wait_readnb(s, 1) + read(s.buffer, UInt8) + end + return nread +end +function unsafe_read(s::BufferStream, a::Ptr{UInt8}, nb::UInt) + lock(s.cond) do + wait_readnb(s, Int(nb)) + unsafe_read(s.buffer, a, nb) + nothing + end +end bytesavailable(s::BufferStream) = bytesavailable(s.buffer) isreadable(s::BufferStream) = s.buffer.readable iswritable(s::BufferStream) = s.buffer.writable function wait_readnb(s::BufferStream, nb::Int) - while isopen(s) && bytesavailable(s.buffer) < nb - wait(s.r_c) + lock(s.cond) do + while isopen(s) && bytesavailable(s.buffer) < nb + wait(s.cond) + end end end -show(io::IO, s::BufferStream) = print(io,"BufferStream() bytes waiting:",bytesavailable(s.buffer),", isopen:", s.is_open) +show(io::IO, s::BufferStream) = print(io, "BufferStream() bytes waiting:", bytesavailable(s.buffer), ", isopen:", s.is_open) + +function readuntil(s::BufferStream, c::UInt8; keep::Bool=false) + bytes = lock(s.cond) do + while isopen(s) && !occursin(c, s.buffer) + wait(s.cond) + end + readuntil(s.buffer, c, keep=keep) + end + return bytes +end -function wait_readbyte(s::BufferStream, c::UInt8) - while isopen(s) && !occursin(c, s.buffer) - wait(s.r_c) +function wait_close(s::BufferStream) + lock(s.cond) do + while isopen(s) + wait(s.cond) + end end end -wait_close(s::BufferStream) = if isopen(s); wait(s.close_c); end start_reading(s::BufferStream) = Int32(0) stop_reading(s::BufferStream) = nothing write(s::BufferStream, b::UInt8) = write(s, Ref{UInt8}(b)) function unsafe_write(s::BufferStream, p::Ptr{UInt8}, nb::UInt) - rv = unsafe_write(s.buffer, p, nb) - !(s.buffer_writes) && notify(s.r_c) - return rv + nwrite = lock(s.cond) do + rv = unsafe_write(s.buffer, p, nb) + s.buffer_writes || notify(s.cond) + rv + end + return nwrite end function eof(s::BufferStream) - wait_readnb(s, 1) - return !isopen(s) && bytesavailable(s) <= 0 + bytesavailable(s) > 0 && return false + iseof = lock(s.cond) do + wait_readnb(s, 1) + return !isopen(s) && bytesavailable(s) <= 0 + end + return iseof end # If buffer_writes is called, it will delay notifying waiters till a flush is called. -buffer_writes(s::BufferStream, bufsize=0) = (s.buffer_writes=true; s) -flush(s::BufferStream) = (notify(s.r_c); nothing) +buffer_writes(s::BufferStream, bufsize=0) = (s.buffer_writes = true; s) +function flush(s::BufferStream) + lock(s.cond) do + notify(s.cond) + nothing + end +end diff --git a/base/sysinfo.jl b/base/sysinfo.jl index e9c71b0401277..1cbe3a994bcd8 100644 --- a/base/sysinfo.jl +++ b/base/sysinfo.jl @@ -203,7 +203,8 @@ end function cpu_info() UVcpus = Ref{Ptr{UV_cpu_info_t}}() count = Ref{Int32}() - Base.uv_error("uv_cpu_info",ccall(:uv_cpu_info, Int32, (Ptr{Ptr{UV_cpu_info_t}}, Ptr{Int32}), UVcpus, count)) + err = ccall(:uv_cpu_info, Int32, (Ptr{Ptr{UV_cpu_info_t}}, Ptr{Int32}), UVcpus, count) + Base.uv_error("uv_cpu_info", err) cpus = Vector{CPUinfo}(undef, count[]) for i = 1:length(cpus) cpus[i] = CPUinfo(unsafe_load(UVcpus[], i)) @@ -219,7 +220,8 @@ Gets the current system uptime in seconds. """ function uptime() uptime_ = Ref{Float64}() - Base.uv_error("uv_uptime",ccall(:uv_uptime, Int32, (Ptr{Float64},), uptime_)) + err = ccall(:uv_uptime, Int32, (Ptr{Float64},), uptime_) + Base.uv_error("uv_uptime", err) return uptime_[] end diff --git a/doc/src/devdocs/locks.md b/doc/src/devdocs/locks.md index 12dc8e4561c18..912fe663f325d 100644 --- a/doc/src/devdocs/locks.md +++ b/doc/src/devdocs/locks.md @@ -58,6 +58,17 @@ trying to acquire it: > > > > currently the lock is merged with the codegen lock, since they call each other recursively +The following lock synchronizes IO operation. Be aware that doing any I/O (for example, +printing warning messages or debug information) while holding any other lock listed above +may result in pernicious and hard-to-find deadlocks. BE VERY CAREFUL! + +> * iolock +> * Individual ThreadSynchronizers locks +> +> > this may continue to be held after releasing the iolock, or acquired without it, +> > but be very careful to never attempt to acquire the iolock while holding it + + The following is the root lock, meaning no other lock shall be held when trying to acquire it: > * toplevel @@ -95,37 +106,32 @@ Module serializer : toplevel lock JIT & type-inference : codegen lock -MethodInstance updates : codegen lock +MethodInstance/CodeInstance updates : Method->writelock, codegen lock -> * These fields are generally lazy initialized, using the test-and-test-and-set pattern. > * These are set at construction and immutable: -> > * specTypes > * sparam_vals > * def + > * These are set by `jl_type_infer` (while holding codegen lock): -> +> * cache > * rettype > * inferred -> * these can also be reset, see `jl_set_lambda_rettype` for that logic as it needs to keep `functionObjectsDecls` -> in sync + * valid ages + > * `inInference` flag: -> > * optimization to quickly avoid recurring into `jl_type_infer` while it is already running > * actual state (of setting `inferred`, then `fptr`) is protected by codegen lock -> * Function pointers (`jlcall_api` and `fptr`, `unspecialized_ducttape`): -> + +> * Function pointers: > * these transition once, from `NULL` to a value, while the codegen lock is held -> * Code-generator cache (the contents of `functionObjectsDecls`): > +> * Code-generator cache (the contents of `functionObjectsDecls`): > * these can transition multiple times, but only while the codegen lock is held > * it is valid to use old version of this, or block for new versions of this, so races are benign, > as long as the code is careful not to reference other data in the method instance (such as `rettype`) > and assume it is coordinated, unless also holding the codegen lock -> * `compile_traced` flag: > -> * unknown - LLVMContext : codegen lock Method : Method->writelock diff --git a/src/jl_uv.c b/src/jl_uv.c index fc4eb147cc755..ba86352488e51 100644 --- a/src/jl_uv.c +++ b/src/jl_uv.c @@ -71,6 +71,17 @@ void JL_UV_LOCK(void) } } +JL_DLLEXPORT void jl_iolock_begin(void) +{ + JL_UV_LOCK(); +} + +JL_DLLEXPORT void jl_iolock_end(void) +{ + JL_UV_UNLOCK(); +} + + void jl_uv_call_close_callback(jl_value_t *val) { jl_value_t *args[2]; @@ -94,11 +105,11 @@ static void jl_uv_closeHandle(uv_handle_t *handle) JL_STDERR = (JL_STREAM*)STDERR_FILENO; // also let the client app do its own cleanup if (handle->type != UV_FILE && handle->data) { - size_t last_age = jl_get_ptls_states()->world_age; - // TODO: data race on jl_world_counter across many files, to be fixed in a separate revision - jl_get_ptls_states()->world_age = jl_world_counter; + jl_ptls_t ptls = jl_get_ptls_states(); + size_t last_age = ptls->world_age; + ptls->world_age = jl_world_counter; jl_uv_call_close_callback((jl_value_t*)handle->data); - jl_get_ptls_states()->world_age = last_age; + ptls->world_age = last_age; } if (handle == (uv_handle_t*)&signal_async) return; @@ -268,7 +279,9 @@ JL_DLLEXPORT void jl_forceclose_uv(uv_handle_t *handle) // avoid double-closing the stream if (!uv_is_closing(handle)) { JL_UV_LOCK(); - uv_close(handle, &jl_uv_closeHandle); + if (!uv_is_closing(handle)) { + uv_close(handle, &jl_uv_closeHandle); + } JL_UV_UNLOCK(); } } @@ -626,28 +639,33 @@ JL_DLLEXPORT int jl_getpid(void) #endif } -//NOTE: These function expects port/host to be in network byte-order (Big Endian) -JL_DLLEXPORT int jl_tcp_bind(uv_tcp_t *handle, uint16_t port, uint32_t host, - unsigned int flags) -{ - struct sockaddr_in addr; - memset(&addr, 0, sizeof(struct sockaddr_in)); - addr.sin_port = port; - addr.sin_addr.s_addr = host; - addr.sin_family = AF_INET; - // TODO: do we need a lock here? - return uv_tcp_bind(handle, (struct sockaddr*)&addr, flags); +typedef union { + struct sockaddr in; + struct sockaddr_in v4; + struct sockaddr_in6 v6; +} uv_sockaddr_in; + +static void jl_sockaddr_fill(uv_sockaddr_in *addr, uint16_t port, void *host, int ipv6) +{ + memset(addr, 0, sizeof(*addr)); + if (ipv6) { + addr->v6.sin6_family = AF_INET6; + memcpy(&addr->v6.sin6_addr, host, 16); + addr->v6.sin6_port = port; + } + else { + addr->v4.sin_family = AF_INET; + addr->v4.sin_addr.s_addr = *(uint32_t*)host; + addr->v4.sin_port = port; + } } -JL_DLLEXPORT int jl_tcp_bind6(uv_tcp_t *handle, uint16_t port, void *host, - unsigned int flags) +//NOTE: These function expects port/host to be in network byte-order (Big Endian) +JL_DLLEXPORT int jl_tcp_bind(uv_tcp_t *handle, uint16_t port, void *host, + unsigned int flags, int ipv6) { - struct sockaddr_in6 addr; - memset(&addr, 0, sizeof(struct sockaddr_in6)); - addr.sin6_port = port; - memcpy(&addr.sin6_addr, host, 16); - addr.sin6_family = AF_INET6; - // TODO: do we need a lock here + uv_sockaddr_in addr; + jl_sockaddr_fill(&addr, port, host, ipv6); return uv_tcp_bind(handle, (struct sockaddr*)&addr, flags); } @@ -701,63 +719,23 @@ JL_DLLEXPORT int jl_tcp_getpeername(uv_tcp_t *handle, uint16_t *port, return res; } -JL_DLLEXPORT int jl_udp_bind(uv_udp_t *handle, uint16_t port, uint32_t host, - uint32_t flags) -{ - struct sockaddr_in addr; - memset(&addr, 0, sizeof(struct sockaddr_in)); - addr.sin_port = port; - addr.sin_addr.s_addr = host; - addr.sin_family = AF_INET; - return uv_udp_bind(handle, (struct sockaddr*)&addr, flags); -} - -JL_DLLEXPORT int jl_udp_bind6(uv_udp_t *handle, uint16_t port, void *host, - uint32_t flags) +JL_DLLEXPORT int jl_udp_bind(uv_udp_t *handle, uint16_t port, void *host, + uint32_t flags, int ipv6) { - struct sockaddr_in6 addr; - memset(&addr, 0, sizeof(struct sockaddr_in6)); - addr.sin6_port = port; - memcpy(&addr.sin6_addr, host, 16); - addr.sin6_family = AF_INET6; + uv_sockaddr_in addr; + jl_sockaddr_fill(&addr, port, host, ipv6); return uv_udp_bind(handle, (struct sockaddr*)&addr, flags); } -JL_DLLEXPORT int jl_udp_send(uv_udp_t *handle, uint16_t port, uint32_t host, - void *data, uint32_t size, uv_udp_send_cb cb) +JL_DLLEXPORT int jl_udp_send(uv_udp_send_t *req, uv_udp_t *handle, uint16_t port, void *host, + char *data, uint32_t size, uv_udp_send_cb cb, int ipv6) { - struct sockaddr_in addr; - memset(&addr, 0, sizeof(struct sockaddr_in)); - addr.sin_port = port; - addr.sin_addr.s_addr = host; - addr.sin_family = AF_INET; + uv_sockaddr_in addr; + jl_sockaddr_fill(&addr, port, host, ipv6); uv_buf_t buf[1]; - buf[0].base = (char *) data; - buf[0].len = size; - uv_udp_send_t *req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t)); - req->data = handle->data; - JL_UV_LOCK(); - int r = uv_udp_send(req, handle, buf, 1, (struct sockaddr*)&addr, cb); - JL_UV_UNLOCK(); - return r; -} - -JL_DLLEXPORT int jl_udp_send6(uv_udp_t *handle, uint16_t port, void *host, - void *data, uint32_t size, uv_udp_send_cb cb) -{ - struct sockaddr_in6 addr; - memset(&addr, 0, sizeof(struct sockaddr_in6)); - addr.sin6_port = port; - memcpy(&addr.sin6_addr, host, 16); - addr.sin6_family = AF_INET6; - uv_buf_t buf[1]; - buf[0].base = (char *) data; + buf[0].base = data; buf[0].len = size; - uv_udp_send_t *req = (uv_udp_send_t *) malloc(sizeof(uv_udp_send_t)); - req->data = handle->data; - JL_UV_LOCK(); int r = uv_udp_send(req, handle, buf, 1, (struct sockaddr*)&addr, cb); - JL_UV_UNLOCK(); return r; } @@ -769,7 +747,7 @@ JL_DLLEXPORT int jl_uv_sizeof_interface_address(void) JL_DLLEXPORT int jl_uv_interface_addresses(uv_interface_address_t **ifAddrStruct, int *count) { - return uv_interface_addresses(ifAddrStruct,count); + return uv_interface_addresses(ifAddrStruct, count); } JL_DLLEXPORT int jl_uv_interface_address_is_internal(uv_interface_address_t *addr) @@ -796,124 +774,73 @@ JL_DLLEXPORT int jl_getaddrinfo(uv_loop_t *loop, uv_getaddrinfo_t *req, } JL_DLLEXPORT int jl_getnameinfo(uv_loop_t *loop, uv_getnameinfo_t *req, - uint32_t host, uint16_t port, int flags, uv_getnameinfo_cb uvcb) + void *host, uint16_t port, int flags, uv_getnameinfo_cb uvcb, int ipv6) { - struct sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = host; - addr.sin_port = port; - - req->data = NULL; + uv_sockaddr_in addr; + jl_sockaddr_fill(&addr, port, host, ipv6); return uv_getnameinfo(loop, req, uvcb, (struct sockaddr*)&addr, flags); } -JL_DLLEXPORT int jl_getnameinfo6(uv_loop_t *loop, uv_getnameinfo_t *req, - void *host, uint16_t port, int flags, uv_getnameinfo_cb uvcb) -{ - struct sockaddr_in6 addr; - memset(&addr, 0, sizeof(addr)); - addr.sin6_family = AF_INET6; - memcpy(&addr.sin6_addr, host, 16); - addr.sin6_port = port; - - req->data = NULL; - JL_UV_LOCK(); - int r = uv_getnameinfo(loop, req, uvcb, (struct sockaddr*)&addr, flags); - JL_UV_UNLOCK(); - return r; -} - - JL_DLLEXPORT struct sockaddr *jl_sockaddr_from_addrinfo(struct addrinfo *addrinfo) { return addrinfo->ai_addr; } + JL_DLLEXPORT struct addrinfo *jl_next_from_addrinfo(struct addrinfo *addrinfo) { return addrinfo->ai_next; } -JL_DLLEXPORT int jl_sockaddr_in_is_ip4(struct sockaddr_in *addr) -{ - return (addr->sin_family==AF_INET); -} - -JL_DLLEXPORT int jl_sockaddr_in_is_ip6(struct sockaddr_in *addr) +JL_DLLEXPORT int jl_sockaddr_is_ip4(struct sockaddr *addr) { - return (addr->sin_family==AF_INET6); + return (addr->sa_family == AF_INET); } -JL_DLLEXPORT int jl_sockaddr_is_ip4(struct sockaddr_storage *addr) +JL_DLLEXPORT int jl_sockaddr_is_ip6(struct sockaddr *addr) { - return (addr->ss_family==AF_INET); + return (addr->sa_family == AF_INET6); } -JL_DLLEXPORT int jl_sockaddr_is_ip6(struct sockaddr_storage *addr) -{ - return (addr->ss_family==AF_INET6); -} - -JL_DLLEXPORT unsigned int jl_sockaddr_host4(struct sockaddr_in *addr) +JL_DLLEXPORT uint32_t jl_sockaddr_host4(struct sockaddr_in *addr) { return addr->sin_addr.s_addr; } -JL_DLLEXPORT unsigned int jl_sockaddr_host6(struct sockaddr_in6 *addr, char *host) +JL_DLLEXPORT unsigned jl_sockaddr_host6(struct sockaddr_in6 *addr, char *host) { memcpy(host, &addr->sin6_addr, 16); return addr->sin6_scope_id; } -JL_DLLEXPORT void jl_sockaddr_set_port(struct sockaddr_storage *addr, - uint16_t port) +JL_DLLEXPORT uint16_t jl_sockaddr_port4(struct sockaddr_in *addr) { - if (addr->ss_family==AF_INET) - ((struct sockaddr_in*)addr)->sin_port = port; - else - ((struct sockaddr_in6*)addr)->sin6_port = port; + return addr->sin_port; } -JL_DLLEXPORT int jl_tcp4_connect(uv_tcp_t *handle,uint32_t host, uint16_t port, - uv_connect_cb cb) +JL_DLLEXPORT uint16_t jl_sockaddr_port6(struct sockaddr_in6 *addr) { - struct sockaddr_in addr; - uv_connect_t *req = (uv_connect_t*)malloc(sizeof(uv_connect_t)); - req->data = 0; - memset(&addr, 0, sizeof(struct sockaddr_in)); - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = host; - addr.sin_port = port; - JL_UV_LOCK(); - int r = uv_tcp_connect(req,handle,(struct sockaddr*)&addr,cb); - JL_UV_UNLOCK(); - return r; + return addr->sin6_port; } -JL_DLLEXPORT int jl_tcp6_connect(uv_tcp_t *handle, void *host, uint16_t port, - uv_connect_cb cb) + +JL_DLLEXPORT void jl_sockaddr_set_port(uv_sockaddr_in *addr, uint16_t port) { - struct sockaddr_in6 addr; - uv_connect_t *req = (uv_connect_t*)malloc(sizeof(uv_connect_t)); - req->data = 0; - memset(&addr, 0, sizeof(struct sockaddr_in6)); - addr.sin6_family = AF_INET6; - memcpy(&addr.sin6_addr, host, 16); - addr.sin6_port = port; - JL_UV_LOCK(); - int r = uv_tcp_connect(req,handle,(struct sockaddr*)&addr,cb); - JL_UV_UNLOCK(); - return r; + if (addr->in.sa_family == AF_INET) + addr->v4.sin_port = port; + else + addr->v6.sin6_port = port; } -JL_DLLEXPORT int jl_connect_raw(uv_tcp_t *handle,struct sockaddr_storage *addr, - uv_connect_cb cb) +JL_DLLEXPORT int jl_tcp_connect(uv_tcp_t *handle, void *host, uint16_t port, + uv_connect_cb cb, int ipv6) { + uv_sockaddr_in addr; + jl_sockaddr_fill(&addr, port, host, ipv6); uv_connect_t *req = (uv_connect_t*)malloc(sizeof(uv_connect_t)); - req->data = 0; - JL_UV_LOCK(); - int r = uv_tcp_connect(req,handle,(struct sockaddr*)addr,cb); - JL_UV_UNLOCK(); + req->data = NULL; + int r = uv_tcp_connect(req, handle, &addr.in, cb); + if (r) + free(req); return r; } @@ -1085,94 +1012,6 @@ JL_DLLEXPORT int jl_queue_work(work_cb_t work_func, void *work_args, void *work_ return 0; } -JL_DLLEXPORT void jl_uv_update_timer_start(uv_loop_t* loop, jl_value_t* jltimer, - uv_timer_t* uvtimer, uv_timer_cb cb, - uint64_t timeout, uint64_t repeat) -{ - JL_UV_LOCK(); - int err = uv_timer_init(loop, uvtimer); - if (err) - abort(); - - jl_uv_associate_julia_struct((uv_handle_t*)uvtimer, jltimer); - uv_update_time(loop); - err = uv_timer_start(uvtimer, cb, timeout, repeat); - if (err) - abort(); - JL_UV_UNLOCK(); -} - -JL_DLLEXPORT void jl_uv_stop(uv_loop_t* loop) -{ - JL_UV_LOCK(); - uv_stop(loop); - // TODO: use memory/compiler fence here instead of the lock - JL_UV_UNLOCK(); -} - -JL_DLLEXPORT int jl_uv_fs_scandir(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags, - uv_fs_cb cb) -{ - JL_UV_LOCK(); - int r = uv_fs_scandir(loop, req, path, flags, cb); - JL_UV_UNLOCK(); - return r; -} - -JL_DLLEXPORT int jl_uv_fs_readlink(uv_loop_t* loop, uv_fs_t* req, const char* path, - uv_fs_cb cb) -{ - JL_UV_LOCK(); - int r = uv_fs_readlink(loop, req, path, cb); - JL_UV_UNLOCK(); - return r; -} - -JL_DLLEXPORT int jl_uv_fs_open(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags, - int mode, uv_fs_cb cb) -{ - JL_UV_LOCK(); - int r = uv_fs_open(loop, req, path, flags, mode, cb); - JL_UV_UNLOCK(); - return r; -} - -JL_DLLEXPORT int jl_uv_fs_ftruncate(uv_loop_t* loop, uv_fs_t* req, uv_os_fd_t handle, - int64_t offset, uv_fs_cb cb) -{ - JL_UV_LOCK(); - int r = uv_fs_ftruncate(loop, req, handle, offset, cb); - JL_UV_UNLOCK(); - return r; -} - -JL_DLLEXPORT int jl_uv_fs_futime(uv_loop_t* loop, uv_fs_t* req, uv_os_fd_t handle, double atime, - double mtime, uv_fs_cb cb) -{ - JL_UV_LOCK(); - int r = uv_fs_futime(loop, req, handle, atime, mtime, cb); - JL_UV_UNLOCK(); - return r; -} - -JL_DLLEXPORT int jl_uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, - uv_read_cb read_cb) -{ - JL_UV_LOCK(); - int r = uv_read_start(handle, alloc_cb, read_cb); - JL_UV_UNLOCK(); - return r; -} - -JL_DLLEXPORT int jl_uv_read_stop(uv_stream_t* handle) -{ - JL_UV_LOCK(); - int r = uv_read_stop(handle); - JL_UV_UNLOCK(); - return r; -} - - #ifndef _OS_WINDOWS_ #if defined(__APPLE__) int uv___stream_fd(uv_stream_t *handle); diff --git a/src/julia.h b/src/julia.h index cb4a0dd3e877a..ff777c9905056 100644 --- a/src/julia.h +++ b/src/julia.h @@ -1783,11 +1783,6 @@ JL_DLLEXPORT uv_loop_t *jl_global_event_loop(void); JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle); -JL_DLLEXPORT int jl_tcp_bind(uv_tcp_t *handle, uint16_t port, uint32_t host, - unsigned int flags); - -JL_DLLEXPORT int jl_sizeof_ios_t(void); - JL_DLLEXPORT jl_array_t *jl_take_buffer(ios_t *s); typedef struct { diff --git a/src/julia_internal.h b/src/julia_internal.h index 839fbe4aac8c4..094ed264da511 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -868,8 +868,6 @@ int jl_array_store_unboxed(jl_value_t *el_type); JL_DLLEXPORT jl_value_t *(jl_array_data_owner)(jl_array_t *a); JL_DLLEXPORT int jl_array_isassigned(jl_array_t *a, size_t i); -JL_DLLEXPORT void jl_uv_stop(uv_loop_t* loop); - JL_DLLEXPORT uintptr_t jl_object_id_(jl_value_t *tv, jl_value_t *v) JL_NOTSAFEPOINT; // -- synchronization utilities -- // diff --git a/src/sys.c b/src/sys.c index 9632765801fe2..ded24078bc05c 100644 --- a/src/sys.c +++ b/src/sys.c @@ -118,10 +118,8 @@ JL_DLLEXPORT int32_t jl_nb_available(ios_t *s) // --- dir/file stuff --- JL_DLLEXPORT int jl_sizeof_uv_fs_t(void) { return sizeof(uv_fs_t); } -JL_DLLEXPORT void jl_uv_fs_req_cleanup(uv_fs_t *req) { uv_fs_req_cleanup(req); } JL_DLLEXPORT char *jl_uv_fs_t_ptr(uv_fs_t *req) { return (char*)req->ptr; } JL_DLLEXPORT char *jl_uv_fs_t_path(uv_fs_t *req) { return (char*)req->path; } -JL_DLLEXPORT ssize_t jl_uv_fs_result(uv_fs_t *f) { return f->result; } // --- stat --- JL_DLLEXPORT int jl_sizeof_stat(void) { return sizeof(uv_stat_t); } diff --git a/stdlib/Distributed/src/Distributed.jl b/stdlib/Distributed/src/Distributed.jl index dcdc75d253327..c1d6cc5d5ffff 100644 --- a/stdlib/Distributed/src/Distributed.jl +++ b/stdlib/Distributed/src/Distributed.jl @@ -10,7 +10,7 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length, hash, ==, kill, close, isopen, showerror # imports for use -using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, wait_connected, +using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, VERSION_STRING, binding_module, atexit, julia_exename, julia_cmd, AsyncGenerator, acquire, release, invokelatest, shell_escape_posixly, uv_error, something, notnothing, isbuffered @@ -18,7 +18,7 @@ using Base.Threads: Event using Serialization, Sockets import Serialization: serialize, deserialize -import Sockets: connect +import Sockets: connect, wait_connected # NOTE: clusterserialize.jl imports additional symbols from Serialization for use diff --git a/stdlib/Distributed/src/managers.jl b/stdlib/Distributed/src/managers.jl index c3a877805b321..62141b1fd1474 100644 --- a/stdlib/Distributed/src/managers.jl +++ b/stdlib/Distributed/src/managers.jl @@ -484,10 +484,15 @@ function socket_reuse_port() end end -function bind_client_port(s) - err = ccall(:jl_tcp_bind, Int32, (Ptr{Cvoid}, UInt16, UInt32, Cuint), - s.handle, hton(client_port[]), hton(UInt32(0)), 0) - uv_error("bind() failed", err) +# TODO: this doesn't belong here, it belongs in Sockets +function bind_client_port(s::TCPSocket) + Sockets.iolock_begin() + @assert s.status == Sockets.StatusInit + host_in = Ref(hton(UInt32(0))) # IPv4 0.0.0.0 + err = ccall(:jl_tcp_bind, Int32, (Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cuint, Cint), + s, hton(client_port[]), host_in, 0, false) + Sockets.iolock_end() + uv_error("tcp_bind", err) _addr, port = getsockname(s) client_port[] = port diff --git a/stdlib/FileWatching/src/FileWatching.jl b/stdlib/FileWatching/src/FileWatching.jl index 5df7f4ceb6b34..9c241e3c7bf21 100644 --- a/stdlib/FileWatching/src/FileWatching.jl +++ b/stdlib/FileWatching/src/FileWatching.jl @@ -18,9 +18,10 @@ export PollingFileWatcher, FDWatcher -import Base: @handle_as, wait, close, eventloop, notify_error, stream_wait, IOError, +import Base: @handle_as, wait, close, eventloop, notify_error, IOError, _sizeof_uv_poll, _sizeof_uv_fs_poll, _sizeof_uv_fs_event, _uv_hook_close, uv_error, _UVError, - associate_julia_struct, disassociate_julia_struct, isreadable, iswritable, | + iolock_begin, iolock_end, associate_julia_struct, disassociate_julia_struct, + preserve_handle, unpreserve_handle, isreadable, iswritable, | import Base.Filesystem.StatStruct if Sys.iswindows() import Base.WindowsRawSocket @@ -80,11 +81,13 @@ mutable struct FileMonitor handle = Libc.malloc(_sizeof_uv_fs_event) this = new(handle, file, Base.ThreadSynchronizer(), 0, false) associate_julia_struct(handle, this) + iolock_begin() err = ccall(:uv_fs_event_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), handle) if err != 0 Libc.free(handle) throw(_UVError("FileMonitor", err)) end + iolock_end() finalizer(uvfinalize, this) return this end @@ -100,6 +103,7 @@ mutable struct FolderMonitor handle = Libc.malloc(_sizeof_uv_fs_event) this = new(handle, Channel(Inf), false) associate_julia_struct(handle, this) + iolock_begin() err = ccall(:uv_fs_event_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), handle) if err != 0 Libc.free(handle) @@ -110,6 +114,7 @@ mutable struct FolderMonitor uv_error("FolderMonitor (start)", ccall(:uv_fs_event_start, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Int32), handle, uv_jl_fseventscb_folder::Ptr{Cvoid}, folder, 0)) + iolock_end() return this end end @@ -127,12 +132,14 @@ mutable struct PollingFileWatcher handle = Libc.malloc(_sizeof_uv_fs_poll) this = new(handle, file, round(UInt32, interval * 1000), Base.ThreadSynchronizer(), false, 0, StatStruct()) associate_julia_struct(handle, this) + iolock_begin() err = ccall(:uv_fs_poll_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), handle) if err != 0 Libc.free(handle) throw(_UVError("PollingFileWatcher", err)) end finalizer(uvfinalize, this) + iolock_end() return this end end @@ -153,6 +160,7 @@ mutable struct _FDWatcher throw(ArgumentError("must specify at least one of readable or writable to create a FDWatcher")) end fdnum = Core.Intrinsics.bitcast(Int32, fd) + 1 + iolock_begin() if fdnum > length(FDWatchers) old_len = length(FDWatchers) resize!(FDWatchers, fdnum) @@ -160,6 +168,7 @@ mutable struct _FDWatcher elseif FDWatchers[fdnum] !== nothing this = FDWatchers[fdnum]::_FDWatcher this.refcount = (this.refcount[1] + Int(readable), this.refcount[2] + Int(writable)) + iolock_end() return this end if ccall(:jl_uv_unix_fd_is_watched, Int32, (RawFD, Ptr{Cvoid}, Ptr{Cvoid}), fd, C_NULL, eventloop()) == 1 @@ -182,11 +191,13 @@ mutable struct _FDWatcher end finalizer(uvfinalize, this) FDWatchers[fdnum] = this + iolock_end() return this end end function uvfinalize(t::_FDWatcher) + iolock_begin() lock(t.notify) try if t.handle != C_NULL @@ -205,6 +216,7 @@ mutable struct _FDWatcher finally unlock(t.notify) end + iolock_end() nothing end end @@ -228,8 +240,10 @@ mutable struct _FDWatcher 0, (false, false)) associate_julia_struct(handle, this) - err = ccall(:uv_poll_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, WindowsRawSocket), - eventloop(), handle, fd) + iolock_begin() + err = ccall(:uv_poll_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, WindowsRawSocket), + eventloop(), handle, fd) + iolock_end() if err != 0 Libc.free(handle) throw(_UVError("FDWatcher", err)) @@ -261,12 +275,15 @@ end function close(t::_FDWatcher, readable::Bool, writable::Bool) + iolock_begin() if t.refcount != (0, 0) t.refcount = (t.refcount[1] - Int(readable), t.refcount[2] - Int(writable)) end if t.refcount == (0, 0) uvfinalize(t) end + iolock_end() + nothing end function close(t::FDWatcher) @@ -403,6 +420,7 @@ function __init__() end function start_watching(t::_FDWatcher) + iolock_begin() t.handle == C_NULL && return throw(ArgumentError("FDWatcher is closed")) readable = t.refcount[1] > 0 writable = t.refcount[2] > 0 @@ -415,10 +433,12 @@ function start_watching(t::_FDWatcher) uv_jl_pollcb::Ptr{Cvoid})) t.active = (readable, writable) end + iolock_end() nothing end function start_watching(t::PollingFileWatcher) + iolock_begin() t.handle == C_NULL && return throw(ArgumentError("PollingFileWatcher is closed")) if !t.active uv_error("PollingFileWatcher (start)", @@ -426,10 +446,12 @@ function start_watching(t::PollingFileWatcher) t.handle, uv_jl_fspollcb::Ptr{Cvoid}, t.file, t.interval)) t.active = true end + iolock_end() nothing end function stop_watching(t::PollingFileWatcher) + iolock_begin() lock(t.notify) try if t.active && isempty(t.notify) @@ -440,10 +462,12 @@ function stop_watching(t::PollingFileWatcher) finally unlock(t.notify) end + iolock_end() nothing end function start_watching(t::FileMonitor) + iolock_begin() t.handle == C_NULL && return throw(ArgumentError("FileMonitor is closed")) if !t.active uv_error("FileMonitor (start)", @@ -451,10 +475,12 @@ function start_watching(t::FileMonitor) t.handle, uv_jl_fseventscb_file::Ptr{Cvoid}, t.file, 0)) t.active = true end + iolock_end() nothing end function stop_watching(t::FileMonitor) + iolock_begin() lock(t.notify) try if t.active && isempty(t.notify) @@ -465,6 +491,7 @@ function stop_watching(t::FileMonitor) finally unlock(t.notify) end + iolock_end() nothing end @@ -473,12 +500,16 @@ function wait(fdw::FDWatcher) return wait(fdw.watcher, readable = fdw.readable, writable = fdw.writable) end end + function wait(fdw::_FDWatcher; readable=true, writable=true) events = FDEvent(Int32(0)) - while true - if isa(events, FDEvent) - events |= FDEvent(fdw.events) + iolock_begin() + preserve_handle(fdw) + lock(fdw.notify) + try + while true haveevent = false + events |= FDEvent(fdw.events) if readable && isreadable(events) fdw.events &= ~UV_READABLE haveevent = true @@ -488,23 +519,25 @@ function wait(fdw::_FDWatcher; readable=true, writable=true) haveevent = true end if haveevent - return events + break end - else - throw(events) - end - if fdw.refcount == (0, 0) # !open - events = EOFError() - else - lock(fdw.notify) - try + if fdw.refcount == (0, 0) # !open + throw(EOFError()) + else start_watching(fdw) # make sure the poll is active - events = stream_wait(fdw, fdw.notify)::FDEvent - finally + iolock_end() + events = wait(fdw.notify)::FDEvent unlock(fdw.notify) + iolock_begin() + lock(fdw.notify) end end + finally + unlock(fdw.notify) + unpreserve_handle(fdw) end + iolock_end() + return events end function wait(fd::RawFD; readable=false, writable=false) @@ -528,15 +561,23 @@ if Sys.iswindows() end function wait(pfw::PollingFileWatcher) + iolock_begin() + preserve_handle(pfw) lock(pfw.notify) local prevstat try start_watching(pfw) - prevstat = stream_wait(pfw, pfw.notify)::StatStruct + iolock_end() + prevstat = wait(pfw.notify)::StatStruct + unlock(pfw.notify) + iolock_begin() + lock(pfw.notify) finally unlock(pfw.notify) + unpreserve_handle(pfw) end stop_watching(pfw) + iolock_end() if pfw.handle == C_NULL return prevstat, EOFError() elseif pfw.curr_error != 0 @@ -547,17 +588,25 @@ function wait(pfw::PollingFileWatcher) end function wait(m::FileMonitor) + iolock_begin() + preserve_handle(m) lock(m.notify) local events try start_watching(m) - events = stream_wait(m, m.notify)::FileEvent + iolock_end() + events = wait(m.notify)::FileEvent events |= FileEvent(m.events) m.events = 0 + unlock(m.notify) + iolock_begin() + lock(m.notify) finally unlock(m.notify) + unpreserve_handle(m) end stop_watching(m) + iolock_end() return events end @@ -566,11 +615,11 @@ function wait(m::FolderMonitor) if isready(m.notify) evt = take!(m.notify) # non-blocking fast-path else - Base.preserve_handle(m) + preserve_handle(m) evt = try take!(m.notify) catch ex - Base.unpreserve_handle(m) + unpreserve_handle(m) if ex isa InvalidStateException && ex.state == :closed rethrow(EOFError()) # `wait(::Channel)` throws the wrong exception end diff --git a/stdlib/Sockets/src/PipeServer.jl b/stdlib/Sockets/src/PipeServer.jl index 05958c1f18402..f7c5c4cca3cd1 100644 --- a/stdlib/Sockets/src/PipeServer.jl +++ b/stdlib/Sockets/src/PipeServer.jl @@ -4,13 +4,10 @@ mutable struct PipeServer <: LibuvServer handle::Ptr{Cvoid} status::Int cond::Base.ThreadSynchronizer - closenotify::Base.ThreadSynchronizer function PipeServer(handle::Ptr{Cvoid}, status) - lock = Threads.SpinLock() p = new(handle, status, - Base.ThreadSynchronizer(lock), - Base.ThreadSynchronizer(lock)) + Base.ThreadSynchronizer()) associate_julia_struct(p.handle, p) finalizer(uvfinalize, p) return p @@ -19,9 +16,11 @@ end function PipeServer() pipe = PipeServer(Libc.malloc(Base._sizeof_uv_named_pipe), StatusUninit) + iolock_begin() err = ccall(:uv_pipe_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cint), eventloop(), pipe.handle, 0) uv_error("failed to create pipe server", err) pipe.status = StatusInit + iolock_end() return pipe end @@ -30,6 +29,7 @@ end accept(server::PipeServer) = accept(server, PipeEndpoint()) function accept_nonblock(server::PipeServer, client::PipeEndpoint) + iolock_begin() if client.status != StatusInit error("client is already in use or has been closed") end @@ -37,6 +37,7 @@ function accept_nonblock(server::PipeServer, client::PipeEndpoint) if err == 0 client.status = StatusOpen end + iolock_end() return err end @@ -47,18 +48,21 @@ function accept_nonblock(server::PipeServer) end function bind(server::PipeServer, name::AbstractString) + iolock_begin() @assert server.status == StatusInit err = ccall(:uv_pipe_bind, Int32, (Ptr{Cvoid}, Cstring), server, name) if err != 0 + iolock_end() if err != UV_EADDRINUSE && err != UV_EACCES #TODO: this codepath is currently not tested - throw(_UVError("bind",err)) + throw(_UVError("bind", err)) else return false end end server.status = StatusOpen + iolock_end() return true end @@ -74,18 +78,17 @@ function listen(path::AbstractString) end function connect!(sock::PipeEndpoint, path::AbstractString) + iolock_begin() @assert sock.status == StatusInit req = Libc.malloc(Base._sizeof_uv_connect) uv_req_set_data(req, C_NULL) ccall(:uv_pipe_connect, Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}), req, sock.handle, path, uv_jl_connectcb::Ptr{Cvoid}) sock.status = StatusConnecting + iolock_end() return sock end -# Libuv will internally reset read/writability, which is uses to -# mark that this is an invalid pipe. - """ connect(path::AbstractString) -> PipeEndpoint diff --git a/stdlib/Sockets/src/Sockets.jl b/stdlib/Sockets/src/Sockets.jl index 0c6e867b20639..7a1726f6c385b 100644 --- a/stdlib/Sockets/src/Sockets.jl +++ b/stdlib/Sockets/src/Sockets.jl @@ -31,9 +31,10 @@ export import Base: isless, show, print, parse, bind, convert, isreadable, iswritable, alloc_buf_hook, _uv_hook_close using Base: LibuvStream, LibuvServer, PipeEndpoint, @handle_as, uv_error, associate_julia_struct, uvfinalize, - notify_error, stream_wait, uv_req_data, uv_req_set_data, preserve_handle, unpreserve_handle, _UVError, IOError, + notify_error, uv_req_data, uv_req_set_data, preserve_handle, unpreserve_handle, _UVError, IOError, eventloop, StatusUninit, StatusInit, StatusConnecting, StatusOpen, StatusClosing, StatusClosed, StatusActive, - uv_status_string, check_open, wait_connected, OS_HANDLE, RawFD, + preserve_handle, unpreserve_handle, iolock_begin, iolock_end, + uv_status_string, check_open, OS_HANDLE, RawFD, UV_EINVAL, UV_ENOMEM, UV_ENOBUFS, UV_EAGAIN, UV_ECONNABORTED, UV_EADDRINUSE, UV_EACCES, UV_EADDRNOTAVAIL, UV_EAI_ADDRFAMILY, UV_EAI_AGAIN, UV_EAI_BADFLAGS, UV_EAI_BADHINTS, UV_EAI_CANCELED, UV_EAI_FAIL, @@ -56,19 +57,18 @@ mutable struct TCPSocket <: LibuvStream status::Int buffer::IOBuffer cond::Base.ThreadSynchronizer - closenotify::Base.ThreadSynchronizer + readerror::Any sendbuf::Union{IOBuffer, Nothing} - lock::ReentrantLock + lock::ReentrantLock # advisory lock throttle::Int function TCPSocket(handle::Ptr{Cvoid}, status) - lock = Threads.SpinLock() tcp = new( handle, status, PipeBuffer(), - Base.ThreadSynchronizer(lock), - Base.ThreadSynchronizer(lock), + Base.ThreadSynchronizer(), + nothing, nothing, ReentrantLock(), Base.DEFAULT_READ_BUFFER_SZ) @@ -82,18 +82,22 @@ end function TCPSocket(; delay=true) tcp = TCPSocket(Libc.malloc(Base._sizeof_uv_tcp), StatusUninit) af_spec = delay ? 0 : 2 # AF_UNSPEC is 0, AF_INET is 2 + iolock_begin() err = ccall(:uv_tcp_init_ex, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cuint), eventloop(), tcp.handle, af_spec) uv_error("failed to create tcp socket", err) tcp.status = StatusInit + iolock_end() return tcp end function TCPSocket(fd::OS_HANDLE) tcp = TCPSocket() + iolock_begin() err = ccall(:uv_tcp_open, Int32, (Ptr{Cvoid}, OS_HANDLE), pipe.handle, fd) uv_error("tcp_open", err) tcp.status = StatusOpen + iolock_end() return tcp end if OS_HANDLE != RawFD @@ -105,15 +109,12 @@ mutable struct TCPServer <: LibuvServer handle::Ptr{Cvoid} status::Int cond::Base.ThreadSynchronizer - closenotify::Base.ThreadSynchronizer function TCPServer(handle::Ptr{Cvoid}, status) - lock = Threads.SpinLock() tcp = new( handle, status, - Base.ThreadSynchronizer(lock), - Base.ThreadSynchronizer(lock)) + Base.ThreadSynchronizer()) associate_julia_struct(tcp.handle, tcp) finalizer(uvfinalize, tcp) return tcp @@ -126,10 +127,12 @@ end function TCPServer(; delay=true) tcp = TCPServer(Libc.malloc(Base._sizeof_uv_tcp), StatusUninit) af_spec = delay ? 0 : 2 # AF_UNSPEC is 0, AF_INET is 2 + iolock_begin() err = ccall(:uv_tcp_init_ex, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cuint), eventloop(), tcp.handle, af_spec) uv_error("failed to create tcp server", err) tcp.status = StatusInit + iolock_end() return tcp end @@ -137,7 +140,7 @@ isreadable(io::TCPSocket) = isopen(io) || bytesavailable(io) > 0 iswritable(io::TCPSocket) = isopen(io) && io.status != StatusClosing """ - accept(server[,client]) + accept(server[, client]) Accepts a connection on the given server and returns a connection to the client. An uninitialized client stream may be provided, in which case it will be used instead of @@ -145,6 +148,23 @@ creating a new stream. """ accept(server::TCPServer) = accept(server, TCPSocket()) +function accept(callback, server::LibuvServer) + task = @async try + while true + client = accept(server) + callback(client) + end + catch ex + # accept below may explicitly throw UV_ECONNABORTED: + # filter that out since we expect that error + if !(ex isa IOError && ex.code == UV_ECONNABORTED) || isopen(server) + rethrow() + end + end + return task # caller is responsible for checking for errors +end + + # UDP """ UDPSocket() @@ -155,17 +175,12 @@ fields to denote the state of the socket. mutable struct UDPSocket <: LibuvStream handle::Ptr{Cvoid} status::Int - recvnotify::Condition - sendnotify::Condition - closenotify::Base.ThreadSynchronizer + recvnotify::Base.ThreadSynchronizer + cond::Base.ThreadSynchronizer function UDPSocket(handle::Ptr{Cvoid}, status) - udp = new( - handle, - status, - Condition(), - Condition(), - Base.ThreadSynchronizer()) + cond = Base.ThreadSynchronizer() + udp = new(handle, status, Base.ThreadSynchronizer(cond.lock), cond) associate_julia_struct(udp.handle, udp) finalizer(uvfinalize, udp) return udp @@ -173,26 +188,28 @@ mutable struct UDPSocket <: LibuvStream end function UDPSocket() this = UDPSocket(Libc.malloc(Base._sizeof_uv_udp), StatusUninit) + iolock_begin() err = ccall(:uv_udp_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), this.handle) uv_error("failed to create udp socket", err) this.status = StatusInit + iolock_end() return this end show(io::IO, stream::UDPSocket) = print(io, typeof(stream), "(", uv_status_string(stream), ")") function _uv_hook_close(sock::UDPSocket) - lock(sock.closenotify) + sock.handle = C_NULL + lock(sock.cond) try - sock.handle = C_NULL sock.status = StatusClosed - notify(sock.closenotify) + notify(sock.cond) + notify_error(sock.recvnotify, EOFError()) finally - unlock(sock.closenotify) + unlock(sock.cond) end - notify(sock.sendnotify) - notify_error(sock.recvnotify,EOFError()) + nothing end # Disables dual stack mode. @@ -214,17 +231,17 @@ const UV_UDP_REUSEADDR = 4 ## -_bind(sock::TCPServer, host::IPv4, port::UInt16, flags::UInt32 = UInt32(0)) = ccall(:jl_tcp_bind, Int32, (Ptr{Cvoid}, UInt16, UInt32, Cuint), - sock.handle, hton(port), hton(host.host), flags) - -_bind(sock::TCPServer, host::IPv6, port::UInt16, flags::UInt32 = UInt32(0)) = ccall(:jl_tcp_bind6, Int32, (Ptr{Cvoid}, UInt16, Ptr{UInt128}, Cuint), - sock.handle, hton(port), Ref(hton(host.host)), flags) - -_bind(sock::UDPSocket, host::IPv4, port::UInt16, flags::UInt32 = UInt32(0)) = ccall(:jl_udp_bind, Int32, (Ptr{Cvoid}, UInt16, UInt32, UInt32), - sock.handle, hton(port), hton(host.host), flags) +function _bind(sock::TCPServer, host::Union{IPv4, IPv6}, port::UInt16, flags::UInt32=UInt32(0)) + host_in = Ref(hton(host.host)) + return ccall(:jl_tcp_bind, Int32, (Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cuint, Cint), + sock, hton(port), host_in, flags, host isa IPv6) +end -_bind(sock::UDPSocket, host::IPv6, port::UInt16, flags::UInt32 = UInt32(0)) = ccall(:jl_udp_bind6, Int32, (Ptr{Cvoid}, UInt16, Ptr{UInt128}, UInt32), - sock.handle, hton(port), Ref(hton(host.host)), flags) +function _bind(sock::UDPSocket, host::Union{IPv4, IPv6}, port::UInt16, flags::UInt32=UInt32(0)) + host_in = Ref(hton(host.host)) + return ccall(:jl_udp_bind, Int32, (Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cuint, Cint), + sock, hton(port), host_in, flags, host isa IPv6) +end """ bind(socket::Union{UDPSocket, TCPSocket}, host::IPAddr, port::Integer; ipv6only=false, reuseaddr=false, kws...) @@ -240,14 +257,16 @@ function bind(sock::Union{TCPServer, UDPSocket}, host::IPAddr, port::Integer; ip error("$(typeof(sock)) is not in initialization state") end flags = 0 - if isa(host,IPv6) && ipv6only + if isa(host, IPv6) && ipv6only flags |= isa(sock, UDPSocket) ? UV_UDP_IPV6ONLY : UV_TCP_IPV6ONLY end if isa(sock, UDPSocket) && reuseaddr flags |= UV_UDP_REUSEADDR end + iolock_begin() err = _bind(sock, host, UInt16(port), UInt32(flags)) if err < 0 + iolock_end() if err != UV_EADDRINUSE && err != UV_EACCES && err != UV_EADDRNOTAVAIL #TODO: this codepath is not currently tested throw(_UVError("bind", err)) @@ -257,13 +276,14 @@ function bind(sock::Union{TCPServer, UDPSocket}, host::IPAddr, port::Integer; ip end sock.status = StatusOpen isa(sock, UDPSocket) && setopt(sock; kws...) + iolock_end() return true end bind(sock::TCPServer, addr::InetAddr) = bind(sock, addr.host, addr.port) """ - setopt(sock::UDPSocket; multicast_loop = nothing, multicast_ttl=nothing, enable_broadcast=nothing, ttl=nothing) + setopt(sock::UDPSocket; multicast_loop=nothing, multicast_ttl=nothing, enable_broadcast=nothing, ttl=nothing) Set UDP socket options. @@ -273,22 +293,25 @@ Set UDP socket options. messages, or else the UDP system will return an access error (default: `false`). * `ttl`: Time-to-live of packets sent on the socket (default: `nothing`). """ -function setopt(sock::UDPSocket; multicast_loop = nothing, multicast_ttl=nothing, enable_broadcast=nothing, ttl=nothing) +function setopt(sock::UDPSocket; multicast_loop=nothing, multicast_ttl=nothing, enable_broadcast=nothing, ttl=nothing) + iolock_begin() if sock.status == StatusUninit error("Cannot set options on uninitialized socket") end if multicast_loop !== nothing - uv_error("multicast_loop",ccall(:uv_udp_set_multicast_loop,Cint,(Ptr{Cvoid},Cint),sock.handle,multicast_loop) < 0) + uv_error("multicast_loop", ccall(:uv_udp_set_multicast_loop, Cint, (Ptr{Cvoid}, Cint), sock.handle, multicast_loop) < 0) end if multicast_ttl !== nothing - uv_error("multicast_ttl",ccall(:uv_udp_set_multicast_ttl,Cint,(Ptr{Cvoid},Cint),sock.handle,multicast_ttl)) + uv_error("multicast_ttl", ccall(:uv_udp_set_multicast_ttl, Cint, (Ptr{Cvoid}, Cint), sock.handle, multicast_ttl)) end if enable_broadcast !== nothing - uv_error("enable_broadcast",ccall(:uv_udp_set_broadcast,Cint,(Ptr{Cvoid},Cint),sock.handle,enable_broadcast)) + uv_error("enable_broadcast", ccall(:uv_udp_set_broadcast, Cint, (Ptr{Cvoid}, Cint), sock.handle, enable_broadcast)) end if ttl !== nothing - uv_error("ttl",ccall(:uv_udp_set_ttl,Cint,(Ptr{Cvoid},Cint),sock.handle,ttl)) + uv_error("ttl", ccall(:uv_udp_set_ttl, Cint, (Ptr{Cvoid}, Cint), sock.handle, ttl)) end + iolock_end() + nothing end """ @@ -308,82 +331,123 @@ Read a UDP packet from the specified socket, returning a tuple of `(address, dat `address` will be either IPv4 or IPv6 as appropriate. """ function recvfrom(sock::UDPSocket) + iolock_begin() # If the socket has not been bound, it will be bound implicitly to ::0 and a random port if sock.status != StatusInit && sock.status != StatusOpen && sock.status != StatusActive error("UDPSocket is not initialized and open") end if ccall(:uv_is_active, Cint, (Ptr{Cvoid},), sock.handle) == 0 - uv_error("recv_start", ccall(:uv_udp_recv_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), - sock.handle, Base.uv_jl_alloc_buf::Ptr{Cvoid}, uv_jl_recvcb::Ptr{Cvoid})) + err = ccall(:uv_udp_recv_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), + sock, Base.uv_jl_alloc_buf::Ptr{Cvoid}, uv_jl_recvcb::Ptr{Cvoid}) + uv_error("recv_start", err) end sock.status = StatusActive - return stream_wait(sock, sock.recvnotify)::Tuple{Union{IPv4, IPv6}, Vector{UInt8}} + lock(sock.recvnotify) + iolock_end() + try + From = Union{InetAddr{IPv4}, InetAddr{IPv6}} + Data = Vector{UInt8} + from, data = wait(sock.recvnotify)::Tuple{From, Data} + return (from.host, data) + finally + unlock(sock.recvnotify) + end end -alloc_buf_hook(sock::UDPSocket, size::UInt) = (Libc.malloc(size), size) +alloc_buf_hook(sock::UDPSocket, size::UInt) = (Libc.malloc(size), size) # size is always 64k from libuv function uv_recvcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}, addr::Ptr{Cvoid}, flags::Cuint) - # C signature documented as (*uv_udp_recv_cb)(...) sock = @handle_as handle UDPSocket - if nread < 0 - Libc.free(buf_addr) - notify_error(sock.recvnotify, _UVError("recv", nread)) - elseif flags & UV_UDP_PARTIAL > 0 - Libc.free(buf_addr) - notify_error(sock.recvnotify, "Partial message received") - else - buf_addr = ccall(:jl_uv_buf_base, Ptr{Cvoid}, (Ptr{Cvoid},), buf) - buf_size = ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf) - # need to check the address type in order to convert to a Julia IPAddr - addrout = if addr == C_NULL - IPv4(0) - elseif ccall(:jl_sockaddr_in_is_ip4, Cint, (Ptr{Cvoid},), addr) == 1 - IPv4(ntoh(ccall(:jl_sockaddr_host4, UInt32, (Ptr{Cvoid},), addr))) - else - tmp = [UInt128(0)] - ccall(:jl_sockaddr_host6, UInt32, (Ptr{Cvoid}, Ptr{UInt8}), addr, pointer(tmp)) - IPv6(ntoh(tmp[1])) - end - buf = unsafe_wrap(Array, convert(Ptr{UInt8}, buf_addr), Int(nread), own = true) - notify(sock.recvnotify, (addrout, buf)) - end - ccall(:uv_udp_recv_stop, Cint, (Ptr{Cvoid},), sock.handle) - sock.status = StatusOpen + lock(sock.recvnotify) + try + buf_addr = ccall(:jl_uv_buf_base, Ptr{UInt8}, (Ptr{Cvoid},), buf) + if nread == 0 && addr == C_NULL + Libc.free(buf_addr) + elseif nread < 0 + Libc.free(buf_addr) + notify_error(sock.recvnotify, _UVError("recv", nread)) + elseif flags & UV_UDP_PARTIAL > 0 + Libc.free(buf_addr) + notify_error(sock.recvnotify, "Partial message received") + else + buf_size = Int(ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf)) + if buf_size - nread < 16384 # waste at most 16k (note: buf_size is currently always 64k) + buf = unsafe_wrap(Array, buf_addr, nread, own=true) + else + buf = Vector{UInt8}(undef, nread) + GC.@preserve buf unsafe_copyto!(pointer(buf), buf_addr, nread) + Libc.free(buf_addr) + end + # need to check the address type in order to convert to a Julia IPAddr + host = IPv4(0) + port = UInt16(0) + if ccall(:jl_sockaddr_is_ip4, Cint, (Ptr{Cvoid},), addr) == 1 + host = IPv4(ntoh(ccall(:jl_sockaddr_host4, UInt32, (Ptr{Cvoid},), addr))) + port = ntoh(ccall(:jl_sockaddr_port4, UInt16, (Ptr{Cvoid},), addr)) + elseif ccall(:jl_sockaddr_is_ip6, Cint, (Ptr{Cvoid},), addr) == 1 + tmp = Ref{UInt128}(0) + scope_id = ccall(:jl_sockaddr_host6, UInt32, (Ptr{Cvoid}, Ptr{UInt128}), addr, tmp) + host = IPv6(ntoh(tmp[])) + port = ntoh(ccall(:jl_sockaddr_port6, UInt16, (Ptr{Cvoid},), addr)) + end + from = InetAddr(host, port) + notify(sock.recvnotify, (from, buf), all=false) + end + if sock.status == StatusActive && isempty(sock.recvnotify) + sock.status = StatusOpen + ccall(:uv_udp_recv_stop, Cint, (Ptr{Cvoid},), sock) + end + finally + unlock(sock.recvnotify) + end nothing end -function _send(sock::UDPSocket, ipaddr::IPv4, port::UInt16, buf) - ccall(:jl_udp_send, Cint, (Ptr{Cvoid}, UInt16, UInt32, Ptr{UInt8}, Csize_t, Ptr{Cvoid}), - sock.handle, hton(port), hton(ipaddr.host), buf, sizeof(buf), uv_jl_sendcb::Ptr{Cvoid}) -end - -function _send(sock::UDPSocket, ipaddr::IPv6, port::UInt16, buf) - ccall(:jl_udp_send6, Cint, (Ptr{Cvoid}, UInt16, Ref{UInt128}, Ptr{UInt8}, Csize_t, Ptr{Cvoid}), - sock.handle, hton(port), hton(ipaddr.host), buf, sizeof(buf), uv_jl_sendcb::Ptr{Cvoid}) +function _send_async(sock::UDPSocket, ipaddr::Union{IPv4, IPv6}, port::UInt16, buf) + req = Libc.malloc(Base._sizeof_uv_udp_send) + uv_req_set_data(req, C_NULL) # in case we get interrupted before arriving at the wait call + host_in = Ref(hton(ipaddr.host)) + err = ccall(:jl_udp_send, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Ptr{UInt8}, Csize_t, Ptr{Cvoid}, Cint), + req, sock, hton(port), host_in, buf, sizeof(buf), Base.uv_jl_writecb_task::Ptr{Cvoid}, ipaddr isa IPv6) + if err < 0 + Libc.free(req) + uv_error("send", err) + end + return req end """ - send(socket::UDPSocket, host, port::Integer, msg) + send(socket::UDPSocket, host::IPAddr, port::Integer, msg) Send `msg` over `socket` to `host:port`. """ -function send(sock::UDPSocket,ipaddr,port,msg) +function send(sock::UDPSocket, ipaddr::IPAddr, port::Integer, msg) # If the socket has not been bound, it will be bound implicitly to ::0 and a random port + iolock_begin() if sock.status != StatusInit && sock.status != StatusOpen && sock.status != StatusActive error("UDPSocket is not initialized and open") end - uv_error("send", _send(sock, ipaddr, UInt16(port), msg)) - stream_wait(sock, sock.sendnotify) - nothing -end - -function uv_sendcb(handle::Ptr{Cvoid}, status::Cint) - sock = @handle_as handle UDPSocket - if status < 0 - notify_error(sock.sendnotify, _UVError("UDP send failed", status)) + uvw = _send_async(sock, ipaddr, UInt16(port), msg) + ct = current_task() + preserve_handle(ct) + uv_req_set_data(uvw, ct) + iolock_end() + status = try + wait()::Cint + finally + iolock_begin() + if uv_req_data(uvw) != C_NULL + # uvw is still alive, + # so make sure we won't get spurious notifications later + uv_req_set_data(uvw, C_NULL) + else + # done with uvw + Libc.free(uvw) + end + iolock_end() + unpreserve_handle(ct) end - notify(sock.sendnotify) - Libc.free(handle) + uv_error("send", status) nothing end @@ -394,16 +458,18 @@ function uv_connectcb(conn::Ptr{Cvoid}, status::Cint) sock = @handle_as hand LibuvStream lock(sock.cond) try - if status >= 0 + if status >= 0 # success if !(sock.status == StatusClosed || sock.status == StatusClosing) sock.status = StatusOpen end - notify(sock.cond) else - ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), hand) - err = _UVError("connect", status) - notify_error(sock.cond, err) + sock.readerror = _UVError("connect", status) # TODO: perhaps we should not reuse readerror for this + if !(sock.status == StatusClosed || sock.status == StatusClosing) + ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), hand) + sock.status = StatusClosing + end end + notify(sock.cond) finally unlock(sock.cond) end @@ -411,34 +477,47 @@ function uv_connectcb(conn::Ptr{Cvoid}, status::Cint) nothing end -function connect!(sock::TCPSocket, host::IPv4, port::Integer) +function connect!(sock::TCPSocket, host::Union{IPv4, IPv6}, port::Integer) + iolock_begin() if sock.status != StatusInit error("TCPSocket is not in initialization state") end if !(0 <= port <= typemax(UInt16)) throw(ArgumentError("port out of range, must be 0 ≤ port ≤ 65535, got $port")) end - uv_error("connect", ccall(:jl_tcp4_connect, Int32, (Ptr{Cvoid}, UInt32, UInt16, Ptr{Cvoid}), - sock.handle, hton(host.host), hton(UInt16(port)), uv_jl_connectcb::Ptr{Cvoid})) + host_in = Ref(hton(host.host)) + uv_error("connect", ccall(:jl_tcp_connect, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cint), + sock, host_in, hton(UInt16(port)), uv_jl_connectcb::Ptr{Cvoid}, host isa IPv6)) sock.status = StatusConnecting + iolock_end() nothing end -function connect!(sock::TCPSocket, host::IPv6, port::Integer) - if sock.status != StatusInit - error("TCPSocket is not in initialization state") - end - if !(0 <= port <= typemax(UInt16)) - throw(ArgumentError("port out of range, must be 0 ≤ port ≤ 65535, got $port")) +connect!(sock::TCPSocket, addr::InetAddr) = connect!(sock, addr.host, addr.port) + +function wait_connected(x::LibuvStream) + iolock_begin() + check_open(x) + isopen(x) || x.readerror === nothing || throw(x.readerror) + preserve_handle(x) + lock(x.cond) + try + while x.status == StatusConnecting + iolock_end() + wait(x.cond) + unlock(x.cond) + iolock_begin() + lock(x.cond) + end + isopen(x) || x.readerror === nothing || throw(x.readerror) + finally + unlock(x.cond) + unpreserve_handle(x) end - uv_error("connect", ccall(:jl_tcp6_connect, Int32, (Ptr{Cvoid}, Ref{UInt128}, UInt16, Ptr{Cvoid}), - sock.handle, hton(host.host), hton(UInt16(port)), uv_jl_connectcb::Ptr{Cvoid})) - sock.status = StatusConnecting + iolock_end() nothing end -connect!(sock::TCPSocket, addr::InetAddr) = connect!(sock, addr.host, addr.port) - # Default Host to localhost """ @@ -459,9 +538,7 @@ function connect!(sock::TCPSocket, host::AbstractString, port::Integer) error("TCPSocket is not in initialization state") end ipaddr = getaddrinfo(host) - sock.status = StatusInit - connect!(sock,ipaddr,port) - sock.status = StatusConnecting + connect!(sock, ipaddr, port) return sock end @@ -478,7 +555,12 @@ Enables or disables Nagle's algorithm on a given TCP server or socket. """ function nagle(sock::Union{TCPServer, TCPSocket}, enable::Bool) # disable or enable Nagle's algorithm on all OSes - ccall(:uv_tcp_nodelay, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(!enable)) + Sockets.iolock_begin() + Sockets.check_open(sock) + err = ccall(:uv_tcp_nodelay, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(!enable)) + # TODO: check err + Sockets.iolock_end() + return err end """ @@ -487,12 +569,16 @@ end On Linux systems, the TCP_QUICKACK is disabled or enabled on `socket`. """ function quickack(sock::Union{TCPServer, TCPSocket}, enable::Bool) + Sockets.iolock_begin() + Sockets.check_open(sock) @static if Sys.islinux() # tcp_quickack is a linux only option if ccall(:jl_tcp_quickack, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(enable)) < 0 @warn "Networking unoptimized ( Error enabling TCP_QUICKACK : $(Libc.strerror(Libc.errno())) )" maxlog=1 end end + Sockets.iolock_end() + nothing end @@ -512,36 +598,13 @@ reject them. The default value of `backlog` is 511. """ function listen(addr; backlog::Integer=BACKLOG_DEFAULT) sock = TCPServer() - !bind(sock, addr) && error("cannot bind to port; may already be in use or access denied") + bind(sock, addr) || error("cannot bind to port; may already be in use or access denied") listen(sock; backlog=backlog) return sock end listen(port::Integer; backlog::Integer=BACKLOG_DEFAULT) = listen(localhost, port; backlog=backlog) listen(host::IPAddr, port::Integer; backlog::Integer=BACKLOG_DEFAULT) = listen(InetAddr(host, port); backlog=backlog) -function listen(callback, server::Union{TCPSocket, UDPSocket}) - @async begin - local client = TCPSocket() - lock(server.cond) - try - while isopen(server) - err = accept_nonblock(server, client) - if err == 0 - callback(client) - client = TCPSocket() - elseif err != UV_EAGAIN - uv_error("accept", err) - else - stream_wait(server, server.cond) - end - end - finally - unlock(server.cond) - end - end - return sock -end - function listen(sock::LibuvServer; backlog::Integer=BACKLOG_DEFAULT) uv_error("listen", trylisten(sock)) return sock @@ -564,16 +627,19 @@ function uv_connectioncb(stream::Ptr{Cvoid}, status::Cint) end function trylisten(sock::LibuvServer; backlog::Integer=BACKLOG_DEFAULT) + iolock_begin() check_open(sock) err = ccall(:uv_listen, Cint, (Ptr{Cvoid}, Cint, Ptr{Cvoid}), sock, backlog, uv_jl_connectioncb::Ptr{Cvoid}) sock.status = StatusActive + iolock_end() return err end ## function accept_nonblock(server::TCPServer, client::TCPSocket) + iolock_begin() if client.status != StatusInit error("client TCPSocket is not in initialization state") end @@ -581,6 +647,7 @@ function accept_nonblock(server::TCPServer, client::TCPSocket) if err == 0 client.status = StatusOpen end + iolock_end() return err end @@ -591,24 +658,31 @@ function accept_nonblock(server::TCPServer) end function accept(server::LibuvServer, client::LibuvStream) - if server.status != StatusActive + iolock_begin() + if server.status != StatusActive && server.status != StatusClosing && server.status != StatusClosed throw(ArgumentError("server not connected, make sure \"listen\" has been called")) end while isopen(server) err = accept_nonblock(server, client) if err == 0 + iolock_end() return client elseif err != UV_EAGAIN uv_error("accept", err) end + preserve_handle(server) lock(server.cond) + iolock_end() try - stream_wait(server, server.cond) + wait(server.cond) finally unlock(server.cond) + unpreserve_handle(server) end + iolock_begin() end uv_error("accept", UV_ECONNABORTED) + nothing end ## Utility functions @@ -663,6 +737,7 @@ function _sockname(sock, self=true) raddress = zeros(UInt8, 16) rfamily = Ref{Cuint}(0) + iolock_begin() if self r = ccall(:jl_tcp_getsockname, Int32, (Ptr{Cvoid}, Ref{Cushort}, Ptr{Cvoid}, Ref{Cuint}), @@ -672,6 +747,7 @@ function _sockname(sock, self=true) (Ptr{Cvoid}, Ref{Cushort}, Ptr{Cvoid}, Ref{Cuint}), sock.handle, rport, raddress, rfamily) end + iolock_end() uv_error("cannot obtain socket name", r) if r == 0 port = ntoh(rport[]) @@ -713,7 +789,6 @@ function __init__() global uv_jl_getaddrinfocb = @cfunction(uv_getaddrinfocb, Cvoid, (Ptr{Cvoid}, Cint, Ptr{Cvoid})) global uv_jl_getnameinfocb = @cfunction(uv_getnameinfocb, Cvoid, (Ptr{Cvoid}, Cint, Cstring, Cstring)) global uv_jl_recvcb = @cfunction(uv_recvcb, Cvoid, (Ptr{Cvoid}, Cssize_t, Ptr{Cvoid}, Ptr{Cvoid}, Cuint)) - global uv_jl_sendcb = @cfunction(uv_sendcb, Cvoid, (Ptr{Cvoid}, Cint)) global uv_jl_connectioncb = @cfunction(uv_connectioncb, Cvoid, (Ptr{Cvoid}, Cint)) global uv_jl_connectcb = @cfunction(uv_connectcb, Cvoid, (Ptr{Cvoid}, Cint)) end diff --git a/stdlib/Sockets/src/addrinfo.jl b/stdlib/Sockets/src/addrinfo.jl index a799a731cbe95..e4e26ff0cdc24 100644 --- a/stdlib/Sockets/src/addrinfo.jl +++ b/stdlib/Sockets/src/addrinfo.jl @@ -16,7 +16,7 @@ function uv_getaddrinfocb(req::Ptr{Cvoid}, status::Cint, addrinfo::Ptr{Cvoid}) t = unsafe_pointer_to_objref(data)::Task uv_req_set_data(req, C_NULL) if status != 0 || addrinfo == C_NULL - schedule(t, _UVError("getaddrinfocb", status)) + schedule(t, _UVError("getaddrinfo", status)) else freeaddrinfo = addrinfo addrs = IPAddr[] @@ -59,6 +59,7 @@ julia> getalladdrinfo("google.com") function getalladdrinfo(host::String) req = Libc.malloc(Base._sizeof_uv_getaddrinfo) uv_req_set_data(req, C_NULL) # in case we get interrupted before arriving at the wait call + iolock_begin() status = ccall(:jl_getaddrinfo, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), req, host, #=service=#C_NULL, uv_jl_getaddrinfocb::Ptr{Cvoid}) if status < 0 @@ -72,8 +73,9 @@ function getalladdrinfo(host::String) end ct = current_task() preserve_handle(ct) + uv_req_set_data(req, ct) + iolock_end() r = try - uv_req_set_data(req, ct) wait() finally if uv_req_data(req) != C_NULL @@ -98,7 +100,7 @@ function getalladdrinfo(host::String) elseif code == UV_EAI_MEMORY throw(OutOfMemoryError()) else - throw(_UVError("getaddrinfo", code)) + throw(r) end end return r::Vector{IPAddr} @@ -129,7 +131,7 @@ function uv_getnameinfocb(req::Ptr{Cvoid}, status::Cint, hostname::Cstring, serv t = unsafe_pointer_to_objref(data)::Task uv_req_set_data(req, C_NULL) if status != 0 - schedule(t, _UVError("getnameinfocb", status)) + schedule(t, _UVError("getnameinfo", status)) else schedule(t, unsafe_string(hostname)) end @@ -155,18 +157,14 @@ julia> getnameinfo(Sockets.IPv4("8.8.8.8")) function getnameinfo(address::Union{IPv4, IPv6}) req = Libc.malloc(Base._sizeof_uv_getnameinfo) uv_req_set_data(req, C_NULL) # in case we get interrupted before arriving at the wait call - ev = eventloop() port = hton(UInt16(0)) flags = 0 uvcb = uv_jl_getnameinfocb::Ptr{Cvoid} status = UV_EINVAL - if address isa IPv4 - status = ccall(:jl_getnameinfo, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, UInt32, UInt16, Cint, Ptr{Cvoid}), - ev, req, hton(address.host), port, flags, uvcb) - elseif address isa IPv6 - status = ccall(:jl_getnameinfo6, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Ref{UInt128}, UInt16, Cint, Ptr{Cvoid}), - ev, req, hton(address.host), port, flags, uvcb) - end + host_in = Ref(hton(address.host)) + iolock_begin() + status = ccall(:jl_getnameinfo, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}, UInt16, Cint, Ptr{Cvoid}, Cint), + eventloop(), req, host_in, port, flags, uvcb, address isa IPv6) if status < 0 Libc.free(req) if status == UV_EINVAL @@ -178,8 +176,9 @@ function getnameinfo(address::Union{IPv4, IPv6}) end ct = current_task() preserve_handle(ct) + uv_req_set_data(req, ct) + iolock_end() r = try - uv_req_set_data(req, ct) wait() finally if uv_req_data(req) != C_NULL @@ -204,7 +203,7 @@ function getnameinfo(address::Union{IPv4, IPv6}) elseif code == UV_EAI_MEMORY throw(OutOfMemoryError()) else - throw(_UVError("getnameinfo", code)) + throw(r) end end return r::String @@ -290,9 +289,9 @@ function getipaddrs(addr_type::Type{T}=IPAddr; loopback::Bool=false) where T<:IP end end sockaddr = ccall(:jl_uv_interface_address_sockaddr, Ptr{Cvoid}, (Ptr{UInt8},), current_addr) - if IPv4 <: T && ccall(:jl_sockaddr_in_is_ip4, Int32, (Ptr{Cvoid},), sockaddr) == 1 + if IPv4 <: T && ccall(:jl_sockaddr_is_ip4, Int32, (Ptr{Cvoid},), sockaddr) == 1 push!(addresses, IPv4(ntoh(ccall(:jl_sockaddr_host4, UInt32, (Ptr{Cvoid},), sockaddr)))) - elseif IPv6 <: T && ccall(:jl_sockaddr_in_is_ip6, Int32, (Ptr{Cvoid},), sockaddr) == 1 + elseif IPv6 <: T && ccall(:jl_sockaddr_is_ip6, Int32, (Ptr{Cvoid},), sockaddr) == 1 addr6 = Ref{UInt128}() scope_id = ccall(:jl_sockaddr_host6, UInt32, (Ptr{Cvoid}, Ref{UInt128},), sockaddr, addr6) push!(addresses, IPv6(ntoh(addr6[]))) diff --git a/stdlib/Sockets/test/runtests.jl b/stdlib/Sockets/test/runtests.jl index 1263a81d3f381..a7d31bb40a092 100644 --- a/stdlib/Sockets/test/runtests.jl +++ b/stdlib/Sockets/test/runtests.jl @@ -142,15 +142,20 @@ defaultport = rand(2000:4000) mktempdir() do tmpdir socketname = Sys.iswindows() ? ("\\\\.\\pipe\\uv-test-" * randstring(6)) : joinpath(tmpdir, "socket") - s = listen(socketname) - tsk = @async begin - sock = accept(s) - write(sock, "Hello World\n") - close(s) - close(sock) + local nconn = 0 + srv = listen(socketname) + t = accept(srv) do client + write(client, "Hello World $(nconn += 1)\n") + close(client) + nconn == 3 && Base.wait_close(srv) end - @test read(connect(socketname), String) == "Hello World\n" - wait(tsk) + @test read(connect(socketname), String) == "Hello World 1\n" + @test read(connect(socketname), String) == "Hello World 2\n" + @test read(connect(socketname), String) == "Hello World 3\n" + conn = connect(socketname) + close(srv) + wait(t) + @test read(conn, String) == "" end end @@ -240,41 +245,68 @@ end bind(b, ip"127.0.0.1", randport + 1) @sync begin - # FIXME: check that we received all messages - for i = 1:3 - @async send(b, ip"127.0.0.1", randport, "Hello World") - @async String(recv(a)) == "Hello World" + let i = 0 + for _ = 1:30 + @async let msg = String(recv(a)) + @test msg == "Hello World $(i += 1)" + end + end + end + yield() + for i = 1:30 + send(b, ip"127.0.0.1", randport, "Hello World $i") end end - - tsk = @async send(b, ip"127.0.0.1", randport, "Hello World") - (addr, data) = recvfrom(a) - @test addr == ip"127.0.0.1" && String(data) == "Hello World" - wait(tsk) + let msg = Vector{UInt8}("fedcba9876543210"^36) # The minimum reassembly buffer size for IPv4 is 576 bytes + tsk = @async @test recv(a) == msg + @test send(b, ip"127.0.0.1", randport, msg) === nothing + wait(tsk) + end + let msg = Vector{UInt8}("1234"^16377) # The maximum size of an IPv4 datagram is 65535 bytes, including the header + @test_throws(Base._UVError("send", Base.UV_EMSGSIZE), + send(b, ip"127.0.0.1", randport, msg)) + pop!(msg) + tsk = @async recv(a) + try + send(b, ip"127.0.0.1", randport, msg) + catch ex + if !(ex isa Base.IOError && ex.code == Base.UV_EMSGSIZE) || Sys.islinux() || Sys.iswindows() + # this is allowed failure on some platforms which might further restrict + # the maximum packet size being sent (even locally), such as BSD's `sysctl net.inet.udp.maxdgram` + rethrow() + end + empty!(msg) + send(b, ip"127.0.0.1", randport, msg) # check that the socket is still alive + end + @test fetch(tsk) == msg + end + let tsk = @async send(b, ip"127.0.0.1", randport, "WORLD HELLO") + (addr, data) = recvfrom(a) + @test addr == ip"127.0.0.1" && String(data) == "WORLD HELLO" + wait(tsk) + end close(a) close(b) end @test_throws MethodError bind(UDPSocket(), randport) - if !Sys.iswindows() || Sys.windows_version() >= Sys.WINDOWS_VISTA_VER let a = UDPSocket() b = UDPSocket() bind(a, ip"::1", UInt16(randport)) bind(b, ip"::1", UInt16(randport + 1)) - tsk = @async begin - @test begin - (addr, data) = recvfrom(a) - addr == ip"::1" && String(data) == "Hello World" + for i = 1:3 + tsk = @async begin + let (addr, data) = recvfrom(a) + @test addr == ip"::1" + @test String(data) == "Hello World" + end end + send(b, ip"::1", randport, "Hello World") + wait(tsk) end - send(b, ip"::1", randport, "Hello World") - wait(tsk) - send(b, ip"::1", randport, "Hello World") - wait(tsk) - end end end diff --git a/test/channels.jl b/test/channels.jl index ae787f71c9bed..2583e2f8e4e39 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -238,26 +238,29 @@ end # interpreting the calling function. @noinline garbage_finalizer(f) = (finalizer(f, "gar" * "bage"); nothing) run = Ref(0) - GC.enable(false) + garbage_finalizer(x -> nothing) # warmup + @test GC.enable(false) # test for finalizers trying to yield leading to failed attempts to context switch garbage_finalizer((x) -> (run[] += 1; sleep(1))) garbage_finalizer((x) -> (run[] += 1; yield())) garbage_finalizer((x) -> (run[] += 1; yieldto(@task () -> ()))) t = @task begin - GC.enable(true) + @test !GC.enable(true) GC.gc() + true end oldstderr = stderr - local newstderr, errstream + newstderr = redirect_stderr() + local errstream try - newstderr = redirect_stderr() errstream = @async read(newstderr[1], String) yield(t) finally redirect_stderr(oldstderr) close(newstderr[2]) end - Base.wait(t) + @test istaskdone(t) + @test fetch(t) @test run[] == 3 @test fetch(errstream) == """ error in running finalizer: ErrorException("task switch not allowed from inside gc finalizer") @@ -267,8 +270,8 @@ end # test for invalid state in Workqueue during yield t = @async nothing t.state = :invalid + newstderr = redirect_stderr() try - newstderr = redirect_stderr() errstream = @async read(newstderr[1], String) yield() finally @@ -292,68 +295,78 @@ end end @testset "Timer / AsyncCondition triggering and race #12719" begin - tc = Ref(0) - t = Timer(0) do t - tc[] += 1 + let tc = Ref(0) + t = Timer(0) do t + tc[] += 1 + end + @test isopen(t) + Base.process_events() + @test !isopen(t) + @test tc[] == 0 + yield() + @test tc[] == 1 end - @test isopen(t) - Base.process_events() - @test !isopen(t) - @test tc[] == 0 - yield() - @test tc[] == 1 - - tc = Ref(0) - t = Timer(0) do t - tc[] += 1 + + let tc = Ref(0) + t = Timer(0) do t + tc[] += 1 + end + @test isopen(t) + close(t) + @test !isopen(t) + sleep(0.1) + @test tc[] == 0 end - @test isopen(t) - close(t) - @test !isopen(t) - sleep(0.1) - @test tc[] == 0 - - tc = Ref(0) - async = Base.AsyncCondition() do async - tc[] += 1 + + let tc = Ref(0) + async = Base.AsyncCondition() do async + tc[] += 1 + end + @test isopen(async) + ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) + ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) + Base.process_events() # schedule event + Sys.iswindows() && Base.process_events() # schedule event (windows?) + ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) + @test tc[] == 0 + yield() # consume event + @test tc[] == 1 + Sys.iswindows() && Base.process_events() # schedule event (windows?) + yield() # consume event + @test tc[] == 2 + sleep(0.1) # no further events + @test tc[] == 2 + ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) + ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) + Base.process_events() # schedule event + Sys.iswindows() && Base.process_events() # schedule event (windows?) + close(async) # and close + @test !isopen(async) + @test tc[] == 2 + @test tc[] == 2 + yield() # consume event & then close + @test tc[] == 3 + sleep(0.1) # no further events + @test tc[] == 3 end - @test isopen(async) - ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) - Base.process_events() # schedule event - ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) - Sys.iswindows() && Base.process_events() # schedule event (windows?) - @test tc[] == 0 - yield() # consume event - @test tc[] == 1 - sleep(0.1) # no further events - @test tc[] == 1 - ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) - ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) - close(async) - @test !isopen(async) - @test tc[] == 1 - Base.process_events() # schedule event & then close - Sys.iswindows() && Base.process_events() # schedule event (windows?) - yield() # consume event & then close - @test tc[] == 2 - sleep(0.1) # no further events - @test tc[] == 2 - - tc = Ref(0) - async = Base.AsyncCondition() do async - tc[] += 1 + + let tc = Ref(0) + async = Base.AsyncCondition() do async + tc[] += 1 + end + @test isopen(async) + ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) + Base.process_events() # schedule event + Sys.iswindows() && Base.process_events() # schedule event (windows) + close(async) + @test !isopen(async) + Base.process_events() # and close + @test tc[] == 0 + yield() # consume event & then close + @test tc[] == 1 + sleep(0.1) + @test tc[] == 1 end - @test isopen(async) - ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) - close(async) - @test !isopen(async) - Base.process_events() # schedule event & then close - Sys.iswindows() && Base.process_events() # schedule event (windows) - @test tc[] == 0 - yield() # consume event & then close - @test tc[] == 1 - sleep(0.1) - @test tc[] == 1 end @testset "check_channel_state" begin diff --git a/test/read.jl b/test/read.jl index 8e74e038553bf..5782481b76587 100644 --- a/test/read.jl +++ b/test/read.jl @@ -127,11 +127,11 @@ end open_streams = [] function cleanup() for s_ in open_streams - try close(s_); catch; end + close(s_) end empty!(open_streams) for tsk in tasks - Base.wait(tsk) + wait(tsk) end empty!(tasks) end diff --git a/test/spawn.jl b/test/spawn.jl index 122d6e65825de..d96cc43664335 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -314,7 +314,6 @@ let out = Pipe(), echo = `$exename --startup-file=no -e 'print(stdout, " 1\t", r infd = Base._fd(out.in) outfd = Base._fd(out.out) show(out, out) - notify(ready) @test isreadable(out) @test iswritable(out) close(out.in) @@ -333,11 +332,8 @@ let out = Pipe(), echo = `$exename --startup-file=no -e 'print(stdout, " 1\t", r if Sys.iswindows() # WINNT kernel appears to not provide a fast mechanism for async propagation # of EOF for a blocking stream, so just wait for it to catch up. - # This shouldn't take much more than 32ms. + # This shouldn't take much more than 32ms more. Base.wait_close(out) - # it's closed now, but the other task is expected to be behind this task - # in emptying the read buffer - @test isreadable(out) end @test !isopen(out) end