Skip to content

Commit

Permalink
io: corrected error handling for streams
Browse files Browse the repository at this point in the history
After #32309, we became much more eager to forget about normal errors
and more eager to throw an EOFError at the wrong place.
This is intended to fix that oversight.
  • Loading branch information
vtjnash committed Jun 27, 2019
1 parent 67cdc55 commit 94f3ea6
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 78 deletions.
155 changes: 81 additions & 74 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ abstract type LibuvStream <: IO end
bytesavailable(s::LibuvStream) = bytesavailable(s.buffer)

function eof(s::LibuvStream)
if isopen(s) # fast path
bytesavailable(s) <= 0 || return false
else
return bytesavailable(s) <= 0
end
bytesavailable(s) > 0 && return false
wait_readnb(s, 1)
return !isopen(s) && bytesavailable(s) <= 0
# This function is race-y if used from multiple threads, but we guarantee
# it to never return false until the stream is definitively exhausted
# and that we won't return true if there's a readerror pending (it'll instead get thrown).
# This requires some careful ordering here (TODO: atomic loads)
bytesavailable(s) > 0 && return false
open = isopen(s) # must preceed readerror check
s.readerror === nothing || throw(s.readerror)
return !open
end

# Limit our default maximum read and buffer size,
Expand Down Expand Up @@ -327,17 +330,25 @@ function check_open(x::Union{LibuvStream, LibuvServer})
end

function wait_readnb(x::LibuvStream, nb::Int)
# fast path before iolock acquire
bytesavailable(x.buffer) >= nb && return
open = isopen(x) # must preceed readerror check
x.readerror === nothing || throw(x.readerror)
open || return
iolock_begin()
if !isopen(x) || bytesavailable(x.buffer) >= nb # fast path
iolock_end()
return
end
# repeat fast path after iolock acquire, before other expensive work
bytesavailable(x.buffer) >= nb && (iolock_end(); return)
open = isopen(x)
x.readerror === nothing || throw(x.readerror)
open || (iolock_end(); return)
# now do the "real" work
oldthrottle = x.throttle
preserve_handle(x)
lock(x.cond)
try
while isopen(x) && bytesavailable(x.buffer) < nb
while bytesavailable(x.buffer) < nb
x.readerror === nothing || throw(x.readerror)
isopen(x) || break
x.throttle = max(nb, x.throttle)
start_reading(x) # ensure we are reading
iolock_end()
Expand All @@ -351,6 +362,8 @@ function wait_readnb(x::LibuvStream, nb::Int)
stop_reading(x) # stop reading iff there are currently no other read clients of the stream
end
if oldthrottle <= x.throttle <= nb
# if we're interleaving readers, we might not get back to the "original" throttle
# but we consider that an acceptable "risk", since we can't be quite sure what the intended value is now
x.throttle = oldthrottle
end
unpreserve_handle(x)
Expand Down Expand Up @@ -543,7 +556,6 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
# remind the client that stream.buffer is full
notify(stream.cond)
elseif nread == UV_EOF
stream.readerror = EOFError()
if isa(stream, TTY)
stream.status = StatusEOF # libuv called uv_stop_reading already
notify(stream.cond)
Expand Down Expand Up @@ -589,7 +601,6 @@ function reseteof(x::TTY)
iolock_begin()
if x.status == StatusEOF
x.status = StatusOpen
x.readerror isa EOFError && (x.readerror = nothing)
end
iolock_end()
nothing
Expand Down Expand Up @@ -772,40 +783,33 @@ function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int)
@assert sbuf.seekable == false
@assert sbuf.maxsize >= nb

local nread
if bytesavailable(sbuf) >= nb
nread = readbytes!(sbuf, a, nb)
iolock_end()
return nread
end

if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
while isopen(s) && bytesavailable(sbuf) < nb
function wait_locked(s, buf, nb)
while bytesavailable(buf) < nb
s.readerror === nothing || throw(s.readerror)
isopen(s) || break
iolock_end()
wait_readnb(s, nb)
iolock_begin()
end
nread = readbytes!(sbuf, a, nb)
iolock_end()
return nread
end

nread = try
stop_reading(s) # Just playing it safe, since we are going to switch buffers.
newbuf = PipeBuffer(a, maxsize = nb)
if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
wait_locked(s, sbuf, nb)
end
if bytesavailable(sbuf) >= nb
nread = readbytes!(sbuf, a, nb)
else
newbuf = PipeBuffer(a, maxsize=nb)
newbuf.size = 0 # reset the write pointer to the beginning
s.buffer = newbuf
write(newbuf, sbuf)
iolock_end()
wait_readnb(s, Int(nb))
iolock_begin()
compact(newbuf)
bytesavailable(newbuf)
finally
s.buffer = sbuf
if !isempty(s.cond)
start_reading(s) # resume reading iff there are currently other read clients of the stream
nread = try
s.buffer = newbuf
write(newbuf, sbuf)
wait_locked(s, newbuf, nb)
bytesavailable(newbuf)
finally
s.buffer = sbuf
end
compact(newbuf)
end
iolock_end()
return nread
Expand All @@ -825,31 +829,30 @@ function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt)
@assert sbuf.seekable == false
@assert sbuf.maxsize >= nb

