Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
6e6f748
[SPARK-4879] Use the Spark driver to authorize Hadoop commits.
mccheah Jan 21, 2015
bc80770
Unit tests for OutputCommitCoordinator
mccheah Jan 22, 2015
c9decc6
Scalastyle fixes
mccheah Jan 22, 2015
6b543ba
Removing redundant accumulator in unit test
mccheah Jan 22, 2015
66a71cd
Removing whitespace modifications
mccheah Jan 22, 2015
1c2b219
Renaming outdated names for test function classes
mccheah Jan 22, 2015
f135a8e
Moving the output commit coordinator from class into method.
mccheah Jan 22, 2015
abc7db4
TaskInfo can't be null in DAGSchedulerSuite
mccheah Jan 22, 2015
83de900
Making the OutputCommitCoordinatorMessage serializable
mccheah Jan 23, 2015
78eb1b5
Better OutputCommitCoordinatorActor stopping; simpler canCommit
mccheah Jan 23, 2015
9c6a4fa
More OutputCommitCoordinator cleanup on stop()
mccheah Jan 23, 2015
8d5a091
Was mistakenly serializing the accumulator in test suite.
mccheah Jan 23, 2015
c334255
Properly handling messages that could be sent after actor shutdown.
mccheah Jan 23, 2015
d431144
Using more concurrency to process OutputCommitCoordinator requests.
mccheah Jan 26, 2015
1df2a91
Throwing exception if SparkHadoopWriter commit denied
mccheah Jan 27, 2015
9fe6495
Fixing scalastyle
mccheah Jan 27, 2015
d63f63f
Fixing compiler error
mccheah Jan 27, 2015
60a47f4
Writing proper unit test for OutputCommitCoordinator and fixing bugs.
mccheah Jan 29, 2015
594e41a
Fixing a scalastyle error
mccheah Jan 29, 2015
0aec91e
Only coordinate when speculation is enabled; add configuration option…
JoshRosen Feb 3, 2015
b344bad
(Temporarily) re-enable “always coordinate” for testing purposes.
JoshRosen Feb 3, 2015
92e6dc9
Bug fix: use task ID instead of StageID to index into authorizedCommi…
JoshRosen Feb 3, 2015
f7d69c5
Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhad…
JoshRosen Feb 3, 2015
c79df98
Some misc. code style + doc changes:
JoshRosen Feb 3, 2015
dd00b7c
Move CommitDeniedException to executors package; remove `@DeveloperAP…
JoshRosen Feb 3, 2015
459310a
Roll back TaskSetManager changes that broke other tests.
JoshRosen Feb 3, 2015
997b41b
Roll back unnecessary DAGSchedulerSingleThreadedProcessLoop refactoring:
JoshRosen Feb 3, 2015
a7c0e29
Create fake TaskInfo using dummy fields instead of Mockito.
JoshRosen Feb 3, 2015
f582574
Some cleanup in OutputCommitCoordinatorSuite
JoshRosen Feb 3, 2015
97da5fe
Use actor only for RPC; call methods directly in DAGScheduler.
JoshRosen Feb 4, 2015
ede7590
Add test to ensure that a job that denies all commits cannot complete…
JoshRosen Feb 4, 2015
3969f5f
Re-enable guarding of commit coordination with spark.speculation sett…
JoshRosen Feb 4, 2015
48d5c1c
Roll back copiesRunning change in TaskSetManager
JoshRosen Feb 5, 2015
ed8b554
Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhad…
JoshRosen Feb 10, 2015
14861ea
splitID -> partitionID in a few places
JoshRosen Feb 10, 2015
e7be65a
Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhad…
JoshRosen Feb 10, 2015
ed783b2
Address Andrew’s feedback.
JoshRosen Feb 11, 2015
658116b
Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhad…
JoshRosen Feb 11, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,16 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus)

/**
 * Creates the driver's SparkEnv by delegating to `SparkEnv.createDriverEnv`.
 *
 * This exists as its own method so that components created by SparkEnv can be mocked in
 * unit tests.
 */
private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv =
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus)

private[spark] val env = createSparkEnv(conf, isLocal, listenerBus)
SparkEnv.set(env)

// Used to store a URL for each static file/jar together with the file's local timestamp
Expand Down
22 changes: 18 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.network.nio.NioBlockTransferService
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.scheduler.{OutputCommitCoordinator, LiveListenerBus}
import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorActor
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
import org.apache.spark.storage._
Expand Down Expand Up @@ -67,6 +68,7 @@ class SparkEnv (
val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
val shuffleMemoryManager: ShuffleMemoryManager,
val outputCommitCoordinator: OutputCommitCoordinator,
val conf: SparkConf) extends Logging {

private[spark] var isStopped = false
Expand All @@ -88,6 +90,7 @@ class SparkEnv (
blockManager.stop()
blockManager.master.stop()
metricsSystem.stop()
outputCommitCoordinator.stop()
actorSystem.shutdown()
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
Expand Down Expand Up @@ -169,7 +172,8 @@ object SparkEnv extends Logging {
private[spark] def createDriverEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
listenerBus: LiveListenerBus,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
val hostname = conf.get("spark.driver.host")
Expand All @@ -181,7 +185,8 @@ object SparkEnv extends Logging {
port,
isDriver = true,
isLocal = isLocal,
listenerBus = listenerBus
listenerBus = listenerBus,
mockOutputCommitCoordinator = mockOutputCommitCoordinator
)
}

Expand Down Expand Up @@ -220,7 +225,8 @@ object SparkEnv extends Logging {
isDriver: Boolean,
isLocal: Boolean,
listenerBus: LiveListenerBus = null,
numUsableCores: Int = 0): SparkEnv = {
numUsableCores: Int = 0,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

// Listener bus is only used on the driver
if (isDriver) {
Expand Down Expand Up @@ -368,6 +374,13 @@ object SparkEnv extends Logging {
"levels using the RDD.persist() method instead.")
}

val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
new OutputCommitCoordinator(conf)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see how this could be confusing: the OutputCommitCoordinator class serves as both a server and a client interface, similar to MapOutputStatusTracker and a couple of other classes. I'll see about splitting the client functionality into a separate OutputCommitCoordinatorClient class that's initialized everywhere and leave the OutputCommitCoordinator as a driver-only component.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, per offline discussion with @andrewor14, I'm going to leave this as-is for now.

val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator",
new OutputCommitCoordinatorActor(outputCommitCoordinator))
outputCommitCoordinator.coordinatorActor = Some(outputCommitCoordinatorActor)

val envInstance = new SparkEnv(
executorId,
actorSystem,
Expand All @@ -384,6 +397,7 @@ object SparkEnv extends Logging {
sparkFilesDir,
metricsSystem,
shuffleMemoryManager,
outputCommitCoordinator,
conf)

// Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
Expand Down
43 changes: 38 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.hadoop.mapred._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD

Expand Down Expand Up @@ -105,24 +106,56 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
def commit() {
val taCtxt = getTaskContext()
val cmtr = getOutputCommitter()
if (cmtr.needsTaskCommit(taCtxt)) {

// Called after we have decided to commit
def performCommit(): Unit = {
try {
cmtr.commitTask(taCtxt)
logInfo (taID + ": Committed")
logInfo (s"$taID: Committed")
} catch {
case e: IOException => {
case e: IOException =>
logError("Error committing the output of task: " + taID.value, e)
cmtr.abortTask(taCtxt)
throw e
}
}

// First, check whether the task's output has already been committed by some other attempt
if (cmtr.needsTaskCommit(taCtxt)) {
// The task output needs to be committed, but we don't know whether some other task attempt
// might be racing to commit the same output partition. Therefore, coordinate with the driver
// in order to determine whether this attempt can commit (see SPARK-4879).
val shouldCoordinateWithDriver: Boolean = {
val sparkConf = SparkEnv.get.conf
// We only need to coordinate with the driver if there are multiple concurrent task
// attempts, which should only occur if speculation is enabled
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can anyone think of cases where this assumption would be violated? Can this ever be violated due to, say, transitive network failures?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that depends on the semantics of cmtr.needsTaskCommit, but I'm not familiar with that API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's actually more of a Spark scheduler semantics issue: I'm wondering whether it's possible to have multiple concurrently running task attempts for the same task even when speculation is disabled.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think it is possible to have multiple concurrently running task attempts for the same task when some resultTasks hit a fetchFailed. Example: after one minute, one failed resultTask reports fetchFailed and the DAGScheduler marks the stage as a failedStage. The DAGScheduler will then resubmit all uncompleted tasks of this result stage, but some of those uncompleted tasks had already been submitted before. So at this point there are multiple concurrently running task attempts for the same task. @JoshRosen @vanzin

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lianhuiwang but by the time there's a fetch failed exception isn't the first task already finished? So at any given time there's still only 1 running task attempt, right?

val speculationEnabled = sparkConf.getBoolean("spark.speculation", false)
// This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
}
if (shouldCoordinateWithDriver) {
val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
val canCommit = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
if (canCommit) {
performCommit()
} else {
val msg = s"$taID: Not committed because the driver did not authorize commit"
logInfo(msg)
// We need to abort the task so that the driver can reschedule new attempts, if necessary
cmtr.abortTask(taCtxt)
throw new CommitDeniedException(msg, jobID, splitID, attemptID)
}
} else {
// Speculation is disabled or a user has chosen to manually bypass the commit coordination
performCommit()
}
} else {
logInfo ("No need to commit output of task: " + taID.value)
// Some other attempt committed the output, so we do nothing and signal success
logInfo(s"No need to commit output of task because needsTaskCommit=false: ${taID.value}")
}
}

/**
 * Commits the job through the output committer, using the current job context.
 *
 * NOTE(review): the original author left an open question here: should this always run,
 * or only if cmtr.needsTaskCommit? As written, the job commit is unconditional.
 */
def commitJob(): Unit = {
  getOutputCommitter().commitJob(getJobContext())
}
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,20 @@ case object TaskKilled extends TaskFailedReason {
override def toErrorString: String = "TaskKilled (killed intentionally)"
}

/**
 * :: DeveloperApi ::
 * Failure reason for a task that asked the driver for permission to commit, and was denied.
 *
 * @param jobID       job identifier reported in the error string
 * @param partitionID partition identifier reported in the error string
 * @param attemptID   attempt identifier reported in the error string
 */
@DeveloperApi
case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int)
  extends TaskFailedReason {

  override def toErrorString: String = {
    // Fixed prefix first, then the identifiers of the denied attempt.
    val headline = "TaskCommitDenied (Driver denied task commit)"
    s"$headline for job: $jobID, partition: $partitionID, attempt: $attemptID"
  }
}

/**
* :: DeveloperApi ::
* The task failed because the executor that it was running on was lost. This may happen because
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

import org.apache.spark.{TaskCommitDenied, TaskEndReason}

/**
 * Raised when a task tries to commit its output to HDFS and the driver denies the request.
 *
 * @param msg       human-readable description, passed through to [[Exception]]
 * @param jobID     job identifier forwarded to the resulting [[TaskCommitDenied]]
 * @param splitID   split identifier, forwarded as the `partitionID` of [[TaskCommitDenied]]
 * @param attemptID attempt identifier forwarded to the resulting [[TaskCommitDenied]]
 */
class CommitDeniedException(
    msg: String,
    jobID: Int,
    splitID: Int,
    attemptID: Int)
  extends Exception(msg) {

  /** Converts this exception into the matching [[TaskCommitDenied]] task end reason. */
  def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptID)

}

5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ private[spark] class Executor(
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
}

case cDE: CommitDeniedException => {
val reason = cDE.toTaskEndReason
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
}

case t: Throwable => {
// Attempt to exit cleanly by informing the driver of our failure.
// If anything goes wrong (or this was a fatal exception), we will delegate to
Expand Down
15 changes: 13 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
import org.apache.spark.util.{CallSite, EventLoop, SystemClock, Clock, Utils}
import org.apache.spark.util._
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat

/**
Expand All @@ -63,7 +63,7 @@ class DAGScheduler(
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = SystemClock)
clock: org.apache.spark.util.Clock = SystemClock)
extends Logging {

def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
Expand Down Expand Up @@ -126,6 +126,8 @@ class DAGScheduler(
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)

private val outputCommitCoordinator = env.outputCommitCoordinator

// Called by TaskScheduler to report task's starting.
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
eventProcessLoop.post(BeginEvent(task, taskInfo))
Expand Down Expand Up @@ -808,6 +810,7 @@ class DAGScheduler(
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
outputCommitCoordinator.stageStart(stage.id)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
Expand Down Expand Up @@ -865,6 +868,7 @@ class DAGScheduler(
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should post
// SparkListenerStageCompleted here in case there are no tasks to run.
outputCommitCoordinator.stageEnd(stage.id)
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
logDebug("Stage " + stage + " is actually done; %b %d %d".format(
stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
Expand Down Expand Up @@ -909,6 +913,9 @@ class DAGScheduler(
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)

outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
event.taskInfo.attempt, event.reason)

// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
if (event.reason != Success) {
Expand All @@ -921,6 +928,7 @@ class DAGScheduler(
// Skip all the actions if the stage has been cancelled.
return
}

val stage = stageIdToStage(task.stageId)

def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = {
Expand Down Expand Up @@ -1073,6 +1081,9 @@ class DAGScheduler(
handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
}

case commitDenied: TaskCommitDenied =>
// Do nothing here, left up to the TaskScheduler to decide how to handle denied commits

case ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics) =>
// Do nothing here, left up to the TaskScheduler to decide how to handle user failures

Expand Down
Loading