diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index 7efa9416362a..50f51e1af453 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -24,12 +24,12 @@ import org.apache.spark.util.Utils
 
 
 /**
- * An interface to define how a single Spark job commits its outputs. Two notes:
+ * An interface to define how a single Spark job commits its outputs. Three notes:
  *
  * 1. Implementations must be serializable, as the committer instance instantiated on the driver
  *    will be used for tasks on executors.
- * 2. Implementations should have a constructor with either 2 or 3 arguments:
- *    (jobId: String, path: String) or (jobId: String, path: String, isAppend: Boolean).
+ * 2. Implementations should have a constructor with 2 arguments:
+ *    (jobId: String, path: String)
  * 3. A committer should not be reused across multiple Spark jobs.
  *
  * The proper call sequence is:
@@ -139,19 +139,10 @@ object FileCommitProtocol {
   /**
    * Instantiates a FileCommitProtocol using the given className.
    */
-  def instantiate(className: String, jobId: String, outputPath: String, isAppend: Boolean)
+  def instantiate(className: String, jobId: String, outputPath: String)
     : FileCommitProtocol = {
     val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
-
-    // First try the one with argument (jobId: String, outputPath: String, isAppend: Boolean).
-    // If that doesn't exist, try the one with (jobId: string, outputPath: String).
-    try {
-      val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
-      ctor.newInstance(jobId, outputPath, isAppend.asInstanceOf[java.lang.Boolean])
-    } catch {
-      case _: NoSuchMethodException =>
-        val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
-        ctor.newInstance(jobId, outputPath)
-    }
+    val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
+    ctor.newInstance(jobId, outputPath)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
index 7d846f9354df..949d8c677998 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
@@ -197,8 +197,8 @@ class HadoopMapRedWriteConfigUtil[K, V: ClassTag](conf: SerializableJobConf)
     FileCommitProtocol.instantiate(
       className = classOf[HadoopMapRedCommitProtocol].getName,
       jobId = jobId.toString,
-      outputPath = getConf.get("mapred.output.dir"),
-      isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
+      outputPath = getConf.get("mapred.output.dir")
+    ).asInstanceOf[HadoopMapReduceCommitProtocol]
   }
 
   // --------------------------------------------------------------------------
@@ -325,8 +325,8 @@ class HadoopMapReduceWriteConfigUtil[K, V: ClassTag](conf: SerializableConfigura
     FileCommitProtocol.instantiate(
       className = classOf[HadoopMapReduceCommitProtocol].getName,
       jobId = jobId.toString,
-      outputPath = getConf.get("mapreduce.output.fileoutputformat.outputdir"),
-      isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
+      outputPath = getConf.get("mapreduce.output.fileoutputformat.outputdir")
+    ).asInstanceOf[HadoopMapReduceCommitProtocol]
   }
 
   // --------------------------------------------------------------------------
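With the reflective fallback gone, the two-argument (jobId: String, path: String) constructor is the only shape instantiate looks for. A minimal sketch of a third-party committer written against the new contract (the class name, job id and output path below are hypothetical, not part of this patch):

import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}

// Hypothetical custom protocol: only the (jobId, path) constructor is required now.
class MyCommitProtocol(jobId: String, path: String)
  extends HadoopMapReduceCommitProtocol(jobId, path) {
  // Override setupJob / commitJob / newTaskTempFile here if custom behaviour is needed.
}

// instantiate resolves exactly one constructor shape via reflection:
val committer = FileCommitProtocol.instantiate(
  className = classOf[MyCommitProtocol].getName,
  jobId = java.util.UUID.randomUUID().toString,
  outputPath = "/tmp/my-output")  // hypothetical output path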
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 60b595532198..03b190390249 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -265,8 +265,7 @@ private case class OutputCommitFunctions(tempDirPath: String) {
     val committer = FileCommitProtocol.instantiate(
       className = classOf[HadoopMapRedCommitProtocol].getName,
       jobId = jobId.value.getId.toString,
-      outputPath = jobConf.get("mapred.output.dir"),
-      isAppend = false)
+      outputPath = jobConf.get("mapred.output.dir"))
 
     // Create TaskAttemptContext.
     // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 55558ca9f700..824908ddeb61 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -480,8 +480,10 @@ object SQLConf {
 
   // The output committer class used by data sources. The specified class needs to be a
   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
-  val OUTPUT_COMMITTER_CLASS =
-    buildConf("spark.sql.sources.outputCommitterClass").internal().stringConf.createOptional
+  val OUTPUT_COMMITTER_CLASS = buildConf("spark.sql.sources.outputCommitterClass")
+    .internal()
+    .stringConf
+    .createOptional
 
   val FILE_COMMIT_PROTOCOL_CLASS =
     buildConf("spark.sql.sources.commitProtocolClass")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index c1bcfb861078..9ebe1e4d71c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -98,8 +98,7 @@ case class InsertIntoHadoopFsRelationCommand(
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = java.util.UUID.randomUUID().toString,
-      outputPath = outputPath.toString,
-      isAppend = isAppend)
+      outputPath = outputPath.toString)
 
     val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>
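The two SQLConf entries above are the user-facing knobs for swapping in custom commit classes, and both feed the call sites changed in this patch. A hedged configuration sketch (the com.example class names are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("committer-config-sketch").getOrCreate()

// Hadoop OutputCommitter consulted by SQLHadoopMapReduceCommitProtocol.setupCommitter (next file).
spark.conf.set("spark.sql.sources.outputCommitterClass",
  "com.example.MyFileOutputCommitter")  // placeholder class name

// FileCommitProtocol implementation used by InsertIntoHadoopFsRelationCommand above; it must
// expose the (jobId: String, path: String) constructor described in FileCommitProtocol's docs.
spark.conf.set("spark.sql.sources.commitProtocolClass",
  "com.example.MyCommitProtocol")  // placeholder class name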
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
index 9b9ed28412ca..40825a1f724b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
@@ -29,41 +29,34 @@ import org.apache.spark.sql.internal.SQLConf
  * A variant of [[HadoopMapReduceCommitProtocol]] that allows specifying the actual
  * Hadoop output committer using an option specified in SQLConf.
  */
-class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
+class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String)
   extends HadoopMapReduceCommitProtocol(jobId, path) with Serializable with Logging {
 
   override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     var committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
 
-    if (!isAppend) {
-      // If we are appending data to an existing dir, we will only use the output committer
-      // associated with the file output format since it is not safe to use a custom
-      // committer for appending. For example, in S3, direct parquet output committer may
-      // leave partial data in the destination dir when the appending job fails.
-      // See SPARK-8578 for more details.
-      val configuration = context.getConfiguration
-      val clazz =
-        configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+    val configuration = context.getConfiguration
+    val clazz =
+      configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
 
-      if (clazz != null) {
-        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+    if (clazz != null) {
+      logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
 
-        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
-        // has an associated output committer. To override this output committer,
-        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
-        // If a data source needs to override the output committer, it needs to set the
-        // output committer in prepareForWrite method.
-        if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
-          // The specified output committer is a FileOutputCommitter.
-          // So, we will use the FileOutputCommitter-specified constructor.
-          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-          committer = ctor.newInstance(new Path(path), context)
-        } else {
-          // The specified output committer is just an OutputCommitter.
-          // So, we will use the no-argument constructor.
-          val ctor = clazz.getDeclaredConstructor()
-          committer = ctor.newInstance()
-        }
+      // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+      // has an associated output committer. To override this output committer,
+      // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+      // If a data source needs to override the output committer, it needs to set the
+      // output committer in prepareForWrite method.
+      if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
+        // The specified output committer is a FileOutputCommitter.
+        // So, we will use the FileOutputCommitter-specified constructor.
+        val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+        committer = ctor.newInstance(new Path(path), context)
+      } else {
+        // The specified output committer is just an OutputCommitter.
+        // So, we will use the no-argument constructor.
+        val ctor = clazz.getDeclaredConstructor()
+        committer = ctor.newInstance()
       }
     }
     logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}")
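setupCommitter distinguishes two constructor shapes for the configured class: a FileOutputCommitter subclass is built with (Path, TaskAttemptContext), anything else through its no-argument constructor. A sketch of a committer that would take the first branch (the class name and the commit-time hook are illustrative only):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Because this extends FileOutputCommitter, setupCommitter instantiates it through the
// (Path, TaskAttemptContext) constructor; a plain OutputCommitter subclass would instead
// be created through its no-argument constructor.
class MyFileOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context) {

  override def commitJob(jobContext: JobContext): Unit = {
    super.commitJob(jobContext)
    // Illustrative hook: publish a completion marker or update an external catalog here.
  }
}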
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 96225ecffad4..0ed2dbe17740 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -102,8 +102,7 @@ class FileStreamSink(
       val committer = FileCommitProtocol.instantiate(
         className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
         jobId = batchId.toString,
-        outputPath = path,
-        isAppend = false)
+        outputPath = path)
 
       committer match {
         case manifestCommitter: ManifestFileCommitProtocol =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 6f998aa60faf..0fe33e87318a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -32,18 +32,13 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 
-private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String, isAppend: Boolean)
-  extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend)
+private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String)
+  extends SQLHadoopMapReduceCommitProtocol(jobId, path)
     with Serializable with Logging {
 
   override def newTaskTempFileAbsPath(
       taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
-    if (isAppend) {
-      throw new Exception("append data to an existed partitioned table, " +
-        "there should be no custom partition path sent to Task")
-    }
-
-    super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
+    throw new Exception("there should be no custom partition path")
   }
 }
 
@@ -115,7 +110,7 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("append data to an existed partitioned table without custom partition path") {
+  test("append data to an existing partitioned table without custom partition path") {
     withTable("t") {
       withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
         classOf[OnlyDetectCustomPathFileCommitProtocol].getName) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index cd263e8b6df8..b9461ad489bc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -337,8 +337,7 @@ case class InsertIntoHiveTable(
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
      jobId = java.util.UUID.randomUUID().toString,
-      outputPath = tmpLocation.toString,
-      isAppend = false)
+      outputPath = tmpLocation.toString)
 
     val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
       query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
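Every call site above now instantiates its committer without an append flag, so the user-visible effect of this patch is that a committer configured through SQLConf is also honoured when appending. A hedged write-path sketch (output path is hypothetical; before this patch an append write skipped a user-configured Hadoop output committer):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("append-committer-sketch").getOrCreate()
val df = spark.range(100).toDF("id")

// Whichever committer/protocol classes are configured (see the earlier SQLConf sketch) are now
// resolved the same way for both of these writes.
df.write.mode("overwrite").parquet("/tmp/committer_sketch")  // hypothetical path
df.write.mode("append").parquet("/tmp/committer_sketch")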