if bytesavailable(sbuf) >= nb
unsafe_read(sbuf, p, nb)
elseif nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
while isopen(s) && bytesavailable(sbuf) < nb
function wait_locked(s, buf, nb)
while bytesavailable(buf) < nb
s.readerror === nothing || throw(s.readerror)
isopen(s) || throw(EOFError())
iolock_end()
wait_readnb(s, Int(nb))
wait_readnb(s, nb)
iolock_begin()
end
end

if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
wait_locked(s, sbuf, Int(nb))
end
if bytesavailable(sbuf) >= nb
unsafe_read(sbuf, p, nb)
else
newbuf = PipeBuffer(unsafe_wrap(Array, p, nb), maxsize=Int(nb))
newbuf.size = 0 # reset the write pointer to the beginning
try
stop_reading(s) # Just playing it safe, since we are going to switch buffers.
newbuf = PipeBuffer(unsafe_wrap(Array, p, nb), maxsize = Int(nb))
newbuf.size = 0 # reset the write pointer to the beginning
s.buffer = newbuf
write(newbuf, sbuf)
iolock_end()
wait_readnb(s, Int(nb))
iolock_begin()
nb == bytesavailable(newbuf) || throw(EOFError())
wait_locked(s, newbuf, Int(nb))
finally
s.buffer = sbuf
if !isempty(s.cond)
start_reading(s) # resume reading iff there are currently other read clients of the stream
end
end
end
iolock_end()
Expand All @@ -860,9 +863,9 @@ function read(this::LibuvStream, ::Type{UInt8})
iolock_begin()
sbuf = this.buffer
@assert sbuf.seekable == false
while isopen(this) && bytesavailable(sbuf) < 1
while bytesavailable(sbuf) < 1
iolock_end()
wait_readnb(this, 1)
eof(this) && throw(EOFError())
iolock_begin()
end
c = read(sbuf, UInt8)
Expand All @@ -871,7 +874,7 @@ function read(this::LibuvStream, ::Type{UInt8})
end

function readavailable(this::LibuvStream)
wait_readnb(this, 1)
wait_readnb(this, 1) # unlike the other `read` family of functions, this one doesn't guarantee error reporting
iolock_begin()
buf = this.buffer
@assert buf.seekable == false
Expand All @@ -884,25 +887,29 @@ function readuntil(x::LibuvStream, c::UInt8; keep::Bool=false)
iolock_begin()
buf = x.buffer
@assert buf.seekable == false
if isopen(x) && !occursin(c, buf) # fast path
preserve_handle(x)
lock(x.cond)
try
while isopen(x) && !occursin(c, x.buffer)
x.readerror === nothing || throw(x.readerror)
start_reading(x) # ensure we are reading
iolock_end()
wait(x.cond)
if !occursin(c, buf) # fast path checks first
x.readerror === nothing || throw(x.readerror)
if isopen(x)
preserve_handle(x)
lock(x.cond)
try
while !occursin(c, x.buffer)
x.readerror === nothing || throw(x.readerror)
isopen(x) || break
start_reading(x) # ensure we are reading
iolock_end()
wait(x.cond)
unlock(x.cond)
iolock_begin()
lock(x.cond)
end
finally
if isempty(x.cond)
stop_reading(x) # stop reading iff there are currently no other read clients of the stream
end
unlock(x.cond)
iolock_begin()
lock(x.cond)
unpreserve_handle(x)
end
finally
if isempty(x.cond)
stop_reading(x) # stop reading iff there are currently no other read clients of the stream
end
unlock(x.cond)
unpreserve_handle(x)
end
end
bytes = readuntil(buf, c, keep=keep)
Expand Down
17 changes: 13 additions & 4 deletions stdlib/REPL/test/repl.jl
Original file line number Diff line number Diff line change
Expand Up @@ -750,11 +750,20 @@ let exename = Base.julia_cmd()
@test output == "1\r\nexit()\r\n1\r\n\r\njulia> "
end
@test bytesavailable(pty_master) == 0
@test try # possibly consume child-exited notification
eof(pty_master)
catch ex
(ex isa Base.IOError && ex.code == Base.UV_EIO) || rethrow()
@test if Sys.iswindows() || Sys.isbsd()
eof(pty_master)
else
# Some platforms (such as linux) report EIO instead of EOF
# possibly consume child-exited notification
# for example, see discussion in https://bugs.python.org/issue5380
try
eof(pty_master) && !Sys.islinux()
catch ex
(ex isa Base.IOError && ex.code == Base.UV_EIO) || rethrow()
@test_throws ex eof(pty_master) # make sure the error is sticky
pty_master.readerror = nothing
eof(pty_master)
end
end
@test read(pty_master, String) == ""
wait(p)
Expand Down

0 comments on commit 94f3ea6

Please sign in to comment.