Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ private[spark] object SparkUI {
val DEFAULT_POOL_NAME = "default"
val DEFAULT_RETAINED_STAGES = 1000
val DEFAULT_RETAINED_JOBS = 1000
val DEFAULT_RETAINED_TASKS = 1000

def getUIPort(conf: SparkConf): Int = {
conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {

val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
val retainedTasks = conf.getInt("spark.ui.retainedTasks", SparkUI.DEFAULT_RETAINED_TASKS)

// We can test for memory leaks by ensuring that collections that track non-active jobs and
// stages do not grow without bound and that collections for active jobs/stages eventually become
Expand Down Expand Up @@ -137,6 +138,17 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
)
}

/** If Tasks is too large, remove and garbage collect old tasks */
private def trimTasksIfNecessary(taskData: HashMap[Long, TaskUIData]) = synchronized {
if (taskData.size > retainedTasks) {
val toRemove = math.max(retainedTasks / 10, 1)
val oldIds = taskData.map(_._2.taskInfo.taskId).toList.sorted.take(toRemove)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain your reasoning behind doing this differently from the equivalent stages and jobs functions below? This just seems a bit redundant by comparison (doing all this to get oldIds and then going through a loop, rather than using trimStart), or I may just be missing some key Scala understanding.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also @tgravescs, before I make the new PR, what's your opinion on this bit of code? Should I leave this as is or change it to match how jobs and stages are trimmed? I don't see a problem with this code, but I don't know why he decided to implement it differently from the precedent.

for (id <- oldIds) {
taskData.remove(id)
}
}
}

/** If stages is too large, remove and garbage collect old stages */
private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
Expand Down Expand Up @@ -399,6 +411,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
taskData.taskInfo = info
taskData.metrics = taskMetrics
taskData.errorMessage = errorMessage
trimTasksIfNecessary(stageData.taskData)

for (
activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskEnd.stageId);
Expand Down
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,14 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {

val stageData = stageDataOption.get
val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
val numCompleted = tasks.count(_.taskInfo.finished)
val numCompleted = stageData.numCompleteTasks
val totalTasks = stageData.numActiveTasks +
stageData.numCompleteTasks + stageData.numFailedTasks
val totalTasksNumStr = if (totalTasks == tasks.size) {
s"$totalTasks"
} else {
s"$totalTasks, only showing ${tasks.size}"
}

val allAccumulables = progressListener.stageIdToData((stageId, stageAttemptId)).accumulables
val externalAccumulables = allAccumulables.values.filter { acc => !acc.internal }
Expand Down Expand Up @@ -576,7 +583,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
<h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++
maybeAccumulableTable ++
<h4 id="tasks-section">Tasks</h4> ++ taskTableHTML ++ jsForScrollingDownToTaskTable
<h4 id="tasks-section">Tasks ({totalTasksNumStr})</h4> ++
taskTableHTML ++ jsForScrollingDownToTaskTable
UIUtils.headerSparkPage(stageHeader, content, parent, showVisualization = true)
}
}
Expand Down
8 changes: 8 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,14 @@ Apart from these, the following properties are also available, and may be useful
collecting.
</td>
</tr>
<tr>
<td><code>spark.ui.retainedTasks</code></td>
<td>1000</td>
<td>
How many tasks in one stage the Spark UI and status APIs remember before garbage
collecting.
</td>
</tr>
<tr>
<td><code>spark.worker.ui.retainedExecutors</code></td>
<td>1000</td>
Expand Down