diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6018c87b01224..b86909dbf9b47 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -287,12 +287,7 @@ class SparkContext(config: SparkConf) extends Logging {
       conf: SparkConf,
       isLocal: Boolean,
       listenerBus: LiveListenerBus): SparkEnv = {
-    SparkEnv.createDriverEnv(
-      conf,
-      isLocal,
-      listenerBus,
-      SparkContext.numDriverCores(master, conf),
-      this)
+    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
   }
 
   private[spark] def env: SparkEnv = _env
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 728b24d517327..de2d215562b9f 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -258,7 +258,6 @@ object SparkEnv extends Logging {
       isLocal: Boolean,
       listenerBus: LiveListenerBus,
       numCores: Int,
-      sparkContext: SparkContext,
       mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
     assert(conf.contains(DRIVER_HOST_ADDRESS),
       s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
@@ -281,7 +280,6 @@ object SparkEnv extends Logging {
       numCores,
       ioEncryptionKey,
       listenerBus = listenerBus,
-      Option(sparkContext),
       mockOutputCommitCoordinator = mockOutputCommitCoordinator
     )
   }
@@ -317,7 +315,6 @@ object SparkEnv extends Logging {
   /**
    * Helper method to create a SparkEnv for a driver or an executor.
    */
-  // scalastyle:off argcount
   private def create(
       conf: SparkConf,
       executorId: String,
@@ -328,9 +325,7 @@ object SparkEnv extends Logging {
       numUsableCores: Int,
       ioEncryptionKey: Option[Array[Byte]],
       listenerBus: LiveListenerBus = null,
-      sc: Option[SparkContext] = None,
       mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
-    // scalastyle:on argcount
 
     val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
 
@@ -473,12 +468,7 @@ object SparkEnv extends Logging {
     }
 
     val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
-      if (isDriver) {
-        new OutputCommitCoordinator(conf, isDriver, sc)
-      } else {
-        new OutputCommitCoordinator(conf, isDriver)
-      }
-
+      new OutputCommitCoordinator(conf, isDriver)
     }
     val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
       new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index cd5d6b8f9c90d..a5858ebf9cdcc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -44,10 +44,7 @@ private case class AskPermissionToCommitOutput(
  * This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests)
  * for an extensive design discussion.
  */
-private[spark] class OutputCommitCoordinator(
-    conf: SparkConf,
-    isDriver: Boolean,
-    sc: Option[SparkContext] = None) extends Logging {
+private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) extends Logging {
 
   // Initialized by SparkEnv
   var coordinatorRef: Option[RpcEndpointRef] = None
@@ -158,10 +155,9 @@ private[spark] class OutputCommitCoordinator(
         val taskId = TaskIdentifier(stageAttempt, attemptNumber)
         stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
         if (stageState.authorizedCommitters(partition) == taskId) {
-          sc.foreach(_.dagScheduler.stageFailed(stage, s"Authorized committer " +
-            s"(attemptNumber=$attemptNumber, stage=$stage, partition=$partition) failed; " +
-            s"but task commit success, data duplication may happen. " +
-            s"reason=$reason"))
+          logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
+            s"partition=$partition) failed; clearing lock")
+          stageState.authorizedCommitters(partition) = null
         }
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
index fcacd223814c7..ecc91560714d1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -19,8 +19,9 @@ package org.apache.spark.scheduler
 
 import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
+import org.scalatest.time.{Seconds, Span}
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite, TaskContext}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TaskContext}
 
 /**
  * Integration tests for the OutputCommitCoordinator.
@@ -44,15 +45,13 @@ class OutputCommitCoordinatorIntegrationSuite
     sc = new SparkContext("local[2, 4]", "test", conf)
   }
 
-  test("SPARK-39195: exception thrown in OutputCommitter.commitTask()") {
+  test("exception thrown in OutputCommitter.commitTask()") {
     // Regression test for SPARK-10381
-    val e = intercept[SparkException] {
+    failAfter(Span(60, Seconds)) {
      withTempDir { tempDir =>
        sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
      }
-    }.getCause.getMessage
-    assert(e.contains("failed; but task commit success, data duplication may happen.") &&
-      e.contains("Intentional exception"))
+    }
   }
 }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index f1a4b97c2981d..46b95177e7719 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -87,12 +87,11 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
           isLocal: Boolean,
           listenerBus: LiveListenerBus): SparkEnv = {
         outputCommitCoordinator =
-          spy[OutputCommitCoordinator](
-            new OutputCommitCoordinator(conf, isDriver = true, Option(this)))
+          spy[OutputCommitCoordinator](new OutputCommitCoordinator(conf, isDriver = true))
         // Use Mockito.spy() to maintain the default infrastructure everywhere else.
         // This mocking allows us to control the coordinator responses in test cases.
         SparkEnv.createDriverEnv(conf, isLocal, listenerBus,
-          SparkContext.numDriverCores(master), this, Some(outputCommitCoordinator))
+          SparkContext.numDriverCores(master), Some(outputCommitCoordinator))
       }
     }
     // Use Mockito.spy() to maintain the default infrastructure everywhere else
@@ -190,9 +189,12 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     // The authorized committer now fails, clearing the lock
     outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition, attemptNumber =
       authorizedCommitter, reason = TaskKilled("test"))
-    // A new task should not be allowed to become stage failed because of potential data duplication
-    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+    // A new task should now be allowed to become the authorized committer
+    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
       nonAuthorizedCommitter + 2))
+    // There can only be one authorized committer
+    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+      nonAuthorizedCommitter + 3))
   }
 
   test("SPARK-19631: Do not allow failed attempts to be authorized for committing") {
@@ -226,8 +228,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt))
 
     // Commit the 1st attempt, fail the 2nd attempt, make sure 3rd attempt cannot commit,
-    // then fail the 1st attempt and since stage failed because of potential data duplication,
-    // make sure fail the 4th attempt.
+    // then fail the 1st attempt and make sure the 4th one can commit again.
     stage += 1
     outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
     assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
@@ -236,9 +237,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(!outputCommitCoordinator.canCommit(stage, 3, partition, taskAttempt))
     outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
       ExecutorLostFailure("0", exitCausedByApp = true, None))
-    // A new task should not be allowed to become the authorized committer since stage failed
-    // because of potential data duplication
-    assert(!outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
+    assert(outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
   }
 
   test("SPARK-24589: Make sure stage state is cleaned up") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 4fb8faa43a39e..02e1c70cc8cb7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -37,7 +37,7 @@ import org.apache.parquet.hadoop.example.ExampleParquetWriter
 import org.apache.parquet.io.api.Binary
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, TestUtils}
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
@@ -1206,6 +1206,43 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       errorMessage.contains("is not a valid DFS filename"))
   }
 
+  test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
+    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+      classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
+      // Using a output committer that always fail when committing a task, so that both
+      // `commitTask()` and `abortTask()` are invoked.
+      val extraOptions = Map[String, String](
+        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
+          classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
+      )
+
+      // Before fixing SPARK-7837, the following code results in an NPE because both
+      // `commitTask()` and `abortTask()` try to close output writers.
+
+      withTempPath { dir =>
+        val m1 = intercept[SparkException] {
+          spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
+        }
+        assert(m1.getErrorClass == "TASK_WRITE_FAILED")
+        assert(m1.getCause.getMessage.contains("Intentional exception for testing purposes"))
+      }
+
+      withTempPath { dir =>
+        val m2 = intercept[SparkException] {
+          val df = spark.range(1).select($"id" as Symbol("a"), $"id" as Symbol("b"))
+            .coalesce(1)
+          df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
+        }
+        if (m2.getErrorClass != null) {
+          assert(m2.getErrorClass == "TASK_WRITE_FAILED")
+          assert(m2.getCause.getMessage.contains("Intentional exception for testing purposes"))
+        } else {
+          assert(m2.getMessage.contains("TASK_WRITE_FAILED"))
+        }
+      }
+    }
+  }
+
   test("SPARK-11044 Parquet writer version fixed as version1 ") {
     withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
       classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
@@ -1550,52 +1587,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 }
 
-// Parquet IO test suite with output commit coordination disabled.
-// This test suite is separated ParquetIOSuite to avoid race condition of failure events
-// from `OutputCommitCoordination` and `TaskSetManager`.
-class ParquetIOWithoutOutputCommitCoordinationSuite
-  extends QueryTest with ParquetTest with SharedSparkSession {
-  import testImplicits._
-
-  override protected def sparkConf: SparkConf = {
-    super.sparkConf
-      .set("spark.hadoop.outputCommitCoordination.enabled", "false")
-  }
-
-  test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
-    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
-      classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
-      // Using a output committer that always fail when committing a task, so that both
-      // `commitTask()` and `abortTask()` are invoked.
-      val extraOptions = Map[String, String](
-        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
-          classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
-      )
-
-      // Before fixing SPARK-7837, the following code results in an NPE because both
-      // `commitTask()` and `abortTask()` try to close output writers.
-
-      withTempPath { dir =>
-        val m1 = intercept[SparkException] {
-          spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
-        }
-        assert(m1.getErrorClass == "TASK_WRITE_FAILED")
-        assert(m1.getCause.getMessage.contains("Intentional exception for testing purposes"))
-      }
-
-      withTempPath { dir =>
-        val m2 = intercept[SparkException] {
-          val df = spark.range(1).select($"id" as Symbol("a"), $"id" as Symbol("b"))
-            .coalesce(1)
-          df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
-        }
-        assert(m2.getErrorClass == "TASK_WRITE_FAILED")
-        assert(m2.getCause.getMessage.contains("Intentional exception for testing purposes"))
-      }
-    }
-  }
-}
-
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {