Skip to content

Commit 05887fc

Browse files
cloud-fan authored and gatorsmile committed
[SPARK-19916][SQL] simplify bad file handling
## What changes were proposed in this pull request?

We should have only one central place that catches the exceptions thrown for corrupted files.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17253 from cloud-fan/bad-file.
1 parent 9456688 commit 05887fc

File tree

3 files changed

+43
-63
lines changed

3 files changed

+43
-63
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -90,16 +90,14 @@ trait FileFormat {
9090
* @param options A set of string -> string configuration options.
9191
* @return
9292
*/
93-
def buildReader(
93+
protected def buildReader(
9494
sparkSession: SparkSession,
9595
dataSchema: StructType,
9696
partitionSchema: StructType,
9797
requiredSchema: StructType,
9898
filters: Seq[Filter],
9999
options: Map[String, String],
100100
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
101-
// TODO: Remove this default implementation when the other formats have been ported
102-
// Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats.
103101
throw new UnsupportedOperationException(s"buildReader is not supported for $this")
104102
}
105103

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala

Lines changed: 42 additions & 46 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.execution.datasources
1919

20-
import java.io.IOException
20+
import java.io.{FileNotFoundException, IOException}
2121

2222
import scala.collection.mutable
2323

@@ -44,7 +44,7 @@ case class PartitionedFile(
4444
filePath: String,
4545
start: Long,
4646
length: Long,
47-
locations: Array[String] = Array.empty) {
47+
@transient locations: Array[String] = Array.empty) {
4848
override def toString: String = {
4949
s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
5050
}
@@ -121,6 +121,20 @@ class FileScanRDD(
121121
nextElement
122122
}
123123

124+
private def readCurrentFile(): Iterator[InternalRow] = {
125+
try {
126+
readFunction(currentFile)
127+
} catch {
128+
case e: FileNotFoundException =>
129+
throw new FileNotFoundException(
130+
e.getMessage + "\n" +
131+
"It is possible the underlying files have been updated. " +
132+
"You can explicitly invalidate the cache in Spark by " +
133+
"running 'REFRESH TABLE tableName' command in SQL or " +
134+
"by recreating the Dataset/DataFrame involved.")
135+
}
136+
}
137+
124138
/** Advances to the next file. Returns true if a new non-empty iterator is available. */
125139
private def nextIterator(): Boolean = {
126140
updateBytesReadWithFileSize()
@@ -130,54 +144,36 @@ class FileScanRDD(
130144
// Sets InputFileBlockHolder for the file block's information
131145
InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
132146

133-
try {
134-
if (ignoreCorruptFiles) {
135-
currentIterator = new NextIterator[Object] {
136-
private val internalIter = {
137-
try {
138-
// The readFunction may read files before consuming the iterator.
139-
// E.g., vectorized Parquet reader.
140-
readFunction(currentFile)
141-
} catch {
142-
case e @(_: RuntimeException | _: IOException) =>
143-
logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
144-
Iterator.empty
145-
}
146-
}
147-
148-
override def getNext(): AnyRef = {
149-
try {
150-
if (internalIter.hasNext) {
151-
internalIter.next()
152-
} else {
153-
finished = true
154-
null
155-
}
156-
} catch {
157-
case e: IOException =>
158-
logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
159-
finished = true
160-
null
147+
if (ignoreCorruptFiles) {
148+
currentIterator = new NextIterator[Object] {
149+
// The readFunction may read some bytes before consuming the iterator, e.g.,
150+
// vectorized Parquet reader. Here we use lazy val to delay the creation of
151+
// iterator so that we will throw exception in `getNext`.
152+
private lazy val internalIter = readCurrentFile()
153+
154+
override def getNext(): AnyRef = {
155+
try {
156+
if (internalIter.hasNext) {
157+
internalIter.next()
158+
} else {
159+
finished = true
160+
null
161161
}
162+
} catch {
163+
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
164+
case e: FileNotFoundException => throw e
165+
case e @ (_: RuntimeException | _: IOException) =>
166+
logWarning(
167+
s"Skipped the rest of the content in the corrupted file: $currentFile", e)
168+
finished = true
169+
null
162170
}
163-
164-
override def close(): Unit = {}
165171
}
166-
} else {
167-
currentIterator = readFunction(currentFile)
172+
173+
override def close(): Unit = {}
168174
}
169-
} catch {
170-
case e: IOException if ignoreCorruptFiles =>
171-
logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
172-
currentIterator = Iterator.empty
173-
case e: java.io.FileNotFoundException =>
174-
throw new java.io.FileNotFoundException(
175-
e.getMessage + "\n" +
176-
"It is possible the underlying files have been updated. " +
177-
"You can explicitly invalidate the cache in Spark by " +
178-
"running 'REFRESH TABLE tableName' command in SQL or " +
179-
"by recreating the Dataset/DataFrame involved."
180-
)
175+
} else {
176+
currentIterator = readCurrentFile()
181177
}
182178

183179
hasNext

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 0 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -283,20 +283,6 @@ class ParquetFileFormat
283283
filters: Seq[Filter],
284284
options: Map[String, String],
285285
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
286-
// For Parquet data source, `buildReader` already handles partition values appending. Here we
287-
// simply delegate to `buildReader`.
288-
buildReader(
289-
sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
290-
}
291-
292-
override def buildReader(
293-
sparkSession: SparkSession,
294-
dataSchema: StructType,
295-
partitionSchema: StructType,
296-
requiredSchema: StructType,
297-
filters: Seq[Filter],
298-
options: Map[String, String],
299-
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
300286
hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
301287
hadoopConf.set(
302288
ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,

0 commit comments

Comments
 (0)