@@ -90,16 +90,14 @@ trait FileFormat {
    * @param options A set of string -> string configuration options.
    * @return
    */
-  def buildReader(
+  protected def buildReader(
       sparkSession: SparkSession,
       dataSchema: StructType,
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    // TODO: Remove this default implementation when the other formats have been ported
-    // Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats.
     throw new UnsupportedOperationException(s"buildReader is not supported for $this")

Member: No more TODO here?

Contributor Author: Actually, we don't need to implement this method in every subclass. Some FileFormats may override buildReaderWithPartitionValues directly (e.g. Parquet), and some may not be used in the read path at all (e.g. HiveFileFormat).

Comment: @cloud-fan I am wondering why the buildReader method is now marked as protected? Maybe you can comment here: https://issues.apache.org/jira/browse/SPARK-27751

}

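
The discussion above can be summarized with a small, self-contained sketch. The trait, method signatures, and format classes below are illustrative stand-ins rather than Spark's real FileFormat API; the point is only that a protected buildReader with a failing default is harmless when a subclass either overrides the public buildReaderWithPartitionValues entry point directly or never takes part in the read path.

```scala
// Simplified sketch of the override pattern; names and signatures are illustrative,
// not Spark's actual API.
trait SimpleFileFormat {
  // Protected hook with a failing default: only formats that are read through the
  // generic path need to provide it.
  protected def buildReader(path: String): Iterator[String] =
    throw new UnsupportedOperationException(s"buildReader is not supported for $this")

  // Public entry point used by the planner; by default it funnels into the hook.
  def buildReaderWithPartitionValues(path: String): Iterator[String] =
    buildReader(path)
}

// A format that overrides the public entry point directly (as the Parquet format is
// said to do in the comment above), so it never implements buildReader.
class ColumnarFormat extends SimpleFileFormat {
  override def buildReaderWithPartitionValues(path: String): Iterator[String] =
    Iterator(s"rows from $path with partition values already attached")
}

// A write-only format, in the spirit of HiveFileFormat, implements neither method;
// the failing default is never reached because nothing reads through it.
class WriteOnlyFormat extends SimpleFileFormat
```

Under these assumptions, making buildReader protected only narrows who may call the hook; callers keep going through the public entry point.
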
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.execution.datasources

-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}

 import scala.collection.mutable

@@ -44,7 +44,7 @@ case class PartitionedFile(
     filePath: String,
     start: Long,
     length: Long,
-    locations: Array[String] = Array.empty) {
+    @transient locations: Array[String] = Array.empty) {

Member: Do we need to mark it as transient? filePartitions: Seq[FilePartition] is already transient in FileScanRDD.

Contributor Author: This is not for FileScanRDD.filePartitions; it is for the FilePartitions that are sent by the scheduler. The locations are only useful during planning, so we should not send them to executors.

   override def toString: String = {
     s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
   }
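
As a companion to the exchange above, here is a small, self-contained sketch of what @transient changes; FileSlice and TransientDemo are made-up names, not Spark classes. The annotated field is skipped by Java serialization, so an object shipped to another JVM arrives with that field reset to null instead of carrying planning-only data such as preferred locations.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Illustrative stand-in for the idea, not Spark's PartitionedFile.
case class FileSlice(
    path: String,
    start: Long,
    length: Long,
    // Preferred locations only matter for locality-aware scheduling on the driver,
    // so @transient keeps them out of the bytes shipped with a task.
    @transient locations: Array[String] = Array.empty)

object TransientDemo {
  def main(args: Array[String]): Unit = {
    val slice = FileSlice("/data/part-00000", 0L, 1024L, Array("host1", "host2"))

    // Round-trip through Java serialization, as happens when work is sent to executors.
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(slice)
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val revived = in.readObject().asInstanceOf[FileSlice]

    println(revived.path)              // /data/part-00000
    println(revived.locations == null) // true: the transient field was not serialized
  }
}
```
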
@@ -121,6 +121,20 @@ class FileScanRDD(
         nextElement
       }

+      private def readCurrentFile(): Iterator[InternalRow] = {
+        try {
+          readFunction(currentFile)
+        } catch {
+          case e: FileNotFoundException =>
+            throw new FileNotFoundException(
+              e.getMessage + "\n" +
+                "It is possible the underlying files have been updated. " +
+                "You can explicitly invalidate the cache in Spark by " +
+                "running 'REFRESH TABLE tableName' command in SQL or " +
+                "by recreating the Dataset/DataFrame involved.")
+        }
+      }
+
       /** Advances to the next file. Returns true if a new non-empty iterator is available. */
       private def nextIterator(): Boolean = {
         updateBytesReadWithFileSize()
@@ -130,54 +144,36 @@ class FileScanRDD(
         // Sets InputFileBlockHolder for the file block's information
         InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)

-        try {
-          if (ignoreCorruptFiles) {
-            currentIterator = new NextIterator[Object] {
-              private val internalIter = {
-                try {
-                  // The readFunction may read files before consuming the iterator.
-                  // E.g., vectorized Parquet reader.
-                  readFunction(currentFile)
-                } catch {
-                  case e @(_: RuntimeException | _: IOException) =>
-                    logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
-                    Iterator.empty
-                }
-              }
-
-              override def getNext(): AnyRef = {
-                try {
-                  if (internalIter.hasNext) {
-                    internalIter.next()
-                  } else {
-                    finished = true
-                    null
-                  }
-                } catch {
-                  case e: IOException =>
-                    logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
-                    finished = true
-                    null
-                }
-              }
-
-              override def close(): Unit = {}
-            }
-          } else {
-            currentIterator = readFunction(currentFile)
-          }
-        } catch {
-          case e: IOException if ignoreCorruptFiles =>
-            logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
-            currentIterator = Iterator.empty
-          case e: java.io.FileNotFoundException =>
-            throw new java.io.FileNotFoundException(
-              e.getMessage + "\n" +
-                "It is possible the underlying files have been updated. " +
-                "You can explicitly invalidate the cache in Spark by " +
-                "running 'REFRESH TABLE tableName' command in SQL or " +
-                "by recreating the Dataset/DataFrame involved."
-            )
-        }
+        if (ignoreCorruptFiles) {
+          currentIterator = new NextIterator[Object] {
+            // The readFunction may read some bytes before consuming the iterator, e.g.,
+            // vectorized Parquet reader. Here we use lazy val to delay the creation of
+            // iterator so that we will throw exception in `getNext`.
+            private lazy val internalIter = readCurrentFile()
+
+            override def getNext(): AnyRef = {
+              try {
+                if (internalIter.hasNext) {
+                  internalIter.next()
+                } else {
+                  finished = true
+                  null
+                }
+              } catch {
+                // Throw FileNotFoundException even `ignoreCorruptFiles` is true
+                case e: FileNotFoundException => throw e
+                case e @ (_: RuntimeException | _: IOException) =>
+                  logWarning(
+                    s"Skipped the rest of the content in the corrupted file: $currentFile", e)
+                  finished = true
+                  null
+              }
+            }
+
+            override def close(): Unit = {}
+          }
+        } else {
+          currentIterator = readCurrentFile()
+        }

         hasNext
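
The lazy val comment in the added code is the heart of this change, so here is a stripped-down, self-contained sketch of the same idea; openFile, readIgnoringCorruptFiles, and the path-based failure triggers are invented for illustration and are not Spark's NextIterator or read function. Deferring creation of the underlying iterator means open-time failures surface during iteration, where corrupt files can be skipped while a missing file is still rethrown; the real code additionally guards each next() call in getNext.

```scala
import java.io.{FileNotFoundException, IOException}

object LazyReaderSketch {

  // Stand-in for the per-file read function; a real reader (e.g. a vectorized
  // columnar reader) may touch the file here, before anyone consumes the iterator.
  def openFile(path: String): Iterator[String] = {
    if (path.endsWith(".missing")) throw new FileNotFoundException(path)
    if (path.endsWith(".corrupt")) throw new IOException(s"corrupt header in $path")
    Iterator("row1", "row2")
  }

  // Mirrors the ignoreCorruptFiles branch: the lazy val defers opening the file
  // until the first hasNext/next call, so open-time failures land inside the
  // iterator where they can be handled, instead of at construction time.
  def readIgnoringCorruptFiles(path: String): Iterator[String] = new Iterator[String] {
    private lazy val underlying: Iterator[String] =
      try openFile(path) catch {
        case e: FileNotFoundException => throw e                     // never swallow missing files
        case _: RuntimeException | _: IOException => Iterator.empty  // skip corrupt files
      }

    override def hasNext: Boolean = underlying.hasNext
    override def next(): String = underlying.next()
  }

  def main(args: Array[String]): Unit = {
    println(readIgnoringCorruptFiles("/tmp/a.corrupt").toList) // List(): corrupt file skipped
    println(readIgnoringCorruptFiles("/tmp/a.data").toList)    // List(row1, row2)
    // readIgnoringCorruptFiles("/tmp/a.missing").toList would throw FileNotFoundException
  }
}
```
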
@@ -283,20 +283,6 @@ class ParquetFileFormat
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
-    // For Parquet data source, `buildReader` already handles partition values appending. Here we
-    // simply delegate to `buildReader`.
-    buildReader(
-      sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
-  }
-
-  override def buildReader(
-      sparkSession: SparkSession,
-      dataSchema: StructType,
-      partitionSchema: StructType,
-      requiredSchema: StructType,
-      filters: Seq[Filter],
-      options: Map[String, String],
-      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
     hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
     hadoopConf.set(
       ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,