Skip to content

Commit f45ca9b

Browse files
committed
fix code
1 parent 0433977 commit f45ca9b

File tree

1 file changed

+9
-30
lines changed

1 file changed

+9
-30
lines changed

core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Lines changed: 9 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -383,18 +383,18 @@ object HadoopMapReduceCommitProtocol extends Logging {
383383
}
384384

385385
// Firstly, delete the staging output dir with recursive, because it is unique.
386-
deletePath(fs, stagingOutputDir, true)
386+
deleteSilently(fs, stagingOutputDir, true)
387387

388388
var currentLevelPath = stagingOutputDir.getParent
389389
while (currentLevelPath != insertStagingDir) {
390-
deletePath(fs, currentLevelPath, false)
390+
deleteSilently(fs, currentLevelPath, false)
391391
currentLevelPath = currentLevelPath.getParent
392392
}
393393

394-
deletePath(fs, insertStagingDir, false)
394+
deleteSilently(fs, insertStagingDir, false)
395395
}
396396

397-
private def deletePath(fs: FileSystem, path: Path, recursive: Boolean): Unit = {
397+
private def deleteSilently(fs: FileSystem, path: Path, recursive: Boolean): Unit = {
398398
try {
399399
if (!fs.delete(path, recursive)) {
400400
logWarning(s"Failed to delete path:$path with recursive:$recursive")
@@ -405,24 +405,6 @@ object HadoopMapReduceCommitProtocol extends Logging {
405405
}
406406
}
407407

408-
/**
409-
* Used to check whether there are some remaining files under staging output path.
410-
*/
411-
private def checkHasRemainingFiles(
412-
fs: FileSystem,
413-
path: Path): Boolean = {
414-
var statusList = Seq(fs.getFileStatus(path))
415-
var found = false
416-
while (!found && !statusList.isEmpty) {
417-
if (statusList.exists(_.isFile)) {
418-
found = true
419-
} else {
420-
statusList = statusList.flatMap(s => fs.listStatus(s.getPath))
421-
}
422-
}
423-
found
424-
}
425-
426408
/**
427409
* Merge files under staging output path to destination path. Before merging, we need delete the
428410
* succeeded file under staging output path and regenerate it after merging completed.
@@ -431,21 +413,18 @@ object HadoopMapReduceCommitProtocol extends Logging {
431413
fs: FileSystem,
432414
stagingOutputPath: Path,
433415
destPath: Path): Unit = {
434-
val SUCCEEDED_FILE_NAME = FileOutputCommitter.SUCCEEDED_FILE_NAME
435-
val stagingMarkerPath = new Path(stagingOutputPath, SUCCEEDED_FILE_NAME)
416+
val stagingMarkerPath = new Path(stagingOutputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
436417
fs.delete(stagingMarkerPath, true)
437418

438-
do {
439-
doMergePaths(fs, fs.getFileStatus(stagingOutputPath), destPath)
440-
} while (checkHasRemainingFiles(fs, stagingOutputPath))
419+
doMergePaths(fs, fs.getFileStatus(stagingOutputPath), destPath)
441420

442-
val markerPath = new Path(destPath, SUCCEEDED_FILE_NAME)
443-
fs.create(markerPath).close()
421+
val markerPath = new Path(destPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
422+
fs.create(markerPath, true).close()
444423
}
445424

446425
/**
447426
* This is a reflected implementation of [[FileOutputCommitter]]'s mergePaths.
448-
* Just remove some unnecessary operation to improve performance.
427+
* Just remove some unnecessary operations to improve performance.
449428
*/
450429
@throws[IOException]
451430
private def doMergePaths(fs: FileSystem, from: FileStatus, to: Path): Unit = {

0 commit comments

Comments
 (0)