Skip to content

Commit

Permalink
make start_reading/stop_reading automatic exactly when needed. fix #1925
Browse files Browse the repository at this point in the history
, fix #10655 (closes #11530)

this also makes the read throttle more intelligent so that it doesn't
get tripped up by wait_nb requests for more than READ_BUFFER_SZ bytes
  • Loading branch information
vtjnash committed Jun 3, 2015
1 parent 11d8dde commit 63cda45
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 47 deletions.
1 change: 0 additions & 1 deletion base/LineEdit.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1569,7 +1569,6 @@ function prompt!(term, prompt, s = init_state(term, prompt))
raw!(term, true)
enable_bracketed_paste(term)
try
Base.start_reading(term)
activate(prompt, s, term)
while true
map = keymap(s, prompt)
Expand Down
2 changes: 0 additions & 2 deletions base/REPL.jl
Original file line number Diff line number Diff line change
Expand Up @@ -801,11 +801,9 @@ function setup_interface(repl::LineEditREPL; hascolor = repl.hascolor, extra_rep
LineEdit.commit_line(s)
# This is slightly ugly but ok for now
terminal = LineEdit.terminal(s)
Base.stop_reading(terminal)
raw!(terminal, false) && disable_bracketed_paste(terminal)
LineEdit.mode(s).on_done(s, LineEdit.buffer(s), true)
raw!(terminal, true) && enable_bracketed_paste(terminal)
Base.start_reading(terminal)
else
break
end
Expand Down
4 changes: 0 additions & 4 deletions base/Terminals.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import Base:
read,
readuntil,
size,
start_reading,
stop_reading,
write,
writemime,
reseteof,
Expand Down Expand Up @@ -194,8 +192,6 @@ readuntil(t::UnixTerminal, s::AbstractString) = readuntil(t.in_stream, s)
readuntil(t::UnixTerminal, c::Char) = readuntil(t.in_stream, c)
readuntil(t::UnixTerminal, s) = readuntil(t.in_stream, s)
read(t::UnixTerminal, ::Type{UInt8}) = read(t.in_stream, UInt8)
start_reading(t::UnixTerminal) = start_reading(t.in_stream)
stop_reading(t::UnixTerminal) = stop_reading(t.in_stream)
eof(t::UnixTerminal) = eof(t.in_stream)

@unix_only hascolor(t::TTYTerminal) = (startswith(t.term_type, "xterm") || success(`tput setaf 0`))
Expand Down
44 changes: 44 additions & 0 deletions base/inference_types.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# parameters limiting potentially-infinite types

This comment has been minimized.

Copy link
@JeffBezanson

JeffBezanson Jun 3, 2015

Member

This file doesn't seem to be referenced anywhere.

const MAX_TYPEUNION_LEN = 3
const MAX_TYPE_DEPTH = 4
const MAX_TUPLETYPE_LEN = 8
const MAX_TUPLE_DEPTH = 4

type NotFound
end

const NF = NotFound()

type StaticVarInfo
sp::SimpleVector # static parameters
cenv::ObjectIdDict # types of closed vars
vars::Array{Any,1} # names of args and locals
gensym_types::Array{Any,1} # types of the GenSym's in this function
vinfo::Array{Any,1} # variable properties
label_counter::Int # index of the current highest label for this function
fedbackvars::ObjectIdDict
end

type VarState
typ
undef::Bool
end

type EmptyCallStack
end

type CallStack
ast
mod::Module
types::Type
recurred::Bool
cycleid::Int
result
prev::Union(EmptyCallStack,CallStack)
sv::StaticVarInfo

CallStack(ast, mod, types::ANY, prev) = new(ast, mod, types, false, 0, Bottom, prev)
end

inference_stack = EmptyCallStack()

1 change: 0 additions & 1 deletion base/iobuffer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ IOBuffer(readable::Bool, writable::Bool) = IOBuffer(UInt8[], readable, writable)
IOBuffer() = IOBuffer(true, true)
IOBuffer(maxsize::Int) = (x=IOBuffer(Array(UInt8,maxsize), true, true, maxsize); x.size=0; x)

is_maxsize_unlimited(io::IOBuffer) = (io.maxsize == typemax(Int))
maxsize(io::IOBuffer) = io.maxsize

read!(from::IOBuffer, a::Array) = read_sub(from, a, 1, length(a))
Expand Down
1 change: 0 additions & 1 deletion base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,6 @@ end
function process_messages(r_stream::TCPSocket, w_stream::TCPSocket; kwargs...)
@schedule begin
disable_nagle(r_stream)
Base.start_reading(r_stream)
wait_connected(r_stream)
if r_stream != w_stream
disable_nagle(w_stream)
Expand Down
1 change: 0 additions & 1 deletion base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,6 @@ eachline(cmd::AbstractCmd) = eachline(cmd, DevNull)
function open(cmds::AbstractCmd, mode::AbstractString="r", stdio::AsyncStream=DevNull)
if mode == "r"
processes = @tmp_rpipe out tmp spawn(false, cmds, (stdio,tmp,STDERR))
start_reading(out)
(out, processes)
elseif mode == "w"
processes = @tmp_wpipe tmp inpipe spawn(false, cmds, (tmp,stdio,STDERR))
Expand Down
8 changes: 6 additions & 2 deletions base/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ type TCPSocket <: Socket
closenotify::Condition
sendbuf::Nullable{SimpleIOBuffer}
lock::ReentrantLock
throttle::Int

TCPSocket(handle) = new(
handle,
Expand All @@ -273,7 +274,8 @@ type TCPSocket <: Socket
false, Condition(),
false, Condition(),
nothing,
ReentrantLock()
ReentrantLock(),
DEFAULT_READ_BUFFER_SZ
)
end
function TCPSocket()
Expand Down Expand Up @@ -367,13 +369,15 @@ type UDPSocket <: Socket
recvnotify::Condition
sendnotify::Condition
closenotify::Condition
throttle::Int

UDPSocket(handle::Ptr) = new(
handle,
StatusUninit,
Condition(),
Condition(),
Condition()
Condition(),
DEFAULT_READ_BUFFER_SZ
)
end
function UDPSocket()
Expand Down
90 changes: 55 additions & 35 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ function eof(s::AsyncStream)
!isopen(s) && nb_available(s.buffer)<=0
end

const DEFAULT_READ_BUFFER_SZ = 10485760 # 10 MB

const StatusUninit = 0 # handle is allocated, but not initialized
const StatusInit = 1 # handle is valid, but not connected/active
const StatusConnecting = 2 # handle is in process of connecting
Expand Down Expand Up @@ -104,6 +106,7 @@ type Pipe <: AsyncStream
closenotify::Condition
sendbuf::Nullable{SimpleIOBuffer}
lock::ReentrantLock
throttle::Int

Pipe(handle) = new(
handle,
Expand All @@ -113,7 +116,8 @@ type Pipe <: AsyncStream
false,Condition(),
false,Condition(),
false,Condition(),
nothing, ReentrantLock())
nothing, ReentrantLock(),
DEFAULT_READ_BUFFER_SZ)
end
function Pipe()
handle = Libc.malloc(_sizeof_uv_named_pipe)
Expand Down Expand Up @@ -182,6 +186,7 @@ type TTY <: AsyncStream
closenotify::Condition
sendbuf::Nullable{SimpleIOBuffer}
lock::ReentrantLock
throttle::Int
@windows_only ispty::Bool
function TTY(handle)
tty = new(
Expand All @@ -191,7 +196,8 @@ type TTY <: AsyncStream
PipeBuffer(),
false,Condition(),
false,Condition(),
nothing, ReentrantLock())
nothing, ReentrantLock(),
DEFAULT_READ_BUFFER_SZ)
@windows_only tty.ispty = ccall(:jl_ispty, Cint, (Ptr{Void},), handle)!=0
tty
end
Expand Down Expand Up @@ -269,7 +275,25 @@ function init_stdio(handle)
end
end

