
Commit e751bc6

HeartSaVioR authored and Marcelo Vanzin committed
[SPARK-30479][SQL] Apply compaction of event log to SQL events
### What changes were proposed in this pull request?

This patch adds an event filter to handle SQL-related events. It is the next task of SPARK-29779 (#27085); please refer to the description of PR #27085 for the overall rationale of this patch (a simplified sketch of the builder/filter pattern follows the file summary below).

The following functionality will be addressed in later parts:

* integrate compaction into FsHistoryProvider
* documentation for the new configuration

### Why are the changes needed?

One of the major goals of SPARK-28594 is to keep event logs from growing too large, and SPARK-29779 achieves that goal. An earlier approach was tried, but it required the models in both the KVStore and the live entities to guarantee compatibility, which they were not designed to do.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added UTs.

Closes #27164 from HeartSaVioR/SPARK-30479.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
1 parent: 990a2be

File tree

5 files changed: +437 -0 lines
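Before the per-file diffs, here is a minimal, self-contained sketch of the builder/filter pattern from SPARK-29779 (#27085) that the description above refers to and that this commit extends to SQL events. It deliberately avoids Spark's internal `EventFilterBuilder`/`EventFilter` APIs (which are `private[spark]`); every name below (`Event`, `Filter`, `LiveExecutionFilterBuilder`, and so on) is a hypothetical stand-in, shown only to illustrate how a builder replays the existing event log, tracks live entities, and then produces a filter whose partial function keeps, drops, or declines to decide on each event.

```scala
// Hypothetical stand-in types; NOT Spark's private[spark] EventFilterBuilder / EventFilter API.
sealed trait Event
case class ExecutionStart(id: Long) extends Event
case class ExecutionEnd(id: Long) extends Event
case class JobStart(jobId: Int, executionId: Option[Long]) extends Event

trait Filter {
  // Defined => keep (true) or drop (false); undefined => "don't mind", let other filters decide.
  def acceptFn: PartialFunction[Event, Boolean]
}

class LiveExecutionFilterBuilder {
  import scala.collection.mutable
  private val liveExecutionToJobs = mutable.Map[Long, mutable.Set[Int]]()

  // Replay every event from the existing log to learn which executions/jobs are still live.
  def onEvent(event: Event): Unit = event match {
    case ExecutionStart(id) => liveExecutionToJobs.getOrElseUpdate(id, mutable.Set[Int]())
    case JobStart(jobId, Some(execId)) => liveExecutionToJobs.get(execId).foreach(_ += jobId)
    case ExecutionEnd(id) => liveExecutionToJobs.remove(id)
    case _ => // events this builder does not care about
  }

  def createFilter(): Filter = {
    val executions = liveExecutionToJobs.keySet.toSet
    val jobs = liveExecutionToJobs.values.flatten.toSet
    new Filter {
      val acceptFn: PartialFunction[Event, Boolean] = {
        case ExecutionStart(id) => executions(id) // keep only still-live executions
        case ExecutionEnd(id) => executions(id)
        case JobStart(jobId, Some(_)) if jobs(jobId) => true // keep jobs of live executions
        // other job events stay unmatched: this filter has no opinion, others may decide
      }
    }
  }
}

object CompactionSketch extends App {
  val builder = new LiveExecutionFilterBuilder
  val events = Seq(
    ExecutionStart(1), JobStart(10, Some(1)), ExecutionEnd(1), // execution 1 already finished
    ExecutionStart(2), JobStart(20, Some(2)))                  // execution 2 still live
  events.foreach(builder.onEvent)

  val filter = builder.createFilter()
  events.foreach { e =>
    // Some(true) = keep, Some(false) = drop, None = "don't mind"
    println(s"$e -> ${filter.acceptFn.lift(e)}")
  }
}
```

In this sketch, events of the finished execution 1 are dropped, events of the live execution 2 are kept, and the job event of a finished execution is left undecided, mirroring the "don't mind" behavior that the SQL filter in this commit relies on.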

core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala

Lines changed: 47 additions & 0 deletions
@@ -137,6 +137,53 @@ object ListenerEventsTestHelper {
    SparkListenerExecutorMetricsUpdate(executorId.toString, accum, executorUpdates)
  }

  case class JobInfo(
      stageIds: Seq[Int],
      stageToTaskIds: Map[Int, Seq[Long]],
      stageToRddIds: Map[Int, Seq[Int]])

  def pushJobEventsWithoutJobEnd(
      listener: SparkListener,
      jobId: Int,
      jobProps: Properties,
      execIds: Array[String],
      time: Long): JobInfo = {
    // Start a job with 1 stage / 4 tasks each
    val rddsForStage = createRdds(2)
    val stage = createStage(rddsForStage, Nil)

    listener.onJobStart(SparkListenerJobStart(jobId, time, Seq(stage), jobProps))

    // Submit stage
    stage.submissionTime = Some(time)
    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, jobProps))

    // Start tasks from stage
    val s1Tasks = createTasks(4, execIds, time)
    s1Tasks.foreach { task =>
      listener.onTaskStart(SparkListenerTaskStart(stage.stageId,
        stage.attemptNumber(), task))
    }

    // Succeed all tasks in stage.
    val s1Metrics = TaskMetrics.empty
    s1Metrics.setExecutorCpuTime(2L)
    s1Metrics.setExecutorRunTime(4L)

    s1Tasks.foreach { task =>
      task.markFinished(TaskState.FINISHED, time)
      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber,
        "taskType", Success, task, new ExecutorMetrics, s1Metrics))
    }

    // End stage.
    stage.completionTime = Some(time)
    listener.onStageCompleted(SparkListenerStageCompleted(stage))

    JobInfo(Seq(stage.stageId), Map(stage.stageId -> s1Tasks.map(_.taskId)),
      Map(stage.stageId -> rddsForStage.map(_.id)))
  }

  private def nextTaskId(): Long = {
    taskIdTracker += 1
    taskIdTracker
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
org.apache.spark.sql.execution.history.SQLEventFilterBuilder
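The single added line above is presumably a META-INF/services registration (an assumption based on how SPARK-29779 / #27085 wires up EventFilterBuilder implementations; the file path is not shown in this view): a resource file named after the interface, whose contents list implementing classes, so that java.util.ServiceLoader can discover SQLEventFilterBuilder when the sql/core jar is on the classpath. A minimal, generic sketch of that mechanism, with hypothetical names (FilterBuilder, SQLFilterBuilder, com.example.history), not Spark's actual interface:

```scala
package com.example.history

import java.util.ServiceLoader

// Hypothetical stand-in for a pluggable builder interface discovered via ServiceLoader.
trait FilterBuilder {
  def describe(): String
}

// Registered for discovery by listing its fully-qualified name in
// src/main/resources/META-INF/services/com.example.history.FilterBuilder
class SQLFilterBuilder extends FilterBuilder {
  override def describe(): String = "filters SQL events"
}

object DiscoverBuilders {
  def main(args: Array[String]): Unit = {
    // ServiceLoader instantiates every class named in the matching META-INF/services file
    // found on the classpath, using the no-arg constructor.
    val loader = ServiceLoader.load(classOf[FilterBuilder])
    loader.forEach(b => println(s"discovered: ${b.getClass.getName} -> ${b.describe()}"))
  }
}
```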
Lines changed: 147 additions & 0 deletions (new file defining org.apache.spark.sql.execution.history.SQLEventFilterBuilder and SQLLiveEntitiesEventFilter)
@@ -0,0 +1,147 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.history

import scala.collection.mutable

import org.apache.spark.deploy.history.{EventFilter, EventFilterBuilder, JobEventFilter}
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.ui._
import org.apache.spark.sql.streaming.StreamingQueryListener

/**
 * This class tracks live SQL executions and passes the list to [[SQLLiveEntitiesEventFilter]],
 * which uses it to accept live SQL executions as well as their relevant
 * jobs (and stages/tasks/RDDs).
 *
 * Note that this class only tracks jobs which are relevant to SQL executions - without the
 * relation to a SQL execution, it cannot distinguish a finished job from a live one.
 */
private[spark] class SQLEventFilterBuilder extends SparkListener with EventFilterBuilder {
  private val _liveExecutionToJobs = new mutable.HashMap[Long, mutable.Set[Int]]
  private val _jobToStages = new mutable.HashMap[Int, Set[Int]]
  private val _stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]]
  private val _stageToRDDs = new mutable.HashMap[Int, Set[Int]]
  private val stages = new mutable.HashSet[Int]

  def liveSQLExecutions: Set[Long] = _liveExecutionToJobs.keySet.toSet
  def liveJobs: Set[Int] = _liveExecutionToJobs.values.flatten.toSet
  def liveStages: Set[Int] = _stageToRDDs.keySet.toSet
  def liveTasks: Set[Long] = _stageToTasks.values.flatten.toSet
  def liveRDDs: Set[Int] = _stageToRDDs.values.flatten.toSet

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    val executionIdString = jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    if (executionIdString == null) {
      // This is not a job created by SQL
      return
    }

    val executionId = executionIdString.toLong
    val jobId = jobStart.jobId

    val jobsForExecution = _liveExecutionToJobs.getOrElseUpdate(executionId,
      mutable.HashSet[Int]())
    jobsForExecution += jobId

    _jobToStages += jobStart.jobId -> jobStart.stageIds.toSet
    stages ++= jobStart.stageIds
  }

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    val stageId = stageSubmitted.stageInfo.stageId
    if (stages.contains(stageId)) {
      _stageToRDDs.put(stageId, stageSubmitted.stageInfo.rddInfos.map(_.id).toSet)
      _stageToTasks.getOrElseUpdate(stageId, new mutable.HashSet[Long]())
    }
  }

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    _stageToTasks.get(taskStart.stageId).foreach { tasks =>
      tasks += taskStart.taskInfo.taskId
    }
  }

  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
    case _ => // Ignore
  }

  private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
    _liveExecutionToJobs += event.executionId -> mutable.HashSet[Int]()
  }

  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
    _liveExecutionToJobs.remove(event.executionId).foreach { jobs =>
      val stagesToDrop = _jobToStages.filter(kv => jobs.contains(kv._1)).values.flatten
      _jobToStages --= jobs
      stages --= stagesToDrop
      _stageToTasks --= stagesToDrop
      _stageToRDDs --= stagesToDrop
    }
  }

  override def createFilter(): EventFilter = {
    new SQLLiveEntitiesEventFilter(liveSQLExecutions, liveJobs, liveStages, liveTasks, liveRDDs)
  }
}

/**
 * This class accepts events which are related to the live SQL executions, based on the given
 * information.
 *
 * Note that for job-related events, acceptFn leaves the event unmatched ("don't mind") rather
 * than returning false, because it cannot tell whether the job belongs to a finished SQL
 * execution or is simply unrelated to any SQL execution. In that case it gives up the decision
 * and lets other filters decide.
 */
private[spark] class SQLLiveEntitiesEventFilter(
    liveSQLExecutions: Set[Long],
    _liveJobs: Set[Int],
    _liveStages: Set[Int],
    _liveTasks: Set[Long],
    _liveRDDs: Set[Int])
  extends JobEventFilter(None, _liveJobs, _liveStages, _liveTasks, _liveRDDs) with Logging {

  logDebug(s"live SQL executions : $liveSQLExecutions")

  private val _acceptFn: PartialFunction[SparkListenerEvent, Boolean] = {
    case e: SparkListenerSQLExecutionStart =>
      liveSQLExecutions.contains(e.executionId)
    case e: SparkListenerSQLAdaptiveExecutionUpdate =>
      liveSQLExecutions.contains(e.executionId)
    case e: SparkListenerSQLExecutionEnd =>
      liveSQLExecutions.contains(e.executionId)
    case e: SparkListenerDriverAccumUpdates =>
      liveSQLExecutions.contains(e.executionId)

    case e if acceptFnForJobEvents.lift(e).contains(true) =>
      // NOTE: if acceptFnForJobEvents(e) returns false, we should leave the event unmatched,
      // because we don't know whether the job belongs to a SQL execution which has finished,
      // or is not related to any SQL execution at all.
      true

    // These events are for finished batches, so it's safer to ignore them.
    case _: StreamingQueryListener.QueryProgressEvent => false
  }

  override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = _acceptFn
}
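To make the "don't mind" semantics described above concrete, here is a hedged sketch of how a compaction pass might consult this filter. It is not part of this commit: it assumes the code is compiled inside Spark's own source tree (hence the package declaration, since SQLEventFilterBuilder and EventFilter are private[spark]), and the sample events simply reuse the constructors exercised by the test suite in the next file.

```scala
// Illustrative sketch only; requires Spark-internal scope because the classes are private[spark].
package org.apache.spark.sql.execution.history

import org.apache.spark.scheduler.SparkListenerEvent
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

object SQLFilterSketch {
  def main(args: Array[String]): Unit = {
    val builder = new SQLEventFilterBuilder

    // Replay a tiny "event log": SQL execution 1 has finished, execution 2 is still live.
    val events: Seq[SparkListenerEvent] = Seq(
      SparkListenerSQLExecutionStart(1, "desc1", "details1", "plan",
        new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), 1L),
      SparkListenerSQLExecutionEnd(1, 2L),
      SparkListenerSQLExecutionStart(2, "desc2", "details2", "plan",
        new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), 3L))
    events.foreach(builder.onOtherEvent)

    // A compaction pass lifts the partial function, so three outcomes are possible per event.
    val acceptFn = builder.createFilter().acceptFn()
    events.foreach { event =>
      acceptFn.lift(event) match {
        case Some(true)  => println(s"keep: $event")       // belongs to a live SQL execution
        case Some(false) => println(s"drop: $event")       // e.g. an already-finished execution
        case None        => println(s"no opinion: $event") // left to other EventFilters
      }
    }
  }
}
```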
Lines changed: 107 additions & 0 deletions (new file defining org.apache.spark.sql.execution.history.SQLEventFilterBuilderSuite)
@@ -0,0 +1,107 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.history

import java.util.Properties

import org.apache.spark.SparkFunSuite
import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
import org.apache.spark.status.ListenerEventsTestHelper

class SQLEventFilterBuilderSuite extends SparkFunSuite {
  import ListenerEventsTestHelper._

  override protected def beforeEach(): Unit = {
    ListenerEventsTestHelper.reset()
  }

  test("track live SQL executions") {
    var time = 0L

    val listener = new SQLEventFilterBuilder

    listener.onOtherEvent(SparkListenerLogStart("TestSparkVersion"))

    // Start the application.
    time += 1
    listener.onApplicationStart(SparkListenerApplicationStart(
      "name",
      Some("id"),
      time,
      "user",
      Some("attempt"),
      None))

    // Start a couple of executors.
    time += 1
    val execIds = Array("1", "2")
    execIds.foreach { id =>
      listener.onExecutorAdded(createExecutorAddedEvent(id, time))
    }

    // Start SQL Execution
    listener.onOtherEvent(SparkListenerSQLExecutionStart(1, "desc1", "details1", "plan",
      new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time))

    time += 1

    // job 1, 2: coupled with SQL execution 1, finished
    val jobProp = createJobProps()
    val jobPropWithSqlExecution = new Properties(jobProp)
    jobPropWithSqlExecution.setProperty(SQLExecution.EXECUTION_ID_KEY, "1")
    val jobInfoForJob1 = pushJobEventsWithoutJobEnd(listener, 1, jobPropWithSqlExecution,
      execIds, time)
    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))

    val jobInfoForJob2 = pushJobEventsWithoutJobEnd(listener, 2, jobPropWithSqlExecution,
      execIds, time)
    listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded))

    // job 3: not coupled with SQL execution 1, finished
    pushJobEventsWithoutJobEnd(listener, 3, jobProp, execIds, time)
    listener.onJobEnd(SparkListenerJobEnd(3, time, JobSucceeded))

    // job 4: not coupled with SQL execution 1, not finished
    pushJobEventsWithoutJobEnd(listener, 4, jobProp, execIds, time)
    listener.onJobEnd(SparkListenerJobEnd(4, time, JobSucceeded))

    assert(listener.liveSQLExecutions === Set(1))

    // only SQL executions related jobs are tracked
    assert(listener.liveJobs === Set(1, 2))
    assert(listener.liveStages ===
      (jobInfoForJob1.stageIds ++ jobInfoForJob2.stageIds).toSet)
    assert(listener.liveTasks ===
      (jobInfoForJob1.stageToTaskIds.values.flatten ++
        jobInfoForJob2.stageToTaskIds.values.flatten).toSet)
    assert(listener.liveRDDs ===
      (jobInfoForJob1.stageToRddIds.values.flatten ++
        jobInfoForJob2.stageToRddIds.values.flatten).toSet)

    // End SQL execution
    listener.onOtherEvent(SparkListenerSQLExecutionEnd(1, 0))

    assert(listener.liveSQLExecutions.isEmpty)
    assert(listener.liveJobs.isEmpty)
    assert(listener.liveStages.isEmpty)
    assert(listener.liveTasks.isEmpty)
    assert(listener.liveRDDs.isEmpty)
  }
}
