
Allow for file appends #105

Closed

ayman-albaz opened this issue Jan 10, 2021 · 8 comments

Comments

@ayman-albaz

Hello,

I'm currently dealing with a problem where I need to be able to append to a *.arrow file. The data needs to be added row by row because it's bigger than memory and can only be loaded row by row through a stream.

A good example of what I want to do is this:

using Arrow
using DataFrames
using Tables

open("out.arrow", "a") do file
    for i in massive_stream_iterator
        X = rand(1, 1000) |> Tables.table
        Arrow.write(file, X)
    end
end

Reading from this file generates a 1x1000 table, not an Nx1000 table:

df = Arrow.Table("out.arrow") |> DataFrame
@info size(df)

Is it currently possible for Arrow.jl to do this? I've looked all over the documentation, and the closest thing to this was using Tables.partitioner, which I can't use since I'm iterating through a massive, bigger-than-memory iterator.

@quinnj
Member

quinnj commented Jan 11, 2021

So the tricky thing here is that the arrow format is columnar: rows have to be collected and columns have to be "built" at some point to write the data out, i.e. it's not physically possible to write out an arrow file row by row.

Now, we do have some new tools in the ecosystem that might help in this case. For arrow, if you're working with larger-than-memory data, the solution is to write the data in "batches", which Arrow.write supports when the table source implements Tables.partitions. This allows writing out each "partition" as a separate record batch in the arrow file.

So you're correct in thinking Tables.partitioner could help here: it allows "partitioning" an input lazily, like a lazy map that produces a valid table on each iteration. So the usage in your example above would be more like:

Arrow.write("out.arrow", Tables.partitioner(x->Iterators.take(x, 1000), massive_stream_iterator) )

What this will do is spin up a writing thread for each of Threads.nthreads(); each will call Iterators.take(massive_stream_iterator, 1000), which will iterate/return 1000 elements, and those 1000 elements will be made into columns and written out as a single record batch. So the massive_stream_iterator will be written out in record batches of 1000 elements each. This assumes that each element iterated from massive_stream_iterator is a valid "row" according to the Tables.jl interface (i.e. some object that supports at least propertynames(x) and getproperty(x, nm)).
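
To make the batching explicit, here is a minimal sketch (not from the original comment) that chunks a hypothetical row iterator into 1000-row record batches with Iterators.partition; row_stream is a stand-in for massive_stream_iterator and is assumed to yield NamedTuple rows:

using Arrow
using Tables

# stand-in for massive_stream_iterator: a lazy generator of NamedTuple rows
row_stream = ((a = i, b = 2.0 * i) for i in 1:10_000)

# lazily chunk the stream into 1000-row groups and turn each chunk into a
# column table; Arrow.write then emits one record batch per chunk
batches = Tables.partitioner(Iterators.partition(row_stream, 1000)) do chunk
    Tables.columntable(chunk)
end

Arrow.write("out.arrow", batches)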

Anyway, let me know if that sounds like something in the right direction. I think it could also be possible to support something like Arrow.append(file, x) if we really needed to: we would open file, parse the existing schema, verify that the new table x matches that schema, and then write the single table out as a record batch.

@ayman-albaz
Author

Thanks for replying.

I did a slight variation of your post (on a single thread):

using Tables
using Arrow

# Processing to 1xN table
function f(x)
    vec = map(x->parse(Int64, x), split(x, ','))
    Tables.table(hcat(vec...))
end

iostream = open("myfile.csv", "r") # 1000 x 100 file
Arrow.write("out.arrow", Tables.partitioner(f, eachline(iostream) ) # Equivalent to take(x, 1) |> f ?

Looking at my activity monitor, it seems like it's loading the entire iterable into memory before doing a single write, which defeats the purpose of what I'm trying to do (massive-sized file conversions).

Am I misinterpreting your post or is this the expected outcome?

I can technically rely on virtual memory for these massive files; however, I have no idea if that's viable on a colleague's PC or in production. Appending would be more reliable for these massive operations IMO, but if that's a limitation of .arrow files, then it is what it is.

@quinnj
Member

quinnj commented Jan 12, 2021

I don't think it's entirely unexpected; I think it's just a matter of the fact that the "processing" work is going so much faster than the IO work, which isn't too surprising. I.e. it's processing each line, parsing, and creating the record batch so quickly, then passing it off to the writing thread, which can't keep up, so you end up getting large chunks of the processing "in memory" at the same time.

If you're willing, I'd be interested to see if you can utilize a new PR I just put up to limit this: #106. If you call Arrow.write(file, tbl; ntasks=4), then that should only allow up to 4 record batches at a time to be in memory before being written out and then hopefully garbage collected.
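
For illustration, a minimal sketch (not from the original comment) of what capping in-flight batches could look like, reusing the hypothetical row_stream partitioner from the earlier sketch; ntasks is the keyword described above:

using Arrow
using Tables

# hypothetical larger-than-memory row source, batched lazily into 1000-row chunks
row_stream = ((a = i, b = 2.0 * i) for i in 1:1_000_000)
batches = Tables.partitioner(Tables.columntable, Iterators.partition(row_stream, 1000))

# allow at most 4 record batches to be materialized/in flight at a time
Arrow.write("out.arrow", batches; ntasks = 4)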

@ayman-albaz
Author

I tried your branch, and it gets what I wanted done, so +1 and thank you.

However, there are two major problems.

I'm dealing with a CSV that's Int64[1000 x 10].

function f(x)
    print(1)
    vec = map(x->parse(Int64, x), split(x, ','))
    Tables.table(hcat(vec...))
end

# Input csv = 39 kb
@time Arrow.write(output, Tables.partitioner(f, eachline(input)), ntasks=5)     # 7.582670 seconds, 681 kb arrow file
@time Arrow.write(output, Tables.partitioner(f, eachline(input)), ntasks=500)   # 7.582670 seconds, 681 kb arrow file
@time Arrow.write(output, Tables.partitioner(f, eachline(myfile)) |> collect)   # 0.002 seconds, 43 kb arrow file

Opening these files with Arrow.Table |> DataFrame shows that they are the same size.

So the two problems are:

  1. Serious performance issues (compared with just collecting the Tables.partitioner iterator)
  2. Bigger file sizes for some reason

@quinnj
Member

quinnj commented Jan 13, 2021

I'm pretty sure @time Arrow.write(output, Tables.partitioner(f, eachline(myfile)) |> collect) # 0.002 seconds, 43 kb arrow file isn't generating the same output "table" you're expecting.

@ayman-albaz
Author

Hmm, I see; here is one with working code:

using Arrow
using DataFrames
using Tables

function f(x)
    print(1)
    vec = map(x->parse(Int64, x), split(x, ','))
    Tables.table(hcat(vec...))
end

# Writing using ntasks
input = "small.csv"         # Int[1000 x 10]
@info stat(input).size      # 38930
output1 = "output1.arrow"
@time Arrow.write(output1, Tables.partitioner(f, eachline(input)), ntasks=5)    # 7.58 seconds
@info stat(output1).size    # 681234

# Writing by collecting first
input = "small.csv"
output2 = "output2.arrow"
@time Arrow.write(output2,  Tables.partitions(((map(f, eachline(input)) |> collect)...)...))    # 0.03
@info stat(output2).size    # 38930

# Comparison
output1_table = Arrow.Table(output1) |> DataFrame
output2_table = Arrow.Table(output2) |> DataFrame
@info output1_table == output2_table    # true

Same output, different file size, different time required.

@baumgold
Member

I put up #277 to support appending to Arrow files that are either in the Arrow IPC stream format (already supported thanks to @tanmaykm's #160) or the Arrow file format (currently unsupported). @quinnj - would you mind reviewing my PR? Many thanks.

@quinnj
Member

quinnj commented Jun 9, 2023

This is supported now.
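
For reference, a minimal sketch (not part of the original comment) of the appending workflow, assuming Arrow.append is available and the destination was written in the Arrow IPC stream format (the format targeted by the earlier #160 work):

using Arrow

tbl1 = (a = 1:3, b = ["x", "y", "z"])
tbl2 = (a = 4:6, b = ["p", "q", "r"])

# writing to an IO produces the IPC stream format, which can be appended to
open("out.arrows", "w") do io
    Arrow.write(io, tbl1)
end

# append tbl2 to the existing file as an additional record batch;
# its schema must match the schema already in the file
Arrow.append("out.arrows", tbl2)

Arrow.Table("out.arrows")  # 6 rows across two record batches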

@quinnj quinnj closed this as completed Jun 9, 2023