function stream_wait(x,c...)
function stream_waitn(x::AsyncStream, nb, c...)
preserve_handle(x)
oldthrottle = x.throttle
try
x.throttle = max(nb, oldthrottle)
start_reading(x) # ensure we are reading
return wait(c...)
finally
x.throttle = oldthrottle
if isempty(x.readnotify) && isempty(x.closenotify)
stop_reading(x) # stop reading iff there are no other clients of the stream currently
end
unpreserve_handle(x)
end
end

stream_wait(x::AsyncStream, c...) = stream_waitn(x,1,c...)

function stream_wait(x, c...)
preserve_handle(x)
try
return wait(c...)
Expand Down Expand Up @@ -307,26 +331,24 @@ end
function wait_connected(x)
check_open(x)
while x.status == StatusConnecting
stream_wait(x,x.connectnotify)
stream_wait(x, x.connectnotify)
check_open(x)
end
end

function wait_readbyte(x::AsyncStream, c::UInt8)
while isopen(x) && search(x.buffer,c) <= 0
start_reading(x)
stream_wait(x,x.readnotify)
stream_waitn(x, 1, x.readnotify)
end
end

function wait_readnb(x::AsyncStream, nb::Int)
while isopen(x) && nb_available(x.buffer) < nb
start_reading(x)
stream_wait(x,x.readnotify)
stream_waitn(x, nb - nb_available(x.buffer), x.readnotify)
end
end

wait_close(x) = if isopen(x) stream_wait(x,x.closenotify); end
wait_close(x) = wait_readnb(x, typemax(Int))

#from `connect`
function _uv_hook_connectcb(sock::AsyncStream, status::Int32)
Expand Down Expand Up @@ -394,7 +416,6 @@ function notify_filled(stream::AsyncStream, nread::Int)
end
end

const READ_BUFFER_SZ=10485760 # 10 MB
function _uv_hook_readcb(stream::AsyncStream, nread::Int, base::Ptr{Void}, len::UInt)
if nread < 0
if nread != UV_EOF
Expand All @@ -418,10 +439,10 @@ function _uv_hook_readcb(stream::AsyncStream, nread::Int, base::Ptr{Void}, len::
end

# Stop reading when
# 1) when we have an infinite buffer, and we have accumulated a lot of unread data OR
# 1) we have accumulated a lot of unread data OR
# 2) we have an alternate buffer that has reached its limit.
if (is_maxsize_unlimited(stream.buffer) && (nb_available(stream.buffer) > READ_BUFFER_SZ )) ||
(nb_available(stream.buffer) == stream.buffer.maxsize)
if (nb_available(stream.buffer) >= stream.throttle) ||
(nb_available(stream.buffer) >= stream.buffer.maxsize)
stop_reading(stream)
end
end
Expand Down Expand Up @@ -532,7 +553,7 @@ function sleep(sec::Real)
end)
start_timer(timer, float(sec), 0)
try
stream_wait(timer,w)
stream_wait(timer, w)
finally
stop_timer(timer)
end
Expand Down Expand Up @@ -640,15 +661,15 @@ function start_reading(stream::AsyncStream)
end
end
function start_reading(stream::AsyncStream, cb::Function)
start_reading(stream)
failure = start_reading(stream)
stream.readcb = cb
nread = nb_available(stream.buffer)
if nread > 0
notify_filled(stream, nread)
end
nothing
return failure_code
end
start_reading(stream::AsyncStream, cb::Bool) = (start_reading(stream); stream.readcb = cb; nothing)
start_reading(stream::AsyncStream, cb::Bool) = (failure_code = start_reading(stream); stream.readcb = cb; return failure_code)

function stop_reading(stream::AsyncStream)
if stream.status == StatusActive
Expand All @@ -663,7 +684,7 @@ function stop_reading(stream::AsyncStream)
end

function readall(stream::AsyncStream)
start_reading(stream)
stream.throttle = typemax(Int)
wait_close(stream)
return takebuf_string(stream.buffer)
end
Expand All @@ -687,16 +708,22 @@ function read!(s::AsyncStream, a::Vector{UInt8})
end

if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
wait_readnb(s,nb)
wait_readnb(s, nb)
read!(sbuf, a)
else
stop_reading(s) # Just playing it safe, since we are going to switch buffers.
newbuf = PipeBuffer(a, nb)
newbuf.size = 0
s.buffer = newbuf
write(newbuf, sbuf)
wait_readnb(s,nb)
s.buffer = sbuf
try
stop_reading(s) # Just playing it safe, since we are going to switch buffers.
newbuf = PipeBuffer(a, nb)
newbuf.size = 0 # reset the write pointer to the beginning
s.buffer = newbuf
write(newbuf, sbuf)
wait_readnb(s, nb)
finally
s.buffer = sbuf
if !isempty(x.readnotify) || !isempty(x.closenotify)
start_reading(s)
end
end
end
return a
end
Expand All @@ -711,8 +738,8 @@ end
function read(this::AsyncStream,::Type{UInt8})
buf = this.buffer
@assert buf.seekable == false
wait_readnb(this,1)
read(buf,UInt8)
wait_readnb(this, 1)
read(buf, UInt8)
end

readline(this::AsyncStream) = readuntil(this, '\n')
Expand Down Expand Up @@ -992,15 +1019,8 @@ function wait_readnb(s::BufferStream, nb::Int)
(nb_available(s.buffer) < nb) && error("closed BufferStream")
end

function eof(s::BufferStream)
wait_readnb(s,1)
!isopen(s) && nb_available(s.buffer)<=0
end

show(io::IO, s::BufferStream) = print(io,"BufferStream() bytes waiting:",nb_available(s.buffer),", isopen:", s.is_open)

nb_available(s::BufferStream) = nb_available(s.buffer)

function wait_readbyte(s::BufferStream, c::UInt8)
while isopen(s) && search(s.buffer,c) <= 0
wait(s.r_c)
Expand Down
12 changes: 12 additions & 0 deletions base/testfile.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
function collect{T}(::Type{T}, itr)
# when length() isn't defined this branch might pollute the
# type of the other.
a = Array(T,length(itr)::Integer)
i = 0
for x in itr
a[i+=1] = x
end
return a
end
Base.Inference.println(Base.Inference.code_typed(collect, Tuple{Type{Int64},Int64}))

0 comments on commit 63cda45

Please sign in to comment.