core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -103,4 +103,9 @@ package object config {
     .stringConf
     .checkValues(Set("hive", "in-memory"))
     .createWithDefault("in-memory")
+
+  // To limit memory usage, we only track information for a fixed number of tasks
+  private[spark] val UI_RETAINED_TASKS = ConfigBuilder("spark.ui.retainedTasks")
+    .intConf
+    .createWithDefault(100000)
 }
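
The new entry can be overridden like any other `spark.ui.*` setting. Below is a minimal sketch (not part of the patch) of lowering the 100000-task default from application code; the app name and master are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical setup: keep only the newest 50000 tasks per stage in the UI,
// instead of the default of 100000 introduced by UI_RETAINED_TASKS above.
val conf = new SparkConf()
  .setAppName("retained-tasks-demo") // placeholder app name
  .setMaster("local[*]")             // placeholder master URL
  .set("spark.ui.retainedTasks", "50000")
val sc = new SparkContext(conf)
```

The same key can also be passed at submit time, e.g. `--conf spark.ui.retainedTasks=50000`.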
core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -19,12 +19,13 @@ package org.apache.spark.ui.jobs
 
 import java.util.concurrent.TimeoutException
 
-import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
+import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap, ListBuffer}
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
@@ -93,6 +94,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
   val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
   val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
+  val retainedTasks = conf.get(UI_RETAINED_TASKS)
 
   // We can test for memory leaks by ensuring that collections that track non-active jobs and
   // stages do not grow without bound and that collections for active jobs/stages eventually become
@@ -400,6 +402,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       taskData.updateTaskMetrics(taskMetrics)
       taskData.errorMessage = errorMessage
 
+      // If the task table grows too large, drop the oldest tasks so they can be garbage collected
+      if (stageData.taskData.size > retainedTasks) {
+        stageData.taskData = stageData.taskData.drop(stageData.taskData.size - retainedTasks)
+      }
+
       for (
         activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskEnd.stageId);
         jobId <- activeJobsDependentOnStage;
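
The trimming above depends on `taskData` now being a `LinkedHashMap` (see the UIData change below): it iterates in insertion order, so dropping from the front discards the oldest tasks first. A self-contained sketch of that behavior, using made-up task ids and a tiny retention limit:

```scala
import scala.collection.mutable.LinkedHashMap

// Illustration only: LinkedHashMap preserves insertion order, so dropping from the
// front removes the entries that were added earliest.
val retained = 3 // stand-in for spark.ui.retainedTasks
var taskData = LinkedHashMap[Long, String]()
(1L to 5L).foreach { id => taskData += id -> s"task-$id" }

if (taskData.size > retained) {
  // Same shape as the listener code above: keep only the newest `retained` entries.
  taskData = taskData.drop(taskData.size - retained)
}
println(taskData.keys.mkString(", ")) // prints: 3, 4, 5
```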
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -131,7 +131,14 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
 
     val stageData = stageDataOption.get
     val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
-    val numCompleted = tasks.count(_.taskInfo.finished)
+    val numCompleted = stageData.numCompleteTasks
+    val totalTasks = stageData.numActiveTasks +
+      stageData.numCompleteTasks + stageData.numFailedTasks
+    val totalTasksNumStr = if (totalTasks == tasks.size) {
+      s"$totalTasks"
+    } else {
+      s"$totalTasks, showing ${tasks.size}"
+    }
 
     val allAccumulables = progressListener.stageIdToData((stageId, stageAttemptId)).accumulables
     val externalAccumulables = allAccumulables.values.filter { acc => !acc.internal }
@@ -576,7 +583,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
       <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
       <h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++
       maybeAccumulableTable ++
-      <h4 id="tasks-section">Tasks</h4> ++ taskTableHTML ++ jsForScrollingDownToTaskTable
+      <h4 id="tasks-section">Tasks ({totalTasksNumStr})</h4> ++
+      taskTableHTML ++ jsForScrollingDownToTaskTable
     UIUtils.headerSparkPage(stageHeader, content, parent, showVisualization = true)
   }
 }
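
With the cap in place, a stage can have run more tasks than the UI retains, so the heading now distinguishes the total count from the number of rows shown. A hypothetical rendering of the new heading text, with made-up numbers:

```scala
// Hypothetical numbers: the stage ran 102345 tasks but the UI kept only 100000 rows.
val totalTasks = 102345
val shownTasks = 100000 // tasks.size, bounded by spark.ui.retainedTasks
val totalTasksNumStr =
  if (totalTasks == shownTasks) s"$totalTasks" else s"$totalTasks, showing $shownTasks"
println(s"Tasks ($totalTasksNumStr)") // Tasks (102345, showing 100000)
```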
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.ui.jobs
 
 import scala.collection.mutable
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{HashMap, LinkedHashMap}
 
 import org.apache.spark.JobExecutionStatus
 import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
@@ -94,7 +94,7 @@ private[spark] object UIData {
     var description: Option[String] = None
 
     var accumulables = new HashMap[Long, AccumulableInfo]
-    var taskData = new HashMap[Long, TaskUIData]
+    var taskData = new LinkedHashMap[Long, TaskUIData]
     var executorSummary = new HashMap[String, ExecutorSummary]
 
     def hasInput: Boolean = inputBytes > 0