diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 56fc9d946df5..eeca1669e746 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -106,7 +106,7 @@ object SQLExecution {
       try {
         sc.listenerBus.post(SparkListenerSQLExecutionStart(
           executionId = executionId,
-          rootExecutionId = rootExecutionId,
+          rootExecutionId = Some(rootExecutionId),
           description = desc,
           details = callSite.longForm,
           physicalPlanDescription = queryExecution.explainString(planDescriptionMode),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index cd8f31b3c21d..058ecbbb1cd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -260,7 +260,7 @@ private[ui] class ExecutionPagedTable(
   private val parameterPath =
     s"$basePath/$subPath/?${getParameterOtherTable(request, executionTag)}"
 
-  private val showSubExecutions = subExecutions.nonEmpty
+  private val showSubExecutions = subExecutions.exists(_._2.nonEmpty)
 
   override def tableId: String = s"$executionTag-table"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 32b215b1c2e8..7b9f877bdef5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -356,7 +356,7 @@ class SQLAppStatusListener(
     kvstore.write(graphToStore)
 
     val exec = getOrCreateExecution(executionId)
-    exec.rootExecutionId = rootExecutionId
+    exec.rootExecutionId = rootExecutionId.getOrElse(executionId)
     exec.description = description
     exec.details = details
     exec.physicalPlanDescription = physicalPlanDescription
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index b931b4fcde1b..d4c8f600a4e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -44,7 +44,10 @@ case class SparkListenerSQLAdaptiveSQLMetricUpdates(
 case class SparkListenerSQLExecutionStart(
     executionId: Long,
     // if the execution is a root, then rootExecutionId == executionId
-    rootExecutionId: Long,
+    // if the event is parsed from an event log generated by a Spark version that
+    // does not support nested execution, then rootExecutionId = None
+    @JsonDeserialize(contentAs = classOf[java.lang.Long])
+    rootExecutionId: Option[Long],
     description: String,
     details: String,
     physicalPlanDescription: String,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
index e9d98ee97157..49cca666d1d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
@@ -30,43 +30,53 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession {
test("SparkPlanGraph backward compatibility: metadata") { Seq(true, false).foreach { newExecutionStartEvent => - val event = if (newExecutionStartEvent) { - "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart" - } else { - "org.apache.spark.sql.execution.OldVersionSQLExecutionStart" - } - val SQLExecutionStartJsonString = - s""" - |{ - | "Event":"$event", - | "executionId":0, - | "description":"test desc", - | "details":"test detail", - | "physicalPlanDescription":"test plan", - | "sparkPlanInfo": { - | "nodeName":"TestNode", - | "simpleString":"test string", - | "children":[], - | "metadata":{}, - | "metrics":[] - | }, - | "time":0, - | "modifiedConfigs": { - | "k1":"v1" - | } - |} - """.stripMargin + Seq(true, false).foreach { newExecutionStartJson => + val event = if (newExecutionStartEvent) { + "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart" + } else { + "org.apache.spark.sql.execution.OldVersionSQLExecutionStart" + } + + val SQLExecutionStartJsonString = + s""" + |{ + | "Event":"$event", + | ${if (newExecutionStartJson) """"rootExecutionId": "1",""" else ""} + | "executionId":0, + | "description":"test desc", + | "details":"test detail", + | "physicalPlanDescription":"test plan", + | "sparkPlanInfo": { + | "nodeName":"TestNode", + | "simpleString":"test string", + | "children":[], + | "metadata":{}, + | "metrics":[] + | }, + | "time":0, + | "modifiedConfigs": { + | "k1":"v1" + | } + |} + """.stripMargin - val reconstructedEvent = JsonProtocol.sparkEventFromJson(SQLExecutionStartJsonString) - if (newExecutionStartEvent) { - val expectedEvent = SparkListenerSQLExecutionStart(0, 0, "test desc", "test detail", - "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0, - Map("k1" -> "v1")) - assert(reconstructedEvent == expectedEvent) - } else { - val expectedOldEvent = OldVersionSQLExecutionStart(0, "test desc", "test detail", - "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0) - assert(reconstructedEvent == expectedOldEvent) + val reconstructedEvent = JsonProtocol.sparkEventFromJson(SQLExecutionStartJsonString) + if (newExecutionStartEvent) { + val expectedEvent = if (newExecutionStartJson) { + SparkListenerSQLExecutionStart(0, Some(1), "test desc", "test detail", + "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0, + Map("k1" -> "v1")) + } else { + SparkListenerSQLExecutionStart(0, None, "test desc", "test detail", + "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0, + Map("k1" -> "v1")) + } + assert(reconstructedEvent == expectedEvent) + } else { + val expectedOldEvent = OldVersionSQLExecutionStart(0, "test desc", "test detail", + "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0) + assert(reconstructedEvent == expectedOldEvent) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala index 42b27bd9f28e..87ac58dbc3ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala @@ -57,7 +57,7 @@ class SQLEventFilterBuilderSuite extends SparkFunSuite { } // Start SQL Execution - listener.onOtherEvent(SparkListenerSQLExecutionStart(1, 1, "desc1", "details1", "plan", + 
listener.onOtherEvent(SparkListenerSQLExecutionStart(1, Some(1), "desc1", "details1", "plan", new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time, Map.empty)) time += 1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala index f1b77e502dfe..f9eea3816fca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala @@ -41,8 +41,8 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite { val acceptFn = filter.acceptFn().lift // Verifying with finished SQL execution 1 - assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, 1, "description1", "details1", - "plan", null, 0, Map.empty))) + assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, Some(1), + "description1", "details1", "plan", null, 0, Map.empty))) assert(Some(false) === acceptFn(SparkListenerSQLExecutionEnd(1, 0))) assert(Some(false) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(1, "plan", null))) assert(Some(false) === acceptFn(SparkListenerDriverAccumUpdates(1, Seq.empty))) @@ -88,8 +88,8 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite { } // Verifying with live SQL execution 2 - assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, 2, "description2", "details2", - "plan", null, 0, Map.empty))) + assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, Some(2), + "description2", "details2", "plan", null, 0, Map.empty))) assert(Some(true) === acceptFn(SparkListenerSQLExecutionEnd(2, 0))) assert(Some(true) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(2, "plan", null))) assert(Some(true) === acceptFn(SparkListenerDriverAccumUpdates(2, Seq.empty))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala index 7af58867f33a..d1cd32f36219 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala @@ -86,7 +86,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( 0, - 0, + Some(0), "test", "test", df.queryExecution.toString, @@ -142,7 +142,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( 0, - 0, + Some(0), "test", "test", df.queryExecution.toString, @@ -150,7 +150,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA System.currentTimeMillis())) listener.onOtherEvent(SparkListenerSQLExecutionStart( 1, - 0, + Some(0), "test", "test", df.queryExecution.toString, @@ -159,7 +159,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA // sub execution has a missing root execution listener.onOtherEvent(SparkListenerSQLExecutionStart( 2, - 100, + Some(100), "test", "test", df.queryExecution.toString, @@ -171,6 +171,43 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA assert(html.contains("id=2")) } + test("SPARK-42754: group sub executions - 
backward compatibility") { + val statusStore = createStatusStore + val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS) + val request = mock(classOf[HttpServletRequest]) + + val sparkConf = new SparkConf(false).set(UI_SQL_GROUP_SUB_EXECUTION_ENABLED, true) + when(tab.conf).thenReturn(sparkConf) + when(tab.sqlStore).thenReturn(statusStore) + when(tab.appName).thenReturn("testing") + when(tab.headerTabs).thenReturn(Seq.empty) + + val listener = statusStore.listener.get + val page = new AllExecutionsPage(tab) + val df = createTestDataFrame + // testing compatibility with old event logs for which rootExecutionId = None + // because the field is missing when generated by a Spark version not support + // nested execution + listener.onOtherEvent(SparkListenerSQLExecutionStart( + 0, + None, + "test", + "test", + df.queryExecution.toString, + SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), + System.currentTimeMillis())) + listener.onOtherEvent(SparkListenerSQLExecutionStart( + 1, + None, + "test", + "test", + df.queryExecution.toString, + SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), + System.currentTimeMillis())) + val html = page.render(request).toString().toLowerCase(Locale.ROOT) + assert(!html.contains("sub execution ids") && !html.contains("sub-execution-list")) + } + protected def createStatusStore: SQLAppStatusStore private def createTestDataFrame: DataFrame = { @@ -196,7 +233,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, - executionId, + Some(executionId), "test", "test", df.queryExecution.toString, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala index 3b9efb180578..252bcea8b8ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala @@ -75,7 +75,7 @@ object MetricsAggregationBenchmark extends BenchmarkBase { val executionId = idgen.incrementAndGet() val executionStart = SparkListenerSQLExecutionStart( executionId, - executionId, + Some(executionId), getClass().getName(), getClass().getName(), getClass().getName(), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 81c745029fd8..fdc633f35566 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -191,7 +191,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, - executionId, + Some(executionId), "test", "test", df.queryExecution.toString, @@ -382,7 +382,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, - executionId, + Some(executionId), "test", "test", df.queryExecution.toString, @@ -413,7 +413,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( 
executionId, - executionId, + Some(executionId), "test", "test", df.queryExecution.toString, @@ -455,7 +455,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, - executionId, + Some(executionId), "test", "test", df.queryExecution.toString, @@ -486,7 +486,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, - executionId, + Some(executionId), "test", "test", df.queryExecution.toString, @@ -518,7 +518,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes val df = createTestDataFrame listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, - executionId, + Some(executionId), "test", "test", df.queryExecution.toString, @@ -659,7 +659,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes time += 1 listener.onOtherEvent(SparkListenerSQLExecutionStart( 1, - 1, + Some(1), "test", "test", df.queryExecution.toString, @@ -669,7 +669,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes time += 1 listener.onOtherEvent(SparkListenerSQLExecutionStart( 2, - 2, + Some(2), "test", "test", df.queryExecution.toString, @@ -687,7 +687,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes time += 1 listener.onOtherEvent(SparkListenerSQLExecutionStart( 3, - 3, + Some(3), "test", "test", df.queryExecution.toString, @@ -724,7 +724,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, - executionId, + Some(executionId), "test", "test", df.queryExecution.toString,
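
Note (not part of the patch): the compatibility contract above reduces to a single fallback rule — when rootExecutionId is absent, i.e. the event was replayed from a log written by a Spark version without nested-execution support, treat the execution as its own root, exactly as SQLAppStatusListener now does with rootExecutionId.getOrElse(executionId). Below is a minimal sketch of a custom listener applying the same rule; the MyExecutionTracker class is hypothetical and only illustrates consuming the new Option[Long] field.

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

// Hypothetical consumer of SparkListenerSQLExecutionStart. Events from old
// event logs deserialize with rootExecutionId = None, so we fall back to the
// execution's own id, matching the pre-nested-execution behavior where every
// execution was its own root.
class MyExecutionTracker extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      val rootId = e.rootExecutionId.getOrElse(e.executionId)
      if (rootId == e.executionId) {
        println(s"root execution ${e.executionId} started")
      } else {
        println(s"sub-execution ${e.executionId} under root $rootId started")
      }
    case _ => // not a SQL execution start; ignore
  }
}

Registered via spark.sparkContext.addSparkListener(new MyExecutionTracker), this handles both new events (rootExecutionId = Some(id)) and replayed legacy events (None) without a version check.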