6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkException.scala
@@ -43,3 +43,9 @@ private[spark] case class SparkUserAppException(exitCode: Int)
*/
private[spark] case class ExecutorDeadException(message: String)
extends SparkException(message)

/**
* Exception thrown when several InsertIntoHadoopFsRelation operations conflict with each other.
*/
private[spark] case class InsertFileSourceConflictException(message: String)
extends SparkException(message)
core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -23,15 +23,15 @@ import org.apache.hadoop.mapreduce._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils


/**
* An interface to define how a single Spark job commits its outputs. Three notes:
*
* 1. Implementations must be serializable, as the committer instance instantiated on the driver
* will be used for tasks on executors.
* 2. Implementations should have a constructor with 2 or 3 arguments:
* (jobId: String, path: String) or
* (jobId: String, path: String, dynamicPartitionOverwrite: Boolean)
* (jobId: String, path: String, dynamicPartitionOverwrite: Boolean) or
* (jobId: String, path: String, fileSourceWriteDesc: Option[FileSourceWriteDesc])
* 3. A committer should not be reused across multiple Spark jobs.
*
* The proper call sequence is:
@@ -169,4 +169,34 @@ object FileCommitProtocol extends Logging {
ctor.newInstance(jobId, outputPath)
}
}

/**
* Instantiates a FileCommitProtocol with a file source write description.
*/
def instantiate(
className: String,
jobId: String,
outputPath: String,
fileSourceWriteDesc: Option[FileSourceWriteDesc]): FileCommitProtocol = {

logDebug(s"Creating committer $className; job $jobId; output=$outputPath;" +
s" fileSourceWriteDesc= $fileSourceWriteDesc")
val clazz = Utils.classForName[FileCommitProtocol](className)
// First try the constructor with arguments (jobId: String, outputPath: String,
// fileSourceWriteDesc: Option[FileSourceWriteDesc]).
// If that doesn't exist, fall back to `FileCommitProtocol.instantiate(className,
// jobId, outputPath, dynamicPartitionOverwrite)`.
try {
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String],
classOf[Option[FileSourceWriteDesc]])
logDebug("Using (String, String, Option[FileSourceWriteDesc]) constructor")
ctor.newInstance(jobId, outputPath, fileSourceWriteDesc)
} catch {
case _: NoSuchMethodException =>
logDebug("Falling back to invoke instance(className, JobId, outputPath," +
" dynamicPartitionOverwrite)")
instantiate(className, jobId, outputPath,
fileSourceWriteDesc.map(_.dynamicPartitionOverwrite).getOrElse(false))
}
}
}
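
A minimal usage sketch of the new overload; the committer class name, job id, and output path below are hypothetical values, and this assumes the internal API is visible from the call site:

import org.apache.spark.internal.io.{FileCommitProtocol, FileSourceWriteDesc}

val desc = new FileSourceWriteDesc(
  isInsertIntoHadoopFsRelation = true,
  dynamicPartitionOverwrite = false,
  escapedStaticPartitionKVs = Seq("p1" -> "v1"))
val committer = FileCommitProtocol.instantiate(
  className = "org.apache.spark.internal.io.HadoopMapReduceCommitProtocol",
  jobId = "0",
  outputPath = "/warehouse/db/tbl",
  fileSourceWriteDesc = Some(desc))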
core/src/main/scala/org/apache/spark/internal/io/FileSourceWriteDesc.scala
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.internal.io

/**
* A class describing the properties of a file source write operation.
*
* @param isInsertIntoHadoopFsRelation whether this is an InsertIntoHadoopFsRelation operation
* @param dynamicPartitionOverwrite whether dynamic partition overwrite is enabled, i.e. the save
*                                  mode is overwrite and not all partition keys are specified
* @param escapedStaticPartitionKVs static partition key-value pairs, which have been escaped
*/
class FileSourceWriteDesc(
val isInsertIntoHadoopFsRelation: Boolean = false,
val dynamicPartitionOverwrite: Boolean = false,
val escapedStaticPartitionKVs: Seq[(String, String)] = Seq.empty[(String, String)])
extends Serializable
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -17,18 +17,19 @@

package org.apache.spark.internal.io

import java.io.IOException
import java.io.{File, FileNotFoundException, IOException}
import java.util.{Date, UUID}

import scala.collection.mutable
import scala.util.Try
import scala.util.{Failure, Success, Try}

import org.apache.hadoop.conf.Configurable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.mapred.SparkHadoopMapRedUtil

@@ -40,22 +41,31 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
*
* @param jobId the job's or stage's id
* @param path the job's output path, or null if committer acts as a noop
* @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime
* dynamically, i.e., we first write files under a staging
* directory with partition path, e.g.
* /path/to/staging/a=1/b=1/xxx.parquet. When committing the job,
* we first clean up the corresponding partition directories at
* destination path, e.g. /path/to/destination/a=1/b=1, and move
* files from staging directory to the corresponding partition
* directories under destination path.
* @param fileSourceWriteDesc a description of the file source write operation
*/
class HadoopMapReduceCommitProtocol(
jobId: String,
path: String,
dynamicPartitionOverwrite: Boolean = false)
fileSourceWriteDesc: Option[FileSourceWriteDesc])
extends FileCommitProtocol with Serializable with Logging {

import FileCommitProtocol._
import HadoopMapReduceCommitProtocol._

def this(jobId: String, path: String, dynamicPartitionOverwrite: Boolean = false) =
this(jobId, path, Some(new FileSourceWriteDesc(dynamicPartitionOverwrite =
dynamicPartitionOverwrite)))

/**
* If true, Spark will overwrite partition directories at runtime dynamically, i.e., we first
* write files under a staging directory with partition path, e.g.
* /path/to/staging/a=1/b=1/xxx.parquet.
* When committing the job, we first clean up the corresponding partition directories at
* destination path, e.g. /path/to/destination/a=1/b=1, and move files from staging directory to
* the corresponding partition directories under destination path.
*/
def dynamicPartitionOverwrite: Boolean =
fileSourceWriteDesc.map(_.dynamicPartitionOverwrite).getOrElse(false)
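
A quick equivalence sketch (hypothetical job id and path): both constructors yield the same overwrite flag.

val byFlag = new HadoopMapReduceCommitProtocol("job-0", "/path/to/destination",
  dynamicPartitionOverwrite = true)
val byDesc = new HadoopMapReduceCommitProtocol("job-0", "/path/to/destination",
  Some(new FileSourceWriteDesc(dynamicPartitionOverwrite = true)))
assert(byFlag.dynamicPartitionOverwrite && byDesc.dynamicPartitionOverwrite)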

/** OutputCommitter from Hadoop is not serializable so marking it transient. */
@transient private var committer: OutputCommitter = _
@@ -91,7 +101,63 @@ class HadoopMapReduceCommitProtocol(
*/
private def stagingDir = new Path(path, ".spark-staging-" + jobId)

/**
* For InsertIntoHadoopFsRelation operations, we support concurrent writes to different
* partitions of the same table.
*/
def supportConcurrent: Boolean =
fileSourceWriteDesc.map(_.isInsertIntoHadoopFsRelation).getOrElse(false)

/**
* Get the escaped static partition key-value pairs; defaults to empty.
*/
private def escapedStaticPartitionKVs =
fileSourceWriteDesc.map(_.escapedStaticPartitionKVs).getOrElse(Seq.empty)

/**
* The staging root directory for InsertIntoHadoopFsRelation operation.
*/
@transient private var insertStagingDir: Path = _

/**
* The staging output path for InsertIntoHadoopFsRelation operation.
*/
@transient private var stagingOutputPath: Path = _

/**
* Get the desired output path for the job. The output will be [[path]] when the current
* operation is not an InsertIntoHadoopFsRelation operation. Otherwise, we choose a sub-path
* under [[insertStagingDir]], composed of [[escapedStaticPartitionKVs]], instead of [[path]]
* to mark this operation, so that a conflict with another operation can be detected by
* checking the existence of the corresponding output path.
*
* @return the desired output path.
*/
protected def getOutputPath(context: TaskAttemptContext): Path = {
  if (supportConcurrent) {
    val insertStagingPath = ".spark-staging-" + escapedStaticPartitionKVs.size
    insertStagingDir = new Path(path, insertStagingPath)
    val appId = SparkEnv.get.conf.getAppId
    val outputPath = new Path(path, Array(insertStagingPath,
      getEscapedStaticPartitionPath(escapedStaticPartitionKVs), appId, jobId)
      .mkString(File.separator))
    // makeQualified has no side effects, so return the qualified path it produces.
    insertStagingDir.getFileSystem(context.getConfiguration).makeQualified(outputPath)
  } else {
    new Path(path)
  }
}
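
To make the staging layout concrete, a standalone sketch of the same composition with hypothetical inputs:

import java.io.File

val tablePath = "/warehouse/t"
val kvs = Seq("p1" -> "v1", "p2" -> "v2")
val appId = "app-20240101120000-0001" // hypothetical application id
val jobId = "3" // hypothetical job id
val relative = Array(".spark-staging-" + kvs.size,
  kvs.map { case (k, v) => s"$k=$v" }.mkString(File.separator),
  appId, jobId).mkString(File.separator)
// relative == ".spark-staging-2/p1=v1/p2=v2/app-20240101120000-0001/3", which is then
// resolved against tablePath:
//   /warehouse/t/.spark-staging-2/p1=v1/p2=v2/app-20240101120000-0001/3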

protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
if (supportConcurrent) {
stagingOutputPath = getOutputPath(context)
context.getConfiguration.set(FileOutputFormat.OUTDIR, stagingOutputPath.toString)
logDebug("Set file output committer algorithm version to 2 implicitly," +
" for that the task output would be committed to staging output path firstly," +
" which is equivalent to algorithm 1.")
[Review comment, Member Author] How about logDebug? It produces a lot of logs in Jenkins. @advancedxy

[Review comment, Member Author] About 15843 lines.

[Review comment, Contributor] 15843 is a lot; however, it would not be that much inside one Spark application. One way to solve this is to use an object-level counter to log only the first warning (or first few warnings). But I am not sure that's worth it. Also, the head of the logs may get rotated and discarded... Using logDebug is fine, but users normally won't set the log level to DEBUG. I am not sure which one is better; it's up to you.

[Review comment, Contributor] Not sure what that means. V2 task commit is non-atomic, so it isn't the same as v1: if task attempt 1 failed, task attempt 2 will call mergePaths into the same dir, so the set of files to commit in job commit may contain the output of both.

context.getConfiguration.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2)
}

val format = context.getOutputFormatClass.getConstructor().newInstance()
// If OutputFormat is Configurable, we should set conf to it.
format match {
@@ -200,6 +266,16 @@ class HadoopMapReduceCommitProtocol(
}
fs.rename(new Path(stagingDir, part), finalPartPath)
}
} else if (supportConcurrent) {
// For an InsertIntoHadoopFsRelation operation, the result has been committed to the
// staging output path; merge it into the destination path.
mergeStagingPath(fs, stagingOutputPath, new Path(path))
}

if (supportConcurrent) {
// For an InsertIntoHadoopFsRelation operation, try to delete its staging output path.
deleteStagingInsertOutputPath(fs, insertStagingDir, stagingOutputPath,
escapedStaticPartitionKVs)
}

fs.delete(stagingDir, true)
@@ -224,6 +300,8 @@
if (hasValidPath) {
val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
fs.delete(stagingDir, true)
deleteStagingInsertOutputPath(fs, insertStagingDir, stagingOutputPath,
escapedStaticPartitionKVs)
}
} catch {
case e: IOException =>
@@ -272,3 +350,121 @@
}
}
}

object HadoopMapReduceCommitProtocol extends Logging {

/**
* Build a relative path from the specified partition key-value pairs.
*/
def getEscapedStaticPartitionPath(staticPartitionKVs: Iterable[(String, String)]): String = {
  staticPartitionKVs.map { case (k, v) => s"$k=$v" }.mkString(File.separator)
}
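
For example, assuming a platform where File.separator is "/":

assert(getEscapedStaticPartitionPath(Seq("year" -> "2020", "month" -> "01"))
  == "year=2020/month=01")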

/**
* Delete the staging output path of the current InsertIntoHadoopFsRelation operation. This
* output path marks an InsertIntoHadoopFsRelation operation, so that a conflict can be
* detected when several operations write the same partition, or a non-partitioned table,
* concurrently.
*
* The output path is a multi-level path composed of the specified partition key-value pairs,
* formatted as `.spark-staging-${depth}/p1=v1/p2=v2/.../pn=vn/appId/jobId`. When deleting the
* staging output path, first delete the last level recursively. Then try to delete each upper
* level non-recursively; on success, continue upward in the same way until the
* insertStagingDir itself is deleted.
*/
def deleteStagingInsertOutputPath(
fs: FileSystem,
insertStagingDir: Path,
stagingOutputDir: Path,
escapedStaticPartitionKVs: Seq[(String, String)]): Unit = {
if (insertStagingDir == null || stagingOutputDir == null || !fs.isDirectory(stagingOutputDir)) {
return
}

// First, delete the staging output dir recursively, because it is unique to this operation.
deleteSilently(fs, stagingOutputDir, true)

var currentLevelPath = stagingOutputDir.getParent
while (currentLevelPath != insertStagingDir) {
deleteSilently(fs, currentLevelPath, false)
currentLevelPath = currentLevelPath.getParent
}

deleteSilently(fs, insertStagingDir, false)
}
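
To illustrate the bottom-up cleanup, the deletion order for a hypothetical staging output path /warehouse/t/.spark-staging-2/p1=v1/p2=v2/app-1/3 (appId "app-1", jobId "3") is:

val deletionOrder = Seq(
  ("/warehouse/t/.spark-staging-2/p1=v1/p2=v2/app-1/3", true), // recursive
  ("/warehouse/t/.spark-staging-2/p1=v1/p2=v2/app-1", false),
  ("/warehouse/t/.spark-staging-2/p1=v1/p2=v2", false),
  ("/warehouse/t/.spark-staging-2/p1=v1", false),
  ("/warehouse/t/.spark-staging-2", false))
// A non-recursive delete fails quietly (only a warning is logged) while a concurrent
// job still holds a sibling entry, so shared levels survive until the last writer.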

private def deleteSilently(fs: FileSystem, path: Path, recursive: Boolean): Unit = {
try {
if (!fs.delete(path, recursive)) {
logWarning(s"Failed to delete path:$path with recursive:$recursive")
}
} catch {
case e: Exception =>
logWarning(s"Exception occurred when deleting dir: $path.", e)
}
}

/**
* Merge files under the staging output path into the destination path. Before merging, we need
* to delete the success marker file under the staging output path and regenerate it after the
* merge completes.
*/
private def mergeStagingPath(
fs: FileSystem,
stagingOutputPath: Path,
destPath: Path): Unit = {
val stagingMarkerPath = new Path(stagingOutputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
fs.delete(stagingMarkerPath, true)

doMergePaths(fs, fs.getFileStatus(stagingOutputPath), destPath)

val markerPath = new Path(destPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
fs.create(markerPath, true).close()
}

/**
* This is a re-implementation of [[FileOutputCommitter]]'s mergePaths, with some unnecessary
* operations removed to improve performance.
*/
@throws[IOException]
private def doMergePaths(fs: FileSystem, from: FileStatus, to: Path): Unit = {
logDebug(s"Merging data from $from to $to")

val toStat: FileStatus = Try {
fs.getFileStatus(to)
} match {
case Success(stat) => stat
case Failure(_: FileNotFoundException) => null
case Failure(e) => throw e
}

if (from.isFile) {
if (toStat != null && !fs.delete(to, true)) {
throw new IOException(s"Failed to delete $to" )
}
rename(fs, from, to)
} else if (from.isDirectory) {
if (toStat != null) {
if (!toStat.isDirectory) {
if (!fs.delete(to, true)) {
throw new IOException(s"Failed to delete $to")
}
rename(fs, from, to)
} else {
fs.listStatus(from.getPath).foreach { fileToMove =>
doMergePaths(fs, fileToMove, new Path(to, fileToMove.getPath.getName))
}
}
} else {
rename(fs, from, to)
}
}
}

@throws[IOException]
private def rename(fs: FileSystem, from: FileStatus, to: Path): Unit = {
if (!fs.rename(from.getPath, to)) {
throw new IOException(s"Failed to rename $from to $to")
}
}
}
Loading