Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Respect max rowgroup size in Arrow writer #381

Merged
merged 3 commits into from
Jun 2, 2021

Conversation

nevi-me
Copy link
Contributor

@nevi-me nevi-me commented May 30, 2021

Which issue does this PR close?

Closes #257.

Rationale for this change

Parquet splits batches into row groups, which are normally determined by a max_row_group_size setting.
The Arrow writer could not respect this setting because we cannot slice into structs and arrays correctly.
The issue is that when using array.slice(offset: usize, len: usize), we don't propagate and calculate the slice of child data, leading to only the top-level data being sliced.

What changes are included in this PR?

We use the LevelInfo struct to keep track of its array's offset and length. This allows us to track nested arrays' offsets, and calculate the correct list offsets and lengths.

We then use the arrow::array::slice to perform 0-copy slices from a batch, to limit the row group size correctly.

I have changed all writer tests to use a max row group size, ensuring that we aren't introducing bugs when slicing.

Note that this is related to #225, but I don't think it quite covers all its use-cases.
If we have a sliced recordbatch per #343, we would need to account for its individual array offsets, as there is never a guarantee that a record batch has all child arrays starting from the same offset.

Are there any user-facing changes?

No. All changes are crate-internal.

@nevi-me
Copy link
Contributor Author

nevi-me commented May 30, 2021

