From 6c4f2165619449182693061e07654b399428e0cf Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Thu, 29 Jul 2021 12:05:08 -0400 Subject: [PATCH] add stream shutdown and support half-duplex operation (#40783) Fixes one of the issues mentioned in #24526 --- NEWS.md | 1 + base/coreio.jl | 1 + base/exports.jl | 1 + base/io.jl | 124 +++++++++++-------- base/iobuffer.jl | 6 + base/libuv.jl | 1 + base/process.jl | 1 + base/stream.jl | 132 +++++++++++++++------ doc/src/base/io-network.md | 1 + stdlib/Distributed/src/process_messages.jl | 4 +- stdlib/Sockets/src/Sockets.jl | 15 +-- stdlib/Sockets/test/runtests.jl | 35 +++++- test/iobuffer.jl | 21 ++-- test/spawn.jl | 8 ++ 14 files changed, 240 insertions(+), 111 deletions(-) diff --git a/NEWS.md b/NEWS.md index 84941b377daea..d64511283107d 100644 --- a/NEWS.md +++ b/NEWS.md @@ -45,6 +45,7 @@ Standard library changes overflow in most cases. The new function `checked_length` is now available, which will try to use checked arithmetic to error if the result may be wrapping. Or use a package such as SaferIntegers.jl when constructing the range. ([#40382]) +* TCP socket objects now expose `shutdown` functionality and support half-open mode usage ([#40783]). #### InteractiveUtils * A new macro `@time_imports` for reporting any time spent importing packages and their dependencies ([#41612]) diff --git a/base/coreio.jl b/base/coreio.jl index 9ef717383dedd..d0f8df290b41b 100644 --- a/base/coreio.jl +++ b/base/coreio.jl @@ -13,6 +13,7 @@ write(::DevNull, ::UInt8) = 1 unsafe_write(::DevNull, ::Ptr{UInt8}, n::UInt)::Int = n close(::DevNull) = nothing wait_close(::DevNull) = wait() +bytesavailable(io::DevNull) = 0 let CoreIO = Union{Core.CoreSTDOUT, Core.CoreSTDERR} global write(io::CoreIO, x::UInt8) = Core.write(io, x) diff --git a/base/exports.jl b/base/exports.jl index 0a8e473055b4b..da0019ab46987 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -803,6 +803,7 @@ export # I/O and events close, + shutdown, countlines, eachline, readeach, diff --git a/base/io.jl b/base/io.jl index 30a87aa9e1cf3..2eface1ca9458 100644 --- a/base/io.jl +++ b/base/io.jl @@ -60,9 +60,49 @@ function isopen end Close an I/O stream. Performs a [`flush`](@ref) first. """ function close end + +""" + shutdown(stream) + +Shutdown the write half of a full-duplex I/O stream. Performs a [`flush`](@ref) +first. Notify the other end that no more data will be written to the underlying +file. This is not supported by all IO types. + +# Examples +```jldoctest +julia> io = Base.BufferStream(); # this never blocks, so we can read and write on the same Task + +julia> write(io, "request"); + +julia> # calling `read(io)` here would block forever + +julia> shutdown(io); + +julia> read(io, String) +"request" +""" +function shutdown end + +""" + flush(stream) + +Commit all currently buffered writes to the given stream. +""" function flush end -function wait_readnb end -function wait_close end + +""" + bytesavailable(io) + +Return the number of bytes available for reading before a read from this stream or buffer will block. + +# Examples +```jldoctest +julia> io = IOBuffer("JuliaLang is a GitHub organization"); + +julia> bytesavailable(io) +34 +``` +""" function bytesavailable end """ @@ -81,7 +121,7 @@ function readavailable end """ isreadable(io) -> Bool -Return `true` if the specified IO object is readable (if that can be determined). +Return `false` if the specified IO object is not readable. # Examples ```jldoctest @@ -99,12 +139,12 @@ true julia> rm("myfile.txt") ``` """ -function isreadable end +isreadable(io::IO) = isopen(io) """ iswritable(io) -> Bool -Return `true` if the specified IO object is writable (if that can be determined). +Return `false` if the specified IO object is not writable. # Examples ```jldoctest @@ -122,10 +162,23 @@ false julia> rm("myfile.txt") ``` """ -function iswritable end -function copy end +iswritable(io::IO) = isopen(io) + +""" + eof(stream) -> Bool + +Test whether an I/O stream is at end-of-file. If the stream is not yet exhausted, this +function will block to wait for more data if necessary, and then return `false`. Therefore +it is always safe to read one byte after seeing `eof` return `false`. `eof` will return +`false` as long as buffered data is still available, even if the remote end of a connection +is closed. +""" function eof end +function copy end +function wait_readnb end +function wait_close end + """ read(io::IO, T) @@ -357,65 +410,37 @@ end function pipe_reader end function pipe_writer end +for f in (:flush, :shutdown, :iswritable) + @eval $(f)(io::AbstractPipe) = $(f)(pipe_writer(io)::IO) +end write(io::AbstractPipe, byte::UInt8) = write(pipe_writer(io)::IO, byte) write(to::IO, from::AbstractPipe) = write(to, pipe_reader(from)) unsafe_write(io::AbstractPipe, p::Ptr{UInt8}, nb::UInt) = unsafe_write(pipe_writer(io)::IO, p, nb)::Union{Int,UInt} buffer_writes(io::AbstractPipe, args...) = buffer_writes(pipe_writer(io)::IO, args...) -flush(io::AbstractPipe) = flush(pipe_writer(io)::IO) +for f in ( + # peek/mark interface + :mark, :unmark, :reset, :ismarked, + # Simple reader functions + :read, :readavailable, :bytesavailable, :reseteof, :isreadable) + @eval $(f)(io::AbstractPipe) = $(f)(pipe_reader(io)::IO) +end read(io::AbstractPipe, byte::Type{UInt8}) = read(pipe_reader(io)::IO, byte)::UInt8 unsafe_read(io::AbstractPipe, p::Ptr{UInt8}, nb::UInt) = unsafe_read(pipe_reader(io)::IO, p, nb) -read(io::AbstractPipe) = read(pipe_reader(io)::IO) readuntil(io::AbstractPipe, arg::UInt8; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...) readuntil(io::AbstractPipe, arg::AbstractChar; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...) readuntil(io::AbstractPipe, arg::AbstractString; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...) readuntil(io::AbstractPipe, arg::AbstractVector; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...) readuntil_vector!(io::AbstractPipe, target::AbstractVector, keep::Bool, out) = readuntil_vector!(pipe_reader(io)::IO, target, keep, out) readbytes!(io::AbstractPipe, target::AbstractVector{UInt8}, n=length(target)) = readbytes!(pipe_reader(io)::IO, target, n) - -for f in ( - # peek/mark interface - :mark, :unmark, :reset, :ismarked, - # Simple reader functions - :readavailable, :isreadable) - @eval $(f)(io::AbstractPipe) = $(f)(pipe_reader(io)::IO) -end peek(io::AbstractPipe, ::Type{T}) where {T} = peek(pipe_reader(io)::IO, T)::T +wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io)::IO, nb) +eof(io::AbstractPipe) = eof(pipe_reader(io)::IO)::Bool -iswritable(io::AbstractPipe) = iswritable(pipe_writer(io)::IO) isopen(io::AbstractPipe) = isopen(pipe_writer(io)::IO) || isopen(pipe_reader(io)::IO) close(io::AbstractPipe) = (close(pipe_writer(io)::IO); close(pipe_reader(io)::IO)) -wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io)::IO, nb) wait_close(io::AbstractPipe) = (wait_close(pipe_writer(io)::IO); wait_close(pipe_reader(io)::IO)) -""" - bytesavailable(io) - -Return the number of bytes available for reading before a read from this stream or buffer will block. - -# Examples -```jldoctest -julia> io = IOBuffer("JuliaLang is a GitHub organization"); - -julia> bytesavailable(io) -34 -``` -""" -bytesavailable(io::AbstractPipe) = bytesavailable(pipe_reader(io)::IO) -bytesavailable(io::DevNull) = 0 - -""" - eof(stream) -> Bool - -Test whether an I/O stream is at end-of-file. If the stream is not yet exhausted, this -function will block to wait for more data if necessary, and then return `false`. Therefore -it is always safe to read one byte after seeing `eof` return `false`. `eof` will return -`false` as long as buffered data is still available, even if the remote end of a connection -is closed. -""" -eof(io::AbstractPipe) = eof(pipe_reader(io)::IO)::Bool -reseteof(io::AbstractPipe) = reseteof(pipe_reader(io)::IO) - # Exception-safe wrappers (io = open(); try f(io) finally close(io)) @@ -1119,11 +1144,6 @@ ismarked(io::IO) = io.mark >= 0 # Make sure all IO streams support flush, even if only as a no-op, # to make it easier to write generic I/O code. -""" - flush(stream) - -Commit all currently buffered writes to the given stream. -""" flush(io::IO) = nothing """ diff --git a/base/iobuffer.jl b/base/iobuffer.jl index e204eca906cbf..f65ed3f894fe0 100644 --- a/base/iobuffer.jl +++ b/base/iobuffer.jl @@ -334,6 +334,12 @@ end eof(io::GenericIOBuffer) = (io.ptr-1 == io.size) +function shutdown(io::GenericIOBuffer) + io.writable = false + # OR throw(_UVError("shutdown", UV_ENOTSOCK)) + nothing +end + @noinline function close(io::GenericIOBuffer{T}) where T io.readable = false io.writable = false diff --git a/base/libuv.jl b/base/libuv.jl index c63045f4b1b68..c64cbff564b66 100644 --- a/base/libuv.jl +++ b/base/libuv.jl @@ -107,6 +107,7 @@ end function uv_alloc_buf end function uv_readcb end function uv_writecb_task end +function uv_shutdowncb_task end function uv_return_spawn end function uv_asynccb end function uv_timercb end diff --git a/base/process.jl b/base/process.jl index b3ec79fa1ab4e..6be7099f94f35 100644 --- a/base/process.jl +++ b/base/process.jl @@ -275,6 +275,7 @@ function setup_stdio(stdio::Union{IOBuffer, BufferStream}, child_readable::Bool) @warn "Process error" exception=(ex, catch_backtrace()) finally close(parent) + child_readable || shutdown(stdio) end end catch ex diff --git a/base/stream.jl b/base/stream.jl index 6cbd1d3b86a28..6e433b771f0d2 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -109,7 +109,7 @@ function eof(s::LibuvStream) # and that we won't return true if there's a readerror pending (it'll instead get thrown). # This requires some careful ordering here (TODO: atomic loads) bytesavailable(s) > 0 && return false - open = isopen(s) # must precede readerror check + open = isreadable(s) # must precede readerror check s.readerror === nothing || throw(s.readerror) return !open end @@ -270,6 +270,7 @@ show(io::IO, stream::LibuvStream) = print(io, typeof(stream), "(", function isreadable(io::LibuvStream) bytesavailable(io) > 0 && return true isopen(io) || return false + io.status == StatusEOF && return false return ccall(:uv_is_readable, Cint, (Ptr{Cvoid},), io.handle) != 0 end @@ -378,7 +379,7 @@ function isopen(x::Union{LibuvStream, LibuvServer}) if x.status == StatusUninit || x.status == StatusInit throw(ArgumentError("$x is not initialized")) end - return x.status != StatusClosed && x.status != StatusEOF + return x.status != StatusClosed end function check_open(x::Union{LibuvStream, LibuvServer}) @@ -390,13 +391,13 @@ end function wait_readnb(x::LibuvStream, nb::Int) # fast path before iolock acquire bytesavailable(x.buffer) >= nb && return - open = isopen(x) # must precede readerror check + open = isopen(x) && x.status != StatusEOF # must precede readerror check x.readerror === nothing || throw(x.readerror) open || return iolock_begin() # repeat fast path after iolock acquire, before other expensive work bytesavailable(x.buffer) >= nb && (iolock_end(); return) - open = isopen(x) + open = isopen(x) && x.status != StatusEOF x.readerror === nothing || throw(x.readerror) open || (iolock_end(); return) # now do the "real" work @@ -407,6 +408,7 @@ function wait_readnb(x::LibuvStream, nb::Int) while bytesavailable(x.buffer) < nb x.readerror === nothing || throw(x.readerror) isopen(x) || break + x.status != StatusEOF || break x.throttle = max(nb, x.throttle) start_reading(x) # ensure we are reading iolock_end() @@ -431,6 +433,50 @@ function wait_readnb(x::LibuvStream, nb::Int) nothing end +function shutdown(s::LibuvStream) + iolock_begin() + check_open(s) + req = Libc.malloc(_sizeof_uv_shutdown) + uv_req_set_data(req, C_NULL) # in case we get interrupted before arriving at the wait call + err = ccall(:uv_shutdown, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), + req, s, @cfunction(uv_shutdowncb_task, Cvoid, (Ptr{Cvoid}, Cint))) + if err < 0 + Libc.free(req) + uv_error("shutdown", err) + end + ct = current_task() + preserve_handle(ct) + sigatomic_begin() + uv_req_set_data(req, ct) + iolock_end() + status = try + sigatomic_end() + wait()::Cint + finally + # try-finally unwinds the sigatomic level, so need to repeat sigatomic_end + sigatomic_end() + iolock_begin() + ct.queue === nothing || list_deletefirst!(ct.queue, ct) + if uv_req_data(req) != C_NULL + # req is still alive, + # so make sure we won't get spurious notifications later + uv_req_set_data(req, C_NULL) + else + # done with req + Libc.free(req) + end + iolock_end() + unpreserve_handle(ct) + if isopen(s) && (s.status == StatusEOF && !isa(s, TTY)) || ccall(:uv_is_readable, Cint, (Ptr{Cvoid},), s.handle) == 0 + close(s) + end + end + if status < 0 + throw(_UVError("shutdown", status)) + end + nothing +end + function wait_close(x::Union{LibuvStream, LibuvServer}) preserve_handle(x) lock(x.cond) @@ -451,7 +497,7 @@ function close(stream::Union{LibuvStream, LibuvServer}) if stream.status == StatusInit ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle) stream.status = StatusClosing - elseif isopen(stream) || stream.status == StatusEOF + elseif isopen(stream) should_wait = uv_handle_data(stream) != C_NULL if stream.status != StatusClosing ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle) @@ -606,35 +652,33 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}) nrequested = ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf) function readcb_specialized(stream::LibuvStream, nread::Int, nrequested::UInt) lock(stream.cond) - try - if nread < 0 - if nread == UV_ENOBUFS && nrequested == 0 - # remind the client that stream.buffer is full - notify(stream.cond) - elseif nread == UV_EOF - if isa(stream, TTY) - stream.status = StatusEOF # libuv called uv_stop_reading already + if nread < 0 + if nread == UV_ENOBUFS && nrequested == 0 + # remind the client that stream.buffer is full + notify(stream.cond) + elseif nread == UV_EOF # libuv called uv_stop_reading already + if stream.status != StatusClosing + if stream isa TTY || ccall(:uv_is_writable, Cint, (Ptr{Cvoid},), stream.handle) != 0 + # stream can still be used either by reseteof or write + stream.status = StatusEOF notify(stream.cond) - elseif stream.status != StatusClosing - # begin shutdown of the stream + else + # underlying stream is no longer useful: begin finalization ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle) stream.status = StatusClosing end - else - stream.readerror = _UVError("read", nread) - # This is a fatal connection error. Shutdown requests as per the usual - # close function won't work and libuv will fail with an assertion failure - ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream) - stream.status = StatusClosing - notify(stream.cond) end else - notify_filled(stream.buffer, nread) - notify(stream.cond) + stream.readerror = _UVError("read", nread) + # This is a fatal connection error + ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle) + stream.status = StatusClosing end - finally - unlock(stream.cond) + else + notify_filled(stream.buffer, nread) + notify(stream.cond) end + unlock(stream.cond) # Stop background reading when # 1) there's nobody paying attention to the data we are reading @@ -651,6 +695,7 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}) nothing end readcb_specialized(stream_unknown_type, Int(nread), UInt(nrequested)) + nothing end function reseteof(x::TTY) @@ -844,6 +889,7 @@ function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int) while bytesavailable(buf) < nb s.readerror === nothing || throw(s.readerror) isopen(s) || break + s.status != StatusEOF || break iolock_end() wait_readnb(s, nb) iolock_begin() @@ -890,6 +936,7 @@ function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt) while bytesavailable(buf) < nb s.readerror === nothing || throw(s.readerror) isopen(s) || throw(EOFError()) + s.status != StatusEOF || throw(EOFError()) iolock_end() wait_readnb(s, nb) iolock_begin() @@ -946,13 +993,14 @@ function readuntil(x::LibuvStream, c::UInt8; keep::Bool=false) @assert buf.seekable == false if !occursin(c, buf) # fast path checks first x.readerror === nothing || throw(x.readerror) - if isopen(x) + if isopen(x) && x.status != StatusEOF preserve_handle(x) lock(x.cond) try while !occursin(c, x.buffer) x.readerror === nothing || throw(x.readerror) isopen(x) || break + x.status != StatusEOF || break start_reading(x) # ensure we are reading iolock_end() wait(x.cond) @@ -1115,6 +1163,20 @@ function uv_writecb_task(req::Ptr{Cvoid}, status::Cint) nothing end +function uv_shutdowncb_task(req::Ptr{Cvoid}, status::Cint) + d = uv_req_data(req) + if d != C_NULL + uv_req_set_data(req, C_NULL) # let the Task know we got the shutdowncb + t = unsafe_pointer_to_objref(d)::Task + schedule(t, status) + else + # no owner for this req, safe to just free it + Libc.free(req) + end + nothing +end + + _fd(x::IOStream) = RawFD(fd(x)) _fd(x::Union{OS_HANDLE, RawFD}) = x @@ -1405,18 +1467,20 @@ mutable struct BufferStream <: LibuvStream buffer::IOBuffer cond::Threads.Condition readerror::Any - is_open::Bool buffer_writes::Bool lock::ReentrantLock # advisory lock + status::Int - BufferStream() = new(PipeBuffer(), Threads.Condition(), nothing, true, false, ReentrantLock()) + BufferStream() = new(PipeBuffer(), Threads.Condition(), nothing, false, ReentrantLock(), StatusActive) end -isopen(s::BufferStream) = s.is_open +isopen(s::BufferStream) = s.status != StatusClosed + +shutdown(s::BufferStream) = close(s) function close(s::BufferStream) lock(s.cond) do - s.is_open = false + s.status = StatusClosed notify(s.cond) nothing end @@ -1439,8 +1503,8 @@ function unsafe_read(s::BufferStream, a::Ptr{UInt8}, nb::UInt) end bytesavailable(s::BufferStream) = bytesavailable(s.buffer) -isreadable(s::BufferStream) = s.buffer.readable -iswritable(s::BufferStream) = s.buffer.writable +isreadable(s::BufferStream) = (isopen(s) || bytesavailable(s) > 0) && s.buffer.readable +iswritable(s::BufferStream) = isopen(s) && s.buffer.writable function wait_readnb(s::BufferStream, nb::Int) lock(s.cond) do @@ -1450,7 +1514,7 @@ function wait_readnb(s::BufferStream, nb::Int) end end -show(io::IO, s::BufferStream) = print(io, "BufferStream() bytes waiting:", bytesavailable(s.buffer), ", isopen:", s.is_open) +show(io::IO, s::BufferStream) = print(io, "BufferStream(bytes waiting=", bytesavailable(s.buffer), ", isopen=", isopen(s), ")") function readuntil(s::BufferStream, c::UInt8; keep::Bool=false) bytes = lock(s.cond) do diff --git a/doc/src/base/io-network.md b/doc/src/base/io-network.md index 2d6a462400813..8b2ea3c4c2412 100644 --- a/doc/src/base/io-network.md +++ b/doc/src/base/io-network.md @@ -13,6 +13,7 @@ Base.take!(::Base.GenericIOBuffer) Base.fdio Base.flush Base.close +Base.shutdown Base.write Base.read Base.read! diff --git a/stdlib/Distributed/src/process_messages.jl b/stdlib/Distributed/src/process_messages.jl index 8d5dac5af571e..732b972858dc9 100644 --- a/stdlib/Distributed/src/process_messages.jl +++ b/stdlib/Distributed/src/process_messages.jl @@ -230,8 +230,8 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) deregister_worker(wpid) end - isopen(r_stream) && close(r_stream) - isopen(w_stream) && close(w_stream) + close(r_stream) + close(w_stream) if (myid() == 1) && (wpid > 1) if oldstate != W_TERMINATING diff --git a/stdlib/Sockets/src/Sockets.jl b/stdlib/Sockets/src/Sockets.jl index 6952aa9bd8a0f..fb46b9255e6f0 100644 --- a/stdlib/Sockets/src/Sockets.jl +++ b/stdlib/Sockets/src/Sockets.jl @@ -139,9 +139,6 @@ function TCPServer(; delay=true) return tcp end -isreadable(io::TCPSocket) = isopen(io) || bytesavailable(io) > 0 -iswritable(io::TCPSocket) = isopen(io) && io.status != StatusClosing - """ accept(server[, client]) @@ -578,11 +575,11 @@ Enables or disables Nagle's algorithm on a given TCP server or socket. """ function nagle(sock::Union{TCPServer, TCPSocket}, enable::Bool) # disable or enable Nagle's algorithm on all OSes - Sockets.iolock_begin() - Sockets.check_open(sock) + iolock_begin() + check_open(sock) err = ccall(:uv_tcp_nodelay, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(!enable)) # TODO: check err - Sockets.iolock_end() + iolock_end() return err end @@ -592,15 +589,15 @@ end On Linux systems, the TCP_QUICKACK is disabled or enabled on `socket`. """ function quickack(sock::Union{TCPServer, TCPSocket}, enable::Bool) - Sockets.iolock_begin() - Sockets.check_open(sock) + iolock_begin() + check_open(sock) @static if Sys.islinux() # tcp_quickack is a linux only option if ccall(:jl_tcp_quickack, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(enable)) < 0 @warn "Networking unoptimized ( Error enabling TCP_QUICKACK : $(Libc.strerror(Libc.errno())) )" maxlog=1 end end - Sockets.iolock_end() + iolock_end() nothing end diff --git a/stdlib/Sockets/test/runtests.jl b/stdlib/Sockets/test/runtests.jl index b00eeeee2d068..552ca3eac528e 100644 --- a/stdlib/Sockets/test/runtests.jl +++ b/stdlib/Sockets/test/runtests.jl @@ -526,17 +526,42 @@ end r = @async close(s) @test_throws Base._UVError("connect", Base.UV_ECANCELED) Sockets.wait_connected(s) fetch(r) + close(srv) end end @testset "iswritable" begin let addr = Sockets.InetAddr(ip"127.0.0.1", 4445) srv = listen(addr) - s = Sockets.TCPSocket() - Sockets.connect!(s, addr) - @test iswritable(s) - close(s) - @test !iswritable(s) + let s = Sockets.TCPSocket() + Sockets.connect!(s, addr) + @test iswritable(s) broken=Sys.iswindows() + close(s) + @test !iswritable(s) + end + let s = Sockets.connect(addr) + @test iswritable(s) + shutdown(s) + @test !iswritable(s) + close(s) + end + close(srv) + srv = listen(addr) + let s = Sockets.connect(addr) + let c = accept(srv) + Base.errormonitor(@async try; write(c, c); finally; close(c); end) + end + @test iswritable(s) + write(s, "hello world\n") + shutdown(s) + @test !iswritable(s) + @test isreadable(s) + @test read(s, String) == "hello world\n" + @test !isreadable(s) + @test !isopen(s) + close(s) + end + close(srv) end end diff --git a/test/iobuffer.jl b/test/iobuffer.jl index 80972a7c65448..5514b6dc03f7f 100644 --- a/test/iobuffer.jl +++ b/test/iobuffer.jl @@ -9,7 +9,7 @@ bufcontents(io::Base.GenericIOBuffer) = unsafe_string(pointer(io.data), io.size) @testset "Read/write empty IOBuffer" begin io = IOBuffer() @test eof(io) - @test_throws EOFError read(io,UInt8) + @test_throws EOFError read(io, UInt8) @test write(io,"abc") === 3 @test isreadable(io) @test iswritable(io) @@ -18,7 +18,7 @@ bufcontents(io::Base.GenericIOBuffer) = unsafe_string(pointer(io.data), io.size) @test position(io) == 3 @test eof(io) seek(io, 0) - @test read(io,UInt8) == convert(UInt8, 'a') + @test read(io, UInt8) == convert(UInt8, 'a') a = Vector{UInt8}(undef, 2) @test read!(io, a) == a @test a == UInt8['b','c'] @@ -34,22 +34,24 @@ bufcontents(io::Base.GenericIOBuffer) = unsafe_string(pointer(io.data), io.size) truncate(io, 10) @test position(io) == 0 @test all(io.data .== 0) - @test write(io,Int16[1,2,3,4,5,6]) === 12 + @test write(io, Int16[1, 2, 3, 4, 5, 6]) === 12 seek(io, 2) truncate(io, 10) @test ioslength(io) == 10 io.readable = false - @test_throws ArgumentError read!(io,UInt8[0]) + @test_throws ArgumentError read!(io, UInt8[0]) truncate(io, 0) @test write(io,"boston\ncambridge\n") > 0 @test String(take!(io)) == "boston\ncambridge\n" @test String(take!(io)) == "" @test write(io, ComplexF64(0)) === 16 @test write(io, Rational{Int64}(1//2)) === 16 - close(io) - @test_throws ArgumentError write(io,UInt8[0]) - @test_throws ArgumentError seek(io,0) + @test shutdown(io) === nothing + @test_throws ArgumentError write(io, UInt8[0]) @test eof(io) + @test close(io) === nothing + @test_throws ArgumentError write(io, UInt8[0]) + @test_throws ArgumentError seek(io, 0) end @testset "Read/write readonly IOBuffer" begin @@ -237,7 +239,7 @@ end @test isreadable(bstream) @test iswritable(bstream) @test bytesavailable(bstream) == 0 - @test sprint(show, bstream) == "BufferStream() bytes waiting:$(bytesavailable(bstream.buffer)), isopen:true" + @test sprint(show, bstream) == "BufferStream(bytes waiting=$(bytesavailable(bstream.buffer)), isopen=true)" a = rand(UInt8,10) write(bstream,a) @test !eof(bstream) @@ -251,9 +253,10 @@ end @test !eof(bstream) read!(bstream,c) @test c == a[3:10] - @test close(bstream) === nothing + @test shutdown(bstream) === nothing @test eof(bstream) @test bytesavailable(bstream) == 0 + @test close(bstream) === nothing flag = Ref{Bool}(false) event = Base.Event() bstream = Base.BufferStream() diff --git a/test/spawn.jl b/test/spawn.jl index 95915a5ad804b..9ec7c6842bedb 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -765,7 +765,15 @@ let text = "input-test-text" @test read(proc, String) == string(length(text), '\n') @test success(proc) @test String(take!(b)) == text + + out = Base.BufferStream() + proc = run(catcmd, IOBuffer(text), out, wait=false) + @test proc.out === out + @test read(out, String) == text + @test success(proc) end + + @test repr(Base.CmdRedirect(``, devnull, 0, false)) == "pipeline(``, stdin>Base.DevNull())" @test repr(Base.CmdRedirect(``, devnull, 1, true)) == "pipeline(``, stdout