38 changes: 25 additions & 13 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -223,6 +223,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
private var _listenerBusStarted: Boolean = false
private var _jars: Seq[String] = _
private var _files: Seq[String] = _
private var _shutdownHookRef: AnyRef = _

/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
@@ -517,6 +518,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
_taskScheduler.postStartHook()
_env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))

// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
// is killed, though.
_shutdownHookRef = Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
logInfo("Invoking stop() from shutdown hook")
stop()
}
} catch {
case NonFatal(e) =>
logError("Error initializing SparkContext.", e)
@@ -1481,6 +1490,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
logInfo("SparkContext already stopped.")
return
}
if (_shutdownHookRef != null) {
Utils.removeShutdownHook(_shutdownHookRef)
}

postApplicationEnd()
_ui.foreach(_.stop())
@@ -1891,7 +1903,7 @@ object SparkContext extends Logging {
*
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
*/
private val activeContext: AtomicReference[SparkContext] =
new AtomicReference[SparkContext](null)

/**
@@ -1944,11 +1956,11 @@ object SparkContext extends Logging {
}

/**
* This function may be used to get or instantiate a SparkContext and register it as a
* singleton object. Because we can only have one active SparkContext per JVM,
* this is useful when applications may wish to share a SparkContext.
*
* Note: This function cannot be used to create multiple SparkContext instances
* even if multiple contexts are allowed.
*/
def getOrCreate(config: SparkConf): SparkContext = {
@@ -1961,17 +1973,17 @@
activeContext.get()
}
}

/**
* This function may be used to get or instantiate a SparkContext and register it as a
* singleton object. Because we can only have one active SparkContext per JVM,
* this is useful when applications may wish to share a SparkContext.
*
* This method allows not passing a SparkConf (useful if just retrieving).
*
* Note: This function cannot be used to create multiple SparkContext instances
* even if multiple contexts are allowed.
*/
def getOrCreate(): SparkContext = {
getOrCreate(new SparkConf())
}
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -67,6 +67,12 @@ private[spark] object Utils extends Logging {

val DEFAULT_SHUTDOWN_PRIORITY = 100

/**
* The shutdown priority of the SparkContext instance. This is lower than the default
* priority, so that by default hooks are run before the context is shut down.
*/
val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50

private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@volatile private var localRootDirs: Array[String] = null
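For context on how the new constant is meant to interact with DEFAULT_SHUTDOWN_PRIORITY: the doc comment above says higher-priority hooks run first, so default-priority (100) hooks run before the SparkContext hook (50), and the YARN ApplicationMaster hook added below (49) runs after it. A tiny self-contained model of that ordering — not Spark's hook manager, just an illustration under that assumption:

object HookOrderingSketch {
  def main(args: Array[String]): Unit = {
    val hooks = scala.collection.mutable.ArrayBuffer.empty[(Int, () => Unit)]
    def add(priority: Int)(hook: () => Unit): Unit = hooks += (priority -> hook)

    add(100) { () => println("default-priority hook, e.g. user cleanup") }  // DEFAULT_SHUTDOWN_PRIORITY
    add(50)  { () => println("SparkContext.stop() hook") }                  // SPARK_CONTEXT_SHUTDOWN_PRIORITY
    add(49)  { () => println("YARN ApplicationMaster hook") }               // SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1

    // Higher priority runs earlier, matching the intent of the doc comment above.
    hooks.sortBy { case (p, _) => -p }.foreach { case (_, hook) => hook() }
  }
}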

@@ -2116,7 +2122,7 @@ private[spark] object Utils extends Logging {
* @return A handle that can be used to unregister the shutdown hook.
*/
def addShutdownHook(hook: () => Unit): AnyRef = {
addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY, hook)
addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY)(hook)
}

/**
@@ -2126,7 +2132,7 @@
* @param hook The code to run during shutdown.
* @return A handle that can be used to unregister the shutdown hook.
*/
def addShutdownHook(priority: Int, hook: () => Unit): AnyRef = {
def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
shutdownHooks.add(priority, hook)
}
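The signature change above is what lets the SparkContext hunk register its hook with block syntax: moving the hook into its own parameter list allows callers to pass it as a trailing block. A small illustration (sc is assumed to be an existing SparkContext; Utils is Spark-internal):

// Old signature: the hook is an ordinary second argument.
Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY, () => sc.stop())

// New, curried signature: the hook reads like a block, as in the SparkContext change above.
Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
  sc.stop()
}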

@@ -95,14 +95,8 @@ private[spark] class ApplicationMaster(

val fs = FileSystem.get(yarnConf)

Utils.addShutdownHook { () =>
// If the SparkContext is still registered, shut it down as a best case effort in case
// users do not call sc.stop or do System.exit().
val sc = sparkContextRef.get()
if (sc != null) {
logInfo("Invoking sc stop from shutdown hook")
sc.stop()
}
// This shutdown hook should run *after* the SparkContext is shut down.
Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1) { () =>
Member:

On the grounds that the YARN AM already tries to do this, I think this is probably a good change to implement for all modes. I think it's all too common that user programs exit without calling stop() and this does affect things like finding logs.

Calling stop() is idempotent, right? So the extra hook shouldn't hurt anything, even for well-behaved programs.

You have some spurious white-space changes in this PR. Not really worth fixing, but you might tell your IDE not to change trailing whitespace.

Contributor Author:

> Calling stop() is idempotent, right?

Correct.

> You have some spurious white-space changes in this PR.

IIRC that's in the style guide (not adding trailing whitespace) and we're really, really bad at enforcing that.
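The idempotence the thread refers to is the guard at the top of stop() (the -1481 hunk above): a second call just logs and returns, so the new hook is harmless for programs that already stop the context themselves. Illustrative only:

val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local"))
// ... run the job ...
sc.stop()   // well-behaved program: stops the context and removes the shutdown hook
sc.stop()   // a repeat call logs "SparkContext already stopped." and returns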

val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
