@@ -176,6 +176,8 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType sparkType
          return new FixedLenByteArrayAsLongUpdater(arrayLen);
        } else if (canReadAsBinaryDecimal(descriptor, sparkType)) {
          return new FixedLenByteArrayUpdater(arrayLen);
        } else if (sparkType == DataTypes.BinaryType) {
          return new FixedLenByteArrayUpdater(arrayLen);
        }
        break;
      default:
@@ -297,6 +297,7 @@ class ParquetToSparkSchemaConverter(
      case _: DecimalLogicalTypeAnnotation =>
        makeDecimalType(Decimal.maxPrecisionForBytes(parquetType.getTypeLength))
      case _: IntervalLogicalTypeAnnotation => typeNotImplemented()
      case null => BinaryType
      case _ => illegalType()
    }
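Together with the updater branch above, this single added case is what enables the feature: a FIXED_LEN_BYTE_ARRAY field carrying no logical type annotation now converts to Spark's BinaryType instead of falling through to illegalType(). A minimal sketch of that conversion, assuming the spark-sql module is on the classpath (constructor defaults may differ slightly across Spark versions):

import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter

// A fixed-length binary field with no logical type annotation (the length is arbitrary here).
val parquetSchema = MessageTypeParser.parseMessageType(
  """message root {
    |  required FIXED_LEN_BYTE_ARRAY(16) raw_bytes;
    |}""".stripMargin)

// With the new `case null => BinaryType` branch this prints a StructType whose only
// field is BinaryType; before the change the converter raised an illegal-type error here.
println(new ParquetToSparkSchemaConverter().convert(parquetSchema))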

@@ -36,6 +36,7 @@ import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.example.ExampleParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.{MessageType, MessageTypeParser}

import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
@@ -111,6 +112,41 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
    }
  }

test("SPARK-41096: FIXED_LEN_BYTE_ARRAY support") {
Seq(true, false).foreach { dictionaryEnabled =>
def makeRawParquetFile(path: Path): Unit = {
val schemaStr =
"""message root {
| required FIXED_LEN_BYTE_ARRAY(1) a;
| required FIXED_LEN_BYTE_ARRAY(3) b;
|}
""".stripMargin
val schema = MessageTypeParser.parseMessageType(schemaStr)

val writer = createParquetWriter(schema, path, dictionaryEnabled)

(0 until 10).map(_.toString).foreach { n =>
val record = new SimpleGroup(schema)
record.add(0, Binary.fromString(n))
record.add(1, Binary.fromString(n + n + n))
writer.write(record)
}
writer.close()
}

withTempDir { dir =>
val path = new Path(dir.toURI.toString, "part-r-0.parquet")
makeRawParquetFile(path)
Seq(true, false).foreach { vectorizedReaderEnabled =>
readParquetFile(path.toString, vectorizedReaderEnabled) { df =>
checkAnswer(df, (48 until 58).map(n => // char '0' is 48 in ascii
Row(Array(n), Array(n, n, n))))
}
}
}
}
}
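For reference, a hedged sketch of the end-user behavior the test above pins down (the file path is illustrative; the config key is Spark's standard vectorized-reader toggle): with either reader code path, fixed-length binary columns now come back as BinaryType, i.e. Array[Byte] cells.

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
val df = spark.read.parquet("/tmp/fixed_len_byte_array.parquet")
df.printSchema()  // both a and b show up as binary
df.show()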

test("string") {
val data = (1 to 4).map(i => Tuple1(i.toString))
// Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL