From 6a7d93a7d64871f4719b1bb768a5273ed6a5d0ac Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Wed, 12 Jun 2019 12:07:00 -0400 Subject: [PATCH] more bugfixes and docs --- base/coreio.jl | 1 - base/io.jl | 2 - base/process.jl | 4 +- base/stream.jl | 128 ++++++++++++------------ doc/src/devdocs/locks.md | 32 +++--- stdlib/FileWatching/src/FileWatching.jl | 1 + stdlib/Sockets/src/Sockets.jl | 2 +- test/read.jl | 4 +- 8 files changed, 88 insertions(+), 86 deletions(-) diff --git a/base/coreio.jl b/base/coreio.jl index 948b341410de8..2796c53e759f5 100644 --- a/base/coreio.jl +++ b/base/coreio.jl @@ -18,7 +18,6 @@ unsafe_write(::DevNull, ::Ptr{UInt8}, n::UInt)::Int = n close(::DevNull) = nothing flush(::DevNull) = nothing wait_readnb(::DevNull) = wait() -wait_readbyte(::DevNull) = wait() wait_close(::DevNull) = wait() eof(::DevNull) = true diff --git a/base/io.jl b/base/io.jl index cd919f7d46a31..047862ec227b3 100644 --- a/base/io.jl +++ b/base/io.jl @@ -62,7 +62,6 @@ Close an I/O stream. Performs a [`flush`](@ref) first. function close end function flush end function wait_readnb end -function wait_readbyte end function wait_close end function bytesavailable end @@ -259,7 +258,6 @@ iswritable(io::AbstractPipe) = iswritable(pipe_writer(io)) isopen(io::AbstractPipe) = isopen(pipe_writer(io)) || isopen(pipe_reader(io)) close(io::AbstractPipe) = (close(pipe_writer(io)); close(pipe_reader(io))) wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io), nb) -wait_readbyte(io::AbstractPipe, byte::UInt8) = wait_readbyte(pipe_reader(io), byte) wait_close(io::AbstractPipe) = (wait_close(pipe_writer(io)); wait_close(pipe_reader(io))) """ diff --git a/base/process.jl b/base/process.jl index f01588fd2aa6a..f7797297e8fe8 100644 --- a/base/process.jl +++ b/base/process.jl @@ -999,14 +999,16 @@ function wait(x::Process) if !process_exited(x) preserve_handle(x) lock(x.exitnotify) + iolock_end() try wait(x.exitnotify) finally unlock(x.exitnotify) unpreserve_handle(x) end + else + iolock_end() end - iolock_end() nothing end diff --git a/base/stream.jl b/base/stream.jl index d1acdfde89c8f..d7ff4c8cc0b7f 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -326,35 +326,6 @@ function check_open(x::Union{LibuvStream, LibuvServer}) end end -function wait_readbyte(x::LibuvStream, c::UInt8) - iolock_begin() - if !isopen(x) || occursin(c, x.buffer) # fast path - iolock_end() - return - end - preserve_handle(x) - lock(x.cond) - try - while isopen(x) && !occursin(c, x.buffer) - x.readerror === nothing || throw(x.readerror) - start_reading(x) # ensure we are reading - iolock_end() - wait(x.cond) - unlock(x.cond) - iolock_begin() - lock(x.cond) - end - finally - if isempty(x.cond) - stop_reading(x) # stop reading iff there are currently no other read clients of the stream - end - unpreserve_handle(x) - unlock(x.cond) - end - iolock_end() - nothing -end - function wait_readnb(x::LibuvStream, nb::Int) iolock_begin() if !isopen(x) || bytesavailable(x.buffer) >= nb # fast path @@ -390,27 +361,22 @@ function wait_readnb(x::LibuvStream, nb::Int) end function wait_close(x::Union{LibuvStream, LibuvServer}) - iolock_begin() preserve_handle(x) lock(x.cond) try while isopen(x) - iolock_end() wait(x.cond) - unlock(x.cond) - iolock_begin() - lock(x.cond) end finally unlock(x.cond) unpreserve_handle(x) end - iolock_end() nothing end function close(stream::Union{LibuvStream, LibuvServer}) iolock_begin() + should_wait = false if stream.status == StatusInit ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle) stream.status = StatusClosing @@ -420,9 +386,9 @@ function close(stream::Union{LibuvStream, LibuvServer}) ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle) stream.status = StatusClosing end - should_wait && wait_close(stream) end iolock_end() + should_wait && wait_close(stream) nothing end @@ -824,7 +790,7 @@ function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int) return nread end - try + nread = try stop_reading(s) # Just playing it safe, since we are going to switch buffers. newbuf = PipeBuffer(a, maxsize = nb) newbuf.size = 0 # reset the write pointer to the beginning @@ -834,15 +800,15 @@ function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int) wait_readnb(s, Int(nb)) iolock_begin() compact(newbuf) - nread = bytesavailable(newbuf) - return nread + bytesavailable(newbuf) finally s.buffer = sbuf if !isempty(s.cond) start_reading(s) # resume reading iff there are currently other read clients of the stream end - iolock_end() end + iolock_end() + return nread end function read(stream::LibuvStream) @@ -914,11 +880,31 @@ function readavailable(this::LibuvStream) return bytes end -function readuntil(this::LibuvStream, c::UInt8; keep::Bool=false) +function readuntil(x::LibuvStream, c::UInt8; keep::Bool=false) iolock_begin() - wait_readbyte(this, c) - buf = this.buffer + buf = x.buffer @assert buf.seekable == false + if isopen(x) && !occursin(c, buf) # fast path + preserve_handle(x) + lock(x.cond) + try + while isopen(x) && !occursin(c, x.buffer) + x.readerror === nothing || throw(x.readerror) + start_reading(x) # ensure we are reading + iolock_end() + wait(x.cond) + unlock(x.cond) + iolock_begin() + lock(x.cond) + end + finally + if isempty(x.cond) + stop_reading(x) # stop reading iff there are currently no other read clients of the stream + end + unlock(x.cond) + unpreserve_handle(x) + end + end bytes = readuntil(buf, c, keep=keep) iolock_end() return bytes @@ -926,8 +912,8 @@ end uv_write(s::LibuvStream, p::Vector{UInt8}) = uv_write(s, pointer(p), UInt(sizeof(p))) +# caller must have acquired the iolock function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt) - iolock_begin() uvw = uv_write_async(s, p, n) ct = current_task() preserve_handle(ct) @@ -954,7 +940,7 @@ function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt) if status < 0 throw(_UVError("write", status)) end - return Int(status) + return Int(n) end # helper function for uv_write that returns the uv_write_t struct for the write @@ -989,23 +975,24 @@ end # - large isbits arrays are unbuffered and written directly function unsafe_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt) - if s.sendbuf === nothing - return uv_write(s, p, UInt(n)) - end - - buf = s.sendbuf - totb = bytesavailable(buf) + n - if totb < buf.maxsize - nb = unsafe_write(buf, p, n) - else - flush(s) - if n > buf.maxsize - nb = uv_write(s, p, n) - else + while true + # try to add to the send buffer + iolock_begin() + buf = s.sendbuf + buf === nothing && break + totb = bytesavailable(buf) + n + if totb < buf.maxsize nb = unsafe_write(buf, p, n) + iolock_end() + return nb end + bytesavailable(buf) == 0 && break + # perform flush(s) + arr = take!(buf) + uv_write(s, arr) end - return nb + # perform the output to the kernel + return uv_write(s, p, n) end function flush(s::LibuvStream) @@ -1013,27 +1000,34 @@ function flush(s::LibuvStream) buf = s.sendbuf if buf !== nothing if bytesavailable(buf) > 0 - arr = take!(buf) # Array of UInt8s + arr = take!(buf) uv_write(s, arr) - iolock_end() return end end uv_write(s, Ptr{UInt8}(Base.eventloop()), UInt(0)) # zero write from a random pointer to flush current queue - iolock_end() return end -buffer_writes(s::LibuvStream, bufsize) = (s.sendbuf = PipeBuffer(bufsize); s) +function buffer_writes(s::LibuvStream, bufsize) + sendbuf = PipeBuffer(bufsize) + iolock_begin() + s.sendbuf = sendbuf + iolock_end() + return s +end ## low-level calls to libuv ## function write(s::LibuvStream, b::UInt8) buf = s.sendbuf if buf !== nothing + iolock_begin() if bytesavailable(buf) + 1 < buf.maxsize + iolock_end() return write(buf, b) end + iolock_end() end return write(s, Ref{UInt8}(b)) end @@ -1245,14 +1239,16 @@ function wait_readnb(s::BufferStream, nb::Int) end end -show(io::IO, s::BufferStream) = print(io,"BufferStream() bytes waiting:",bytesavailable(s.buffer),", isopen:", s.is_open) +show(io::IO, s::BufferStream) = print(io, "BufferStream() bytes waiting:", bytesavailable(s.buffer), ", isopen:", s.is_open) -function wait_readbyte(s::BufferStream, c::UInt8) - lock(s.cond) do +function readuntil(s::BufferStream, c::UInt8; keep::Bool=false) + bytes = lock(s.cond) do while isopen(s) && !occursin(c, s.buffer) wait(s.cond) end + readuntil(s.buffer, c, keep=keep) end + return bytes end function wait_close(s::BufferStream) diff --git a/doc/src/devdocs/locks.md b/doc/src/devdocs/locks.md index 12dc8e4561c18..912fe663f325d 100644 --- a/doc/src/devdocs/locks.md +++ b/doc/src/devdocs/locks.md @@ -58,6 +58,17 @@ trying to acquire it: > > > > currently the lock is merged with the codegen lock, since they call each other recursively +The following lock synchronizes IO operation. Be aware that doing any I/O (for example, +printing warning messages or debug information) while holding any other lock listed above +may result in pernicious and hard-to-find deadlocks. BE VERY CAREFUL! + +> * iolock +> * Individual ThreadSynchronizers locks +> +> > this may continue to be held after releasing the iolock, or acquired without it, +> > but be very careful to never attempt to acquire the iolock while holding it + + The following is the root lock, meaning no other lock shall be held when trying to acquire it: > * toplevel @@ -95,37 +106,32 @@ Module serializer : toplevel lock JIT & type-inference : codegen lock -MethodInstance updates : codegen lock +MethodInstance/CodeInstance updates : Method->writelock, codegen lock -> * These fields are generally lazy initialized, using the test-and-test-and-set pattern. > * These are set at construction and immutable: -> > * specTypes > * sparam_vals > * def + > * These are set by `jl_type_infer` (while holding codegen lock): -> +> * cache > * rettype > * inferred -> * these can also be reset, see `jl_set_lambda_rettype` for that logic as it needs to keep `functionObjectsDecls` -> in sync + * valid ages + > * `inInference` flag: -> > * optimization to quickly avoid recurring into `jl_type_infer` while it is already running > * actual state (of setting `inferred`, then `fptr`) is protected by codegen lock -> * Function pointers (`jlcall_api` and `fptr`, `unspecialized_ducttape`): -> + +> * Function pointers: > * these transition once, from `NULL` to a value, while the codegen lock is held -> * Code-generator cache (the contents of `functionObjectsDecls`): > +> * Code-generator cache (the contents of `functionObjectsDecls`): > * these can transition multiple times, but only while the codegen lock is held > * it is valid to use old version of this, or block for new versions of this, so races are benign, > as long as the code is careful not to reference other data in the method instance (such as `rettype`) > and assume it is coordinated, unless also holding the codegen lock -> * `compile_traced` flag: > -> * unknown - LLVMContext : codegen lock Method : Method->writelock diff --git a/stdlib/FileWatching/src/FileWatching.jl b/stdlib/FileWatching/src/FileWatching.jl index 04aa05b78395c..9c241e3c7bf21 100644 --- a/stdlib/FileWatching/src/FileWatching.jl +++ b/stdlib/FileWatching/src/FileWatching.jl @@ -168,6 +168,7 @@ mutable struct _FDWatcher elseif FDWatchers[fdnum] !== nothing this = FDWatchers[fdnum]::_FDWatcher this.refcount = (this.refcount[1] + Int(readable), this.refcount[2] + Int(writable)) + iolock_end() return this end if ccall(:jl_uv_unix_fd_is_watched, Int32, (RawFD, Ptr{Cvoid}, Ptr{Cvoid}), fd, C_NULL, eventloop()) == 1 diff --git a/stdlib/Sockets/src/Sockets.jl b/stdlib/Sockets/src/Sockets.jl index 2fe9a54e46bd7..f883bbdd29c71 100644 --- a/stdlib/Sockets/src/Sockets.jl +++ b/stdlib/Sockets/src/Sockets.jl @@ -664,8 +664,8 @@ function accept(server::LibuvServer, client::LibuvStream) end preserve_handle(server) lock(server.cond) + iolock_end() try - iolock_end() wait(server.cond) finally unlock(server.cond) diff --git a/test/read.jl b/test/read.jl index 8e74e038553bf..5782481b76587 100644 --- a/test/read.jl +++ b/test/read.jl @@ -127,11 +127,11 @@ end open_streams = [] function cleanup() for s_ in open_streams - try close(s_); catch; end + close(s_) end empty!(open_streams) for tsk in tasks - Base.wait(tsk) + wait(tsk) end empty!(tasks) end