Skip to content

Commit

Permalink
Fix read!(::LibuvStream) and readbytes!(::LibuvStream)
Browse files Browse the repository at this point in the history
to be consisitent with other ::IOs
See test case
  • Loading branch information
samoconnor committed Jan 19, 2016
1 parent 680573e commit cf6fdc1
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 65 deletions.
23 changes: 12 additions & 11 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -894,32 +894,32 @@ function stop_reading(stream::LibuvStream)
end
end

function readbytes!(s::LibuvStream, b::AbstractArray{UInt8}, nb=length(b))
wait_readnb(s, nb)
nr = nb_available(s)
resize!(b, nr) # shrink to just contain input data if was resized
read!(s.buffer, b)
return nr
function read!(s::LibuvStream, b::Vector{UInt8})
nb = length(b)
r = readbytes!(s, b, nb)
if r < nb
throw(EOFError())
end
return b
end

function readbytes(stream::LibuvStream)
wait_readnb(stream, typemax(Int))
return takebuf_array(stream.buffer)
end

function read!(s::LibuvStream, a::Array{UInt8, 1})
nb = length(a)
function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb = length(a))
sbuf = s.buffer
@assert sbuf.seekable == false
@assert sbuf.maxsize >= nb

if nb_available(sbuf) >= nb
return read!(sbuf, a)
return readbytes!(sbuf, a, nb)
end

if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
wait_readnb(s, nb)
read!(sbuf, a)
r = readbytes!(sbuf, a, nb)
else
try
stop_reading(s) # Just playing it safe, since we are going to switch buffers.
Expand All @@ -928,14 +928,15 @@ function read!(s::LibuvStream, a::Array{UInt8, 1})
s.buffer = newbuf
write(newbuf, sbuf)
wait_readnb(s, nb)
r = nb_available(newbuf)
finally
s.buffer = sbuf
if !isempty(s.readnotify.waitq)
start_reading(x) # resume reading iff there are currently other read clients of the stream
end
end
end
return a
return r
end

function read(this::LibuvStream, ::Type{UInt8})
Expand Down
176 changes: 122 additions & 54 deletions test/read.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,69 +5,99 @@ tasks = []
# Create test file...
filename = joinpath(dir, "file.txt")
text = "C1,C2\n1,2\na,b\n"
open(io-> write(io, text), filename, "w")

# List of IO producers...
l = Vector{Tuple{AbstractString,Function}}()


# File
io = ()->Base.Filesystem.open(filename, Base.Filesystem.JL_O_RDONLY)
s = io()
io = (text) -> begin
open(io-> write(io, text), filename, "w")
Base.Filesystem.open(filename, Base.Filesystem.JL_O_RDONLY)
end
s = io(text)
@test isa(s, IO)
@test isa(s, Base.Filesystem.File)
close(s)
push!(l, ("File", io))


# IOStream
io = ()->open(filename)
s = io()
io = (text) -> begin
open(io-> write(io, text), filename, "w")
open(filename)
end
s = io(text)
@test isa(s, IO)
@test isa(s, IOStream)
close(s)
push!(l, ("IOStream", io))


# IOBuffer
io = ()->IOBuffer(text)
s = io()
io = (text)->IOBuffer(text)
s = io(text)
@test isa(s, IO)
@test isa(s, IOBuffer)
close(s)
push!(l, ("IOBuffer", io))


# TCPSocket

# PR#14627
Base.connect!(sock::TCPSocket, addr::Base.InetAddr) = Base.connect!(sock, addr.host, addr.port)

addr = Base.InetAddr(ip"127.0.0.1", 4444)
io = (text) -> begin
c = Condition()
tsk = @async begin
srv = listen(addr)
notify(c)
sock = accept(srv)
write(sock,text)
close(sock)
close(srv)
end
push!(tasks, tsk)
wait(c)
connect(addr)
end
s = io(text)
@test isa(s, IO)
@test isa(s, TCPSocket)
close(s)
push!(l, ("TCPSocket", io))


@windows ? nothing : begin

# PipeEndpoint
socketname = joinpath(dir, "socket")
io = ()-> begin
c = Base.Condition()
io = (text)-> begin
c = Condition()
tsk = @async begin
con = listen(socketname)
Base.notify(c)
notify(c)
sock = accept(con)
write(sock,text)
close(con)
try write(sock,text) end
close(sock)
close(con)
end
push!(tasks, tsk)
wait(c)
connect(socketname)
end
s = io()
s = io(text)
@test isa(s, IO)
@test isa(s, Base.PipeEndpoint)
close(s)
for tsk in tasks
wait(tsk)
end
push!(l, ("PipeEndpoint", io))


# Pipe
io = () -> open(`echo -n $text`)[1]
s = io()
io = (text) -> open(`echo -n $text`)[1]
s = io(text)
@test isa(s, IO)
@test isa(s, Pipe)
close(s)
Expand All @@ -76,16 +106,26 @@ push!(l, ("Pipe", io))
end

open_streams = []
function cleanup()
for s in open_streams
try close(s) end
end
for tsk in tasks
wait(tsk)
end
end

verbose = false

for (name, f) in l

io = ()->(s=f(); push!(open_streams, s); s)
io = ()->(s=f(text); push!(open_streams, s); s)

#println("$name readall...")
verbose && println("$name readall...")
@test readall(io()) == text
@test readall(io()) == readall(filename)

#println("$name read...")
verbose && println("$name read...")
@test readbytes(io()) == Vector{UInt8}(text)
@test readbytes(io()) == open(readbytes,filename)
@test read(io(), UInt8) == read(IOBuffer(text), UInt8)
Expand All @@ -105,12 +145,12 @@ for (name, f) in l
close(s1)
close(s2)

#println("$name readuntil...")
verbose && println("$name readuntil...")
@test readuntil(io(), '\n') == open(io->readuntil(io,'\n'),filename)
@test readuntil(io(), "\n") == open(io->readuntil(io,"\n"),filename)
@test readuntil(io(), ',') == open(io->readuntil(io,','),filename)

#println("$name eof...")
verbose && println("$name eof...")
n = length(text) - 1
@test read!(io(), Vector{UInt8}(n)) ==
read!(IOBuffer(text), Vector{UInt8}(n))
Expand All @@ -123,55 +163,83 @@ for (name, f) in l
@test_throws EOFError read!(io(), Vector{UInt8}(n))
@test_throws EOFError read!(io(), Vector{UInt8}(n))

#println("$name read!...")
for n = 1:length(text)
@test read!(io(), Vector{UInt8}(n)) ==
read!(IOBuffer(text), Vector{UInt8}(n))
@test read!(io(), Vector{UInt8}(n)) ==
open(io->read!(io, Vector{UInt8}(n)), filename)
end
@test_throws EOFError read!(io(), Vector{UInt8}(length(text)+1))
old_text = text

for text in [
old_text,
UTF8String(['A' + i % 52 for i in 1:(div(Base.SZ_UNBUFFERED_IO,2))]),
UTF8String(['A' + i % 52 for i in 1:( Base.SZ_UNBUFFERED_IO -1)]),
UTF8String(['A' + i % 52 for i in 1:( Base.SZ_UNBUFFERED_IO )]),
UTF8String(['A' + i % 52 for i in 1:( Base.SZ_UNBUFFERED_IO +1)]),
UTF8String(['A' + i % 52 for i in 1:(7 + Base.SZ_UNBUFFERED_IO *3)])
]

verbose && println("$name readbytes!...")
l = length(text)
for n = [1, 2, l-2, l-1, l, l+1, l+2]
a1 = Vector{UInt8}(n);
a2 = Vector{UInt8}(n)
s1 = io()
s2 = IOBuffer(text)
n1 = readbytes!(s1, a1)
n2 = readbytes!(s2, a2)
@test n1 == n2
@test length(a1) == length(a2)
@test a1[1:n1] == a2[1:n2]
@test n <= length(text) || eof(s1)
@test n <= length(text) || eof(s2)
cleanup()
end

#println("$name readline...")
@test readline(io()) == readline(IOBuffer(text))
@test readline(io()) == open(readline,filename)
verbose && println("$name read!...")
l = length(text)
for n = [1, 2, l-2, l-1, l]
@test read!(io(), Vector{UInt8}(n)) ==
read!(IOBuffer(text), Vector{UInt8}(n))
cleanup()
end
@test_throws EOFError read!(io(), Vector{UInt8}(length(text)+1))

#println("$name readlines...")
@test readlines(io()) == readlines(IOBuffer(text))
@test readlines(io()) == open(readlines,filename)
@test collect(eachline(io())) == collect(eachline(IOBuffer(text)))
cleanup()

#println("$name countlines...")
@test countlines(io()) == countlines(IOBuffer(text))
verbose && println("$name readline...")
@test readline(io()) == readline(IOBuffer(text))

#println("$name readcsv...")
@test readcsv(io()) == readcsv(IOBuffer(text))
verbose && println("$name readlines...")
@test readlines(io()) == readlines(IOBuffer(text))
@test collect(eachline(io())) == collect(eachline(IOBuffer(text)))

if !(typeof(io()) in [Base.PipeEndpoint, Pipe])
verbose && println("$name countlines...")
@test countlines(io()) == countlines(IOBuffer(text))

#println("$name position...")
verbose && println("$name readcsv...")
@test readcsv(io()) == readcsv(IOBuffer(text))
end

text = old_text


if !(typeof(io()) in [Base.PipeEndpoint, Pipe, TCPSocket])

verbose && println("$name position...")
@test (s = io(); read!(s, Vector{UInt8}(4)); position(s)) == 4

#println("$name seek...")
verbose && println("$name seek...")
for n = 0:length(text)-1
@test readlines(seek(io(), n)) == readlines(seek(IOBuffer(text), n))
cleanup()
end
#println("$name skip...")
verbose && println("$name skip...")
for n = 0:length(text)-1
@test readlines(seek(io(), n)) == readlines(seek(IOBuffer(text), n))
@test readlines(skip(io(), n)) == readlines(skip(IOBuffer(text), n))
cleanup()
end
#println("$name seekend...")
verbose && println("$name seekend...")
@test readall(seekend(io())) == ""
end
end

for s in open_streams
try close(s) end
end

for tsk in tasks
wait(tsk)
cleanup()
end

end

0 comments on commit cf6fdc1

Please sign in to comment.