[SPARK-34859][SQL] Handle column index when using vectorized Parquet reader #32753
Conversation
Test build #139237 has finished for PR 32753
Kubernetes integration test starting
Kubernetes integration test status failure
Force-pushed from f4ce616 to 64cfc59
Test build #139887 has started for PR 32753
Kubernetes integration test starting
Kubernetes integration test status success
Thank you, @sunchao!
Test build #139891 has finished for PR 32753
Kubernetes integration test starting
Kubernetes integration test status success
Kubernetes integration test starting
Kubernetes integration test status success
Thank you for the updates, @sunchao!
Kubernetes integration test starting
Test build #139952 has finished for PR 32753
Kubernetes integration test starting
Kubernetes integration test status success
/**
 * Called at the beginning of reading a new batch.
 * Construct a list of row ranges from the given `rowIndexes`. For example, suppose the
 * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 row ranges:
Interesting, does the parquet reader lib give you a big array containing these indexes, or does it use an algorithm to generate the indexes on the fly?
And how fast/slow can the parquet reader lib generate the indexes?
It gives you an iterator, so yes, it generates them on the fly: https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java#L253. The indexes are generated from `Range`, which is very similar to what we defined here. I'm planning to file a JIRA in parquet-mr to just return the original `Range`s so we don't have to do this step in Spark.
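For illustration, here is a minimal, hypothetical sketch (not the PR's actual `ParquetReadState.constructRanges` code) of how an ascending iterator of row indexes can be collapsed into consecutive ranges; the `RowRange` record and the method name are assumed:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PrimitiveIterator;
import java.util.stream.LongStream;

public class RowRangeSketch {
  // Hypothetical holder for an inclusive [start, end] row range.
  record RowRange(long start, long end) { }

  // Collapse an ascending iterator of row indexes into consecutive ranges.
  // A null iterator is treated as "no ranges" here; the PR keeps null to mean "no filtering".
  static List<RowRange> constructRanges(PrimitiveIterator.OfLong rowIndexes) {
    List<RowRange> ranges = new ArrayList<>();
    if (rowIndexes == null) {
      return ranges;
    }
    long start = -1, prev = -1;
    while (rowIndexes.hasNext()) {
      long idx = rowIndexes.nextLong();
      if (start < 0) {
        start = idx;                           // first index opens a new range
      } else if (idx != prev + 1) {
        ranges.add(new RowRange(start, prev)); // gap found: close the current range
        start = idx;
      }
      prev = idx;
    }
    if (start >= 0) {
      ranges.add(new RowRange(start, prev));
    }
    return ranges;
  }

  public static void main(String[] args) {
    PrimitiveIterator.OfLong it = LongStream.of(0, 1, 2, 4, 5, 7, 8, 9).iterator();
    // Prints the three ranges: [0, 2], [4, 5] and [7, 9].
    constructRanges(it).forEach(r -> System.out.println("[" + r.start() + ", " + r.end() + "]"));
  }
}
```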
valuesToReadInBatch -= (newOffset - offset);
valuesToReadInPage -= (newOffset - offset);
valuesToReadInPage -= (newRowId - rowId);
Should we assert `newOffset - offset == newRowId - rowId`?
I think this is not necessarily true: `rowId` tracks all the values that could be either read or skipped, while `offset` only tracks values that are read into the result column vector.
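As a small illustration of that distinction (hypothetical numbers, not code from the PR):

```java
public class CursorDeltaSketch {
  public static void main(String[] args) {
    long rowId = 100;  // position within the row group: advances for values read OR skipped
    int offset = 40;   // position within the output column vector: advances for values read only

    // Suppose the next 30 values fall outside the current row range and are skipped,
    // while the 20 values after them are read into the vector.
    long newRowId = rowId + 30 + 20;
    int newOffset = offset + 20;

    // The two deltas differ whenever something was skipped, so asserting
    // newOffset - offset == newRowId - rowId would not hold in general.
    System.out.println("rowId delta  = " + (newRowId - rowId));   // 50
    System.out.println("offset delta = " + (newOffset - offset)); // 20
  }
}
```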
// Read and decode dictionary ids.
defColumn.readIntegers(readState, dictionaryIds, column,
  (VectorizedValuesReader) dataColumn);

// TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we need to post process
// the values to add microseconds precision.
if (column.hasDictionary() || (startOffset == 0 && isLazyDecodingSupported(typeName))) {
if (column.hasDictionary() || (startRowId == pageFirstRowIndex &&
Hmm, based on the comment (`rowId != 0`) below, do we need to update it?
Oh yeah, I need to update the comment too
private int readPageV2(DataPageV2 page) throws IOException {
  this.pageFirstRowIndex = page.getFirstRowIndex().orElse(0L);
Maybe move to `readPage`? Looks like `readPageV1` and `readPageV2` both need it.
Good point. Let me do that.
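A rough sketch of that refactoring, with hypothetical stand-in types (not the actual VectorizedColumnReader code): set `pageFirstRowIndex` once in a shared `readPage` path so both the V1 and V2 branches see it.

```java
import java.util.OptionalLong;

public class PageDispatchSketch {
  // Hypothetical stand-ins for parquet-mr's data page classes.
  interface DataPage { OptionalLong getFirstRowIndex(); }
  interface DataPageV1 extends DataPage { }
  interface DataPageV2 extends DataPage { }

  private long pageFirstRowIndex;

  void readPage(DataPage page) {
    // Hoisted here so that both the V1 and V2 paths see the page's first row index.
    this.pageFirstRowIndex = page.getFirstRowIndex().orElse(0L);
    if (page instanceof DataPageV2) {
      readPageV2((DataPageV2) page);
    } else {
      readPageV1((DataPageV1) page);
    }
  }

  private void readPageV1(DataPageV1 page) { /* decode levels and values, V1 layout */ }
  private void readPageV2(DataPageV2 page) { /* decode levels and values, V2 layout */ }
}
```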
for (int i = 0; i < n; ++i) {
  if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) {
    updater.update(offset + i, values, valueReader);
int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount));
This is different from what I commented (#32753 (comment)) before. This looks more straightforward.
A few comments. Looks good, otherwise.
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #140382 has finished for PR 32753
final int maxDefinitionLevel;

/** The current index overall all rows within the column chunk. This is used to check if the
Maybe, `overall all` -> `over all`?
this.maxDefinitionLevel = maxDefinitionLevel;
this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes);
If we move this `rowIndexes == null ? null ...` part into the `constructRanges` method, that will be better because it will protect both this constructor and `constructRanges` at the same time. For now, `constructRanges` seems to have the assumption that the argument is not `null`, but there is no assertion for that. WDYT?
Yes that's a fair point. Will do.
 * @param total total number of values to skip
 * @param valuesReader reader to skip values from
 */
void skipValues(int total, VectorizedValuesReader valuesReader);
Since this is renamed, please update the following part of the PR description accordingly:
"introduced a new API ParquetVectorUpdater.skipBatch which skips a batch of values from a Parquet value reader."
And, maybe, we had better update line 40 in this file: "Skip a batch of ..."
Updated the PR description. Regarding the comment, IMO even though the method name is changed, the comment is still accurate in expressing what the method does.
// skip the part [rowId, start)
int toSkip = (int) (start - rowId);
if (toSkip > 0) {
Just a question. We may have two negative-value cases:
- `start < rowId`
- `(start - rowId) > Int.MaxValue`
Are we considering both, or is there no change for case (2)?
To be safe, I'd like to recommend moving the type cast `(int)` inside this `if` statement. For the `if (toSkip > 0) {` check, we had better use `long`. If the ranges are protected by lines 191 ~ 192, then ignore this comment.
`start` must be >= `rowId` because it is defined as `long start = Math.max(rangeStart, rowId)`. Therefore, case (1), `start < rowId`, will never happen.
The second case, `(start - rowId) > Int.MaxValue`, can only occur if `start` is equal to `rangeStart`. In this case we also know that `rangeStart <= rowId + n` (from line 183), and `n` is `Math.min(leftInBatch, Math.min(leftInPage, this.currentCount))`, which is guaranteed to be within integer range. Therefore, the cast is safe.
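A small numeric sketch of that argument (arbitrary values; the variable names follow the diff above, run with -ea to enable the assertions):

```java
public class CastSafetySketch {
  public static void main(String[] args) {
    long rowId = 1_000_000L;          // current row position within the column chunk
    int leftInBatch = 4096, leftInPage = 2048, currentCount = 512;
    int n = Math.min(leftInBatch, Math.min(leftInPage, currentCount)); // always fits in an int

    long rangeStart = 1_000_300L;     // assume rangeStart <= rowId + n (otherwise the
                                      // block would have been skipped before this point)
    long start = Math.max(rangeStart, rowId);

    // start >= rowId by construction, and start - rowId <= n <= Integer.MAX_VALUE,
    // so the narrowing cast cannot overflow.
    assert start >= rowId;
    assert start - rowId <= n;
    int toSkip = (int) (start - rowId);
    System.out.println("toSkip = " + toSkip);  // 300
  }
}
```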
+1, LGTM from my side (with minor questions).
Kubernetes integration test starting
Kubernetes integration test status success
Merged to master for Apache Spark 3.2.0. Thank you, @sunchao, @viirya, @cloud-fan.
Also, cc @gengliangwang since this was the release blocker for Apache Spark 3.2.0.
Test build #140459 has finished for PR 32753
What changes were proposed in this pull request?
Make the current vectorized Parquet reader work with the column index introduced in Parquet 1.11. In particular, this PR makes the following changes:
- In `ParquetReadState`, track the row ranges returned via `PageReadStore.getRowIndexes`, as well as the first row index for each page via `DataPage.getFirstRowIndex`.
- Introduced a new API `ParquetVectorUpdater.skipValues`, which skips a batch of values from a Parquet value reader. As part of the process, also renamed the existing `updateBatch` to `readValues` and `update` to `readValue` to keep the method names consistent.
- Added `VectorizedValuesReader.skipXXX` for different data types, as well as the implementations. These are useful when the reader knows that the given batch of values can be skipped, for instance, because the batch is not covered by the row ranges generated by column index filtering.
- Changed `VectorizedRleValuesReader` to handle column index filtering. This is done by comparing the range that is going to be read next within the current RLE/PACKED block (let's call this the block range) against the current row range. There are three cases, sketched below:
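As a rough, hypothetical sketch of that three-way comparison (names and structure assumed here, not the PR's exact code; the real logic lives in `VectorizedRleValuesReader`):

```java
public class RangeComparisonSketch {
  enum Outcome { SKIP_BLOCK, ADVANCE_ROW_RANGE, READ_OVERLAP }

  static Outcome compare(long rowId, int n, long rangeStart, long rangeEnd) {
    // Block range: rows [rowId, rowId + n - 1] covered by the current RLE/PACKED block.
    // Row range:   rows [rangeStart, rangeEnd] kept by column index filtering.
    if (rowId + n - 1 < rangeStart) {
      return Outcome.SKIP_BLOCK;         // block ends before the row range: skip its values
    } else if (rowId > rangeEnd) {
      return Outcome.ADVANCE_ROW_RANGE;  // block starts after the row range: move to the next range
    } else {
      return Outcome.READ_OVERLAP;       // ranges overlap: read the overlapping rows, skip the rest
    }
  }

  public static void main(String[] args) {
    System.out.println(compare(0, 10, 20, 30));   // SKIP_BLOCK
    System.out.println(compare(40, 10, 20, 30));  // ADVANCE_ROW_RANGE
    System.out.println(compare(25, 10, 20, 30));  // READ_OVERLAP
  }
}
```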
Why are the changes needed?
Parquet Column Index is a new feature in Parquet 1.11 which allows very efficient filtering at the page level (some benchmark numbers can be found here), especially when data is sorted. The feature is largely implemented in parquet-mr (via classes such as `ColumnIndex` and `ColumnIndexFilter`). In Spark, the non-vectorized Parquet reader can automatically benefit from the feature after upgrading to Parquet 1.11.x, without any code change. However, the same is not true for the vectorized Parquet reader, since Spark chose to implement its own logic for reading Parquet pages, handling definition levels, reading values into columnar batches, etc.
Previously, SPARK-26345 (#31393) updated Spark to only scan pages filtered by column index on the parquet-mr side. This is done by calling the `ParquetFileReader.readNextFilteredRowGroup` and `ParquetFileReader.getFilteredRecordCount` APIs. The implementation, however, only works for a few limited cases: in the scenario where there are multiple columns and their type widths are different (e.g., `int` and `bigint`), it could return incorrect results. Please see SPARK-34859 for a detailed description of this issue.
In order to fix the above, Spark needs to leverage the APIs `PageReadStore.getRowIndexes` and `DataPage.getFirstRowIndex`. The former returns the indexes of all rows after filtering within a Parquet row group (note the difference between rows and values: for a flat schema there is no difference between the two, but for a nested schema they're different). The latter returns the first row index within a single data page. With the combination of the two, one is able to know which rows/values should be filtered while scanning a Parquet page.
Does this PR introduce any user-facing change?
Yes. Now the vectorized Parquet reader should work correctly with column index.
How was this patch tested?
Borrowed tests from #31998 and added a few more tests.
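For context, a hedged end-to-end example of the kind of scenario column index filtering targets: sorted data with columns of different value widths, read back with a selective filter. It assumes a local SparkSession and a scratch path; it is not one of the PR's actual tests.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ColumnIndexExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .appName("column-index-example")
        .getOrCreate();

    String path = "/tmp/parquet-column-index-example";

    // Sorted data with an int column and a bigint column (different value widths),
    // the shape of data that exposed SPARK-34859.
    spark.range(0, 100000)
        .selectExpr("cast(id as int) as i", "id as j")
        .coalesce(1)
        .write()
        .mode("overwrite")
        .parquet(path);

    // With column index support, the vectorized reader only needs to decode the
    // pages whose row ranges can satisfy this predicate.
    Dataset<Row> result = spark.read().parquet(path).where("i = 12345");
    result.show();

    spark.stop();
  }
}
```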