diff --git a/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala
index 106da1675f71..b18bf2665d6c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala
@@ -31,43 +31,43 @@ import org.apache.spark.storage.BlockManagerId
  * and dead executors.
  */
 private[spark] class BasicEventFilterBuilder extends SparkListener with EventFilterBuilder {
-  private val _liveJobToStages = new mutable.HashMap[Int, Set[Int]]
-  private val _stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]]
-  private val _stageToRDDs = new mutable.HashMap[Int, Set[Int]]
+  private val liveJobToStages = new mutable.HashMap[Int, Set[Int]]
+  private val stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]]
+  private val stageToRDDs = new mutable.HashMap[Int, Set[Int]]
   private val _liveExecutors = new mutable.HashSet[String]
 
   private var totalJobs: Long = 0L
   private var totalStages: Long = 0L
   private var totalTasks: Long = 0L
 
-  def liveJobs: Set[Int] = _liveJobToStages.keySet.toSet
-  def liveStages: Set[Int] = _stageToRDDs.keySet.toSet
-  def liveTasks: Set[Long] = _stageToTasks.values.flatten.toSet
-  def liveRDDs: Set[Int] = _stageToRDDs.values.flatten.toSet
-  def liveExecutors: Set[String] = _liveExecutors.toSet
+  private[history] def liveJobs: Set[Int] = liveJobToStages.keySet.toSet
+  private[history] def liveStages: Set[Int] = stageToRDDs.keySet.toSet
+  private[history] def liveTasks: Set[Long] = stageToTasks.values.flatten.toSet
+  private[history] def liveRDDs: Set[Int] = stageToRDDs.values.flatten.toSet
+  private[history] def liveExecutors: Set[String] = _liveExecutors.toSet
 
   override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
     totalJobs += 1
     totalStages += jobStart.stageIds.length
-    _liveJobToStages += jobStart.jobId -> jobStart.stageIds.toSet
+    liveJobToStages += jobStart.jobId -> jobStart.stageIds.toSet
   }
 
   override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
-    val stages = _liveJobToStages.getOrElse(jobEnd.jobId, Seq.empty[Int])
-    _liveJobToStages -= jobEnd.jobId
-    _stageToTasks --= stages
-    _stageToRDDs --= stages
+    val stages = liveJobToStages.getOrElse(jobEnd.jobId, Seq.empty[Int])
+    liveJobToStages -= jobEnd.jobId
+    stageToTasks --= stages
+    stageToRDDs --= stages
   }
 
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
     val stageId = stageSubmitted.stageInfo.stageId
-    _stageToRDDs.put(stageId, stageSubmitted.stageInfo.rddInfos.map(_.id).toSet)
-    _stageToTasks.getOrElseUpdate(stageId, new mutable.HashSet[Long]())
+    stageToRDDs.put(stageId, stageSubmitted.stageInfo.rddInfos.map(_.id).toSet)
+    stageToTasks.getOrElseUpdate(stageId, new mutable.HashSet[Long]())
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
     totalTasks += 1
-    _stageToTasks.get(taskStart.stageId).foreach { tasks =>
+    stageToTasks.get(taskStart.stageId).foreach { tasks =>
       tasks += taskStart.taskInfo.taskId
     }
   }
@@ -140,18 +140,18 @@ private[spark] abstract class JobEventFilter(
  * will be considered as "Don't mind".
 */
 private[spark] class BasicEventFilter(
-    _stats: FilterStatistics,
-    _liveJobs: Set[Int],
-    _liveStages: Set[Int],
-    _liveTasks: Set[Long],
-    _liveRDDs: Set[Int],
+    stats: FilterStatistics,
+    liveJobs: Set[Int],
+    liveStages: Set[Int],
+    liveTasks: Set[Long],
+    liveRDDs: Set[Int],
     liveExecutors: Set[String])
   extends JobEventFilter(
-    Some(_stats),
-    _liveJobs,
-    _liveStages,
-    _liveTasks,
-    _liveRDDs) with Logging {
+    Some(stats),
+    liveJobs,
+    liveStages,
+    liveTasks,
+    liveRDDs) with Logging {
 
   logDebug(s"live executors : $liveExecutors")
 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index 17fb55d9db86..14fb5ff07547 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -96,6 +96,9 @@ private[spark] object History {
 
   private[spark] val EVENT_LOG_COMPACTION_SCORE_THRESHOLD =
     ConfigBuilder("spark.history.fs.eventLog.rolling.compaction.score.threshold")
+      .doc("The threshold score to determine whether it's good to do the compaction or not. " +
+        "The compaction score is calculated during analysis and compared to this value. " +
+        "Compaction will proceed only when the score is higher than the threshold value.")
       .internal()
       .doubleConf
       .createWithDefault(0.7d)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilder.scala
index fbd729b9d7c3..e1f42d7abe0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilder.scala
@@ -35,17 +35,17 @@ import org.apache.spark.sql.streaming.StreamingQueryListener
  * between finished job and live job without relation of SQL execution.
 */
 private[spark] class SQLEventFilterBuilder extends SparkListener with EventFilterBuilder {
-  private val _liveExecutionToJobs = new mutable.HashMap[Long, mutable.Set[Int]]
-  private val _jobToStages = new mutable.HashMap[Int, Set[Int]]
-  private val _stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]]
-  private val _stageToRDDs = new mutable.HashMap[Int, Set[Int]]
+  private val liveExecutionToJobs = new mutable.HashMap[Long, mutable.Set[Int]]
+  private val jobToStages = new mutable.HashMap[Int, Set[Int]]
+  private val stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]]
+  private val stageToRDDs = new mutable.HashMap[Int, Set[Int]]
   private val stages = new mutable.HashSet[Int]
 
-  def liveSQLExecutions: Set[Long] = _liveExecutionToJobs.keySet.toSet
-  def liveJobs: Set[Int] = _liveExecutionToJobs.values.flatten.toSet
-  def liveStages: Set[Int] = _stageToRDDs.keySet.toSet
-  def liveTasks: Set[Long] = _stageToTasks.values.flatten.toSet
-  def liveRDDs: Set[Int] = _stageToRDDs.values.flatten.toSet
+  private[history] def liveSQLExecutions: Set[Long] = liveExecutionToJobs.keySet.toSet
+  private[history] def liveJobs: Set[Int] = liveExecutionToJobs.values.flatten.toSet
+  private[history] def liveStages: Set[Int] = stageToRDDs.keySet.toSet
+  private[history] def liveTasks: Set[Long] = stageToTasks.values.flatten.toSet
+  private[history] def liveRDDs: Set[Int] = stageToRDDs.values.flatten.toSet
 
   override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
     val executionIdString = jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
@@ -57,24 +57,24 @@ private[spark] class SQLEventFilterBuilder extends SparkListener with EventFilte
     val executionId = executionIdString.toLong
     val jobId = jobStart.jobId
 
-    val jobsForExecution = _liveExecutionToJobs.getOrElseUpdate(executionId,
+    val jobsForExecution = liveExecutionToJobs.getOrElseUpdate(executionId,
       mutable.HashSet[Int]())
     jobsForExecution += jobId
 
-    _jobToStages += jobStart.jobId -> jobStart.stageIds.toSet
+    jobToStages += jobStart.jobId -> jobStart.stageIds.toSet
     stages ++= jobStart.stageIds
   }
 
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
     val stageId = stageSubmitted.stageInfo.stageId
     if (stages.contains(stageId)) {
-      _stageToRDDs.put(stageId, stageSubmitted.stageInfo.rddInfos.map(_.id).toSet)
-      _stageToTasks.getOrElseUpdate(stageId, new mutable.HashSet[Long]())
+      stageToRDDs.put(stageId, stageSubmitted.stageInfo.rddInfos.map(_.id).toSet)
+      stageToTasks.getOrElseUpdate(stageId, new mutable.HashSet[Long]())
     }
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
-    _stageToTasks.get(taskStart.stageId).foreach { tasks =>
+    stageToTasks.get(taskStart.stageId).foreach { tasks =>
       tasks += taskStart.taskInfo.taskId
     }
   }
@@ -86,16 +86,16 @@ private[spark] class SQLEventFilterBuilder extends SparkListener with EventFilte
   }
 
   private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
-    _liveExecutionToJobs += event.executionId -> mutable.HashSet[Int]()
+    liveExecutionToJobs += event.executionId -> mutable.HashSet[Int]()
  }
 
   private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
-    _liveExecutionToJobs.remove(event.executionId).foreach { jobs =>
-      val stagesToDrop = _jobToStages.filter(kv => jobs.contains(kv._1)).values.flatten
-      _jobToStages --= jobs
+    liveExecutionToJobs.remove(event.executionId).foreach { jobs =>
+      val stagesToDrop = jobToStages.filter(kv => jobs.contains(kv._1)).values.flatten
+      jobToStages --= jobs
       stages --= stagesToDrop
-      _stageToTasks --= stagesToDrop
-      _stageToRDDs --= stagesToDrop
+      stageToTasks --= stagesToDrop
+      stageToRDDs --= stagesToDrop
     }
   }
 
@@ -115,11 +115,11 @@ private[spark] class SQLEventFilterBuilder extends SparkListener with EventFilte
  */
 private[spark] class SQLLiveEntitiesEventFilter(
     liveSQLExecutions: Set[Long],
-    _liveJobs: Set[Int],
-    _liveStages: Set[Int],
-    _liveTasks: Set[Long],
-    _liveRDDs: Set[Int])
-  extends JobEventFilter(None, _liveJobs, _liveStages, _liveTasks, _liveRDDs) with Logging {
+    liveJobs: Set[Int],
+    liveStages: Set[Int],
+    liveTasks: Set[Long],
+    liveRDDs: Set[Int])
+  extends JobEventFilter(None, liveJobs, liveStages, liveTasks, liveRDDs) with Logging {
 
   logDebug(s"live SQL executions : $liveSQLExecutions")
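
The History.scala hunk above documents spark.history.fs.eventLog.rolling.compaction.score.threshold but does not show how the score is consumed. Below is a minimal sketch, not the actual Spark History Server compactor code, of the gating logic the doc string describes: CompactionThresholdSketch, shouldCompact, and analyzedScore are hypothetical names introduced here, and the default 0.7 simply mirrors createWithDefault(0.7d) from the diff.

import org.apache.spark.SparkConf

// Hypothetical sketch (not Spark's implementation): an analysis phase produces a
// compaction score, and compaction proceeds only when that score is higher than
// the configured threshold.
object CompactionThresholdSketch {
  private val ThresholdKey =
    "spark.history.fs.eventLog.rolling.compaction.score.threshold"

  // 0.7 mirrors the default from createWithDefault(0.7d) in the diff above.
  def shouldCompact(conf: SparkConf, analyzedScore: Double): Boolean =
    analyzedScore > conf.getDouble(ThresholdKey, 0.7d)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false).set(ThresholdKey, "0.8")
    println(shouldCompact(conf, analyzedScore = 0.9))  // true: 0.9 > 0.8
    println(shouldCompact(conf, analyzedScore = 0.75)) // false: 0.75 <= 0.8
  }
}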