Add explicit column mask for selection in parquet: ProjectionMask (#1701) #1716

Merged: 5 commits, May 24, 2022

tustvold (Contributor) commented May 21, 2022

Which issue does this PR close?

Closes #1701.
Closes #1653

Rationale for this change

The current API is confusing, surfaces errors at runtime, and is liable to accidental misuse; see apache/datafusion#2453 and apache/datafusion#2543.

What changes are included in this PR?

This adds an explicit column mask (ProjectionMask) that replaces the iterators of indices. The result is a cleaner API that should be harder to misuse accidentally.

In particular, it also adds the ability to construct a RecordReader based on a mask of root columns, as opposed to leaf columns. This is the core behind apache/datafusion#2543.
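To make the shape of the change concrete, here is a minimal standalone sketch of a mask type in the spirit of this PR. It is not the crate's implementation: the constructors in the PR take a `&SchemaDescriptor`, whereas this sketch assumes a plain `num_leaves: usize`, and `SketchMask` is a hypothetical name.

```rust
/// Hypothetical, simplified stand-in for the mask introduced by this PR.
/// `None` means "select every leaf column".
#[derive(Debug, Clone, PartialEq)]
pub struct SketchMask {
    mask: Option<Vec<bool>>,
}

impl SketchMask {
    /// Select all columns.
    pub fn all() -> Self {
        Self { mask: None }
    }

    /// Select only the given leaf indices.
    /// Assumption: the schema is reduced to its leaf count here.
    /// Panics if an index is out of bounds for `num_leaves`.
    pub fn leaves(num_leaves: usize, indices: impl IntoIterator<Item = usize>) -> Self {
        let mut mask = vec![false; num_leaves];
        for idx in indices {
            mask[idx] = true;
        }
        Self { mask: Some(mask) }
    }

    /// Returns true if the leaf at `leaf_idx` is selected.
    pub fn leaf_included(&self, leaf_idx: usize) -> bool {
        self.mask.as_ref().map(|m| m[leaf_idx]).unwrap_or(true)
    }
}
```

A caller would build the mask once, for example `SketchMask::leaves(5, [3, 0, 4])`, and pass it around instead of a raw iterator of indices.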

Are there any user-facing changes?

Yes, this changes the arrow projection API

tustvold added the api-change label (Changes to the arrow API) May 21, 2022
github-actions bot added the parquet label (Changes to the parquet crate) May 21, 2022
tustvold force-pushed the explicit-column-mask branch from 284724e to 489d54a May 21, 2022 10:02
// comes from. For instance: the leaf `a.b.c.d` would have a link back to `a`:
// -- a  <-----+
// -- -- b     |
// -- -- -- c  |
// -- -- -- -- d
- leaf_to_base: Vec<TypePtr>,
+ leaf_to_base: Vec<usize>,
Contributor Author

Storing the root index, rather than a copy of the root pointer, makes it easier to convert from a root mask to a leaf mask.
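A rough sketch of why an index is convenient here: with a per-leaf root index, expanding a root selection into a leaf mask is a single lookup per leaf. The function name and its parameters are hypothetical; the PR works against `SchemaDescriptor` rather than plain slices.

```rust
/// Expand a selection of root columns into a per-leaf mask.
/// `leaf_to_root[i]` is the index of the root column that leaf `i` belongs to
/// (an assumed plain-slice version of the `Vec<usize>` field in the diff).
/// Panics if a root index is out of bounds for `num_roots`.
pub fn roots_to_leaf_mask(
    leaf_to_root: &[usize],
    num_roots: usize,
    roots: &[usize],
) -> Vec<bool> {
    // First mark which roots are selected...
    let mut root_mask = vec![false; num_roots];
    for &r in roots {
        root_mask[r] = true;
    }
    // ...then each leaf simply inherits the flag of its root.
    leaf_to_root.iter().map(|&root| root_mask[root]).collect()
}
```

For a schema with leaves `a.b`, `a.c`, `d`, `e.f`, the mapping would be `[0, 0, 1, 2]`, and selecting roots `[0, 2]` keeps every leaf except `d`.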

///
/// i.e. `[0, 1, 2]` will construct the same mask as `[1, 0, 0, 2]`
pub fn leaves(
schema: &SchemaDescriptor,
Contributor Author

The mask could theoretically carry the schema along and use it as a sanity check, but I'm inclined to think that if a user constructs a mask and then uses it on a different schema, that's not something we can reasonably be expected to handle sensibly.

Contributor

As long as a useful error is produced, I agree this is fine behavior
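One way a "useful error" could look, purely as an illustration: compare the mask length with the schema's leaf count before use. This is an assumption about a possible check, not something the PR is stated to implement, and `check_mask` is a hypothetical helper.

```rust
/// Hypothetical check: verify a leaf mask was built for a schema with the same
/// number of leaf columns, returning a descriptive error otherwise.
pub fn check_mask(mask: &[bool], schema_leaves: usize) -> Result<(), String> {
    if mask.len() == schema_leaves {
        Ok(())
    } else {
        Err(format!(
            "projection mask has {} leaf columns but schema has {}",
            mask.len(),
            schema_leaves
        ))
    }
}
```

A length check only catches schemas of different width; two different schemas with the same number of leaves would still pass, which is consistent with the view that this misuse cannot be fully handled.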

@@ -155,24 +100,24 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
Ok(message) => message
.header_as_schema()
.map(arrow::ipc::convert::fb_to_schema)
- .ok_or(ArrowError("the message is not Arrow Schema".to_string())),
+ .ok_or(arrow_err!("the message is not Arrow Schema")),
Contributor Author

Drive-by cleanup to move to the arrow_err! macro.

Contributor

Using arrow_err! is not obviously better to me: it is the same verbosity but now requires looking up / knowing what the arrow_err! macro does.

But it is not worse either.

Contributor Author

It's consistent with how errors are handled elsewhere in the crate, with arrow_err!, general_err!, etc...
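For readers unfamiliar with this style of helper, the following is a sketch of how such error macros can be written. The enum and macro names (`SketchError`, `sketch_arrow_err!`, `sketch_general_err!`) are hypothetical stand-ins and are not the crate's actual definitions.

```rust
/// Stand-in error type (hypothetical; not the crate's error enum).
#[derive(Debug, PartialEq)]
pub enum SketchError {
    Arrow(String),
    General(String),
}

/// Build an `Arrow` error from a format string, in the style of `arrow_err!`.
macro_rules! sketch_arrow_err {
    ($($arg:tt)*) => {
        SketchError::Arrow(format!($($arg)*))
    };
}

/// Build a `General` error from a format string, in the style of `general_err!`.
macro_rules! sketch_general_err {
    ($($arg:tt)*) => {
        SketchError::General(format!($($arg)*))
    };
}

/// Example use: convert a missing value into an error.
pub fn require_schema(header: Option<&str>) -> Result<&str, SketchError> {
    header.ok_or_else(|| sketch_arrow_err!("the message is not Arrow Schema"))
}

/// Example use with format arguments.
pub fn out_of_bounds(idx: usize, len: usize) -> SketchError {
    sketch_general_err!("column projection {} outside bounds of schema 0..{}", idx, len)
}
```

The sketch uses `ok_or_else` so the error string is only built on the failure path; the diff in this PR uses `ok_or`, which constructs the error eagerly.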

codecov-commenter commented May 21, 2022

Codecov Report

Merging #1716 (2b327ef) into master (a30e787) will decrease coverage by 0.00%.
The diff coverage is 79.02%.

❗ Current head 2b327ef differs from pull request most recent head 637cb2e. Consider uploading reports for the commit 637cb2e to get more accurate results

@@            Coverage Diff             @@
##           master    #1716      +/-   ##
==========================================
- Coverage   83.32%   83.31%   -0.01%     
==========================================
  Files         195      196       +1     
  Lines       56023    55961      -62     
==========================================
- Hits        46681    46625      -56     
+ Misses       9342     9336       -6     
Impacted Files                               Coverage Δ
arrow/src/array/transform/mod.rs             86.96% <ø> (+0.16%) ⬆️
arrow/src/compute/util.rs                    98.90% <ø> (-0.01%) ⬇️
parquet/src/arrow/async_reader.rs            0.00% <0.00%> (ø)
parquet/src/arrow/mod.rs                     44.44% <44.44%> (ø)
parquet/src/arrow/schema.rs                  96.81% <75.00%> (+0.02%) ⬆️
arrow/src/ipc/reader.rs                      88.72% <80.00%> (-0.08%) ⬇️
parquet/src/schema/types.rs                  83.73% <80.00%> (-0.10%) ⬇️
arrow/src/array/data.rs                      83.65% <85.71%> (-0.86%) ⬇️
arrow/src/ipc/writer.rs                      80.37% <100.00%> (+0.06%) ⬆️
parquet/src/arrow/array_reader/builder.rs    93.50% <100.00%> (-0.05%) ⬇️
... and 7 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update a30e787...637cb2e.

@tustvold
Contributor Author

As a happy side-effect this actually found and fixed a bug in the handling of nested projection pushdown in ParquetRecordBatchStream

Some(projection) => {
    if let Some(col) = projection.iter().find(|x| **x >= num_columns) {
        return Err(general_err!(
            "column projection {} outside bounds of schema 0..{}",
Contributor Author

This check was actually incorrect: it was checking against the arrow schema, not the parquet schema. I think this demonstrates the footgun-prone nature of the old API.
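The mismatch is easy to reproduce in miniature. The sketch below reuses the shape of the removed check; `check_bounds` is a hypothetical helper, and the counts used in the note after it (2 top-level Arrow fields, 4 Parquet leaf columns) describe an assumed nested schema rather than one from this PR.

```rust
/// Validate leaf indices against an upper bound, mirroring the removed check.
/// Which `num_columns` the caller passes decides whether the check is right:
/// leaf indices must be compared with the Parquet leaf count.
pub fn check_bounds(projection: &[usize], num_columns: usize) -> Result<(), String> {
    match projection.iter().find(|x| **x >= num_columns) {
        Some(col) => Err(format!(
            "column projection {} outside bounds of schema 0..{}",
            col, num_columns
        )),
        None => Ok(()),
    }
}
```

With a struct column that flattens to three leaves plus one primitive column, leaf index 3 is valid, yet `check_bounds(&[3], 2)` rejects it when given the Arrow field count, while `check_bounds(&[3], 4)` accepts it when given the Parquet leaf count.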

alamb changed the title from "Add explicit column mask construction (#1701)" to "Add explicit column mask construction in parquet: ProjectionMask (#1701)" May 21, 2022
alamb (Contributor) left a comment

I think it looks like a great improvement. Thanks @tustvold

cc @viirya @sunchao @nevi-me

/// A [`ProjectionMask`] identifies a set of columns within a potentially nested schema to project
#[derive(Debug, Clone)]
pub struct ProjectionMask {
/// A mask of
Contributor

The comment seems to be truncated.

Also, since we have Bitmap and all the associated handling in Arrow, I wonder if it is worth using that (though a Vec<bool> is nice and simple).

Contributor

Describing in the docstring that a mask of None means All is probably also a good idea, as well as which schema a ProjectionMask's indices refer to (I think Parquet).

Contributor Author

> Also, since we have Bitmap and all the associated handling in Arrow, I wonder if it is worth using that (though a Vec<bool> is nice and simple)

Let's stick with simple, and maybe if/when we promote this construct to arrow-rs we can switch to using Bitmap


/// Create a [`ProjectionMask`] which selects only the specified leaf columns
///
/// Note: repeated or out of order indices will not impact the final mask
Contributor

Nice -- so the idea is that you enforce masks having in-order indices by wrapping them in a ProjectionMask, which enforces this invariant during construction 👍
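A small sketch of that invariant: once indices are folded into a boolean mask, duplicates and ordering disappear, and any projection driven by the mask comes out in schema order. `leaf_mask` and `project` are hypothetical helpers over plain slices, not functions from the crate.

```rust
/// Build a leaf mask from indices; duplicates and ordering are absorbed.
/// Panics if an index is out of bounds for `num_leaves`.
pub fn leaf_mask(num_leaves: usize, indices: &[usize]) -> Vec<bool> {
    let mut mask = vec![false; num_leaves];
    for &i in indices {
        mask[i] = true;
    }
    mask
}

/// Return the selected column names, always in schema order.
pub fn project<'a>(columns: &[&'a str], mask: &[bool]) -> Vec<&'a str> {
    columns
        .iter()
        .zip(mask)
        .filter(|(_, keep)| **keep)
        .map(|(name, _)| *name)
        .collect()
}
```

For columns `a, b, c, d`, the index list `[3, 0, 0, 1]` selects `a, b, d` in that order, and `[0, 1, 2]` yields the same mask as `[1, 0, 0, 2]`.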

Self { mask: None }
}

/// Create a [`ProjectionMask`] which selects only the specified leaf columns
Contributor

Can you please explain (or provide a link to something that explains) leaves and roots and what "order" they are in? I think it refers to the parquet schema (or maybe the arrow schema and types within Structs / Lists / other nested types?)
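As a rough illustration of the terminology, assuming the usual Parquet convention that leaf columns are numbered by a depth-first walk of the schema tree: roots are the top-level fields, and leaves are the primitive columns beneath them. The `Node` type and `leaves` function below are hypothetical and exist only for this sketch.

```rust
/// Hypothetical minimal schema tree: a node is either a primitive leaf
/// or a group with named children.
pub enum Node {
    Leaf(&'static str),
    Group(&'static str, Vec<Node>),
}

/// Walk the tree depth-first, recording each leaf's dotted path together with
/// the index of the root (top-level) column it descends from.
pub fn leaves(roots: &[Node]) -> Vec<(String, usize)> {
    fn visit(node: &Node, prefix: &str, root_idx: usize, out: &mut Vec<(String, usize)>) {
        match node {
            Node::Leaf(name) => out.push((format!("{}{}", prefix, name), root_idx)),
            Node::Group(name, children) => {
                let prefix = format!("{}{}.", prefix, name);
                for child in children {
                    visit(child, &prefix, root_idx, out);
                }
            }
        }
    }
    let mut out = Vec::new();
    for (root_idx, root) in roots.iter().enumerate() {
        visit(root, "", root_idx, &mut out);
    }
    out
}
```

For a schema with a group `a` containing `b` and a nested group `c` with child `d`, plus a top-level `e`, the leaves in order are `a.b` (root 0), `a.c.d` (root 0), and `e` (root 1).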

@@ -51,74 +52,18 @@ pub fn parquet_to_arrow_schema(
) -> Result<Schema> {
parquet_to_arrow_schema_by_columns(
parquet_schema,
- 0..parquet_schema.columns().len(),
+ ProjectionMask::all(),
Contributor

❤️

parquet_schema: &SchemaDescriptor,
- column_indices: T,
+ mask: ProjectionMask,
Contributor

I wonder if this needs an owned mask or if it could be taken by reference

Suggested change
- mask: ProjectionMask,
+ mask: &ProjectionMask,

Contributor Author (tustvold, May 23, 2022)

Currently yes, it gets moved into the Visitor. Theoretically it could borrow and have lifetimes, but in most cases I suspect we have the mask by value anyway.

Edit: This might be an argument to move to arrow Bitmap, as that is internally refcounted... Future PR, methinks.
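To illustrate the trade-off being discussed, here is a sketch of a mask backed by reference-counted storage, so that handing a copy to a visitor by value stays cheap. It uses `Arc<[bool]>` as a stand-in for a refcounted bitmap; `SharedMask` and `Visitor` are hypothetical types, not the ones in the crate.

```rust
use std::sync::Arc;

/// Hypothetical mask backed by a reference-counted slice, so clones are cheap.
#[derive(Debug, Clone)]
pub struct SharedMask {
    bits: Arc<[bool]>,
}

impl SharedMask {
    pub fn new(bits: Vec<bool>) -> Self {
        Self { bits: bits.into() }
    }

    pub fn leaf_included(&self, idx: usize) -> bool {
        self.bits[idx]
    }
}

/// Hypothetical visitor that takes the mask by value, as described above.
pub struct Visitor {
    mask: SharedMask,
}

impl Visitor {
    pub fn new(mask: SharedMask) -> Self {
        Self { mask }
    }

    /// Indices of the selected leaves among the first `num_leaves`.
    pub fn selected(&self, num_leaves: usize) -> Vec<usize> {
        (0..num_leaves)
            .filter(|&i| self.mask.leaf_included(i))
            .collect()
    }
}
```

Cloning a `SharedMask` only bumps a reference count, so a caller can keep its own copy and still pass one by value, without introducing lifetimes on the visitor.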

@@ -1188,9 +1112,9 @@ mod tests {
// required int64 leaf5;

let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4]);
Contributor

this is much nicer to read / reason about

@tustvold tustvold merged commit ca1d85f into apache:master May 24, 2022
alamb changed the title from "Add explicit column mask construction in parquet: ProjectionMask (#1701)" to "Add explicit column mask for selection in parquet: ProjectionMask (#1701)" May 26, 2022
Labels: api-change (Changes to the arrow API), parquet (Changes to the parquet crate)
3 participants