Skip to content

Commit

Permalink
make start_reading/stop_reading automatic exactly when needed. fix #1925
Browse files Browse the repository at this point in the history
, fix #10655 (closes #11530)

this also makes the read throttle more intelligent so that it doesn't
get tripped up by wait_nb requests for more than READ_BUFFER_SZ bytes
  • Loading branch information
vtjnash committed Jun 3, 2015
1 parent 11d8dde commit 9de4a9a
Show file tree
Hide file tree
Showing 12 changed files with 191 additions and 109 deletions.
4 changes: 0 additions & 4 deletions base/LineEdit.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1569,7 +1569,6 @@ function prompt!(term, prompt, s = init_state(term, prompt))
raw!(term, true)
enable_bracketed_paste(term)
try
Base.start_reading(term)
activate(prompt, s, term)
while true
map = keymap(s, prompt)
Expand All @@ -1585,14 +1584,11 @@ function prompt!(term, prompt, s = init_state(term, prompt))
state = :done
end
if state == :abort
Base.stop_reading(term)
return buffer(s), false, false
elseif state == :done
Base.stop_reading(term)
return buffer(s), true, false
elseif state == :suspend
@unix_only begin
Base.stop_reading(term)
return buffer(s), true, true
end
else
Expand Down
2 changes: 0 additions & 2 deletions base/REPL.jl
Original file line number Diff line number Diff line change
Expand Up @@ -801,11 +801,9 @@ function setup_interface(repl::LineEditREPL; hascolor = repl.hascolor, extra_rep
LineEdit.commit_line(s)
# This is slightly ugly but ok for now
terminal = LineEdit.terminal(s)
Base.stop_reading(terminal)
raw!(terminal, false) && disable_bracketed_paste(terminal)
LineEdit.mode(s).on_done(s, LineEdit.buffer(s), true)
raw!(terminal, true) && enable_bracketed_paste(terminal)
Base.start_reading(terminal)
else
break
end
Expand Down
8 changes: 0 additions & 8 deletions base/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,6 @@ function eval_user_input(ast::ANY, show_value)
isa(STDIN,TTY) && println()
end

function repl_callback(ast::ANY, show_value)
global _repl_enough_stdin = true
Base.stop_reading(STDIN)
put!(repl_channel, (ast, show_value))
end

_repl_start = Condition()

syntax_deprecation_warnings(warn::Bool) =
ccall(:jl_parse_depwarn, Cint, (Cint,), warn)!=0

Expand Down
93 changes: 67 additions & 26 deletions base/iobuffer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

# Stateful string
type IOBuffer{T<:AbstractVector{UInt8}} <: IO
data::T # T should support: getindex, setindex!, length, copy!, resize!, T(), and (in a few cases) pointer
data::T # T should support: getindex, setindex!, length, copy!, resize!, and T()
readable::Bool
writable::Bool
seekable::Bool # if not seekable, implementation is free to destroy (compact) past read data
Expand All @@ -29,6 +29,8 @@ function copy(b::IOBuffer)
ret
end

show(io::IO, b::Type{SimpleIOBuffer}) = print(io, "IOBuffer")
show(io::IO, b::Type{IOBuffer}) = print(io, "IOBuffer{?}")
show(io::IO, b::IOBuffer) = print(io, "IOBuffer(data=UInt8[...], ",
"readable=", b.readable, ", ",
"writable=", b.writable, ", ",
Expand All @@ -50,26 +52,32 @@ IOBuffer(readable::Bool, writable::Bool) = IOBuffer(UInt8[], readable, writable)
IOBuffer() = IOBuffer(true, true)
IOBuffer(maxsize::Int) = (x=IOBuffer(Array(UInt8,maxsize), true, true, maxsize); x.size=0; x)

is_maxsize_unlimited(io::IOBuffer) = (io.maxsize == typemax(Int))
maxsize(io::IOBuffer) = io.maxsize

read!(from::IOBuffer, a::Array) = read_sub(from, a, 1, length(a))

function read_sub{T}(from::IOBuffer, a::Array{T}, offs, nel)
function read_sub{T}(from::IOBuffer, a::AbstractArray{T}, offs, nel)
from.readable || throw(ArgumentError("read failed, IOBuffer is not readable"))
isbits(T) || throw(ArgumentError("read from IOBuffer only supports bits types or arrays of bits types; got "*string(T)))
if offs+nel-1 > length(a) || offs < 1 || nel < 0
throw(BoundsError())
end
nb = nel * sizeof(T)
avail = nb_available(from)
adv = min(avail, nb)
copy!(reinterpret(UInt8, a), 1 + (1 - offs) * sizeof(T),
from.data, from.ptr,
adv)
from.ptr += adv
if nb > avail
throw(EOFError())
if isbits(T) && isa(a,Array)
nb = nel * sizeof(T)
avail = nb_available(from)
adv = min(avail, nb)
copy!(pointer_to_array(convert(Ptr{UInt8},pointer(a)), sizeof(a)), # reinterpret(UInt8,a) but without setting the shared data property on a
1 + (1 - offs) * sizeof(T),
from.data,
from.ptr,
adv)
from.ptr += adv
if nb > avail
throw(EOFError())
end
else
for i = offs:offs+nel-1
a[i] = read(to, T)
end
end
return a
end
Expand Down Expand Up @@ -186,29 +194,50 @@ function ensureroom(io::IOBuffer, nshort::Int)
end
return io
end

eof(io::IOBuffer) = (io.ptr-1 == io.size)

function close{T}(io::IOBuffer{T})
if io.writable
resize!(io.data, 0)
else
io.data = T()
end
io.readable = false
io.writable = false
io.seekable = false
io.size = 0
io.maxsize = 0
io.ptr = 1
io.mark = -1
if io.writable
resize!(io.data, 0)
else
io.data = T()
end
nothing
end

isopen(io::IOBuffer) = io.readable || io.writable || io.seekable || nb_available(io) > 0

function bytestring(io::IOBuffer)
io.readable || throw(ArgumentError("bytestring read failed, IOBuffer is not readable"))
io.seekable || throw(ArgumentError("bytestring read failed, IOBuffer is not seekable"))
bytestring(pointer(io.data), io.size)
b = copy!(Array(UInt8, io.size), 1, io.data, 1, io.size)
return isvalid(ASCIIString, b) ? ASCIIString(b) : UTF8String(b)
end

function takebuf_array(io::IOBuffer)
ismarked(io) && unmark(io)
if io.seekable
nbytes = io.size
data = copy!(Array(UInt8, nbytes), 1, io.data, 1, nbytes)
else
nbytes = nb_available(io)
data = read!(io, Array(UInt8, nbytes))
end
if io.writable
io.ptr = 1
io.size = 0
end
data
end
function takebuf_array(io::SimpleIOBuffer)
ismarked(io) && unmark(io)
if io.seekable
data = io.data
Expand All @@ -230,10 +259,17 @@ function takebuf_array(io::IOBuffer)
end
data
end
takebuf_string(io::IOBuffer) = bytestring(takebuf_array(io))
function takebuf_string(io::IOBuffer)
b = takebuf_array(io)
return isvalid(ASCIIString, b) ? ASCIIString(b) : UTF8String(b)
end

function write(to::IOBuffer, from::IOBuffer)
written = write(to, from.a, from.ptr, nb_available(from))
if to === from
from.ptr = from.size + 1
return 0
end
written::Int = write_sub(to, from.data, from.ptr, nb_available(from))
from.ptr += written
written
end
Expand All @@ -243,29 +279,34 @@ function write(to::IOBuffer, p::Ptr, nb::Int)
ensureroom(to, nb)
ptr = (to.append ? to.size+1 : to.ptr)
written = min(nb, length(to.data) - ptr + 1)
unsafe_copy!(pointer(to.data, ptr), p, written)
p_u8 = convert(Ptr{UInt8}, p)
for i = 0:written - 1
@inbounds to.data[ptr + i] = unsafe_load(p_u8 + i)
end
to.size = max(to.size, ptr - 1 + written)
if !to.append to.ptr += written end
written
end

function write_sub{T}(to::IOBuffer, a::Array{T}, offs, nel)
function write_sub{T}(to::IOBuffer, a::AbstractArray{T}, offs, nel)
if offs+nel-1 > length(a) || offs < 1 || nel < 0
throw(BoundsError())
end
local written::Int
if isbits(T)
if isbits(T) && isa(a,Array)
nb = nel * sizeof(T)
ensureroom(to, nb)
ptr = (to.append ? to.size+1 : to.ptr)
written = min(nb, length(to.data) - ptr + 1)
copy!(to.data, ptr,
reinterpret(UInt8, a), 1 + (offs - 1) * sizeof(T),
pointer_to_array(convert(Ptr{UInt8},pointer(a)), sizeof(a)), # reinterpret(UInt8,a) but without setting the shared data property on a
1 + (offs - 1) * sizeof(T),
written)
to.size = max(to.size, ptr - 1 + written)
if !to.append to.ptr += written end
else
written = 0
ensureroom(to, sizeof(a))
for i = offs:offs+nel-1
written += write(to, a[i])
end
Expand Down Expand Up @@ -310,7 +351,7 @@ end
function search(buf::IOBuffer, delim::UInt8)
data = buf.data
for i = buf.ptr : buf.size
b = data[i]
@inbounds b = data[i]
if b == delim
return i - buf.ptr + 1
end
Expand Down
1 change: 0 additions & 1 deletion base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,6 @@ end
function process_messages(r_stream::TCPSocket, w_stream::TCPSocket; kwargs...)
@schedule begin
disable_nagle(r_stream)
Base.start_reading(r_stream)
wait_connected(r_stream)
if r_stream != w_stream
disable_nagle(w_stream)
Expand Down
1 change: 0 additions & 1 deletion base/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ precompile(Base.rehash!, (Dict{UInt8, Any}, Int))
precompile(Base.reinit_stdio, ())
precompile(Base.reload_path, (ASCIIString,))
precompile(Base.repeat, (ASCIIString, Int))
precompile(Base.repl_callback, (Expr, Int))
precompile(Base.repl_cmd, (Cmd,))
precompile(Base.require, (ASCIIString,))
precompile(Base.rr2id, (RemoteRef,))
Expand Down
4 changes: 1 addition & 3 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,6 @@ eachline(cmd::AbstractCmd) = eachline(cmd, DevNull)
function open(cmds::AbstractCmd, mode::AbstractString="r", stdio::AsyncStream=DevNull)
if mode == "r"
processes = @tmp_rpipe out tmp spawn(false, cmds, (stdio,tmp,STDERR))
start_reading(out)
(out, processes)
elseif mode == "w"
processes = @tmp_wpipe tmp inpipe spawn(false, cmds, (tmp,stdio,STDERR))
Expand Down Expand Up @@ -467,8 +466,7 @@ end
function readbytes(cmd::AbstractCmd, stdin::AsyncStream=DevNull)
(out,pc) = open(cmd, "r", stdin)
!success(pc) && pipeline_error(pc)
wait_close(out)
return takebuf_array(out.buffer)
return readbytes(out)
end

function readall(cmd::AbstractCmd, stdin::AsyncStream=DevNull)
Expand Down
4 changes: 3 additions & 1 deletion base/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ type TCPSocket <: Socket
closenotify::Condition
sendbuf::Nullable{SimpleIOBuffer}
lock::ReentrantLock
throttle::Int

TCPSocket(handle) = new(
handle,
Expand All @@ -273,7 +274,8 @@ type TCPSocket <: Socket
false, Condition(),
false, Condition(),
nothing,
ReentrantLock()
ReentrantLock(),
DEFAULT_READ_BUFFER_SZ
)
end
function TCPSocket()
Expand Down
Loading

0 comments on commit 9de4a9a

Please sign in to comment.