@@ -60,6 +60,7 @@
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;

/**
* Base class for custom RecordReaders for Parquet that directly materialize to `T`.
@@ -136,7 +137,9 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
this.requestedSchema = readContext.getRequestedSchema();
this.sparkSchema = new ParquetSchemaConverter(configuration).convert(requestedSchema);
String sparkRequestedSchemaString =
configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
Contributor:
We still have requestedSchema in Parquet's form, which does not contain the correct annotations. Couldn't that still be an issue in cases where the correct annotations in requestedSchema matter?

Contributor Author:

Actually, it's safer when the Parquet requested schema conforms to the actual physical file being read. Normally we shouldn't care about logical types (those with annotations) at the level of the Parquet record reader; it's the upper-level engine's responsibility to convert basic types like int32 into logical types like INT_8 and INT_16. The vectorized reader has to mix the two levels up because it needs to construct value vectors of the proper types at this level.
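
To make the annotation point concrete, here is a minimal sketch (in Scala, against parquet-mr's Types builder; the field name `f` and message name `spark_schema` are illustrative) of the two forms of requested schema being discussed: the bare physical int32 that Hive writes for a TINYINT column, and the annotated int32 that Spark's schema converter would produce for ByteType.

import org.apache.parquet.schema.{MessageType, OriginalType, Types}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32

// A bare int32 with no logical-type annotation, as Hive writes TINYINT (HIVE-14294).
val unannotated: MessageType =
  Types.buildMessage().optional(INT32).named("f").named("spark_schema")

// The same column annotated as INT_8, which is what Spark's converter would request
// for a ByteType field.
val annotated: MessageType =
  Types.buildMessage().optional(INT32).as(OriginalType.INT_8).named("f").named("spark_schema")

With this change, the record reader keeps the Parquet-level schema in whichever form matches the physical file and takes the logical types from the Catalyst schema instead.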

this.reader = new ParquetFileReader(configuration, file, blocks, requestedSchema.getColumns());
for (BlockMetaData block : blocks) {
this.totalRowCount += block.getRowCount();
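For context on the change in this file: instead of re-deriving the Spark schema from the Parquet requested schema (which may be missing the INT_8/INT_16 annotations), the reader now parses the Catalyst schema that the driver stashed in the Hadoop configuration. Below is a minimal sketch of that round trip, assuming the schema is stored in its JSON form and using the public DataType.fromJson as a stand-in for the internal StructType.fromString called in the reader.

import org.apache.spark.sql.types.{ByteType, DataType, StructType}

// The schema requested by the query, e.g. a single ByteType column.
val requested = new StructType().add("f", ByteType)

// Serialized to a string (JSON) so it can travel through the Hadoop configuration...
val serialized = requested.json

// ...and parsed back on the reader side; the round trip preserves ByteType exactly,
// which the Parquet-level schema alone cannot guarantee without the INT_8 annotation.
val roundTripped = DataType.fromJson(serialized).asInstanceOf[StructType]
assert(roundTripped == requested)
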
@@ -680,6 +680,30 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
)
}
}

test("SPARK-16632: read Parquet int32 as ByteType and ShortType") {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
withTempPath { dir =>
val path = dir.getCanonicalPath

// When written to Parquet, `TINYINT` and `SMALLINT` should be converted into
// `int32 (INT_8)` and `int32 (INT_16)` respectively. However, Hive doesn't add the `INT_8`
// and `INT_16` annotations properly (HIVE-14294). Thus, when reading files written by Hive
// with the vectorized Parquet reader enabled, Spark may hit errors due to the type
// mismatch.
//
// Here we simulate Hive's behavior by writing a single `INT` field and then reading it
// back as `TINYINT` and `SMALLINT` in Spark to verify this issue.
Seq(1).toDF("f").write.parquet(path)

val withByteField = new StructType().add("f", ByteType)
checkAnswer(spark.read.schema(withByteField).parquet(path), Row(1: Byte))

val withShortField = new StructType().add("f", ShortType)
checkAnswer(spark.read.schema(withShortField).parquet(path), Row(1: Short))
}
}
}
}

object TestingUDT {