Commit a988aaf

LuciferYang authored and cloud-fan committed
[SPARK-29454][SQL] Reduce unsafeProjection times when read Parquet file use non-vectorized mode
### What changes were proposed in this pull request?

When a Parquet data file is read in non-vectorized mode, the unsafeProjection conversion currently happens twice:

1. `ParquetGroupConverter` calls an unsafeProjection function to convert the `SpecificInternalRow` to an `UnsafeRow` for every record read through `ParquetRecordReader`.
2. `ParquetFileFormat` calls an unsafeProjection function to convert that `UnsafeRow` to yet another `UnsafeRow` when `partitionSchema` is not empty in the DataSourceV1 branch, and `PartitionReaderWithPartitionValues` always performs this conversion in the DataSourceV2 branch.

This PR removes the `unsafeProjection` conversion from `ParquetGroupConverter` and changes `ParquetRecordReader` to produce `SpecificInternalRow` instead of `UnsafeRow`.

### Why are the changes needed?

The first conversion in `ParquetGroupConverter` is redundant; it is enough for `ParquetRecordReader` to return an `InternalRow` (`SpecificInternalRow`).

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Unit tests.

Closes #26106 from LuciferYang/spark-parquet-unsafe-projection.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
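For a concrete picture of the change, the sketch below paraphrases the `ParquetRowConverter` hunk further down in this commit. It is not the verbatim Spark source, only the before/after shape of where the per-record projection used to happen:

```scala
// Before (paraphrased): the row converter projected every materialized record
// to UnsafeRow, and the caller projected the result again when appending
// partition columns -- two projections per record.
private val unsafeProjection = UnsafeProjection.create(catalystType)
def currentRecord: UnsafeRow = unsafeProjection(currentRow)

// After (paraphrased): the converter hands back its mutable SpecificInternalRow
// as-is; the single UnsafeProjection is applied once, downstream, in
// ParquetFileFormat (DataSourceV1) or PartitionReaderWithPartitionValues (DataSourceV2).
def currentRecord: InternalRow = currentRow
```

Since `currentRow` is a mutable row that the converter reuses, consumers still end up with stable `UnsafeRow`s, just via one projection at the end of the pipeline instead of two.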
1 parent 857f109 commit a988aaf

File tree

5 files changed: +23 −30 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 8 additions & 12 deletions

@@ -328,32 +328,28 @@ class ParquetFileFormat
       iter.asInstanceOf[Iterator[InternalRow]]
     } else {
       logDebug(s"Falling back to parquet-mr")
-      // ParquetRecordReader returns UnsafeRow
+      // ParquetRecordReader returns InternalRow
       val readSupport = new ParquetReadSupport(convertTz, enableVectorizedReader = false)
       val reader = if (pushed.isDefined && enableRecordFilter) {
         val parquetFilter = FilterCompat.get(pushed.get, null)
-        new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter)
+        new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
       } else {
-        new ParquetRecordReader[UnsafeRow](readSupport)
+        new ParquetRecordReader[InternalRow](readSupport)
       }
-      val iter = new RecordReaderIterator(reader)
+      val iter = new RecordReaderIterator[InternalRow](reader)
       // SPARK-23457 Register a task completion listener before `initialization`.
       taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
       reader.initialize(split, hadoopAttemptContext)

       val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
-      val joinedRow = new JoinedRow()
-      val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+      val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

-      // This is a horrible erasure hack... if we type the iterator above, then it actually check
-      // the type in next() and we get a class cast exception. If we make that function return
-      // Object, then we can defer the cast until later!
       if (partitionSchema.length == 0) {
         // There is no partition columns
-        iter.asInstanceOf[Iterator[InternalRow]]
+        iter.map(unsafeProjection)
       } else {
-        iter.asInstanceOf[Iterator[InternalRow]]
-          .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+        val joinedRow = new JoinedRow()
+        iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
       }
     }
   }
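Taken together, the hunk above leaves the non-vectorized DataSourceV1 read path looking roughly like the consolidated sketch below (a paraphrase of the changed lines with the record-filter branch and the task-completion listener elided, not the verbatim source):

```scala
// The reader now yields InternalRow; no per-record projection happens inside the converter.
val readSupport = new ParquetReadSupport(convertTz, enableVectorizedReader = false)
val reader = new ParquetRecordReader[InternalRow](readSupport)
val iter = new RecordReaderIterator[InternalRow](reader)
reader.initialize(split, hadoopAttemptContext)

// A single UnsafeProjection covers both the data columns and the partition columns.
val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

if (partitionSchema.length == 0) {
  iter.map(unsafeProjection)  // project the converter's SpecificInternalRow once
} else {
  val joinedRow = new JoinedRow()
  iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
}
```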

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala

Lines changed: 5 additions & 5 deletions

@@ -29,13 +29,13 @@ import org.apache.parquet.schema._
 import org.apache.parquet.schema.Type.Repetition

 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._

 /**
  * A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst
- * [[UnsafeRow]]s.
+ * [[InternalRow]]s.
  *
  * The API interface of [[ReadSupport]] is a little bit over complicated because of historical
  * reasons. In older versions of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] need to be
@@ -51,7 +51,7 @@ import org.apache.spark.sql.types._
  */
 class ParquetReadSupport(val convertTz: Option[TimeZone],
     enableVectorizedReader: Boolean)
-  extends ReadSupport[UnsafeRow] with Logging {
+  extends ReadSupport[InternalRow] with Logging {
   private var catalystRequestedSchema: StructType = _

   def this() {
@@ -114,13 +114,13 @@ class ParquetReadSupport(val convertTz: Option[TimeZone],
   /**
    * Called on executor side after [[init()]], before instantiating actual Parquet record readers.
    * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet
-   * records to Catalyst [[UnsafeRow]]s.
+   * records to Catalyst [[InternalRow]]s.
    */
   override def prepareForRead(
       conf: Configuration,
       keyValueMetaData: JMap[String, String],
       fileSchema: MessageType,
-      readContext: ReadContext): RecordMaterializer[UnsafeRow] = {
+      readContext: ReadContext): RecordMaterializer[InternalRow] = {
     val parquetRequestedSchema = readContext.getRequestedSchema
     new ParquetRecordMaterializer(
       parquetRequestedSchema,

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala

Lines changed: 3 additions & 3 deletions

@@ -22,7 +22,7 @@ import java.util.TimeZone
 import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
 import org.apache.parquet.schema.MessageType

-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType

 /**
@@ -37,12 +37,12 @@ private[parquet] class ParquetRecordMaterializer(
     catalystSchema: StructType,
     schemaConverter: ParquetToSparkSchemaConverter,
     convertTz: Option[TimeZone])
-  extends RecordMaterializer[UnsafeRow] {
+  extends RecordMaterializer[InternalRow] {

   private val rootConverter =
     new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, convertTz, NoopUpdater)

-  override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord
+  override def getCurrentRecord: InternalRow = rootConverter.currentRecord

   override def getRootConverter: GroupConverter = rootConverter
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala

Lines changed: 2 additions & 4 deletions

@@ -173,12 +173,10 @@ private[parquet] class ParquetRowConverter(

   private val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))

-  private val unsafeProjection = UnsafeProjection.create(catalystType)
-
   /**
-   * The [[UnsafeRow]] converted from an entire Parquet record.
+   * The [[InternalRow]] converted from an entire Parquet record.
    */
-  def currentRecord: UnsafeRow = unsafeProjection(currentRow)
+  def currentRecord: InternalRow = currentRow

   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala

Lines changed: 5 additions & 6 deletions

@@ -31,7 +31,6 @@ import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
 import org.apache.spark.sql.execution.datasources.{PartitionedFile, RecordReaderIterator}
@@ -176,7 +175,7 @@ case class ParquetPartitionReaderFactory(
     reader
   }

-  private def createRowBaseReader(file: PartitionedFile): RecordReader[Void, UnsafeRow] = {
+  private def createRowBaseReader(file: PartitionedFile): RecordReader[Void, InternalRow] = {
     buildReaderBase(file, createRowBaseParquetReader)
   }

@@ -185,16 +184,16 @@ case class ParquetPartitionReaderFactory(
       partitionValues: InternalRow,
       hadoopAttemptContext: TaskAttemptContextImpl,
       pushed: Option[FilterPredicate],
-      convertTz: Option[TimeZone]): RecordReader[Void, UnsafeRow] = {
+      convertTz: Option[TimeZone]): RecordReader[Void, InternalRow] = {
     logDebug(s"Falling back to parquet-mr")
     val taskContext = Option(TaskContext.get())
-    // ParquetRecordReader returns UnsafeRow
+    // ParquetRecordReader returns InternalRow
     val readSupport = new ParquetReadSupport(convertTz, enableVectorizedReader = false)
     val reader = if (pushed.isDefined && enableRecordFilter) {
       val parquetFilter = FilterCompat.get(pushed.get, null)
-      new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter)
+      new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
     } else {
-      new ParquetRecordReader[UnsafeRow](readSupport)
+      new ParquetRecordReader[InternalRow](readSupport)
     }
     val iter = new RecordReaderIterator(reader)
     // SPARK-23457 Register a task completion listener before `initialization`.
