@@ -31,43 +31,43 @@ import org.apache.spark.storage.BlockManagerId
  * and dead executors.
  */
 private[spark] class BasicEventFilterBuilder extends SparkListener with EventFilterBuilder {
-  private val _liveJobToStages = new mutable.HashMap[Int, Set[Int]]
-  private val _stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]]
-  private val _stageToRDDs = new mutable.HashMap[Int, Set[Int]]
+  private val liveJobToStages = new mutable.HashMap[Int, Set[Int]]
+  private val stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]]
+  private val stageToRDDs = new mutable.HashMap[Int, Set[Int]]
   private val _liveExecutors = new mutable.HashSet[String]
 
   private var totalJobs: Long = 0L
   private var totalStages: Long = 0L
   private var totalTasks: Long = 0L
 
-  def liveJobs: Set[Int] = _liveJobToStages.keySet.toSet
-  def liveStages: Set[Int] = _stageToRDDs.keySet.toSet
-  def liveTasks: Set[Long] = _stageToTasks.values.flatten.toSet
-  def liveRDDs: Set[Int] = _stageToRDDs.values.flatten.toSet
-  def liveExecutors: Set[String] = _liveExecutors.toSet
+  private[history] def liveJobs: Set[Int] = liveJobToStages.keySet.toSet
+  private[history] def liveStages: Set[Int] = stageToRDDs.keySet.toSet
+  private[history] def liveTasks: Set[Long] = stageToTasks.values.flatten.toSet
+  private[history] def liveRDDs: Set[Int] = stageToRDDs.values.flatten.toSet
+  private[history] def liveExecutors: Set[String] = _liveExecutors.toSet
 
   override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
     totalJobs += 1
     totalStages += jobStart.stageIds.length
-    _liveJobToStages += jobStart.jobId -> jobStart.stageIds.toSet
+    liveJobToStages += jobStart.jobId -> jobStart.stageIds.toSet
   }
 
   override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
-    val stages = _liveJobToStages.getOrElse(jobEnd.jobId, Seq.empty[Int])
-    _liveJobToStages -= jobEnd.jobId
-    _stageToTasks --= stages
-    _stageToRDDs --= stages
+    val stages = liveJobToStages.getOrElse(jobEnd.jobId, Seq.empty[Int])
+    liveJobToStages -= jobEnd.jobId
+    stageToTasks --= stages
+    stageToRDDs --= stages
   }
 
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
     val stageId = stageSubmitted.stageInfo.stageId
-    _stageToRDDs.put(stageId, stageSubmitted.stageInfo.rddInfos.map(_.id).toSet)
-    _stageToTasks.getOrElseUpdate(stageId, new mutable.HashSet[Long]())
+    stageToRDDs.put(stageId, stageSubmitted.stageInfo.rddInfos.map(_.id).toSet)
+    stageToTasks.getOrElseUpdate(stageId, new mutable.HashSet[Long]())
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
     totalTasks += 1
-    _stageToTasks.get(taskStart.stageId).foreach { tasks =>
+    stageToTasks.get(taskStart.stageId).foreach { tasks =>
       tasks += taskStart.taskInfo.taskId
     }
   }
@@ -140,18 +140,18 @@ private[spark] abstract class JobEventFilter(
  * will be considered as "Don't mind".
  */
 private[spark] class BasicEventFilter(
-    _stats: FilterStatistics,
-    _liveJobs: Set[Int],
-    _liveStages: Set[Int],
-    _liveTasks: Set[Long],
-    _liveRDDs: Set[Int],
+    stats: FilterStatistics,
+    liveJobs: Set[Int],
+    liveStages: Set[Int],
+    liveTasks: Set[Long],
+    liveRDDs: Set[Int],
     liveExecutors: Set[String])
   extends JobEventFilter(
-    Some(_stats),
-    _liveJobs,
-    _liveStages,
-    _liveTasks,
-    _liveRDDs) with Logging {
+    Some(stats),
+    liveJobs,
+    liveStages,
+    liveTasks,
+    liveRDDs) with Logging {
 
   logDebug(s"live executors : $liveExecutors")
 
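To see the renamed accessors in action, here is a minimal sketch (not part of this PR; the object name is made up) that drives BasicEventFilterBuilder with synthetic listener events and reads back the live-entity snapshots. Because the accessors are now private[history], the sketch has to sit in the org.apache.spark.deploy.history package:

    // Hypothetical illustration; must live in this package since the
    // accessors are now private[history].
    package org.apache.spark.deploy.history

    import org.apache.spark.scheduler.{JobSucceeded, SparkListenerJobEnd, SparkListenerJobStart}

    object BasicEventFilterBuilderSketch {
      def main(args: Array[String]): Unit = {
        val builder = new BasicEventFilterBuilder
        // A job start with no stage infos: the job is tracked as live...
        builder.onJobStart(SparkListenerJobStart(jobId = 1, time = 0L, stageInfos = Seq.empty))
        assert(builder.liveJobs == Set(1))
        // ...and dropped again once the job ends.
        builder.onJobEnd(SparkListenerJobEnd(jobId = 1, time = 1L, JobSucceeded))
        assert(builder.liveJobs.isEmpty)
      }
    }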
@@ -96,6 +96,9 @@ private[spark] object History {
 
   private[spark] val EVENT_LOG_COMPACTION_SCORE_THRESHOLD =
     ConfigBuilder("spark.history.fs.eventLog.rolling.compaction.score.threshold")
+      .doc("The threshold score used to decide whether compaction is worthwhile. " +
+        "The compaction score is computed during the analysis step and compared against " +
+        "this value; compaction proceeds only when the score is higher than the threshold.")
       .internal()
       .doubleConf
       .createWithDefault(0.7d)
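Although the entry is marked internal() and therefore undocumented, it can still be overridden like any other Spark config. A hypothetical tuning for a History Server deployment via spark-defaults.conf (the shipped default stays 0.7):

    # spark-defaults.conf (hypothetical value shown; 0.7 is the default)
    spark.history.fs.eventLog.rolling.compaction.score.threshold  0.5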
@@ -35,17 +35,17 @@ import org.apache.spark.sql.streaming.StreamingQueryListener
  * between finished job and live job without relation of SQL execution.
  */
 private[spark] class SQLEventFilterBuilder extends SparkListener with EventFilterBuilder {
-  private val _liveExecutionToJobs = new mutable.HashMap[Long, mutable.Set[Int]]
-  private val _jobToStages = new mutable.HashMap[Int, Set[Int]]
-  private val _stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]]
-  private val _stageToRDDs = new mutable.HashMap[Int, Set[Int]]
+  private val liveExecutionToJobs = new mutable.HashMap[Long, mutable.Set[Int]]
+  private val jobToStages = new mutable.HashMap[Int, Set[Int]]
+  private val stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]]
+  private val stageToRDDs = new mutable.HashMap[Int, Set[Int]]
   private val stages = new mutable.HashSet[Int]
 
-  def liveSQLExecutions: Set[Long] = _liveExecutionToJobs.keySet.toSet
-  def liveJobs: Set[Int] = _liveExecutionToJobs.values.flatten.toSet
-  def liveStages: Set[Int] = _stageToRDDs.keySet.toSet
-  def liveTasks: Set[Long] = _stageToTasks.values.flatten.toSet
-  def liveRDDs: Set[Int] = _stageToRDDs.values.flatten.toSet
+  private[history] def liveSQLExecutions: Set[Long] = liveExecutionToJobs.keySet.toSet
+  private[history] def liveJobs: Set[Int] = liveExecutionToJobs.values.flatten.toSet
+  private[history] def liveStages: Set[Int] = stageToRDDs.keySet.toSet
+  private[history] def liveTasks: Set[Long] = stageToTasks.values.flatten.toSet
+  private[history] def liveRDDs: Set[Int] = stageToRDDs.values.flatten.toSet
 
   override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
     val executionIdString = jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
@@ -57,24 +57,24 @@ private[spark] class SQLEventFilterBuilder extends SparkListener with EventFilterBuilder {
     val executionId = executionIdString.toLong
     val jobId = jobStart.jobId
 
-    val jobsForExecution = _liveExecutionToJobs.getOrElseUpdate(executionId,
+    val jobsForExecution = liveExecutionToJobs.getOrElseUpdate(executionId,
       mutable.HashSet[Int]())
     jobsForExecution += jobId
 
-    _jobToStages += jobStart.jobId -> jobStart.stageIds.toSet
+    jobToStages += jobStart.jobId -> jobStart.stageIds.toSet
     stages ++= jobStart.stageIds
   }
 
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
     val stageId = stageSubmitted.stageInfo.stageId
     if (stages.contains(stageId)) {
-      _stageToRDDs.put(stageId, stageSubmitted.stageInfo.rddInfos.map(_.id).toSet)
-      _stageToTasks.getOrElseUpdate(stageId, new mutable.HashSet[Long]())
+      stageToRDDs.put(stageId, stageSubmitted.stageInfo.rddInfos.map(_.id).toSet)
+      stageToTasks.getOrElseUpdate(stageId, new mutable.HashSet[Long]())
     }
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
-    _stageToTasks.get(taskStart.stageId).foreach { tasks =>
+    stageToTasks.get(taskStart.stageId).foreach { tasks =>
       tasks += taskStart.taskInfo.taskId
     }
   }
@@ -86,16 +86,16 @@ private[spark] class SQLEventFilterBuilder extends SparkListener with EventFilterBuilder {
   }
 
   private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
-    _liveExecutionToJobs += event.executionId -> mutable.HashSet[Int]()
+    liveExecutionToJobs += event.executionId -> mutable.HashSet[Int]()
   }
 
   private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
-    _liveExecutionToJobs.remove(event.executionId).foreach { jobs =>
-      val stagesToDrop = _jobToStages.filter(kv => jobs.contains(kv._1)).values.flatten
-      _jobToStages --= jobs
+    liveExecutionToJobs.remove(event.executionId).foreach { jobs =>
+      val stagesToDrop = jobToStages.filter(kv => jobs.contains(kv._1)).values.flatten
+      jobToStages --= jobs
       stages --= stagesToDrop
-      _stageToTasks --= stagesToDrop
-      _stageToRDDs --= stagesToDrop
+      stageToTasks --= stagesToDrop
+      stageToRDDs --= stagesToDrop
     }
   }
 
@@ -115,11 +115,11 @@ private[spark] class SQLEventFilterBuilder extends SparkListener with EventFilterBuilder {
  */
 private[spark] class SQLLiveEntitiesEventFilter(
     liveSQLExecutions: Set[Long],
-    _liveJobs: Set[Int],
-    _liveStages: Set[Int],
-    _liveTasks: Set[Long],
-    _liveRDDs: Set[Int])
-  extends JobEventFilter(None, _liveJobs, _liveStages, _liveTasks, _liveRDDs) with Logging {
+    liveJobs: Set[Int],
+    liveStages: Set[Int],
+    liveTasks: Set[Long],
+    liveRDDs: Set[Int])
+  extends JobEventFilter(None, liveJobs, liveStages, liveTasks, liveRDDs) with Logging {
 
   logDebug(s"live SQL executions : $liveSQLExecutions")
 
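For completeness, a sketch of how these snapshots typically reach the filter: the builder's live sets (now private[history]) are captured into an immutable SQLLiveEntitiesEventFilter, mirroring the builder-to-filter handoff. The helper object below is hypothetical and, like the accessors it calls, must live in the org.apache.spark.sql.execution.history package:

    // Hypothetical helper, placed here because the accessors are private[history].
    package org.apache.spark.sql.execution.history

    object SQLFilterWiringSketch {
      // Snapshot the builder's mutable state into an immutable filter.
      def toFilter(builder: SQLEventFilterBuilder): SQLLiveEntitiesEventFilter = {
        new SQLLiveEntitiesEventFilter(
          builder.liveSQLExecutions,
          builder.liveJobs,
          builder.liveStages,
          builder.liveTasks,
          builder.liveRDDs)
      }
    }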