diff --git a/base/LineEdit.jl b/base/LineEdit.jl index e341fe3e509b2..61b532a338565 100644 --- a/base/LineEdit.jl +++ b/base/LineEdit.jl @@ -1569,7 +1569,6 @@ function prompt!(term, prompt, s = init_state(term, prompt)) raw!(term, true) enable_bracketed_paste(term) try - Base.start_reading(term) activate(prompt, s, term) while true map = keymap(s, prompt) @@ -1585,14 +1584,11 @@ function prompt!(term, prompt, s = init_state(term, prompt)) state = :done end if state == :abort - Base.stop_reading(term) return buffer(s), false, false elseif state == :done - Base.stop_reading(term) return buffer(s), true, false elseif state == :suspend @unix_only begin - Base.stop_reading(term) return buffer(s), true, true end else diff --git a/base/REPL.jl b/base/REPL.jl index 21f9ac124bb53..4738e36d33979 100644 --- a/base/REPL.jl +++ b/base/REPL.jl @@ -801,11 +801,9 @@ function setup_interface(repl::LineEditREPL; hascolor = repl.hascolor, extra_rep LineEdit.commit_line(s) # This is slightly ugly but ok for now terminal = LineEdit.terminal(s) - Base.stop_reading(terminal) raw!(terminal, false) && disable_bracketed_paste(terminal) LineEdit.mode(s).on_done(s, LineEdit.buffer(s), true) raw!(terminal, true) && enable_bracketed_paste(terminal) - Base.start_reading(terminal) else break end diff --git a/base/client.jl b/base/client.jl index 093b53fc362ce..3cf3378da5ccf 100644 --- a/base/client.jl +++ b/base/client.jl @@ -132,14 +132,6 @@ function eval_user_input(ast::ANY, show_value) isa(STDIN,TTY) && println() end -function repl_callback(ast::ANY, show_value) - global _repl_enough_stdin = true - Base.stop_reading(STDIN) - put!(repl_channel, (ast, show_value)) -end - -_repl_start = Condition() - syntax_deprecation_warnings(warn::Bool) = ccall(:jl_parse_depwarn, Cint, (Cint,), warn)!=0 diff --git a/base/iobuffer.jl b/base/iobuffer.jl index 14737a1fd5f17..7b399a8639bf2 100644 --- a/base/iobuffer.jl +++ b/base/iobuffer.jl @@ -4,7 +4,7 @@ # Stateful string type IOBuffer{T<:AbstractVector{UInt8}} <: IO - data::T # T should support: getindex, setindex!, length, copy!, resize!, T(), and (in a few cases) pointer + data::T # T should support: getindex, setindex!, length, copy!, resize!, and T() readable::Bool writable::Bool seekable::Bool # if not seekable, implementation is free to destroy (compact) past read data @@ -29,6 +29,8 @@ function copy(b::IOBuffer) ret end +show(io::IO, b::Type{SimpleIOBuffer}) = print(io, "IOBuffer") +show(io::IO, b::Type{IOBuffer}) = print(io, "IOBuffer{?}") show(io::IO, b::IOBuffer) = print(io, "IOBuffer(data=UInt8[...], ", "readable=", b.readable, ", ", "writable=", b.writable, ", ", @@ -50,26 +52,32 @@ IOBuffer(readable::Bool, writable::Bool) = IOBuffer(UInt8[], readable, writable) IOBuffer() = IOBuffer(true, true) IOBuffer(maxsize::Int) = (x=IOBuffer(Array(UInt8,maxsize), true, true, maxsize); x.size=0; x) -is_maxsize_unlimited(io::IOBuffer) = (io.maxsize == typemax(Int)) maxsize(io::IOBuffer) = io.maxsize read!(from::IOBuffer, a::Array) = read_sub(from, a, 1, length(a)) -function read_sub{T}(from::IOBuffer, a::Array{T}, offs, nel) +function read_sub{T}(from::IOBuffer, a::AbstractArray{T}, offs, nel) from.readable || throw(ArgumentError("read failed, IOBuffer is not readable")) - isbits(T) || throw(ArgumentError("read from IOBuffer only supports bits types or arrays of bits types; got "*string(T))) if offs+nel-1 > length(a) || offs < 1 || nel < 0 throw(BoundsError()) end - nb = nel * sizeof(T) - avail = nb_available(from) - adv = min(avail, nb) - copy!(reinterpret(UInt8, a), 1 + (1 - offs) * sizeof(T), - from.data, from.ptr, - adv) - from.ptr += adv - if nb > avail - throw(EOFError()) + if isbits(T) && isa(a,Array) + nb = nel * sizeof(T) + avail = nb_available(from) + adv = min(avail, nb) + copy!(pointer_to_array(convert(Ptr{UInt8},pointer(a)), sizeof(a)), # reinterpret(UInt8,a) but without setting the shared data property on a + 1 + (1 - offs) * sizeof(T), + from.data, + from.ptr, + adv) + from.ptr += adv + if nb > avail + throw(EOFError()) + end + else + for i = offs:offs+nel-1 + a[i] = read(to, T) + end end return a end @@ -186,13 +194,10 @@ function ensureroom(io::IOBuffer, nshort::Int) end return io end + eof(io::IOBuffer) = (io.ptr-1 == io.size) + function close{T}(io::IOBuffer{T}) - if io.writable - resize!(io.data, 0) - else - io.data = T() - end io.readable = false io.writable = false io.seekable = false @@ -200,15 +205,39 @@ function close{T}(io::IOBuffer{T}) io.maxsize = 0 io.ptr = 1 io.mark = -1 + if io.writable + resize!(io.data, 0) + else + io.data = T() + end nothing end + isopen(io::IOBuffer) = io.readable || io.writable || io.seekable || nb_available(io) > 0 + function bytestring(io::IOBuffer) io.readable || throw(ArgumentError("bytestring read failed, IOBuffer is not readable")) io.seekable || throw(ArgumentError("bytestring read failed, IOBuffer is not seekable")) - bytestring(pointer(io.data), io.size) + b = copy!(Array(UInt8, io.size), 1, io.data, 1, io.size) + return isvalid(ASCIIString, b) ? ASCIIString(b) : UTF8String(b) end + function takebuf_array(io::IOBuffer) + ismarked(io) && unmark(io) + if io.seekable + nbytes = io.size + data = copy!(Array(UInt8, nbytes), 1, io.data, 1, nbytes) + else + nbytes = nb_available(io) + data = read!(io, Array(UInt8, nbytes)) + end + if io.writable + io.ptr = 1 + io.size = 0 + end + data +end +function takebuf_array(io::SimpleIOBuffer) ismarked(io) && unmark(io) if io.seekable data = io.data @@ -230,10 +259,17 @@ function takebuf_array(io::IOBuffer) end data end -takebuf_string(io::IOBuffer) = bytestring(takebuf_array(io)) +function takebuf_string(io::IOBuffer) + b = takebuf_array(io) + return isvalid(ASCIIString, b) ? ASCIIString(b) : UTF8String(b) +end function write(to::IOBuffer, from::IOBuffer) - written = write(to, from.a, from.ptr, nb_available(from)) + if to === from + from.ptr = from.size + 1 + return 0 + end + written::Int = write_sub(to, from.data, from.ptr, nb_available(from)) from.ptr += written written end @@ -243,29 +279,34 @@ function write(to::IOBuffer, p::Ptr, nb::Int) ensureroom(to, nb) ptr = (to.append ? to.size+1 : to.ptr) written = min(nb, length(to.data) - ptr + 1) - unsafe_copy!(pointer(to.data, ptr), p, written) + p_u8 = convert(Ptr{UInt8}, p) + for i = 0:written - 1 + @inbounds to.data[ptr + i] = unsafe_load(p_u8 + i) + end to.size = max(to.size, ptr - 1 + written) if !to.append to.ptr += written end written end -function write_sub{T}(to::IOBuffer, a::Array{T}, offs, nel) +function write_sub{T}(to::IOBuffer, a::AbstractArray{T}, offs, nel) if offs+nel-1 > length(a) || offs < 1 || nel < 0 throw(BoundsError()) end local written::Int - if isbits(T) + if isbits(T) && isa(a,Array) nb = nel * sizeof(T) ensureroom(to, nb) ptr = (to.append ? to.size+1 : to.ptr) written = min(nb, length(to.data) - ptr + 1) copy!(to.data, ptr, - reinterpret(UInt8, a), 1 + (offs - 1) * sizeof(T), + pointer_to_array(convert(Ptr{UInt8},pointer(a)), sizeof(a)), # reinterpret(UInt8,a) but without setting the shared data property on a + 1 + (offs - 1) * sizeof(T), written) to.size = max(to.size, ptr - 1 + written) if !to.append to.ptr += written end else written = 0 + ensureroom(to, sizeof(a)) for i = offs:offs+nel-1 written += write(to, a[i]) end @@ -310,7 +351,7 @@ end function search(buf::IOBuffer, delim::UInt8) data = buf.data for i = buf.ptr : buf.size - b = data[i] + @inbounds b = data[i] if b == delim return i - buf.ptr + 1 end diff --git a/base/multi.jl b/base/multi.jl index ad7cb43163659..e1fd604925ee5 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -811,7 +811,6 @@ end function process_messages(r_stream::TCPSocket, w_stream::TCPSocket; kwargs...) @schedule begin disable_nagle(r_stream) - Base.start_reading(r_stream) wait_connected(r_stream) if r_stream != w_stream disable_nagle(w_stream) diff --git a/base/precompile.jl b/base/precompile.jl index e5c57f7c1580b..dbb55f40b615b 100644 --- a/base/precompile.jl +++ b/base/precompile.jl @@ -350,7 +350,6 @@ precompile(Base.rehash!, (Dict{UInt8, Any}, Int)) precompile(Base.reinit_stdio, ()) precompile(Base.reload_path, (ASCIIString,)) precompile(Base.repeat, (ASCIIString, Int)) -precompile(Base.repl_callback, (Expr, Int)) precompile(Base.repl_cmd, (Cmd,)) precompile(Base.require, (ASCIIString,)) precompile(Base.rr2id, (RemoteRef,)) diff --git a/base/process.jl b/base/process.jl index 4fad366fd37a8..a0c0a5a7d1f6a 100644 --- a/base/process.jl +++ b/base/process.jl @@ -434,7 +434,6 @@ eachline(cmd::AbstractCmd) = eachline(cmd, DevNull) function open(cmds::AbstractCmd, mode::AbstractString="r", stdio::AsyncStream=DevNull) if mode == "r" processes = @tmp_rpipe out tmp spawn(false, cmds, (stdio,tmp,STDERR)) - start_reading(out) (out, processes) elseif mode == "w" processes = @tmp_wpipe tmp inpipe spawn(false, cmds, (tmp,stdio,STDERR)) @@ -467,8 +466,7 @@ end function readbytes(cmd::AbstractCmd, stdin::AsyncStream=DevNull) (out,pc) = open(cmd, "r", stdin) !success(pc) && pipeline_error(pc) - wait_close(out) - return takebuf_array(out.buffer) + return readbytes(out) end function readall(cmd::AbstractCmd, stdin::AsyncStream=DevNull) diff --git a/base/socket.jl b/base/socket.jl index 80bf936aa0585..f2926a346a1d4 100644 --- a/base/socket.jl +++ b/base/socket.jl @@ -263,6 +263,7 @@ type TCPSocket <: Socket closenotify::Condition sendbuf::Nullable{SimpleIOBuffer} lock::ReentrantLock + throttle::Int TCPSocket(handle) = new( handle, @@ -273,7 +274,8 @@ type TCPSocket <: Socket false, Condition(), false, Condition(), nothing, - ReentrantLock() + ReentrantLock(), + DEFAULT_READ_BUFFER_SZ ) end function TCPSocket() diff --git a/base/stream.jl b/base/stream.jl index 8ff6679309bf4..b2a7b802249fe 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -49,6 +49,8 @@ function eof(s::AsyncStream) !isopen(s) && nb_available(s.buffer)<=0 end +const DEFAULT_READ_BUFFER_SZ = 10485760 # 10 MB + const StatusUninit = 0 # handle is allocated, but not initialized const StatusInit = 1 # handle is valid, but not connected/active const StatusConnecting = 2 # handle is in process of connecting @@ -104,6 +106,7 @@ type Pipe <: AsyncStream closenotify::Condition sendbuf::Nullable{SimpleIOBuffer} lock::ReentrantLock + throttle::Int Pipe(handle) = new( handle, @@ -113,7 +116,8 @@ type Pipe <: AsyncStream false,Condition(), false,Condition(), false,Condition(), - nothing, ReentrantLock()) + nothing, ReentrantLock(), + DEFAULT_READ_BUFFER_SZ) end function Pipe() handle = Libc.malloc(_sizeof_uv_named_pipe) @@ -182,6 +186,7 @@ type TTY <: AsyncStream closenotify::Condition sendbuf::Nullable{SimpleIOBuffer} lock::ReentrantLock + throttle::Int @windows_only ispty::Bool function TTY(handle) tty = new( @@ -191,7 +196,8 @@ type TTY <: AsyncStream PipeBuffer(), false,Condition(), false,Condition(), - nothing, ReentrantLock()) + nothing, ReentrantLock(), + DEFAULT_READ_BUFFER_SZ) @windows_only tty.ispty = ccall(:jl_ispty, Cint, (Ptr{Void},), handle)!=0 tty end @@ -269,7 +275,7 @@ function init_stdio(handle) end end -function stream_wait(x,c...) +function stream_wait(x, c...) # for x::LibuvObject preserve_handle(x) try return wait(c...) @@ -307,26 +313,51 @@ end function wait_connected(x) check_open(x) while x.status == StatusConnecting - stream_wait(x,x.connectnotify) + stream_wait(x, x.connectnotify) check_open(x) end end function wait_readbyte(x::AsyncStream, c::UInt8) - while isopen(x) && search(x.buffer,c) <= 0 - start_reading(x) - stream_wait(x,x.readnotify) + preserve_handle(x) + try + while isopen(x) && search(x.buffer,c) <= 0 + start_reading(x) # ensure we are reading + wait(x.readnotify) + end + finally + if isempty(x.readnotify.waitq) + stop_reading(x) # stop reading iff there are currently no other read clients of the stream + end + unpreserve_handle(x) end end function wait_readnb(x::AsyncStream, nb::Int) - while isopen(x) && nb_available(x.buffer) < nb - start_reading(x) - stream_wait(x,x.readnotify) + oldthrottle = x.throttle + preserve_handle(x) + try + while isopen(x) && nb_available(x.buffer) < nb + x.throttle = max(nb, x.throttle) + start_reading(x) # ensure we are reading + wait(x.readnotify) + end + finally + if oldthrottle <= x.throttle <= nb + x.throttle = oldthrottle + end + if isempty(x.readnotify.waitq) + stop_reading(x) # stop reading iff there are currently no other read clients of the stream + end + unpreserve_handle(x) end end -wait_close(x) = if isopen(x) stream_wait(x,x.closenotify); end +function wait_close(x::AsyncStream) + if isopen(x) + stream_wait(x, x.closenotify) + end +end #from `connect` function _uv_hook_connectcb(sock::AsyncStream, status::Int32) @@ -367,8 +398,7 @@ function alloc_request(buffer::SimpleIOBuffer, recommended_size::UInt) end function _uv_hook_alloc_buf(stream::AsyncStream, recommended_size::UInt) (buf,size) = alloc_request(stream.buffer, recommended_size) - @assert size>0 # because libuv requires this (TODO: possibly stop reading too if it fails) - (buf,UInt(size)) + return (buf,UInt(size)) end function notify_filled(buffer::SimpleIOBuffer, nread::Int, base::Ptr{Void}, len::UInt) @@ -394,22 +424,24 @@ function notify_filled(stream::AsyncStream, nread::Int) end end -const READ_BUFFER_SZ=10485760 # 10 MB function _uv_hook_readcb(stream::AsyncStream, nread::Int, base::Ptr{Void}, len::UInt) if nread < 0 - if nread != UV_EOF - # This is a fatal connectin error. Shutdown requests as per the usual - # close function won't work and libuv will fail with an assertion failure - ccall(:jl_forceclose_uv,Void,(Ptr{Void},),stream.handle) - notify_error(stream.readnotify, UVError("readcb",nread)) - else + if nread == UV_ENOBUFS && len == 0 + # remind the client that stream.buffer is full + notify(stream.readnotify) + elseif nread == UV_EOF if isa(stream,TTY) - stream.status = StatusEOF + stream.status = StatusEOF # libuv called stop_reading already notify(stream.readnotify) notify(stream.closenotify) else close(stream) end + else + # This is a fatal connection error. Shutdown requests as per the usual + # close function won't work and libuv will fail with an assertion failure + ccall(:jl_forceclose_uv,Void,(Ptr{Void},),stream.handle) + notify_error(stream.readnotify, UVError("readcb",nread)) end else notify_filled(stream.buffer, nread, base, len) @@ -417,13 +449,14 @@ function _uv_hook_readcb(stream::AsyncStream, nread::Int, base::Ptr{Void}, len:: notify(stream.readnotify) end - # Stop reading when - # 1) when we have an infinite buffer, and we have accumulated a lot of unread data OR + # Stop background reading when + # 1) we have accumulated a lot of unread data OR # 2) we have an alternate buffer that has reached its limit. - if (is_maxsize_unlimited(stream.buffer) && (nb_available(stream.buffer) > READ_BUFFER_SZ )) || - (nb_available(stream.buffer) == stream.buffer.maxsize) + if (nb_available(stream.buffer) >= stream.throttle) || + (nb_available(stream.buffer) >= stream.buffer.maxsize) stop_reading(stream) end + nothing end reseteof(x::IO) = nothing @@ -532,7 +565,7 @@ function sleep(sec::Real) end) start_timer(timer, float(sec), 0) try - stream_wait(timer,w) + stream_wait(timer, w) finally stop_timer(timer) end @@ -640,15 +673,15 @@ function start_reading(stream::AsyncStream) end end function start_reading(stream::AsyncStream, cb::Function) - start_reading(stream) + failure = start_reading(stream) stream.readcb = cb nread = nb_available(stream.buffer) if nread > 0 notify_filled(stream, nread) end - nothing + return failure_code end -start_reading(stream::AsyncStream, cb::Bool) = (start_reading(stream); stream.readcb = cb; nothing) +start_reading(stream::AsyncStream, cb::Bool) = (failure_code = start_reading(stream); stream.readcb = cb; return failure_code) function stop_reading(stream::AsyncStream) if stream.status == StatusActive @@ -662,10 +695,9 @@ function stop_reading(stream::AsyncStream) end end -function readall(stream::AsyncStream) - start_reading(stream) - wait_close(stream) - return takebuf_string(stream.buffer) +function readbytes(stream::AsyncStream) + wait_readnb(stream, typemax(Int)) + return takebuf_array(stream.buffer) end function read!{T}(s::AsyncStream, a::Array{T}) @@ -687,16 +719,22 @@ function read!(s::AsyncStream, a::Vector{UInt8}) end if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer - wait_readnb(s,nb) + wait_readnb(s, nb) read!(sbuf, a) else - stop_reading(s) # Just playing it safe, since we are going to switch buffers. - newbuf = PipeBuffer(a, nb) - newbuf.size = 0 - s.buffer = newbuf - write(newbuf, sbuf) - wait_readnb(s,nb) - s.buffer = sbuf + try + stop_reading(s) # Just playing it safe, since we are going to switch buffers. + newbuf = PipeBuffer(a, nb) + newbuf.size = 0 # reset the write pointer to the beginning + s.buffer = newbuf + write(newbuf, sbuf) + wait_readnb(s, nb) + finally + s.buffer = sbuf + if !isempty(s.readnotify.waitq) + start_reading(x) # resume reading iff there are currently other read clients of the stream + end + end end return a end @@ -711,8 +749,8 @@ end function read(this::AsyncStream,::Type{UInt8}) buf = this.buffer @assert buf.seekable == false - wait_readnb(this,1) - read(buf,UInt8) + wait_readnb(this, 1) + read(buf, UInt8) end readline(this::AsyncStream) = readuntil(this, '\n') @@ -988,19 +1026,10 @@ function wait_readnb(s::BufferStream, nb::Int) while isopen(s) && nb_available(s.buffer) < nb wait(s.r_c) end - - (nb_available(s.buffer) < nb) && error("closed BufferStream") -end - -function eof(s::BufferStream) - wait_readnb(s,1) - !isopen(s) && nb_available(s.buffer)<=0 end show(io::IO, s::BufferStream) = print(io,"BufferStream() bytes waiting:",nb_available(s.buffer),", isopen:", s.is_open) -nb_available(s::BufferStream) = nb_available(s.buffer) - function wait_readbyte(s::BufferStream, c::UInt8) while isopen(s) && search(s.buffer,c) <= 0 wait(s.r_c) diff --git a/base/testfile.jl b/base/testfile.jl new file mode 100644 index 0000000000000..ddd8bdcf13c0c --- /dev/null +++ b/base/testfile.jl @@ -0,0 +1,12 @@ +function collect{T}(::Type{T}, itr) + # when length() isn't defined this branch might pollute the + # type of the other. + a = Array(T,length(itr)::Integer) + i = 0 + for x in itr + a[i+=1] = x + end + return a +end +Base.Inference.println(Base.Inference.code_typed(collect, Tuple{Type{Int64},Int64})) + diff --git a/test/iobuffer.jl b/test/iobuffer.jl index 96abf73d55400..bc7fac5881a6f 100644 --- a/test/iobuffer.jl +++ b/test/iobuffer.jl @@ -148,7 +148,7 @@ let io=IOBuffer("hello") end # pr #11554 -let io=IOBuffer(SubString("***αhelloworldω***",4,16)) +let io=IOBuffer(SubString("***αhelloworldω***",4,16)), io2 = IOBuffer(b"goodnightmoon", true, true) @test read(io, Char) == 'α' @test_throws ArgumentError write(io,"!") @test_throws ArgumentError write(io,'β') @@ -161,4 +161,18 @@ let io=IOBuffer(SubString("***αhelloworldω***",4,16)) @test readall(io) == "dω" @test bytestring(io) == "αhelloworldω" @test_throws ArgumentError write(io,"!") + @test takebuf_array(io) == b"αhelloworldω" + seek(io, 2) + seekend(io2) + write(io2, io) + @test readall(io) == "" + @test readall(io2) == "" + @test takebuf_string(io) == "αhelloworldω" + seek(io2, 0) + truncate(io2, io2.size - 2) + @test readall(io2) == "goodnightmoonhelloworld" + seek(io2, 0) + write(io2, io2) + @test readall(io2) == "" + @test bytestring(io2) == "goodnightmoonhelloworld" end diff --git a/test/repl.jl b/test/repl.jl index d10d4bc49fb19..231e962f90da7 100644 --- a/test/repl.jl +++ b/test/repl.jl @@ -16,7 +16,7 @@ function fake_repl() Base.link_pipe(stderr_read,true,stderr_write,true) repl = Base.REPL.LineEditREPL(TestHelpers.FakeTerminal(stdin_read, stdout_write, stderr_write)) - stdin_write, stdout_read, stdout_read, repl + stdin_write, stdout_read, stderr_read, repl end # Writing ^C to the repl will cause sigint, so let's not die on that @@ -28,7 +28,7 @@ ccall(:jl_exit_on_sigint, Void, (Cint,), 0) # this should make sure nothing crashes without depending on how exactly the control # characters are being used. if @unix? true : (Base.windows_version() >= Base.WINDOWS_VISTA_VER) -stdin_write, stdout_read, stdout_read, repl = fake_repl() +stdin_write, stdout_read, stderr_read, repl = fake_repl() repl.specialdisplay = Base.REPL.REPLDisplay(repl) repl.history_file = false @@ -261,17 +261,19 @@ master = Base.TTY(RawFD(fdm); readable = true) nENV = copy(ENV) nENV["TERM"] = "dumb" p = spawn(setenv(`$exename --startup-file=no --quiet`,nENV),slave,slave,slave) -Base.start_reading(master) -Base.wait_readnb(master,1) +output = readuntil(master,"julia> ") +if ccall(:jl_running_on_valgrind,Cint,()) == 0 + # If --trace-children=yes is passed to valgrind, we will get a + # valgrind banner here, not just the prompt. + @test output == "julia> " +end write(master,"1\nquit()\n") wait(p) - -ccall(:close,Cint,(Cint,),fds) -output = readall(master) -if ccall(:jl_running_on_valgrind,Cint,()) == 0 - @test output == "julia> 1\r\nquit()\r\n1\r\n\r\njulia> " -end +output = readuntil(master,' ') +@test output == "1\r\nquit()\r\n1\r\n\r\njulia> " +ccall(:close,Cint,(Cint,),fds) # XXX: why does this cause the kernel to throw away all unread data (on OS X)??? +@test readall(master) == "" close(master) end