Skip to content

Commit 0300838

Browse files
committed
[CELEBORN-1697] Improve ThreadStackTrace for thread dump
1 parent 7dcd259 commit 0300838

File tree

7 files changed

+389
-30
lines changed

7 files changed

+389
-30
lines changed

common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -424,10 +424,44 @@ case class StackTrace(elems: Seq[String]) {
424424
* Note: code was initially copied from Apache Spark(v3.5.1).
425425
*/
426426
case class ThreadStackTrace(
427-
val threadId: Long,
428-
val threadName: String,
429-
val threadState: Thread.State,
430-
val stackTrace: StackTrace,
431-
val blockedByThreadId: Option[Long],
432-
val blockedByLock: String,
433-
val holdingLocks: Seq[String])
427+
threadId: Long,
428+
threadName: String,
429+
threadState: Thread.State,
430+
stackTrace: StackTrace,
431+
blockedByThreadId: Option[Long],
432+
blockedByLock: String,
433+
holdingLocks: Seq[String],
434+
synchronizers: Seq[String],
435+
monitors: Seq[String],
436+
lockName: Option[String],
437+
lockOwnerName: Option[String],
438+
suspended: Boolean,
439+
inNative: Boolean) {
440+
441+
/**
442+
* Returns a string representation of this thread stack trace w.r.t java.lang.management.ThreadInfo(JDK 8)'s toString.
443+
*
444+
* TODO(SPARK-44896): Also considering adding information os_prio, cpu, elapsed, tid, nid, etc., from the jstack tool
445+
*/
446+
override def toString: String = {
447+
val sb = new StringBuilder(
448+
s""""$threadName" Id=$threadId $threadState""")
449+
lockName.foreach(lock => sb.append(s" on $lock"))
450+
lockOwnerName.foreach {
451+
owner => sb.append(s"""owned by "$owner"""")
452+
}
453+
blockedByThreadId.foreach(id => s" Id=$id")
454+
if (suspended) sb.append(" (suspended)")
455+
if (inNative) sb.append(" (in native)")
456+
sb.append('\n')
457+
458+
sb.append(stackTrace.elems.map(e => s"\tat $e").mkString)
459+
460+
if (synchronizers.nonEmpty) {
461+
sb.append(s"\n\tNumber of locked synchronizers = ${synchronizers.length}\n")
462+
synchronizers.foreach(sync => sb.append(s"\t- $sync\n"))
463+
}
464+
sb.append('\n')
465+
sb.toString
466+
}
467+
}

common/src/main/scala/org/apache/celeborn/common/util/Utils.scala

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -766,29 +766,39 @@ object Utils extends Logging {
766766
* Note: code was initially copied from Apache Spark(v3.5.1).
767767
*/
768768
private def threadInfoToThreadStackTrace(threadInfo: ThreadInfo): ThreadStackTrace = {
769-
val monitors = threadInfo.getLockedMonitors.map(m => m.getLockedStackFrame -> m).toMap
770-
val stackTrace = StackTrace(threadInfo.getStackTrace.map { frame =>
771-
monitors.get(frame) match {
772-
case Some(monitor) =>
773-
monitor.getLockedStackFrame.toString + s" => holding ${monitor.lockString}"
774-
case None =>
775-
frame.toString
776-
}
769+
val threadState = threadInfo.getThreadState
770+
val monitors = threadInfo.getLockedMonitors.map(m => m.getLockedStackDepth -> m.toString).toMap
771+
val stackTrace = StackTrace(threadInfo.getStackTrace.zipWithIndex.map { case (frame, idx) =>
772+
val locked =
773+
if (idx == 0 && threadInfo.getLockInfo != null) {
774+
threadState match {
775+
case Thread.State.BLOCKED =>
776+
s"\t- blocked on ${threadInfo.getLockInfo}\n"
777+
case Thread.State.WAITING | Thread.State.TIMED_WAITING =>
778+
s"\t- waiting on ${threadInfo.getLockInfo}\n"
779+
case _ => ""
780+
}
781+
} else ""
782+
val locking = monitors.get(idx).map(mi => s"\t- locked $mi\n").getOrElse("")
783+
s"${frame.toString}\n$locked$locking"
777784
})
778785

779-
// use a set to dedup re-entrant locks that are held at multiple places
780-
val heldLocks =
781-
(threadInfo.getLockedSynchronizers ++ threadInfo.getLockedMonitors).map(_.lockString).toSet
782-
786+
val synchronizers = threadInfo.getLockedSynchronizers.map(_.toString)
787+
val monitorStrs = monitors.values.toSeq
783788
ThreadStackTrace(
784-
threadId = threadInfo.getThreadId,
785-
threadName = threadInfo.getThreadName,
786-
threadState = threadInfo.getThreadState,
787-
stackTrace = stackTrace,
788-
blockedByThreadId =
789-
if (threadInfo.getLockOwnerId < 0) None else Some(threadInfo.getLockOwnerId),
790-
blockedByLock = Option(threadInfo.getLockInfo).map(_.lockString).getOrElse(""),
791-
holdingLocks = heldLocks.toSeq)
789+
threadInfo.getThreadId,
790+
threadInfo.getThreadName,
791+
threadState,
792+
stackTrace,
793+
if (threadInfo.getLockOwnerId < 0) None else Some(threadInfo.getLockOwnerId),
794+
Option(threadInfo.getLockInfo).map(_.lockString).getOrElse(""),
795+
synchronizers ++ monitorStrs,
796+
synchronizers,
797+
monitorStrs,
798+
Option(threadInfo.getLockName),
799+
Option(threadInfo.getLockOwnerName),
800+
threadInfo.isSuspended,
801+
threadInfo.isInNative)
792802
}
793803

794804
private def readProcessStdout(process: Process): String = {

0 commit comments

Comments
 (0)