From 63cda45276525bcdb6f2027b0aa320b1b6cf5a08 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Wed, 3 Jun 2015 00:38:47 -0400 Subject: [PATCH] make start_reading/stop_reading automatic exactly when needed. fix #1925, fix #10655 (closes #11530) this also makes the read throttle more intelligent so that it doesn't get tripped up by wait_nb requests for more than READ_BUFFER_SZ bytes --- base/LineEdit.jl | 1 - base/REPL.jl | 2 - base/Terminals.jl | 4 -- base/inference_types.jl | 44 ++++++++++++++++++++ base/iobuffer.jl | 1 - base/multi.jl | 1 - base/process.jl | 1 - base/socket.jl | 8 +++- base/stream.jl | 90 +++++++++++++++++++++++++---------------- base/testfile.jl | 12 ++++++ 10 files changed, 117 insertions(+), 47 deletions(-) create mode 100644 base/inference_types.jl create mode 100644 base/testfile.jl diff --git a/base/LineEdit.jl b/base/LineEdit.jl index e341fe3e509b2..12a16c0b23cb8 100644 --- a/base/LineEdit.jl +++ b/base/LineEdit.jl @@ -1569,7 +1569,6 @@ function prompt!(term, prompt, s = init_state(term, prompt)) raw!(term, true) enable_bracketed_paste(term) try - Base.start_reading(term) activate(prompt, s, term) while true map = keymap(s, prompt) diff --git a/base/REPL.jl b/base/REPL.jl index 21f9ac124bb53..4738e36d33979 100644 --- a/base/REPL.jl +++ b/base/REPL.jl @@ -801,11 +801,9 @@ function setup_interface(repl::LineEditREPL; hascolor = repl.hascolor, extra_rep LineEdit.commit_line(s) # This is slightly ugly but ok for now terminal = LineEdit.terminal(s) - Base.stop_reading(terminal) raw!(terminal, false) && disable_bracketed_paste(terminal) LineEdit.mode(s).on_done(s, LineEdit.buffer(s), true) raw!(terminal, true) && enable_bracketed_paste(terminal) - Base.start_reading(terminal) else break end diff --git a/base/Terminals.jl b/base/Terminals.jl index a62e4b85d59e2..c366fdc5a5b62 100644 --- a/base/Terminals.jl +++ b/base/Terminals.jl @@ -30,8 +30,6 @@ import Base: read, readuntil, size, - start_reading, - stop_reading, write, writemime, reseteof, @@ -194,8 +192,6 @@ readuntil(t::UnixTerminal, s::AbstractString) = readuntil(t.in_stream, s) readuntil(t::UnixTerminal, c::Char) = readuntil(t.in_stream, c) readuntil(t::UnixTerminal, s) = readuntil(t.in_stream, s) read(t::UnixTerminal, ::Type{UInt8}) = read(t.in_stream, UInt8) -start_reading(t::UnixTerminal) = start_reading(t.in_stream) -stop_reading(t::UnixTerminal) = stop_reading(t.in_stream) eof(t::UnixTerminal) = eof(t.in_stream) @unix_only hascolor(t::TTYTerminal) = (startswith(t.term_type, "xterm") || success(`tput setaf 0`)) diff --git a/base/inference_types.jl b/base/inference_types.jl new file mode 100644 index 0000000000000..ec252e173676d --- /dev/null +++ b/base/inference_types.jl @@ -0,0 +1,44 @@ +# parameters limiting potentially-infinite types +const MAX_TYPEUNION_LEN = 3 +const MAX_TYPE_DEPTH = 4 +const MAX_TUPLETYPE_LEN = 8 +const MAX_TUPLE_DEPTH = 4 + +type NotFound +end + +const NF = NotFound() + +type StaticVarInfo + sp::SimpleVector # static parameters + cenv::ObjectIdDict # types of closed vars + vars::Array{Any,1} # names of args and locals + gensym_types::Array{Any,1} # types of the GenSym's in this function + vinfo::Array{Any,1} # variable properties + label_counter::Int # index of the current highest label for this function + fedbackvars::ObjectIdDict +end + +type VarState + typ + undef::Bool +end + +type EmptyCallStack +end + +type CallStack + ast + mod::Module + types::Type + recurred::Bool + cycleid::Int + result + prev::Union(EmptyCallStack,CallStack) + sv::StaticVarInfo + + CallStack(ast, mod, types::ANY, prev) = new(ast, mod, types, false, 0, Bottom, prev) +end + +inference_stack = EmptyCallStack() + diff --git a/base/iobuffer.jl b/base/iobuffer.jl index 14737a1fd5f17..d691a6bacd304 100644 --- a/base/iobuffer.jl +++ b/base/iobuffer.jl @@ -50,7 +50,6 @@ IOBuffer(readable::Bool, writable::Bool) = IOBuffer(UInt8[], readable, writable) IOBuffer() = IOBuffer(true, true) IOBuffer(maxsize::Int) = (x=IOBuffer(Array(UInt8,maxsize), true, true, maxsize); x.size=0; x) -is_maxsize_unlimited(io::IOBuffer) = (io.maxsize == typemax(Int)) maxsize(io::IOBuffer) = io.maxsize read!(from::IOBuffer, a::Array) = read_sub(from, a, 1, length(a)) diff --git a/base/multi.jl b/base/multi.jl index ad7cb43163659..e1fd604925ee5 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -811,7 +811,6 @@ end function process_messages(r_stream::TCPSocket, w_stream::TCPSocket; kwargs...) @schedule begin disable_nagle(r_stream) - Base.start_reading(r_stream) wait_connected(r_stream) if r_stream != w_stream disable_nagle(w_stream) diff --git a/base/process.jl b/base/process.jl index 4fad366fd37a8..8031957a101a0 100644 --- a/base/process.jl +++ b/base/process.jl @@ -434,7 +434,6 @@ eachline(cmd::AbstractCmd) = eachline(cmd, DevNull) function open(cmds::AbstractCmd, mode::AbstractString="r", stdio::AsyncStream=DevNull) if mode == "r" processes = @tmp_rpipe out tmp spawn(false, cmds, (stdio,tmp,STDERR)) - start_reading(out) (out, processes) elseif mode == "w" processes = @tmp_wpipe tmp inpipe spawn(false, cmds, (tmp,stdio,STDERR)) diff --git a/base/socket.jl b/base/socket.jl index 80bf936aa0585..62050577605c5 100644 --- a/base/socket.jl +++ b/base/socket.jl @@ -263,6 +263,7 @@ type TCPSocket <: Socket closenotify::Condition sendbuf::Nullable{SimpleIOBuffer} lock::ReentrantLock + throttle::Int TCPSocket(handle) = new( handle, @@ -273,7 +274,8 @@ type TCPSocket <: Socket false, Condition(), false, Condition(), nothing, - ReentrantLock() + ReentrantLock(), + DEFAULT_READ_BUFFER_SZ ) end function TCPSocket() @@ -367,13 +369,15 @@ type UDPSocket <: Socket recvnotify::Condition sendnotify::Condition closenotify::Condition + throttle::Int UDPSocket(handle::Ptr) = new( handle, StatusUninit, Condition(), Condition(), - Condition() + Condition(), + DEFAULT_READ_BUFFER_SZ ) end function UDPSocket() diff --git a/base/stream.jl b/base/stream.jl index 8ff6679309bf4..a1028c4a0605d 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -49,6 +49,8 @@ function eof(s::AsyncStream) !isopen(s) && nb_available(s.buffer)<=0 end +const DEFAULT_READ_BUFFER_SZ = 10485760 # 10 MB + const StatusUninit = 0 # handle is allocated, but not initialized const StatusInit = 1 # handle is valid, but not connected/active const StatusConnecting = 2 # handle is in process of connecting @@ -104,6 +106,7 @@ type Pipe <: AsyncStream closenotify::Condition sendbuf::Nullable{SimpleIOBuffer} lock::ReentrantLock + throttle::Int Pipe(handle) = new( handle, @@ -113,7 +116,8 @@ type Pipe <: AsyncStream false,Condition(), false,Condition(), false,Condition(), - nothing, ReentrantLock()) + nothing, ReentrantLock(), + DEFAULT_READ_BUFFER_SZ) end function Pipe() handle = Libc.malloc(_sizeof_uv_named_pipe) @@ -182,6 +186,7 @@ type TTY <: AsyncStream closenotify::Condition sendbuf::Nullable{SimpleIOBuffer} lock::ReentrantLock + throttle::Int @windows_only ispty::Bool function TTY(handle) tty = new( @@ -191,7 +196,8 @@ type TTY <: AsyncStream PipeBuffer(), false,Condition(), false,Condition(), - nothing, ReentrantLock()) + nothing, ReentrantLock(), + DEFAULT_READ_BUFFER_SZ) @windows_only tty.ispty = ccall(:jl_ispty, Cint, (Ptr{Void},), handle)!=0 tty end @@ -269,7 +275,25 @@ function init_stdio(handle) end end -function stream_wait(x,c...) +function stream_waitn(x::AsyncStream, nb, c...) + preserve_handle(x) + oldthrottle = x.throttle + try + x.throttle = max(nb, oldthrottle) + start_reading(x) # ensure we are reading + return wait(c...) + finally + x.throttle = oldthrottle + if isempty(x.readnotify) && isempty(x.closenotify) + stop_reading(x) # stop reading iff there are no other clients of the stream currently + end + unpreserve_handle(x) + end +end + +stream_wait(x::AsyncStream, c...) = stream_waitn(x,1,c...) + +function stream_wait(x, c...) preserve_handle(x) try return wait(c...) @@ -307,26 +331,24 @@ end function wait_connected(x) check_open(x) while x.status == StatusConnecting - stream_wait(x,x.connectnotify) + stream_wait(x, x.connectnotify) check_open(x) end end function wait_readbyte(x::AsyncStream, c::UInt8) while isopen(x) && search(x.buffer,c) <= 0 - start_reading(x) - stream_wait(x,x.readnotify) + stream_waitn(x, 1, x.readnotify) end end function wait_readnb(x::AsyncStream, nb::Int) while isopen(x) && nb_available(x.buffer) < nb - start_reading(x) - stream_wait(x,x.readnotify) + stream_waitn(x, nb - nb_available(x.buffer), x.readnotify) end end -wait_close(x) = if isopen(x) stream_wait(x,x.closenotify); end +wait_close(x) = wait_readnb(x, typemax(Int)) #from `connect` function _uv_hook_connectcb(sock::AsyncStream, status::Int32) @@ -394,7 +416,6 @@ function notify_filled(stream::AsyncStream, nread::Int) end end -const READ_BUFFER_SZ=10485760 # 10 MB function _uv_hook_readcb(stream::AsyncStream, nread::Int, base::Ptr{Void}, len::UInt) if nread < 0 if nread != UV_EOF @@ -418,10 +439,10 @@ function _uv_hook_readcb(stream::AsyncStream, nread::Int, base::Ptr{Void}, len:: end # Stop reading when - # 1) when we have an infinite buffer, and we have accumulated a lot of unread data OR + # 1) we have accumulated a lot of unread data OR # 2) we have an alternate buffer that has reached its limit. - if (is_maxsize_unlimited(stream.buffer) && (nb_available(stream.buffer) > READ_BUFFER_SZ )) || - (nb_available(stream.buffer) == stream.buffer.maxsize) + if (nb_available(stream.buffer) >= stream.throttle) || + (nb_available(stream.buffer) >= stream.buffer.maxsize) stop_reading(stream) end end @@ -532,7 +553,7 @@ function sleep(sec::Real) end) start_timer(timer, float(sec), 0) try - stream_wait(timer,w) + stream_wait(timer, w) finally stop_timer(timer) end @@ -640,15 +661,15 @@ function start_reading(stream::AsyncStream) end end function start_reading(stream::AsyncStream, cb::Function) - start_reading(stream) + failure = start_reading(stream) stream.readcb = cb nread = nb_available(stream.buffer) if nread > 0 notify_filled(stream, nread) end - nothing + return failure_code end -start_reading(stream::AsyncStream, cb::Bool) = (start_reading(stream); stream.readcb = cb; nothing) +start_reading(stream::AsyncStream, cb::Bool) = (failure_code = start_reading(stream); stream.readcb = cb; return failure_code) function stop_reading(stream::AsyncStream) if stream.status == StatusActive @@ -663,7 +684,7 @@ function stop_reading(stream::AsyncStream) end function readall(stream::AsyncStream) - start_reading(stream) + stream.throttle = typemax(Int) wait_close(stream) return takebuf_string(stream.buffer) end @@ -687,16 +708,22 @@ function read!(s::AsyncStream, a::Vector{UInt8}) end if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer - wait_readnb(s,nb) + wait_readnb(s, nb) read!(sbuf, a) else - stop_reading(s) # Just playing it safe, since we are going to switch buffers. - newbuf = PipeBuffer(a, nb) - newbuf.size = 0 - s.buffer = newbuf - write(newbuf, sbuf) - wait_readnb(s,nb) - s.buffer = sbuf + try + stop_reading(s) # Just playing it safe, since we are going to switch buffers. + newbuf = PipeBuffer(a, nb) + newbuf.size = 0 # reset the write pointer to the beginning + s.buffer = newbuf + write(newbuf, sbuf) + wait_readnb(s, nb) + finally + s.buffer = sbuf + if !isempty(x.readnotify) || !isempty(x.closenotify) + start_reading(s) + end + end end return a end @@ -711,8 +738,8 @@ end function read(this::AsyncStream,::Type{UInt8}) buf = this.buffer @assert buf.seekable == false - wait_readnb(this,1) - read(buf,UInt8) + wait_readnb(this, 1) + read(buf, UInt8) end readline(this::AsyncStream) = readuntil(this, '\n') @@ -992,15 +1019,8 @@ function wait_readnb(s::BufferStream, nb::Int) (nb_available(s.buffer) < nb) && error("closed BufferStream") end -function eof(s::BufferStream) - wait_readnb(s,1) - !isopen(s) && nb_available(s.buffer)<=0 -end - show(io::IO, s::BufferStream) = print(io,"BufferStream() bytes waiting:",nb_available(s.buffer),", isopen:", s.is_open) -nb_available(s::BufferStream) = nb_available(s.buffer) - function wait_readbyte(s::BufferStream, c::UInt8) while isopen(s) && search(s.buffer,c) <= 0 wait(s.r_c) diff --git a/base/testfile.jl b/base/testfile.jl new file mode 100644 index 0000000000000..ddd8bdcf13c0c --- /dev/null +++ b/base/testfile.jl @@ -0,0 +1,12 @@ +function collect{T}(::Type{T}, itr) + # when length() isn't defined this branch might pollute the + # type of the other. + a = Array(T,length(itr)::Integer) + i = 0 + for x in itr + a[i+=1] = x + end + return a +end +Base.Inference.println(Base.Inference.code_typed(collect, Tuple{Type{Int64},Int64})) +