
Parquet Scan Filter #1191
Closed · Tracked by #3147
tustvold opened this issue Jan 17, 2022 · 18 comments
Labels
enhancement Any new improvement worthy of an entry in the changelog

Comments

@tustvold (Contributor) commented Jan 17, 2022

I hope to work on this after finishing up dictionary preservation (#1180) and async (#1154); creating this now for feedback and visibility.

Background

IOx is a columnar, time-series database that uses parquet for its persistence format. The data model consists of a number of relatively low-cardinality tag columns, a timestamp column and a number of value columns. Data is stored in order of column cardinality, with the lowest-cardinality columns first.

Almost all IOx queries contain highly selective predicates on tag columns, and therefore predicate pushdown is very important to IOx's performance story.

Currently the parquet crate only supports predicate pushdown in the form of row group pruning, that is, using statistics to skip reading entire row groups. Unfortunately this is only really effective if a file is large enough to contain multiple row groups and the predicate is on a column early in the sort order.

An obvious next step might be to perform page pruning, that is, using page metadata to skip pages. However, the page metadata is typically stored inside the page header, so you would likely end up reading the entire page from the backing store and decompressing it, only to then throw it away, which is not ideal.

There is an optional extension to parquet called PageIndex that tries to address this. However, I'm not aware of any systems outside of Impala that support this functionality, and it is still constrained by page granularity and the limitations of relying on aggregate statistics.

Proposal

Instead I would like to propose adding functionality to allow providing a row selection mask when scanning a column chunk. When provided, only values (null or otherwise) with a corresponding set bit in the selection mask will be returned in the output. For simplicity nested data will not be supported, at least initially.
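To make the shape of this concrete, here is a minimal sketch of what such an API might look like; the trait and method names are hypothetical, not an existing parquet-rs API:

```rust
use arrow::array::{ArrayRef, BooleanArray};
use arrow::error::Result;

/// Hypothetical extension to a column chunk reader: scan only the rows
/// whose corresponding bit is set in `selection`.
trait MaskedColumnScan {
    /// Returns an array containing just the selected values (null or
    /// otherwise), in row order. `selection` must have one entry per
    /// row in the column chunk.
    fn scan_with_mask(&mut self, selection: &BooleanArray) -> Result<ArrayRef>;
}
```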

This would allow IOx, or potentially DataFusion depending on where the logic for this eventually sits, to do the following for pushing down predicates, in addition to the current row group filtering:

For each column in an order determined by some heuristic:

  • Stream out the data for the column with the current selection mask (initially all true)
  • Evaluate the column predicate on the returned array
  • Use this to refine the selection mask for the next column

Once all the predicates that can be evaluated in this way have been evaluated, a final scan can be performed with the final selection mask across the full set of projected columns.
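As a rough sketch of that driving loop, assuming the hypothetical `scan_with_mask` above; the closures here are stand-ins for the query engine's own reader and predicate logic:

```rust
use arrow::array::{ArrayRef, BooleanArray};
use arrow::error::Result;

/// One predicate column: a masked reader for its chunk plus a
/// predicate evaluated over whatever values the reader returns.
struct ColumnPredicate {
    reader: Box<dyn FnMut(&BooleanArray) -> Result<ArrayRef>>,
    predicate: Box<dyn Fn(&ArrayRef) -> Result<BooleanArray>>,
}

/// Refine an initially all-true mask one predicate column at a time.
fn build_selection(num_rows: usize, mut columns: Vec<ColumnPredicate>) -> Result<BooleanArray> {
    let mut mask = BooleanArray::from(vec![true; num_rows]);
    for col in columns.iter_mut() {
        let values = (col.reader)(&mask)?;       // stream only the selected rows
        let matched = (col.predicate)(&values)?; // one result per returned row
        mask = refine(&mask, &matched);          // scatter back to full row count
    }
    Ok(mask)
}

/// Unselected rows stay false; selected rows take the predicate
/// outcome, with a null result treated as false.
fn refine(mask: &BooleanArray, matched: &BooleanArray) -> BooleanArray {
    let mut results = matched.iter();
    let bits: Vec<bool> = mask
        .iter()
        .map(|selected| match selected {
            Some(true) => results.next().flatten().unwrap_or(false),
            _ => false,
        })
        .collect();
    BooleanArray::from(bits)
}
```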

Potential Problems

I'm currently aware of some potential problems with this approach, but let me know if I've missed something.

Despite these, I am optimistic about the feasibility of this approach, as it is very similar to the one @e-dard has successfully applied to IOx's read buffer - an in-memory, read-only, queryable data structure.

@alamb (Contributor) commented Jan 18, 2022

> This would allow IOx, or potentially DataFusion depending on where the logic for this eventually sits, to do the following for pushing down predicates, in addition to the current row group filtering:

I think it would be best to implement this in DataFusion if at all possible -- this logic and need is not at all specific to IOx, and so the community would benefit (and also likely help maintain this) if it were in DataFusion.

One thing to consider for the strategy described is that it may actually slow down parquet scanning for non-selective predicates (e.g. predicates that filter out none of the rows).

Another thing is that the order in which the predicates are evaluated may make a difference (e.g. if there is a predicate that filters out all but a few rows, and a predicate that doesn't filter any, applying the predicate that filters out most of the rows first is likely to be faster).

I think the challenge of non-selective predicates and ordering should not be handled by the actual parquet reader (that is, the query engine should specify which predicates to apply and in what order).

@nevi-me (Contributor) commented Jan 18, 2022

In the Impala implementation, there was negligible impact on unsorted/random data: https://blog.cloudera.com/speeding-up-select-queries-with-parquet-page-indexes/. If parquet-rs can correctly store and retrieve the column sort statistics, that would help with deciding whether to use a page index or not.

I think the predicate evaluation would best live in parquet, as it can get complex for some pages. Otherwise DataFusion and other processing engines implementing the logic on their own would repeat this non-trivial work.

@tustvold (Contributor, Author)

> I think it would be best to implement this in DataFusion if at all possible

Agreed, I was somewhat hedging here 😆

> In the Impala implementation, there was negligible impact on unsorted/random data: https://blog.cloudera.com/speeding-up-select-queries-with-parquet-page-indexes/.

That's a great link 👍, and yeah the ability to prune on aggregate statistics is very dependent on the sort order.

> that would help with deciding whether to use a page index or not.

> I think the predicate evaluation would best live in parquet, as it can get complex for some pages

I was somewhat hoping to avoid using the page index, in part for this reason, as it would require pushing predicate evaluation into the parquet crate, but also because of the small matter that we don't currently read or write it 😅

I believe in most cases a selection mask will perform similarly or significantly better, allowing us to skip pages and even runs within pages, whilst also not requiring predicate evaluation logic to leak into the parquet crate.

@alamb (Contributor) commented Jan 18, 2022

> I think the predicate evaluation would best live in parquet, as it can get complex for some pages. Otherwise DataFusion and other processing engines implementing the logic on their own would repeat this non-trivial work.

I agree that predicate evaluation should be done in parquet -- what @e-dard did in the IOx ReadBuffer was to implement predicates that could be evaluated very fast (e.g. =, !=, <, etc. of a column and a literal):

https://github.com/influxdata/influxdb_iox/blob/main/read_buffer/src/row_group.rs#L1517-L1524

I recommend a similar (or the same!) approach in parquet.
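For illustration, such a restricted predicate vocabulary might be modeled along these lines; this is a sketch of the idea, not the actual ReadBuffer types:

```rust
/// Comparison operators restricted to forms that can be evaluated
/// cheaply against a column, following the IOx read buffer approach.
enum Op {
    Eq,
    NotEq,
    Lt,
    LtEq,
    Gt,
    GtEq,
}

/// A placeholder literal type; a real implementation would cover the
/// full set of supported scalar types.
enum Scalar {
    Int64(i64),
    Utf8(String),
}

/// A single `column <op> literal` predicate.
struct BinaryExpr {
    column: String,
    op: Op,
    literal: Scalar,
}
```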

@tustvold (Contributor, Author)

Shall I create a separate ticket in that case for directly evaluating predicates against encoded data? I think the two problems are separable.

@alamb (Contributor) commented Jan 18, 2022

> Shall I create a separate ticket in that case for directly evaluating predicates against encoded data? I think the two problems are separable.

I probably don't fully understand what "evaluating predicates against encoded data" means compared to what you have proposed in this ticket, so I am not sure about the need for a second ticket.

@tustvold (Contributor, Author)

> I probably don't fully understand what "evaluating predicates against encoded data" means compared to what you have proposed in this ticket.

In this ticket I propose passing a bitmask down to the scan; the parquet crate would have no involvement in generating this mask, nor would it understand the predicates involved in generating it. Think of it like the take kernel: you give it a mask and it returns those rows, but it has no idea why the query engine is requesting those rows.
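The existing arrow `filter` kernel already implements exactly this contract in memory; a small usage example of the analogy (the proposal pushes the same contract down into the parquet scan itself):

```rust
use arrow::array::{BooleanArray, Int32Array};
use arrow::compute::filter;

fn main() -> arrow::error::Result<()> {
    let values = Int32Array::from(vec![10, 20, 30, 40]);
    let mask = BooleanArray::from(vec![true, false, true, false]);

    // The kernel returns only the rows with a set bit; it has no idea
    // why the caller selected them.
    let selected = filter(&values, &mask)?;
    assert_eq!(selected.len(), 2);
    Ok(())
}
```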

This would mean streaming out data as an arrow RecordBatch (or Array) in order to evaluate predicates; however, my hope is that with dictionary preservation this will be relatively cheap, and successive predicate evaluations will retrieve successively fewer rows.

A further optimisation would then be to evaluate the predicates directly on the underlying parquet data, without first decoding to an arrow representation. This is what I'm wondering if I should create a ticket for. It would still need the "take" support in order for the generated masks to be useful, but it would likely speed up their generation compared to decoding to an arrow representation first.

@alamb (Contributor) commented Jan 18, 2022

> A further optimisation would then be to evaluate the predicates directly on the underlying parquet data, without first decoding to an arrow representation. This is what I'm wondering if I should create a ticket for. It would still need the "take" support in order for the generated masks to be useful, but it would likely speed up their generation compared to decoding to an arrow representation first.

I think that makes sense as a follow-up item 👍

@sunchao (Member) commented Jan 27, 2022

We can also explore lazy materialization, that is, only decoding and decompressing a Parquet page (or even paying the IO cost) when it is actually needed. I think this is especially useful when many columns are selected and only a few very selective predicates are applied to some of them. In some sense it is similar to page skipping based on the column index, but more powerful.

It'd be very nice to implement row group and page level skipping in arrow-rs also, so that engines don't have to duplicate the work.

@tustvold (Contributor, Author)

Definitely. Irrespective of how the pruning takes place - whether via a scan mask as proposed here, information derived from a page index, or some other mechanism - there's no point doing IO for pages that aren't going to be used 👍

My hope with the scan masks is to provide an API that allows query engines to express which rows they want, without having to worry about the mechanics of which pages to fetch and decode for those rows, what statistics are available, how the writer interpreted the ambiguous parts of the parquet specification, and so on.

This would leave the implementation in arrow-rs free to choose the most efficient way to get the requested rows of data. As an added benefit this approach would integrate well with the async plumbing I stubbed out in #1154, which needs to know what data is going to be needed ahead of time.

This scan mask approach is heavily geared towards the IOx use-case, where the page index is unlikely to be very helpful, but they're definitely complementary approaches. I'd imagine it is possible to construct the partial scan logic in such a way that it works well for both. Ultimately the page index is just a way to generate a low-granularity scan mask 😄

All that is to say, I agree entirely with handling these concerns in arrow-rs as you suggest.

@sunchao (Member) commented Jan 27, 2022

Is this scan mask some sort of bitmap alongside an Arrow vector, or is it associated with a Parquet column chunk?

You can also check out this paper, Filter Representation in Vectorized Query Execution, to compare that against a selection-vector-based approach. AFAIK the latter may yield better performance due to the absence of branching cost.
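For context, the two representations being compared look roughly like this; a doc-level sketch of the data shapes only:

```rust
fn main() {
    // Bitmap / boolean mask: one entry per row; consumers branch on
    // each bit. Selecting rows 1 and 4 out of 8:
    let mask: [bool; 8] = [false, true, false, false, true, false, false, false];

    // Selection vector: just the indices of the selected rows, which
    // can be iterated directly without a per-row branch.
    let selection: [u32; 2] = [1, 4];

    // The two are interconvertible:
    let derived: Vec<u32> = mask
        .iter()
        .enumerate()
        .filter_map(|(i, &set)| set.then(|| i as u32))
        .collect();
    assert_eq!(&derived[..], &selection[..]);
}
```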

@tustvold (Contributor, Author)

I mean I was just planning to use an arrow BooleanArray that you'd pass in... I'll give the linked paper a read 👍

@ParadoxShmaradox

Just wanted to chip in and say that implementing PageIndex would be great even if parquet-rs doesn't use it internally for predicate pushdown, as it can be used by other engines/implementations that do. In analytics systems these files get passed around between different systems.

I'm currently rewriting a project from Java to Rust; the Java side uses parquet 1.11.1 (so no Hadoop, Impala or Spark) and does predicate pushdown using ColumnIndex and OffsetIndex.

I'm using parquet-rs to write the file and datafusion to read it back; unfortunately, the current system can't read the file due to the absence of the PageIndex.

Having current systems be able to read parquet-rs files would be a boon for backward/forward compatibility.

@tustvold (Contributor, Author)

@ParadoxShmaradox makes sense. I can try to find some time to bash something out in the next few weeks, time permitting. If you were willing to assist, that might speed things along, but if not, no worries 😅

@ParadoxShmaradox

@tustvold Sure! Hit me up and I can test file generation in Rust and reading in Java; I can probably share the files from both implementations.

@alamb (Contributor) commented May 16, 2022

Filed #1705 to track adding PageIndex support

@tustvold (Contributor, Author)

I think the high-level work to support this is now complete, so I'm marking this as done. There are definitely further improvements to be made, but those can be tracked separately.

@alamb (Contributor) commented Aug 15, 2022

🎉
