From dbff81eac4629ab95acdf6c7393607ecbeb1b9c2 Mon Sep 17 00:00:00 2001
From: yangz
Date: Sat, 19 May 2018 11:02:47 +0800
Subject: [PATCH 1/2] [SPARK-24238] HadoopFsRelation can't append to the same
 table from multiple jobs at the same time

---
 .../internal/io/FileCommitProtocol.scala      |  1 +
 .../io/HadoopMapReduceCommitProtocol.scala    | 21 ++++++++++++++++++-
 .../datasources/FileFormatWriter.scala        |  5 ++++-
 .../SQLHadoopMapReduceCommitProtocol.scala    |  3 ++-
 .../ManifestFileCommitProtocol.scala          |  2 ++
 5 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index e6e9c9e32885..af325aa9adc5 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -45,6 +45,7 @@ import org.apache.spark.util.Utils
 abstract class FileCommitProtocol {
   import FileCommitProtocol._
 
+  def getJobId(): String
   /**
    * Setups up a job. Must be called on the driver before any other methods can be invoked.
    */
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 3e60c50ada59..421f45064d47 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -90,6 +90,11 @@ class HadoopMapReduceCommitProtocol(
    */
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)
 
+  /**
+   * The per-job temp directory under the output path.
+   */
+  protected def tempDir = new Path(path, ".temp-" + jobId)
+
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     val format = context.getOutputFormatClass.newInstance()
     // If OutputFormat is Configurable, we should set conf to it.
@@ -166,9 +171,21 @@ class HadoopMapReduceCommitProtocol(
     committer.commitJob(jobContext)
 
     if (hasValidPath) {
+
+      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+      // Move all files from the temp dir to the output dir.
+      val files = fs.listFiles(tempDir, false)
+      while (files.hasNext) {
+        val file = files.next()
+        val name = file.getPath().getName()
+        val to = new Path(path, name)
+        fs.rename(file.getPath, to)
+      }
+
+      fs.delete(tempDir, true)
+
       val (allAbsPathFiles, allPartitionPaths) =
         taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
-      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
 
       val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
       logDebug(s"Committing files staged for absolute locations $filesToMove")
@@ -235,4 +252,6 @@ class HadoopMapReduceCommitProtocol(
       tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
     }
   }
+
+  override def getJobId(): String = jobId
 }
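Reviewer note: taken together, the HadoopMapReduceCommitProtocol changes make commitJob() publish the job's private ".temp-<jobId>" directory into the shared output directory. A minimal standalone sketch of that move, assuming the layout introduced above (the object and method names here are illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // Minimal sketch, assuming the ".temp-<jobId>" layout from this patch.
    object TempDirCommitSketch {
      def publishTempFiles(outputPath: String, jobId: String, conf: Configuration): Unit = {
        val output = new Path(outputPath)
        val tempDir = new Path(output, s".temp-$jobId")
        val fs = tempDir.getFileSystem(conf)
        if (fs.exists(tempDir)) {
          // Move each file from the job's private temp dir into the shared
          // output dir; concurrent jobs never touch each other's temp dirs.
          val files = fs.listFiles(tempDir, false) // non-recursive listing
          while (files.hasNext) {
            val file = files.next()
            fs.rename(file.getPath, new Path(output, file.getPath.getName))
          }
          fs.delete(tempDir, true) // drop the now-empty temp dir
        }
      }
    }

Note that the listing is non-recursive, mirroring the patch: only files at the top level of the temp dir are moved, which is worth double-checking if the committer ever leaves partition subdirectories there.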
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 401597f96721..77e3faeec4ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -117,7 +117,10 @@ object FileFormatWriter extends Logging {
     val job = Job.getInstance(hadoopConf)
     job.setOutputKeyClass(classOf[Void])
     job.setOutputValueClass(classOf[InternalRow])
-    FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
+    val jobOutputPath = new Path(outputSpec.outputPath, s".temp-${committer.getJobId()}")
+    val fs = jobOutputPath.getFileSystem(hadoopConf)
+    fs.mkdirs(jobOutputPath)
+    FileOutputFormat.setOutputPath(job, jobOutputPath)
 
     val partitionSet = AttributeSet(partitionColumns)
     val dataColumns = outputSpec.outputColumns.filterNot(partitionSet.contains)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
index 39c594a9bc61..fe4691eac34a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
@@ -55,7 +55,8 @@ class SQLHadoopMapReduceCommitProtocol(
         // The specified output committer is a FileOutputCommitter.
         // So, we will use the FileOutputCommitter-specified constructor.
         val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-        committer = ctor.newInstance(new Path(path), context)
+
+        committer = ctor.newInstance(new Path(path, tempDir), context)
       } else {
         // The specified output committer is just an OutputCommitter.
         // So, we will use the no-argument constructor.
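Reviewer note: the root cause is that FileOutputCommitter stages task output under a single "_temporary" directory beneath the job's output path, so two jobs appending to the same table share, and can clean up, each other's staging data. Redirecting each job to its own ".temp-<jobId>" subdirectory gives every job a disjoint subtree. A sketch of the derivation, under the assumption that getJobId() returns the unique ID FileFormatWriter hands the protocol (typically a random UUID); the helper name is hypothetical:

    import org.apache.hadoop.fs.Path

    // Hypothetical helper mirroring the FileFormatWriter change above.
    def jobOutputPath(tableDir: String, jobId: String): Path =
      new Path(tableDir, s".temp-$jobId")

    // Two concurrent appends now stage under disjoint subtrees:
    //   jobOutputPath("/warehouse/t", "job-A")  =>  /warehouse/t/.temp-job-A/_temporary/...
    //   jobOutputPath("/warehouse/t", "job-B")  =>  /warehouse/t/.temp-job-B/_temporary/...

One thing worth double-checking in review: in SQLHadoopMapReduceCommitProtocol, `new Path(path, tempDir)` resolves a child that already embeds `path`; this should collapse to `tempDir` under Hadoop's Path resolution, but passing `tempDir` directly would be clearer.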
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 92191c8b64b7..5683e0894973 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -117,4 +117,6 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
     // Do nothing
     // TODO: we can also try delete the addedFiles as a best-effort cleanup.
   }
+
+  override def getJobId(): String = jobId
 }

From 49532fe871278473dd18807efdc7fac68e0c0b26 Mon Sep 17 00:00:00 2001
From: yangz
Date: Sat, 19 May 2018 11:06:16 +0800
Subject: [PATCH 2/2] [SPARK-24238] HadoopFsRelation can't append to the same
 table from multiple jobs at the same time

Fix a unit test error and add an existence check for the temp dir.
---
 .../io/HadoopMapReduceCommitProtocol.scala | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 421f45064d47..665091536a80 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -174,15 +174,17 @@ class HadoopMapReduceCommitProtocol(
 
       val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
       // Move all files from the temp dir to the output dir.
-      val files = fs.listFiles(tempDir, false)
-      while (files.hasNext) {
-        val file = files.next()
-        val name = file.getPath().getName()
-        val to = new Path(path, name)
-        fs.rename(file.getPath, to)
-      }
+      if (fs.exists(tempDir)) {
+        val files = fs.listFiles(tempDir, false)
+        while (files.hasNext) {
+          val file = files.next()
+          val name = file.getPath().getName()
+          val to = new Path(path, name)
+          fs.rename(file.getPath, to)
+        }
 
-      fs.delete(tempDir, true)
+        fs.delete(tempDir, true)
+      }
 
       val (allAbsPathFiles, allPartitionPaths) =
         taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
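Reviewer note: the existence check added in this second commit presumably guards commitJob() against jobs where the temp dir was never created, e.g. write paths that don't go through FileFormatWriter's mkdirs, which matches the "fix unit test error" note. For completeness, the scenario the whole series targets looks roughly like the following sketch (illustrative only; `df1` and `df2` are assumed to be existing DataFrames and "/warehouse/t" a placeholder path):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration

    // Two jobs appending to the same table path concurrently. Before this
    // series, both jobs stage under the same "<path>/_temporary" directory,
    // and whichever commits first can delete the other's in-flight output.
    val appends = Seq(df1, df2).map { df =>
      Future(df.write.mode("append").parquet("/warehouse/t"))
    }
    Await.result(Future.sequence(appends), Duration.Inf)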