diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index cce53001ea621..15d58f0c7572a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -176,6 +176,8 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa
           return new FixedLenByteArrayAsLongUpdater(arrayLen);
         } else if (canReadAsBinaryDecimal(descriptor, sparkType)) {
           return new FixedLenByteArrayUpdater(arrayLen);
+        } else if (sparkType == DataTypes.BinaryType) {
+          return new FixedLenByteArrayUpdater(arrayLen);
         }
         break;
       default:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 6f271bca36ab5..bb0df3639dcd8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -297,6 +297,7 @@ class ParquetToSparkSchemaConverter(
           case _: DecimalLogicalTypeAnnotation =>
             makeDecimalType(Decimal.maxPrecisionForBytes(parquetType.getTypeLength))
           case _: IntervalLogicalTypeAnnotation => typeNotImplemented()
+          case null => BinaryType
           case _ => illegalType()
         }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index fea986cc8e2de..df96403ac5071 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -36,6 +36,7 @@ import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.example.ExampleParquetWriter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP
+import org.apache.parquet.io.api.Binary
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
@@ -111,6 +112,41 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
+  test("SPARK-41096: FIXED_LEN_BYTE_ARRAY support") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      def makeRawParquetFile(path: Path): Unit = {
+        val schemaStr =
+          """message root {
+            |  required FIXED_LEN_BYTE_ARRAY(1) a;
+            |  required FIXED_LEN_BYTE_ARRAY(3) b;
+            |}
+        """.stripMargin
+        val schema = MessageTypeParser.parseMessageType(schemaStr)
+
+        val writer = createParquetWriter(schema, path, dictionaryEnabled)
+
+        (0 until 10).map(_.toString).foreach { n =>
+          val record = new SimpleGroup(schema)
+          record.add(0, Binary.fromString(n))
+          record.add(1, Binary.fromString(n + n + n))
+          writer.write(record)
+        }
+        writer.close()
+      }
+
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "part-r-0.parquet")
+        makeRawParquetFile(path)
+        Seq(true, false).foreach { vectorizedReaderEnabled =>
+          readParquetFile(path.toString, vectorizedReaderEnabled) { df =>
+            checkAnswer(df, (48 until 58).map(n => // char '0' is 48 in ascii
+              Row(Array(n), Array(n, n, n))))
+          }
+        }
+      }
+    }
+  }
+
   test("string") {
     val data = (1 to 4).map(i => Tuple1(i.toString))
     // Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL
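
A minimal usage sketch, not part of the patch itself, of what the change enables: a Parquet column declared as plain FIXED_LEN_BYTE_ARRAY with no logical type annotation is read by Spark as BinaryType, regardless of whether the vectorized reader is enabled. The object name and file path below are hypothetical placeholders.

// Hypothetical sketch (not part of this patch). Assumes a Parquet file written
// with the raw schema from the test above, at a placeholder path.
import org.apache.spark.sql.SparkSession

object ReadFixedLenByteArrayExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("read-fixed-len-byte-array")
      .master("local[*]")
      .getOrCreate()

    // The patch covers both code paths, so either setting of the vectorized
    // reader flag should yield the same result.
    spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")

    // "/tmp/flba.parquet" is a placeholder for a file containing
    // FIXED_LEN_BYTE_ARRAY(1) a and FIXED_LEN_BYTE_ARRAY(3) b.
    val df = spark.read.parquet("/tmp/flba.parquet")
    df.printSchema() // columns a and b are reported as `binary`
    df.show()

    spark.stop()
  }
}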