Commits (53)
19135e0
[SPARK-39195][SQL] Spark should use two step update of outputCommitCo…
AngersZhuuuu May 16, 2022
7062d32
Update
AngersZhuuuu May 17, 2022
f4caa22
Update OutputCommitCoordinatorSuite.scala
AngersZhuuuu May 18, 2022
5e6c0be
Spark OutputCommitCoordinator should keep consistent
AngersZhuuuu May 24, 2022
171bd6a
Merge branch 'SPARK-39195' of https://github.com/AngersZhuuuu/spark i…
AngersZhuuuu May 24, 2022
b070c9d
revert
AngersZhuuuu May 24, 2022
796c08c
Update OutputCommitCoordinator.scala
AngersZhuuuu May 24, 2022
1d79aae
Merge remote-tracking branch 'upstream/master' into SPARK-39195
AngersZhuuuu May 25, 2022
5f5729b
Update SparkHadoopMapRedUtil.scala
AngersZhuuuu May 25, 2022
759814b
Update
AngersZhuuuu May 25, 2022
b4e11cc
trigger
AngersZhuuuu May 26, 2022
7642cdb
Update SparkHadoopMapRedUtil.scala
AngersZhuuuu May 27, 2022
e6dce26
Revert "Update SparkHadoopMapRedUtil.scala"
AngersZhuuuu May 27, 2022
6a403d7
Revert "Update"
AngersZhuuuu May 27, 2022
bc1214c
Revert "Update SparkHadoopMapRedUtil.scala"
AngersZhuuuu May 27, 2022
b5d2885
Revert "Update OutputCommitCoordinator.scala"
AngersZhuuuu May 27, 2022
b13dfbe
Revert "Spark OutputCommitCoordinator should keep consistent"
AngersZhuuuu May 27, 2022
ad67d0d
[SPARK-39195][SQL] Spark should use two step update of outputCommitCo…
AngersZhuuuu May 16, 2022
0e366a8
Update
AngersZhuuuu May 17, 2022
11ba4b7
Update
AngersZhuuuu May 27, 2022
cc71ddc
Update OutputCommitCoordinator.scala
AngersZhuuuu May 27, 2022
e7204df
Update OutputCommitCoordinator.scala
AngersZhuuuu May 27, 2022
9426f30
Update
AngersZhuuuu May 27, 2022
60e03f3
Update OutputCommitCoordinator.scala
AngersZhuuuu May 27, 2022
f77c9c3
follow comment
AngersZhuuuu May 30, 2022
c1faddd
Update SQLQuerySuite.scala
AngersZhuuuu Jun 8, 2022
f7b92e1
Update SQLQuerySuite.scala
AngersZhuuuu Jun 8, 2022
ec5ef4b
re-trigger
AngersZhuuuu Jun 8, 2022
58ea1a9
Update
AngersZhuuuu Jun 8, 2022
58b6c0f
Update OutputCommitCoordinator.scala
AngersZhuuuu Jun 8, 2022
beed831
Update
AngersZhuuuu Jun 8, 2022
87fadf0
Update SQLQuerySuite.scala
AngersZhuuuu Jun 8, 2022
46aa5e0
Update DAGScheduler.scala
AngersZhuuuu Jun 8, 2022
3924127
Update core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoo…
AngersZhuuuu Jun 8, 2022
9c24d8e
Follow comment
AngersZhuuuu Jun 9, 2022
60beea1
Update SQLQuerySuite.scala
AngersZhuuuu Jun 9, 2022
c0bbcb8
Follow comment
AngersZhuuuu Jun 9, 2022
35b613a
Update SparkEnv.scala
AngersZhuuuu Jun 9, 2022
7bc078f
Update OutputCommitCoordinatorSuite.scala
AngersZhuuuu Jun 9, 2022
b52a617
Update OutputCommitCoordinatorSuite.scala
AngersZhuuuu Jun 9, 2022
ce5ac35
Update OutputCommitCoordinatorSuite.scala
AngersZhuuuu Jun 9, 2022
7ca7ca3
Update OutputCommitCoordinatorSuite.scala
AngersZhuuuu Jun 9, 2022
79815ab
Update OutputCommitCoordinatorIntegrationSuite.scala
AngersZhuuuu Jun 9, 2022
c6c73e1
trigger
AngersZhuuuu Jun 10, 2022
77f124c
update
AngersZhuuuu Jun 11, 2022
56543db
Update OutputCommitCoordinatorSuite.scala
AngersZhuuuu Jun 11, 2022
b524a87
Update OutputCommitCoordinatorSuite.scala
AngersZhuuuu Jun 14, 2022
4a1a092
Update OutputCommitCoordinatorIntegrationSuite.scala
AngersZhuuuu Jun 14, 2022
cd404d3
Merge branch 'master' into SPARK-39195
AngersZhuuuu Jun 14, 2022
a6be796
Update OutputCommitCoordinatorIntegrationSuite.scala
AngersZhuuuu Jun 15, 2022
d23bcbc
Update OutputCommitCoordinatorSuite.scala
AngersZhuuuu Jun 15, 2022
c29c9fb
Revert "Update OutputCommitCoordinatorSuite.scala"
AngersZhuuuu Jun 15, 2022
11d3ef2
Update OutputCommitCoordinatorSuite.scala
AngersZhuuuu Jun 17, 2022
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -276,7 +276,12 @@ class SparkContext(config: SparkConf) extends Logging {
       conf: SparkConf,
       isLocal: Boolean,
       listenerBus: LiveListenerBus): SparkEnv = {
-    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
+    SparkEnv.createDriverEnv(
+      conf,
+      isLocal,
+      listenerBus,
+      SparkContext.numDriverCores(master, conf),
+      this)
   }
 
   private[spark] def env: SparkEnv = _env
12 changes: 11 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -169,6 +169,7 @@ object SparkEnv extends Logging {
       isLocal: Boolean,
       listenerBus: LiveListenerBus,
       numCores: Int,
+      sparkContext: SparkContext,
       mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
     assert(conf.contains(DRIVER_HOST_ADDRESS),
       s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
@@ -191,6 +192,7 @@
       numCores,
       ioEncryptionKey,
       listenerBus = listenerBus,
+      Option(sparkContext),
       mockOutputCommitCoordinator = mockOutputCommitCoordinator
     )
   }
@@ -235,6 +237,7 @@
   /**
    * Helper method to create a SparkEnv for a driver or an executor.
    */
+  // scalastyle:off argcount
   private def create(
       conf: SparkConf,
       executorId: String,
@@ -245,7 +248,9 @@
       numUsableCores: Int,
       ioEncryptionKey: Option[Array[Byte]],
       listenerBus: LiveListenerBus = null,
+      sc: Option[SparkContext] = None,
       mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
+    // scalastyle:on argcount
 
     val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
 
@@ -391,7 +396,12 @@
     }
 
     val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
-      new OutputCommitCoordinator(conf, isDriver)
+      if (isDriver) {
+        new OutputCommitCoordinator(conf, isDriver, sc)
+      } else {
+        new OutputCommitCoordinator(conf, isDriver)
+      }
+
     }
     val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
       new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
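Only the driver constructs the coordinator with a SparkContext here; executors keep the old two-argument form and reach the driver through the coordinator RPC endpoint instead. A minimal, self-contained sketch of that wiring (toy names, not the real SparkEnv internals):

object CoordinatorWiring {
  class SparkContextHandle                                  // stand-in for SparkContext
  class Coordinator(isDriver: Boolean, sc: Option[SparkContextHandle] = None)

  // Mirrors the branch added in SparkEnv.create above: the executor side never
  // receives a context, so it can never fail stages directly.
  def make(isDriver: Boolean, sc: Option[SparkContextHandle]): Coordinator =
    if (isDriver) new Coordinator(isDriver = true, sc)
    else new Coordinator(isDriver = false)
}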
17 changes: 17 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1175,6 +1175,13 @@ private[spark] class DAGScheduler(
     listenerBus.post(SparkListenerUnschedulableTaskSetRemoved(stageId, stageAttemptId))
   }
 
+  private[scheduler] def handleStageFailed(
+      stageId: Int,
+      reason: String,
+      exception: Option[Throwable]): Unit = {
+    stageIdToStage.get(stageId).foreach { abortStage(_, reason, exception) }
+  }
+
   private[scheduler] def handleTaskSetFailed(
       taskSet: TaskSet,
       reason: String,
@@ -2608,6 +2615,13 @@
     runningStages -= stage
   }
 
+  /**
+   * Called by the OutputCommitCoordinator to cancel a stage when data duplication may happen.
+   */
+  private[scheduler] def stageFailed(stageId: Int, reason: String): Unit = {
+    eventProcessLoop.post(StageFailed(stageId, reason, None))
+  }
+
   /**
    * Aborts all jobs depending on a particular Stage. This is called in response to a task set
    * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
@@ -2876,6 +2890,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case completion: CompletionEvent =>
       dagScheduler.handleTaskCompletion(completion)
 
+    case StageFailed(stageId, reason, exception) =>
+      dagScheduler.handleStageFailed(stageId, reason, exception)
+
     case TaskSetFailed(taskSet, reason, exception) =>
       dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
 
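Note the indirection: the coordinator does not abort a stage synchronously. stageFailed only posts a StageFailed event, and the DAGScheduler's event loop later dispatches it to handleStageFailed, which delegates to abortStage. A self-contained toy of that flow (hypothetical names; the real event loop is asynchronous):

object StageFailureFlow {
  sealed trait Event
  final case class StageFailed(stageId: Int, reason: String,
      exception: Option[Throwable]) extends Event

  class ToyDagScheduler {
    private val stageIdToStage = scala.collection.mutable.Map(0 -> "ResultStage 0")

    // Stands in for eventProcessLoop.post; here we dispatch inline for brevity.
    def post(event: Event): Unit = event match {
      case StageFailed(id, reason, e) => handleStageFailed(id, reason, e)
    }

    // Mirrors stageIdToStage.get(stageId).foreach { abortStage(_, reason, exception) }
    private def handleStageFailed(id: Int, reason: String, e: Option[Throwable]): Unit =
      stageIdToStage.remove(id).foreach(s => println(s"Aborting $s: $reason"))
  }

  def main(args: Array[String]): Unit =
    new ToyDagScheduler().post(StageFailed(0, "authorized committer failed", None))
}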
core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -88,6 +88,10 @@ private[scheduler] case class ExecutorLost(execId: String, reason: ExecutorLossR
 private[scheduler] case class WorkerRemoved(workerId: String, host: String, message: String)
   extends DAGSchedulerEvent
 
+private[scheduler]
+case class StageFailed(stageId: Int, reason: String, exception: Option[Throwable])
+  extends DAGSchedulerEvent
+
 private[scheduler]
 case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable])
   extends DAGSchedulerEvent
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -44,7 +44,10 @@ private case class AskPermissionToCommitOutput(
  * This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests)
  * for an extensive design discussion.
  */
-private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) extends Logging {
+private[spark] class OutputCommitCoordinator(
+    conf: SparkConf,
+    isDriver: Boolean,
+    sc: Option[SparkContext] = None) extends Logging {
 
   // Initialized by SparkEnv
   var coordinatorRef: Option[RpcEndpointRef] = None
@@ -155,9 +158,9 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
         val taskId = TaskIdentifier(stageAttempt, attemptNumber)
         stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
         if (stageState.authorizedCommitters(partition) == taskId) {
-          logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
-            s"partition=$partition) failed; clearing lock")
-          stageState.authorizedCommitters(partition) = null
+          sc.foreach(_.dagScheduler.stageFailed(stage, s"Authorized committer " +
+            s"(attemptNumber=$attemptNumber, stage=$stage, partition=$partition) failed; " +
+            s"but task commit success, data duplication may happen."))
         }
Comment on lines +161 to 164
Member
@cloud-fan I think this is not very clear or correct in the reason string. stageState.authorizedCommitters records that a commit is allowed, but not that it actually succeeded. So, as you said, the driver never knows if the task commit is successful or not. Maybe we should update this to reduce confusion.

Contributor
+1. @AngersZhuuuu can you refine it?

Contributor Author
With #38980, it seems we don't need this patch anymore.

Contributor
oh, shall we create a PR to revert it then?

Contributor Author
> oh, shall we create a PR to revert it then?

Double-checked with @boneanxs; we should revert this, let me do it.

     }
   }
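The thread above hinges on a subtlety: authorizedCommitters records only that an attempt was authorized to commit, never whether the commit completed, so when the authorized attempt later reports failure the driver cannot tell whether output was already written. A standalone sketch contrasting the old and new behavior (toy state, deliberately simplified):

object CommitRace {
  import scala.collection.mutable

  final case class TaskId(stageAttempt: Int, attemptNumber: Int)

  class ToyCommitState(failStage: String => Unit) {
    // partition -> attempt authorized to commit; nothing here records whether
    // that commit ever actually finished on the executor.
    private val authorizedCommitters = mutable.Map.empty[Int, TaskId]

    def canCommit(partition: Int, id: TaskId): Boolean =
      authorizedCommitters.get(partition) match {
        case Some(existing) => existing == id        // lock already held
        case None =>
          authorizedCommitters(partition) = id       // first asker wins the lock
          true
      }

    // Old behavior: clear the lock so a retry may commit. If the failed
    // attempt's commit had in fact completed, the retry duplicates data.
    def oldOnAuthorizedFailure(partition: Int): Unit =
      authorizedCommitters.remove(partition)

    // Behavior in this PR: keep the lock and fail the whole stage, trading a
    // stage retry for protection against possible duplication.
    def newOnAuthorizedFailure(partition: Int): Unit =
      failStage(s"authorized committer for partition $partition failed")
  }
}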
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -19,9 +19,8 @@ package org.apache.spark.scheduler
 
 import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
-import org.scalatest.time.{Seconds, Span}
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TaskContext}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite, TaskContext}
 
 /**
  * Integration tests for the OutputCommitCoordinator.
@@ -45,13 +44,14 @@ class OutputCommitCoordinatorIntegrationSuite
     sc = new SparkContext("local[2, 4]", "test", conf)
   }
 
-  test("exception thrown in OutputCommitter.commitTask()") {
+  test("SPARK-39195: exception thrown in OutputCommitter.commitTask()") {
     // Regression test for SPARK-10381
-    failAfter(Span(60, Seconds)) {
+    val e = intercept[SparkException] {
       withTempDir { tempDir =>
         sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
       }
-    }
+    }.getCause.getMessage
+    assert(e.endsWith("failed; but task commit success, data duplication may happen."))
   }
 }
 
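For context on how the integration test reaches this path: the suite installs an output committer whose commitTask throws on the first task attempt, so an authorized committer dies and, with this change, the stage is failed rather than re-committed. An assumed simplification of such a committer (not the suite's exact code):

import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
import org.apache.spark.TaskContext

// Assumed sketch: the first commit attempt fails after authorization, which is
// exactly the situation the new stage-failure path guards against.
class FailFirstCommitAttemptCommitter extends FileOutputCommitter {
  override def commitTask(context: TaskAttemptContext): Unit = {
    if (TaskContext.get().attemptNumber() == 0) {
      throw new java.io.FileNotFoundException("intentional commitTask failure")
    }
    super.commitTask(context)
  }
}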
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -86,11 +86,12 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
           conf: SparkConf,
           isLocal: Boolean,
           listenerBus: LiveListenerBus): SparkEnv = {
-        outputCommitCoordinator = spy(new OutputCommitCoordinator(conf, isDriver = true))
+        outputCommitCoordinator =
+          spy(new OutputCommitCoordinator(conf, isDriver = true, Option(this)))
         // Use Mockito.spy() to maintain the default infrastructure everywhere else.
         // This mocking allows us to control the coordinator responses in test cases.
         SparkEnv.createDriverEnv(conf, isLocal, listenerBus,
-          SparkContext.numDriverCores(master), Some(outputCommitCoordinator))
+          SparkContext.numDriverCores(master), this, Some(outputCommitCoordinator))
       }
     }
     // Use Mockito.spy() to maintain the default infrastructure everywhere else
@@ -187,12 +188,9 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     // The authorized committer now fails, clearing the lock
     outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
       attemptNumber = authorizedCommitter, reason = TaskKilled("test"))
-    // A new task should now be allowed to become the authorized committer
-    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
-      nonAuthorizedCommitter + 2))
-    // There can only be one authorized committer
+    // A new task should not be allowed to commit, because the stage failed due to potential data duplication
     assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
-      nonAuthorizedCommitter + 3))
+      nonAuthorizedCommitter + 2))
   }
 
   test("SPARK-19631: Do not allow failed attempts to be authorized for committing") {
@@ -226,7 +224,8 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt))
 
     // Commit the 1st attempt, fail the 2nd attempt, make sure 3rd attempt cannot commit,
-    // then fail the 1st attempt and make sure the 4th one can commit again.
+    // then fail the 1st attempt and, since the stage failed because of potential data
+    // duplication, make sure the 4th attempt cannot commit either.
     stage += 1
     outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
     assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
@@ … @@
     assert(!outputCommitCoordinator.canCommit(stage, 3, partition, taskAttempt))
     outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
       ExecutorLostFailure("0", exitCausedByApp = true, None))
-    assert(outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
+    // A new task should not be allowed to become the authorized committer since the stage
+    // failed because of potential data duplication
+    assert(!outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
   }
 
   test("SPARK-24589: Make sure stage state is cleaned up") {