FileCommitProtocol.scala
@@ -45,6 +45,7 @@ import org.apache.spark.util.Utils
 abstract class FileCommitProtocol {
   import FileCommitProtocol._
 
+  def getJobId(): String
   /**
    * Sets up a job. Must be called on the driver before any other methods can be invoked.
    */
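The new getJobId() accessor lets callers derive the per-job temp directory from the committer instead of threading the job ID around separately. A minimal sketch of the intended call pattern (the output path and the use of a random UUID as job ID are assumptions, not part of this PR):

import org.apache.hadoop.fs.Path
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Sketch only: "/data/out" and the UUID job ID are illustrative assumptions.
val jobId = java.util.UUID.randomUUID().toString
val committer = new HadoopMapReduceCommitProtocol(jobId, "/data/out")

// Mirrors the ".temp-" + jobId convention this PR introduces below.
val tempPath = new Path("/data/out", s".temp-${committer.getJobId()}")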
HadoopMapReduceCommitProtocol.scala
@@ -90,6 +90,11 @@ class HadoopMapReduceCommitProtocol(
    */
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)
 
+  /**
+   * The temp directory that the job's task output is written to before commit.
+   */
+  protected def tempDir = new Path(path, ".temp-" + jobId)
+
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     val format = context.getOutputFormatClass.newInstance()
     // If OutputFormat is Configurable, we should set conf to it.
@@ -166,9 +171,23 @@
     committer.commitJob(jobContext)
 
     if (hasValidPath) {
+
+      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+      // Move all files from the temp dir to the final output path.
+      if (fs.exists(tempDir)) {
+        val files = fs.listFiles(tempDir, false)
+        while (files.hasNext) {
+          val file = files.next()
+          val name = file.getPath().getName()
+          val to = new Path(path, name)
+          fs.rename(file.getPath, to)
+        }
+
+        fs.delete(tempDir, true)
+      }
+
       val (allAbsPathFiles, allPartitionPaths) =
         taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
-      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
 
       val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
       logDebug(s"Committing files staged for absolute locations $filesToMove")
@@ -235,4 +254,6 @@
       tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
     }
   }
+
+  override def getJobId(): String = jobId
 }
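Taken on its own, the commit-time move added above is a flat, single-level rename of the temp dir's contents into the final output path. A standalone sketch against the Hadoop FileSystem API (the paths and job ID are assumptions); note that listFiles(dir, false) yields only top-level files, so nested directories under tempDir, e.g. partition subdirectories, would not be surfaced by this loop:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Standalone sketch of the commit-time move; paths are illustrative.
val conf = new Configuration()
val output = new Path("/data/out")
val tempDir = new Path(output, ".temp-example-job-id")
val fs = tempDir.getFileSystem(conf)

if (fs.exists(tempDir)) {
  // listFiles(dir, recursive = false) returns only the directory's
  // immediate files; subdirectories are not returned at all, so a layout
  // with nested partition directories would need a recursive variant.
  val files = fs.listFiles(tempDir, false)
  while (files.hasNext) {
    val file = files.next()
    fs.rename(file.getPath, new Path(output, file.getPath.getName))
  }
  fs.delete(tempDir, true)
}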
FileFormatWriter.scala
@@ -117,7 +117,10 @@ object FileFormatWriter extends Logging {
     val job = Job.getInstance(hadoopConf)
     job.setOutputKeyClass(classOf[Void])
     job.setOutputValueClass(classOf[InternalRow])
-    FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
+    val jobOutputPath = new Path(outputSpec.outputPath, s".temp-${committer.getJobId()}")
+    val fs = jobOutputPath.getFileSystem(hadoopConf)
+    fs.mkdirs(jobOutputPath)
+    FileOutputFormat.setOutputPath(job, jobOutputPath)
 
     val partitionSet = AttributeSet(partitionColumns)
     val dataColumns = outputSpec.outputColumns.filterNot(partitionSet.contains)
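With this change the Hadoop output path handed to FileOutputFormat is the job's temp dir rather than the final destination, so task output is staged under it until commitJob renames the contents into place. A small sketch of the resulting layout, with an assumed output path and job ID:

import org.apache.hadoop.fs.Path

// Assumed values for illustration only.
val outputPath = "/data/events"
val jobId = "a1b2c3"

val jobOutputPath = new Path(outputPath, s".temp-$jobId")
// Task files are staged under /data/events/.temp-a1b2c3/... and only reach
// /data/events when HadoopMapReduceCommitProtocol.commitJob moves them up.
println(jobOutputPath) // /data/events/.temp-a1b2c3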
SQLHadoopMapReduceCommitProtocol.scala
@@ -55,7 +55,8 @@ class SQLHadoopMapReduceCommitProtocol(
       // The specified output committer is a FileOutputCommitter.
       // So, we will use the FileOutputCommitter-specified constructor.
       val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-      committer = ctor.newInstance(new Path(path), context)
+
+      committer = ctor.newInstance(new Path(path, tempDir), context)
     } else {
       // The specified output committer is just an OutputCommitter.
       // So, we will use the no-argument constructor.
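For a committer class like FileOutputCommitter, the reflective construction above amounts to calling the (Path, TaskAttemptContext) constructor directly, now pointed at the temp dir. A sketch under that assumption; note that tempDir is already resolved against path, so new Path(path, tempDir) appears to rely on Hadoop's Path resolution returning an absolute child unchanged:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Sketch of what ctor.newInstance(...) resolves to when the configured
// committer class is FileOutputCommitter. `path`, `tempDir`, and `context`
// stand in for the values in scope in the surrounding class.
def directConstruction(path: String, tempDir: Path,
    context: TaskAttemptContext): FileOutputCommitter = {
  // tempDir is already built from `path`; re-resolving an absolute child
  // against `path` is a no-op, so this points the committer at tempDir.
  new FileOutputCommitter(new Path(path, tempDir), context)
}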
ManifestFileCommitProtocol.scala
@@ -117,4 +117,6 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
     // Do nothing
     // TODO: we can also try deleting the addedFiles as a best-effort cleanup.
   }
+
+  override def getJobId(): String = jobId
 }