Skip to content

Commit 000eaef

Browse files
committed
Use more efficient method to get stacktrace.
1 parent bca8c39 commit 000eaef

File tree

2 files changed

+45
-23
lines changed

2 files changed

+45
-23
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,10 @@ private[spark] class Executor(
194194
/** Whether this task has been killed. */
195195
@volatile private var killed = false
196196

197+
/** Id of the thread running this task; stays at -1 until run() records the current thread. */
@volatile private var threadId: Long = -1
198+
199+
/** Returns the thread id stored in `threadId` (its initial value is -1). */
def getThreadId: Long = threadId
200+
197201
/** Whether this task has been finished. */
198202
@GuardedBy("TaskRunner.this")
199203
private var finished = false
@@ -234,7 +238,8 @@ private[spark] class Executor(
234238
}
235239

236240
override def run(): Unit = {
237-
Thread.currentThread().setName(threadName)
241+
threadId = Thread.currentThread.getId
242+
Thread.currentThread.setName(threadName)
238243
val threadMXBean = ManagementFactory.getThreadMXBean
239244
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
240245
val deserializeStartTime = System.currentTimeMillis()
@@ -461,9 +466,10 @@ private[spark] class Executor(
461466
logWarning(s"Killed task ${taskRunner.taskId} is still running after $elapsedTimeMs ms")
462467
if (takeThreadDump) {
463468
try {
464-
val threads = Utils.getThreadDump()
465-
threads.find(_.threadName == taskRunner.threadName).foreach { thread =>
466-
logWarning(s"Thread dump from task ${taskRunner.taskId}:\n${thread.stackTrace}")
469+
Utils.getThreadDumpForThread(taskRunner.getThreadId).foreach { thread =>
470+
if (thread.threadName == taskRunner.threadName) {
471+
logWarning(s"Thread dump from task ${taskRunner.taskId}:\n${thread.stackTrace}")
472+
}
467473
}
468474
} catch {
469475
case NonFatal(e) =>

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.util
1919

2020
import java.io._
21-
import java.lang.management.{LockInfo, ManagementFactory, MonitorInfo}
21+
import java.lang.management.{LockInfo, ManagementFactory, MonitorInfo, ThreadInfo}
2222
import java.net._
2323
import java.nio.ByteBuffer
2424
import java.nio.channels.Channels
@@ -2131,28 +2131,44 @@ private[spark] object Utils extends Logging {
21312131
// We need to filter out null values here because dumpAllThreads() may return null array
21322132
// elements for threads that are dead / don't exist.
21332133
val threadInfos = ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).filter(_ != null)
2134-
threadInfos.sortBy(_.getThreadId).map { case threadInfo =>
2135-
val monitors = threadInfo.getLockedMonitors.map(m => m.getLockedStackFrame -> m).toMap
2136-
val stackTrace = threadInfo.getStackTrace.map { frame =>
2137-
monitors.get(frame) match {
2138-
case Some(monitor) =>
2139-
monitor.getLockedStackFrame.toString + s" => holding ${monitor.lockString}"
2140-
case None =>
2141-
frame.toString
2142-
}
2143-
}.mkString("\n")
2144-
2145-
// use a set to dedup re-entrant locks that are held at multiple places
2146-
val heldLocks = (threadInfo.getLockedSynchronizers.map(_.lockString)
2147-
++ threadInfo.getLockedMonitors.map(_.lockString)
2148-
).toSet
2134+
threadInfos.sortBy(_.getThreadId).map(threadInfoToThreadStackTrace)
2135+
}
21492136

2150-
ThreadStackTrace(threadInfo.getThreadId, threadInfo.getThreadName, threadInfo.getThreadState,
2151-
stackTrace, if (threadInfo.getLockOwnerId < 0) None else Some(threadInfo.getLockOwnerId),
2152-
Option(threadInfo.getLockInfo).map(_.lockString).getOrElse(""), heldLocks.toSeq)
2137+
/**
 * Captures a stack-trace snapshot of a single thread.
 *
 * @param threadId JVM thread id (as returned by `Thread.getId`); non-positive ids are treated
 *                 as "no thread" because the underlying MXBean rejects them
 * @return the converted [[ThreadStackTrace]], or `None` when the id is not positive or the
 *         JVM has no information for that thread (e.g. it is no longer alive)
 */
def getThreadDumpForThread(threadId: Long): Option[ThreadStackTrace] = {
  if (threadId <= 0) {
    None
  } else {
    // The single-argument getThreadInfo(id) is equivalent to getThreadInfo(id, 0) and therefore
    // yields an EMPTY stack trace. Passing Int.MaxValue requests the entire, untruncated trace.
    // Note: this overload does not collect locked monitors / ownable synchronizers, so those
    // arrays are empty in the resulting ThreadInfo.
    // getThreadInfo returns null for threads that are dead / don't exist, hence the Option wrap.
    val threadInfo =
      Option(ManagementFactory.getThreadMXBean.getThreadInfo(threadId, Int.MaxValue))
    threadInfo.map(threadInfoToThreadStackTrace)
  }
}
21552145

2146+
/**
 * Builds a [[ThreadStackTrace]] from a JMX [[ThreadInfo]] snapshot.
 *
 * Stack frames at which a monitor was acquired are rendered with a " => holding ..." suffix;
 * all other frames are rendered with their plain `toString`. Frames are joined with newlines.
 *
 * @param threadInfo the snapshot to convert
 * @return the stack trace plus lock information for the thread
 */
private def threadInfoToThreadStackTrace(threadInfo: ThreadInfo): ThreadStackTrace = {
  // Index the locked monitors by the stack frame at which each one was taken.
  val monitorByFrame =
    threadInfo.getLockedMonitors.map(held => held.getLockedStackFrame -> held).toMap

  val renderedFrames = threadInfo.getStackTrace.map { frame =>
    monitorByFrame.get(frame)
      .map(held => held.getLockedStackFrame.toString + s" => holding ${held.lockString}")
      .getOrElse(frame.toString)
  }

  // A set collapses re-entrant locks that are held at more than one place.
  val synchronizerNames = threadInfo.getLockedSynchronizers.map(_.lockString)
  val monitorNames = threadInfo.getLockedMonitors.map(_.lockString)
  val distinctLocks = (synchronizerNames ++ monitorNames).toSet

  // A negative lock-owner id means the thread is not blocked by another thread.
  val blockingThread = Some(threadInfo.getLockOwnerId).filter(_ >= 0)
  val blockingLock = Option(threadInfo.getLockInfo).fold("")(_.lockString)

  ThreadStackTrace(
    threadId = threadInfo.getThreadId,
    threadName = threadInfo.getThreadName,
    threadState = threadInfo.getThreadState,
    stackTrace = renderedFrames.mkString("\n"),
    blockedByThreadId = blockingThread,
    blockedByLock = blockingLock,
    holdingLocks = distinctLocks.toSeq)
}
2171+
21562172
/**
21572173
* Convert all spark properties set in the given SparkConf to a sequence of java options.
21582174
*/

0 commit comments

Comments
 (0)