
Commit 312b07d

A data source can use spark.sql.sources.outputCommitterClass to override the output committer.
1 parent 814b3da commit 312b07d

File tree

4 files changed: +29 additions, -9 deletions

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 4 additions & 0 deletions
@@ -69,6 +69,10 @@ private[spark] object SQLConf {
   // Whether to perform partition discovery when loading external data sources. Default to true.
   val PARTITION_DISCOVERY_ENABLED = "spark.sql.sources.partitionDiscovery.enabled"
 
+  // The output committer class used by FSBasedRelation. The specified class needs to be a
+  // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
+  val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
+
   // Whether to perform eager analysis when constructing a dataframe.
   // Set to false when debugging requires the ability to look at invalid query plans.
   val DATAFRAME_EAGER_ANALYSIS = "spark.sql.eagerAnalysis"
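For context, a minimal usage sketch of the new key (not part of this commit): client code can point data source writes at a custom committer through SQLContext.setConf. The app name, master, and the class name "com.example.MyOutputCommitter" below are hypothetical; the class would have to be a subclass of org.apache.hadoop.mapreduce.OutputCommitter.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object OutputCommitterConfExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("committer-conf-example").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // Select a custom committer for subsequent data source writes. The class name is
    // hypothetical and must name a subclass of org.apache.hadoop.mapreduce.OutputCommitter.
    sqlContext.setConf("spark.sql.sources.outputCommitterClass", "com.example.MyOutputCommitter")

    sc.stop()
  }
}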

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 1 addition & 1 deletion
@@ -197,7 +197,7 @@ private[sql] class ParquetRelation2(
         classOf[ParquetOutputCommitter])
 
     conf.setClass(
-      "mapred.output.committer.class",
+      SQLConf.OUTPUT_COMMITTER_CLASS,
       committerClass,
       classOf[ParquetOutputCommitter])
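The setClass call above stores the Parquet committer under the new key in the Hadoop Configuration. A small sketch of that setClass/getClass round trip, using FileOutputCommitter only as a stand-in value:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.OutputCommitter
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

object CommitterKeyRoundTrip {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()

    // Store a committer class under the key introduced by this commit.
    conf.setClass(
      "spark.sql.sources.outputCommitterClass",
      classOf[FileOutputCommitter],
      classOf[OutputCommitter])

    // Read it back with a null default, mirroring how BaseWriterContainer looks it up.
    val committerClass =
      conf.getClass("spark.sql.sources.outputCommitterClass", null, classOf[OutputCommitter])
    println(committerClass) // class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
  }
}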

sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala

Lines changed: 22 additions & 7 deletions
@@ -23,7 +23,7 @@ import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
 import org.apache.hadoop.util.Shell
 import parquet.hadoop.util.ContextUtil
 
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
+import org.apache.spark.sql.{SQLConf, DataFrame, SQLContext, SaveMode}
 
 private[sql] case class InsertIntoDataSource(
     logicalRelation: LogicalRelation,
@@ -287,24 +287,39 @@ private[sql] abstract class BaseWriterContainer(
   protected def getWorkPath: String = {
     outputCommitter match {
       // FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
-      case f: FileOutputCommitter => f.getWorkPath.toString
+      case f: MapReduceFileOutputCommitter => f.getWorkPath.toString
       case _ => outputPath
     }
   }
 
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
     val committerClass = context.getConfiguration.getClass(
-      "mapred.output.committer.class", null, classOf[OutputCommitter])
+      SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
 
     Option(committerClass).map { clazz =>
-      val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-      ctor.newInstance(new Path(outputPath), context)
+      // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+      // has an associated output committer. To override this output committer,
+      // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+      // If a data source needs to override the output committer, it needs to set the
+      // output committer in prepareForWrite method.
+      if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
+        // The specified output committer is a FileOutputCommitter.
+        // So, we will use the FileOutputCommitter-specified constructor.
+        val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+        ctor.newInstance(new Path(outputPath), context)
+      } else {
+        // The specified output committer is just a OutputCommitter.
+        // So, we will use the no-argument constructor.
+        val ctor = clazz.getDeclaredConstructor()
+        ctor.newInstance()
+      }
     }.getOrElse {
+      // If output committer class is not set, we will use the one associated with the
+      // file output format.
      outputFormatClass.newInstance().getOutputCommitter(context)
     }
   }
 
-
   private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
     this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
     this.taskId = new TaskID(this.jobId, true, splitId)
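To make the two reflection branches above concrete: a committer that extends OutputCommitter directly (rather than FileOutputCommitter) is built through the no-argument constructor path. The NoOpOutputCommitter below is purely illustrative and not part of the patch.

import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, TaskAttemptContext}

// Hypothetical committer that performs no work. Because it is not a FileOutputCommitter
// subclass, BaseWriterContainer.newOutputCommitter would call its no-argument constructor.
class NoOpOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = ()
  override def setupTask(taskContext: TaskAttemptContext): Unit = ()
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = ()
  override def abortTask(taskContext: TaskAttemptContext): Unit = ()
}

Setting spark.sql.sources.outputCommitterClass to this class's fully qualified name would route data source writes through it.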

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 2 additions & 1 deletion
@@ -527,7 +527,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
 
   /**
    * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can
-   * be put here. For example, user defined output committer can be configured here.
+   * be put here. For example, user defined output committer can be configured here
+   * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
    *
    * Note that the only side effect expected here is mutating `job` via its setters. Especially,
    * Spark SQL caches [[BaseRelation]] instances for performance, mutating relation internal states
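As a hedged sketch of the doc change above (not from this commit): per the updated comment, a data source's write-preparation hook only needs to mutate the job's configuration, for example with a helper like the one below. "com.example.MyCommitter" is a placeholder for a real org.apache.hadoop.mapreduce.OutputCommitter subclass.

import org.apache.hadoop.mapreduce.Job

object PrepareWriteExample {
  // The kind of side effect the updated Scaladoc describes: set the committer class
  // on the job's configuration while preparing the write.
  def configureOutputCommitter(job: Job): Unit = {
    job.getConfiguration.set(
      "spark.sql.sources.outputCommitterClass",
      "com.example.MyCommitter")
  }
}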