CC @crepererum @houqp (we've spoken about this before)

@nevi-me nevi-me requested review from alamb and jorgecarleitao and removed request for alamb May 30, 2021 06:24
@nevi-me nevi-me added the parquet Changes to the parquet crate label May 30, 2021
@codecov-commenter
Copy link

codecov-commenter commented May 30, 2021

Codecov Report

Attention: Patch coverage is 93.33333% with 8 lines in your changes missing coverage. Please review.

Project coverage is 82.62%. Comparing base (f26ffb3) to head (530949e).
Report is 2944 commits behind head on master.

Files with missing lines Patch % Lines
parquet/src/arrow/levels.rs 89.47% 6 Missing ⚠️
parquet/src/arrow/arrow_writer.rs 96.77% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #381      +/-   ##
==========================================
+ Coverage   82.60%   82.62%   +0.02%     
==========================================
  Files         162      162              
  Lines       44199    44275      +76     
==========================================
+ Hits        36509    36583      +74     
- Misses       7690     7692       +2     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

// We currently do not have a way of slicing nested arrays, thus we
// track this manually.
let num_rows = batch.num_rows();
let batches = (num_rows + self.max_row_group_size - 1) / self.max_row_group_size;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we ensure that max_row_group_size > 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's sufficient :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I correct in thinking this code could result in non-uniform row group sizes?

like if we had max_row_group_size=10 and wrote a RecordBatch with 25 rows, would we get row groups like

(row_group 1: 10 rows)
(row_group 2: 10 rows)
(row_group 3: 5 rows)
(row_group 4: 10 rows)
(row_group 5: 10 rows)
(row_group 6: 5 rows)

?

If so I think this is fine (in that it is technically respecting max_row_group but it might be unexpected from a user, who might expect something more like

(row_group 1: 10 rows)
(row_group 2: 10 rows)
(row_group 3: 10 rows)
(row_group 4: 10 rows)
(row_group 5: 10 rows)

Perhaps it is worth a doc comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's a fair observation. I think it's a bit tricky because we would need to get the other 5 records from the next batch.

If we passed all batches at once, we would be able to segment them into equal rows.

This is something we can think of, as I think it's a valid expectation from a user.

I can check if we are able to keep row groups open, so that when the next batch comes in, we take its 5 records

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just adding a note in the docstring would be sufficient at this time

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a note, thanks :)

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @nevi-me ! I had some minor additional test coverage suggestions, but otherwise this looks great!

// We currently do not have a way of slicing nested arrays, thus we
// track this manually.
let num_rows = batch.num_rows();
let batches = (num_rows + self.max_row_group_size - 1) / self.max_row_group_size;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I correct in thinking this code could result in non-uniform row group sizes?

like if we had max_row_group_size=10 and wrote a RecordBatch with 25 rows, would we get row groups like

(row_group 1: 10 rows)
(row_group 2: 10 rows)
(row_group 3: 5 rows)
(row_group 4: 10 rows)
(row_group 5: 10 rows)
(row_group 6: 5 rows)

?

If so I think this is fine (in that it is technically respecting max_row_group but it might be unexpected from a user, who might expect something more like

(row_group 1: 10 rows)
(row_group 2: 10 rows)
(row_group 3: 10 rows)
(row_group 4: 10 rows)
(row_group 5: 10 rows)

Perhaps it is worth a doc comment?

@@ -1176,31 +1236,51 @@ mod tests {
let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
let values = Arc::new(TimestampSecondArray::from_vec(raw_values, None));

one_column_roundtrip("timestamp_second_single_column", values, false);
one_column_roundtrip(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be worth at least one test that divides into "more than 2" batches as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed the SMALL_SIZE to an odd number, and then changed the batch sizes of some tests.

@@ -748,6 +784,8 @@ mod tests {
array_mask: vec![true, true], // both lists defined
max_definition: 0,
level_type: LevelType::Root,
offset: 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be cool to add a test here where the offset was something other than 0 -- all the examples I see have offset: 0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a test to list_single_column with an offset.

@nevi-me
Copy link
Contributor Author

nevi-me commented Jun 2, 2021

I've addressed feedback, PTAL @alamb

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great @nevi-me

max_definition: 0,
level_type: LevelType::Root,
offset: 0,
length: 5,
offset: 2,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@alamb alamb merged commit 7ec3158 into apache:master Jun 2, 2021
@alamb
Copy link
Contributor

alamb commented Jun 4, 2021

I could not automatically cherry pick this to active_release (what will become 4.3.0) so if we want to do so we will have to do it manually

alamb@ip-10-0-0-124:~/Software/arrow-rs$ CHERRY_PICK_SHA=7ec3158     ARROW_GITHUB_API_TOKEN=$ARROW_GITHUB_API_TOKEN CHECKOUT_ROOT=/tmp/arrow-rs  python3 dev/release/cherry-pick-pr.py
CHERRY_PICK_SHA=7ec3158     ARROW_GITHUB_API_TOKEN=$ARROW_GITHUB_API_TOKEN CHECKOUT_ROOT=/tmp/arrow-rs  python3 dev/release/cherry-pick-pr.py
Using checkout in /tmp/arrow-rs
Creating cherry pick from 7ec3158b to cherry_pick_7ec3158b
remote: Total 0 (delta 0), reused 0 (delta 0), pack-reused 0        
From github.com:apache/arrow-rs
 * branch              master     -> FETCH_HEAD
From github.com:apache/arrow-rs
 * branch              active_release -> FETCH_HEAD
fatal: A branch named 'cherry_pick_7ec3158b' already exists.
Command failed: ['git', 'checkout', '-b', 'cherry_pick_7ec3158b']
With output:
--------------
b''
--------------
Traceback (most recent call last):
  File "/Users/alamb/Software/arrow-rs/dev/release/cherry-pick-pr.py", line 153, in <module>
    make_cherry_pick()
  File "/Users/alamb/Software/arrow-rs/dev/release/cherry-pick-pr.py", line 114, in make_cherry_pick
    run_cmd(['git', 'checkout', '-b', new_branch])
  File "/Users/alamb/Software/arrow-rs/dev/release/cherry-pick-pr.py", line 78, in run_cmd
    raise e
  File "/Users/alamb/Software/arrow-rs/dev/release/cherry-pick-pr.py", line 70, in run_cmd
    output = subprocess.check_output(cmd)
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/subprocess.py", line 424, in check_output
    return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/subprocess.py", line 528, in run
    raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['git', 'checkout', '-b', 'cherry_pick_7ec3158b']' returned non-zero exit status 128.
alamb@ip-10-0-0-124:~/Software/arrow-rs$ 

Edit -- I will try again once I have cherry-picked #307

alamb pushed a commit that referenced this pull request Jun 8, 2021
* Respect max rowgroup size in Arrow writer

* simplify while loop

* address review feedback
@alamb
Copy link
Contributor

alamb commented Jun 8, 2021

Back ported in #430

alamb added a commit that referenced this pull request Jun 9, 2021
* Respect max rowgroup size in Arrow writer

* simplify while loop

* address review feedback

Co-authored-by: Wakahisa <nevilledips@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
parquet Changes to the parquet crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Parquet WriterProperties.max_row_group_size not wired up
4 participants