Skip to content

Commit

Permalink
Base.isdone for Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
baumgold committed May 30, 2023
1 parent 6fe4ec0 commit b073d85
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
26 changes: 22 additions & 4 deletions src/table.jl
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,17 @@ function Stream(inputs::Vector{ArrowBlob}; convert::Bool=true)
Stream(inputs, inputindex, batchiterator, names, types, schema, dictencodings, dictencoded, convert, compression)
end

Stream(input, pos::Integer=1, len=nothing; kw...) = Stream([ArrowBlob(tobytes(input), pos, len)]; kw...)
Stream(input::Vector{UInt8}, pos::Integer=1, len=nothing; kw...) = Stream([ArrowBlob(tobytes(input), pos, len)]; kw...)
Stream(inputs::Vector; kw...) = Stream([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...)
function Stream(input, pos::Integer=1, len=nothing; kw...)
b = tobytes(input)
isempty(b) ? Stream(ArrowBlob[]; kw...) : Stream([ArrowBlob(b, pos, len)]; kw...)
end

function Stream(input::Vector{UInt8}, pos::Integer=1, len=nothing; kw...)
b = tobytes(input)
isempty(b) ? Stream(ArrowBlob[]; kw...) : Stream([ArrowBlob(b, pos, len)]; kw...)
end

Stream(inputs::AbstractVector; kw...) = Stream([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...)

function initialize!(x::Stream)
isempty(getfield(x, :names)) || return
Expand All @@ -116,8 +124,14 @@ end

Base.IteratorSize(::Type{Stream}) = Base.SizeUnknown()
Base.eltype(::Type{Stream}) = Table
Base.isdone(x::Stream) = x.inputindex > length(x.inputs)

function Base.iterate(x::Stream, (pos, id)=(1, 0))
if Base.isdone(x)
x.inputindex = 1
x.batchiterator = nothing
return nothing
end
if isnothing(x.batchiterator)
blob = x.inputs[x.inputindex]
x.batchiterator = BatchIterator(blob)
Expand All @@ -132,7 +146,11 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0))
# check for additional inputs
while state === nothing
x.inputindex += 1
x.inputindex > length(x.inputs) && return nothing
if Base.isdone(x)
x.inputindex = 1
x.batchiterator = nothing
return nothing
end
blob = x.inputs[x.inputindex]
x.batchiterator = BatchIterator(blob)
pos = x.batchiterator.startpos
Expand Down
1 change: 1 addition & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ tt = Arrow.Table(io)
seekstart(io)
str = Arrow.Stream(io)
@test eltype(str) == Arrow.Table
@test !Base.isdone(str)
state = iterate(str)
@test state !== nothing
tt, st = state
Expand Down

0 comments on commit b073d85

Please sign in to comment.