ParquetFileFormat.scala
@@ -328,32 +328,28 @@ class ParquetFileFormat
iter.asInstanceOf[Iterator[InternalRow]]
} else {
logDebug(s"Falling back to parquet-mr")
[Review thread on the line above]
dongjoon-hyun (Member), Oct 14, 2019:
So, is this PR aiming only for non-vectorized code, @LuciferYang? If so, please update the PR description.

LuciferYang (Contributor, author):
ok~
- // ParquetRecordReader returns UnsafeRow
+ // ParquetRecordReader returns InternalRow
val readSupport = new ParquetReadSupport(convertTz, enableVectorizedReader = false)
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
- new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter)
+ new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
} else {
- new ParquetRecordReader[UnsafeRow](readSupport)
+ new ParquetRecordReader[InternalRow](readSupport)
}
- val iter = new RecordReaderIterator(reader)
+ val iter = new RecordReaderIterator[InternalRow](reader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
reader.initialize(split, hadoopAttemptContext)

val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
- val joinedRow = new JoinedRow()
- val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+ val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

- // This is a horrible erasure hack... if we type the iterator above, then it actually check
- // the type in next() and we get a class cast exception. If we make that function return
- // Object, then we can defer the cast until later!
if (partitionSchema.length == 0) {
// There is no partition columns
- iter.asInstanceOf[Iterator[InternalRow]]
+ iter.map(unsafeProjection)
} else {
- iter.asInstanceOf[Iterator[InternalRow]]
-   .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+ val joinedRow = new JoinedRow()
+ iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
}
}
}
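Note on the hunk above: in the old code a record could be projected to UnsafeRow twice, once inside ParquetRowConverter and again here when partition columns were appended; after this change the generated projection at the iterator level is the only place rows become UnsafeRows. The following standalone sketch is not part of this PR — the schema, values, and object name are invented — and only illustrates what GenerateUnsafeProjection.generate(fullSchema, fullSchema) does to a joined data/partition row:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow, JoinedRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String

object ProjectionSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical schema: one data column read from Parquet plus one partition column.
    val fullSchema = Seq(
      AttributeReference("value", IntegerType)(),
      AttributeReference("part", StringType)())

    // Same pattern as the patched code: one generated projection per task.
    val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
    val joinedRow = new JoinedRow()

    val dataRow: InternalRow = new GenericInternalRow(Array[Any](42))
    val partitionValues: InternalRow = InternalRow(UTF8String.fromString("2019-10-14"))

    // Each InternalRow coming out of parquet-mr is turned into an UnsafeRow exactly once, here.
    val unsafeRow = unsafeProjection(joinedRow(dataRow, partitionValues))
    println(s"${unsafeRow.getInt(0)}, ${unsafeRow.getUTF8String(1)}") // 42, 2019-10-14
  }
}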
ParquetReadSupport.scala
@@ -29,13 +29,13 @@ import org.apache.parquet.schema._
import org.apache.parquet.schema.Type.Repetition

import org.apache.spark.internal.Logging
- import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

/**
* A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst
- * [[UnsafeRow]]s.
+ * [[InternalRow]]s.
*
* The API interface of [[ReadSupport]] is a little bit over complicated because of historical
* reasons. In older versions of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] need to be
@@ -51,7 +51,7 @@ import org.apache.spark.sql.types._
*/
class ParquetReadSupport(val convertTz: Option[TimeZone],
enableVectorizedReader: Boolean)
- extends ReadSupport[UnsafeRow] with Logging {
+ extends ReadSupport[InternalRow] with Logging {
private var catalystRequestedSchema: StructType = _

def this() {
@@ -114,13 +114,13 @@ class ParquetReadSupport(val convertTz: Option[TimeZone],
/**
* Called on executor side after [[init()]], before instantiating actual Parquet record readers.
* Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet
- * records to Catalyst [[UnsafeRow]]s.
+ * records to Catalyst [[InternalRow]]s.
*/
override def prepareForRead(
conf: Configuration,
keyValueMetaData: JMap[String, String],
fileSchema: MessageType,
- readContext: ReadContext): RecordMaterializer[UnsafeRow] = {
+ readContext: ReadContext): RecordMaterializer[InternalRow] = {
val parquetRequestedSchema = readContext.getRequestedSchema
new ParquetRecordMaterializer(
parquetRequestedSchema,
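For context on the interface whose type parameter changes here: parquet-mr drives a ReadSupport in two phases, and only the second one produces the RecordMaterializer affected by this diff. A minimal, hypothetical skeleton — class name and pass-through materializer are invented, not Spark code — showing that contract:

import java.util.{Map => JMap}
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.MessageType
import org.apache.spark.sql.catalyst.InternalRow

class SketchReadSupport(materializer: RecordMaterializer[InternalRow])
  extends ReadSupport[InternalRow] {

  // Phase 1: decide which Parquet columns to read (here: all of them).
  override def init(context: InitContext): ReadContext =
    new ReadContext(context.getFileSchema)

  // Phase 2: per task, hand parquet-mr the converter tree that assembles rows.
  override def prepareForRead(
      conf: Configuration,
      keyValueMetaData: JMap[String, String],
      fileSchema: MessageType,
      readContext: ReadContext): RecordMaterializer[InternalRow] = materializer
}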
ParquetRecordMaterializer.scala
@@ -22,7 +22,7 @@ import java.util.TimeZone
import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
import org.apache.parquet.schema.MessageType

- import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

/**
@@ -37,12 +37,12 @@ private[parquet] class ParquetRecordMaterializer(
catalystSchema: StructType,
schemaConverter: ParquetToSparkSchemaConverter,
convertTz: Option[TimeZone])
- extends RecordMaterializer[UnsafeRow] {
+ extends RecordMaterializer[InternalRow] {

private val rootConverter =
new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, convertTz, NoopUpdater)

- override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord
+ override def getCurrentRecord: InternalRow = rootConverter.currentRecord

override def getRootConverter: GroupConverter = rootConverter
}
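The materializer contract, for readers following along: parquet-mr pushes column values into the converter tree returned by getRootConverter and, after each record, pulls the assembled row via getCurrentRecord. A hypothetical single-INT32-column materializer (names and schema invented) showing that shape with the new InternalRow return type:

import org.apache.parquet.io.api.{Converter, GroupConverter, PrimitiveConverter, RecordMaterializer}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types.IntegerType

class SingleIntMaterializer extends RecordMaterializer[InternalRow] {
  private val row = new SpecificInternalRow(Seq(IntegerType))

  private val root: GroupConverter = new GroupConverter {
    private val intConverter = new PrimitiveConverter {
      override def addInt(value: Int): Unit = row.setInt(0, value)
    }
    override def getConverter(fieldIndex: Int): Converter = intConverter
    override def start(): Unit = row.setNullAt(0) // reset before each record
    override def end(): Unit = ()
  }

  // As in the patched ParquetRecordMaterializer, the reusable InternalRow is
  // handed back directly; no per-record UnsafeProjection is applied here.
  override def getCurrentRecord: InternalRow = row
  override def getRootConverter: GroupConverter = root
}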
ParquetRowConverter.scala
@@ -173,12 +173,10 @@ private[parquet] class ParquetRowConverter(

private val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))

- private val unsafeProjection = UnsafeProjection.create(catalystType)

/**
- * The [[UnsafeRow]] converted from an entire Parquet record.
+ * The [[InternalRow]] converted from an entire Parquet record.
*/
- def currentRecord: UnsafeRow = unsafeProjection(currentRow)
+ def currentRecord: InternalRow = currentRow

// Converters for each field.
private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
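Because currentRecord now hands back the converter's single, mutable SpecificInternalRow, any consumer that buffers rows across calls has to copy() them before the next record is materialized. A tiny standalone sketch (values and object name made up) of that reuse pattern:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types.IntegerType

object RowReuseSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the converter's reused currentRow.
    val reused = new SpecificInternalRow(Seq(IntegerType))

    val buffered = ArrayBuffer.empty[InternalRow]
    for (v <- Seq(1, 2, 3)) {
      reused.setInt(0, v)       // the converter writes the next record in place
      buffered += reused.copy() // without copy(), every buffered entry would read 3
    }
    println(buffered.map(_.getInt(0))) // ArrayBuffer(1, 2, 3)
  }
}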
ParquetPartitionReaderFactory.scala
@@ -31,7 +31,6 @@ import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
- import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
import org.apache.spark.sql.execution.datasources.{PartitionedFile, RecordReaderIterator}
@@ -176,7 +175,7 @@ case class ParquetPartitionReaderFactory(
reader
}

- private def createRowBaseReader(file: PartitionedFile): RecordReader[Void, UnsafeRow] = {
+ private def createRowBaseReader(file: PartitionedFile): RecordReader[Void, InternalRow] = {
buildReaderBase(file, createRowBaseParquetReader)
}

@@ -185,16 +184,16 @@
partitionValues: InternalRow,
hadoopAttemptContext: TaskAttemptContextImpl,
pushed: Option[FilterPredicate],
- convertTz: Option[TimeZone]): RecordReader[Void, UnsafeRow] = {
+ convertTz: Option[TimeZone]): RecordReader[Void, InternalRow] = {
logDebug(s"Falling back to parquet-mr")
val taskContext = Option(TaskContext.get())
- // ParquetRecordReader returns UnsafeRow
+ // ParquetRecordReader returns InternalRow
val readSupport = new ParquetReadSupport(convertTz, enableVectorizedReader = false)
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
- new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter)
+ new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
} else {
- new ParquetRecordReader[UnsafeRow](readSupport)
+ new ParquetRecordReader[InternalRow](readSupport)
}
val iter = new RecordReaderIterator(reader)
// SPARK-23457 Register a task completion listener before `initialization`.
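The DSv2 factory wires the parquet-mr fallback up the same way as the file-format path in the first hunk; RecordReaderIterator is what adapts the Hadoop-style pull API (nextKeyValue/getCurrentValue) into a Scala Iterator, and SPARK-23457 registers its close() as a task-completion listener. A self-contained, hypothetical stand-in reader (no real Parquet file involved; class name invented) showing what RecordReaderIterator[InternalRow] expects:

import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.RecordReaderIterator

class InMemoryRowReader(rows: Seq[InternalRow]) extends RecordReader[Void, InternalRow] {
  private var i = -1
  override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = ()
  override def nextKeyValue(): Boolean = { i += 1; i < rows.length }
  override def getCurrentKey: Void = null
  override def getCurrentValue: InternalRow = rows(i)
  override def getProgress: Float =
    if (rows.isEmpty) 1.0f else math.min(1.0f, (i + 1).toFloat / rows.length)
  override def close(): Unit = ()
}

// Usage mirroring createRowBaseReader:
//   val iter = new RecordReaderIterator[InternalRow](new InMemoryRowReader(rows))
//   taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))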