diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index ff5c724375c3..d47f551b905e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -318,10 +318,34 @@ private[parquet] class ParquetRowConverter(
         new ParquetMapConverter(parquetType.asGroupType(), t, updater)
 
       case t: StructType =>
+        val wrappedUpdater = {
+          // SPARK-30338: avoid unnecessary InternalRow copying for nested structs.
+          // There are two cases to handle here:
+          //
+          //  1. Parent container is a map or array: we must make a deep copy of the mutable row
+          //     because this converter may be invoked multiple times per Parquet input record
+          //     (if the map or array contains multiple elements).
+          //
+          //  2. Parent container is a struct: we don't need to copy the row here because either:
+          //
+          //     (a) all ancestors are structs and therefore no copying is required because this
+          //         converter will only be invoked once per Parquet input record, or
+          //     (b) some ancestor is a struct that is nested in a map or array and that ancestor's
+          //         converter will perform deep-copying (which will recursively copy this row).
+          if (updater.isInstanceOf[RowUpdater]) {
+            // `updater` is a RowUpdater, implying that the parent container is a struct.
+            updater
+          } else {
+            // `updater` is NOT a RowUpdater, implying that the parent container is a map or array.
+            new ParentContainerUpdater {
+              override def set(value: Any): Unit = {
+                updater.set(value.asInstanceOf[SpecificInternalRow].copy())  // deep copy
+              }
+            }
+          }
+        }
         new ParquetRowConverter(
-          schemaConverter, parquetType.asGroupType(), t, convertTz, new ParentContainerUpdater {
-            override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
-          })
+          schemaConverter, parquetType.asGroupType(), t, convertTz, wrappedUpdater)
 
       case t =>
         throw new RuntimeException(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 39590b063f0a..1550b3bbb624 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -204,6 +204,42 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
+  testStandardAndLegacyModes("array of struct") {
+    val data = (1 to 4).map { i =>
+      Tuple1(
+        Seq(
+          Tuple1(s"1st_val_$i"),
+          Tuple1(s"2nd_val_$i")
+        )
+      )
+    }
+    withParquetDataFrame(data) { df =>
+      // Structs are converted to `Row`s
+      checkAnswer(df, data.map { case Tuple1(array) =>
+        Row(array.map(struct => Row(struct.productIterator.toSeq: _*)))
+      })
+    }
+  }
+
+  testStandardAndLegacyModes("array of nested struct") {
+    val data = (1 to 4).map { i =>
+      Tuple1(
+        Seq(
+          Tuple1(
+            Tuple1(s"1st_val_$i")),
+          Tuple1(
+            Tuple1(s"2nd_val_$i"))
+        )
+      )
+    }
+    withParquetDataFrame(data) { df =>
+      // Structs are converted to `Row`s
+      checkAnswer(df, data.map { case Tuple1(array) =>
+        Row(array.map { case Tuple1(Tuple1(str)) => Row(Row(str)) })
+      })
+    }
+  }
+
   testStandardAndLegacyModes("nested struct with array of array as field") {
     val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
     withParquetDataFrame(data) { df =>
@@ -214,9 +250,34 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
+  testStandardAndLegacyModes("nested map with struct as key type") {
+    val data = (1 to 4).map { i =>
+      Tuple1(
+        Map(
+          (i, s"kA_$i") -> s"vA_$i",
+          (i, s"kB_$i") -> s"vB_$i"
+        )
+      )
+    }
+    withParquetDataFrame(data) { df =>
+      // Structs are converted to `Row`s
+      checkAnswer(df, data.map { case Tuple1(m) =>
+        Row(m.map { case (k, v) => Row(k.productIterator.toSeq: _*) -> v })
+      })
+    }
+  }
+
   testStandardAndLegacyModes("nested map with struct as value type") {
-    val data = (1 to 4).map(i => Tuple1(Map(i -> ((i, s"val_$i")))))
+    val data = (1 to 4).map { i =>
+      Tuple1(
+        Map(
+          s"kA_$i" -> ((i, s"vA_$i")),
+          s"kB_$i" -> ((i, s"vB_$i"))
+        )
+      )
+    }
     withParquetDataFrame(data) { df =>
+      // Structs are converted to `Row`s
       checkAnswer(df, data.map { case Tuple1(m) =>
         Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
       })
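
Aside from the tests above, the copy-avoidance rule described in the `wrappedUpdater` comment can be illustrated with a small standalone sketch. This is not part of the patch: `Updater`, `StructUpdater`, `ElementUpdater` and `MutableRow` below are hypothetical stand-ins for Spark's `ParentContainerUpdater`, `RowUpdater` and `SpecificInternalRow`, used only to show when the reused mutable row must be deep-copied.

// Minimal standalone sketch, not Spark code: all types here are hypothetical
// stand-ins illustrating the copy decision, not Spark's internal classes.
object NestedStructCopySketch {
  trait Updater { def set(value: Any): Unit }

  // Parent container is a struct: the nested row is written once per record,
  // so it can be handed over without copying.
  class StructUpdater extends Updater {
    var current: Any = _
    override def set(value: Any): Unit = { current = value }
  }

  // Parent container is a map or array: the same mutable row instance is
  // reused for every element, so each value must be deep-copied before storing.
  class ElementUpdater(buffer: scala.collection.mutable.ArrayBuffer[Any]) extends Updater {
    override def set(value: Any): Unit = { buffer += value }
  }

  // Stand-in for a reusable mutable row that supports a deep copy.
  class MutableRow(val values: Array[Any]) {
    def copy(): MutableRow = new MutableRow(values.clone())
  }

  // Mirrors the wrappedUpdater decision: copy only when the parent is not a struct.
  def wrap(updater: Updater): Updater = updater match {
    case s: StructUpdater => s                  // struct parent: no copy needed
    case other => new Updater {                 // map/array parent: deep copy
      override def set(value: Any): Unit =
        other.set(value.asInstanceOf[MutableRow].copy())
    }
  }
}

The point mirrored from the change: an updater whose parent is a struct receives the nested row exactly once per record and can pass it through, while a map or array element updater sees the same mutable row instance for every element and therefore must copy it before storing.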