Skip to content

Commit c2632ed

Browse files
myroslavlisniakMarcelo Vanzin
authored andcommitted
[SPARK-23670][SQL] Fix memory leak on SparkPlanGraphWrapper
Clean up SparkPlanGraphWrapper objects from InMemoryStore together with cleaning up SQLExecutionUIData existing unit test was extended to check also SparkPlanGraphWrapper object count vanzin Author: myroslavlisniak <acnipin@gmail.com> Closes #20813 from myroslavlisniak/master.
1 parent 3675af7 commit c2632ed

File tree

3 files changed

+9
-1
lines changed

3 files changed

+9
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,10 @@ class SQLAppStatusListener(
334334

335335
val view = kvstore.view(classOf[SQLExecutionUIData]).index("completionTime").first(0L)
336336
val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_.completionTime.isDefined)
337-
toDelete.foreach { e => kvstore.delete(e.getClass(), e.executionId) }
337+
toDelete.foreach { e =>
338+
kvstore.delete(e.getClass(), e.executionId)
339+
kvstore.delete(classOf[SparkPlanGraphWrapper], e.executionId)
340+
}
338341
}
339342

340343
}

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ class SQLAppStatusStore(
5454
store.count(classOf[SQLExecutionUIData])
5555
}
5656

57+
def planGraphCount(): Long = {
58+
store.count(classOf[SparkPlanGraphWrapper])
59+
}
60+
5761
def executionMetrics(executionId: Long): Map[Long, String] = {
5862
def metricsFromStore(): Option[Map[Long, String]] = {
5963
val exec = store.read(classOf[SQLExecutionUIData], executionId)

sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,7 @@ class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite {
611611
sc.listenerBus.waitUntilEmpty(10000)
612612
val statusStore = spark.sharedState.statusStore
613613
assert(statusStore.executionsCount() <= 50)
614+
assert(statusStore.planGraphCount() <= 50)
614615
// No live data should be left behind after all executions end.
615616
assert(statusStore.listener.get.noLiveData())
616617
}

0 commit comments

Comments
 (0)