diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index e6e9c9e32885..af325aa9adc5 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -45,6 +45,11 @@ import org.apache.spark.util.Utils
 abstract class FileCommitProtocol {
   import FileCommitProtocol._

+  /**
+   * Returns the id of the job this committer was created for.
+   */
+  def getJobId(): String
+
   /**
    * Setups up a job. Must be called on the driver before any other methods can be invoked.
    */
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 3e60c50ada59..665091536a80 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -90,6 +90,12 @@ class HadoopMapReduceCommitProtocol(
    */
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)

+  /**
+   * The job-scoped temporary directory under the final output path. Task output is
+   * written here first and moved into `path` when the job commits.
+   */
+  protected def tempDir = new Path(path, ".temp-" + jobId)
+
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     val format = context.getOutputFormatClass.newInstance()
     // If OutputFormat is Configurable, we should set conf to it.
@@ -166,9 +172,23 @@ class HadoopMapReduceCommitProtocol(
     committer.commitJob(jobContext)

     if (hasValidPath) {
+
+      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+      // Move every file written under the temp directory into the final output path.
+      if (fs.exists(tempDir)) {
+        val files = fs.listFiles(tempDir, false)
+        while (files.hasNext) {
+          val file = files.next()
+          val name = file.getPath.getName
+          val to = new Path(path, name)
+          fs.rename(file.getPath, to)
+        }
+
+        fs.delete(tempDir, true)
+      }
+
       val (allAbsPathFiles, allPartitionPaths) =
         taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
-      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)

       val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
       logDebug(s"Committing files staged for absolute locations $filesToMove")
@@ -235,4 +255,6 @@ class HadoopMapReduceCommitProtocol(
       tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
     }
   }
+
+  override def getJobId(): String = jobId
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 401597f96721..77e3faeec4ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -117,7 +117,12 @@ object FileFormatWriter extends Logging {
     val job = Job.getInstance(hadoopConf)
     job.setOutputKeyClass(classOf[Void])
     job.setOutputValueClass(classOf[InternalRow])
-    FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
+    // Point the Hadoop job at a job-scoped temp directory beneath the final output path;
+    // HadoopMapReduceCommitProtocol.commitJob() moves the files into place afterwards.
+    val jobOutputPath = new Path(outputSpec.outputPath, s".temp-${committer.getJobId()}")
+    val fs = jobOutputPath.getFileSystem(hadoopConf)
+    fs.mkdirs(jobOutputPath)
+    FileOutputFormat.setOutputPath(job, jobOutputPath)

     val partitionSet = AttributeSet(partitionColumns)
     val dataColumns = outputSpec.outputColumns.filterNot(partitionSet.contains)
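Taken together, the FileFormatWriter and commitJob() changes mean every output file lands in <outputPath>/.temp-<jobId> first and is only promoted into <outputPath> after the underlying Hadoop committer has committed the job. Below is a minimal standalone sketch of that promotion pass, using the same Hadoop FileSystem calls as the patch; the object name and paths are hypothetical stand-ins for the patch's path and tempDir.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    object TempDirPromotionSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical stand-ins for `path` and `tempDir` in the patch.
        val outputPath = new Path("/tmp/promotion-sketch/output")
        val tempDir = new Path(outputPath, ".temp-job-0001")

        val fs = outputPath.getFileSystem(new Configuration())

        // Same promotion pass as commitJob(): rename every file under the
        // temp directory into the final output path, then drop the temp dir.
        if (fs.exists(tempDir)) {
          val files = fs.listFiles(tempDir, false) // non-recursive listing
          while (files.hasNext) {
            val file = files.next()
            fs.rename(file.getPath, new Path(outputPath, file.getPath.getName))
          }
          fs.delete(tempDir, true)
        }
      }
    }

Note that FileSystem.rename returns false on failure rather than throwing, so a production version of this loop should check the return value instead of discarding it, as the patch currently does.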
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
index 39c594a9bc61..fe4691eac34a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
@@ -55,7 +55,7 @@ class SQLHadoopMapReduceCommitProtocol(
         // The specified output committer is a FileOutputCommitter.
         // So, we will use the FileOutputCommitter-specified constructor.
         val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-        committer = ctor.newInstance(new Path(path), context)
+        committer = ctor.newInstance(tempDir, context)
       } else {
         // The specified output committer is just an OutputCommitter.
         // So, we will use the no-argument constructor.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 92191c8b64b7..5683e0894973 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -117,4 +117,6 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
     // Do nothing
     // TODO: we can also try delete the addedFiles as a best-effort cleanup.
   }
+
+  override def getJobId(): String = jobId
 }
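On the write side, the job output path handed to FileOutputFormat is now derived from the committer's job id. A sketch of that derivation under stated assumptions: the output path and job id below are hypothetical, where the real code takes them from outputSpec.outputPath and committer.getJobId().

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

    object JobOutputPathSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical values; FileFormatWriter gets these from outputSpec and the committer.
        val finalOutputPath = "/tmp/warehouse/my_table"
        val jobId = "20240101000000-0001"

        // A hidden, job-scoped directory beneath the final destination. The leading dot
        // keeps Spark's file listing from treating in-flight files as table data.
        val jobOutputPath = new Path(finalOutputPath, s".temp-$jobId")

        val conf = new Configuration()
        jobOutputPath.getFileSystem(conf).mkdirs(jobOutputPath)

        val job = Job.getInstance(conf)
        FileOutputFormat.setOutputPath(job, jobOutputPath)
      }
    }

Because each job gets its own .temp-<jobId> directory, two concurrent writes to the same table path no longer share a single _temporary directory under the output path; each job's files are promoted independently by its own commitJob().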