
Reading an Arrow file with no message batches after the schema seems to produce a partly initialized Table? #158

Closed
msavael opened this issue Mar 30, 2021 · 5 comments · Fixed by #175

msavael commented Mar 30, 2021

julia> t1 = load_t1();
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:294 checking for next arrow message: pos = 1
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:317 parsing message: pos = 217, msglen = 208, bodyLength = 0
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:217 parsing schema message
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:224 parsed column from schema: field = Arrow.Flatbuf.Field(name = "c1", nullable = false, type = Arrow.Flatbuf.Date(unit = Arrow.Flatbuf.DateUnitModule.DAY,), dictionary = nothing, children = Arrow.Flatbuf.Field[], custom_metadata = Arrow.Flatbuf.KeyValue[])
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:224 parsed column from schema: field = Arrow.Flatbuf.Field(name = "c2", nullable = false, type = Arrow.Flatbuf.Utf8(), dictionary = nothing, children = Arrow.Flatbuf.Field[], custom_metadata = Arrow.Flatbuf.KeyValue[])
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:294 checking for next arrow message: pos = 217
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:310 message has 0 length; terminating message parsing

julia> t2 = load_t2();
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:294 checking for next arrow message: pos = 1
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:317 parsing message: pos = 217, msglen = 208, bodyLength = 0
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:217 parsing schema message
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:224 parsed column from schema: field = Arrow.Flatbuf.Field(name = "c1", nullable = false, type = Arrow.Flatbuf.Date(unit = Arrow.Flatbuf.DateUnitModule.DAY,), dictionary = nothing, children = Arrow.Flatbuf.Field[], custom_metadata = Arrow.Flatbuf.KeyValue[])
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:224 parsed column from schema: field = Arrow.Flatbuf.Field(name = "c2", nullable = false, type = Arrow.Flatbuf.Utf8(), dictionary = nothing, children = Arrow.Flatbuf.Field[], custom_metadata = Arrow.Flatbuf.KeyValue[])
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:294 checking for next arrow message: pos = 217
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:317 parsing message: pos = 425, msglen = 200, bodyLength = 136
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:252 parsing record batch message: compression = nothing
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:294 checking for next arrow message: pos = 561
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:310 message has 0 length; terminating message parsing
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:335 building top-level column: field = Arrow.Flatbuf.Field(name = "c1", nullable = false, type = Arrow.Flatbuf.Date(unit = Arrow.Flatbuf.DateUnitModule.DAY,), dictionary = nothing, children = Arrow.Flatbuf.Field[], custom_metadata = Arrow.Flatbuf.KeyValue[]), columnidx = 1, nodeidx = 1, bufferidx = 1
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:520 building array: L = Arrow.Flatbuf.Date
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:527 storage type for primitive: T = Arrow.Date{Arrow.Flatbuf.DateUnitModule.DAY, Int32}
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:531 final julia type for primitive: T = Date
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:337 built top-level column: A = Arrow.Primitive{Date, Vector{Arrow.Date{Arrow.Flatbuf.DateUnitModule.DAY, Int32}}}, columnidx = 1, nodeidx = 2, bufferidx = 3
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:335 building top-level column: field = Arrow.Flatbuf.Field(name = "c2", nullable = false, type = Arrow.Flatbuf.Utf8(), dictionary = nothing, children = Arrow.Flatbuf.Field[], custom_metadata = Arrow.Flatbuf.KeyValue[]), columnidx = 2, nodeidx = 2, bufferidx = 3
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:408 building array: L = Arrow.Flatbuf.Utf8()
DEBUG: ~/.julia/packages/Arrow/Re9EM/src/table.jl:337 built top-level column: A = Arrow.List{String, Int32, Vector{UInt8}}, columnidx = 2, nodeidx = 3, bufferidx = 6

julia> t1
Arrow.Table: Error showing value of type Arrow.Table:
ERROR: KeyError: key :c1 not found
Stacktrace:
  [1] getindex
    @ ./dict.jl:482 [inlined]
  [2] getcolumn
    @ ~/.julia/packages/Arrow/Re9EM/src/table.jl:175 [inlined]
  [3] #1
    @ ./none:0 [inlined]
  [4] iterate
    @ ./generator.jl:47 [inlined]
  [5] collect(itr::Base.Generator{Vector{Symbol}, Tables.var"#1#2"{Arrow.Table}})
    @ Base ./array.jl:678
  [6] _totuple
    @ ./tuple.jl:331 [inlined]
  [7] Tuple
    @ ./tuple.jl:303 [inlined]
  [8] NamedTuple(r::Arrow.Table)
    @ Tables ~/.julia/packages/Tables/8FVkV/src/Tables.jl:190
  [9] show(io::IOContext{Base.TTY}, x::Arrow.Table)
    @ Tables ~/.julia/packages/Tables/8FVkV/src/Tables.jl:196
 [10] show(io::IOContext{Base.TTY}, #unused#::MIME{Symbol("text/plain")}, x::Arrow.Table)
    @ Base.Multimedia ./multimedia.jl:47

julia> t2
Arrow.Table: (c1 = [...], c2 = [...])

I've tried to put together the above demonstration, though unfortunately I can't share the exact files. The first example, t1, has no rows / record batches, but otherwise has the same schema as t2. t1 errors when trying to access any column (or show it, etc.), but t2 is fine.
Arrow.schema(t1) looks healthy, and is identical to Arrow.schema(t2).
Both files read fine with pyarrow.

If you're able to make sense of it, that would be much appreciated. Thanks!
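For what it's worth, the symptom is consistent with the column lookup being populated only while record batches are read, so a schema-only stream leaves it empty even though the schema itself parses fine. A minimal sketch in Python (toy names and data structures, not Arrow.jl's actual internals):

```python
# Toy model (assumption, not Arrow.jl internals): column names come from the
# schema message, but the name -> column mapping is filled only while record
# batches are parsed. With zero record batches it stays empty.
names = ["c1", "c2"]   # parsed from the schema message
lookup = {}            # populated per record batch; no batches -> empty

try:
    columns = tuple(lookup[n] for n in names)
except KeyError as e:
    print("KeyError:", e)   # mirrors `ERROR: KeyError: key :c1 not found`
```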

Versions:
julia 1.6.0
Arrow v1.2.4


msavael commented Apr 7, 2021

A reproduction is actually quite easy. I think this is close to the Arrow message I'm dealing with: schema/metadata, but zero record batches:

import Arrow
import PyCall
pyarrow = PyCall.pyimport("pyarrow")

# write in julia
io = IOBuffer()
t = (;a=Int[])
Arrow.write(seekstart(io), t)
jl_buf = read(seekstart(io))
length(jl_buf) # 256

# write in python
py_data = [pyarrow.array([], type=pyarrow.int64())]
batch = pyarrow.record_batch(py_data, names=["a"])
sink = pyarrow.BufferOutputStream()
writer = pyarrow.ipc.new_stream(sink, batch.schema)
writer.close()  # close without writing any batch -> schema-only stream
py_buf = collect(reinterpret(UInt8, sink.getvalue())) # comes as Vector{Int8} for some reason
length(py_buf) # 136

# Reading back in python - all 3 of the following work:
pyarrow.ipc.open_stream(py_buf).read_pandas()
pyarrow.ipc.open_stream(jl_buf).read_pandas()
pyarrow.ipc.open_stream(jl_buf[1:128]).read_pandas() # Turns out you only need first 128 bytes (metadata is repeated)

# Reading back in julia: can't read python
Arrow.Table(jl_buf) # reads fine
Arrow.Table(py_buf) # errors

# Arrow.Table: ERROR: KeyError: key :a not found
# Stacktrace:
#   [1] getindex
#     @ ./dict.jl:482 [inlined]
#   [2] getcolumn
#     @ ~/.julia/packages/Arrow/Re9EM/src/table.jl:175 [inlined]
#   [3] #1
#     @ ./none:0 [inlined]
#   [4] iterate
#     @ ./generator.jl:47 [inlined]
#   [5] collect(itr::Base.Generator{Vector{Symbol}, Tables.var"#1#2"{Arrow.Table}})
#     @ Base ./array.jl:678
#   [6] _totuple
#     @ ./tuple.jl:331 [inlined]
#   [7] Tuple
#     @ ./tuple.jl:303 [inlined]
#   [8] NamedTuple(r::Arrow.Table)
#     @ Tables ~/.julia/packages/Tables/8FVkV/src/Tables.jl:190
#   [9] show(io::IOContext{Base.TTY}, x::Arrow.Table)

julia> pyarrow.__version__
"1.0.1"


quinnj commented Apr 14, 2021

Digging in now (sorry for the slow reply); it looks like Arrow.jl writes an empty record batch, whereas pyarrow doesn't write one at all.

quinnj added a commit that referenced this issue Apr 14, 2021
Fixes #158. While the Julia implementation currently doesn't provide a
way to avoid writing any record batches, the pyarrow implementation has
more fine-grained control over writing and allows closing an ipc stream
without writing any record batches. In that case, on the Julia side when
reading, we just need to check for this case specifically and, if so,
populate some empty columns, since we currently rely on them being
populated when record batches are read.
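The fix described in the commit message can be sketched as follows (a hypothetical reader loop in Python, not the actual table.jl code): after the message loop, if no record batch was ever seen, build empty columns directly from the schema.

```python
# Hypothetical sketch of the fix (not the actual table.jl code): if the
# message loop ends without having seen a record batch, populate empty
# columns from the schema instead of leaving the lookup table empty.
def build_table(messages, schema_names):
    lookup = {}
    saw_batch = False
    for msg in messages:
        if msg["type"] == "record_batch":
            saw_batch = True
            for name, col in msg["columns"].items():
                lookup.setdefault(name, []).extend(col)
    if not saw_batch:                      # the zero-batch case from this issue
        lookup = {name: [] for name in schema_names}
    return lookup

# Schema-only stream (pyarrow's writer.close() with no batches written):
print(build_table([], ["a"]))  # {'a': []} instead of a KeyError later
```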

quinnj commented Apr 14, 2021

Ok, fix is up: #175


msavael commented Apr 14, 2021

Thanks! This fixes the example I gave above, but it seems to choke when a field supplies custom_metadata. Here's an example:

import Arrow
import PyCall
pyarrow = PyCall.pyimport("pyarrow")

py_data = [pyarrow.array([], type=pyarrow.date32())]
my_schema = pyarrow.schema([
    pyarrow.field("a", pyarrow.date32(), false, metadata=Dict("asd" => "asd")),
])
batch = pyarrow.record_batch(py_data, schema=my_schema)
sink = pyarrow.BufferOutputStream()
writer = pyarrow.ipc.new_stream(sink, batch.schema)
writer.close()  # again: close without writing any batch
py_buf = collect(reinterpret(UInt8, sink.getvalue())) # comes as Vector{Int8} for some reason
length(py_buf) # 136

Arrow.Table(py_buf) 

ERROR: LoadError: MethodError: no method matching juliaeltype(::Arrow.Flatbuf.Field, ::Arrow.FlatBuffers.Array{Arrow.Flatbuf.KeyValue, UInt32, Arrow.Flatbuf.Field}, ::Bool)
Closest candidates are:
  juliaeltype(::Arrow.Flatbuf.Field, ::Union{Arrow.Flatbuf.Binary, Arrow.Flatbuf.LargeBinary}, ::Any) at ~/.julia/packages/Arrow/u4Ye6/src/eltypes.jl:136
  juliaeltype(::Arrow.Flatbuf.Field, ::Union{Arrow.Flatbuf.LargeUtf8, Arrow.Flatbuf.Utf8}, ::Any) at ~/.julia/packages/Arrow/u4Ye6/src/eltypes.jl:131
  juliaeltype(::Arrow.Flatbuf.Field, ::Union{Arrow.Flatbuf.LargeList, Arrow.Flatbuf.List}, ::Any) at ~/.julia/packages/Arrow/u4Ye6/src/eltypes.jl:380
  ...
Stacktrace:
 [1] Arrow.Table(bytes::Vector{UInt8}, off::Int64, tlen::Nothing; convert::Bool)
   @ Arrow ~/.julia/packages/Arrow/u4Ye6/src/table.jl:278
 [2] Table (repeats 2 times)
   @ ~/.julia/packages/Arrow/u4Ye6/src/table.jl:191 [inlined]


quinnj commented Apr 14, 2021

Ah, thanks for checking that out. There was something in my brain that was telling me that just passing f.custom_metadata wasn't quite right, but I didn't take enough time to remember that we need to convert arrow metadata from a Vector{Pair{String, String}} to a Dict{String, String}. I just pushed a commit that should fix it.
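The conversion quinnj describes amounts to turning a sequence of key/value pairs into a dictionary before the reader uses it; a sketch in Python with toy data:

```python
# Sketch (toy data) of the metadata fix: Arrow field metadata arrives as a
# sequence of key/value pairs (like a Vector{Pair{String,String}}) and must
# be converted to a dict (Dict{String,String}) before the reader uses it.
raw_metadata = [("asd", "asd")]          # KeyValue pairs from the flatbuffer
meta = {k: v for k, v in raw_metadata}   # the mapping the reader expects
print(meta)  # {'asd': 'asd'}
```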

quinnj added a commit that referenced this issue Apr 15, 2021
* Fix case when ipc stream has no record batches, only schema


* fix metadata