Skip to content

Commit a0b9860

Browse files
committed
[SPARK-1901] worker should make sure executor has exited before updating executor's info
1 parent 75af8bd commit a0b9860

File tree

1 file changed

+7
-8
lines changed

1 file changed

+7
-8
lines changed

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,18 @@ private[spark] class ExecutorRunner(
6161
// Shutdown hook that kills actors on shutdown.
6262
shutdownHook = new Thread() {
6363
override def run() {
64-
killProcess()
64+
killProcess(None)
6565
}
6666
}
6767
Runtime.getRuntime.addShutdownHook(shutdownHook)
6868
}
6969

70-
private def killProcess() {
70+
private def killProcess(message: Option[String]) {
7171
if (process != null) {
7272
logInfo("Killing process!")
7373
process.destroy()
74-
process.waitFor()
74+
val exitCode = process.waitFor()
75+
worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode))
7576
}
7677
}
7778

@@ -82,7 +83,6 @@ private[spark] class ExecutorRunner(
8283
workerThread.interrupt()
8384
workerThread = null
8485
state = ExecutorState.KILLED
85-
worker ! ExecutorStateChanged(appId, execId, state, None, None)
8686
Runtime.getRuntime.removeShutdownHook(shutdownHook)
8787
}
8888
}
@@ -148,14 +148,13 @@ private[spark] class ExecutorRunner(
148148
} catch {
149149
case interrupted: InterruptedException => {
150150
logInfo("Runner thread for executor " + fullId + " interrupted")
151-
killProcess()
151+
state = ExecutorState.KILLED
152+
killProcess(None)
152153
}
153154
case e: Exception => {
154155
logError("Error running executor", e)
155-
killProcess()
156156
state = ExecutorState.FAILED
157-
val message = e.getClass + ": " + e.getMessage
158-
worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)
157+
killProcess(Some(e.getClass + ":" + e.getMessage))
159158
}
160159
}
161160
}

0 commit comments

Comments
 (0)