diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 39155ff2649ec..5fc6ad1a96036 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -141,6 +141,7 @@ private[spark] object SparkUI {
   val DEFAULT_POOL_NAME = "default"
   val DEFAULT_RETAINED_STAGES = 1000
   val DEFAULT_RETAINED_JOBS = 1000
+  val DEFAULT_RETAINED_TASKS = 1000
 
   def getUIPort(conf: SparkConf): Int = {
     conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 945830c8bf242..cb5fc39f28996 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -93,6 +93,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
   val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
   val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
+  val retainedTasks = conf.getInt("spark.ui.retainedTasks", SparkUI.DEFAULT_RETAINED_TASKS)
 
   // We can test for memory leaks by ensuring that collections that track non-active jobs and
   // stages do not grow without bound and that collections for active jobs/stages eventually become
@@ -137,6 +138,17 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     )
   }
 
+  /** If Tasks is too large, remove and garbage collect old tasks */
+  private def trimTasksIfNecessary(taskData: HashMap[Long, TaskUIData]) = synchronized {
+    if (taskData.size > retainedTasks) {
+      val toRemove = math.max(retainedTasks / 10, 1)
+      val oldIds = taskData.map(_._2.taskInfo.taskId).toList.sorted.take(toRemove)
+      for (id <- oldIds) {
+        taskData.remove(id)
+      }
+    }
+  }
+
   /** If stages is too large, remove and garbage collect old stages */
   private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > retainedStages) {
@@ -399,6 +411,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       taskData.taskInfo = info
      taskData.metrics = taskMetrics
      taskData.errorMessage = errorMessage
+      trimTasksIfNecessary(stageData.taskData)
 
      for (
        activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskEnd.stageId);
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 5d1928ac6b2ca..007f82973fad8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -131,7 +131,14 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
     val stageData = stageDataOption.get
     val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
 
-    val numCompleted = tasks.count(_.taskInfo.finished)
+    val numCompleted = stageData.numCompleteTasks
+    val totalTasks = stageData.numActiveTasks +
+      stageData.numCompleteTasks + stageData.numFailedTasks
+    val totalTasksNumStr = if (totalTasks == tasks.size) {
+      s"$totalTasks"
+    } else {
+      s"$totalTasks, only showing ${tasks.size}"
+    }
 
     val allAccumulables = progressListener.stageIdToData((stageId, stageAttemptId)).accumulables
     val externalAccumulables = allAccumulables.values.filter { acc => !acc.internal }
@@ -576,7 +583,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
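
For illustration only, not part of the patch: a self-contained sketch of the eviction policy that trimTasksIfNecessary implements above, using a plain Scala map with illustrative values in place of the listener's HashMap[Long, TaskUIData]. Once a stage's task map grows past retainedTasks, a batch of roughly 10% of the cap (at least one entry) is dropped, oldest task IDs first.

```scala
import scala.collection.mutable

object TrimTasksSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative stand-ins for spark.ui.retainedTasks and the per-stage
    // taskId -> task-data map kept by JobProgressListener.
    val retainedTasks = 10
    val taskData = mutable.HashMap((1L to 12L).map(id => id -> s"task-$id"): _*)

    if (taskData.size > retainedTasks) {
      // Same policy as trimTasksIfNecessary: evict ~10% of the cap (at least
      // one entry), choosing the smallest (oldest) task IDs first.
      val toRemove = math.max(retainedTasks / 10, 1)
      val oldIds = taskData.keys.toList.sorted.take(toRemove)
      oldIds.foreach(taskData.remove)
    }

    println(taskData.keys.toList.sorted) // List(2, 3, ..., 12): task 1 was evicted
  }
}
```

As in the patch, a single call removes only one batch, so the map can sit slightly above the cap until the next task-end event triggers another trim.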
[Not recovered from the original: the patch's documentation hunk, which appears to add a docs/configuration.md table entry for spark.ui.retainedTasks alongside the existing spark.worker.ui.retainedExecutors entry.]
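
A minimal usage sketch, assuming local mode and an illustrative app name, partition count, and cap, showing how a user would lower the new spark.ui.retainedTasks setting from the DEFAULT_RETAINED_TASKS value of 1000 introduced above:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RetainedTasksDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("retained-tasks-demo")     // illustrative name
      .setMaster("local[2]")
      .set("spark.ui.retainedTasks", "500")  // cap the task entries kept per stage in the UI

    val sc = new SparkContext(conf)
    // A stage with 2000 short tasks: the Stage page keeps only the retained
    // subset and, per the StagePage change above, can report the total as
    // "<total>, only showing <retained>".
    sc.parallelize(1 to 2000, numSlices = 2000).count()
    sc.stop()
  }
}
```

The same setting can also be passed on the command line, e.g. --conf spark.ui.retainedTasks=500 with spark-submit.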