Changes from all commits (22 commits)
85e93b8  cleanup OriginalType used in parquet VectorizedColumnReader (LuciferYang, Mar 5, 2021)
c2fc30d  Merge branch 'upmaster' into cleanup-parquet-dep-api (LuciferYang, Mar 8, 2021)
4b9daf3  rename originalType to logicalTypeAnnotation (LuciferYang, Mar 8, 2021)
34c8d2e  rewrite isDecimalTypeMatched (LuciferYang, Mar 8, 2021)
ae77ed0  cleanup ParquetReadSupport (LuciferYang, Mar 15, 2021)
6d8e5f8  Merge branch 'upmaster' into cleanup-parquet-dep-api (LuciferYang, Mar 15, 2021)
5f090bb  clean up ParquetRowConverter (LuciferYang, Mar 15, 2021)
c7ec27b  cleanup ParquetSchemaConverter (LuciferYang, Mar 15, 2021)
54467a4  re-clean ParquetRowConverter (LuciferYang, Mar 15, 2021)
23e1185  fix null match (LuciferYang, Mar 15, 2021)
fa9a9ec  fix null INT64 type match in ParquetSchemaConverter (LuciferYang, Mar 15, 2021)
9564e64  fix failed case (LuciferYang, Mar 16, 2021)
311d7dd  clean up ParquetFilters (LuciferYang, Mar 16, 2021)
2930b61  Merge branch 'upmaster' into cleanup-parquet-dep-api (LuciferYang, Mar 16, 2021)
8817f48  Merge branch 'upmaster' into cleanup-parquet-dep-api (LuciferYang, Mar 18, 2021)
6a9d8c5  Merge branch 'upmaster' into cleanup-parquet-dep-api (LuciferYang, Mar 29, 2021)
66715c9  fix ParquetRowConverter (LuciferYang, Mar 30, 2021)
9835eeb  fix bug (LuciferYang, Mar 30, 2021)
9a7ec8c  try remove decimalLogicalType from ParquetSchemaType (LuciferYang, Mar 31, 2021)
51f75b7  Merge branch 'upmaster' into cleanup-parquet-dep-api (LuciferYang, Apr 28, 2021)
f91b670  Merge branch 'upmaster' into cleanup-parquet-dep-api (LuciferYang, May 5, 2021)
2bc0391  Merge branch 'upmaster' into cleanup-parquet-dep-api (LuciferYang, May 10, 2021)
VectorizedColumnReader.java
@@ -32,8 +32,12 @@
 import org.apache.parquet.column.page.*;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.DecimalMetadata;
-import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
 import org.apache.parquet.schema.PrimitiveType;
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -101,7 +105,7 @@ public class VectorizedColumnReader {
 
   private final PageReader pageReader;
   private final ColumnDescriptor descriptor;
-  private final OriginalType originalType;
+  private final LogicalTypeAnnotation logicalTypeAnnotation;
   // The timezone conversion to apply to int96 timestamps. Null if no conversion.
   private final ZoneId convertTz;
   private static final ZoneId UTC = ZoneOffset.UTC;
@@ -110,10 +114,14 @@ public class VectorizedColumnReader {
 
   private boolean isDecimalTypeMatched(DataType dt) {
     DecimalType d = (DecimalType) dt;
-    DecimalMetadata dm = descriptor.getPrimitiveType().getDecimalMetadata();
-    // It's OK if the required decimal precision is larger than or equal to the physical decimal
-    // precision in the Parquet metadata, as long as the decimal scale is the same.
-    return dm != null && dm.getPrecision() <= d.precision() && dm.getScale() == d.scale();
+    LogicalTypeAnnotation typeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation();
+    if (typeAnnotation instanceof DecimalLogicalTypeAnnotation) {
+      DecimalLogicalTypeAnnotation decimalType = (DecimalLogicalTypeAnnotation) typeAnnotation;
+      // It's OK if the required decimal precision is larger than or equal to the physical decimal
+      // precision in the Parquet metadata, as long as the decimal scale is the same.
+      return decimalType.getPrecision() <= d.precision() && decimalType.getScale() == d.scale();
+    }
+    return false;
   }
 
   private boolean canReadAsIntDecimal(DataType dt) {
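To make the matching rule above concrete: a file column annotated DECIMAL(9, 2) can be read into any requested decimal with at least 9 digits of precision and exactly scale 2. A minimal sketch, assuming parquet-mr's LogicalTypeAnnotation.decimalType(scale, precision) factory; the class and the matches helper are illustrative, not part of the patch:

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;

public class DecimalMatchSketch {
  // Same predicate as isDecimalTypeMatched above, with the requested
  // (precision, scale) passed in directly.
  static boolean matches(DecimalLogicalTypeAnnotation file, int precision, int scale) {
    return file.getPrecision() <= precision && file.getScale() == scale;
  }

  public static void main(String[] args) {
    // Note the factory's argument order: decimalType(scale, precision).
    DecimalLogicalTypeAnnotation file = LogicalTypeAnnotation.decimalType(2, 9); // DECIMAL(9, 2)
    System.out.println(matches(file, 10, 2)); // true: wider precision, same scale
    System.out.println(matches(file, 8, 2));  // false: narrower precision would drop digits
    System.out.println(matches(file, 9, 3));  // false: a different scale changes the value
  }
}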
@@ -133,15 +141,15 @@ private boolean canReadAsBinaryDecimal(DataType dt) {
 
   public VectorizedColumnReader(
       ColumnDescriptor descriptor,
-      OriginalType originalType,
+      LogicalTypeAnnotation logicalTypeAnnotation,
       PageReader pageReader,
       ZoneId convertTz,
       String datetimeRebaseMode,
       String int96RebaseMode) throws IOException {
     this.descriptor = descriptor;
     this.pageReader = pageReader;
     this.convertTz = convertTz;
-    this.originalType = originalType;
+    this.logicalTypeAnnotation = logicalTypeAnnotation;
     this.maxDefLevel = descriptor.getMaxDefinitionLevel();
 
     DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
@@ -172,13 +180,14 @@ private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName
     boolean isSupported = false;
     switch (typeName) {
       case INT32:
-        isSupported = originalType != OriginalType.DATE || "CORRECTED".equals(datetimeRebaseMode);
+        isSupported = !(logicalTypeAnnotation instanceof DateLogicalTypeAnnotation) ||
+          "CORRECTED".equals(datetimeRebaseMode);
         break;
       case INT64:
-        if (originalType == OriginalType.TIMESTAMP_MICROS) {
+        if (isTimestampTypeMatched(TimeUnit.MICROS)) {
           isSupported = "CORRECTED".equals(datetimeRebaseMode);
         } else {
-          isSupported = originalType != OriginalType.TIMESTAMP_MILLIS;
+          isSupported = !isTimestampTypeMatched(TimeUnit.MILLIS);
         }
         break;
       case FLOAT:
@@ -263,17 +272,18 @@ void readBatch(int total, WritableColumnVector column) throws IOException {
         // We need to make sure that we initialize the right type for the dictionary otherwise
         // WritableColumnVector will throw an exception when trying to decode to an Int when the
         // dictionary is in fact initialized as Long
-        boolean castLongToInt = primitiveType.getOriginalType() == OriginalType.DECIMAL &&
-          primitiveType.getDecimalMetadata().getPrecision() <= Decimal.MAX_INT_DIGITS() &&
-          primitiveType.getPrimitiveTypeName() == INT64;
+        LogicalTypeAnnotation typeAnnotation = primitiveType.getLogicalTypeAnnotation();
+        boolean castLongToInt = typeAnnotation instanceof DecimalLogicalTypeAnnotation &&
+          ((DecimalLogicalTypeAnnotation) typeAnnotation).getPrecision() <=
+            Decimal.MAX_INT_DIGITS() && primitiveType.getPrimitiveTypeName() == INT64;
 
         // We require a long value, but we need to use dictionary to decode the original
         // signed int first
-        boolean isUnsignedInt32 = primitiveType.getOriginalType() == OriginalType.UINT_32;
+        boolean isUnsignedInt32 = isUnsignedIntTypeMatched(32);
 
         // We require a decimal value, but we need to use dictionary to decode the original
         // signed long first
-        boolean isUnsignedInt64 = primitiveType.getOriginalType() == OriginalType.UINT_64;
+        boolean isUnsignedInt64 = isUnsignedIntTypeMatched(64);
 
         boolean needTransform = castLongToInt || isUnsignedInt32 || isUnsignedInt64;
         column.setDictionary(new ParquetDictionary(dictionary, needTransform));
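Background for the UINT_64 branches here and in the read paths below: Parquet stores unsigned 64-bit values as signed longs, so a negative long must be reinterpreted sign-free, and 20 decimal digits (hence Decimal(20, 0)) always suffice. A self-contained sketch; the class name is illustrative:

import java.math.BigInteger;
import java.nio.ByteBuffer;

public class Uint64AsDecimalSketch {
  public static void main(String[] args) {
    // A UINT_64 value of 2^64 - 1 arrives from Parquet as the signed long -1.
    long raw = -1L;
    byte[] bytes = ByteBuffer.allocate(Long.BYTES).putLong(raw).array();
    // BigInteger(1, bytes) forces a non-negative (unsigned) interpretation.
    BigInteger unsigned = new BigInteger(1, bytes);
    System.out.println(unsigned); // 18446744073709551615, i.e. 20 digits, fits Decimal(20, 0)
  }
}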
@@ -398,14 +408,14 @@ private void decodeDictionaryIds(
       case INT64:
         if (column.dataType() == DataTypes.LongType ||
             canReadAsLongDecimal(column.dataType()) ||
-            (originalType == OriginalType.TIMESTAMP_MICROS &&
+            (isTimestampTypeMatched(TimeUnit.MICROS) &&
               "CORRECTED".equals(datetimeRebaseMode))) {
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
               column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
             }
           }
-        } else if (originalType == OriginalType.UINT_64) {
+        } else if (isUnsignedIntTypeMatched(64)) {
           // In `ParquetToSparkSchemaConverter`, we map parquet UINT64 to our Decimal(20, 0).
           // For unsigned int64, it stores as dictionary encoded signed int64 in Parquet
           // whenever dictionary is available.
@@ -418,7 +428,7 @@ private void decodeDictionaryIds(
               column.putByteArray(i, unsigned);
             }
           }
-        } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
+        } else if (isTimestampTypeMatched(TimeUnit.MILLIS)) {
           if ("CORRECTED".equals(datetimeRebaseMode)) {
             for (int i = rowId; i < rowId + num; ++i) {
               if (!column.isNullAt(i)) {
@@ -436,7 +446,7 @@ private void decodeDictionaryIds(
               }
             }
           }
-        } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
+        } else if (isTimestampTypeMatched(TimeUnit.MICROS)) {
           final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
@@ -611,13 +621,13 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) thro
       defColumn.readLongs(
         num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn,
         DecimalType.is32BitDecimalType(column.dataType()));
-    } else if (originalType == OriginalType.UINT_64) {
+    } else if (isUnsignedIntTypeMatched(64)) {
       // In `ParquetToSparkSchemaConverter`, we map parquet UINT64 to our Decimal(20, 0).
       // For unsigned int64, it stores as plain signed int64 in Parquet when dictionary fallbacks.
       // We read them as decimal values.
       defColumn.readUnsignedLongs(
         num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
-    } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
+    } else if (isTimestampTypeMatched(TimeUnit.MICROS)) {
       if ("CORRECTED".equals(datetimeRebaseMode)) {
         defColumn.readLongs(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, false);
@@ -626,7 +636,7 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) thro
         defColumn.readLongsWithRebase(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, failIfRebase);
       }
-    } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
+    } else if (isTimestampTypeMatched(TimeUnit.MILLIS)) {
       if ("CORRECTED".equals(datetimeRebaseMode)) {
         for (int i = 0; i < num; i++) {
           if (defColumn.readInteger() == maxDefLevel) {
@@ -871,4 +881,15 @@ private void readPageV2(DataPageV2 page) throws IOException {
       throw new IOException("could not read page " + page + " in col " + descriptor, e);
     }
   }
+
+  private boolean isTimestampTypeMatched(TimeUnit unit) {
+    return logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation &&
+      ((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).getUnit() == unit;
+  }
+
+  private boolean isUnsignedIntTypeMatched(int bitWidth) {
+    return logicalTypeAnnotation instanceof IntLogicalTypeAnnotation &&
+      !((IntLogicalTypeAnnotation) logicalTypeAnnotation).isSigned() &&
+      ((IntLogicalTypeAnnotation) logicalTypeAnnotation).getBitWidth() == bitWidth;
+  }
 }

Review thread on the two new helpers:

Member: nit: these can be static methods

Contributor (author): Should we extract these methods into util class?

Member: I mean we can just change private boolean to private static boolean since they are not depending on any of the state from the class, but it's a really minor thing :)
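For reference, one way to realize the reviewer's suggestion: as written, both helpers read the logicalTypeAnnotation instance field, so a static variant would take the annotation as a parameter. A sketch of that variant (class name and placement are illustrative, not the merged code):

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;

public class StaticHelpersSketch {
  // Static variant: no instance state, the annotation is passed in explicitly.
  static boolean isTimestampTypeMatched(LogicalTypeAnnotation ann, TimeUnit unit) {
    return ann instanceof TimestampLogicalTypeAnnotation &&
      ((TimestampLogicalTypeAnnotation) ann).getUnit() == unit;
  }

  static boolean isUnsignedIntTypeMatched(LogicalTypeAnnotation ann, int bitWidth) {
    return ann instanceof IntLogicalTypeAnnotation &&
      !((IntLogicalTypeAnnotation) ann).isSigned() &&
      ((IntLogicalTypeAnnotation) ann).getBitWidth() == bitWidth;
  }

  public static void main(String[] args) {
    // timestampType(isAdjustedToUTC, unit) and intType(bitWidth, isSigned)
    // are parquet-mr's factories for these annotations.
    System.out.println(isTimestampTypeMatched(
      LogicalTypeAnnotation.timestampType(true, TimeUnit.MICROS), TimeUnit.MICROS)); // true
    System.out.println(isUnsignedIntTypeMatched(
      LogicalTypeAnnotation.intType(64, false), 64)); // true: UINT_64
  }
}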
VectorizedParquetRecordReader.java
@@ -332,7 +332,7 @@ private void checkEndOfRowGroup() throws IOException {
       if (missingColumns[i]) continue;
       columnReaders[i] = new VectorizedColumnReader(
         columns.get(i),
-        types.get(i).getOriginalType(),
+        types.get(i).getLogicalTypeAnnotation(),
         pages.getPageReader(columns.get(i)),
         convertTz,
         datetimeRebaseMode,