4 changes: 4 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -69,6 +69,10 @@ private[spark] object SQLConf {
// Whether to perform partition discovery when loading external data sources. Default to true.
val PARTITION_DISCOVERY_ENABLED = "spark.sql.sources.partitionDiscovery.enabled"

// The output committer class used by FSBasedRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"

// Whether to perform eager analysis when constructing a dataframe.
// Set to false when debugging requires the ability to look at invalid query plans.
val DATAFRAME_EAGER_ANALYSIS = "spark.sql.eagerAnalysis"
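The new spark.sql.sources.outputCommitterClass key is read back from the job's Hadoop configuration when the write path constructs its output committer (see the BaseWriterContainer change below). A minimal sketch of how a caller might set it; the committer class name is a placeholder, and whether setting it through SQLContext alone is sufficient depends on how SQL settings are propagated to the Hadoop configuration in a given build, so treat this as an assumption:

```scala
import org.apache.spark.sql.SQLContext

// Assumes an existing SparkContext named `sc`. `com.example.DirectOutputCommitter`
// is a placeholder for any subclass of org.apache.hadoop.mapreduce.OutputCommitter.
val sqlContext = new SQLContext(sc)
sqlContext.setConf(
  "spark.sql.sources.outputCommitterClass",
  "com.example.DirectOutputCommitter")
```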
@@ -197,7 +197,7 @@ private[sql] class ParquetRelation2(
classOf[ParquetOutputCommitter])

conf.setClass(
"mapred.output.committer.class",
SQLConf.OUTPUT_COMMITTER_CLASS,
committerClass,
classOf[ParquetOutputCommitter])
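For orientation, this setClass call follows the lookup of Parquet's own committer setting; the surrounding logic is roughly the sketch below (a paraphrase assuming `conf` is the job's Hadoop Configuration and that the Parquet-specific key is spark.sql.parquet.output.committer.class, not a verbatim copy of the file):

```scala
// Sketch only: resolve the Parquet-specific committer class first, then publish it
// under the generic SQL key so BaseWriterContainer can pick it up at write time.
val committerClass = conf.getClass(
  "spark.sql.parquet.output.committer.class",
  classOf[ParquetOutputCommitter],
  classOf[ParquetOutputCommitter])

conf.setClass(
  SQLConf.OUTPUT_COMMITTER_CLASS,
  committerClass,
  classOf[ParquetOutputCommitter])
```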

@@ -23,7 +23,7 @@ import scala.collection.mutable

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
import org.apache.hadoop.util.Shell
import parquet.hadoop.util.ContextUtil

@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.{SQLConf, DataFrame, SQLContext, SaveMode}

private[sql] case class InsertIntoDataSource(
logicalRelation: LogicalRelation,
@@ -287,24 +287,39 @@ private[sql] abstract class BaseWriterContainer(
protected def getWorkPath: String = {
outputCommitter match {
// FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
case f: FileOutputCommitter => f.getWorkPath.toString
case f: MapReduceFileOutputCommitter => f.getWorkPath.toString
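// Other committers are assumed to write directly to the final output location,
// so the output path itself serves as the work path.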
case _ => outputPath
}
}

private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
val committerClass = context.getConfiguration.getClass(
"mapred.output.committer.class", null, classOf[OutputCommitter])
SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])

Option(committerClass).map { clazz =>
val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
ctor.newInstance(new Path(outputPath), context)
// Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
// has an associated output committer. To override that committer, we first try
// the committer class set through SQLConf.OUTPUT_COMMITTER_CLASS. If a data source
// needs to override the output committer, it should set the committer class in its
// prepareForWrite method.
if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
// The specified output committer is a FileOutputCommitter,
// so we use its (Path, TaskAttemptContext) constructor.
val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
ctor.newInstance(new Path(outputPath), context)
} else {
// The specified output committer is just an OutputCommitter,
// so we use its no-argument constructor.
val ctor = clazz.getDeclaredConstructor()
ctor.newInstance()
}
}.getOrElse {
// If the output committer class is not set, we use the one associated with the
// file output format.
outputFormatClass.newInstance().getOutputCommitter(context)
}
}


private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
this.taskId = new TaskID(this.jobId, true, splitId)
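Taken together, the branches above mean a committer supplied through spark.sql.sources.outputCommitterClass needs either a (Path, TaskAttemptContext) constructor or a zero-argument one. As an illustration of the second branch, a minimal no-op committer could look like this sketch (the class and its behavior are illustrative, not part of this patch):

```scala
import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, TaskAttemptContext}

// Illustrative "direct" committer: tasks are assumed to write straight to their final
// location, so job/task setup, commit, and abort are all no-ops. It would be picked up
// by the zero-argument constructor branch in newOutputCommitter above.
class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = ()
  override def setupTask(taskContext: TaskAttemptContext): Unit = ()
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = ()
  override def abortTask(taskContext: TaskAttemptContext): Unit = ()
}
```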
@@ -527,7 +527,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio

/**
* Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can
* be put here. For example, user defined output committer can be configured here.
* be put here. For example, a user-defined output committer can be configured here
* by setting the output committer class under the spark.sql.sources.outputCommitterClass key.
*
* Note that the only side effect expected here is mutating `job` via its setters. Especially,
* Spark SQL caches [[BaseRelation]] instances for performance, mutating relation internal states
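To make the added sentence concrete: a data source that needs its own committer would register the class on the Job it receives in its write-preparation hook. A hedged sketch of just that step (the helper name is made up; `committerClass` stands for the data source's own committer):

```scala
import org.apache.hadoop.mapreduce.{Job, OutputCommitter}

// Sketch: register a custom committer under the key that
// BaseWriterContainer.newOutputCommitter reads back at write time.
def configureOutputCommitter(job: Job, committerClass: Class[_ <: OutputCommitter]): Unit = {
  job.getConfiguration.setClass(
    "spark.sql.sources.outputCommitterClass",
    committerClass,
    classOf[OutputCommitter])
}
```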