SparkStatusTracker.scala
@@ -45,8 +45,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    */
   def getJobIdsForGroup(jobGroup: String): Array[Int] = {
     jobProgressListener.synchronized {
-      val jobData = jobProgressListener.jobIdToData.valuesIterator
-      jobData.filter(_.jobGroup.orNull == jobGroup).map(_.jobId).toArray
+      jobProgressListener.jobGroupToJobIds.getOrElse(jobGroup, Seq.empty).toArray
     }
   }
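
For context, a minimal sketch of how this API is reached from user code (the app setup and group name are illustrative, not part of the patch). With this change the lookup is a single map access rather than a scan over all retained jobs:

import org.apache.spark.{SparkConf, SparkContext}

object StatusTrackerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
    // Tag subsequent jobs with a group id; getJobIdsForGroup looks up this tag.
    sc.setJobGroup("etl-jobs", "nightly ETL")
    sc.parallelize(1 to 100).count()
    // Returns the ids of all retained jobs in the group "etl-jobs".
    val jobIds: Array[Int] = sc.statusTracker.getJobIdsForGroup("etl-jobs")
    println(jobIds.mkString(", "))
    sc.stop()
  }
}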

JobProgressListener.scala
@@ -44,6 +44,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   // These type aliases are public because they're used in the types of public fields:

   type JobId = Int
+  type JobGroupId = String
   type StageId = Int
   type StageAttemptId = Int
   type PoolName = String
@@ -54,6 +55,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   val completedJobs = ListBuffer[JobUIData]()
   val failedJobs = ListBuffer[JobUIData]()
   val jobIdToData = new HashMap[JobId, JobUIData]
+  val jobGroupToJobIds = new HashMap[JobGroupId, HashSet[JobId]]

   // Stages:
   val pendingStages = new HashMap[StageId, StageInfo]
@@ -119,7 +121,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     Map(
       "jobIdToData" -> jobIdToData.size,
       "stageIdToData" -> stageIdToData.size,
-      "stageIdToStageInfo" -> stageIdToInfo.size
+      "stageIdToStageInfo" -> stageIdToInfo.size,
+      "jobGroupToJobIds" -> jobGroupToJobIds.values.map(_.size).sum,
+      // Since jobGroupToJobIds is a map of sets, check that we don't leak keys with empty values:
+      "jobGroupToJobIds keySet" -> jobGroupToJobIds.keys.size
     )
   }

@@ -140,7 +145,19 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     if (jobs.size > retainedJobs) {
       val toRemove = math.max(retainedJobs / 10, 1)
       jobs.take(toRemove).foreach { job =>
-        jobIdToData.remove(job.jobId)
+        // Remove the job's UI data, if it exists
+        jobIdToData.remove(job.jobId).foreach { removedJob =>
+          // A null jobGroupId is used for jobs that are run without a job group
+          val jobGroupId = removedJob.jobGroup.orNull
+          // Remove the job group -> job mapping entry, if it exists
+          jobGroupToJobIds.get(jobGroupId).foreach { jobsInGroup =>
+            jobsInGroup.remove(job.jobId)
+            // If this was the last job in this job group, remove the map entry for the job group
+            if (jobsInGroup.isEmpty) {
+              jobGroupToJobIds.remove(jobGroupId)
+            }
+          }
+        }
       }
       jobs.trimStart(toRemove)
     }
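
A standalone sketch of the cleanup pattern used above (names here are illustrative, not Spark's): a map of sets where removing the last member of a group also drops the group's key, so the index never accumulates empty sets:

import scala.collection.mutable.{HashMap, HashSet}

object GroupIndexSketch {
  private val groupToIds = new HashMap[String, HashSet[Int]]

  def add(group: String, id: Int): Unit =
    groupToIds.getOrElseUpdate(group, new HashSet[Int]).add(id)

  def remove(group: String, id: Int): Unit =
    groupToIds.get(group).foreach { ids =>
      ids.remove(id)
      // Drop the key once its set is empty, mirroring the eviction code above.
      if (ids.isEmpty) {
        groupToIds.remove(group)
      }
    }

  def size: Int = groupToIds.size
}

This is the leak that the "jobGroupToJobIds keySet" entry in the size-tracking map above is meant to catch: without the isEmpty check, evicted groups would leave empty sets behind.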
@@ -158,6 +175,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         stageIds = jobStart.stageIds,
         jobGroup = jobGroup,
         status = JobExecutionStatus.RUNNING)
+    // A null jobGroupId is used for jobs that are run without a job group
+    jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new HashSet[JobId]).add(jobStart.jobId)

Review thread on the line above:

Contributor: What does it mean to have jobs in the null job group?

Author: The null job group holds jobs that were submitted without any job group set (i.e. jobs run outside of a sc.setJobGroup call).

     jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
     // Compute (a potential underestimate of) the number of tasks that will be run by this job.
     // This may be an underestimate because the job start event references all of the result
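
To make the review thread concrete, a minimal sketch (group name and app setup are illustrative) of how a job lands in the null group versus a named group:

import org.apache.spark.{SparkConf, SparkContext}

object NullJobGroupSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("null-group").setMaster("local[2]"))

    // No setJobGroup call: the listener sees jobGroup == None and files this
    // job under the null key in jobGroupToJobIds.
    sc.parallelize(1 to 10).count()

    // With a group set, subsequent jobs are filed under "my-group" instead.
    sc.setJobGroup("my-group", "grouped work")
    sc.parallelize(1 to 10).count()

    sc.stop()
  }
}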
JobProgressListenerSuite.scala
@@ -17,6 +17,8 @@
 package org.apache.spark.ui.jobs

+import java.util.Properties
+
 import org.scalatest.FunSuite
 import org.scalatest.Matchers

@@ -44,11 +46,19 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
     SparkListenerStageCompleted(stageInfo)
   }

-  private def createJobStartEvent(jobId: Int, stageIds: Seq[Int]) = {
+  private def createJobStartEvent(
+      jobId: Int,
+      stageIds: Seq[Int],
+      jobGroup: Option[String] = None): SparkListenerJobStart = {
     val stageInfos = stageIds.map { stageId =>
       new StageInfo(stageId, 0, stageId.toString, 0, null, "")
     }
-    SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos)
+    val properties: Option[Properties] = jobGroup.map { groupId =>
+      val props = new Properties()
+      props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+      props
+    }
+    SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos, properties.orNull)
   }

   private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
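
The helper above relies on the scheduler passing the job group through the job's Properties: SparkContext.SPARK_JOB_GROUP_ID is the key that sc.setJobGroup writes and that JobProgressListener reads back in onJobStart. A minimal sketch observing this end to end (the listener and app setup are illustrative; the "spark.jobGroup.id" key is assumed from the constant used in the test):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

object JobGroupPropertiesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("props").setMaster("local[2]"))
    sc.addSparkListener(new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        // Expected to print "demo-group" for the job started below.
        val group = Option(jobStart.properties).map(_.getProperty("spark.jobGroup.id"))
        println(s"job group = ${group.orNull}")
      }
    })
    sc.setJobGroup("demo-group", "demo")
    sc.parallelize(1 to 10).count()
    sc.stop()
  }
}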
@@ -110,6 +120,23 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
     listener.stageIdToActiveJobIds.size should be (0)
   }

test("test clearing of jobGroupToJobIds") {
val conf = new SparkConf()
conf.set("spark.ui.retainedJobs", 5.toString)
val listener = new JobProgressListener(conf)

// Run 50 jobs, each with one stage
for (jobId <- 0 to 50) {
listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString)))
listener.onStageSubmitted(createStageStartEvent(0))
listener.onStageCompleted(createStageEndEvent(0, failed = false))
listener.onJobEnd(createJobEndEvent(jobId, false))
}
assertActiveJobsStateIsEmpty(listener)
// This collection won't become empty, but it should be bounded by spark.ui.retainedJobs
listener.jobGroupToJobIds.size should be (5)
}

test("test LRU eviction of jobs") {
val conf = new SparkConf()
conf.set("spark.ui.retainedStages", 5.toString)
Expand Down