@@ -24,12 +24,12 @@ import org.apache.spark.util.Utils


/**
* An interface to define how a single Spark job commits its outputs. Two notes:
* An interface to define how a single Spark job commits its outputs. Three notes:
*
* 1. Implementations must be serializable, as the committer instance instantiated on the driver
* will be used for tasks on executors.
* 2. Implementations should have a constructor with either 2 or 3 arguments:
* (jobId: String, path: String) or (jobId: String, path: String, isAppend: Boolean).
* 2. Implementations should have a constructor with 2 arguments:
* (jobId: String, path: String)
* 3. A committer should not be reused across multiple Spark jobs.
*
* The proper call sequence is:
@@ -139,19 +139,10 @@ object FileCommitProtocol {
/**
* Instantiates a FileCommitProtocol using the given className.
*/
def instantiate(className: String, jobId: String, outputPath: String, isAppend: Boolean)
def instantiate(className: String, jobId: String, outputPath: String)
Reviewer comment (Member): If anybody in open source community needs it, please let us know.
: FileCommitProtocol = {
val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]

// First try the one with argument (jobId: String, outputPath: String, isAppend: Boolean).
// If that doesn't exist, try the one with (jobId: string, outputPath: String).
try {
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
ctor.newInstance(jobId, outputPath, isAppend.asInstanceOf[java.lang.Boolean])
} catch {
case _: NoSuchMethodException =>
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
ctor.newInstance(jobId, outputPath)
}
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
ctor.newInstance(jobId, outputPath)
}
}
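
To make the new contract concrete, here is a minimal sketch (not part of this PR) of a committer that satisfies it: the class only needs the (jobId: String, path: String) constructor, which the simplified `instantiate` above resolves via reflection. The class name, job id, and path are hypothetical, and the snippet assumes a spark-shell style context.

```scala
import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}

// Hypothetical committer: extending the concrete HadoopMapReduceCommitProtocol keeps the
// example short; the only point of interest is the two-argument constructor.
class ExampleCommitProtocol(jobId: String, path: String)
  extends HadoopMapReduceCommitProtocol(jobId, path)

// instantiate now looks for exactly this (String, String) constructor.
val committer: FileCommitProtocol = FileCommitProtocol.instantiate(
  className = classOf[ExampleCommitProtocol].getName,
  jobId = "job-0",
  outputPath = "/tmp/example-output")
```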
@@ -197,8 +197,8 @@ class HadoopMapRedWriteConfigUtil[K, V: ClassTag](conf: SerializableJobConf)
FileCommitProtocol.instantiate(
className = classOf[HadoopMapRedCommitProtocol].getName,
jobId = jobId.toString,
outputPath = getConf.get("mapred.output.dir"),
isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
outputPath = getConf.get("mapred.output.dir")
).asInstanceOf[HadoopMapReduceCommitProtocol]
}

// --------------------------------------------------------------------------
@@ -325,8 +325,8 @@ class HadoopMapReduceWriteConfigUtil[K, V: ClassTag](conf: SerializableConfigura
FileCommitProtocol.instantiate(
className = classOf[HadoopMapReduceCommitProtocol].getName,
jobId = jobId.toString,
outputPath = getConf.get("mapreduce.output.fileoutputformat.outputdir"),
isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
outputPath = getConf.get("mapreduce.output.fileoutputformat.outputdir")
).asInstanceOf[HadoopMapReduceCommitProtocol]
}

// --------------------------------------------------------------------------
@@ -265,8 +265,7 @@ private case class OutputCommitFunctions(tempDirPath: String) {
val committer = FileCommitProtocol.instantiate(
className = classOf[HadoopMapRedCommitProtocol].getName,
jobId = jobId.value.getId.toString,
outputPath = jobConf.get("mapred.output.dir"),
isAppend = false)
outputPath = jobConf.get("mapred.output.dir"))

// Create TaskAttemptContext.
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
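
The truncated comment above describes folding a 64-bit Spark task attempt id into Hadoop's 32-bit TaskAttemptID. A purely illustrative sketch of that kind of roll-over (the exact expression in the elided code may differ):

```scala
// Illustrative only: wrap a Long attempt id into the Int range Hadoop expects.
val sparkTaskAttemptId: Long = 4294967299L                    // placeholder value > Int.MaxValue
val hadoopTaskAttemptId: Int = (sparkTaskAttemptId % Int.MaxValue).toInt
```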
@@ -480,8 +480,10 @@ object SQLConf {

// The output committer class used by data sources. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
val OUTPUT_COMMITTER_CLASS =
buildConf("spark.sql.sources.outputCommitterClass").internal().stringConf.createOptional
val OUTPUT_COMMITTER_CLASS = buildConf("spark.sql.sources.outputCommitterClass")
.internal()
.stringConf
.createOptional

val FILE_COMMIT_PROTOCOL_CLASS =
buildConf("spark.sql.sources.commitProtocolClass")
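
For reference, a hedged sketch of how these data source options might be set on a session. `spark` is an assumed SparkSession; the commit protocol class is the SQL protocol that appears later in this diff, and the output committer class name is a placeholder, not something that ships with Spark.

```scala
// Select the SQL commit protocol and a custom output committer; both keys are the
// internal SQLConf entries defined above.
spark.conf.set(
  "spark.sql.sources.commitProtocolClass",
  "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
spark.conf.set(
  "spark.sql.sources.outputCommitterClass",
  "com.example.MyOutputCommitter")  // hypothetical OutputCommitter subclass
```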
@@ -98,8 +98,7 @@ case class InsertIntoHadoopFsRelationCommand(
val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = java.util.UUID.randomUUID().toString,
outputPath = outputPath.toString,
isAppend = isAppend)
outputPath = outputPath.toString)

val doInsertion = (mode, pathExists) match {
case (SaveMode.ErrorIfExists, true) =>
@@ -29,41 +29,34 @@ import org.apache.spark.sql.internal.SQLConf
* A variant of [[HadoopMapReduceCommitProtocol]] that allows specifying the actual
* Hadoop output committer using an option specified in SQLConf.
*/
class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String)
extends HadoopMapReduceCommitProtocol(jobId, path) with Serializable with Logging {

override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
var committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)

if (!isAppend) {
// If we are appending data to an existing dir, we will only use the output committer
// associated with the file output format since it is not safe to use a custom
// committer for appending. For example, in S3, direct parquet output committer may
// leave partial data in the destination dir when the appending job fails.
// See SPARK-8578 for more details.
val configuration = context.getConfiguration
val clazz =
configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
val configuration = context.getConfiguration
val clazz =
configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])

if (clazz != null) {
logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
if (clazz != null) {
logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")

// Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
// has an associated output committer. To override this output committer,
// we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
// If a data source needs to override the output committer, it needs to set the
// output committer in prepareForWrite method.
if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
// The specified output committer is a FileOutputCommitter.
// So, we will use the FileOutputCommitter-specified constructor.
val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
committer = ctor.newInstance(new Path(path), context)
} else {
// The specified output committer is just an OutputCommitter.
// So, we will use the no-argument constructor.
val ctor = clazz.getDeclaredConstructor()
committer = ctor.newInstance()
}
// Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
// has an associated output committer. To override this output committer,
// we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
// If a data source needs to override the output committer, it needs to set the
// output committer in prepareForWrite method.
if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
// The specified output committer is a FileOutputCommitter.
// So, we will use the FileOutputCommitter-specified constructor.
val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
committer = ctor.newInstance(new Path(path), context)
} else {
// The specified output committer is just an OutputCommitter.
// So, we will use the no-argument constructor.
val ctor = clazz.getDeclaredConstructor()
committer = ctor.newInstance()
}
}
logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}")
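
One behavioral consequence worth illustrating: with the isAppend flag gone, SQLHadoopMapReduceCommitProtocol consults SQLConf.OUTPUT_COMMITTER_CLASS on every write, whereas appends previously always fell back to the output format's own committer (the SPARK-8578 caveat in the removed comment). A hedged sketch, assuming an existing SparkSession `spark` and DataFrame `df`; the output path is a placeholder.

```scala
import org.apache.spark.sql.SaveMode

// Appending now resolves the committer the same way as an initial write:
// through the class configured in spark.sql.sources.outputCommitterClass, if any.
df.write.mode(SaveMode.Append).parquet("/tmp/example-output")
```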
@@ -102,8 +102,7 @@ class FileStreamSink(
val committer = FileCommitProtocol.instantiate(
className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
jobId = batchId.toString,
outputPath = path,
isAppend = false)
outputPath = path)

committer match {
case manifestCommitter: ManifestFileCommitProtocol =>
@@ -32,18 +32,13 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils

private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String, isAppend: Boolean)
extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend)
private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String)
extends SQLHadoopMapReduceCommitProtocol(jobId, path)
with Serializable with Logging {

override def newTaskTempFileAbsPath(
taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
if (isAppend) {
throw new Exception("append data to an existed partitioned table, " +
"there should be no custom partition path sent to Task")
}

super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
throw new Exception("there should be no custom partition path")
}
}

@@ -115,7 +110,7 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
}
}

test("append data to an existed partitioned table without custom partition path") {
test("append data to an existing partitioned table without custom partition path") {
withTable("t") {
withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
classOf[OnlyDetectCustomPathFileCommitProtocol].getName) {
@@ -337,8 +337,7 @@ case class InsertIntoHiveTable(
val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = java.util.UUID.randomUUID().toString,
outputPath = tmpLocation.toString,
isAppend = false)
outputPath = tmpLocation.toString)

val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {