From 97275a7a472ba782c268f391876529fec8fbf2ab Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 27 Feb 2015 23:29:23 -0800
Subject: [PATCH 1/3] Add jobGroup-to-jobId index to JobProgressListener

---
 .../org/apache/spark/SparkStatusTracker.scala |  3 +-
 .../spark/ui/jobs/JobProgressListener.scala   | 17 ++++++++--
 .../ui/jobs/JobProgressListenerSuite.scala    | 31 +++++++++++++++++--
 3 files changed, 45 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index edbdda8a0bcb6..909453f3f71e3 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -45,8 +45,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    */
   def getJobIdsForGroup(jobGroup: String): Array[Int] = {
     jobProgressListener.synchronized {
-      val jobData = jobProgressListener.jobIdToData.valuesIterator
-      jobData.filter(_.jobGroup.orNull == jobGroup).map(_.jobId).toArray
+      jobProgressListener.jobGroupToJobIds(jobGroup).toArray
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 937d95a934b59..8b0e0b6bf9239 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -44,6 +44,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
   // These type aliases are public because they're used in the types of public fields:
   type JobId = Int
+  type JobGroupId = String
   type StageId = Int
   type StageAttemptId = Int
   type PoolName = String
@@ -54,6 +55,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   val completedJobs = ListBuffer[JobUIData]()
   val failedJobs = ListBuffer[JobUIData]()
   val jobIdToData = new HashMap[JobId, JobUIData]
+  val jobGroupToJobIds = new HashMap[JobGroupId, HashSet[JobId]]
 
   // Stages:
   val pendingStages = new HashMap[StageId, StageInfo]
@@ -109,7 +111,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       "failedJobs" -> failedJobs.size,
       "completedStages" -> completedStages.size,
       "skippedStages" -> skippedStages.size,
-      "failedStages" -> failedStages.size
+      "failedStages" -> failedStages.size,
+      "jobGroupToJobIds" -> jobGroupToJobIds.values.map(_.size).sum
     )
   }
 
@@ -140,7 +143,16 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     if (jobs.size > retainedJobs) {
       val toRemove = math.max(retainedJobs / 10, 1)
       jobs.take(toRemove).foreach { job =>
-        jobIdToData.remove(job.jobId)
+        for (
+          removedJob <- jobIdToData.remove(job.jobId);
+          jobGroupId = removedJob.jobGroup.orNull;
+          jobsInGroup <- jobGroupToJobIds.get(jobGroupId)
+        ) {
+          jobsInGroup.remove(job.jobId)
+          if (jobsInGroup.isEmpty) {
+            jobGroupToJobIds.remove(jobGroupId)
+          }
+        }
       }
       jobs.trimStart(toRemove)
     }
@@ -158,6 +170,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       stageIds = jobStart.stageIds,
       jobGroup = jobGroup,
       status = JobExecutionStatus.RUNNING)
+    jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new HashSet[JobId]).add(jobStart.jobId)
     jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
     // Compute (a potential underestimate of) the number of tasks that will be run by this job.
     // This may be an underestimate because the job start event references all of the result
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 730a4b54f5aa1..09484dff30b3c 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.ui.jobs
 
+import java.util.Properties
+
 import org.scalatest.FunSuite
 import org.scalatest.Matchers
 
@@ -44,11 +46,19 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     SparkListenerStageCompleted(stageInfo)
   }
 
-  private def createJobStartEvent(jobId: Int, stageIds: Seq[Int]) = {
+  private def createJobStartEvent(
+      jobId: Int,
+      stageIds: Seq[Int],
+      jobGroup: Option[String] = None): SparkListenerJobStart = {
     val stageInfos = stageIds.map { stageId =>
       new StageInfo(stageId, 0, stageId.toString, 0, null, "")
     }
-    SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos)
+    val properties: Option[Properties] = jobGroup.map { groupId =>
+      val props = new Properties()
+      props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+      props
+    }
+    SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos, properties.orNull)
   }
 
   private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
@@ -110,6 +120,23 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     listener.stageIdToActiveJobIds.size should be (0)
   }
 
+  test("test clearing of jobGroupToJobIds") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedJobs", 5.toString)
+    val listener = new JobProgressListener(conf)
+
+    // Run 51 jobs (0 to 50), each with one stage
+    for (jobId <- 0 to 50) {
+      listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString)))
+      listener.onStageSubmitted(createStageStartEvent(0))
+      listener.onStageCompleted(createStageEndEvent(0, failed = false))
+      listener.onJobEnd(createJobEndEvent(jobId, false))
+    }
+    assertActiveJobsStateIsEmpty(listener)
+    //
+    listener.jobGroupToJobIds.size should be (5)
+  }
+
   test("test LRU eviction of jobs") {
     val conf = new SparkConf()
     conf.set("spark.ui.retainedStages", 5.toString)
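
A quick aside on what this first patch changes structurally: before it, SparkStatusTracker.getJobIdsForGroup had to scan every JobUIData and filter by group; with the new jobGroupToJobIds map, the lookup becomes a single map access. The following is a minimal standalone sketch of that indexing idea in plain Scala; the object and method names are illustrative only, not Spark code:

    import scala.collection.mutable.{HashMap, HashSet}

    object GroupIndexSketch {
      type JobId = Int
      type JobGroupId = String

      // group -> job ids, maintained incrementally as jobs start
      val jobGroupToJobIds = new HashMap[JobGroupId, HashSet[JobId]]

      // On job start, register the job under its group; a null group id
      // stands for jobs that were run without a job group.
      def onJobStart(jobId: JobId, jobGroup: Option[String]): Unit =
        jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new HashSet[JobId]).add(jobId)

      // Lookup is now one map access instead of a scan over all job data.
      // (Note: apply() throws for an unknown group; [PATCH 2/3] below fixes this.)
      def jobIdsForGroup(group: JobGroupId): Array[JobId] =
        jobGroupToJobIds(group).toArray

      def main(args: Array[String]): Unit = {
        onJobStart(1, Some("etl")); onJobStart(2, Some("etl")); onJobStart(3, None)
        println(jobIdsForGroup("etl").sorted.mkString(","))  // prints: 1,2
      }
    }
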
From 2c49614cc4f92dc1a47044be362db51cfe4da77b Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 27 Feb 2015 23:31:27 -0800
Subject: [PATCH 2/3] getOrElse

---
 core/src/main/scala/org/apache/spark/SparkStatusTracker.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index 909453f3f71e3..34ee3a48f8e74 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -45,7 +45,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    */
   def getJobIdsForGroup(jobGroup: String): Array[Int] = {
     jobProgressListener.synchronized {
-      jobProgressListener.jobGroupToJobIds(jobGroup).toArray
+      jobProgressListener.jobGroupToJobIds.getOrElse(jobGroup, Seq.empty).toArray
     }
  }
 
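
This one-line fix matters because apply() on a Scala Map throws NoSuchElementException for a missing key, so asking the status tracker about a job group that never started a job would have thrown instead of returning an empty result. A small sketch of the failure mode and the fix (illustrative names, not Spark code):

    import scala.collection.mutable.{HashMap, HashSet}

    object GetOrElseSketch {
      def main(args: Array[String]): Unit = {
        val jobGroupToJobIds = new HashMap[String, HashSet[Int]]
        jobGroupToJobIds("etl") = HashSet(1, 2)

        // jobGroupToJobIds("unknown")  // apply() throws NoSuchElementException
        val ids = jobGroupToJobIds.getOrElse("unknown", Seq.empty).toArray
        println(ids.length)  // prints: 0 -- an empty result, no exception
      }
    }
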
From e39c5c78c182684827c0525d25e86b10c1723b58 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 25 Mar 2015 14:44:42 -0700
Subject: [PATCH 3/3] Address review feedback

---
 .../spark/ui/jobs/JobProgressListener.scala   | 28 +++++++++++--------
 .../ui/jobs/JobProgressListenerSuite.scala    |  2 +-
 2 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 7e85b59f9fd9d..625596885faa1 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -111,8 +111,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       "failedJobs" -> failedJobs.size,
       "completedStages" -> completedStages.size,
       "skippedStages" -> skippedStages.size,
-      "failedStages" -> failedStages.size,
-      "jobGroupToJobIds" -> jobGroupToJobIds.values.map(_.size).sum
+      "failedStages" -> failedStages.size
     )
   }
 
@@ -122,7 +121,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     Map(
       "jobIdToData" -> jobIdToData.size,
       "stageIdToData" -> stageIdToData.size,
-      "stageIdToStageInfo" -> stageIdToInfo.size
+      "stageIdToStageInfo" -> stageIdToInfo.size,
+      "jobGroupToJobIds" -> jobGroupToJobIds.values.map(_.size).sum,
+      // Since jobGroupToJobIds is a map of sets, check that we don't leak keys with empty values:
+      "jobGroupToJobIds keySet" -> jobGroupToJobIds.keys.size
     )
   }
 
@@ -143,14 +145,17 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     if (jobs.size > retainedJobs) {
       val toRemove = math.max(retainedJobs / 10, 1)
       jobs.take(toRemove).foreach { job =>
-        for (
-          removedJob <- jobIdToData.remove(job.jobId);
-          jobGroupId = removedJob.jobGroup.orNull;
-          jobsInGroup <- jobGroupToJobIds.get(jobGroupId)
-        ) {
-          jobsInGroup.remove(job.jobId)
-          if (jobsInGroup.isEmpty) {
-            jobGroupToJobIds.remove(jobGroupId)
+        // Remove the job's UI data, if it exists
+        jobIdToData.remove(job.jobId).foreach { removedJob =>
+          // A null jobGroupId is used for jobs that are run without a job group
+          val jobGroupId = removedJob.jobGroup.orNull
+          // Remove the job group -> job mapping entry, if it exists
+          jobGroupToJobIds.get(jobGroupId).foreach { jobsInGroup =>
+            jobsInGroup.remove(job.jobId)
+            // If this was the last job in this job group, remove the map entry for the job group
+            if (jobsInGroup.isEmpty) {
+              jobGroupToJobIds.remove(jobGroupId)
+            }
           }
         }
       }
@@ -170,6 +175,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       stageIds = jobStart.stageIds,
       jobGroup = jobGroup,
       status = JobExecutionStatus.RUNNING)
+    // A null jobGroupId is used for jobs that are run without a job group
     jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new HashSet[JobId]).add(jobStart.jobId)
     jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
     // Compute (a potential underestimate of) the number of tasks that will be run by this job.
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 09484dff30b3c..c0c28cb60e21d 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -133,7 +133,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       listener.onJobEnd(createJobEndEvent(jobId, false))
     }
     assertActiveJobsStateIsEmpty(listener)
-    //
+    // This collection won't become empty, but it should be bounded by spark.ui.retainedJobs
     listener.jobGroupToJobIds.size should be (5)
   }
 
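
A final note on this review-feedback patch: moving the "jobGroupToJobIds" metric into the soft-size-limited map and adding the "jobGroupToJobIds keySet" check encode the invariant the rewritten eviction code must maintain. When a trimmed job was the last member of its group, the group's now-empty set must be removed too, or the map's key set would grow without bound even though spark.ui.retainedJobs caps the per-job data. A standalone sketch of that bookkeeping (illustrative names, not Spark code):

    import scala.collection.mutable.{HashMap, HashSet}

    object EvictionSketch {
      val jobIdToGroup = new HashMap[Int, String]  // stand-in for jobIdToData
      val jobGroupToJobIds = new HashMap[String, HashSet[Int]]

      def evict(jobId: Int): Unit = {
        // Remove the job's data, if it exists
        jobIdToGroup.remove(jobId).foreach { groupId =>
          // Prune the group -> jobs entry, if it exists
          jobGroupToJobIds.get(groupId).foreach { jobsInGroup =>
            jobsInGroup.remove(jobId)
            // Last job in the group: drop the key so empty sets never leak
            if (jobsInGroup.isEmpty) jobGroupToJobIds.remove(groupId)
          }
        }
      }

      def main(args: Array[String]): Unit = {
        jobIdToGroup(1) = "g"
        jobGroupToJobIds("g") = HashSet(1)
        evict(1)
        println(jobGroupToJobIds.contains("g"))  // prints: false -- no leaked key
      }
    }
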