Skip to content

Commit

Permalink
more bugfixes and docs
Browse files Browse the repository at this point in the history
  • Loading branch information
vtjnash committed Jun 12, 2019
1 parent 075d9c8 commit 6a7d93a
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 86 deletions.
1 change: 0 additions & 1 deletion base/coreio.jl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ unsafe_write(::DevNull, ::Ptr{UInt8}, n::UInt)::Int = n
close(::DevNull) = nothing
flush(::DevNull) = nothing
wait_readnb(::DevNull) = wait()
wait_readbyte(::DevNull) = wait()
wait_close(::DevNull) = wait()
eof(::DevNull) = true

Expand Down
2 changes: 0 additions & 2 deletions base/io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ Close an I/O stream. Performs a [`flush`](@ref) first.
function close end
function flush end
function wait_readnb end
function wait_readbyte end
function wait_close end
function bytesavailable end

Expand Down Expand Up @@ -259,7 +258,6 @@ iswritable(io::AbstractPipe) = iswritable(pipe_writer(io))
isopen(io::AbstractPipe) = isopen(pipe_writer(io)) || isopen(pipe_reader(io))
close(io::AbstractPipe) = (close(pipe_writer(io)); close(pipe_reader(io)))
wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io), nb)
wait_readbyte(io::AbstractPipe, byte::UInt8) = wait_readbyte(pipe_reader(io), byte)
wait_close(io::AbstractPipe) = (wait_close(pipe_writer(io)); wait_close(pipe_reader(io)))

