
Commit 89cdbb0

CodingCat authored and pwendell committed
SPARK-1677: allow user to disable output dir existence checking
https://issues.apache.org/jira/browse/SPARK-1677

For compatibility with older versions of Spark, it would be nice to have an option `spark.hadoop.validateOutputSpecs` (default: true) that lets the user disable the output directory existence check.

Author: CodingCat <zhunansjtu@gmail.com>

Closes #947 from CodingCat/SPARK-1677 and squashes the following commits:

7930f83 [CodingCat] miao
c0c0e03 [CodingCat] bug fix and doc update
5318562 [CodingCat] bug fix
13219b5 [CodingCat] allow user to disable output dir existence checking
1 parent 7c16029 commit 89cdbb0
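The commit message describes a flag read from the job's SparkConf. A minimal sketch of how a user might turn the check off; the app name, master, and output path here are illustrative, not from the commit:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: disable output-spec validation so that saving into an
// existing directory no longer fails. All names and paths are hypothetical.
val conf = new SparkConf()
  .setAppName("validate-output-specs-demo")
  .setMaster("local")
  .set("spark.hadoop.validateOutputSpecs", "false") // default: "true"

val sc = new SparkContext(conf)
val data = sc.parallelize(Seq((1, "a"), (2, "b")), 1)
data.saveAsTextFile("/tmp/demo-output") // first save creates the directory
data.saveAsTextFile("/tmp/demo-output") // overwrites instead of throwing
sc.stop()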

File tree

3 files changed: +34 -2 lines changed


core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 4 additions & 2 deletions
@@ -737,7 +737,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance

-    if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+      jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
       jobFormat.checkOutputSpecs(job)
     }
@@ -803,7 +804,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")

-    if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+      outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
       val ignoredFs = FileSystem.get(conf)
       conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
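With the flag left at its default, the old-API save path above still calls checkOutputSpecs, which rejects a pre-existing output directory. A hedged sketch of that default behavior; the app name and path are illustrative:

import org.apache.hadoop.mapred.FileAlreadyExistsException
import org.apache.spark.SparkContext

// Sketch of the default: validateOutputSpecs is "true", so the second
// save is rejected by checkOutputSpecs. The output path is hypothetical.
val sc = new SparkContext("local", "validation-default-demo")
val data = sc.parallelize(Seq((1, "a"), (2, "b")), 1)
data.saveAsTextFile("/tmp/existing-output")
try {
  data.saveAsTextFile("/tmp/existing-output") // directory now exists
} catch {
  case e: FileAlreadyExistsException =>
    println("second save rejected: " + e.getMessage)
}
sc.stop()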

core/src/test/scala/org/apache/spark/FileSuite.scala

Lines changed: 22 additions & 0 deletions
@@ -230,6 +230,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
     }
   }

+  test ("allow user to disable the output directory existence checking (old Hadoop API)") {
+    val sf = new SparkConf()
+    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+    sc = new SparkContext(sf)
+    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
+    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
+    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
+  }
+
   test ("prevent user from overwriting the empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
@@ -248,6 +259,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
     }
   }

+  test ("allow user to disable the output directory existence checking (new Hadoop API)") {
+    val sf = new SparkConf()
+    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+    sc = new SparkContext(sf)
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
+  }
+
   test ("save Hadoop Dataset through old Hadoop API") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
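The suite's neighboring "prevent user from overwriting" tests, visible in the context lines above, assert the default behavior. A hedged sketch of that shape in the same FunSuite style, assuming FileSuite's imports (including org.apache.hadoop.mapred.FileAlreadyExistsException); the test name and data are illustrative:

test ("overwriting an existing directory fails when validation is on (sketch)") {
  sc = new SparkContext("local", "test") // validateOutputSpecs defaults to true
  val randomRDD = sc.parallelize(Array((1, "a"), (2, "b")), 1)
  randomRDD.saveAsTextFile(tempDir.getPath + "/output")
  intercept[FileAlreadyExistsException] {
    randomRDD.saveAsTextFile(tempDir.getPath + "/output") // should be rejected
  }
}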

docs/configuration.md

Lines changed: 8 additions & 0 deletions
@@ -487,6 +487,14 @@ Apart from these, the following properties are also available, and may be useful
     this duration will be cleared as well.
   </td>
 </tr>
+<tr>
+  <td>spark.hadoop.validateOutputSpecs</td>
+  <td>true</td>
+  <td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
+    used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
+    output directories. We recommend that users do not disable this except when trying to achieve compatibility
+    with previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
+</tr>
 </table>

 #### Networking
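The last sentence of the new doc entry points users at Hadoop's FileSystem API instead of disabling validation. A minimal sketch of deleting an output directory by hand; the path is illustrative and an active SparkContext `sc` is assumed:

import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: remove a stale output directory before re-saving, rather than
// turning off spark.hadoop.validateOutputSpecs. The path is hypothetical.
val outputPath = new Path("/tmp/demo-output")
val fs = FileSystem.get(sc.hadoopConfiguration)
if (fs.exists(outputPath)) {
  fs.delete(outputPath, true) // recursive delete removes part files too
}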
