From f131cd871d9b4202cad65d5b6d0e967d51713a01 Mon Sep 17 00:00:00 2001 From: Zheng Tan Date: Tue, 3 May 2016 09:10:56 +0800 Subject: [PATCH 1/2] Fix SPARK-15059: remove fine-grained lock in ChildFirstURLClassLoader --- .../spark/util/MutableURLClassLoader.scala | 31 +++---------------- 1 file changed, 5 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala b/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala index 0a3180da8798..034826c57ef1 100644 --- a/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala +++ b/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala @@ -19,7 +19,6 @@ package org.apache.spark.util import java.net.{URL, URLClassLoader} import java.util.Enumeration -import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ @@ -48,32 +47,12 @@ private[spark] class ChildFirstURLClassLoader(urls: Array[URL], parent: ClassLoa private val parentClassLoader = new ParentClassLoader(parent) - /** - * Used to implement fine-grained class loading locks similar to what is done by Java 7. This - * prevents deadlock issues when using non-hierarchical class loaders. - * - * Note that due to some issues with implementing class loaders in - * Scala, Java 7's `ClassLoader.registerAsParallelCapable` method is not called. 
- */ - private val locks = new ConcurrentHashMap[String, Object]() - override def loadClass(name: String, resolve: Boolean): Class[_] = { - var lock = locks.get(name) - if (lock == null) { - val newLock = new Object() - lock = locks.putIfAbsent(name, newLock) - if (lock == null) { - lock = newLock - } - } - - lock.synchronized { - try { - super.loadClass(name, resolve) - } catch { - case e: ClassNotFoundException => - parentClassLoader.loadClass(name, resolve) - } + try { + super.loadClass(name, resolve) + } catch { + case e: ClassNotFoundException => + parentClassLoader.loadClass(name, resolve) } } From 593b294ce36c5e99e6a4c719e4e4f4d64d6b976b Mon Sep 17 00:00:00 2001 From: Zheng Tan Date: Sun, 8 May 2016 20:48:57 +0800 Subject: [PATCH 2/2] Trim tasks in a stage to avoid history server OOM --- .../main/scala/org/apache/spark/ui/SparkUI.scala | 1 + .../apache/spark/ui/jobs/JobProgressListener.scala | 13 +++++++++++++ .../scala/org/apache/spark/ui/jobs/StagePage.scala | 12 ++++++++++-- docs/configuration.md | 8 ++++++++ 4 files changed, 32 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 39155ff2649e..5fc6ad1a9603 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -141,6 +141,7 @@ private[spark] object SparkUI { val DEFAULT_POOL_NAME = "default" val DEFAULT_RETAINED_STAGES = 1000 val DEFAULT_RETAINED_JOBS = 1000 + val DEFAULT_RETAINED_TASKS = 1000 def getUIPort(conf: SparkConf): Int = { conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 945830c8bf24..cb5fc39f2899 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala 
@@ -93,6 +93,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES) val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS) + val retainedTasks = conf.getInt("spark.ui.retainedTasks", SparkUI.DEFAULT_RETAINED_TASKS) // We can test for memory leaks by ensuring that collections that track non-active jobs and // stages do not grow without bound and that collections for active jobs/stages eventually become @@ -137,6 +138,17 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { ) } + /** If Tasks is too large, remove and garbage collect old tasks */ + private def trimTasksIfNecessary(taskData: HashMap[Long, TaskUIData]) = synchronized { + if (taskData.size > retainedTasks) { + val toRemove = math.max(retainedTasks / 10, 1) + val oldIds = taskData.map(_._2.taskInfo.taskId).toList.sorted.take(toRemove) + for (id <- oldIds) { + taskData.remove(id) + } + } + } + /** If stages is too large, remove and garbage collect old stages */ private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized { if (stages.size > retainedStages) { @@ -399,6 +411,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { taskData.taskInfo = info taskData.metrics = taskMetrics taskData.errorMessage = errorMessage + trimTasksIfNecessary(stageData.taskData) for ( activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskEnd.stageId); diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 5d1928ac6b2c..007f82973fad 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -131,7 +131,14 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val stageData = stageDataOption.get val 
tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime) - val numCompleted = tasks.count(_.taskInfo.finished) + val numCompleted = stageData.numCompleteTasks + val totalTasks = stageData.numActiveTasks + + stageData.numCompleteTasks + stageData.numFailedTasks + val totalTasksNumStr = if (totalTasks == tasks.size) { + s"$totalTasks" + } else { + s"$totalTasks, only showing ${tasks.size}" + } val allAccumulables = progressListener.stageIdToData((stageId, stageAttemptId)).accumulables val externalAccumulables = allAccumulables.values.filter { acc => !acc.internal } @@ -576,7 +583,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{summaryTable.getOrElse("No tasks have reported metrics yet.")}
++

Aggregated Metrics by Executor

++ executorTable.toNodeSeq ++ maybeAccumulableTable ++ -

Tasks

++ taskTableHTML ++ jsForScrollingDownToTaskTable +

Tasks ({totalTasksNumStr})

++ + taskTableHTML ++ jsForScrollingDownToTaskTable UIUtils.headerSparkPage(stageHeader, content, parent, showVisualization = true) } } diff --git a/docs/configuration.md b/docs/configuration.md index 9191570d0762..f4beac2c1be5 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -550,6 +550,14 @@ Apart from these, the following properties are also available, and may be useful collecting. + + spark.ui.retainedTasks + 1000 + + How many tasks in one stage the Spark UI and status APIs remember before garbage + collecting. + + spark.worker.ui.retainedExecutors 1000