"""
Expand Down
4 changes: 3 additions & 1 deletion base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -999,14 +999,16 @@ function wait(x::Process)
if !process_exited(x)
preserve_handle(x)
lock(x.exitnotify)
iolock_end()
try
wait(x.exitnotify)
finally
unlock(x.exitnotify)
unpreserve_handle(x)
end
else
iolock_end()
end
iolock_end()
nothing
end

Expand Down
128 changes: 62 additions & 66 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -326,35 +326,6 @@ function check_open(x::Union{LibuvStream, LibuvServer})
end
end

function wait_readbyte(x::LibuvStream, c::UInt8)
iolock_begin()
if !isopen(x) || occursin(c, x.buffer) # fast path
iolock_end()
return
end
preserve_handle(x)
lock(x.cond)
try
while isopen(x) && !occursin(c, x.buffer)
x.readerror === nothing || throw(x.readerror)
start_reading(x) # ensure we are reading
iolock_end()
wait(x.cond)
unlock(x.cond)
iolock_begin()
lock(x.cond)
end
finally
if isempty(x.cond)
stop_reading(x) # stop reading iff there are currently no other read clients of the stream
end
unpreserve_handle(x)
unlock(x.cond)
end
iolock_end()
nothing
end

function wait_readnb(x::LibuvStream, nb::Int)
iolock_begin()
if !isopen(x) || bytesavailable(x.buffer) >= nb # fast path
Expand Down Expand Up @@ -390,27 +361,22 @@ function wait_readnb(x::LibuvStream, nb::Int)
end

function wait_close(x::Union{LibuvStream, LibuvServer})
iolock_begin()
preserve_handle(x)
lock(x.cond)
try
while isopen(x)
iolock_end()
wait(x.cond)
unlock(x.cond)
iolock_begin()
lock(x.cond)
end
finally
unlock(x.cond)
unpreserve_handle(x)
end
iolock_end()
nothing
end

function close(stream::Union{LibuvStream, LibuvServer})
iolock_begin()
should_wait = false
if stream.status == StatusInit
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
Expand All @@ -420,9 +386,9 @@ function close(stream::Union{LibuvStream, LibuvServer})
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
should_wait && wait_close(stream)
end
iolock_end()
should_wait && wait_close(stream)
nothing
end

Expand Down Expand Up @@ -824,7 +790,7 @@ function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int)
return nread
end

try
nread = try
stop_reading(s) # Just playing it safe, since we are going to switch buffers.
newbuf = PipeBuffer(a, maxsize = nb)
newbuf.size = 0 # reset the write pointer to the beginning
Expand All @@ -834,15 +800,15 @@ function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int)
wait_readnb(s, Int(nb))
iolock_begin()
compact(newbuf)
nread = bytesavailable(newbuf)
return nread
bytesavailable(newbuf)
finally
s.buffer = sbuf
if !isempty(s.cond)
start_reading(s) # resume reading iff there are currently other read clients of the stream
end
iolock_end()
end
iolock_end()
return nread
end

function read(stream::LibuvStream)
Expand Down Expand Up @@ -914,20 +880,40 @@ function readavailable(this::LibuvStream)
return bytes
end

function readuntil(this::LibuvStream, c::UInt8; keep::Bool=false)
function readuntil(x::LibuvStream, c::UInt8; keep::Bool=false)
iolock_begin()
wait_readbyte(this, c)
buf = this.buffer
buf = x.buffer
@assert buf.seekable == false
if isopen(x) && !occursin(c, buf) # fast path
preserve_handle(x)
lock(x.cond)
try
while isopen(x) && !occursin(c, x.buffer)
x.readerror === nothing || throw(x.readerror)
start_reading(x) # ensure we are reading
iolock_end()
wait(x.cond)
unlock(x.cond)
iolock_begin()
lock(x.cond)
end
finally
if isempty(x.cond)
stop_reading(x) # stop reading iff there are currently no other read clients of the stream
end
unlock(x.cond)
unpreserve_handle(x)
end
end
bytes = readuntil(buf, c, keep=keep)
iolock_end()
return bytes
end

uv_write(s::LibuvStream, p::Vector{UInt8}) = uv_write(s, pointer(p), UInt(sizeof(p)))

# caller must have acquired the iolock
function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
iolock_begin()
uvw = uv_write_async(s, p, n)
ct = current_task()
preserve_handle(ct)
Expand All @@ -954,7 +940,7 @@ function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
if status < 0
throw(_UVError("write", status))
end
return Int(status)
return Int(n)
end

# helper function for uv_write that returns the uv_write_t struct for the write
Expand Down Expand Up @@ -989,51 +975,59 @@ end
# - large isbits arrays are unbuffered and written directly

function unsafe_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
if s.sendbuf === nothing
return uv_write(s, p, UInt(n))
end

buf = s.sendbuf
totb = bytesavailable(buf) + n
if totb < buf.maxsize
nb = unsafe_write(buf, p, n)
else
flush(s)
if n > buf.maxsize
nb = uv_write(s, p, n)
else
while true
# try to add to the send buffer
iolock_begin()
buf = s.sendbuf
buf === nothing && break
totb = bytesavailable(buf) + n
if totb < buf.maxsize
nb = unsafe_write(buf, p, n)
iolock_end()
return nb
end
bytesavailable(buf) == 0 && break
# perform flush(s)
arr = take!(buf)
uv_write(s, arr)
end
return nb
# perform the output to the kernel
return uv_write(s, p, n)
end

function flush(s::LibuvStream)
iolock_begin()
buf = s.sendbuf
if buf !== nothing
if bytesavailable(buf) > 0
arr = take!(buf) # Array of UInt8s
arr = take!(buf)
uv_write(s, arr)
iolock_end()
return
end
end
uv_write(s, Ptr{UInt8}(Base.eventloop()), UInt(0)) # zero write from a random pointer to flush current queue
iolock_end()
return
end

buffer_writes(s::LibuvStream, bufsize) = (s.sendbuf = PipeBuffer(bufsize); s)
function buffer_writes(s::LibuvStream, bufsize)
sendbuf = PipeBuffer(bufsize)
iolock_begin()
s.sendbuf = sendbuf
iolock_end()
return s
end

## low-level calls to libuv ##

function write(s::LibuvStream, b::UInt8)
buf = s.sendbuf
if buf !== nothing
iolock_begin()
if bytesavailable(buf) + 1 < buf.maxsize
iolock_end()
return write(buf, b)
end
iolock_end()
end
return write(s, Ref{UInt8}(b))
end
Expand Down Expand Up @@ -1245,14 +1239,16 @@ function wait_readnb(s::BufferStream, nb::Int)
end
end

show(io::IO, s::BufferStream) = print(io,"BufferStream() bytes waiting:",bytesavailable(s.buffer),", isopen:", s.is_open)
show(io::IO, s::BufferStream) = print(io, "BufferStream() bytes waiting:", bytesavailable(s.buffer), ", isopen:", s.is_open)

function wait_readbyte(s::BufferStream, c::UInt8)
lock(s.cond) do
function readuntil(s::BufferStream, c::UInt8; keep::Bool=false)
bytes = lock(s.cond) do
while isopen(s) && !occursin(c, s.buffer)
wait(s.cond)
end
readuntil(s.buffer, c, keep=keep)
end
return bytes
end

function wait_close(s::BufferStream)
Expand Down
32 changes: 19 additions & 13 deletions doc/src/devdocs/locks.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ trying to acquire it:
> >
> > currently the lock is merged with the codegen lock, since they call each other recursively
The following lock synchronizes IO operation. Be aware that doing any I/O (for example,
printing warning messages or debug information) while holding any other lock listed above
may result in pernicious and hard-to-find deadlocks. BE VERY CAREFUL!

> * iolock
> * Individual ThreadSynchronizers locks
>
> > this may continue to be held after releasing the iolock, or acquired without it,
> > but be very careful to never attempt to acquire the iolock while holding it

The following is the root lock, meaning no other lock shall be held when trying to acquire it:

> * toplevel
Expand Down Expand Up @@ -95,37 +106,32 @@ Module serializer : toplevel lock

JIT & type-inference : codegen lock

MethodInstance updates : codegen lock
MethodInstance/CodeInstance updates : Method->writelock, codegen lock

> * These fields are generally lazy initialized, using the test-and-test-and-set pattern.
> * These are set at construction and immutable:
>
> * specTypes
> * sparam_vals
> * def
> * These are set by `jl_type_infer` (while holding codegen lock):
>
> * cache
> * rettype
> * inferred
> * these can also be reset, see `jl_set_lambda_rettype` for that logic as it needs to keep `functionObjectsDecls`
> in sync
* valid ages

> * `inInference` flag:
>
> * optimization to quickly avoid recurring into `jl_type_infer` while it is already running
> * actual state (of setting `inferred`, then `fptr`) is protected by codegen lock
> * Function pointers (`jlcall_api` and `fptr`, `unspecialized_ducttape`):
>
> * Function pointers:
> * these transition once, from `NULL` to a value, while the codegen lock is held
> * Code-generator cache (the contents of `functionObjectsDecls`):
>
> * Code-generator cache (the contents of `functionObjectsDecls`):
> * these can transition multiple times, but only while the codegen lock is held
> * it is valid to use old version of this, or block for new versions of this, so races are benign,
> as long as the code is careful not to reference other data in the method instance (such as `rettype`)
> and assume it is coordinated, unless also holding the codegen lock
> * `compile_traced` flag:
>
> * unknown
LLVMContext : codegen lock

Method : Method->writelock
Expand Down
1 change: 1 addition & 0 deletions stdlib/FileWatching/src/FileWatching.jl
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ mutable struct _FDWatcher
elseif FDWatchers[fdnum] !== nothing
this = FDWatchers[fdnum]::_FDWatcher
this.refcount = (this.refcount[1] + Int(readable), this.refcount[2] + Int(writable))
iolock_end()
return this
end
if ccall(:jl_uv_unix_fd_is_watched, Int32, (RawFD, Ptr{Cvoid}, Ptr{Cvoid}), fd, C_NULL, eventloop()) == 1
Expand Down
2 changes: 1 addition & 1 deletion stdlib/Sockets/src/Sockets.jl
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,8 @@ function accept(server::LibuvServer, client::LibuvStream)
end
preserve_handle(server)
lock(server.cond)
iolock_end()
try
iolock_end()
wait(server.cond)
finally
unlock(server.cond)
Expand Down
4 changes: 2 additions & 2 deletions test/read.jl
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,11 @@ end
open_streams = []
function cleanup()
for s_ in open_streams
try close(s_); catch; end
close(s_)
end
empty!(open_streams)
for tsk in tasks
Base.wait(tsk)
wait(tsk)
end
empty!(tasks)
end
Expand Down

0 comments on commit 6a7d93a

Please sign in to comment.