RFC: Use Apache Arrow Parquet Crate #6735

Closed
tustvold opened this issue Feb 8, 2023 · 16 comments
Labels
enhancement New feature or an improvement of an existing feature

Comments

@tustvold

tustvold commented Feb 8, 2023

Problem description

Following on from discussions between myself, @ritchie46 and @alamb, I would like to propose migrating polars to use the parquet crate. I am very happy to work on making this happen, but want to get consensus on the path forward first 😄

Disclaimer: I am one of the maintainers of the Apache Arrow crates

Why

  • Provides a great case-study to improve interoperability between arrow and arrow2, and reduce apache/arrow-rs#1176

Why Not

  • The parquet crate is a non-trivial additional dependency, and whilst it doesn't depend on the top-level arrow crate, it does depend on arrow-cast
  • There will need to be some non-trivial conversion logic, likely using the FFI interfaces, to convert between arrow2 and arrow
  • There will likely be performance regressions for some workloads, in particular the arrow-rs parquet writer is potentially slower
  • The schema inference logic for arrow types not natively supported in parquet may differ (Handling Unsupported Arrow Types in Parquet apache/arrow-rs#1666)
  • This will be a non-trivial amount of code churn. I am happy to write it, but am aware review capacity is a precious resource 😄
@tustvold tustvold added the enhancement New feature or an improvement of an existing feature label Feb 8, 2023
@alamb

alamb commented Feb 8, 2023

cc @jorgecarleitao in case he has any thoughts as well.

@ritchie46
Member

Thanks for starting this discussion! I think this would be a great addition for polars. Parquet is one of the most abundant and important data formats for the workloads that polars targets.

Given the importance of parquet to polars (and the work being done on parquet2), I'd rather not migrate but add the parquet crate as an additional engine. We are already using the object-store crate, so the parquet reader would be a very natural fit for that.
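
For context, a minimal sketch of how the parquet crate's async reader composes with object_store (API names as they existed around early 2023, behind the parquet crate's object_store feature; the local store and file path are placeholders for this example):

use std::sync::Arc;

use futures::TryStreamExt;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Any ObjectStore implementation works here; LocalFileSystem and the
    // path below are placeholders for this example
    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
    let path = Path::from("data/example.parquet");
    let meta = store.head(&path).await?;

    // ParquetObjectReader adapts any ObjectStore into an async parquet input
    let reader = ParquetObjectReader::new(store, meta);
    let stream = ParquetRecordBatchStreamBuilder::new(reader).await?.build()?;

    // Stream record batches without buffering the whole file in memory
    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}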

Provides a great case-study to improve interoperability between arrow and arrow2, and reduce apache/arrow-rs#1176

I very much agree on this one and I think that by working together on this, we will come up with things that close the gap.

I have been studying the parquet crate a bit, and one thing I wondered was whether we could implement the ArrayReader trait for arrow2 arrays. This is something I could help with/do.

This will be a non-trivial amount of code churn. I am happy to write it, but am aware review capacity is a precious resource 😄

This will be worth it. As mentioned above, parquet is important. :)

There will need to be some non-trivial conversion logic, likely using the FFI interfaces, to convert between arrow2 and arrow

This might be a chance to come up with an abstraction one level higher. These might be traits only, where arrow-rs/arrow2 implement those traits. I am just thinking aloud here; it might not be possible.

arrow-traits / nano-arrow
        /          \
       /            \
  arrow-rs        arrow2

In any case I think this can be great for both polars and the parquet crate (a lot of users to find bugs) and hopefully the arrow landscape.

@tustvold
Author

tustvold commented Feb 8, 2023

I'd rather not migrate but add the parquet crate as an additional engine

Works for me; this would also allow side-by-side comparison.

I have been studying the parquet crate a bit, and one thing I wondered was whether we could implement the ArrayReader trait for arrow2 arrays. This is something I could help with/do.
This might be a chance to come up with an abstraction one level higher

I think ArrayReader is a bit lower-level than we would want (it also is not public), and would involve quite a lot of complexity to reimplement for arrow2. It has very close interactions with RecordReader, which is what performs the nested record shredding; you would effectively be reimplementing most of the crate 😆

I'm not sure traits are the way to go here, as then everything ends up generic, polluting all the interfaces. It should be possible to transparently do a zero-copy conversion from one arrow representation to another without needing to change any interfaces?

@ritchie46
Member

I'm not sure traits are the way to go here, as then everything ends up generic, polluting all the interfaces. It should be possible to transparently do a zero-copy conversion from one arrow representation to another without needing to change any interfaces?

Yes, that should definitely be possible via FFI. I mean we already do this with pyarrow.

I think ArrayReader is a bit lower-level than we would want (it also is not public), and would involve quite a lot of complexity to reimplement for arrow2. It has very close interactions with RecordReader, which is what performs the nested record shredding; you would effectively be reimplementing most of the crate 😆

Fair enough! 😄

Do you think it would be possible to factor arrow-cast out and do the casting from physical to logical types on the polars side?

@tustvold
Author

tustvold commented Feb 8, 2023

Do you think it would be possible to factor arrow-cast out and do the casting from physical to logical types on the polars side?

It would be hard to handle this outside the crate, as the schema inference logic is fairly complex, see here, and needs to interact with the record shredding to support all the various forms of list. This would likely require exposing implementation details I'm not sure we really wish to expose.

Similarly when reading/writing dictionaries, there are scenarios where you need to go from a PLAIN to/from DICTIONARY representation. This could be reimplemented in parquet, and I actually had a PR that did this, but given the majority of arrow-cast is actually these dictionary conversions, you don't gain a huge amount by doing this.

TL;DR: I wouldn't do this as part of a first pass at an integration; longer-term, who knows - perhaps polars might even use arrow-cast if the integration effort goes smoothly 😅

@ritchie46
Member

Right, I trust you on this. I agree this is very premature optimization on my part. 👍

@tustvold
Author

Just starting to work on this now, here is the high-level outline of what I plan to do. I'm not familiar with the polars codebase, so please correct me if I'm about to undertake something stupid 😅

Add a parquet_apache feature flag to polars, polars-plan, polars-pipe, py-polars and polars-io that will gate the new functionality.

Add a new parquet_apache module to polars-io containing an ApacheParquetReader and ApacheParquetAsyncReader, gated by the parquet_apache feature

Add a new parquet_apache module to polars_pipe::sources containing an ApacheParquetSource that uses ApacheParquetReader and ApacheParquetAsyncReader as appropriate.

Modify polars_pipe::convert::get_source to use ApacheParquetSource if parquet_apache is enabled and not parquet; this will allow passing the predicate down, which does not appear to be done for ParquetSource?

Modify LogicalPlanBuilder::scan_parquet to use ApacheParquetReader or ApacheParquetAsyncReader to obtain row counts, if parquet_apache is enabled and not parquet.

I believe the above should be sufficient to enable apache parquet for streaming execution if parquet_apache is enabled and not parquet.
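
As a rough sketch of the proposed gating (the feature name and reader types follow the plan above; everything else is illustrative, and assumes the features are declared in the relevant Cargo.toml files):

#[cfg(feature = "parquet_apache")]
mod parquet_apache {
    // Stand-ins for the readers proposed above
    pub struct ApacheParquetReader;
    pub struct ApacheParquetAsyncReader;
}

// get_source-style dispatch: the new source only applies when the existing
// `parquet` feature is disabled, mirroring the plan above
#[cfg(all(feature = "parquet_apache", not(feature = "parquet")))]
fn get_source() -> parquet_apache::ApacheParquetReader {
    parquet_apache::ApacheParquetReader
}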

This is predicated on my understanding that, if streaming is set to true, optimize_with_scratch will always convert ParquetScan into ParquetSource, and therefore the execution engine won't make use of ParquetExec and friends to collect everything into a single DataFrame. Is this correct, or am I missing some detail? I would very much like to keep the scope of this change down, and restricting to streaming execution seems like the highest-value area to target.

@ritchie46
Member

Add a new parquet_apache module to polars_pipe::sources containing an ApacheParquetSource that uses ApacheParquetReader and ApacheParquetAsyncReader as appropriate.

Modify polars_pipe::convert::get_source to use ApacheParquetSource if parquet_apache is enabled and not parquet; this will allow passing the predicate down, which does not appear to be done for ParquetSource?

Modify LogicalPlanBuilder::scan_parquet to use ApacheParquetReader or ApacheParquetAsyncReader to obtain row counts, if parquet_apache is enabled and not parquet.

I think we should not dictate control flow by feature flags. This makes sense for Rust users, but for Python users I want to be able to compile both readers and expose an engine choice to the users. So I was thinking of a ParquetEngine enum that dictates which reader we build.

This is predicated on my understanding that, if streaming is set to true, optimize_with_scratch will always convert ParquetScan into ParquetSource, and therefore the execution engine won't make use of ParquetExec and friends to collect everything into a single DataFrame. Is this correct, or am I missing some detail? I would very much like to keep the scope of this change down, and restricting to streaming execution seems like the highest-value area to target.

This is correct. Though I think once we have a BatchedApacheParquetReader, exposing them to both is fairly trivial. I also think we should expose them to both, as I want to respect the user's engine selection. But this final dispatching is something that you can leave to me. ;)

Add a parquet_apache feature flag to polars, polars-plan, polars-pipe, py-polars and polars-io that will gate the new functionality.

Yes that should do it. 👍

@tustvold
Author

So I was thinking of a ParquetEngine enum that dictates which reader we build.

How would this be plumbed through? The reason for feature flags was to avoid having to make breaking API changes. ParquetOptions would be the obvious choice perhaps, but most of the APIs seem to list arguments manually?

@ritchie46
Member

We could put the specific options in the enum itself and keep the generic ones as list arguments? Do you expect the arguments to differ much?

Somewhat breaking the API is fine. 👍
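
A minimal sketch of what that could look like; the option structs here are hypothetical stand-ins for the engine-specific settings:

// Hypothetical engine-specific options; generic settings (n_rows, cache,
// etc.) would stay as plain arguments or in ScanArgsParquet
pub struct NativeReaderOptions { /* parquet2-specific settings */ }
pub struct ApacheReaderOptions { /* arrow-rs parquet-specific settings */ }

// Selects which parquet implementation backs a scan, carrying the
// engine-specific options inside the variant
pub enum ParquetEngine {
    Native(NativeReaderOptions),
    Apache(ApacheReaderOptions),
}

fn build_reader(engine: &ParquetEngine) {
    match engine {
        ParquetEngine::Native(_opts) => { /* construct the parquet2-based reader */ }
        ParquetEngine::Apache(_opts) => { /* construct the arrow-rs parquet reader */ }
    }
}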

@kylebarron
Contributor

kylebarron commented Feb 14, 2023

  • There will need to be some non-trivial conversion logic, likely using the FFI interfaces, to convert between arrow2 and arrow

This might be a chance to come up with an abstraction one level higher. These might be traits only, where arrow-rs/arrow2 implement those traits. I am just thinking aloud here; it might not be possible.

arrow-traits / nano-arrow
        /          \
       /            \
  arrow-rs        arrow2

In any case I think this can be great for both polars and the parquet crate (a lot of users to find bugs) and hopefully the arrow landscape.

This sounds like arrow2arrow from this discussion jorgecarleitao/arrow2#629. IMO it would be great to have a small library to handle this, to better integrate the rust arrow ecosystem at large, including but not limited to polars.

@tustvold
Author

tustvold commented Feb 14, 2023

We could put the specific options in the enum itself and keep the generic ones as list arguments

I'm not sure I follow what you are saying. Where would this enum be placed? I could add it to ParquetOptions but then I'm not sure how to plumb it through methods like LogicalPlanBuilder::scan_parquet which take a list of separate arguments, or LazyFrame::scan_parquet which takes ScanArgsParquet.

Should I make LogicalPlanBuilder::scan_parquet take ScanArgsParquet perhaps, and then add an engine enumeration to ParquetOptions?

This sounds like arrow2arrow from this discussion jorgecarleitao/arrow2#629.

I plan to see what shakes out of this effort; using FFI to convert between the two should be a case of a couple of lines of code, so it may not warrant a separate crate.

It should be as simple as

use std::mem::transmute;
use arrow::array::Array;
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
use arrow2::ffi::import_array_from_c;

fn arrow_to_arrow2(array: &dyn Array) -> Result<Box<dyn arrow2::array::Array>> {
    // Export the schema first, before `array` is shadowed by the FFI struct
    let data_type = FFI_ArrowSchema::try_from(array.data_type())?;
    let array = FFI_ArrowArray::new(array.data());
    // Safety:
    // The array is valid, and the C structs of both crates are ABI compatible
    unsafe { import_array_from_c(transmute(array), transmute(data_type)) }
}
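
(Presumably the reverse direction would mirror this, exporting with arrow2::ffi::export_array_to_c and importing on the arrow side.)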

@jorgecarleitao
Collaborator

cc @jorgecarleitao in case he has any thoughts as well.

Thanks for the cc and sorry for the late reply. Lots of great input here!

My opinion of arrow-rs has not fundamentally changed - it continues to be a crate with a design that is prone to unsoundness, and it continues to have unsound cases being found in basic functionality (such as FFI). Exposing Polars to this crate (through the parquet crate) will make people using Polars on servers (e.g. through FastAPI) more likely to be vulnerable.

Regarding parquet itself, my understanding is that:

  • the parquet crate uses unsafe code and this code is not sufficiently tested for edge cases (the invariants of the decoders are hard to understand, e.g. Get MIRI running against parquet crate apache/arrow-rs#614). In comparison, parquet2 is #![forbid(unsafe_code)] (both itself and all its dependencies that do not hit compression) and arrow2's IO module is also #![forbid(unsafe_code)]. All invariants in arrow2 are fully described in the # Safety sections of the documentation.
  • the parquet crate does not protect against basic thrift deserialization panics. parquet2 does so, including situations that result in OOM.
  • the parquet crate has better support for nested types than parquet2 - we continue to address this (most of the issues I had time to look at closely are now resolved).
  • both the parquet and parquet2 crates have the same level of support for filter push down. I believe the benchmarks show a difference because Polars does not leverage page pruning atm.
  • In particular, for queries without filtering where page pruning is relevant, the benchmark is actually 2x faster for Polars for a single column selection, which imo is quite telling about the performance of arrow2 in reading parquet files!
  • afaik arrow2 does not support late materialization yet - I did not know about this neat trick until the awesome blog post - it is a matter of implementing it.
  • parquet2 and arrow2 fully support async and are thus compatible with the (awesome) object_store crate.

One aspect to take into account is that the main contributors of parquet and arrow-rs are paid / part/full time on it, while this is not the case in arrow2 / parquet2. I read somewhere that people cannot work on arrow2 / parquet2 because it is not part of the Apache foundation, but Polars is also not, so I am confused.

Given the above, in my view, the main work that needs to happen to address the issues in this post is:

  • stabilize the support for nested Parquet (I have taken this up in arrow2)
  • make Polars leverage Parquet's functionality around filter push down (that Parquet supports)
  • interoperate object_store with arrow2 and Polars

Doing this would result in a significantly simpler dependency tree, in likely faster reading, and in Polars having its core dependencies that fulfill Rust's hypothesis that zero-cost abstractions and idiomatic safe code result in less memory bugs.

Regarding JSON and CSV, my suggestion is to lift the abstraction to a crate that everyone can benefit from. I have done this many times in arrow2 so that others can benefit without having to depend on arrow2 itself. Examples:

Case in point: I love the implementation of the push-based JSON reader in arrow-rs, but I can't use it in arrow2 without dragging in arrow-rs as a dependency.

@tustvold
Author

tustvold commented Feb 15, 2023

it continues to be a crate with a design that is prone to unsoundness

I'm sorry you still feel this way, but I think we are going to have to agree to disagree at this point. I'm not aware of any major soundness issues in the last 6 months, and frankly I grow tired of this FUD. My stance remains unchanged from apache/arrow-rs#1176 (comment) and I would rather spend my time moving the community forward than continuing to fragment it.

I will defer to @ritchie46, but given this crate makes significant use of unsafe, not to mention pyarrow, I'm not sure how important this is to polars.

the benchmark is actually 2x faster for Polars for a single column selection

DataFusion does not automatically parallelize parquet scans; polars does. The fact that this is the only situation in which it is slower, despite only using a single thread, is perhaps telling. I suppose we shall see...

Regarding the JSON and CSV, my suggestion is to lift the abstraction to a crate that everyone can benefit from

Or we could pool our efforts on making one implementation work well 😄

@jorgecarleitao
Collaborator

I'm not aware of any major soundness issues in the last 6 months, and frankly I grow tired of this FUD. My stance remains unchanged from apache/arrow-rs#1176 (comment) and I would rather spend my time moving the community forward than continuing to fragment it.

Ok, that was not my intention and I am sorry that you feel this way. This is certainly not productive. I replied to your comment there to try to find a way forward.

@stinodego
Member

Since we've recently incorporated arrow2 and parquet2 into Polars as polars-arrow and polars-parquet, I don't think this is currently planned. I will close this, but feel free to continue discussion here - we can reopen it if warranted.

@stinodego stinodego closed this as not planned Won't fix, can't repro, duplicate, stale Oct 31, 2023