Skip to content

Commit

Permalink
Fix case when ipc stream has no record batches, only schema (#175)
Browse files Browse the repository at this point in the history
* Fix case when ipc stream has no record batches, only schema

Fixes #158. While the Julia implementation currently doesn't provide a
way to avoid writing any record batches, the pyarrow implementation has
more fine-grained control over writing and allows closing an ipc stream
without writing any record batches. In that case, on the Julia side when
reading, we just need to check for this case specifically and if so,
populate some empty columns, since we're currently relying on them being
populated when record batches are read.

* fix metadata
  • Loading branch information
quinnj authored Apr 15, 2021
1 parent c5c77e6 commit bdd0e54
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
12 changes: 11 additions & 1 deletion src/table.jl
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ function Table(bytes::Vector{UInt8}, off::Integer=1, tlen::Union{Integer, Nothin
i += 1
end
end
anyrecordbatches = false
for batch in BatchIterator(bytes, off)
# store custom_metadata of batch.msg?
header = batch.msg.header
Expand Down Expand Up @@ -258,6 +259,7 @@ function Table(bytes::Vector{UInt8}, off::Integer=1, tlen::Union{Integer, Nothin
dictencodings[id] = DictEncoding{eltype(A), S, typeof(A)}(id, A, field.dictionary.isOrdered, values.metadata)
@debug 1 "parsed dictionary batch message: id=$id, data=$values\n"
elseif header isa Meta.RecordBatch
anyrecordbatches = true
@debug 1 "parsing record batch message: compression = $(header.compression)"
put!(tsks, Threads.@spawn begin
collect(VectorIterator(sch, batch, dictencodings, convert))
Expand All @@ -270,13 +272,20 @@ function Table(bytes::Vector{UInt8}, off::Integer=1, tlen::Union{Integer, Nothin
wait(tsk)
lu = lookup(t)
ty = types(t)
# 158; some implementations may send 0 record batches
if !anyrecordbatches
for field in sch.fields
T = juliaeltype(field, buildmetadata(field), convert)
push!(columns(t), T[])
end
end
for (nm, col) in zip(names(t), columns(t))
lu[nm] = col
push!(ty, eltype(col))
end
meta = sch !== nothing ? sch.custom_metadata : nothing
if meta !== nothing
getfield(t, :metadata)[] = Dict(x.key=>x.value for x in meta)
getfield(t, :metadata)[] = buildmetadata(meta)
end
return t
end
Expand Down Expand Up @@ -337,6 +346,7 @@ end
"""
    buildmetadata(x)

Normalize custom metadata into a `Dict{String, String}`, or return `nothing`
when there is no metadata.

Methods:
- `buildmetadata(f::Meta.Field)`: forwards `f.custom_metadata` to the methods below.
- `buildmetadata(::Nothing)`: returns `nothing`.
- `buildmetadata(x::Dict{String, String})`: returns `x` itself (no copy is made).
- `buildmetadata(meta)`: treats `meta` as an iterable of entries exposing `key`
  and `value` properties and converts each of them with `String`.
"""
buildmetadata(f::Meta.Field) = buildmetadata(f.custom_metadata)
# Use the typed constructor so the result is always a `Dict{String, String}`,
# even when `meta` is empty and the generator's element type cannot be inferred
# (an untyped `Dict(...)` would then widen to `Dict{Any, Any}`).
buildmetadata(meta) = Dict{String, String}(String(kv.key) => String(kv.value) for kv in meta)
buildmetadata(::Nothing) = nothing
buildmetadata(x::Dict{String, String}) = x

function Base.iterate(x::VectorIterator, (columnidx, nodeidx, bufferidx)=(Int64(1), Int64(1), Int64(1)))
columnidx > length(x.schema.fields) && return nothing
Expand Down
13 changes: 13 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,19 @@ c = Arrow.ToTimestamp(x)
@test eltype(c) == Arrow.Timestamp{Arrow.Flatbuf.TimeUnitModule.MILLISECOND, Symbol("Europe/Paris")}
@test c[1] == Arrow.Timestamp{Arrow.Flatbuf.TimeUnitModule.MILLISECOND, Symbol("Europe/Paris")}(1577836800000)

# Regression test for issue 158: reading an IPC stream that has a schema but
# no record batches must still produce a table with (empty) columns.
# The fixture below is an arrow ipc stream generated from pyarrow with no record batches.
# NOTE(review): the trailing 0xff 0xff 0xff 0xff 0x00 0x00 0x00 0x00 looks like the
# IPC end-of-stream marker directly after the schema message — confirm against the spec.
bytes = UInt8[0xff, 0xff, 0xff, 0xff, 0x78, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x0c, 0x00,
0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x00, 0x01, 0x04, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x08, 0x00,
0x08, 0x00, 0x00, 0x00, 0x04, 0x00, 0x08, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x14, 0x00,
0x00, 0x00, 0x10, 0x00, 0x14, 0x00, 0x08, 0x00, 0x06, 0x00, 0x07, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x10, 0x00, 0x10, 0x00,
0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x10, 0x00, 0x00, 0x00, 0x1c, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x61, 0x00, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x08, 0x00, 0x07, 0x00, 0x08, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00]
tbl = Arrow.Table(bytes)
# The column `a` must exist, hold zero rows, and still carry the element type
# declared by the schema.
@test length(tbl.a) == 0
@test eltype(tbl.a) == Union{Int64, Missing}

end # @testset "misc"

end

0 comments on commit bdd0e54

Please sign in to comment.