Skip to content

Commit

Permalink
[SPARK-34859][SQL] Handle column index when using vectorized Parquet …
Browse files Browse the repository at this point in the history
…reader

### What changes were proposed in this pull request?

Make the current vectorized Parquet reader to work with column index introduced in Parquet 1.11. In particular, this PR makes the following changes:
1. in `ParquetReadState`, track row ranges returned via `PageReadStore.getRowIndexes` as well as the first row index for each page via `DataPage.getFirstRowIndex`.
1. introduced a new API `ParquetVectorUpdater.skipValues` which skips a batch of values from a Parquet value reader. As part of the process also renamed existing `updateBatch` to `readValues`, and `update` to `readValue` to keep the method names consistent.
1. in correspondence as above, also introduced new API `VectorizedValuesReader.skipXXX` for different data types, as well as the implementations. These are useful when the reader knows that the given batch of values can be skipped, for instance, due to the batch is not covered in the row ranges generated by column index filtering.
2. changed `VectorizedRleValuesReader` to handle column index filtering. This is done by comparing the range that is going to be read next within the current RLE/PACKED block (let's call this block range), against the current row range. There are three cases:
    * if the block range is before the current row range, skip all the values in the block range
    * if the block range is after the current row range, advance the row range and repeat the steps
    * if the block range overlaps with the current row range, only read the values within the overlapping area and skip the rest.

### Why are the changes needed?

[Parquet Column Index](https://github.com/apache/parquet-format/blob/master/PageIndex.md) is a new feature in Parquet 1.11 which allows very efficient filtering on page level (some benchmark numbers can be found [here](https://blog.cloudera.com/speeding-up-select-queries-with-parquet-page-indexes/)), especially when data is sorted. The feature is largely implemented in parquet-mr (via classes such as `ColumnIndex` and `ColumnIndexFilter`). In Spark, the non-vectorized Parquet reader can automatically benefit from the feature after upgrading to Parquet 1.11.x, without any code change. However, the same is not true for vectorized Parquet reader since Spark chose to implement its own logic such as reading Parquet pages, handling definition levels, reading values into columnar batches, etc.

Previously, [SPARK-26345](https://issues.apache.org/jira/browse/SPARK-26345) / (#31393) updated Spark to only scan pages filtered by column index from parquet-mr side. This is done by calling `ParquetFileReader.readNextFilteredRowGroup` and `ParquetFileReader.getFilteredRecordCount` API. The implementation, however, only work for a few limited cases: in the scenario where there are multiple columns and their type width are different (e.g., `int` and `bigint`), it could return incorrect result. For this issue, please see SPARK-34859 for a detailed description.

In order to fix the above, Spark needs to leverage the API `PageReadStore.getRowIndexes` and `DataPage.getFirstRowIndex`. The former returns the indexes of all rows (note the difference between rows and values: for flat schema there is no difference between the two, but for nested schema they're different) after filtering within a Parquet row group. The latter returns the first row index within a single data page. With the combination of the two, one is able to know which rows/values should be filtered while scanning a Parquet page.

### Does this PR introduce _any_ user-facing change?

Yes. Now the vectorized Parquet reader should work correctly with column index.

### How was this patch tested?

Borrowed tests from #31998 and added a few more tests.

Closes #32753 from sunchao/SPARK-34859.

Lead-authored-by: Chao Sun <sunchao@apple.com>
Co-authored-by: Li Xian <lxian2shell@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
2 people authored and dongjoon-hyun committed Jun 30, 2021
1 parent d46c1e3 commit a5c8866
Show file tree
Hide file tree
Showing 10 changed files with 746 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,38 @@

package org.apache.spark.sql.execution.datasources.parquet;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PrimitiveIterator;

/**
* Helper class to store intermediate state while reading a Parquet column chunk.
*/
final class ParquetReadState {
/** Maximum definition level */
/** A special row range used when there is no row indexes (hence all rows must be included) */
private static final RowRange MAX_ROW_RANGE = new RowRange(Long.MIN_VALUE, Long.MAX_VALUE);

/**
* A special row range used when the row indexes are present AND all the row ranges have been
* processed. This serves as a sentinel at the end indicating that all rows come after the last
* row range should be skipped.
*/
private static final RowRange END_ROW_RANGE = new RowRange(Long.MAX_VALUE, Long.MIN_VALUE);

/** Iterator over all row ranges, only not-null if column index is present */
private final Iterator<RowRange> rowRanges;

/** The current row range */
private RowRange currentRange;

/** Maximum definition level for the Parquet column */
final int maxDefinitionLevel;

/** The current index over all rows within the column chunk. This is used to check if the
* current row should be skipped by comparing against the row ranges. */
long rowId;

/** The offset in the current batch to put the next value */
int offset;

Expand All @@ -33,31 +58,108 @@ final class ParquetReadState {
/** The remaining number of values to read in the current batch */
int valuesToReadInBatch;

ParquetReadState(int maxDefinitionLevel) {
ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong rowIndexes) {
this.maxDefinitionLevel = maxDefinitionLevel;
this.rowRanges = constructRanges(rowIndexes);
nextRange();
}

/**
* Called at the beginning of reading a new batch.
* Construct a list of row ranges from the given `rowIndexes`. For example, suppose the
* `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 row ranges:
* `[0-2], [4-5], [7-9]`.
*/
void resetForBatch(int batchSize) {
private Iterator<RowRange> constructRanges(PrimitiveIterator.OfLong rowIndexes) {
if (rowIndexes == null) {
return null;
}

List<RowRange> rowRanges = new ArrayList<>();
long currentStart = Long.MIN_VALUE;
long previous = Long.MIN_VALUE;

while (rowIndexes.hasNext()) {
long idx = rowIndexes.nextLong();
if (currentStart == Long.MIN_VALUE) {
currentStart = idx;
} else if (previous + 1 != idx) {
RowRange range = new RowRange(currentStart, previous);
rowRanges.add(range);
currentStart = idx;
}
previous = idx;
}

if (previous != Long.MIN_VALUE) {
rowRanges.add(new RowRange(currentStart, previous));
}

return rowRanges.iterator();
}

/**
* Must be called at the beginning of reading a new batch.
*/
void resetForNewBatch(int batchSize) {
this.offset = 0;
this.valuesToReadInBatch = batchSize;
}

/**
* Called at the beginning of reading a new page.
* Must be called at the beginning of reading a new page.
*/
void resetForPage(int totalValuesInPage) {
void resetForNewPage(int totalValuesInPage, long pageFirstRowIndex) {
this.valuesToReadInPage = totalValuesInPage;
this.rowId = pageFirstRowIndex;
}

/**
* Advance the current offset to the new values.
* Returns the start index of the current row range.
*/
void advanceOffset(int newOffset) {
long currentRangeStart() {
return currentRange.start;
}

/**
* Returns the end index of the current row range.
*/
long currentRangeEnd() {
return currentRange.end;
}

/**
* Advance the current offset and rowId to the new values.
*/
void advanceOffsetAndRowId(int newOffset, long newRowId) {
valuesToReadInBatch -= (newOffset - offset);
valuesToReadInPage -= (newOffset - offset);
valuesToReadInPage -= (newRowId - rowId);
offset = newOffset;
rowId = newRowId;
}

/**
* Advance to the next range.
*/
void nextRange() {
if (rowRanges == null) {
currentRange = MAX_ROW_RANGE;
} else if (!rowRanges.hasNext()) {
currentRange = END_ROW_RANGE;
} else {
currentRange = rowRanges.next();
}
}

/**
* Helper struct to represent a range of row indexes `[start, end]`.
*/
private static class RowRange {
final long start;
final long end;

RowRange(long start, long end) {
this.start = start;
this.end = end;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,28 @@ public interface ParquetVectorUpdater {
* @param values destination values vector
* @param valuesReader reader to read values from
*/
void updateBatch(
void readValues(
int total,
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader);

/**
* Skip a batch of `total` values from `valuesReader`.
*
* @param total total number of values to skip
* @param valuesReader reader to skip values from
*/
void skipValues(int total, VectorizedValuesReader valuesReader);

/**
* Read a single value from `valuesReader` into `values`, at `offset`.
*
* @param offset offset in `values` to put the new value
* @param values destination value vector
* @param valuesReader reader to read values from
*/
void update(int offset, WritableColumnVector values, VectorizedValuesReader valuesReader);
void readValue(int offset, WritableColumnVector values, VectorizedValuesReader valuesReader);

/**
* Process a batch of `total` values starting from `offset` in `values`, whose null slots
Expand Down
Loading

0 comments on commit a5c8866

Please sign in to comment.