diff --git a/base/asyncevent.jl b/base/asyncevent.jl index 2299993bd2ea0..625eff3bd288a 100644 --- a/base/asyncevent.jl +++ b/base/asyncevent.jl @@ -15,11 +15,12 @@ mutable struct AsyncCondition handle::Ptr{Cvoid} cond::ThreadSynchronizer isopen::Bool + set::Bool function AsyncCondition() - this = new(Libc.malloc(_sizeof_uv_async), ThreadSynchronizer(), true) + this = new(Libc.malloc(_sizeof_uv_async), ThreadSynchronizer(), true, false) + iolock_begin() associate_julia_struct(this.handle, this) - finalizer(uvfinalize, this) err = ccall(:uv_async_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), this, uv_jl_asynccb::Ptr{Cvoid}) if err != 0 @@ -28,6 +29,8 @@ mutable struct AsyncCondition this.handle = C_NULL throw(_UVError("uv_async_init", err)) end + finalizer(uvfinalize, this) + iolock_end() return this end end @@ -40,28 +43,10 @@ the async condition object itself. """ function AsyncCondition(cb::Function) async = AsyncCondition() - waiter = Task(function() - lock(async.cond) - try - while isopen(async) - success = try - stream_wait(async, async.cond) - true - catch exc # ignore possible exception on close() - isa(exc, EOFError) || rethrow() - finally - unlock(async.cond) - end - success && cb(async) - lock(async.cond) - end - finally - unlock(async.cond) + @async while _trywait(async) + cb(async) + isopen(async) || return end - end) - # must start the task right away so that it can wait for the AsyncCondition before - # we re-enter the event loop. this avoids a race condition. see issue #12719 - yield(waiter) return async end @@ -81,17 +66,26 @@ mutable struct Timer handle::Ptr{Cvoid} cond::ThreadSynchronizer isopen::Bool + set::Bool function Timer(timeout::Real; interval::Real = 0.0) timeout ≥ 0 || throw(ArgumentError("timer cannot have negative timeout of $timeout seconds")) interval ≥ 0 || throw(ArgumentError("timer cannot have negative repeat interval of $interval seconds")) + timeout = UInt64(round(timeout * 1000)) + 1 + interval = UInt64(round(interval * 1000)) + loop = eventloop() - this = new(Libc.malloc(_sizeof_uv_timer), ThreadSynchronizer(), true) - ccall(:jl_uv_update_timer_start, Cvoid, - (Ptr{Cvoid}, Any, Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64), - eventloop(), this, this.handle, uv_jl_timercb::Ptr{Cvoid}, - UInt64(round(timeout * 1000)) + 1, UInt64(round(interval * 1000))) + this = new(Libc.malloc(_sizeof_uv_timer), ThreadSynchronizer(), true, false) + associate_julia_struct(this.handle, this) + iolock_begin() + err = ccall(:uv_timer_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), loop, this) + @assert err == 0 finalizer(uvfinalize, this) + ccall(:uv_update_time, Cvoid, (Ptr{Cvoid},), loop) + err = ccall(:uv_timer_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64), + this, uv_jl_timercb::Ptr{Cvoid}, timeout, interval) + @assert err == 0 + iolock_end() return this end end @@ -99,41 +93,81 @@ end unsafe_convert(::Type{Ptr{Cvoid}}, t::Timer) = t.handle unsafe_convert(::Type{Ptr{Cvoid}}, async::AsyncCondition) = async.handle -function wait(t::Union{Timer, AsyncCondition}) - lock(t.cond) - try - isopen(t) || throw(EOFError()) - stream_wait(t, t.cond) - finally - unlock(t.cond) +function _trywait(t::Union{Timer, AsyncCondition}) + set = t.set + if !set + t.handle == C_NULL && return false + iolock_begin() + set = t.set + if !set + preserve_handle(t) + lock(t.cond) + try + set = t.set + if !set + if t.handle != C_NULL + iolock_end() + set = wait(t.cond) + unlock(t.cond) + iolock_begin() + lock(t.cond) + end + end + finally + unlock(t.cond) + unpreserve_handle(t) + end + end + iolock_end() end + t.set = false + return set +end + +function wait(t::Union{Timer, AsyncCondition}) + _trywait(t) || throw(EOFError()) + nothing end + isopen(t::Union{Timer, AsyncCondition}) = t.isopen function close(t::Union{Timer, AsyncCondition}) + iolock_begin() if t.handle != C_NULL && isopen(t) t.isopen = false ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t) end + iolock_end() nothing end function uvfinalize(t::Union{Timer, AsyncCondition}) - if t.handle != C_NULL - disassociate_julia_struct(t.handle) # not going to call the usual close hooks - close(t) - t.handle = C_NULL + iolock_begin() + lock(t.cond) + try + if t.handle != C_NULL + disassociate_julia_struct(t.handle) # not going to call the usual close hooks + if t.isopen + t.isopen = false + ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t) + end + t.handle = C_NULL + notify(t.cond, false) + end + finally + unlock(t.cond) end - t.isopen = false + iolock_end() nothing end function _uv_hook_close(t::Union{Timer, AsyncCondition}) lock(t.cond) try - uvfinalize(t) - notify_error(t.cond, EOFError()) + t.isopen = false + t.handle = C_NULL + notify(t.cond, t.set) finally unlock(t.cond) end @@ -144,7 +178,8 @@ function uv_asynccb(handle::Ptr{Cvoid}) async = @handle_as handle AsyncCondition lock(async.cond) try - notify(async.cond) + async.set = true + notify(async.cond, true) finally unlock(async.cond) end @@ -155,11 +190,12 @@ function uv_timercb(handle::Ptr{Cvoid}) t = @handle_as handle Timer lock(t.cond) try + t.set = true if ccall(:uv_timer_get_repeat, UInt64, (Ptr{Cvoid},), t) == 0 # timer is stopped now close(t) end - notify(t.cond) + notify(t.cond, true) finally unlock(t.cond) end @@ -199,7 +235,7 @@ Here the first number is printed after a delay of two seconds, then the followin julia> begin i = 0 cb(timer) = (global i += 1; println(i)) - t = Timer(cb, 2, interval = 0.2) + t = Timer(cb, 2, interval=0.2) wait(t) sleep(0.5) close(t) @@ -209,24 +245,13 @@ julia> begin 3 ``` """ -function Timer(cb::Function, timeout::Real; interval::Real = 0.0) - t = Timer(timeout, interval = interval) - waiter = Task(function() - while isopen(t) - success = try - wait(t) - true - catch exc # ignore possible exception on close() - isa(exc, EOFError) || rethrow() - false - end - success && cb(t) +function Timer(cb::Function, timeout::Real; interval::Real=0.0) + timer = Timer(timeout, interval=interval) + @async while _trywait(timer) + cb(timer) + isopen(timer) || return end - end) - # must start the task right away so that it can wait for the Timer before - # we re-enter the event loop. this avoids a race condition. see issue #12719 - yield(waiter) - return t + return timer end """ @@ -234,12 +259,14 @@ end Waits until `testcb` returns `true` or for `secs` seconds, whichever is earlier. `testcb` is polled every `pollint` seconds. + +Returns :ok, :timed_out, or :error """ function timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1) pollint > 0 || throw(ArgumentError("cannot set pollint to $pollint seconds")) start = time() done = Channel(1) - timercb(aw) = begin + function timercb(aw) try if testcb() put!(done, :ok) @@ -251,14 +278,15 @@ function timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1) finally isready(done) && close(aw) end + nothing end if !testcb() t = Timer(timercb, pollint, interval = pollint) - ret = fetch(done) + ret = fetch(done)::Symbol close(t) else ret = :ok end - ret + return ret end diff --git a/base/condition.jl b/base/condition.jl index 3afc538d53386..eb136637dc1d5 100644 --- a/base/condition.jl +++ b/base/condition.jl @@ -76,6 +76,9 @@ unlock(c::GenericCondition) = unlock(c.lock) trylock(c::GenericCondition) = trylock(c.lock) islocked(c::GenericCondition) = islocked(c.lock) +lock(f, c::GenericCondition) = lock(f, c.lock) +unlock(f, c::GenericCondition) = unlock(f, c.lock) + """ wait([x]) diff --git a/base/coreio.jl b/base/coreio.jl index d2be652731e43..2796c53e759f5 100644 --- a/base/coreio.jl +++ b/base/coreio.jl @@ -17,9 +17,7 @@ write(::DevNull, ::UInt8) = 1 unsafe_write(::DevNull, ::Ptr{UInt8}, n::UInt)::Int = n close(::DevNull) = nothing flush(::DevNull) = nothing -wait_connected(::DevNull) = nothing wait_readnb(::DevNull) = wait() -wait_readbyte(::DevNull) = wait() wait_close(::DevNull) = wait() eof(::DevNull) = true diff --git a/base/file.jl b/base/file.jl index b7f98c7323774..d553b17c2d081 100644 --- a/base/file.jl +++ b/base/file.jl @@ -537,7 +537,7 @@ function mktempdir(parent=tempdir(); prefix=temp_prefix) try ret = ccall(:uv_fs_mkdtemp, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}), - eventloop(), req, tpath, C_NULL) + C_NULL, req, tpath, C_NULL) if ret < 0 ccall(:uv_fs_req_cleanup, Cvoid, (Ptr{Cvoid},), req) uv_error("mktempdir", ret) @@ -623,8 +623,8 @@ function readdir(path::AbstractString) uv_readdir_req = zeros(UInt8, ccall(:jl_sizeof_uv_fs_t, Int32, ())) # defined in sys.c, to call uv_fs_readdir, which sets errno on error. - err = ccall(:jl_uv_fs_scandir, Int32, (Ptr{Cvoid}, Ptr{UInt8}, Cstring, Cint, Ptr{Cvoid}), - eventloop(), uv_readdir_req, path, 0, C_NULL) + err = ccall(:uv_fs_scandir, Int32, (Ptr{Cvoid}, Ptr{UInt8}, Cstring, Cint, Ptr{Cvoid}), + C_NULL, uv_readdir_req, path, 0, C_NULL) err < 0 && throw(SystemError("unable to read directory $path", -err)) #uv_error("unable to read directory $path", err) @@ -636,7 +636,7 @@ function readdir(path::AbstractString) end # Clean up the request string - ccall(:jl_uv_fs_req_cleanup, Cvoid, (Ptr{UInt8},), uv_readdir_req) + ccall(:uv_fs_req_cleanup, Cvoid, (Ptr{UInt8},), uv_readdir_req) return entries end @@ -808,9 +808,9 @@ Return the target location a symbolic link `path` points to. function readlink(path::AbstractString) req = Libc.malloc(_sizeof_uv_fs) try - ret = ccall(:jl_uv_fs_readlink, Int32, + ret = ccall(:uv_fs_readlink, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}), - eventloop(), req, path, C_NULL) + C_NULL, req, path, C_NULL) if ret < 0 ccall(:uv_fs_req_cleanup, Cvoid, (Ptr{Cvoid},), req) uv_error("readlink", ret) diff --git a/base/filesystem.jl b/base/filesystem.jl index 6b61272cc893b..0bf4c3b1145cf 100644 --- a/base/filesystem.jl +++ b/base/filesystem.jl @@ -73,10 +73,10 @@ function open(path::AbstractString, flags::Integer, mode::Integer=0) req = Libc.malloc(_sizeof_uv_fs) local handle try - ret = ccall(:jl_uv_fs_open, Int32, + ret = ccall(:uv_fs_open, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Int32, Int32, Ptr{Cvoid}), - eventloop(), req, path, flags, mode, C_NULL) - handle = ccall(:jl_uv_fs_result, Cssize_t, (Ptr{Cvoid},), req) + C_NULL, req, path, flags, mode, C_NULL) + handle = ccall(:uv_fs_get_result, Cssize_t, (Ptr{Cvoid},), req) ccall(:uv_fs_req_cleanup, Cvoid, (Ptr{Cvoid},), req) uv_error("open", ret) finally # conversion to Cstring could cause an exception @@ -132,9 +132,9 @@ write(f::File, c::UInt8) = write(f, Ref{UInt8}(c)) function truncate(f::File, n::Integer) check_open(f) req = Libc.malloc(_sizeof_uv_fs) - err = ccall(:jl_uv_fs_ftruncate, Int32, + err = ccall(:uv_fs_ftruncate, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Int64, Ptr{Cvoid}), - eventloop(), req, f.handle, n, C_NULL) + C_NULL, req, f.handle, n, C_NULL) Libc.free(req) uv_error("ftruncate", err) return f @@ -143,9 +143,9 @@ end function futime(f::File, atime::Float64, mtime::Float64) check_open(f) req = Libc.malloc(_sizeof_uv_fs) - err = ccall(:jl_uv_fs_futime, Int32, + err = ccall(:uv_fs_futime, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Float64, Float64, Ptr{Cvoid}), - eventloop(), req, f.handle, atime, mtime, C_NULL) + C_NULL, req, f.handle, atime, mtime, C_NULL) Libc.free(req) uv_error("futime", err) return f diff --git a/base/io.jl b/base/io.jl index e14caf4ac323b..047862ec227b3 100644 --- a/base/io.jl +++ b/base/io.jl @@ -61,9 +61,7 @@ Close an I/O stream. Performs a [`flush`](@ref) first. """ function close end function flush end -function wait_connected end function wait_readnb end -function wait_readbyte end function wait_close end function bytesavailable end @@ -260,7 +258,6 @@ iswritable(io::AbstractPipe) = iswritable(pipe_writer(io)) isopen(io::AbstractPipe) = isopen(pipe_writer(io)) || isopen(pipe_reader(io)) close(io::AbstractPipe) = (close(pipe_writer(io)); close(pipe_reader(io))) wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io), nb) -wait_readbyte(io::AbstractPipe, byte::UInt8) = wait_readbyte(pipe_reader(io), byte) wait_close(io::AbstractPipe) = (wait_close(pipe_writer(io)); wait_close(pipe_reader(io))) """ diff --git a/base/libuv.jl b/base/libuv.jl index 760d97e91eed6..5320e1ef6d8e9 100644 --- a/base/libuv.jl +++ b/base/libuv.jl @@ -9,31 +9,31 @@ function uv_sizeof_handle(handle) if !(UV_UNKNOWN_HANDLE < handle < UV_HANDLE_TYPE_MAX) throw(DomainError(handle)) end - ccall(:uv_handle_size,Csize_t,(Int32,),handle) + return ccall(:uv_handle_size, Csize_t, (Int32,), handle) end function uv_sizeof_req(req) if !(UV_UNKNOWN_REQ < req < UV_REQ_TYPE_MAX) throw(DomainError(req)) end - ccall(:uv_req_size,Csize_t,(Int32,),req) + return ccall(:uv_req_size, Csize_t, (Int32,), req) end for h in uv_handle_types -@eval const $(Symbol("_sizeof_",lowercase(string(h)))) = uv_sizeof_handle($h) +@eval const $(Symbol("_sizeof_", lowercase(string(h)))) = uv_sizeof_handle($h) end for r in uv_req_types -@eval const $(Symbol("_sizeof_",lowercase(string(r)))) = uv_sizeof_req($r) +@eval const $(Symbol("_sizeof_", lowercase(string(r)))) = uv_sizeof_req($r) end -uv_handle_data(handle) = ccall(:jl_uv_handle_data,Ptr{Cvoid},(Ptr{Cvoid},),handle) -uv_req_data(handle) = ccall(:jl_uv_req_data,Ptr{Cvoid},(Ptr{Cvoid},),handle) -uv_req_set_data(req,data) = ccall(:jl_uv_req_set_data,Cvoid,(Ptr{Cvoid},Any),req,data) -uv_req_set_data(req,data::Ptr{Cvoid}) = ccall(:jl_uv_req_set_data,Cvoid,(Ptr{Cvoid},Ptr{Cvoid}),req,data) +uv_handle_data(handle) = ccall(:jl_uv_handle_data, Ptr{Cvoid}, (Ptr{Cvoid},), handle) +uv_req_data(handle) = ccall(:jl_uv_req_data, Ptr{Cvoid}, (Ptr{Cvoid},), handle) +uv_req_set_data(req, data) = ccall(:jl_uv_req_set_data, Cvoid, (Ptr{Cvoid}, Any), req, data) +uv_req_set_data(req, data::Ptr{Cvoid}) = ccall(:jl_uv_req_set_data, Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), req, data) macro handle_as(hand, typ) - quote - data = uv_handle_data($(esc(hand))) + return quote + local data = uv_handle_data($(esc(hand))) data == C_NULL && return unsafe_pointer_to_objref(data)::($(esc(typ))) end @@ -45,6 +45,9 @@ disassociate_julia_struct(uv) = disassociate_julia_struct(uv.handle) disassociate_julia_struct(handle::Ptr{Cvoid}) = handle != C_NULL && ccall(:jl_uv_disassociate_julia_struct, Cvoid, (Ptr{Cvoid},), handle) +iolock_begin() = ccall(:jl_iolock_begin, Cvoid, ()) +iolock_end() = ccall(:jl_iolock_end, Cvoid, ()) + # A dict of all libuv handles that are being waited on somewhere in the system # and should thus not be garbage collected const uvhandles = IdDict() @@ -83,16 +86,15 @@ function _UVError(pfx::AbstractString, code::Integer) IOError(string(pfx, ": ", struverror(code), " (", uverrorname(code), ")"), code) end -struverror(err::Int32) = unsafe_string(ccall(:uv_strerror,Cstring,(Int32,),err)) -uverrorname(err::Int32) = unsafe_string(ccall(:uv_err_name,Cstring,(Int32,),err)) +struverror(err::Int32) = unsafe_string(ccall(:uv_strerror, Cstring, (Int32,), err)) +uverrorname(err::Int32) = unsafe_string(ccall(:uv_err_name, Cstring, (Int32,), err)) -uv_error(prefix::Symbol, c::Integer) = uv_error(string(prefix),c) -uv_error(prefix::AbstractString, c::Integer) = c < 0 ? throw(_UVError(prefix,c)) : nothing +uv_error(prefix::Symbol, c::Integer) = uv_error(string(prefix), c) +uv_error(prefix::AbstractString, c::Integer) = c < 0 ? throw(_UVError(prefix, c)) : nothing ## event loop ## eventloop() = uv_eventloop::Ptr{Cvoid} -#mkNewEventLoop() = ccall(:jl_new_event_loop,Ptr{Cvoid},()) # this would probably be fine, but is nowhere supported function process_events() return ccall(:jl_process_events, Int32, (Ptr{Cvoid},), eventloop()) @@ -117,6 +119,7 @@ function reinit_stdio() global stdin = init_stdio(ccall(:jl_stdin_stream, Ptr{Cvoid}, ())) global stdout = init_stdio(ccall(:jl_stdout_stream, Ptr{Cvoid}, ())) global stderr = init_stdio(ccall(:jl_stderr_stream, Ptr{Cvoid}, ())) + nothing end """ diff --git a/base/process.jl b/base/process.jl index 721838229024a..f7797297e8fe8 100644 --- a/base/process.jl +++ b/base/process.jl @@ -319,13 +319,12 @@ mutable struct Process <: AbstractPipe err::IO exitcode::Int64 termsignal::Int32 - exitnotify::Condition - closenotify::Condition + exitnotify::ThreadSynchronizer function Process(cmd::Cmd, handle::Ptr{Cvoid}) this = new(cmd, handle, devnull, devnull, devnull, typemin(fieldtype(Process, :exitcode)), typemin(fieldtype(Process, :termsignal)), - Condition(), Condition()) + ThreadSynchronizer()) finalizer(uvfinalize, this) return this end @@ -366,14 +365,18 @@ function uv_return_spawn(p::Ptr{Cvoid}, exit_status::Int64, termsignal::Int32) proc.termsignal = termsignal ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), proc.handle) proc.handle = C_NULL - notify(proc.exitnotify) + lock(proc.exitnotify) + try + notify(proc.exitnotify) + finally + unlock(proc.exitnotify) + end nothing end # called when the libuv handle is destroyed function _uv_hook_close(proc::Process) proc.handle = C_NULL - notify(proc.closenotify) nothing end @@ -855,6 +858,7 @@ error if killing the process failed for other reasons (e.g. insufficient permissions). """ function kill(p::Process, signum::Integer) + iolock_begin() if process_running(p) @assert p.handle != C_NULL err = ccall(:uv_process_kill, Int32, (Ptr{Cvoid}, Int32), p.handle, signum) @@ -862,6 +866,8 @@ function kill(p::Process, signum::Integer) throw(_UVError("kill", err)) end end + iolock_end() + nothing end kill(ps::Vector{Process}) = foreach(kill, ps) kill(ps::ProcessChain) = foreach(kill, ps.processes) @@ -876,18 +882,17 @@ Get the child process ID, if it still exists. This function requires at least Julia 1.1. """ function Libc.getpid(p::Process) + # TODO: due to threading, this method is no longer synchronized with the user application + iolock_begin() ppid = Int32(0) if p.handle != C_NULL ppid = ccall(:jl_uv_process_pid, Int32, (Ptr{Cvoid},), p.handle) end + iolock_end() ppid <= 0 && throw(_UVError("getpid", UV_ESRCH)) return ppid end -function _contains_newline(bufptr::Ptr{Cvoid}, len::Int32) - return (ccall(:memchr, Ptr{Cvoid}, (Ptr{Cvoid},Int32,Csize_t), bufptr, '\n', len) != C_NULL) -end - ## process status ## """ @@ -988,8 +993,26 @@ macro cmd(str) return :(cmd_gen($(esc(shell_parse(str, special=shell_special)[1])))) end -wait(x::Process) = if !process_exited(x); stream_wait(x, x.exitnotify); end -wait(x::ProcessChain) = for p in x.processes; wait(p); end +function wait(x::Process) + process_exited(x) && return + iolock_begin() + if !process_exited(x) + preserve_handle(x) + lock(x.exitnotify) + iolock_end() + try + wait(x.exitnotify) + finally + unlock(x.exitnotify) + unpreserve_handle(x) + end + else + iolock_end() + end + nothing +end + +wait(x::ProcessChain) = foreach(wait, x.processes) show(io::IO, p::Process) = print(io, "Process(", p.cmd, ", ", process_status(p), ")") diff --git a/base/stream.jl b/base/stream.jl index 900439718708c..d7ff4c8cc0b7f 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -41,24 +41,15 @@ abstract type LibuvStream <: IO end # Redirectable = Union{IO, FileRedirect, Libc.RawFD} (not exported) -function stream_wait(x, c...) # for x::LibuvObject - preserve_handle(x) - try - return wait(c...) - finally - unpreserve_handle(x) - end -end - bytesavailable(s::LibuvStream) = bytesavailable(s.buffer) function eof(s::LibuvStream) if isopen(s) # fast path - bytesavailable(s) > 0 && return false + bytesavailable(s) <= 0 || return false else return bytesavailable(s) <= 0 end - wait_readnb(s,1) + wait_readnb(s, 1) return !isopen(s) && bytesavailable(s) <= 0 end @@ -119,17 +110,16 @@ mutable struct PipeEndpoint <: LibuvStream status::Int buffer::IOBuffer cond::ThreadSynchronizer - closenotify::ThreadSynchronizer + readerror::Any sendbuf::Union{IOBuffer, Nothing} - lock::ReentrantLock + lock::ReentrantLock # advisory lock throttle::Int function PipeEndpoint(handle::Ptr{Cvoid}, status) - lock = Threads.SpinLock() p = new(handle, status, PipeBuffer(), - ThreadSynchronizer(lock), - ThreadSynchronizer(lock), + ThreadSynchronizer(), + nothing, nothing, ReentrantLock(), DEFAULT_READ_BUFFER_SZ) @@ -141,17 +131,21 @@ end function PipeEndpoint() pipe = PipeEndpoint(Libc.malloc(_sizeof_uv_named_pipe), StatusUninit) + iolock_begin() err = ccall(:uv_pipe_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cint), eventloop(), pipe.handle, 0) uv_error("failed to create pipe endpoint", err) pipe.status = StatusInit + iolock_end() return pipe end function PipeEndpoint(fd::OS_HANDLE) pipe = PipeEndpoint() + iolock_begin() err = ccall(:uv_pipe_open, Int32, (Ptr{Cvoid}, OS_HANDLE), pipe.handle, fd) uv_error("pipe_open", err) pipe.status = StatusOpen + iolock_end() return pipe end if OS_HANDLE != RawFD @@ -164,19 +158,18 @@ mutable struct TTY <: LibuvStream status::Int buffer::IOBuffer cond::ThreadSynchronizer - closenotify::ThreadSynchronizer + readerror::Any sendbuf::Union{IOBuffer, Nothing} - lock::ReentrantLock + lock::ReentrantLock # advisory lock throttle::Int @static if Sys.iswindows(); ispty::Bool; end function TTY(handle::Ptr{Cvoid}, status) - lock = Threads.SpinLock() tty = new( handle, status, PipeBuffer(), - ThreadSynchronizer(lock), - ThreadSynchronizer(lock), + ThreadSynchronizer(), + nothing, nothing, ReentrantLock(), DEFAULT_READ_BUFFER_SZ) @@ -191,10 +184,12 @@ end function TTY(fd::OS_HANDLE) tty = TTY(Libc.malloc(_sizeof_uv_tty), StatusUninit) + iolock_begin() err = ccall(:uv_tty_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Int32), eventloop(), tty.handle, fd, 0) uv_error("TTY", err) tty.status = StatusOpen + iolock_end() return tty end if OS_HANDLE != RawFD @@ -230,7 +225,9 @@ rawhandle(stream::LibuvStream) = stream.handle unsafe_convert(::Type{Ptr{Cvoid}}, s::Union{LibuvStream, LibuvServer}) = s.handle function init_stdio(handle::Ptr{Cvoid}) + iolock_begin() t = ccall(:jl_uv_handle_type, Int32, (Ptr{Cvoid},), handle) + local io if t == UV_FILE fd = ccall(:jl_uv_file_handle, OS_HANDLE, (Ptr{Cvoid},), handle) # TODO: Replace ios.c file with libuv fs? @@ -240,17 +237,19 @@ function init_stdio(handle::Ptr{Cvoid}) fd = ccall(:_open_osfhandle, RawFD, (WindowsRawSocket, Int32), fd, 0) end # TODO: Get fdio to work natively with file descriptors instead of integers - return fdio(cconvert(Cint, fd)) + io = fdio(cconvert(Cint, fd)) elseif t == UV_TTY - return TTY(handle, StatusOpen) + io = TTY(handle, StatusOpen) elseif t == UV_TCP Sockets = require(PkgId(UUID((0x6462fe0b_24de_5631, 0x8697_dd941f90decc)), "Sockets")) - return Sockets.TCPSocket(handle, StatusOpen) + io = Sockets.TCPSocket(handle, StatusOpen) elseif t == UV_NAMED_PIPE - return PipeEndpoint(handle, StatusOpen) + io = PipeEndpoint(handle, StatusOpen) else throw(ArgumentError("invalid stdio type: $t")) end + iolock_end() + return io end """ @@ -266,35 +265,38 @@ of the original handle. other part of the system. """ function open(h::OS_HANDLE) + iolock_begin() t = ccall(:uv_guess_handle, Cint, (OS_HANDLE,), h) + local io if t == UV_FILE @static if Sys.iswindows() # TODO: Get ios.c to understand native handles h = ccall(:_open_osfhandle, RawFD, (WindowsRawSocket, Int32), h, 0) end # TODO: Get fdio to work natively with file descriptors instead of integers - return fdio(cconvert(Cint, h)) + io = fdio(cconvert(Cint, h)) elseif t == UV_TTY - return TTY(h) + io = TTY(h) elseif t == UV_TCP Sockets = require(PkgId(UUID((0x6462fe0b_24de_5631, 0x8697_dd941f90decc)), "Sockets")) - return Sockets.TCPSocket(h) + io = Sockets.TCPSocket(h) elseif t == UV_NAMED_PIPE - pipe = PipeEndpoint(h) + io = PipeEndpoint(h) @static if Sys.iswindows() - if ccall(:jl_ispty, Cint, (Ptr{Cvoid},), pipe.handle) != 0 + if ccall(:jl_ispty, Cint, (Ptr{Cvoid},), io.handle) != 0 # replace the Julia `PipeEndpoint` type with a `TTY` type, # if we detect that this is a cygwin pty object - pipe_handle, pipe_status = pipe.handle, pipe.status - pipe.status = StatusClosed - pipe.handle = C_NULL - return TTY(pipe_handle, pipe_status) + pipe_handle, pipe_status = io.handle, pipe.status + io.status = StatusClosed + io.handle = C_NULL + io = TTY(pipe_handle, pipe_status) end end - return pipe else throw(ArgumentError("invalid stdio type: $t")) end + iolock_end() + return io end if OS_HANDLE != RawFD @@ -324,46 +326,10 @@ function check_open(x::Union{LibuvStream, LibuvServer}) end end -function wait_connected(x::Union{LibuvStream, LibuvServer}) - check_open(x) - lock(x.cond) - try - while x.status == StatusConnecting - stream_wait(x, x.cond) - check_open(x) - end - finally - unlock(x.cond) - end -end - -function wait_readbyte(x::LibuvStream, c::UInt8) - if isopen(x) # fast path - occursin(c, x.buffer) && return - else - return - end - preserve_handle(x) - lock(x.cond) - try - while isopen(x) && !occursin(c, x.buffer) - start_reading(x) # ensure we are reading - wait(x.cond) - end - finally - if isempty(x.cond) - stop_reading(x) # stop reading iff there are currently no other read clients of the stream - end - unpreserve_handle(x) - unlock(x.cond) - end - nothing -end - function wait_readnb(x::LibuvStream, nb::Int) - if isopen(x) # fast path - bytesavailable(x.buffer) >= nb && return - else + iolock_begin() + if !isopen(x) || bytesavailable(x.buffer) >= nb # fast path + iolock_end() return end oldthrottle = x.throttle @@ -371,9 +337,14 @@ function wait_readnb(x::LibuvStream, nb::Int) lock(x.cond) try while isopen(x) && bytesavailable(x.buffer) < nb + x.readerror === nothing || throw(x.readerror) x.throttle = max(nb, x.throttle) start_reading(x) # ensure we are reading + iolock_end() wait(x.cond) + unlock(x.cond) + iolock_begin() + lock(x.cond) end finally if isempty(x.cond) @@ -385,45 +356,45 @@ function wait_readnb(x::LibuvStream, nb::Int) unpreserve_handle(x) unlock(x.cond) end + iolock_end() nothing end function wait_close(x::Union{LibuvStream, LibuvServer}) - lock(x.closenotify) + preserve_handle(x) + lock(x.cond) try - if isopen(x) - stream_wait(x, x.closenotify) + while isopen(x) + wait(x.cond) end finally - unlock(x.closenotify) + unlock(x.cond) + unpreserve_handle(x) end nothing end function close(stream::Union{LibuvStream, LibuvServer}) + iolock_begin() + should_wait = false if stream.status == StatusInit ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle) - return nothing - end - lock(stream.closenotify) - try - if isopen(stream) || stream.status == StatusEOF - should_wait = uv_handle_data(stream) != C_NULL - if stream.status != StatusClosing - ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle) - stream.status = StatusClosing - end - if should_wait - stream_wait(stream, stream.closenotify) - end + stream.status = StatusClosing + elseif isopen(stream) || stream.status == StatusEOF + should_wait = uv_handle_data(stream) != C_NULL + if stream.status != StatusClosing + ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle) + stream.status = StatusClosing end - finally - unlock(stream.closenotify) end - return nothing + iolock_end() + should_wait && wait_close(stream) + nothing end function uvfinalize(uv::Union{LibuvStream, LibuvServer}) + uv.handle == C_NULL && return + iolock_begin() if uv.handle != C_NULL disassociate_julia_struct(uv.handle) # not going to call the usual close hooks if uv.status != StatusUninit @@ -434,6 +405,7 @@ function uvfinalize(uv::Union{LibuvStream, LibuvServer}) uv.status = StatusClosed uv.handle = C_NULL end + iolock_end() nothing end @@ -489,9 +461,11 @@ function displaysize(io::TTY) s1 = Ref{Int32}(0) s2 = Ref{Int32}(0) + iolock_begin() Base.uv_error("size (TTY)", ccall(:uv_tty_get_winsize, Int32, (Ptr{Cvoid}, Ptr{Int32}, Ptr{Int32}), io, s1, s2) != 0) + iolock_end() w, h = s1[], s2[] h > 0 || (h = default_size[1]) w > 0 || (w = default_size[2]) @@ -522,6 +496,7 @@ function notify_filled(buffer::IOBuffer, nread::Int) else buffer.ptr += nread end + nothing end function alloc_buf_hook(stream::LibuvStream, size::UInt) @@ -568,20 +543,22 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}) # remind the client that stream.buffer is full notify(stream.cond) elseif nread == UV_EOF + stream.readerror = EOFError() if isa(stream, TTY) stream.status = StatusEOF # libuv called uv_stop_reading already notify(stream.cond) - notify(stream.closenotify) elseif stream.status != StatusClosing # begin shutdown of the stream ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle) stream.status = StatusClosing end else + stream.readerror = _UVError("read", nread) # This is a fatal connection error. Shutdown requests as per the usual # close function won't work and libuv will fail with an assertion failure ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream) - notify_error(stream.cond, _UVError("read", nread)) + stream.status = StatusClosing + notify(stream.cond) end else notify_filled(stream.buffer, nread) @@ -600,7 +577,7 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}) ((bytesavailable(stream.buffer) >= stream.throttle) || (bytesavailable(stream.buffer) >= stream.buffer.maxsize))) # save cycles by stopping kernel notifications from arriving - ccall(:jl_uv_read_stop, Cint, (Ptr{Cvoid},), stream) + ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream) stream.status = StatusOpen end nothing @@ -609,22 +586,24 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}) end function reseteof(x::TTY) + iolock_begin() if x.status == StatusEOF x.status = StatusOpen + x.readerror isa EOFError && (x.readerror = nothing) end + iolock_end() nothing end function _uv_hook_close(uv::Union{LibuvStream, LibuvServer}) - lock(uv.closenotify) + lock(uv.cond) try uv.handle = C_NULL uv.status = StatusClosed # notify any listeners that exist on this libuv stream type - notify(uv.closenotify) notify(uv.cond) finally - unlock(uv.closenotify) + unlock(uv.cond) end nothing end @@ -678,12 +657,14 @@ show(io::IO, stream::Pipe) = print(io, ## Functions for PipeEndpoint and PipeServer ## function open_pipe!(p::PipeEndpoint, handle::OS_HANDLE) + iolock_begin() if p.status != StatusInit error("pipe is already in use or has been closed") end err = ccall(:uv_pipe_open, Int32, (Ptr{Cvoid}, OS_HANDLE), p.handle, handle) uv_error("pipe_open", err) p.status = StatusOpen + iolock_end() return p end @@ -698,13 +679,11 @@ function link_pipe!(read_end::PipeEndpoint, reader_supports_async::Bool, close_pipe_sync(rd) rethrow() end - read_end.status = StatusOpen open_pipe!(write_end, wr) catch close_pipe_sync(wr) rethrow() end - write_end.status = StatusOpen nothing end @@ -736,6 +715,7 @@ end # flow control function start_reading(stream::LibuvStream) + iolock_begin() if stream.status == StatusOpen if !isreadable(stream) error("tried to read a stream that is not readable") @@ -743,17 +723,18 @@ function start_reading(stream::LibuvStream) # libuv may call the alloc callback immediately # for a TTY on Windows, so ensure the status is set first stream.status = StatusActive - ret = ccall(:jl_uv_read_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), + ret = ccall(:uv_read_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), stream, uv_jl_alloc_buf::Ptr{Cvoid}, uv_jl_readcb::Ptr{Cvoid}) - return ret elseif stream.status == StatusPaused stream.status = StatusActive - return Int32(0) + ret = Int32(0) elseif stream.status == StatusActive - return Int32(0) + ret = Int32(0) else - return Int32(-1) + ret = Int32(-1) end + iolock_end() + return ret end if Sys.iswindows() @@ -763,17 +744,21 @@ if Sys.iswindows() # and a ReadFile call blocking on one thread # causes all other operations on that stream to lockup function stop_reading(stream::LibuvStream) + iolock_begin() if stream.status == StatusActive stream.status = StatusOpen - ccall(:jl_uv_read_stop, Cint, (Ptr{Cvoid},), stream) + ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream) end + iolock_end() nothing end else function stop_reading(stream::LibuvStream) + iolock_begin() if stream.status == StatusActive stream.status = StatusPaused end + iolock_end() nothing end end @@ -782,53 +767,72 @@ end readbytes!(s::LibuvStream, a::Vector{UInt8}, nb = length(a)) = readbytes!(s, a, Int(nb)) function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int) + iolock_begin() sbuf = s.buffer @assert sbuf.seekable == false @assert sbuf.maxsize >= nb + local nread if bytesavailable(sbuf) >= nb - return readbytes!(sbuf, a, nb) + nread = readbytes!(sbuf, a, nb) + iolock_end() + return nread end if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer - wait_readnb(s, nb) - return readbytes!(sbuf, a, nb) - else - try - stop_reading(s) # Just playing it safe, since we are going to switch buffers. - newbuf = PipeBuffer(a, maxsize = nb) - newbuf.size = 0 # reset the write pointer to the beginning - s.buffer = newbuf - write(newbuf, sbuf) - wait_readnb(s, Int(nb)) - compact(newbuf) - return bytesavailable(newbuf) - finally - s.buffer = sbuf - if !isempty(s.cond) - start_reading(s) # resume reading iff there are currently other read clients of the stream - end + while isopen(s) && bytesavailable(sbuf) < nb + iolock_end() + wait_readnb(s, nb) + iolock_begin() + end + nread = readbytes!(sbuf, a, nb) + iolock_end() + return nread + end + + nread = try + stop_reading(s) # Just playing it safe, since we are going to switch buffers. + newbuf = PipeBuffer(a, maxsize = nb) + newbuf.size = 0 # reset the write pointer to the beginning + s.buffer = newbuf + write(newbuf, sbuf) + iolock_end() + wait_readnb(s, Int(nb)) + iolock_begin() + compact(newbuf) + bytesavailable(newbuf) + finally + s.buffer = sbuf + if !isempty(s.cond) + start_reading(s) # resume reading iff there are currently other read clients of the stream end end - @assert false # unreachable + iolock_end() + return nread end function read(stream::LibuvStream) wait_readnb(stream, typemax(Int)) - return take!(stream.buffer) + iolock_begin() + bytes = take!(stream.buffer) + iolock_end() + return bytes end function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt) + iolock_begin() sbuf = s.buffer @assert sbuf.seekable == false @assert sbuf.maxsize >= nb if bytesavailable(sbuf) >= nb - return unsafe_read(sbuf, p, nb) - end - - if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer - wait_readnb(s, Int(nb)) + unsafe_read(sbuf, p, nb) + elseif nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer + while isopen(s) && bytesavailable(sbuf) < nb + iolock_end() + wait_readnb(s, Int(nb)) + iolock_begin() + end unsafe_read(sbuf, p, nb) else try @@ -837,7 +841,9 @@ function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt) newbuf.size = 0 # reset the write pointer to the beginning s.buffer = newbuf write(newbuf, sbuf) + iolock_end() wait_readnb(s, Int(nb)) + iolock_begin() nb == bytesavailable(newbuf) || throw(EOFError()) finally s.buffer = sbuf @@ -846,42 +852,80 @@ function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt) end end end + iolock_end() nothing end function read(this::LibuvStream, ::Type{UInt8}) - wait_readnb(this, 1) - buf = this.buffer - @assert buf.seekable == false - return read(buf, UInt8) + iolock_begin() + sbuf = this.buffer + @assert sbuf.seekable == false + while isopen(this) && bytesavailable(sbuf) < 1 + iolock_end() + wait_readnb(this, 1) + iolock_begin() + end + c = read(sbuf, UInt8) + iolock_end() + return c end function readavailable(this::LibuvStream) wait_readnb(this, 1) + iolock_begin() buf = this.buffer @assert buf.seekable == false - return take!(buf) + bytes = take!(buf) + iolock_end() + return bytes end -function readuntil(this::LibuvStream, c::UInt8; keep::Bool=false) - wait_readbyte(this, c) - buf = this.buffer +function readuntil(x::LibuvStream, c::UInt8; keep::Bool=false) + iolock_begin() + buf = x.buffer @assert buf.seekable == false - return readuntil(buf, c, keep=keep) + if isopen(x) && !occursin(c, buf) # fast path + preserve_handle(x) + lock(x.cond) + try + while isopen(x) && !occursin(c, x.buffer) + x.readerror === nothing || throw(x.readerror) + start_reading(x) # ensure we are reading + iolock_end() + wait(x.cond) + unlock(x.cond) + iolock_begin() + lock(x.cond) + end + finally + if isempty(x.cond) + stop_reading(x) # stop reading iff there are currently no other read clients of the stream + end + unlock(x.cond) + unpreserve_handle(x) + end + end + bytes = readuntil(buf, c, keep=keep) + iolock_end() + return bytes end uv_write(s::LibuvStream, p::Vector{UInt8}) = uv_write(s, pointer(p), UInt(sizeof(p))) +# caller must have acquired the iolock function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt) + uvw = uv_write_async(s, p, n) ct = current_task() - uvw = uv_write_async(s, p, n, ct) preserve_handle(ct) - try + uv_req_set_data(uvw, ct) + iolock_end() + status = try # wait for the last chunk to complete (or error) # assume that any errors would be sticky, # (so we don't need to monitor the error status of the intermediate writes) - wait() + wait()::Cint finally + iolock_begin() if uv_req_data(uvw) != C_NULL # uvw is still alive, # so make sure we won't get spurious notifications later @@ -890,20 +934,24 @@ function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt) # done with uvw Libc.free(uvw) end + iolock_end() unpreserve_handle(ct) end + if status < 0 + throw(_UVError("write", status)) + end return Int(n) end # helper function for uv_write that returns the uv_write_t struct for the write -# rather than waiting on it -function uv_write_async(s::LibuvStream, p::Ptr{UInt8}, n::UInt, reqdata) +# rather than waiting on it, caller must hold the iolock +function uv_write_async(s::LibuvStream, p::Ptr{UInt8}, n::UInt) check_open(s) while true uvw = Libc.malloc(_sizeof_uv_write) - uv_req_set_data(uvw, reqdata) + uv_req_set_data(uvw, C_NULL) # in case we get interrupted before arriving at the wait call nwrite = min(n, MAX_OS_WRITE) # split up the write into chunks the OS can handle. - # TODO: use writev, when that is added to uv-win + # TODO: use writev instead of a loop err = ccall(:jl_uv_write, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, UInt, Ptr{Cvoid}, Ptr{Cvoid}), @@ -927,30 +975,32 @@ end # - large isbits arrays are unbuffered and written directly function unsafe_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt) - if s.sendbuf === nothing - return uv_write(s, p, UInt(n)) - end - - buf = s.sendbuf - totb = bytesavailable(buf) + n - if totb < buf.maxsize - nb = unsafe_write(buf, p, n) - else - flush(s) - if n > buf.maxsize - nb = uv_write(s, p, n) - else + while true + # try to add to the send buffer + iolock_begin() + buf = s.sendbuf + buf === nothing && break + totb = bytesavailable(buf) + n + if totb < buf.maxsize nb = unsafe_write(buf, p, n) + iolock_end() + return nb end + bytesavailable(buf) == 0 && break + # perform flush(s) + arr = take!(buf) + uv_write(s, arr) end - return nb + # perform the output to the kernel + return uv_write(s, p, n) end function flush(s::LibuvStream) + iolock_begin() buf = s.sendbuf if buf !== nothing if bytesavailable(buf) > 0 - arr = take!(buf) # Array of UInt8s + arr = take!(buf) uv_write(s, arr) return end @@ -959,16 +1009,25 @@ function flush(s::LibuvStream) return end -buffer_writes(s::LibuvStream, bufsize) = (s.sendbuf=PipeBuffer(bufsize); s) +function buffer_writes(s::LibuvStream, bufsize) + sendbuf = PipeBuffer(bufsize) + iolock_begin() + s.sendbuf = sendbuf + iolock_end() + return s +end ## low-level calls to libuv ## function write(s::LibuvStream, b::UInt8) buf = s.sendbuf if buf !== nothing + iolock_begin() if bytesavailable(buf) + 1 < buf.maxsize + iolock_end() return write(buf, b) end + iolock_end() end return write(s, Ref{UInt8}(b)) end @@ -978,12 +1037,7 @@ function uv_writecb_task(req::Ptr{Cvoid}, status::Cint) if d != C_NULL uv_req_set_data(req, C_NULL) # let the Task know we got the writecb t = unsafe_pointer_to_objref(d)::Task - if status < 0 - err = _UVError("write", status) - schedule(t, err, error=true) - else - schedule(t) - end + schedule(t, status) else # no owner for this req, safe to just free it Libc.free(req) @@ -1139,61 +1193,99 @@ end # BufferStream's are non-OS streams, backed by a regular IOBuffer mutable struct BufferStream <: LibuvStream buffer::IOBuffer - r_c::Condition - close_c::Condition + cond::Threads.Condition is_open::Bool buffer_writes::Bool - lock::ReentrantLock + lock::ReentrantLock # advisory lock - BufferStream() = new(PipeBuffer(), Condition(), Condition(), true, false, ReentrantLock()) + BufferStream() = new(PipeBuffer(), Threads.Condition(), true, false, ReentrantLock()) end isopen(s::BufferStream) = s.is_open + function close(s::BufferStream) - s.is_open = false - notify(s.r_c) - notify(s.close_c) - nothing + lock(s.cond) do + s.is_open = false + notify(s.cond) + nothing + end end uvfinalize(s::BufferStream) = nothing -read(s::BufferStream, ::Type{UInt8}) = (wait_readnb(s, 1); read(s.buffer, UInt8)) -unsafe_read(s::BufferStream, a::Ptr{UInt8}, nb::UInt) = (wait_readnb(s, Int(nb)); unsafe_read(s.buffer, a, nb)) +function read(s::BufferStream, ::Type{UInt8}) + nread = lock(s.cond) do + wait_readnb(s, 1) + read(s.buffer, UInt8) + end + return nread +end +function unsafe_read(s::BufferStream, a::Ptr{UInt8}, nb::UInt) + lock(s.cond) do + wait_readnb(s, Int(nb)) + unsafe_read(s.buffer, a, nb) + nothing + end +end bytesavailable(s::BufferStream) = bytesavailable(s.buffer) isreadable(s::BufferStream) = s.buffer.readable iswritable(s::BufferStream) = s.buffer.writable function wait_readnb(s::BufferStream, nb::Int) - while isopen(s) && bytesavailable(s.buffer) < nb - wait(s.r_c) + lock(s.cond) do + while isopen(s) && bytesavailable(s.buffer) < nb + wait(s.cond) + end end end -show(io::IO, s::BufferStream) = print(io,"BufferStream() bytes waiting:",bytesavailable(s.buffer),", isopen:", s.is_open) +show(io::IO, s::BufferStream) = print(io, "BufferStream() bytes waiting:", bytesavailable(s.buffer), ", isopen:", s.is_open) + +function readuntil(s::BufferStream, c::UInt8; keep::Bool=false) + bytes = lock(s.cond) do + while isopen(s) && !occursin(c, s.buffer) + wait(s.cond) + end + readuntil(s.buffer, c, keep=keep) + end + return bytes +end -function wait_readbyte(s::BufferStream, c::UInt8) - while isopen(s) && !occursin(c, s.buffer) - wait(s.r_c) +function wait_close(s::BufferStream) + lock(s.cond) do + while isopen(s) + wait(s.cond) + end end end -wait_close(s::BufferStream) = if isopen(s); wait(s.close_c); end start_reading(s::BufferStream) = Int32(0) stop_reading(s::BufferStream) = nothing write(s::BufferStream, b::UInt8) = write(s, Ref{UInt8}(b)) function unsafe_write(s::BufferStream, p::Ptr{UInt8}, nb::UInt) - rv = unsafe_write(s.buffer, p, nb) - !(s.buffer_writes) && notify(s.r_c) - return rv + nwrite = lock(s.cond) do + rv = unsafe_write(s.buffer, p, nb) + s.buffer_writes || notify(s.cond) + rv + end + return nwrite end function eof(s::BufferStream) - wait_readnb(s, 1) - return !isopen(s) && bytesavailable(s) <= 0 + bytesavailable(s) > 0 && return false + iseof = lock(s.cond) do + wait_readnb(s, 1) + return !isopen(s) && bytesavailable(s) <= 0 + end + return iseof end # If buffer_writes is called, it will delay notifying waiters till a flush is called. -buffer_writes(s::BufferStream, bufsize=0) = (s.buffer_writes=true; s) -flush(s::BufferStream) = (notify(s.r_c); nothing) +buffer_writes(s::BufferStream, bufsize=0) = (s.buffer_writes = true; s) +function flush(s::BufferStream) + lock(s.cond) do + notify(s.cond) + nothing + end +end diff --git a/base/sysinfo.jl b/base/sysinfo.jl index e9c71b0401277..1cbe3a994bcd8 100644 --- a/base/sysinfo.jl +++ b/base/sysinfo.jl @@ -203,7 +203,8 @@ end function cpu_info() UVcpus = Ref{Ptr{UV_cpu_info_t}}() count = Ref{Int32}() - Base.uv_error("uv_cpu_info",ccall(:uv_cpu_info, Int32, (Ptr{Ptr{UV_cpu_info_t}}, Ptr{Int32}), UVcpus, count)) + err = ccall(:uv_cpu_info, Int32, (Ptr{Ptr{UV_cpu_info_t}}, Ptr{Int32}), UVcpus, count) + Base.uv_error("uv_cpu_info", err) cpus = Vector{CPUinfo}(undef, count[]) for i = 1:length(cpus) cpus[i] = CPUinfo(unsafe_load(UVcpus[], i)) @@ -219,7 +220,8 @@ Gets the current system uptime in seconds. """ function uptime() uptime_ = Ref{Float64}() - Base.uv_error("uv_uptime",ccall(:uv_uptime, Int32, (Ptr{Float64},), uptime_)) + err = ccall(:uv_uptime, Int32, (Ptr{Float64},), uptime_) + Base.uv_error("uv_uptime", err) return uptime_[] end diff --git a/doc/src/devdocs/locks.md b/doc/src/devdocs/locks.md index 12dc8e4561c18..912fe663f325d 100644 --- a/doc/src/devdocs/locks.md +++ b/doc/src/devdocs/locks.md @@ -58,6 +58,17 @@ trying to acquire it: > > > > currently the lock is merged with the codegen lock, since they call each other recursively +The following lock synchronizes IO operation. Be aware that doing any I/O (for example, +printing warning messages or debug information) while holding any other lock listed above +may result in pernicious and hard-to-find deadlocks. BE VERY CAREFUL! + +> * iolock +> * Individual ThreadSynchronizers locks +> +> > this may continue to be held after releasing the iolock, or acquired without it, +> > but be very careful to never attempt to acquire the iolock while holding it + + The following is the root lock, meaning no other lock shall be held when trying to acquire it: > * toplevel @@ -95,37 +106,32 @@ Module serializer : toplevel lock JIT & type-inference : codegen lock -MethodInstance updates : codegen lock +MethodInstance/CodeInstance updates : Method->writelock, codegen lock -> * These fields are generally lazy initialized, using the test-and-test-and-set pattern. > * These are set at construction and immutable: -> > * specTypes > * sparam_vals > * def + > * These are set by `jl_type_infer` (while holding codegen lock): -> +> * cache > * rettype > * inferred -> * these can also be reset, see `jl_set_lambda_rettype` for that logic as it needs to keep `functionObjectsDecls` -> in sync + * valid ages + > * `inInference` flag: -> > * optimization to quickly avoid recurring into `jl_type_infer` while it is already running > * actual state (of setting `inferred`, then `fptr`) is protected by codegen lock -> * Function pointers (`jlcall_api` and `fptr`, `unspecialized_ducttape`): -> + +> * Function pointers: > * these transition once, from `NULL` to a value, while the codegen lock is held -> * Code-generator cache (the contents of `functionObjectsDecls`): > +> * Code-generator cache (the contents of `functionObjectsDecls`): > * these can transition multiple times, but only while the codegen lock is held > * it is valid to use old version of this, or block for new versions of this, so races are benign, > as long as the code is careful not to reference other data in the method instance (such as `rettype`) > and assume it is coordinated, unless also holding the codegen lock -> * `compile_traced` flag: > -> * unknown - LLVMContext : codegen lock Method : Method->writelock diff --git a/src/jl_uv.c b/src/jl_uv.c index fc4eb147cc755..ba86352488e51 100644 --- a/src/jl_uv.c +++ b/src/jl_uv.c @@ -71,6 +71,17 @@ void JL_UV_LOCK(void) } } +JL_DLLEXPORT void jl_iolock_begin(void) +{ + JL_UV_LOCK(); +} + +JL_DLLEXPORT void jl_iolock_end(void) +{ + JL_UV_UNLOCK(); +} + + void jl_uv_call_close_callback(jl_value_t *val) { jl_value_t *args[2]; @@ -94,11 +105,11 @@ static void jl_uv_closeHandle(uv_handle_t *handle) JL_STDERR = (JL_STREAM*)STDERR_FILENO; // also let the client app do its own cleanup if (handle->type != UV_FILE && handle->data) { - size_t last_age = jl_get_ptls_states()->world_age; - // TODO: data race on jl_world_counter across many files, to be fixed in a separate revision - jl_get_ptls_states()->world_age = jl_world_counter; + jl_ptls_t ptls = jl_get_ptls_states(); + size_t last_age = ptls->world_age; + ptls->world_age = jl_world_counter; jl_uv_call_close_callback((jl_value_t*)handle->data); - jl_get_ptls_states()->world_age = last_age; + ptls->world_age = last_age; } if (handle == (uv_handle_t*)&signal_async) return; @@ -268,7 +279,9 @@ JL_DLLEXPORT void jl_forceclose_uv(uv_handle_t *handle) // avoid double-closing the stream if (!uv_is_closing(handle)) { JL_UV_LOCK(); - uv_close(handle, &jl_uv_closeHandle); + if (!uv_is_closing(handle)) { + uv_close(handle, &jl_uv_closeHandle); + } JL_UV_UNLOCK(); } } @@ -626,28 +639,33 @@ JL_DLLEXPORT int jl_getpid(void) #endif } -//NOTE: These function expects port/host to be in network byte-order (Big Endian) -JL_DLLEXPORT int jl_tcp_bind(uv_tcp_t *handle, uint16_t port, uint32_t host, - unsigned int flags) -{ - struct sockaddr_in addr; - memset(&addr, 0, sizeof(struct sockaddr_in)); - addr.sin_port = port; - addr.sin_addr.s_addr = host; - addr.sin_family = AF_INET; - // TODO: do we need a lock here? - return uv_tcp_bind(handle, (struct sockaddr*)&addr, flags); +typedef union { + struct sockaddr in; + struct sockaddr_in v4; + struct sockaddr_in6 v6; +} uv_sockaddr_in; + +static void jl_sockaddr_fill(uv_sockaddr_in *addr, uint16_t port, void *host, int ipv6) +{ + memset(addr, 0, sizeof(*addr)); + if (ipv6) { + addr->v6.sin6_family = AF_INET6; + memcpy(&addr->v6.sin6_addr, host, 16); + addr->v6.sin6_port = port; + } + else { + addr->v4.sin_family = AF_INET; + addr->v4.sin_addr.s_addr = *(uint32_t*)host; + addr->v4.sin_port = port; + } } -JL_DLLEXPORT int jl_tcp_bind6(uv_tcp_t *handle, uint16_t port, void *host, - unsigned int flags) +//NOTE: These function expects port/host to be in network byte-order (Big Endian) +JL_DLLEXPORT int jl_tcp_bind(uv_tcp_t *handle, uint16_t port, void *host, + unsigned int flags, int ipv6) { - struct sockaddr_in6 addr; - memset(&addr, 0, sizeof(struct sockaddr_in6)); - addr.sin6_port = port; - memcpy(&addr.sin6_addr, host, 16); - addr.sin6_family = AF_INET6; - // TODO: do we need a lock here + uv_sockaddr_in addr; + jl_sockaddr_fill(&addr, port, host, ipv6); return uv_tcp_bind(handle, (struct sockaddr*)&addr, flags); } @@ -701,63 +719,23 @@ JL_DLLEXPORT int jl_tcp_getpeername(uv_tcp_t *handle, uint16_t *port, return res; } -JL_DLLEXPORT int jl_udp_bind(uv_udp_t *handle, uint16_t port, uint32_t host, - uint32_t flags) -{ - struct sockaddr_in addr; - memset(&addr, 0, sizeof(struct sockaddr_in)); - addr.sin_port = port; - addr.sin_addr.s_addr = host; - addr.sin_family = AF_INET; - return uv_udp_bind(handle, (struct sockaddr*)&addr, flags); -} - -JL_DLLEXPORT int jl_udp_bind6(uv_udp_t *handle, uint16_t port, void *host, - uint32_t flags) +JL_DLLEXPORT int jl_udp_bind(uv_udp_t *handle, uint16_t port, void *host, + uint32_t flags, int ipv6) { - struct sockaddr_in6 addr; - memset(&addr, 0, sizeof(struct sockaddr_in6)); - addr.sin6_port = port; - memcpy(&addr.sin6_addr, host, 16); - addr.sin6_family = AF_INET6; + uv_sockaddr_in addr; + jl_sockaddr_fill(&addr, port, host, ipv6); return uv_udp_bind(handle, (struct sockaddr*)&addr, flags); } -JL_DLLEXPORT int jl_udp_send(uv_udp_t *handle, uint16_t port, uint32_t host, - void *data, uint32_t size, uv_udp_send_cb cb) +JL_DLLEXPORT int jl_udp_send(uv_udp_send_t *req, uv_udp_t *handle, uint16_t port, void *host, + char *data, uint32_t size, uv_udp_send_cb cb, int ipv6) { - struct sockaddr_in addr; - memset(&addr, 0, sizeof(struct sockaddr_in)); - addr.sin_port = port; - addr.sin_addr.s_addr = host; - addr.sin_family = AF_INET; + uv_sockaddr_in addr; + jl_sockaddr_fill(&addr, port, host, ipv6); uv_buf_t buf[1]; - buf[0].base = (char *) data; - buf[0].len = size; - uv_udp_send_t *req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t)); - req->data = handle->data; - JL_UV_LOCK(); - int r = uv_udp_send(req, handle, buf, 1, (struct sockaddr*)&addr, cb); - JL_UV_UNLOCK(); - return r; -} - -JL_DLLEXPORT int jl_udp_send6(uv_udp_t *handle, uint16_t port, void *host, - void *data, uint32_t size, uv_udp_send_cb cb) -{ - struct sockaddr_in6 addr; - memset(&addr, 0, sizeof(struct sockaddr_in6)); - addr.sin6_port = port; - memcpy(&addr.sin6_addr, host, 16); - addr.sin6_family = AF_INET6; - uv_buf_t buf[1]; - buf[0].base = (char *) data; + buf[0].base = data; buf[0].len = size; - uv_udp_send_t *req = (uv_udp_send_t *) malloc(sizeof(uv_udp_send_t)); - req->data = handle->data; - JL_UV_LOCK(); int r = uv_udp_send(req, handle, buf, 1, (struct sockaddr*)&addr, cb); - JL_UV_UNLOCK(); return r; } @@ -769,7 +747,7 @@ JL_DLLEXPORT int jl_uv_sizeof_interface_address(void) JL_DLLEXPORT int jl_uv_interface_addresses(uv_interface_address_t **ifAddrStruct, int *count) { - return uv_interface_addresses(ifAddrStruct,count); + return uv_interface_addresses(ifAddrStruct, count); } JL_DLLEXPORT int jl_uv_interface_address_is_internal(uv_interface_address_t *addr) @@ -796,124 +774,73 @@ JL_DLLEXPORT int jl_getaddrinfo(uv_loop_t *loop, uv_getaddrinfo_t *req, } JL_DLLEXPORT int jl_getnameinfo(uv_loop_t *loop, uv_getnameinfo_t *req, - uint32_t host, uint16_t port, int flags, uv_getnameinfo_cb uvcb) + void *host, uint16_t port, int flags, uv_getnameinfo_cb uvcb, int ipv6) { - struct sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = host; - addr.sin_port = port; - - req->data = NULL; + uv_sockaddr_in addr; + jl_sockaddr_fill(&addr, port, host, ipv6); return uv_getnameinfo(loop, req, uvcb, (struct sockaddr*)&addr, flags); } -JL_DLLEXPORT int jl_getnameinfo6(uv_loop_t *loop, uv_getnameinfo_t *req, - void *host, uint16_t port, int flags, uv_getnameinfo_cb uvcb) -{ - struct sockaddr_in6 addr; - memset(&addr, 0, sizeof(addr)); - addr.sin6_family = AF_INET6; - memcpy(&addr.sin6_addr, host, 16); - addr.sin6_port = port; - - req->data = NULL; - JL_UV_LOCK(); - int r = uv_getnameinfo(loop, req, uvcb, (struct sockaddr*)&addr, flags); - JL_UV_UNLOCK(); - return r; -} - - JL_DLLEXPORT struct sockaddr *jl_sockaddr_from_addrinfo(struct addrinfo *addrinfo) { return addrinfo->ai_addr; } + JL_DLLEXPORT struct addrinfo *jl_next_from_addrinfo(struct addrinfo *addrinfo) { return addrinfo->ai_next; } -JL_DLLEXPORT int jl_sockaddr_in_is_ip4(struct sockaddr_in *addr) -{ - return (addr->sin_family==AF_INET); -} - -JL_DLLEXPORT int jl_sockaddr_in_is_ip6(struct sockaddr_in *addr) +JL_DLLEXPORT int jl_sockaddr_is_ip4(struct sockaddr *addr) { - return (addr->sin_family==AF_INET6); + return (addr->sa_family == AF_INET); } -JL_DLLEXPORT int jl_sockaddr_is_ip4(struct sockaddr_storage *addr) +JL_DLLEXPORT int jl_sockaddr_is_ip6(struct sockaddr *addr) { - return (addr->ss_family==AF_INET); + return (addr->sa_family == AF_INET6); } -JL_DLLEXPORT int jl_sockaddr_is_ip6(struct sockaddr_storage *addr) -{ - return (addr->ss_family==AF_INET6); -} - -JL_DLLEXPORT unsigned int jl_sockaddr_host4(struct sockaddr_in *addr) +JL_DLLEXPORT uint32_t jl_sockaddr_host4(struct sockaddr_in *addr) { return addr->sin_addr.s_addr; } -JL_DLLEXPORT unsigned int jl_sockaddr_host6(struct sockaddr_in6 *addr, char *host) +JL_DLLEXPORT unsigned jl_sockaddr_host6(struct sockaddr_in6 *addr, char *host) { memcpy(host, &addr->sin6_addr, 16); return addr->sin6_scope_id; } -JL_DLLEXPORT void jl_sockaddr_set_port(struct sockaddr_storage *addr, - uint16_t port) +JL_DLLEXPORT uint16_t jl_sockaddr_port4(struct sockaddr_in *addr) { - if (addr->ss_family==AF_INET) - ((struct sockaddr_in*)addr)->sin_port = port; - else - ((struct sockaddr_in6*)addr)->sin6_port = port; + return addr->sin_port; } -JL_DLLEXPORT int jl_tcp4_connect(uv_tcp_t *handle,uint32_t host, uint16_t port, - uv_connect_cb cb) +JL_DLLEXPORT uint16_t jl_sockaddr_port6(struct sockaddr_in6 *addr) { - struct sockaddr_in addr; - uv_connect_t *req = (uv_connect_t*)malloc(sizeof(uv_connect_t)); - req->data = 0; - memset(&addr, 0, sizeof(struct sockaddr_in)); - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = host; - addr.sin_port = port; - JL_UV_LOCK(); - int r = uv_tcp_connect(req,handle,(struct sockaddr*)&addr,cb); - JL_UV_UNLOCK(); - return r; + return addr->sin6_port; } -JL_DLLEXPORT int jl_tcp6_connect(uv_tcp_t *handle, void *host, uint16_t port, - uv_connect_cb cb) + +JL_DLLEXPORT void jl_sockaddr_set_port(uv_sockaddr_in *addr, uint16_t port) { - struct sockaddr_in6 addr; - uv_connect_t *req = (uv_connect_t*)malloc(sizeof(uv_connect_t)); - req->data = 0; - memset(&addr, 0, sizeof(struct sockaddr_in6)); - addr.sin6_family = AF_INET6; - memcpy(&addr.sin6_addr, host, 16); - addr.sin6_port = port; - JL_UV_LOCK(); - int r = uv_tcp_connect(req,handle,(struct sockaddr*)&addr,cb); - JL_UV_UNLOCK(); - return r; + if (addr->in.sa_family == AF_INET) + addr->v4.sin_port = port; + else + addr->v6.sin6_port = port; } -JL_DLLEXPORT int jl_connect_raw(uv_tcp_t *handle,struct sockaddr_storage *addr, - uv_connect_cb cb) +JL_DLLEXPORT int jl_tcp_connect(uv_tcp_t *handle, void *host, uint16_t port, + uv_connect_cb cb, int ipv6) { + uv_sockaddr_in addr; + jl_sockaddr_fill(&addr, port, host, ipv6); uv_connect_t *req = (uv_connect_t*)malloc(sizeof(uv_connect_t)); - req->data = 0; - JL_UV_LOCK(); - int r = uv_tcp_connect(req,handle,(struct sockaddr*)addr,cb); - JL_UV_UNLOCK(); + req->data = NULL; + int r = uv_tcp_connect(req, handle, &addr.in, cb); + if (r) + free(req); return r; } @@ -1085,94 +1012,6 @@ JL_DLLEXPORT int jl_queue_work(work_cb_t work_func, void *work_args, void *work_ return 0; } -JL_DLLEXPORT void jl_uv_update_timer_start(uv_loop_t* loop, jl_value_t* jltimer, - uv_timer_t* uvtimer, uv_timer_cb cb, - uint64_t timeout, uint64_t repeat) -{ - JL_UV_LOCK(); - int err = uv_timer_init(loop, uvtimer); - if (err) - abort(); - - jl_uv_associate_julia_struct((uv_handle_t*)uvtimer, jltimer); - uv_update_time(loop); - err = uv_timer_start(uvtimer, cb, timeout, repeat); - if (err) - abort(); - JL_UV_UNLOCK(); -} - -JL_DLLEXPORT void jl_uv_stop(uv_loop_t* loop) -{ - JL_UV_LOCK(); - uv_stop(loop); - // TODO: use memory/compiler fence here instead of the lock - JL_UV_UNLOCK(); -} - -JL_DLLEXPORT int jl_uv_fs_scandir(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags, - uv_fs_cb cb) -{ - JL_UV_LOCK(); - int r = uv_fs_scandir(loop, req, path, flags, cb); - JL_UV_UNLOCK(); - return r; -} - -JL_DLLEXPORT int jl_uv_fs_readlink(uv_loop_t* loop, uv_fs_t* req, const char* path, - uv_fs_cb cb) -{ - JL_UV_LOCK(); - int r = uv_fs_readlink(loop, req, path, cb); - JL_UV_UNLOCK(); - return r; -} - -JL_DLLEXPORT int jl_uv_fs_open(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags, - int mode, uv_fs_cb cb) -{ - JL_UV_LOCK(); - int r = uv_fs_open(loop, req, path, flags, mode, cb); - JL_UV_UNLOCK(); - return r; -} - -JL_DLLEXPORT int jl_uv_fs_ftruncate(uv_loop_t* loop, uv_fs_t* req, uv_os_fd_t handle, - int64_t offset, uv_fs_cb cb) -{ - JL_UV_LOCK(); - int r = uv_fs_ftruncate(loop, req, handle, offset, cb); - JL_UV_UNLOCK(); - return r; -} - -JL_DLLEXPORT int jl_uv_fs_futime(uv_loop_t* loop, uv_fs_t* req, uv_os_fd_t handle, double atime, - double mtime, uv_fs_cb cb) -{ - JL_UV_LOCK(); - int r = uv_fs_futime(loop, req, handle, atime, mtime, cb); - JL_UV_UNLOCK(); - return r; -} - -JL_DLLEXPORT int jl_uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, - uv_read_cb read_cb) -{ - JL_UV_LOCK(); - int r = uv_read_start(handle, alloc_cb, read_cb); - JL_UV_UNLOCK(); - return r; -} - -JL_DLLEXPORT int jl_uv_read_stop(uv_stream_t* handle) -{ - JL_UV_LOCK(); - int r = uv_read_stop(handle); - JL_UV_UNLOCK(); - return r; -} - - #ifndef _OS_WINDOWS_ #if defined(__APPLE__) int uv___stream_fd(uv_stream_t *handle); diff --git a/src/julia.h b/src/julia.h index cb4a0dd3e877a..ff777c9905056 100644 --- a/src/julia.h +++ b/src/julia.h @@ -1783,11 +1783,6 @@ JL_DLLEXPORT uv_loop_t *jl_global_event_loop(void); JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle); -JL_DLLEXPORT int jl_tcp_bind(uv_tcp_t *handle, uint16_t port, uint32_t host, - unsigned int flags); - -JL_DLLEXPORT int jl_sizeof_ios_t(void); - JL_DLLEXPORT jl_array_t *jl_take_buffer(ios_t *s); typedef struct { diff --git a/src/julia_internal.h b/src/julia_internal.h index 839fbe4aac8c4..094ed264da511 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -868,8 +868,6 @@ int jl_array_store_unboxed(jl_value_t *el_type); JL_DLLEXPORT jl_value_t *(jl_array_data_owner)(jl_array_t *a); JL_DLLEXPORT int jl_array_isassigned(jl_array_t *a, size_t i); -JL_DLLEXPORT void jl_uv_stop(uv_loop_t* loop); - JL_DLLEXPORT uintptr_t jl_object_id_(jl_value_t *tv, jl_value_t *v) JL_NOTSAFEPOINT; // -- synchronization utilities -- // diff --git a/src/sys.c b/src/sys.c index 9632765801fe2..ded24078bc05c 100644 --- a/src/sys.c +++ b/src/sys.c @@ -118,10 +118,8 @@ JL_DLLEXPORT int32_t jl_nb_available(ios_t *s) // --- dir/file stuff --- JL_DLLEXPORT int jl_sizeof_uv_fs_t(void) { return sizeof(uv_fs_t); } -JL_DLLEXPORT void jl_uv_fs_req_cleanup(uv_fs_t *req) { uv_fs_req_cleanup(req); } JL_DLLEXPORT char *jl_uv_fs_t_ptr(uv_fs_t *req) { return (char*)req->ptr; } JL_DLLEXPORT char *jl_uv_fs_t_path(uv_fs_t *req) { return (char*)req->path; } -JL_DLLEXPORT ssize_t jl_uv_fs_result(uv_fs_t *f) { return f->result; } // --- stat --- JL_DLLEXPORT int jl_sizeof_stat(void) { return sizeof(uv_stat_t); } diff --git a/stdlib/Distributed/src/Distributed.jl b/stdlib/Distributed/src/Distributed.jl index dcdc75d253327..c1d6cc5d5ffff 100644 --- a/stdlib/Distributed/src/Distributed.jl +++ b/stdlib/Distributed/src/Distributed.jl @@ -10,7 +10,7 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length, hash, ==, kill, close, isopen, showerror # imports for use -using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, wait_connected, +using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, VERSION_STRING, binding_module, atexit, julia_exename, julia_cmd, AsyncGenerator, acquire, release, invokelatest, shell_escape_posixly, uv_error, something, notnothing, isbuffered @@ -18,7 +18,7 @@ using Base.Threads: Event using Serialization, Sockets import Serialization: serialize, deserialize -import Sockets: connect +import Sockets: connect, wait_connected # NOTE: clusterserialize.jl imports additional symbols from Serialization for use diff --git a/stdlib/Distributed/src/managers.jl b/stdlib/Distributed/src/managers.jl index c3a877805b321..62141b1fd1474 100644 --- a/stdlib/Distributed/src/managers.jl +++ b/stdlib/Distributed/src/managers.jl @@ -484,10 +484,15 @@ function socket_reuse_port() end end -function bind_client_port(s) - err = ccall(:jl_tcp_bind, Int32, (Ptr{Cvoid}, UInt16, UInt32, Cuint), - s.handle, hton(client_port[]), hton(UInt32(0)), 0) - uv_error("bind() failed", err) +# TODO: this doesn't belong here, it belongs in Sockets +function bind_client_port(s::TCPSocket) + Sockets.iolock_begin() + @assert s.status == Sockets.StatusInit + host_in = Ref(hton(UInt32(0))) # IPv4 0.0.0.0 + err = ccall(:jl_tcp_bind, Int32, (Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cuint, Cint), + s, hton(client_port[]), host_in, 0, false) + Sockets.iolock_end() + uv_error("tcp_bind", err) _addr, port = getsockname(s) client_port[] = port diff --git a/stdlib/FileWatching/src/FileWatching.jl b/stdlib/FileWatching/src/FileWatching.jl index 5df7f4ceb6b34..9c241e3c7bf21 100644 --- a/stdlib/FileWatching/src/FileWatching.jl +++ b/stdlib/FileWatching/src/FileWatching.jl @@ -18,9 +18,10 @@ export PollingFileWatcher, FDWatcher -import Base: @handle_as, wait, close, eventloop, notify_error, stream_wait, IOError, +import Base: @handle_as, wait, close, eventloop, notify_error, IOError, _sizeof_uv_poll, _sizeof_uv_fs_poll, _sizeof_uv_fs_event, _uv_hook_close, uv_error, _UVError, - associate_julia_struct, disassociate_julia_struct, isreadable, iswritable, | + iolock_begin, iolock_end, associate_julia_struct, disassociate_julia_struct, + preserve_handle, unpreserve_handle, isreadable, iswritable, | import Base.Filesystem.StatStruct if Sys.iswindows() import Base.WindowsRawSocket @@ -80,11 +81,13 @@ mutable struct FileMonitor handle = Libc.malloc(_sizeof_uv_fs_event) this = new(handle, file, Base.ThreadSynchronizer(), 0, false) associate_julia_struct(handle, this) + iolock_begin() err = ccall(:uv_fs_event_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), handle) if err != 0 Libc.free(handle) throw(_UVError("FileMonitor", err)) end + iolock_end() finalizer(uvfinalize, this) return this end @@ -100,6 +103,7 @@ mutable struct FolderMonitor handle = Libc.malloc(_sizeof_uv_fs_event) this = new(handle, Channel(Inf), false) associate_julia_struct(handle, this) + iolock_begin() err = ccall(:uv_fs_event_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), handle) if err != 0 Libc.free(handle) @@ -110,6 +114,7 @@ mutable struct FolderMonitor uv_error("FolderMonitor (start)", ccall(:uv_fs_event_start, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Int32), handle, uv_jl_fseventscb_folder::Ptr{Cvoid}, folder, 0)) + iolock_end() return this end end @@ -127,12 +132,14 @@ mutable struct PollingFileWatcher handle = Libc.malloc(_sizeof_uv_fs_poll) this = new(handle, file, round(UInt32, interval * 1000), Base.ThreadSynchronizer(), false, 0, StatStruct()) associate_julia_struct(handle, this) + iolock_begin() err = ccall(:uv_fs_poll_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), handle) if err != 0 Libc.free(handle) throw(_UVError("PollingFileWatcher", err)) end finalizer(uvfinalize, this) + iolock_end() return this end end @@ -153,6 +160,7 @@ mutable struct _FDWatcher throw(ArgumentError("must specify at least one of readable or writable to create a FDWatcher")) end fdnum = Core.Intrinsics.bitcast(Int32, fd) + 1 + iolock_begin() if fdnum > length(FDWatchers) old_len = length(FDWatchers) resize!(FDWatchers, fdnum) @@ -160,6 +168,7 @@ mutable struct _FDWatcher elseif FDWatchers[fdnum] !== nothing this = FDWatchers[fdnum]::_FDWatcher this.refcount = (this.refcount[1] + Int(readable), this.refcount[2] + Int(writable)) + iolock_end() return this end if ccall(:jl_uv_unix_fd_is_watched, Int32, (RawFD, Ptr{Cvoid}, Ptr{Cvoid}), fd, C_NULL, eventloop()) == 1 @@ -182,11 +191,13 @@ mutable struct _FDWatcher end finalizer(uvfinalize, this) FDWatchers[fdnum] = this + iolock_end() return this end end function uvfinalize(t::_FDWatcher) + iolock_begin() lock(t.notify) try if t.handle != C_NULL @@ -205,6 +216,7 @@ mutable struct _FDWatcher finally unlock(t.notify) end + iolock_end() nothing end end @@ -228,8 +240,10 @@ mutable struct _FDWatcher 0, (false, false)) associate_julia_struct(handle, this) - err = ccall(:uv_poll_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, WindowsRawSocket), - eventloop(), handle, fd) + iolock_begin() + err = ccall(:uv_poll_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, WindowsRawSocket), + eventloop(), handle, fd) + iolock_end() if err != 0 Libc.free(handle) throw(_UVError("FDWatcher", err)) @@ -261,12 +275,15 @@ end function close(t::_FDWatcher, readable::Bool, writable::Bool) + iolock_begin() if t.refcount != (0, 0) t.refcount = (t.refcount[1] - Int(readable), t.refcount[2] - Int(writable)) end if t.refcount == (0, 0) uvfinalize(t) end + iolock_end() + nothing end function close(t::FDWatcher) @@ -403,6 +420,7 @@ function __init__() end function start_watching(t::_FDWatcher) + iolock_begin() t.handle == C_NULL && return throw(ArgumentError("FDWatcher is closed")) readable = t.refcount[1] > 0 writable = t.refcount[2] > 0 @@ -415,10 +433,12 @@ function start_watching(t::_FDWatcher) uv_jl_pollcb::Ptr{Cvoid})) t.active = (readable, writable) end + iolock_end() nothing end function start_watching(t::PollingFileWatcher) + iolock_begin() t.handle == C_NULL && return throw(ArgumentError("PollingFileWatcher is closed")) if !t.active uv_error("PollingFileWatcher (start)", @@ -426,10 +446,12 @@ function start_watching(t::PollingFileWatcher) t.handle, uv_jl_fspollcb::Ptr{Cvoid}, t.file, t.interval)) t.active = true end + iolock_end() nothing end function stop_watching(t::PollingFileWatcher) + iolock_begin() lock(t.notify) try if t.active && isempty(t.notify) @@ -440,10 +462,12 @@ function stop_watching(t::PollingFileWatcher) finally unlock(t.notify) end + iolock_end() nothing end function start_watching(t::FileMonitor) + iolock_begin() t.handle == C_NULL && return throw(ArgumentError("FileMonitor is closed")) if !t.active uv_error("FileMonitor (start)", @@ -451,10 +475,12 @@ function start_watching(t::FileMonitor) t.handle, uv_jl_fseventscb_file::Ptr{Cvoid}, t.file, 0)) t.active = true end + iolock_end() nothing end function stop_watching(t::FileMonitor) + iolock_begin() lock(t.notify) try if t.active && isempty(t.notify) @@ -465,6 +491,7 @@ function stop_watching(t::FileMonitor) finally unlock(t.notify) end + iolock_end() nothing end @@ -473,12 +500,16 @@ function wait(fdw::FDWatcher) return wait(fdw.watcher, readable = fdw.readable, writable = fdw.writable) end end + function wait(fdw::_FDWatcher; readable=true, writable=true) events = FDEvent(Int32(0)) - while true - if isa(events, FDEvent) - events |= FDEvent(fdw.events) + iolock_begin() + preserve_handle(fdw) + lock(fdw.notify) + try + while true haveevent = false + events |= FDEvent(fdw.events) if readable && isreadable(events) fdw.events &= ~UV_READABLE haveevent = true @@ -488,23 +519,25 @@ function wait(fdw::_FDWatcher; readable=true, writable=true) haveevent = true end if haveevent - return events + break end - else - throw(events) - end - if fdw.refcount == (0, 0) # !open - events = EOFError() - else - lock(fdw.notify) - try + if fdw.refcount == (0, 0) # !open + throw(EOFError()) + else start_watching(fdw) # make sure the poll is active - events = stream_wait(fdw, fdw.notify)::FDEvent - finally + iolock_end() + events = wait(fdw.notify)::FDEvent unlock(fdw.notify) + iolock_begin() + lock(fdw.notify) end end + finally + unlock(fdw.notify) + unpreserve_handle(fdw) end + iolock_end() + return events end function wait(fd::RawFD; readable=false, writable=false) @@ -528,15 +561,23 @@ if Sys.iswindows() end function wait(pfw::PollingFileWatcher) + iolock_begin() + preserve_handle(pfw) lock(pfw.notify) local prevstat try start_watching(pfw) - prevstat = stream_wait(pfw, pfw.notify)::StatStruct + iolock_end() + prevstat = wait(pfw.notify)::StatStruct + unlock(pfw.notify) + iolock_begin() + lock(pfw.notify) finally unlock(pfw.notify) + unpreserve_handle(pfw) end stop_watching(pfw) + iolock_end() if pfw.handle == C_NULL return prevstat, EOFError() elseif pfw.curr_error != 0 @@ -547,17 +588,25 @@ function wait(pfw::PollingFileWatcher) end function wait(m::FileMonitor) + iolock_begin() + preserve_handle(m) lock(m.notify) local events try start_watching(m) - events = stream_wait(m, m.notify)::FileEvent + iolock_end() + events = wait(m.notify)::FileEvent events |= FileEvent(m.events) m.events = 0 + unlock(m.notify) + iolock_begin() + lock(m.notify) finally unlock(m.notify) + unpreserve_handle(m) end stop_watching(m) + iolock_end() return events end @@ -566,11 +615,11 @@ function wait(m::FolderMonitor) if isready(m.notify) evt = take!(m.notify) # non-blocking fast-path else - Base.preserve_handle(m) + preserve_handle(m) evt = try take!(m.notify) catch ex - Base.unpreserve_handle(m) + unpreserve_handle(m) if ex isa InvalidStateException && ex.state == :closed rethrow(EOFError()) # `wait(::Channel)` throws the wrong exception end diff --git a/stdlib/Sockets/src/PipeServer.jl b/stdlib/Sockets/src/PipeServer.jl index 05958c1f18402..f7c5c4cca3cd1 100644 --- a/stdlib/Sockets/src/PipeServer.jl +++ b/stdlib/Sockets/src/PipeServer.jl @@ -4,13 +4,10 @@ mutable struct PipeServer <: LibuvServer handle::Ptr{Cvoid} status::Int cond::Base.ThreadSynchronizer - closenotify::Base.ThreadSynchronizer function PipeServer(handle::Ptr{Cvoid}, status) - lock = Threads.SpinLock() p = new(handle, status, - Base.ThreadSynchronizer(lock), - Base.ThreadSynchronizer(lock)) + Base.ThreadSynchronizer()) associate_julia_struct(p.handle, p) finalizer(uvfinalize, p) return p @@ -19,9 +16,11 @@ end function PipeServer() pipe = PipeServer(Libc.malloc(Base._sizeof_uv_named_pipe), StatusUninit) + iolock_begin() err = ccall(:uv_pipe_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cint), eventloop(), pipe.handle, 0) uv_error("failed to create pipe server", err) pipe.status = StatusInit + iolock_end() return pipe end @@ -30,6 +29,7 @@ end accept(server::PipeServer) = accept(server, PipeEndpoint()) function accept_nonblock(server::PipeServer, client::PipeEndpoint) + iolock_begin() if client.status != StatusInit error("client is already in use or has been closed") end @@ -37,6 +37,7 @@ function accept_nonblock(server::PipeServer, client::PipeEndpoint) if err == 0 client.status = StatusOpen end + iolock_end() return err end @@ -47,18 +48,21 @@ function accept_nonblock(server::PipeServer) end function bind(server::PipeServer, name::AbstractString) + iolock_begin() @assert server.status == StatusInit err = ccall(:uv_pipe_bind, Int32, (Ptr{Cvoid}, Cstring), server, name) if err != 0 + iolock_end() if err != UV_EADDRINUSE && err != UV_EACCES #TODO: this codepath is currently not tested - throw(_UVError("bind",err)) + throw(_UVError("bind", err)) else return false end end server.status = StatusOpen + iolock_end() return true end @@ -74,18 +78,17 @@ function listen(path::AbstractString) end function connect!(sock::PipeEndpoint, path::AbstractString) + iolock_begin() @assert sock.status == StatusInit req = Libc.malloc(Base._sizeof_uv_connect) uv_req_set_data(req, C_NULL) ccall(:uv_pipe_connect, Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}), req, sock.handle, path, uv_jl_connectcb::Ptr{Cvoid}) sock.status = StatusConnecting + iolock_end() return sock end -# Libuv will internally reset read/writability, which is uses to -# mark that this is an invalid pipe. - """ connect(path::AbstractString) -> PipeEndpoint diff --git a/stdlib/Sockets/src/Sockets.jl b/stdlib/Sockets/src/Sockets.jl index 0c6e867b20639..7a1726f6c385b 100644 --- a/stdlib/Sockets/src/Sockets.jl +++ b/stdlib/Sockets/src/Sockets.jl @@ -31,9 +31,10 @@ export import Base: isless, show, print, parse, bind, convert, isreadable, iswritable, alloc_buf_hook, _uv_hook_close using Base: LibuvStream, LibuvServer, PipeEndpoint, @handle_as, uv_error, associate_julia_struct, uvfinalize, - notify_error, stream_wait, uv_req_data, uv_req_set_data, preserve_handle, unpreserve_handle, _UVError, IOError, + notify_error, uv_req_data, uv_req_set_data, preserve_handle, unpreserve_handle, _UVError, IOError, eventloop, StatusUninit, StatusInit, StatusConnecting, StatusOpen, StatusClosing, StatusClosed, StatusActive, - uv_status_string, check_open, wait_connected, OS_HANDLE, RawFD, + preserve_handle, unpreserve_handle, iolock_begin, iolock_end, + uv_status_string, check_open, OS_HANDLE, RawFD, UV_EINVAL, UV_ENOMEM, UV_ENOBUFS, UV_EAGAIN, UV_ECONNABORTED, UV_EADDRINUSE, UV_EACCES, UV_EADDRNOTAVAIL, UV_EAI_ADDRFAMILY, UV_EAI_AGAIN, UV_EAI_BADFLAGS, UV_EAI_BADHINTS, UV_EAI_CANCELED, UV_EAI_FAIL, @@ -56,19 +57,18 @@ mutable struct TCPSocket <: LibuvStream status::Int buffer::IOBuffer cond::Base.ThreadSynchronizer - closenotify::Base.ThreadSynchronizer + readerror::Any sendbuf::Union{IOBuffer, Nothing} - lock::ReentrantLock + lock::ReentrantLock # advisory lock throttle::Int function TCPSocket(handle::Ptr{Cvoid}, status) - lock = Threads.SpinLock() tcp = new( handle, status, PipeBuffer(), - Base.ThreadSynchronizer(lock), - Base.ThreadSynchronizer(lock), + Base.ThreadSynchronizer(), + nothing, nothing, ReentrantLock(), Base.DEFAULT_READ_BUFFER_SZ) @@ -82,18 +82,22 @@ end function TCPSocket(; delay=true) tcp = TCPSocket(Libc.malloc(Base._sizeof_uv_tcp), StatusUninit) af_spec = delay ? 0 : 2 # AF_UNSPEC is 0, AF_INET is 2 + iolock_begin() err = ccall(:uv_tcp_init_ex, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cuint), eventloop(), tcp.handle, af_spec) uv_error("failed to create tcp socket", err) tcp.status = StatusInit + iolock_end() return tcp end function TCPSocket(fd::OS_HANDLE) tcp = TCPSocket() + iolock_begin() err = ccall(:uv_tcp_open, Int32, (Ptr{Cvoid}, OS_HANDLE), pipe.handle, fd) uv_error("tcp_open", err) tcp.status = StatusOpen + iolock_end() return tcp end if OS_HANDLE != RawFD @@ -105,15 +109,12 @@ mutable struct TCPServer <: LibuvServer handle::Ptr{Cvoid} status::Int cond::Base.ThreadSynchronizer - closenotify::Base.ThreadSynchronizer function TCPServer(handle::Ptr{Cvoid}, status) - lock = Threads.SpinLock() tcp = new( handle, status, - Base.ThreadSynchronizer(lock), - Base.ThreadSynchronizer(lock)) + Base.ThreadSynchronizer()) associate_julia_struct(tcp.handle, tcp) finalizer(uvfinalize, tcp) return tcp @@ -126,10 +127,12 @@ end function TCPServer(; delay=true) tcp = TCPServer(Libc.malloc(Base._sizeof_uv_tcp), StatusUninit) af_spec = delay ? 0 : 2 # AF_UNSPEC is 0, AF_INET is 2 + iolock_begin() err = ccall(:uv_tcp_init_ex, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cuint), eventloop(), tcp.handle, af_spec) uv_error("failed to create tcp server", err) tcp.status = StatusInit + iolock_end() return tcp end @@ -137,7 +140,7 @@ isreadable(io::TCPSocket) = isopen(io) || bytesavailable(io) > 0 iswritable(io::TCPSocket) = isopen(io) && io.status != StatusClosing """ - accept(server[,client]) + accept(server[, client]) Accepts a connection on the given server and returns a connection to the client. An uninitialized client stream may be provided, in which case it will be used instead of @@ -145,6 +148,23 @@ creating a new stream. """ accept(server::TCPServer) = accept(server, TCPSocket()) +function accept(callback, server::LibuvServer) + task = @async try + while true + client = accept(server) + callback(client) + end + catch ex + # accept below may explicitly throw UV_ECONNABORTED: + # filter that out since we expect that error + if !(ex isa IOError && ex.code == UV_ECONNABORTED) || isopen(server) + rethrow() + end + end + return task # caller is responsible for checking for errors +end + + # UDP """ UDPSocket() @@ -155,17 +175,12 @@ fields to denote the state of the socket. mutable struct UDPSocket <: LibuvStream handle::Ptr{Cvoid} status::Int - recvnotify::Condition - sendnotify::Condition - closenotify::Base.ThreadSynchronizer + recvnotify::Base.ThreadSynchronizer + cond::Base.ThreadSynchronizer function UDPSocket(handle::Ptr{Cvoid}, status) - udp = new( - handle, - status, - Condition(), - Condition(), - Base.ThreadSynchronizer()) + cond = Base.ThreadSynchronizer() + udp = new(handle, status, Base.ThreadSynchronizer(cond.lock), cond) associate_julia_struct(udp.handle, udp) finalizer(uvfinalize, udp) return udp @@ -173,26 +188,28 @@ mutable struct UDPSocket <: LibuvStream end function UDPSocket() this = UDPSocket(Libc.malloc(Base._sizeof_uv_udp), StatusUninit) + iolock_begin() err = ccall(:uv_udp_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), this.handle) uv_error("failed to create udp socket", err) this.status = StatusInit + iolock_end() return this end show(io::IO, stream::UDPSocket) = print(io, typeof(stream), "(", uv_status_string(stream), ")") function _uv_hook_close(sock::UDPSocket) - lock(sock.closenotify) + sock.handle = C_NULL + lock(sock.cond) try - sock.handle = C_NULL sock.status = StatusClosed - notify(sock.closenotify) + notify(sock.cond) + notify_error(sock.recvnotify, EOFError()) finally - unlock(sock.closenotify) + unlock(sock.cond) end - notify(sock.sendnotify) - notify_error(sock.recvnotify,EOFError()) + nothing end # Disables dual stack mode. @@ -214,17 +231,17 @@ const UV_UDP_REUSEADDR = 4 ## -_bind(sock::TCPServer, host::IPv4, port::UInt16, flags::UInt32 = UInt32(0)) = ccall(:jl_tcp_bind, Int32, (Ptr{Cvoid}, UInt16, UInt32, Cuint), - sock.handle, hton(port), hton(host.host), flags) - -_bind(sock::TCPServer, host::IPv6, port::UInt16, flags::UInt32 = UInt32(0)) = ccall(:jl_tcp_bind6, Int32, (Ptr{Cvoid}, UInt16, Ptr{UInt128}, Cuint), - sock.handle, hton(port), Ref(hton(host.host)), flags) - -_bind(sock::UDPSocket, host::IPv4, port::UInt16, flags::UInt32 = UInt32(0)) = ccall(:jl_udp_bind, Int32, (Ptr{Cvoid}, UInt16, UInt32, UInt32), - sock.handle, hton(port), hton(host.host), flags) +function _bind(sock::TCPServer, host::Union{IPv4, IPv6}, port::UInt16, flags::UInt32=UInt32(0)) + host_in = Ref(hton(host.host)) + return ccall(:jl_tcp_bind, Int32, (Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cuint, Cint), + sock, hton(port), host_in, flags, host isa IPv6) +end -_bind(sock::UDPSocket, host::IPv6, port::UInt16, flags::UInt32 = UInt32(0)) = ccall(:jl_udp_bind6, Int32, (Ptr{Cvoid}, UInt16, Ptr{UInt128}, UInt32), - sock.handle, hton(port), Ref(hton(host.host)), flags) +function _bind(sock::UDPSocket, host::Union{IPv4, IPv6}, port::UInt16, flags::UInt32=UInt32(0)) + host_in = Ref(hton(host.host)) + return ccall(:jl_udp_bind, Int32, (Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cuint, Cint), + sock, hton(port), host_in, flags, host isa IPv6) +end """ bind(socket::Union{UDPSocket, TCPSocket}, host::IPAddr, port::Integer; ipv6only=false, reuseaddr=false, kws...) @@ -240,14 +257,16 @@ function bind(sock::Union{TCPServer, UDPSocket}, host::IPAddr, port::Integer; ip error("$(typeof(sock)) is not in initialization state") end flags = 0 - if isa(host,IPv6) && ipv6only + if isa(host, IPv6) && ipv6only flags |= isa(sock, UDPSocket) ? UV_UDP_IPV6ONLY : UV_TCP_IPV6ONLY end if isa(sock, UDPSocket) && reuseaddr flags |= UV_UDP_REUSEADDR end + iolock_begin() err = _bind(sock, host, UInt16(port), UInt32(flags)) if err < 0 + iolock_end() if err != UV_EADDRINUSE && err != UV_EACCES && err != UV_EADDRNOTAVAIL #TODO: this codepath is not currently tested throw(_UVError("bind", err)) @@ -257,13 +276,14 @@ function bind(sock::Union{TCPServer, UDPSocket}, host::IPAddr, port::Integer; ip end sock.status = StatusOpen isa(sock, UDPSocket) && setopt(sock; kws...) + iolock_end() return true end bind(sock::TCPServer, addr::InetAddr) = bind(sock, addr.host, addr.port) """ - setopt(sock::UDPSocket; multicast_loop = nothing, multicast_ttl=nothing, enable_broadcast=nothing, ttl=nothing) + setopt(sock::UDPSocket; multicast_loop=nothing, multicast_ttl=nothing, enable_broadcast=nothing, ttl=nothing) Set UDP socket options. @@ -273,22 +293,25 @@ Set UDP socket options. messages, or else the UDP system will return an access error (default: `false`). * `ttl`: Time-to-live of packets sent on the socket (default: `nothing`). """ -function setopt(sock::UDPSocket; multicast_loop = nothing, multicast_ttl=nothing, enable_broadcast=nothing, ttl=nothing) +function setopt(sock::UDPSocket; multicast_loop=nothing, multicast_ttl=nothing, enable_broadcast=nothing, ttl=nothing) + iolock_begin() if sock.status == StatusUninit error("Cannot set options on uninitialized socket") end if multicast_loop !== nothing - uv_error("multicast_loop",ccall(:uv_udp_set_multicast_loop,Cint,(Ptr{Cvoid},Cint),sock.handle,multicast_loop) < 0) + uv_error("multicast_loop", ccall(:uv_udp_set_multicast_loop, Cint, (Ptr{Cvoid}, Cint), sock.handle, multicast_loop) < 0) end if multicast_ttl !== nothing - uv_error("multicast_ttl",ccall(:uv_udp_set_multicast_ttl,Cint,(Ptr{Cvoid},Cint),sock.handle,multicast_ttl)) + uv_error("multicast_ttl", ccall(:uv_udp_set_multicast_ttl, Cint, (Ptr{Cvoid}, Cint), sock.handle, multicast_ttl)) end if enable_broadcast !== nothing - uv_error("enable_broadcast",ccall(:uv_udp_set_broadcast,Cint,(Ptr{Cvoid},Cint),sock.handle,enable_broadcast)) + uv_error("enable_broadcast", ccall(:uv_udp_set_broadcast, Cint, (Ptr{Cvoid}, Cint), sock.handle, enable_broadcast)) end if ttl !== nothing - uv_error("ttl",ccall(:uv_udp_set_ttl,Cint,(Ptr{Cvoid},Cint),sock.handle,ttl)) + uv_error("ttl", ccall(:uv_udp_set_ttl, Cint, (Ptr{Cvoid}, Cint), sock.handle, ttl)) end + iolock_end() + nothing end """ @@ -308,82 +331,123 @@ Read a UDP packet from the specified socket, returning a tuple of `(address, dat `address` will be either IPv4 or IPv6 as appropriate. """ function recvfrom(sock::UDPSocket) + iolock_begin() # If the socket has not been bound, it will be bound implicitly to ::0 and a random port if sock.status != StatusInit && sock.status != StatusOpen && sock.status != StatusActive error("UDPSocket is not initialized and open") end if ccall(:uv_is_active, Cint, (Ptr{Cvoid},), sock.handle) == 0 - uv_error("recv_start", ccall(:uv_udp_recv_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), - sock.handle, Base.uv_jl_alloc_buf::Ptr{Cvoid}, uv_jl_recvcb::Ptr{Cvoid})) + err = ccall(:uv_udp_recv_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), + sock, Base.uv_jl_alloc_buf::Ptr{Cvoid}, uv_jl_recvcb::Ptr{Cvoid}) + uv_error("recv_start", err) end sock.status = StatusActive - return stream_wait(sock, sock.recvnotify)::Tuple{Union{IPv4, IPv6}, Vector{UInt8}} + lock(sock.recvnotify) + iolock_end() + try + From = Union{InetAddr{IPv4}, InetAddr{IPv6}} + Data = Vector{UInt8} + from, data = wait(sock.recvnotify)::Tuple{From, Data} + return (from.host, data) + finally + unlock(sock.recvnotify) + end end -alloc_buf_hook(sock::UDPSocket, size::UInt) = (Libc.malloc(size), size) +alloc_buf_hook(sock::UDPSocket, size::UInt) = (Libc.malloc(size), size) # size is always 64k from libuv function uv_recvcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}, addr::Ptr{Cvoid}, flags::Cuint) - # C signature documented as (*uv_udp_recv_cb)(...) sock = @handle_as handle UDPSocket - if nread < 0 - Libc.free(buf_addr) - notify_error(sock.recvnotify, _UVError("recv", nread)) - elseif flags & UV_UDP_PARTIAL > 0 - Libc.free(buf_addr) - notify_error(sock.recvnotify, "Partial message received") - else - buf_addr = ccall(:jl_uv_buf_base, Ptr{Cvoid}, (Ptr{Cvoid},), buf) - buf_size = ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf) - # need to check the address type in order to convert to a Julia IPAddr - addrout = if addr == C_NULL - IPv4(0) - elseif ccall(:jl_sockaddr_in_is_ip4, Cint, (Ptr{Cvoid},), addr) == 1 - IPv4(ntoh(ccall(:jl_sockaddr_host4, UInt32, (Ptr{Cvoid},), addr))) - else - tmp = [UInt128(0)] - ccall(:jl_sockaddr_host6, UInt32, (Ptr{Cvoid}, Ptr{UInt8}), addr, pointer(tmp)) - IPv6(ntoh(tmp[1])) - end - buf = unsafe_wrap(Array, convert(Ptr{UInt8}, buf_addr), Int(nread), own = true) - notify(sock.recvnotify, (addrout, buf)) - end - ccall(:uv_udp_recv_stop, Cint, (Ptr{Cvoid},), sock.handle) - sock.status = StatusOpen + lock(sock.recvnotify) + try + buf_addr = ccall(:jl_uv_buf_base, Ptr{UInt8}, (Ptr{Cvoid},), buf) + if nread == 0 && addr == C_NULL + Libc.free(buf_addr) + elseif nread < 0 + Libc.free(buf_addr) + notify_error(sock.recvnotify, _UVError("recv", nread)) + elseif flags & UV_UDP_PARTIAL > 0 + Libc.free(buf_addr) + notify_error(sock.recvnotify, "Partial message received") + else + buf_size = Int(ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf)) + if buf_size - nread < 16384 # waste at most 16k (note: buf_size is currently always 64k) + buf = unsafe_wrap(Array, buf_addr, nread, own=true) + else + buf = Vector{UInt8}(undef, nread) + GC.@preserve buf unsafe_copyto!(pointer(buf), buf_addr, nread) + Libc.free(buf_addr) + end + # need to check the address type in order to convert to a Julia IPAddr + host = IPv4(0) + port = UInt16(0) + if ccall(:jl_sockaddr_is_ip4, Cint, (Ptr{Cvoid},), addr) == 1 + host = IPv4(ntoh(ccall(:jl_sockaddr_host4, UInt32, (Ptr{Cvoid},), addr))) + port = ntoh(ccall(:jl_sockaddr_port4, UInt16, (Ptr{Cvoid},), addr)) + elseif ccall(:jl_sockaddr_is_ip6, Cint, (Ptr{Cvoid},), addr) == 1 + tmp = Ref{UInt128}(0) + scope_id = ccall(:jl_sockaddr_host6, UInt32, (Ptr{Cvoid}, Ptr{UInt128}), addr, tmp) + host = IPv6(ntoh(tmp[])) + port = ntoh(ccall(:jl_sockaddr_port6, UInt16, (Ptr{Cvoid},), addr)) + end + from = InetAddr(host, port) + notify(sock.recvnotify, (from, buf), all=false) + end + if sock.status == StatusActive && isempty(sock.recvnotify) + sock.status = StatusOpen + ccall(:uv_udp_recv_stop, Cint, (Ptr{Cvoid},), sock) + end + finally + unlock(sock.recvnotify) + end nothing end -function _send(sock::UDPSocket, ipaddr::IPv4, port::UInt16, buf) - ccall(:jl_udp_send, Cint, (Ptr{Cvoid}, UInt16, UInt32, Ptr{UInt8}, Csize_t, Ptr{Cvoid}), - sock.handle, hton(port), hton(ipaddr.host), buf, sizeof(buf), uv_jl_sendcb::Ptr{Cvoid}) -end - -function _send(sock::UDPSocket, ipaddr::IPv6, port::UInt16, buf) - ccall(:jl_udp_send6, Cint, (Ptr{Cvoid}, UInt16, Ref{UInt128}, Ptr{UInt8}, Csize_t, Ptr{Cvoid}), - sock.handle, hton(port), hton(ipaddr.host), buf, sizeof(buf), uv_jl_sendcb::Ptr{Cvoid}) +function _send_async(sock::UDPSocket, ipaddr::Union{IPv4, IPv6}, port::UInt16, buf) + req = Libc.malloc(Base._sizeof_uv_udp_send) + uv_req_set_data(req, C_NULL) # in case we get interrupted before arriving at the wait call + host_in = Ref(hton(ipaddr.host)) + err = ccall(:jl_udp_send, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Ptr{UInt8}, Csize_t, Ptr{Cvoid}, Cint), + req, sock, hton(port), host_in, buf, sizeof(buf), Base.uv_jl_writecb_task::Ptr{Cvoid}, ipaddr isa IPv6) + if err < 0 + Libc.free(req) + uv_error("send", err) + end + return req end """ - send(socket::UDPSocket, host, port::Integer, msg) + send(socket::UDPSocket, host::IPAddr, port::Integer, msg) Send `msg` over `socket` to `host:port`. """ -function send(sock::UDPSocket,ipaddr,port,msg) +function send(sock::UDPSocket, ipaddr::IPAddr, port::Integer, msg) # If the socket has not been bound, it will be bound implicitly to ::0 and a random port + iolock_begin() if sock.status != StatusInit && sock.status != StatusOpen && sock.status != StatusActive error("UDPSocket is not initialized and open") end - uv_error("send", _send(sock, ipaddr, UInt16(port), msg)) - stream_wait(sock, sock.sendnotify) - nothing -end - -function uv_sendcb(handle::Ptr{Cvoid}, status::Cint) - sock = @handle_as handle UDPSocket - if status < 0 - notify_error(sock.sendnotify, _UVError("UDP send failed", status)) + uvw = _send_async(sock, ipaddr, UInt16(port), msg) + ct = current_task() + preserve_handle(ct) + uv_req_set_data(uvw, ct) + iolock_end() + status = try + wait()::Cint + finally + iolock_begin() + if uv_req_data(uvw) != C_NULL + # uvw is still alive, + # so make sure we won't get spurious notifications later + uv_req_set_data(uvw, C_NULL) + else + # done with uvw + Libc.free(uvw) + end + iolock_end() + unpreserve_handle(ct) end - notify(sock.sendnotify) - Libc.free(handle) + uv_error("send", status) nothing end @@ -394,16 +458,18 @@ function uv_connectcb(conn::Ptr{Cvoid}, status::Cint) sock = @handle_as hand LibuvStream lock(sock.cond) try - if status >= 0 + if status >= 0 # success if !(sock.status == StatusClosed || sock.status == StatusClosing) sock.status = StatusOpen end - notify(sock.cond) else - ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), hand) - err = _UVError("connect", status) - notify_error(sock.cond, err) + sock.readerror = _UVError("connect", status) # TODO: perhaps we should not reuse readerror for this + if !(sock.status == StatusClosed || sock.status == StatusClosing) + ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), hand) + sock.status = StatusClosing + end end + notify(sock.cond) finally unlock(sock.cond) end @@ -411,34 +477,47 @@ function uv_connectcb(conn::Ptr{Cvoid}, status::Cint) nothing end -function connect!(sock::TCPSocket, host::IPv4, port::Integer) +function connect!(sock::TCPSocket, host::Union{IPv4, IPv6}, port::Integer) + iolock_begin() if sock.status != StatusInit error("TCPSocket is not in initialization state") end if !(0 <= port <= typemax(UInt16)) throw(ArgumentError("port out of range, must be 0 ≤ port ≤ 65535, got $port")) end - uv_error("connect", ccall(:jl_tcp4_connect, Int32, (Ptr{Cvoid}, UInt32, UInt16, Ptr{Cvoid}), - sock.handle, hton(host.host), hton(UInt16(port)), uv_jl_connectcb::Ptr{Cvoid})) + host_in = Ref(hton(host.host)) + uv_error("connect", ccall(:jl_tcp_connect, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, UInt16, Ptr{Cvoid}, Cint), + sock, host_in, hton(UInt16(port)), uv_jl_connectcb::Ptr{Cvoid}, host isa IPv6)) sock.status = StatusConnecting + iolock_end() nothing end -function connect!(sock::TCPSocket, host::IPv6, port::Integer) - if sock.status != StatusInit - error("TCPSocket is not in initialization state") - end - if !(0 <= port <= typemax(UInt16)) - throw(ArgumentError("port out of range, must be 0 ≤ port ≤ 65535, got $port")) +connect!(sock::TCPSocket, addr::InetAddr) = connect!(sock, addr.host, addr.port) + +function wait_connected(x::LibuvStream) + iolock_begin() + check_open(x) + isopen(x) || x.readerror === nothing || throw(x.readerror) + preserve_handle(x) + lock(x.cond) + try + while x.status == StatusConnecting + iolock_end() + wait(x.cond) + unlock(x.cond) + iolock_begin() + lock(x.cond) + end + isopen(x) || x.readerror === nothing || throw(x.readerror) + finally + unlock(x.cond) + unpreserve_handle(x) end - uv_error("connect", ccall(:jl_tcp6_connect, Int32, (Ptr{Cvoid}, Ref{UInt128}, UInt16, Ptr{Cvoid}), - sock.handle, hton(host.host), hton(UInt16(port)), uv_jl_connectcb::Ptr{Cvoid})) - sock.status = StatusConnecting + iolock_end() nothing end -connect!(sock::TCPSocket, addr::InetAddr) = connect!(sock, addr.host, addr.port) - # Default Host to localhost """ @@ -459,9 +538,7 @@ function connect!(sock::TCPSocket, host::AbstractString, port::Integer) error("TCPSocket is not in initialization state") end ipaddr = getaddrinfo(host) - sock.status = StatusInit - connect!(sock,ipaddr,port) - sock.status = StatusConnecting + connect!(sock, ipaddr, port) return sock end @@ -478,7 +555,12 @@ Enables or disables Nagle's algorithm on a given TCP server or socket. """ function nagle(sock::Union{TCPServer, TCPSocket}, enable::Bool) # disable or enable Nagle's algorithm on all OSes - ccall(:uv_tcp_nodelay, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(!enable)) + Sockets.iolock_begin() + Sockets.check_open(sock) + err = ccall(:uv_tcp_nodelay, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(!enable)) + # TODO: check err + Sockets.iolock_end() + return err end """ @@ -487,12 +569,16 @@ end On Linux systems, the TCP_QUICKACK is disabled or enabled on `socket`. """ function quickack(sock::Union{TCPServer, TCPSocket}, enable::Bool) + Sockets.iolock_begin() + Sockets.check_open(sock) @static if Sys.islinux() # tcp_quickack is a linux only option if ccall(:jl_tcp_quickack, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(enable)) < 0 @warn "Networking unoptimized ( Error enabling TCP_QUICKACK : $(Libc.strerror(Libc.errno())) )" maxlog=1 end end + Sockets.iolock_end() + nothing end @@ -512,36 +598,13 @@ reject them. The default value of `backlog` is 511. """ function listen(addr; backlog::Integer=BACKLOG_DEFAULT) sock = TCPServer() - !bind(sock, addr) && error("cannot bind to port; may already be in use or access denied") + bind(sock, addr) || error("cannot bind to port; may already be in use or access denied") listen(sock; backlog=backlog) return sock end listen(port::Integer; backlog::Integer=BACKLOG_DEFAULT) = listen(localhost, port; backlog=backlog) listen(host::IPAddr, port::Integer; backlog::Integer=BACKLOG_DEFAULT) = listen(InetAddr(host, port); backlog=backlog) -function listen(callback, server::Union{TCPSocket, UDPSocket}) - @async begin - local client = TCPSocket() - lock(server.cond) - try - while isopen(server) - err = accept_nonblock(server, client) - if err == 0 - callback(client) - client = TCPSocket() - elseif err != UV_EAGAIN - uv_error("accept", err) - else - stream_wait(server, server.cond) - end - end - finally - unlock(server.cond) - end - end - return sock -end - function listen(sock::LibuvServer; backlog::Integer=BACKLOG_DEFAULT) uv_error("listen", trylisten(sock)) return sock @@ -564,16 +627,19 @@ function uv_connectioncb(stream::Ptr{Cvoid}, status::Cint) end function trylisten(sock::LibuvServer; backlog::Integer=BACKLOG_DEFAULT) + iolock_begin() check_open(sock) err = ccall(:uv_listen, Cint, (Ptr{Cvoid}, Cint, Ptr{Cvoid}), sock, backlog, uv_jl_connectioncb::Ptr{Cvoid}) sock.status = StatusActive + iolock_end() return err end ## function accept_nonblock(server::TCPServer, client::TCPSocket) + iolock_begin() if client.status != StatusInit error("client TCPSocket is not in initialization state") end @@ -581,6 +647,7 @@ function accept_nonblock(server::TCPServer, client::TCPSocket) if err == 0 client.status = StatusOpen end + iolock_end() return err end @@ -591,24 +658,31 @@ function accept_nonblock(server::TCPServer) end function accept(server::LibuvServer, client::LibuvStream) - if server.status != StatusActive + iolock_begin() + if server.status != StatusActive && server.status != StatusClosing && server.status != StatusClosed throw(ArgumentError("server not connected, make sure \"listen\" has been called")) end while isopen(server) err = accept_nonblock(server, client) if err == 0 + iolock_end() return client elseif err != UV_EAGAIN uv_error("accept", err) end + preserve_handle(server) lock(server.cond) + iolock_end() try - stream_wait(server, server.cond) + wait(server.cond) finally unlock(server.cond) + unpreserve_handle(server) end + iolock_begin() end uv_error("accept", UV_ECONNABORTED) + nothing end ## Utility functions @@ -663,6 +737,7 @@ function _sockname(sock, self=true) raddress = zeros(UInt8, 16) rfamily = Ref{Cuint}(0) + iolock_begin() if self r = ccall(:jl_tcp_getsockname, Int32, (Ptr{Cvoid}, Ref{Cushort}, Ptr{Cvoid}, Ref{Cuint}), @@ -672,6 +747,7 @@ function _sockname(sock, self=true) (Ptr{Cvoid}, Ref{Cushort}, Ptr{Cvoid}, Ref{Cuint}), sock.handle, rport, raddress, rfamily) end + iolock_end() uv_error("cannot obtain socket name", r) if r == 0 port = ntoh(rport[]) @@ -713,7 +789,6 @@ function __init__() global uv_jl_getaddrinfocb = @cfunction(uv_getaddrinfocb, Cvoid, (Ptr{Cvoid}, Cint, Ptr{Cvoid})) global uv_jl_getnameinfocb = @cfunction(uv_getnameinfocb, Cvoid, (Ptr{Cvoid}, Cint, Cstring, Cstring)) global uv_jl_recvcb = @cfunction(uv_recvcb, Cvoid, (Ptr{Cvoid}, Cssize_t, Ptr{Cvoid}, Ptr{Cvoid}, Cuint)) - global uv_jl_sendcb = @cfunction(uv_sendcb, Cvoid, (Ptr{Cvoid}, Cint)) global uv_jl_connectioncb = @cfunction(uv_connectioncb, Cvoid, (Ptr{Cvoid}, Cint)) global uv_jl_connectcb = @cfunction(uv_connectcb, Cvoid, (Ptr{Cvoid}, Cint)) end diff --git a/stdlib/Sockets/src/addrinfo.jl b/stdlib/Sockets/src/addrinfo.jl index a799a731cbe95..e4e26ff0cdc24 100644 --- a/stdlib/Sockets/src/addrinfo.jl +++ b/stdlib/Sockets/src/addrinfo.jl @@ -16,7 +16,7 @@ function uv_getaddrinfocb(req::Ptr{Cvoid}, status::Cint, addrinfo::Ptr{Cvoid}) t = unsafe_pointer_to_objref(data)::Task uv_req_set_data(req, C_NULL) if status != 0 || addrinfo == C_NULL - schedule(t, _UVError("getaddrinfocb", status)) + schedule(t, _UVError("getaddrinfo", status)) else freeaddrinfo = addrinfo addrs = IPAddr[] @@ -59,6 +59,7 @@ julia> getalladdrinfo("google.com") function getalladdrinfo(host::String) req = Libc.malloc(Base._sizeof_uv_getaddrinfo) uv_req_set_data(req, C_NULL) # in case we get interrupted before arriving at the wait call + iolock_begin() status = ccall(:jl_getaddrinfo, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), req, host, #=service=#C_NULL, uv_jl_getaddrinfocb::Ptr{Cvoid}) if status < 0 @@ -72,8 +73,9 @@ function getalladdrinfo(host::String) end ct = current_task() preserve_handle(ct) + uv_req_set_data(req, ct) + iolock_end() r = try - uv_req_set_data(req, ct) wait() finally if uv_req_data(req) != C_NULL @@ -98,7 +100,7 @@ function getalladdrinfo(host::String) elseif code == UV_EAI_MEMORY throw(OutOfMemoryError()) else - throw(_UVError("getaddrinfo", code)) + throw(r) end end return r::Vector{IPAddr} @@ -129,7 +131,7 @@ function uv_getnameinfocb(req::Ptr{Cvoid}, status::Cint, hostname::Cstring, serv t = unsafe_pointer_to_objref(data)::Task uv_req_set_data(req, C_NULL) if status != 0 - schedule(t, _UVError("getnameinfocb", status)) + schedule(t, _UVError("getnameinfo", status)) else schedule(t, unsafe_string(hostname)) end @@ -155,18 +157,14 @@ julia> getnameinfo(Sockets.IPv4("8.8.8.8")) function getnameinfo(address::Union{IPv4, IPv6}) req = Libc.malloc(Base._sizeof_uv_getnameinfo) uv_req_set_data(req, C_NULL) # in case we get interrupted before arriving at the wait call - ev = eventloop() port = hton(UInt16(0)) flags = 0 uvcb = uv_jl_getnameinfocb::Ptr{Cvoid} status = UV_EINVAL - if address isa IPv4 - status = ccall(:jl_getnameinfo, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, UInt32, UInt16, Cint, Ptr{Cvoid}), - ev, req, hton(address.host), port, flags, uvcb) - elseif address isa IPv6 - status = ccall(:jl_getnameinfo6, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Ref{UInt128}, UInt16, Cint, Ptr{Cvoid}), - ev, req, hton(address.host), port, flags, uvcb) - end + host_in = Ref(hton(address.host)) + iolock_begin() + status = ccall(:jl_getnameinfo, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}, UInt16, Cint, Ptr{Cvoid}, Cint), + eventloop(), req, host_in, port, flags, uvcb, address isa IPv6) if status < 0 Libc.free(req) if status == UV_EINVAL @@ -178,8 +176,9 @@ function getnameinfo(address::Union{IPv4, IPv6}) end ct = current_task() preserve_handle(ct) + uv_req_set_data(req, ct) + iolock_end() r = try - uv_req_set_data(req, ct) wait() finally if uv_req_data(req) != C_NULL @@ -204,7 +203,7 @@ function getnameinfo(address::Union{IPv4, IPv6}) elseif code == UV_EAI_MEMORY throw(OutOfMemoryError()) else - throw(_UVError("getnameinfo", code)) + throw(r) end end return r::String @@ -290,9 +289,9 @@ function getipaddrs(addr_type::Type{T}=IPAddr; loopback::Bool=false) where T<:IP end end sockaddr = ccall(:jl_uv_interface_address_sockaddr, Ptr{Cvoid}, (Ptr{UInt8},), current_addr) - if IPv4 <: T && ccall(:jl_sockaddr_in_is_ip4, Int32, (Ptr{Cvoid},), sockaddr) == 1 + if IPv4 <: T && ccall(:jl_sockaddr_is_ip4, Int32, (Ptr{Cvoid},), sockaddr) == 1 push!(addresses, IPv4(ntoh(ccall(:jl_sockaddr_host4, UInt32, (Ptr{Cvoid},), sockaddr)))) - elseif IPv6 <: T && ccall(:jl_sockaddr_in_is_ip6, Int32, (Ptr{Cvoid},), sockaddr) == 1 + elseif IPv6 <: T && ccall(:jl_sockaddr_is_ip6, Int32, (Ptr{Cvoid},), sockaddr) == 1 addr6 = Ref{UInt128}() scope_id = ccall(:jl_sockaddr_host6, UInt32, (Ptr{Cvoid}, Ref{UInt128},), sockaddr, addr6) push!(addresses, IPv6(ntoh(addr6[]))) diff --git a/stdlib/Sockets/test/runtests.jl b/stdlib/Sockets/test/runtests.jl index 1263a81d3f381..a7d31bb40a092 100644 --- a/stdlib/Sockets/test/runtests.jl +++ b/stdlib/Sockets/test/runtests.jl @@ -142,15 +142,20 @@ defaultport = rand(2000:4000) mktempdir() do tmpdir socketname = Sys.iswindows() ? ("\\\\.\\pipe\\uv-test-" * randstring(6)) : joinpath(tmpdir, "socket") - s = listen(socketname) - tsk = @async begin - sock = accept(s) - write(sock, "Hello World\n") - close(s) - close(sock) + local nconn = 0 + srv = listen(socketname) + t = accept(srv) do client + write(client, "Hello World $(nconn += 1)\n") + close(client) + nconn == 3 && Base.wait_close(srv) end - @test read(connect(socketname), String) == "Hello World\n" - wait(tsk) + @test read(connect(socketname), String) == "Hello World 1\n" + @test read(connect(socketname), String) == "Hello World 2\n" + @test read(connect(socketname), String) == "Hello World 3\n" + conn = connect(socketname) + close(srv) + wait(t) + @test read(conn, String) == "" end end @@ -240,41 +245,68 @@ end bind(b, ip"127.0.0.1", randport + 1) @sync begin - # FIXME: check that we received all messages - for i = 1:3 - @async send(b, ip"127.0.0.1", randport, "Hello World") - @async String(recv(a)) == "Hello World" + let i = 0 + for _ = 1:30 + @async let msg = String(recv(a)) + @test msg == "Hello World $(i += 1)" + end + end + end + yield() + for i = 1:30 + send(b, ip"127.0.0.1", randport, "Hello World $i") end end - - tsk = @async send(b, ip"127.0.0.1", randport, "Hello World") - (addr, data) = recvfrom(a) - @test addr == ip"127.0.0.1" && String(data) == "Hello World" - wait(tsk) + let msg = Vector{UInt8}("fedcba9876543210"^36) # The minimum reassembly buffer size for IPv4 is 576 bytes + tsk = @async @test recv(a) == msg + @test send(b, ip"127.0.0.1", randport, msg) === nothing + wait(tsk) + end + let msg = Vector{UInt8}("1234"^16377) # The maximum size of an IPv4 datagram is 65535 bytes, including the header + @test_throws(Base._UVError("send", Base.UV_EMSGSIZE), + send(b, ip"127.0.0.1", randport, msg)) + pop!(msg) + tsk = @async recv(a) + try + send(b, ip"127.0.0.1", randport, msg) + catch ex + if !(ex isa Base.IOError && ex.code == Base.UV_EMSGSIZE) || Sys.islinux() || Sys.iswindows() + # this is allowed failure on some platforms which might further restrict + # the maximum packet size being sent (even locally), such as BSD's `sysctl net.inet.udp.maxdgram` + rethrow() + end + empty!(msg) + send(b, ip"127.0.0.1", randport, msg) # check that the socket is still alive + end + @test fetch(tsk) == msg + end + let tsk = @async send(b, ip"127.0.0.1", randport, "WORLD HELLO") + (addr, data) = recvfrom(a) + @test addr == ip"127.0.0.1" && String(data) == "WORLD HELLO" + wait(tsk) + end close(a) close(b) end @test_throws MethodError bind(UDPSocket(), randport) - if !Sys.iswindows() || Sys.windows_version() >= Sys.WINDOWS_VISTA_VER let a = UDPSocket() b = UDPSocket() bind(a, ip"::1", UInt16(randport)) bind(b, ip"::1", UInt16(randport + 1)) - tsk = @async begin - @test begin - (addr, data) = recvfrom(a) - addr == ip"::1" && String(data) == "Hello World" + for i = 1:3 + tsk = @async begin + let (addr, data) = recvfrom(a) + @test addr == ip"::1" + @test String(data) == "Hello World" + end end + send(b, ip"::1", randport, "Hello World") + wait(tsk) end - send(b, ip"::1", randport, "Hello World") - wait(tsk) - send(b, ip"::1", randport, "Hello World") - wait(tsk) - end end end diff --git a/test/channels.jl b/test/channels.jl index ae787f71c9bed..2583e2f8e4e39 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -238,26 +238,29 @@ end # interpreting the calling function. @noinline garbage_finalizer(f) = (finalizer(f, "gar" * "bage"); nothing) run = Ref(0) - GC.enable(false) + garbage_finalizer(x -> nothing) # warmup + @test GC.enable(false) # test for finalizers trying to yield leading to failed attempts to context switch garbage_finalizer((x) -> (run[] += 1; sleep(1))) garbage_finalizer((x) -> (run[] += 1; yield())) garbage_finalizer((x) -> (run[] += 1; yieldto(@task () -> ()))) t = @task begin - GC.enable(true) + @test !GC.enable(true) GC.gc() + true end oldstderr = stderr - local newstderr, errstream + newstderr = redirect_stderr() + local errstream try - newstderr = redirect_stderr() errstream = @async read(newstderr[1], String) yield(t) finally redirect_stderr(oldstderr) close(newstderr[2]) end - Base.wait(t) + @test istaskdone(t) + @test fetch(t) @test run[] == 3 @test fetch(errstream) == """ error in running finalizer: ErrorException("task switch not allowed from inside gc finalizer") @@ -267,8 +270,8 @@ end # test for invalid state in Workqueue during yield t = @async nothing t.state = :invalid + newstderr = redirect_stderr() try - newstderr = redirect_stderr() errstream = @async read(newstderr[1], String) yield() finally @@ -292,68 +295,78 @@ end end @testset "Timer / AsyncCondition triggering and race #12719" begin - tc = Ref(0) - t = Timer(0) do t - tc[] += 1 + let tc = Ref(0) + t = Timer(0) do t + tc[] += 1 + end + @test isopen(t) + Base.process_events() + @test !isopen(t) + @test tc[] == 0 + yield() + @test tc[] == 1 end - @test isopen(t) - Base.process_events() - @test !isopen(t) - @test tc[] == 0 - yield() - @test tc[] == 1 - - tc = Ref(0) - t = Timer(0) do t - tc[] += 1 + + let tc = Ref(0) + t = Timer(0) do t + tc[] += 1 + end + @test isopen(t) + close(t) + @test !isopen(t) + sleep(0.1) + @test tc[] == 0 end - @test isopen(t) - close(t) - @test !isopen(t) - sleep(0.1) - @test tc[] == 0 - - tc = Ref(0) - async = Base.AsyncCondition() do async - tc[] += 1 + + let tc = Ref(0) + async = Base.AsyncCondition() do async + tc[] += 1 + end + @test isopen(async) + ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) + ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) + Base.process_events() # schedule event + Sys.iswindows() && Base.process_events() # schedule event (windows?) + ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) + @test tc[] == 0 + yield() # consume event + @test tc[] == 1 + Sys.iswindows() && Base.process_events() # schedule event (windows?) + yield() # consume event + @test tc[] == 2 + sleep(0.1) # no further events + @test tc[] == 2 + ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) + ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) + Base.process_events() # schedule event + Sys.iswindows() && Base.process_events() # schedule event (windows?) + close(async) # and close + @test !isopen(async) + @test tc[] == 2 + @test tc[] == 2 + yield() # consume event & then close + @test tc[] == 3 + sleep(0.1) # no further events + @test tc[] == 3 end - @test isopen(async) - ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) - Base.process_events() # schedule event - ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) - Sys.iswindows() && Base.process_events() # schedule event (windows?) - @test tc[] == 0 - yield() # consume event - @test tc[] == 1 - sleep(0.1) # no further events - @test tc[] == 1 - ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) - ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) - close(async) - @test !isopen(async) - @test tc[] == 1 - Base.process_events() # schedule event & then close - Sys.iswindows() && Base.process_events() # schedule event (windows?) - yield() # consume event & then close - @test tc[] == 2 - sleep(0.1) # no further events - @test tc[] == 2 - - tc = Ref(0) - async = Base.AsyncCondition() do async - tc[] += 1 + + let tc = Ref(0) + async = Base.AsyncCondition() do async + tc[] += 1 + end + @test isopen(async) + ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) + Base.process_events() # schedule event + Sys.iswindows() && Base.process_events() # schedule event (windows) + close(async) + @test !isopen(async) + Base.process_events() # and close + @test tc[] == 0 + yield() # consume event & then close + @test tc[] == 1 + sleep(0.1) + @test tc[] == 1 end - @test isopen(async) - ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async) - close(async) - @test !isopen(async) - Base.process_events() # schedule event & then close - Sys.iswindows() && Base.process_events() # schedule event (windows) - @test tc[] == 0 - yield() # consume event & then close - @test tc[] == 1 - sleep(0.1) - @test tc[] == 1 end @testset "check_channel_state" begin diff --git a/test/read.jl b/test/read.jl index 8e74e038553bf..5782481b76587 100644 --- a/test/read.jl +++ b/test/read.jl @@ -127,11 +127,11 @@ end open_streams = [] function cleanup() for s_ in open_streams - try close(s_); catch; end + close(s_) end empty!(open_streams) for tsk in tasks - Base.wait(tsk) + wait(tsk) end empty!(tasks) end diff --git a/test/spawn.jl b/test/spawn.jl index 122d6e65825de..d96cc43664335 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -314,7 +314,6 @@ let out = Pipe(), echo = `$exename --startup-file=no -e 'print(stdout, " 1\t", r infd = Base._fd(out.in) outfd = Base._fd(out.out) show(out, out) - notify(ready) @test isreadable(out) @test iswritable(out) close(out.in) @@ -333,11 +332,8 @@ let out = Pipe(), echo = `$exename --startup-file=no -e 'print(stdout, " 1\t", r if Sys.iswindows() # WINNT kernel appears to not provide a fast mechanism for async propagation # of EOF for a blocking stream, so just wait for it to catch up. - # This shouldn't take much more than 32ms. + # This shouldn't take much more than 32ms more. Base.wait_close(out) - # it's closed now, but the other task is expected to be behind this task - # in emptying the read buffer - @test isreadable(out) end @test !isopen(out) end