Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use iterators to increase performance of creating Arrow arrays #200

Closed
alamb opened this issue Apr 26, 2021 · 18 comments · Fixed by #384
Closed

Use iterators to increase performance of creating Arrow arrays #200

alamb opened this issue Apr 26, 2021 · 18 comments · Fixed by #384
Labels
enhancement Any new improvement worthy of a entry in the changelog parquet Changes to the parquet crate

Comments

@alamb
Copy link
Contributor

alamb commented Apr 26, 2021

Note: migrated from original JIRA: https://issues.apache.org/jira/browse/ARROW-11897

The overall goal is to create an efficient pipeline from Parquet page data into Arrow arrays, with as little intermediate conversion and memory allocation as possible. It is assumed, that for best performance, we favor doing fewer but larger copy operations (rather than more but smaller). 

Such a pipeline would need to be flexible in order to enable high performance implementations in several different cases:
(1) In some cases, such as plain-encoded number array, it might even be possible to copy / create the array from a single contiguous section from a page buffer. 
(2) In other cases, such as plain-encoded string array, since values are encoded in non-contiguous slices (where value bytes are separated by length bytes) in a page buffer contains multiple values, individual values will have to be copied separately and it's not obvious how this can be avoided.
(3) Finally, in the case of bit-packing encoding and smaller numeric values, page buffer data has to be decoded / expanded before it is ready to copy into an arrow arrow, so a Vec<u8> will have to be returned instead of a slice pointing to a page buffer.

I propose that the implementation is split into three layers - (1) decoder, (2) column reader and (3) array converter layers (not too dissimilar from the current implementation, except it would be based on Iterators), as follows:

(1) Decoder layer:

A decoder output abstraction that enables all of the above cases and minimizes intermediate memory allocation is Iterator<Item = (count, AsRef<[u8]>)>.
Then in case (1) above, where a numeric array could be created from a single contiguous byte slice, such an iterator could return a single item such as (1024, &[u8])
In case (2) above, where each string value is encoded as an individual byte slice, but it is still possible to copy directly from a page buffer, a decoder iterator could return a sequence of items such as (1, &[u8])
And finally in case (3) above, where bit-packed values have to be unpacked/expanded, and it's NOT possible to copy value bytes directly from a page buffer, a decoder iterator could return items representing chunks of values such as (32, Vec<u8>) where bit-packed values have been unpacked and  the chunk size is configured for best performance.

Another benefit of an Iterator-based abstraction is that it would prepare the parquet crate for  migration to async Streams (my understanding is that a Stream is effectively an async Iterator).

(2) Column reader layer:

Then a higher level iterator could combine a value iterator and a (def) level iterator to produce a sequence of ValueSequence(count, AsRef<[u8]>) and NullSequence(count) items from which an arrow array can be created efficiently.

In future, a higher level iterator (for the keys) could be combined with a dictionary value iterator to create a dictionary array.

(3) Array converter layer:

Finally, Arrow arrays would be created from a (generic) higher-level iterator, using a layer of array converters that know what the value bytes and nulls mean for each type of array.

 

[~nevime] , [~Dandandan] , [~jorgecarleitao] let me know what you think

Next steps:
* split work into smaller tasks that could be done over time

@alamb alamb added the arrow Changes to the arrow crate label Apr 26, 2021
@alamb
Copy link
Contributor Author

alamb commented Apr 26, 2021

Comment from Jorge Leitão(jorgecarleitao) @ 2021-03-06T19:50:27.340+0000:

Not much to say other than AWESOME! Look very nuch forward to it!

Comment from Neville Dipale(nevi_me) @ 2021-03-07T09:34:38.277+0000:

This sounds like a solid proposal, I also like the split that you suggest :)

Comment from Daniël Heres(Dandandan) @ 2021-03-07T09:56:26.934+0000:

That sounds like a cool idea. I like the idea of a very thin abstraction that doesn't sacrifice performance.

For the iterator type, I think the count might not be (always) necessary? As it can depend on the datatype, or will be always be the same (1 or 32 / etc) for the other types? Are there situations were we really need the count?

Comment from Yordan Pavlov(yordan-pavlov) @ 2021-03-07T11:00:25.271+0000:

[~Dandandan] regarding the count values, yes you are right - in the case of string arrays, the generated count values will always equal 1. But the count values may still be useful in cases where a primitive array is split across multiple non-contiguous slices, e.g. due to page boundaries. It may be possible to calculate the count values based on the data type (I have to think more about that), but at the moment I still like how they make the expected value count explicit. This could change during implementation though.

Comment from Daniël Heres(Dandandan) @ 2021-03-07T21:38:03.584+0000:

[~yordan-pavlov] makes sense, thanks!

Comment from Daniël Heres(Dandandan) @ 2021-03-27T11:54:01.798+0000:

[~yordan-pavlov] just checking - any updates to share and could you use some help?
Any idea yet how the work could be split into multiple issues / PRs?
Maybe I could focus on a subtask if we can split the work.

I think it would be amazing to have a faster Parquet reader, even if it "only" is 5-10% - as it's a large performance bottleneck now :).

Do you have some WIP code & experiments that could use a review?

Comment from Yordan Pavlov(yordan-pavlov) @ 2021-03-28T23:04:23.361+0000:

Hi [~Dandandan], apologies I should have updated on my progress earlier, but I was busy trying things out.

My thinking so far has been in the lines of how to replace pretty much the entire path from parquet pages all the way into arrow arrays using iterators (because I am hoping that an iterator-based implementation would minimize unnecessary memory allocation). Something like this: 
Iterator >> Iterator<(ColumnChunkContext, Page)> >> Iterator<(ValueSliceIterator, DefLevelIterator, RepLevelIterator)>
>> (Iterator, Iterator, Iterator)
So far I have implemented splitting an iterator into multiple (parallel) iterators based on [https://stackoverflow.com/questions/25586681/splitting-iteratora-b-into-iteratora-and-iteratorb#25588440]

This will be useful, as illustrated above, for splitting an iterator over pages into iterators over values, def levels and rep levels which can be consume independently (but usually in parallel).

Also, in the past week I have been working on an splitting an iterator of byte slices into iterators that return no more than batch_size items - I have almost figured out how to do this, I just have to make it a bit more generic and do some more benchmarking. I would also like to do some benchmarking with [https://docs.rs/hyper/0.14.4/hyper/body/struct.Bytes.html] (which appears to be an alternative implementation of the ByteBufferPtr that already exists in the parquet crate).

Figuring out exactly how the work will be split into different PRs is what I will focus on next, but I already have some ideas:

I think would be to start small, by building on PageIterator::next() -> PageReader to produce an iterator of pages, something like:

 
// create iterator of (contiguous) data slices across all pages from all row groups
row_group_iter // iter of PageReader
  // add row group context using the scan() operator
  .iter_mut().flat_map(|x| {
      // the column chunk / row group context is used to store dictionaries for dictionary-encoded chunks
      let context = Rc::new(RefCell::new(IterContext::new()));
      x.map(move |v| (context.clone(), v))
  }) // iter of (mut RowGroupContext, Page)
  .map(|(c, p)| { 
    let mut context = c.borrow_mut();
    get_decoder(p)
  }) // iter of AsRef<[u8]>
  .flatten()
 

Iterating over pages is something that is implemented inconsistently for primitive and complex types, and I would like to ultimately merge the two implementations, so that there is no more primitive or complex array reader, just a single arrow array reader using adapters / converters for different types of arrays.

Also the decoding functionality implemented in each parquet type is only used by the plain decoder (and not used by any other decoder) and I would look to move this away from the types and into the plain decoder where it belongs.

Then, I would look into implementing the Iterator> idea for the different decoders and also into how exactly the adaptors / converters for different types of arrays would work.

I am open to suggestions on how we could collaborate better on this. Let me know what you think.

Comment from Jorge Leitão(jorgecarleitao) @ 2021-03-29T05:40:15.197+0000:

FWIW, I started going through the parquet crate and re-write some parts of it. There are many, many opportunities to improve performance there.

I also agree with you that we should push the "to arrow" to the page level. Also, IMO we should scratch the "DataType" and instead implement a specific implementation for boolean, (i32, i64, float, double), byteArray, FixedByteArray.

I am looking into the encodings, and IMO there is some work groundwork that we need to take before going for the arrow-specific problem.

I am looking at the RLE encoding, and I think that it may not be correct atm. Parquet [expects a 4-byte length|https://github.com/apache/parquet-format/blob/master/Encodings.md#run-length-encoding--bit-packing-hybrid-rle--3], but we only take a 1-byte length (i.e. up to 255 values). I do not know how we can even read def, ref levels, and boolean values with our encoder atm.

I also found [this crate|https://github.com/tantivy-search/bitpacking/issues] that seems to be implementing the encoding we need, including ordered, with SIMD instructions. We could probably think about depending on it.

What I did so far: created a new repo from scratch and started moving bits by bits things there, going through a full review of the code (my personal way of reading and understanding code).

I think that the easiest way would be to have a call where we would align knowledge and priorities.

Comment from Yordan Pavlov(yordan-pavlov) @ 2021-03-29T09:24:04.726+0000:

[~jorgecarleitao] thank you for looking into the parquet encoding code; I was also looking into the RLE code, because I needed to understand how it would fit with an Iterator> abstraction. I do agree that the RLE code needs improvement / simplification and it could also be made faster (e.g. using SIMD) and if a library can be used to do all that - great. I also agree that there are many improvement opportunities throughout the parquet crate and it will continue to be an area of focus for me for a while, but sadly I only have a couple of hours per day to spare. 

When you said "to have a call" what did you have in mind in terms of frequency (e.g. weekly, bi-weekly, etc.) and channel (zoom, telegram, etc.) ?

Comment from Jorge Leitão(jorgecarleitao) @ 2021-03-30T06:14:34.779+0000:

Ok, so, just to give a heads up: I have been experimenting with the code, and here is the result so far: [https://github.com/jorgecarleitao/parquet2]

I was able to read a parquet file with arbitrary parallelism (the IO-CPU tradeoff is delegated to downstream). The missing parts are decoding and deserialization, which IMO is what [~yordan-pavlov] is thinking about.

I reduced the problem to: given an iterator of decompressed (but encoded) pages, convert it to an arrow Array. IMO when no encoding is used, we either use a back-to-back or similar (e.g. Int96 is special). When encoding is used, we should probably decode directly to buffers, so that we avoid an extra memcopy.

[~yordan-pavlov], do you use slack? There is an arrow-rust channel on the official Apache slack: [https://the-asf.slack.com/archives/C01QUFS30TD] We could sync there.

 

Comment from Yordan Pavlov(yordan-pavlov) @ 2021-03-30T08:54:56.298+0000:

[~jorgecarleitao] I would be happy to have a chat in Slack, but it appears that an @apache.org email address is necessary to join and I don't have one.

Also, I noticed that in your parquet2 repo, a separate page iterator is created for each row group, very similar to how it works currently. I was planning to wrap multiple row group page iterators into a single iterator returning a sequence of pages from multiple row groups (see the code snippet in my previous comment).

Comment from Daniël Heres(Dandandan) @ 2021-03-30T10:12:09.111+0000:

[~yordan-pavlov] you can join the apache slack here: https://s.apache.org/slack-invite

Comment from Jorge Leitão(jorgecarleitao) @ 2021-03-30T10:48:14.441+0000:

I see. To understand: is there a reason why this should be in [Parquet] instead of in [DataFusion]? I.e. why should we push a specific parallelism strategy to the library?

Asking this because the way I see it, the parquet crate can't tell which use-case is being used on and provide an optimal strategy for (one record per page, per group or per file or per files?). For example, s3 vs hdfs vs local file-system typically require different parallelism strategies.

My hypothesis (which may be wrong!) is that the parquet crate should offer "units of work" that can be divided/parallelized according to IO (e.g. s3 vs filesystem), memory and CPU constraints that each consumer has, and allow consumers of the library (e.g. DataFusion, Polars, Ballista, s3 vs hdfs vs file-system) to design strategies that fit their constraints the best, by assembling these units according to their compute model.

Comment from Yordan Pavlov(yordan-pavlov) @ 2021-04-05T18:51:49.565+0000:

UPDATE: after spending the past few weeks figuring out how different steps could be implemented on the way from page buffer to arrow array (such as create iterator of pages across row groups, share dictionary data between pages in the same row column chunk, split page buffer into different iterators for data, rep and def levels, and reading batches of values), my next step is going to be implementing this idea end-to-end for a particular type of array (StringArray). In this way the idea can be tested sooner (in terms of performance, etc.), reviewed and feedback collected, before expanding the implementation for more types. I hope to have an initial implementation in about a week.

Comment from Yordan Pavlov(yordan-pavlov) @ 2021-04-13T21:51:19.494+0000:

I have finally been able to assemble enough code to demonstrate the core idea and have created a branch here [https://github.com/yordan-pavlov/arrow/commit/c62c5394726b79d428a93e2593d0c24da3c9d286#diff-dce1a37fc60ea0c8d13a61bf530abbf9f82aef43224597f31a7ba4d9fe7bd10dR258]

The test doesn't pass yet, but the code compiles and demonstrates how an iterator could be created over many pages from many row groups / column chunks, and then split into separate iterators for (values, def levels, rep levels) and then read in batches.

The iterator is created in ArrowArrayReader::try_new and used in ::next_batch.

My plan is that ArrowArrayReader will replace both PrimitiveArrayReader and ComplexObjectArrayReader when arrow array converters have been implemented for all types.

Feedback is most welcome.

Next steps are:
 * complete implementation to define arrow array converter interface
 * implement decoder iterator for def / rep levels
 * implement decoder iterator for plain encoding
 * implement StringArray converter
 * make unit test pass
 * attempt to replace ComplexObjectArrayReader for StringArrays
 * benchmark performance
 * create initial PR

After this initial PR, implementing arrow array converters for the remaining types could be done in separate PRs.

Comment from Yordan Pavlov(yordan-pavlov) @ 2021-04-18T20:13:44.587+0000:

UPDATE: over the past few days I managed to finish the core implementation of the new ArrowArrayReader with the key bits being:
 * the converters will only produce an all-value / no-null ArrayData instance - this simplifies the converter interface and keeps all other logic generic
 * if no def levels are available, this no-null ArrayData produced from the converter is simply converted to an array and returned without changes
 * if def levels are available, a BooleanArray is created from the def levels and used to efficiently determine how many values to read and also efficiently insert NULLs using MutableArrayData (with an algorithm very similar to zip()) - this implementation re-uses as much of the existing arrow code as possible
 * the StringArray converter has been implemented as a function before moving to a converter in a later change

Next steps are:
 * implement decoder iterator for def / rep levels
 * implement decoder iterator for plain encoding
 * make unit test pass
 * attempt to replace ComplexObjectArrayReader for StringArrays
 * benchmark performance
 * create initial PR

the latest changes can be found here:

https://github.com/yordan-pavlov/arrow/commit/7299f2a747cc52237c21b9d85df994a66097d731#diff-dce1a37fc60ea0c8d13a61bf530abbf9f82aef43224597f31a7ba4d9fe7bd10dR418

Comment from Yordan Pavlov(yordan-pavlov) @ 2021-04-21T21:49:50.476+0000:

UPDATE: I have now implemented the level decoder iterator and support for def and rep levels in the ArrowArrayReader here:

[https://github.com/yordan-pavlov/arrow/commit/3a820c58747cf692efaf90b7bc3716d60b6ecb85]

This commit incudes a change to load def / rep levels into Int16Array which is used to efficiently calculate the null bitmap for values from def levels using arrow::compute::eq_scalar.

Comment from Yordan Pavlov(yordan-pavlov) @ 2021-04-25T20:58:50.034+0000:

UPDATE: I have added the ArrayConverter trait, implemented decoder iterators for plain encoding, and the string array test now passes;

the latest changes can be found here: [https://github.com/yordan-pavlov/arrow/commit/dc93466510c6be1c6a21a61b1e948a3fa7959a9a]

Next steps are:
 * attempt to replace ComplexObjectArrayReader for StringArrays
 * implement missing parts to make ArrowArrayReader work for StringArrays (likely RLE and dictionary encodings)
 * benchmark performance
 * create initial PR

@jorgecarleitao jorgecarleitao added enhancement Any new improvement worthy of a entry in the changelog parquet Changes to the parquet crate and removed arrow Changes to the arrow crate labels Apr 26, 2021
@jorgecarleitao jorgecarleitao changed the title [Rust][Parquet] Use iterators to increase performance of creating Arrow arrays Use iterators to increase performance of creating Arrow arrays Apr 29, 2021
@yordan-pavlov
Copy link
Contributor

yordan-pavlov commented May 1, 2021

UPDATE: I have finally been able to implement enough to replace ComplexObjectArrayReader with ArrowArrayReader for reading StringArrays and run a test query which I have been using for a lot of my performance testing.
Initial results look promising - the overall time of the query has reduced from about 125ms to about 100ms.
I will try to write some proper benchmarks next, in the next couple of days, in order to better compare performance against the previous implementation.

In general I have found that avoiding use of intermediate arrays as much as possible does help for performance and I believe I have finally been able to validate the idea of using iterators. I also think that switching from iterators to async streams should bring further performance improvements as an async runtime should be able to better schedule a combination of disk and CPU intensive tasks.

the latest changes can be found here:
yordan-pavlov/arrow@95ed8a0

@alamb
Copy link
Contributor Author

alamb commented May 2, 2021

FYI @jorgecarleitao

@jorgecarleitao
Copy link
Member

I have been following it and I think this makes a lot of sense; I think we are fully aligned.

fwiw parquet2 uses an equivalent approach, AFAI understood, the difference is that it expects uncompressed pages on the iterator and performs all CPU-intensive operations inside the iteration. Regardless, I think that this is definitely something to PR and merge.

@Dandandan
Copy link
Contributor

Thanks @yordan-pavlov ! a 20% improvement would be already be awesome I think 😎

@yordan-pavlov
Copy link
Contributor

yordan-pavlov commented May 3, 2021

I wrote some benchmarks over the weekend, and the performance improvement ranges from about 10% to about 40% depending on the type of encoding (plain vs dictionary) and type of column (optional vs mandatory), except for the 50% NULL values case where the new implementation is slower.

I will be looking at the code again later this week, to see if I can make some adjustments to improve performance further.

Here are the results:

plain-encoded string values
read StringArray, plain encoded, mandatory, no NULLs - old: time: [3.2249 ms 3.2446 ms 3.2677 ms]
read StringArray, plain encoded, mandatory, no NULLs - new: time: [1.9518 ms 1.9721 ms 1.9952 ms]

read StringArray, plain encoded, optional, no NULLs - old: time: [3.5536 ms 3.5909 ms 3.6302 ms]
read StringArray, plain encoded, optional, no NULLs - new: time: [2.2593 ms 2.2807 ms 2.3042 ms]

read StringArray, plain encoded, optional, half NULLs - old: time: [3.0227 ms 3.0588 ms 3.0978 ms]
read StringArray, plain encoded, optional, half NULLs - new: time: [3.2593 ms 3.3919 ms 3.5220 ms]

dictionary-encoded string values
read StringArray, dictionary encoded, mandatory, no NULLs - old: time: [2.9892 ms 3.0591 ms 3.1435 ms]
read StringArray, dictionary encoded, mandatory, no NULLs - new: time: [2.7054 ms 2.7271 ms 2.7497 ms]

read StringArray, dictionary encoded, optional, no NULLs - old: time: [3.1137 ms 3.1453 ms 3.1809 ms]
read StringArray, dictionary encoded, optional, no NULLs - new: time: [2.9156 ms 2.9581 ms 3.0054 ms]

read StringArray, dictionary encoded, optional, half NULLs - old: time: [2.7450 ms 2.7824 ms 2.8196 ms]
read StringArray, dictionary encoded, optional, half NULLs - new: time: [3.4127 ms 3.4430 ms 3.4747 ms]

@yordan-pavlov
Copy link
Contributor

UPDATE: I finally had time today to commit the benchmarks I wrote over the weekend, and this latest commit can be found here:
yordan-pavlov/arrow@77129a8

I reduced the size of the generated benchmark data so that the benchmarks take less time, so the results will be different from the ones in my previous post.

With this commit as a baseline, I will be spending the next few days to better understand what affects performance and hopefully do some tweaks to improve performance of the new ArrowArrayReader.
I would also like, as a next step, to do some benchmarking with primitive arrays.

@yordan-pavlov
Copy link
Contributor

UPDATE: after some more benchmarking and some tweaks, the new arrow array reader is now consistently faster in all cases, when reading string arrays, with performance improvement between 14% and 44%. As a next step I would like to also benchmark performance of reading primitive arrays, for a more complete picture, before I create a PR.

Here are the latest benchmark results:

read StringArray, plain encoded, mandatory, no NULLs - old: time: [1.6621 ms 1.6881 ms 1.7159 ms]
read StringArray, plain encoded, mandatory, no NULLs - new: time: [933.62 us 944.74 us 959.72 us]

read StringArray, plain encoded, optional, no NULLs - old: time: [1.7153 ms 1.7293 ms 1.7462 ms]
read StringArray, plain encoded, optional, no NULLs - new: time: [1.0153 ms 1.0186 ms 1.0221 ms]

read StringArray, plain encoded, optional, half NULLs - old: time: [1.4809 ms 1.5016 ms 1.5241 ms]
read StringArray, plain encoded, optional, half NULLs - new: time: [889.95 us 903.29 us 919.65 us]

read StringArray, dictionary encoded, mandatory, no NULLs - old: time: [1.4234 ms 1.4501 ms 1.4812 ms]
read StringArray, dictionary encoded, mandatory, no NULLs - new: time: [1.2338 ms 1.2401 ms 1.2473 ms]

read StringArray, dictionary encoded, optional, no NULLs - old: time: [1.5339 ms 1.5489 ms 1.5662 ms]
read StringArray, dictionary encoded, optional, no NULLs - new: time: [1.2835 ms 1.2913 ms 1.3010 ms]

read StringArray, dictionary encoded, optional, half NULLs - old: time: [1.3033 ms 1.3308 ms 1.3640 ms]
read StringArray, dictionary encoded, optional, half NULLs - new: time: [984.41 us 992.85 us 1.0026 ms]

and here are the latest changes:
yordan-pavlov/arrow@48138d6

@yordan-pavlov
Copy link
Contributor

yordan-pavlov commented May 12, 2021

UPDATE: I was able to add benchmarks for int32 over the weekend, the latest changes can be found here:
yordan-pavlov/arrow@661ffd3

This should be enough to provide a fairly comprehensive picture of the performance of the new ArrowArrayReader vs the old PrimitiveArrayReader and ComplexObjectArrayReader. However, the results from the int32 benchmarks are mixed - the new arrow array reader is faster than the old PrimitiveArrayReader in all cases except for mandatory columns where it is several times slower.

@Dandandan @jorgecarleitao @alamb Let me know what you think - is the simplification of reading arrow arrays using a single, iterator-based abstraction worth a performance hit in a small number of cases (given the performance improvement in most cases, especially strings and NULLs). Should I create a PR for this work next or should I try to make it even faster or even try to replace the Iterators with async Streams before creating a PR?

here are the benchmark results:
read Int32Array, plain encoded, mandatory, no NULLs - old: time: [8.8238 us 8.9269 us 9.0407 us]
read Int32Array, plain encoded, mandatory, no NULLs - new: time: [22.544 us 22.703 us 22.872 us]

read Int32Array, plain encoded, optional, no NULLs - old: time: [276.80 us 281.58 us 287.08 us]
read Int32Array, plain encoded, optional, no NULLs - new: time: [52.179 us 52.998 us 53.886 us]

read Int32Array, plain encoded, optional, half NULLs - old: time: [454.15 us 462.82 us 472.55 us]
read Int32Array, plain encoded, optional, half NULLs - new: time: [320.11 us 325.34 us 330.93 us]

read Int32Array, dictionary encoded, mandatory, no NULLs - old: time: [47.615 us 48.971 us 50.666 us]
read Int32Array, dictionary encoded, mandatory, no NULLs - new: time: [115.89 us 118.07 us 120.55 us]

read Int32Array, dictionary encoded, optional, no NULLs - old: time: [308.88 us 313.42 us 318.41 us]
read Int32Array, dictionary encoded, optional, no NULLs - new: time: [160.98 us 164.96 us 170.25 us]

read Int32Array, dictionary encoded, optional, half NULLs - old: time: [521.36 us 530.06 us 540.16 us]
read Int32Array, dictionary encoded, optional, half NULLs - new: time: [399.54 us 415.00 us 433.30 us]

@alamb
Copy link
Contributor Author

alamb commented May 14, 2021

@Dandandan @jorgecarleitao @alamb Let me know what you think - is the simplification of reading arrow arrays using a single, iterator-based abstraction worth a performance hit in a small number of cases (given the performance improvement in most cases, especially strings and NULLs). Should I create a PR for this work next or should I try to make it even faster or even try to replace the Iterators with async Streams before creating a PR?

I don't have much of an opinion to add here. I think given the work you have put in to date preparing for merging is probably what I would do next (to avoid this drifting too far)

Most of the benchmarks look good to me. if anyone else is concerned about the slowdown in read Int32Array, dictionary encoded, mandatory, no NULLs we could always profile it and see where the time was going / optimize just that case specifically.

@yordan-pavlov
Copy link
Contributor

@alamb thank you for taking the time to review my benchmark results.
I have done some profiling already, and although I haven't spent very long looking into the results, it's not super obvious where improvements could be made.
There is actually one more idea I would like to try over the weekend, in order to avoid the double memory copy which I suspect is causing the performance degradation (vs current implementation) in the simplest cases.

@Dandandan
Copy link
Contributor

Cool, thanks for the update @yordan-pavlov . Let's see, a small slowdown at some place can be fine if it's offset by other improvements and/or better code quality or design!

@jorgecarleitao
Copy link
Member

ccing @nevi-me since he is the expert here.

I'd say let's go for it. @yordan-pavlov , is the PR decomposable or is not worth the effort trying to split it?

@yordan-pavlov
Copy link
Contributor

@jorgecarleitao the int32 support can be split out in a separate PR, I added it now mostly so that I can benchmark how this approach would work for primitive types.

@yordan-pavlov
Copy link
Contributor

yordan-pavlov commented May 17, 2021

UPDATE: over the weekend I implemented a slightly different idea, which appears to have unlocked a new level of performance: instead of having an iterator (of structs which are essentially references to continuous buffer regions) all the way to the array converter, the iterator is just over pages. From then on, value bytes are read as byte slices (&[u8]) and passed to a callback function in a converter which just copies the byte slice into a MutableBuffer. This minimizes memory allocation and memory copy and also results in a significant performance improvement for string arrays.

Also the time for my datafusion benchmark query has reduced further from 100ms to 70ms (it used to be 125ms before all this work).

There is still an issue with the "read Int32Array, dictionary encoded, mandatory, no NULLs" benchmark, where the new version is still slower, but it is now faster than the previous implementation in all other cases (including "read Int32Array, plain encoded, mandatory, no NULLs" which used to be slower, because the old implementation was already fairly efficient).

Over the next few days I will be looking into a few places in the new code, where I think further improvements could be made.

Here are the latest benchmark results:
read Int32Array, plain encoded, mandatory, no NULLs - old: time: [9.3360 us 9.4986 us 9.6921 us]
read Int32Array, plain encoded, mandatory, no NULLs - new: time: [6.8815 us 6.9941 us 7.1260 us]

read Int32Array, plain encoded, optional, no NULLs - old: time: [250.83 us 254.36 us 258.59 us]
read Int32Array, plain encoded, optional, no NULLs - new: time: [49.452 us 49.547 us 49.686 us]

read Int32Array, plain encoded, optional, half NULLs - old: time: [448.57 us 456.15 us 464.68 us]
read Int32Array, plain encoded, optional, half NULLs - new: time: [340.68 us 349.96 us 361.22 us]

read Int32Array, dictionary encoded, mandatory, no NULLs - old: time: [44.508 us 45.301 us 46.256 us]
read Int32Array, dictionary encoded, mandatory, no NULLs - new: time: [162.29 us 164.37 us 166.87 us]

read Int32Array, dictionary encoded, optional, no NULLs - old: time: [336.00 us 344.43 us 353.51 us]
read Int32Array, dictionary encoded, optional, no NULLs - new: time: [233.54 us 241.86 us 251.34 us]

read Int32Array, dictionary encoded, optional, half NULLs - old: time: [458.47 us 468.36 us 481.06 us]
read Int32Array, dictionary encoded, optional, half NULLs - new: time: [464.21 us 470.32 us 477.61 us]

read StringArray, plain encoded, mandatory, no NULLs - old: time: [1.5856 ms 1.5996 ms 1.6168 ms]
read StringArray, plain encoded, mandatory, no NULLs - new: time: [312.25 us 314.47 us 317.58 us]

read StringArray, plain encoded, optional, no NULLs - old: time: [1.7269 ms 1.7466 ms 1.7679 ms]
read StringArray, plain encoded, optional, no NULLs - new: time: [332.59 us 335.79 us 339.89 us]

read StringArray, plain encoded, optional, half NULLs - old: time: [1.4635 ms 1.4821 ms 1.5060 ms]
read StringArray, plain encoded, optional, half NULLs - new: time: [533.63 us 540.17 us 548.34 us]

read StringArray, dictionary encoded, mandatory, no NULLs - old: time: [1.4385 ms 1.4566 ms 1.4804 ms]
read StringArray, dictionary encoded, mandatory, no NULLs - new: time: [410.96 us 417.04 us 423.86 us]

read StringArray, dictionary encoded, optional, no NULLs - old: time: [1.5751 ms 1.5966 ms 1.6222 ms]
read StringArray, dictionary encoded, optional, no NULLs - new: time: [456.19 us 462.95 us 470.83 us]

read StringArray, dictionary encoded, optional, half NULLs - old: time: [1.3197 ms 1.3354 ms 1.3561 ms]
read StringArray, dictionary encoded, optional, half NULLs - new: time: [585.26 us 595.95 us 608.60 us]

And here are the latest changes: yordan-pavlov/arrow@8f4dcb1

@yordan-pavlov
Copy link
Contributor

UPDATE: I still haven't been able to figure out why the current implementation in PrimitiveArrayReader is still faster in the "read Int32Array, dictionary encoded, mandatory, no NULLs" benchmark. But I have made the VariableLenDictionaryDecoder even faster - up to 4.8 times faster for reading string arrays compared to the current implementation in ComplexObjectArrayReader.
Here are the relevant benchmark results:

read StringArray, dictionary encoded, mandatory, no NULLs - old: time: [1.3798 ms 1.3884 ms 1.3987 ms]
read StringArray, dictionary encoded, mandatory, no NULLs - new: time: [280.68 us 288.89 us 298.13 us]

read StringArray, dictionary encoded, optional, no NULLs - old: time: [1.5283 ms 1.5432 ms 1.5601 ms]
read StringArray, dictionary encoded, optional, no NULLs - new: time: [334.56 us 346.34 us 362.41 us]

read StringArray, dictionary encoded, optional, half NULLs - old: time: [1.3208 ms 1.3432 ms 1.3676 ms]
read StringArray, dictionary encoded, optional, half NULLs - new: time: [516.13 us 523.28 us 531.78 us]

And here are the latest changes: yordan-pavlov/arrow@9cac678

@yordan-pavlov
Copy link
Contributor

UPDATE: I did some clean up of the code over the weekend, with minor changes to performance. Sadly the new code is still slower in the read Int32Array, dictionary encoded, mandatory, no NULLs benchmark.
I think what could help would be caching of the value slices in the row group context, but this requires a self-referencing struct and is not an easy thing to do. Also reading dictionary-encoded pages as arrow dictionary arrays could result in further performance improvement and completely eliminate any last remaining performance issues, but that is a topic for another PR.

With that said, because the new implementation is faster in all other benchmarks (in some cases more than 5 times faster), and also cleaner, I still think it is a significant improvement and should be merged. So, in the next few days I will be rebasing on the latest arrow-rs/master and creating a PR.

Here are the latest benchmarks results:

read Int32Array, plain encoded, mandatory, no NULLs - old: time: [8.8516 us 8.9323 us 9.0332 us]
read Int32Array, plain encoded, mandatory, no NULLs - new: time: [6.9189 us 7.0340 us 7.1556 us]

read Int32Array, plain encoded, optional, no NULLs - old: time: [410.12 us 423.31 us 437.25 us]
read Int32Array, plain encoded, optional, no NULLs - new: time: [56.863 us 60.122 us 63.467 us]

read Int32Array, plain encoded, optional, half NULLs - old: time: [467.45 us 477.59 us 489.31 us]
read Int32Array, plain encoded, optional, half NULLs - new: time: [331.12 us 337.25 us 344.02 us]

read Int32Array, dictionary encoded, mandatory, no NULLs - old: time: [43.921 us 44.525 us 45.309 us]
read Int32Array, dictionary encoded, mandatory, no NULLs - new: time: [146.66 us 148.39 us 150.40 us]

read Int32Array, dictionary encoded, optional, no NULLs - old: time: [304.69 us 310.85 us 317.45 us]
read Int32Array, dictionary encoded, optional, no NULLs - new: time: [195.72 us 199.61 us 203.64 us]

read Int32Array, dictionary encoded, optional, half NULLs - old: time: [476.98 us 486.63 us 497.43 us]
read Int32Array, dictionary encoded, optional, half NULLs - new: time: [401.85 us 408.88 us 416.63 us]

read StringArray, plain encoded, mandatory, no NULLs - old: time: [1.6361 ms 1.6459 ms 1.6561 ms]
read StringArray, plain encoded, mandatory, no NULLs - new: time: [301.92 us 311.13 us 320.87 us]

read StringArray, plain encoded, optional, no NULLs - old: time: [2.0373 ms 2.0751 ms 2.1182 ms]
read StringArray, plain encoded, optional, no NULLs - new: time: [343.34 us 350.98 us 359.35 us]

read StringArray, plain encoded, optional, half NULLs - old: time: [1.4777 ms 1.4999 ms 1.5247 ms]
read StringArray, plain encoded, optional, half NULLs - new: time: [587.13 us 605.02 us 626.96 us]

read StringArray, dictionary encoded, mandatory, no NULLs - old: time: [1.4503 ms 1.4681 ms 1.4897 ms]
read StringArray, dictionary encoded, mandatory, no NULLs - new: time: [269.72 us 275.22 us 281.45 us]

read StringArray, dictionary encoded, optional, no NULLs - old: time: [1.5541 ms 1.5718 ms 1.5911 ms]
read StringArray, dictionary encoded, optional, no NULLs - new: time: [325.79 us 336.16 us 348.02 us]

read StringArray, dictionary encoded, optional, half NULLs - old: time: [1.3192 ms 1.3395 ms 1.3625 ms]
read StringArray, dictionary encoded, optional, half NULLs - new: time: [501.84 us 520.28 us 545.06 us]

@yordan-pavlov
Copy link
Contributor

I finally had some time to check how the new ArrowArrayReader affects TPC-H benchmark results - for queries which use string columns (queries 1 and 12), there is a performance improvement of about 30%, other queries that I tested, which mostly use non-string columns are unaffected. This makes sense as the new ArrowArrayReader is only enabled for string arrays currently.

Here are the results:

before new ArrowArrayReader:
Query 1 avg time: 822.14 ms
Query 3 avg time: 432.85 ms
Query 5 avg time: 698.90 ms
Query 6 avg time: 319.38 ms
Query 12 avg time: 682.50 ms

after new ArrowArrayReader:
Query 1 avg time: 514.88 ms
Query 3 avg time: 441.08 ms
Query 5 avg time: 702.91 ms
Query 6 avg time: 324.05 ms
Query 12 avg time: 425.38 ms

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement Any new improvement worthy of a entry in the changelog parquet Changes to the parquet crate
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants