
Commit 47776e7

zsxwing authored and Mridul Muralidharan committed
[SPARK-17850][CORE] Add a flag to ignore corrupt files
## What changes were proposed in this pull request?

Add a flag to ignore corrupt files. For Spark core, the configuration is `spark.files.ignoreCorruptFiles`. For Spark SQL, it's `spark.sql.files.ignoreCorruptFiles`.

## How was this patch tested?

The added unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15422 from zsxwing/SPARK-17850.
1 parent eb69335 commit 47776e7
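
A minimal sketch of enabling both flags when building a session. The two configuration keys are the ones added by this commit; the app name, session setup, and input paths are illustrative only.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// The two keys come from this patch; everything else here is illustrative.
val conf = new SparkConf()
  .setAppName("ignore-corrupt-files-demo")             // hypothetical app name
  .set("spark.files.ignoreCorruptFiles", "true")       // Spark core (HadoopRDD / NewHadoopRDD)
  .set("spark.sql.files.ignoreCorruptFiles", "true")   // Spark SQL (FileScanRDD)

val spark = SparkSession.builder().config(conf).getOrCreate()

// With the flags on, a corrupt .gz (or other unreadable file) no longer fails the job;
// whatever was read before the corruption is still returned.
val rddLines = spark.sparkContext.textFile("/data/logs/*.gz") // hypothetical path
val dfLines  = spark.read.text("/data/logs/*.gz")             // hypothetical path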

7 files changed (+153 lines, -7 lines)

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 5 additions & 0 deletions

@@ -156,4 +156,9 @@ package object config {
     .doc("Port to use for the block managed on the driver.")
     .fallbackConf(BLOCK_MANAGER_PORT)

+  private[spark] val IGNORE_CORRUPT_FILES = ConfigBuilder("spark.files.ignoreCorruptFiles")
+    .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
+      "encountering corrupt files and contents that have been read will still be returned.")
+    .booleanConf
+    .createWithDefault(false)
 }

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 5 additions & 3 deletions

@@ -17,7 +17,7 @@

 package org.apache.spark.rdd

-import java.io.EOFException
+import java.io.IOException
 import java.text.SimpleDateFormat
 import java.util.Date

@@ -43,6 +43,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
 import org.apache.spark.storage.StorageLevel
@@ -139,6 +140,8 @@ class HadoopRDD[K, V](

   private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)

+  private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
+
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
@@ -253,8 +256,7 @@ class HadoopRDD[K, V](
       try {
         finished = !reader.next(key, value)
       } catch {
-        case eof: EOFException =>
-          finished = true
+        case e: IOException if ignoreCorruptFiles => finished = true
       }
       if (!finished) {
         inputMetrics.incRecordsRead(1)

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 9 additions & 1 deletion

@@ -17,6 +17,7 @@

 package org.apache.spark.rdd

+import java.io.IOException
 import java.text.SimpleDateFormat
 import java.util.Date

@@ -33,6 +34,7 @@ import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
@@ -85,6 +87,8 @@ class NewHadoopRDD[K, V](

   private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)

+  private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
+
   def getConf: Configuration = {
     val conf: Configuration = confBroadcast.value.value
     if (shouldCloneJobConf) {
@@ -179,7 +183,11 @@

       override def hasNext: Boolean = {
         if (!finished && !havePair) {
-          finished = !reader.nextKeyValue
+          try {
+            finished = !reader.nextKeyValue
+          } catch {
+            case e: IOException if ignoreCorruptFiles => finished = true
+          }
           if (finished) {
             // Close and release the reader here; close() will also be called when the task
             // completes, but for tasks that read from many files, it helps to release the

core/src/test/scala/org/apache/spark/FileSuite.scala

Lines changed: 61 additions & 1 deletion

@@ -17,7 +17,8 @@

 package org.apache.spark

-import java.io.{File, FileWriter}
+import java.io._
+import java.util.zip.GZIPOutputStream

 import scala.io.Source

@@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

 import org.apache.spark.input.PortableDataStream
+import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
 import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
@@ -541,4 +543,62 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     }.collect()
     assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
   }
+
+  test("spark.files.ignoreCorruptFiles should work both HadoopRDD and NewHadoopRDD") {
+    val inputFile = File.createTempFile("input-", ".gz")
+    try {
+      // Create a corrupt gzip file
+      val byteOutput = new ByteArrayOutputStream()
+      val gzip = new GZIPOutputStream(byteOutput)
+      try {
+        gzip.write(Array[Byte](1, 2, 3, 4))
+      } finally {
+        gzip.close()
+      }
+      val bytes = byteOutput.toByteArray
+      val o = new FileOutputStream(inputFile)
+      try {
+        // It's corrupt since we only write half of bytes into the file.
+        o.write(bytes.take(bytes.length / 2))
+      } finally {
+        o.close()
+      }
+
+      // Reading a corrupt gzip file should throw EOFException
+      sc = new SparkContext("local", "test")
+      // Test HadoopRDD
+      var e = intercept[SparkException] {
+        sc.textFile(inputFile.toURI.toString).collect()
+      }
+      assert(e.getCause.isInstanceOf[EOFException])
+      assert(e.getCause.getMessage === "Unexpected end of input stream")
+      // Test NewHadoopRDD
+      e = intercept[SparkException] {
+        sc.newAPIHadoopFile(
+          inputFile.toURI.toString,
+          classOf[NewTextInputFormat],
+          classOf[LongWritable],
+          classOf[Text]).collect()
+      }
+      assert(e.getCause.isInstanceOf[EOFException])
+      assert(e.getCause.getMessage === "Unexpected end of input stream")
+      sc.stop()
+
+      val conf = new SparkConf().set(IGNORE_CORRUPT_FILES, true)
+      sc = new SparkContext("local", "test", conf)
+      // Test HadoopRDD
+      assert(sc.textFile(inputFile.toURI.toString).collect().isEmpty)
+      // Test NewHadoopRDD
+      assert {
+        sc.newAPIHadoopFile(
+          inputFile.toURI.toString,
+          classOf[NewTextInputFormat],
+          classOf[LongWritable],
+          classOf[Text]).collect().isEmpty
+      }
+    } finally {
+      inputFile.delete()
+    }
+  }
+
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala

Lines changed: 29 additions & 1 deletion

@@ -17,6 +17,8 @@

 package org.apache.spark.sql.execution.datasources

+import java.io.IOException
+
 import scala.collection.mutable

 import org.apache.spark.{Partition => RDDPartition, TaskContext}
@@ -25,6 +27,7 @@ import org.apache.spark.rdd.{InputFileNameHolder, RDD}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.vectorized.ColumnarBatch
+import org.apache.spark.util.NextIterator

 /**
  * A part (i.e. "block") of a single file that should be read, along with partition column values
@@ -62,6 +65,8 @@ class FileScanRDD(
     @transient val filePartitions: Seq[FilePartition])
   extends RDD[InternalRow](sparkSession.sparkContext, Nil) {

+  private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+
   override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
     val iterator = new Iterator[Object] with AutoCloseable {
       private val inputMetrics = context.taskMetrics().inputMetrics
@@ -119,7 +124,30 @@
          InputFileNameHolder.setInputFileName(currentFile.filePath)

          try {
-           currentIterator = readFunction(currentFile)
+           if (ignoreCorruptFiles) {
+             currentIterator = new NextIterator[Object] {
+               private val internalIter = readFunction(currentFile)
+
+               override def getNext(): AnyRef = {
+                 try {
+                   if (internalIter.hasNext) {
+                     internalIter.next()
+                   } else {
+                     finished = true
+                     null
+                   }
+                 } catch {
+                   case e: IOException =>
+                     finished = true
+                     null
+                 }
+               }
+
+               override def close(): Unit = {}
+             }
+           } else {
+             currentIterator = readFunction(currentFile)
+           }
          } catch {
            case e: java.io.FileNotFoundException =>
              throw new java.io.FileNotFoundException(
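
The wrapper above relies on Spark's internal NextIterator utility. As a rough standalone sketch of the same idea — an iterator that treats an IOException from the underlying reader as end-of-input rather than propagating it — using only the Scala standard library (the class name and details are illustrative, not Spark's implementation):

import java.io.IOException

// Simplified analogue of the FileScanRDD wrapping: swallow an IOException
// mid-file and report "no more records" instead of failing the task.
class SkipCorruptIterator[T >: Null](underlying: Iterator[T]) extends Iterator[T] {
  private var finished = false
  private var nextValue: T = null
  private var valueReady = false

  private def advance(): Unit = {
    if (!finished && !valueReady) {
      try {
        if (underlying.hasNext) {
          nextValue = underlying.next()
          valueReady = true
        } else {
          finished = true
        }
      } catch {
        // Corruption encountered mid-file: stop here. Records already
        // returned to the caller are kept.
        case _: IOException => finished = true
      }
    }
  }

  override def hasNext: Boolean = { advance(); valueReady }

  override def next(): T = {
    advance()
    if (!valueReady) throw new NoSuchElementException("End of stream")
    valueReady = false
    nextValue
  }
}

Wrapping a file's record iterator in something like new SkipCorruptIterator(records) yields whatever was decoded before the stream broke, which is the behavior the flag promises.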

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 8 additions & 0 deletions

@@ -576,6 +576,12 @@
     .doubleConf
     .createWithDefault(0.05)

+  val IGNORE_CORRUPT_FILES = SQLConfigBuilder("spark.sql.files.ignoreCorruptFiles")
+    .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
+      "encountering corrupt files and contents that have been read will still be returned.")
+    .booleanConf
+    .createWithDefault(false)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -743,6 +749,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

   def warehousePath: String = new Path(getConf(WAREHOUSE_PATH)).toString

+  def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
+
   override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)

   override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
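
Since the SQL flag is a session-level configuration, it can also be toggled at runtime on an existing SparkSession; a brief sketch (the `spark` session and input path are assumptions):

// Toggle the new SQL flag without restarting the session.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
val df = spark.read.text("/data/partially-corrupt/*.gz")        // corrupt files are skipped; readable contents are returned
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "false")   // restore the default fail-fast behavior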

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala

Lines changed: 36 additions & 1 deletion

@@ -17,8 +17,9 @@

 package org.apache.spark.sql.execution.datasources

-import java.io.File
+import java.io._
 import java.util.concurrent.atomic.AtomicInteger
+import java.util.zip.GZIPOutputStream

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, RawLocalFileSystem}
@@ -441,6 +442,40 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
     }
   }

+  test("spark.files.ignoreCorruptFiles should work in SQL") {
+    val inputFile = File.createTempFile("input-", ".gz")
+    try {
+      // Create a corrupt gzip file
+      val byteOutput = new ByteArrayOutputStream()
+      val gzip = new GZIPOutputStream(byteOutput)
+      try {
+        gzip.write(Array[Byte](1, 2, 3, 4))
+      } finally {
+        gzip.close()
+      }
+      val bytes = byteOutput.toByteArray
+      val o = new FileOutputStream(inputFile)
+      try {
+        // It's corrupt since we only write half of bytes into the file.
+        o.write(bytes.take(bytes.length / 2))
+      } finally {
+        o.close()
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          spark.read.text(inputFile.toURI.toString).collect()
+        }
+        assert(e.getCause.isInstanceOf[EOFException])
+        assert(e.getCause.getMessage === "Unexpected end of input stream")
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        assert(spark.read.text(inputFile.toURI.toString).collect().isEmpty)
+      }
+    } finally {
+      inputFile.delete()
+    }
+  }
+
   // Helpers for checking the arguments passed to the FileFormat.

   protected val checkPartitionSchema =
