diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 386c660b16def..2b6ac763cc3e3 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -424,6 +424,7 @@ message SQLExecutionUIData {
   repeated int64 stages = 12;
   bool metric_values_is_null = 13;
   map<int64, string> metric_values = 14;
+  optional string query_id = 15;
 }
 
 message SparkPlanGraphNode {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index 3a25f5770c9d3..fd364c457a42e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -298,6 +298,7 @@ private[ui] class ExecutionPagedTable(
   private val headerInfo: Seq[(String, Boolean, Option[String])] = {
     Seq(
       ("ID", true, None),
+      ("Query ID", true, None),
       ("Description", true, None),
       ("Submitted", true, None),
       ("Duration", true, Some("Time from query submission to completion (or if still executing," +
@@ -372,6 +373,9 @@
       <td>
         {executionUIData.executionId.toString}
       </td>
+      <td>
+        {Option(executionUIData.queryId).getOrElse("N/A")}
+      </td>
       <td>
         {descriptionCell(executionUIData)}
       </td>
@@ -428,6 +432,9 @@
       <td>
         {executionUIData.executionId.toString}
       </td>
+      <td>
+        {Option(executionUIData.queryId).getOrElse("N/A")}
+      </td>
       <td>
         {descriptionCell(executionUIData)}
       </td>
@@ -566,6 +573,7 @@ private[ui] class ExecutionDataSource(
   private def ordering(sortColumn: String, desc: Boolean): Ordering[ExecutionTableRowData] = {
     val ordering: Ordering[ExecutionTableRowData] = sortColumn match {
       case "ID" => Ordering.by(_.executionUIData.executionId)
+      case "Query ID" => Ordering.by(_.executionUIData.queryId)
       case "Description" => Ordering.by(_.executionUIData.description)
       case "Submitted" => Ordering.by(_.executionUIData.submissionTime)
       case "Duration" => Ordering.by(_.duration)
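Note on the `Option(...)` idiom in the new "Query ID" cells above: wrapping a possibly-null Java reference in `Option` turns the null case into `None`, so the cell falls back to "N/A" without an explicit null check. A minimal standalone sketch of the same idiom (the `cellText` helper is mine, not part of the patch):

```scala
import java.util.UUID

// Standalone sketch, not part of the patch: how the new "Query ID" cell falls
// back to "N/A" when queryId is null (e.g. executions replayed from event logs
// written by older Spark versions).
object QueryIdCellSketch {
  // Mirrors Option(executionUIData.queryId).getOrElse("N/A") from the hunk above.
  def cellText(queryId: UUID): String =
    Option(queryId).map(_.toString).getOrElse("N/A")

  def main(args: Array[String]): Unit = {
    println(cellText(UUID.fromString("efe98ba7-1532-491e-9b4f-4be621cef37c")))
    println(cellText(null)) // prints "N/A"
  }
}
```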
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index 9d10e5f9545c9..694ea605e4a30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -74,6 +74,13 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging
         <li>
           <strong>Duration: </strong>{UIUtils.formatDuration(duration)}
         </li>
+        {
+          Option(executionUIData.queryId).map { qId =>
+            <li>
+              <strong>Query ID: </strong>{qId}
+            </li>
+          }.getOrElse(Seq.empty)
+        }
         {
           if (executionUIData.rootExecutionId != executionId) {
             <li>
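The hunk above renders the `<li>` only when a query ID is present: `Option(...).map` produces the node for `Some`, and `Seq.empty` makes the `None` case splice into the surrounding XML literal as nothing. The same pattern in isolation, as a sketch (assumes the scala-xml module on the classpath; object and method names are mine):

```scala
import java.util.UUID
import scala.xml.Node

// Standalone sketch, not part of the patch: an Option mapped to an XML node,
// with Seq.empty as the "render nothing" fallback, splices into a surrounding
// XML literal with no null checks.
object OptionalListItemSketch {
  def queryIdItem(queryId: UUID): Seq[Node] =
    Option(queryId).map { qId =>
      <li><strong>Query ID: </strong>{qId}</li>
    }.getOrElse(Seq.empty)

  def main(args: Array[String]): Unit = {
    println(<ul>{queryIdItem(UUID.randomUUID())}</ul>) // <ul><li>...</li></ul>
    println(<ul>{queryIdItem(null)}</ul>)              // <ul></ul>
  }
}
```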
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index c3b82c3d37008..0125702a37807 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -92,6 +92,7 @@ class SQLAppStatusListener(
     val sqlStoreData = kvstore.read(classOf[SQLExecutionUIData], executionId)
     val executionData = new LiveExecutionData(executionId)
     executionData.rootExecutionId = sqlStoreData.rootExecutionId
+    executionData.queryId = sqlStoreData.queryId
     executionData.description = sqlStoreData.description
     executionData.details = sqlStoreData.details
     executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription
@@ -343,7 +344,7 @@ private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
     val SparkListenerSQLExecutionStart(executionId, rootExecutionId, description, details,
-      physicalPlanDescription, sparkPlanInfo, time, modifiedConfigs, _, _, _) = event
+      physicalPlanDescription, sparkPlanInfo, time, modifiedConfigs, _, _, queryId) = event
 
     val planGraph = SparkPlanGraph(sparkPlanInfo)
     val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
@@ -358,6 +359,7 @@
 
     val exec = getOrCreateExecution(executionId)
     exec.rootExecutionId = rootExecutionId.getOrElse(executionId)
+    exec.queryId = queryId.orNull
     exec.description = description
     exec.details = details
     exec.physicalPlanDescription = physicalPlanDescription
@@ -487,6 +489,7 @@
 private class LiveExecutionData(val executionId: Long) extends LiveEntity {
 
   var rootExecutionId: Long = _
+  var queryId: java.util.UUID = null
   var description: String = null
   var details: String = null
   var physicalPlanDescription: String = null
@@ -525,7 +528,8 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
       errorMessage,
       jobs,
       stages,
-      metricsValues)
+      metricsValues,
+      queryId)
   }
 
   def addMetrics(newMetrics: collection.Seq[SQLPlanMetric]): Unit = {
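The listener bridges two representations here: `SparkListenerSQLExecutionStart` carries `queryId` as an `Option[UUID]`, while `LiveExecutionData` stores a nullable `java.util.UUID`, so `queryId.orNull` does the conversion. A standalone sketch of `Option.orNull` semantics (names are mine, not from the patch):

```scala
import java.util.UUID

// Standalone sketch: Option.orNull bridges the Option[UUID] carried by the
// event to the nullable queryId var on LiveExecutionData. orNull requires
// evidence that Null is a subtype of the element type, which holds for any
// reference type such as UUID.
object OrNullSketch {
  def main(args: Array[String]): Unit = {
    val present: Option[UUID] = Some(UUID.randomUUID())
    val absent: Option[UUID] = None
    val a: UUID = present.orNull // the wrapped UUID
    val b: UUID = absent.orNull  // null, the "unknown" default in the listener
    println(s"present -> $a, absent -> $b")
  }
}
```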
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index 8681bfb2342b4..e419597c1dc56 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.ui
 
 import java.lang.{Long => JLong}
-import java.util.Date
+import java.util.{Date, UUID}
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -102,7 +102,13 @@ class SQLExecutionUIData(
    * from the SQL listener instance.
    */
   @JsonDeserialize(keyAs = classOf[JLong])
-  val metricValues: Map[Long, String]) {
+  val metricValues: Map[Long, String],
+  /**
+   * A unique identifier for the query.
+   * For backward compatibility, queryId is null when we parse event logs
+   * generated by old versions of Spark.
+   */
+  val queryId: UUID = null) {
 
   @JsonIgnore @KVIndex("completionTime")
   private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L)
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
index 3e27cdfcf154a..4e3bb6de2b8db 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.status.protobuf.sql
 
-import java.util.Date
+import java.util.{Date, UUID}
 
 import scala.jdk.CollectionConverters._
 
@@ -57,6 +57,9 @@ private[protobuf] class SQLExecutionUIDataSerializer extends ProtobufSerDe[SQLEx
         case (k, v) => builder.putMetricValues(k, v)
       }
     }
+    if (ui.queryId != null) {
+      builder.setQueryId(ui.queryId.toString)
+    }
     builder.build().toByteArray
   }
 
@@ -92,7 +95,8 @@ private[protobuf] class SQLExecutionUIDataSerializer extends ProtobufSerDe[SQLEx
       errorMessage = errorMessage,
       jobs = jobs,
       stages = ui.getStagesList.asScala.map(_.toInt).toSet,
-      metricValues = metricValues
+      metricValues = metricValues,
+      queryId = if (ui.hasQueryId) UUID.fromString(ui.getQueryId) else null
     )
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
index c5e2e657de8cb..ba742cc9d5271 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
@@ -94,7 +94,8 @@ object SqlResourceSuite {
       1 -> JobExecutionStatus.SUCCEEDED),
       stages = Set[Int](),
       metricValues = getMetricValues(),
-      errorMessage = None
+      errorMessage = None,
+      queryId = java.util.UUID.fromString("efe98ba7-1532-491e-9b4f-4be621cef37c")
     )
   }
 
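The serializer pair above writes the UUID as its canonical string into the `optional string query_id` field and parses it back with `UUID.fromString` on read, guarded by `hasQueryId` for field presence. A standalone sketch of why that round trip is lossless (here `Option[String]` stands in for proto3 field presence; names are mine):

```scala
import java.util.UUID

// Standalone sketch, not part of the patch: Option[String] models the proto3
// presence check (hasQueryId/getQueryId). The round trip is lossless because
// UUID.fromString(u.toString) reconstructs an equal UUID for every UUID u.
object QueryIdRoundTripSketch {
  def write(queryId: UUID): Option[String] = Option(queryId).map(_.toString)

  def read(field: Option[String]): UUID = field.map(UUID.fromString).orNull

  def main(args: Array[String]): Unit = {
    val id = UUID.fromString("efe98ba7-1532-491e-9b4f-4be621cef37c")
    assert(read(write(id)) == id)     // a present field survives the round trip
    assert(read(write(null)) == null) // an absent field deserializes to null
    println("query_id round trip ok")
  }
}
```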
diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index 3f3a6925409cd..6b437aabd6efc 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -51,7 +51,8 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
       errorMessage = normal.errorMessage,
       jobs = normal.jobs,
       stages = normal.stages,
-      metricValues = normal.metricValues
+      metricValues = normal.metricValues,
+      queryId = null
     )
     Seq(normal, withNull).foreach { input =>
       val bytes = serializer.serialize(input)
@@ -69,6 +70,7 @@
       assert(result.jobs == input.jobs)
       assert(result.stages == input.stages)
       assert(result.metricValues == input.metricValues)
+      assert(result.queryId == input.queryId)
     }
   }
 
@@ -88,7 +90,8 @@
       errorMessage = templateData.errorMessage,
       jobs = templateData.jobs,
       stages = templateData.stages,
-      metricValues = Map.empty
+      metricValues = Map.empty,
+      queryId = templateData.queryId
     )
     val bytes1 = serializer.serialize(input1)
     val result1 = serializer.deserialize(bytes1, classOf[SQLExecutionUIData])
@@ -108,7 +111,8 @@
       errorMessage = templateData.errorMessage,
       jobs = templateData.jobs,
       stages = templateData.stages,
-      metricValues = null
+      metricValues = null,
+      queryId = templateData.queryId
     )
     val bytes2 = serializer.serialize(input2)
     val result2 = serializer.deserialize(bytes2, classOf[SQLExecutionUIData])
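One review-style observation, with a sketch: the new sort case `Ordering.by(_.executionUIData.queryId)` relies on `UUID#compareTo`, which throws a `NullPointerException` if any displayed row has a null `queryId` (a case the `SQLExecutionUIData` doc comment explicitly allows for old event logs). A null-safe ordering over the UUID's string form is one possible alternative (hypothetical, not part of the patch; `Row` is a stand-in for `ExecutionTableRowData`):

```scala
import java.util.UUID

// Review-style sketch, not part of the patch: sorting on the Option's string
// form keeps rows with a null queryId sortable instead of failing the sort
// with a NullPointerException from UUID#compareTo.
object NullSafeQueryIdOrdering {
  final case class Row(executionId: Long, queryId: UUID)

  val nullSafe: Ordering[Row] =
    Ordering.by(r => Option(r.queryId).map(_.toString).getOrElse(""))

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row(1L, UUID.randomUUID()), Row(2L, null))
    println(rows.sorted(nullSafe).map(_.executionId)) // List(2, 1): null row first
  }
}
```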