Commit ffb1624

asl3 authored and cloud-fan committed

[SPARK-55008][SQL][WEBUI] Display Query ID in SparkUI
### What changes were proposed in this pull request?

Display queryId in the Spark SQL UI. This is a follow-up to #53625, which added the UUIDv7 queryId to SQL execution events. Add a queryId column to the All Executions page and the Execution detail page, and add protobuf serialization support for queryId persistence in the History Server.

### Why are the changes needed?

The existing `executionId` is a sequential counter (0, 1, 2, ...) scoped to a single Spark application; it resets on restart and is not globally unique. `queryId` is a UUIDv7 that is globally unique and time-ordered, enabling reliable cross-system telemetry and query tracking.

### Does this PR introduce _any_ user-facing change?

Yes. The SQL tab now displays a "Query ID" column showing the UUID for each execution.

### How was this patch tested?

Manually tested the Spark UI view:

![Screenshot 2026-01-11 at 8 37 44 PM](https://github.com/user-attachments/assets/63b74fe8-fec7-44b6-ba58-81eca65b40c4)
![Screenshot 2026-01-11 at 8 38 01 PM](https://github.com/user-attachments/assets/7771cecc-237d-4da7-9d3e-7d31628f2d0c)

Updated `KVStoreProtobufSerializerSuite` to verify that queryId round-trips through protobuf serialization. Existing test fixtures were updated for the new queryId field.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #53765 from asl3/queryidsparkui.

Authored-by: Amanda Liu <amanda.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 0218b8a commit ffb1624
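Because queryId is a UUIDv7, the query's creation time is embedded in the ID itself: the top 48 bits hold Unix epoch milliseconds. A minimal sketch of reading that timestamp back (the `uuidV7Millis` helper and the sample UUID are hypothetical, not part of this patch):

```scala
import java.time.Instant
import java.util.UUID

// A UUIDv7 stores Unix epoch milliseconds in the top 48 bits of its
// most significant long; shifting off the low 16 bits recovers them.
// Hypothetical helper for illustration only, not part of this patch.
def uuidV7Millis(id: UUID): Long = id.getMostSignificantBits >>> 16

val sampleQueryId = UUID.fromString("0193d5e0-5a3c-7000-8000-000000000000")
println(Instant.ofEpochMilli(uuidV7Millis(sampleQueryId)))  // creation time
```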

8 files changed: +45 −10 lines

core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto

Lines changed: 1 addition & 0 deletions

@@ -424,6 +424,7 @@ message SQLExecutionUIData {
   repeated int64 stages = 12;
   bool metric_values_is_null = 13;
   map<int64, string> metric_values = 14;
+  optional string query_id = 15;
 }

 message SparkPlanGraphNode {
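Since `query_id` is declared proto3 `optional`, the generated message tracks field presence explicitly, which is what the `ui.hasQueryId` check in the serializer below relies on. A minimal sketch of that presence behavior, assuming the `StoreTypes` classes generated from `store_types.proto`:

```scala
import org.apache.spark.status.protobuf.StoreTypes

// proto3 `optional` generates presence accessors (hasQueryId), so an
// unset query_id deserializes as "absent" rather than an empty string.
val builder = StoreTypes.SQLExecutionUIData.newBuilder()
assert(!builder.hasQueryId)
builder.setQueryId("0193d5e0-5a3c-7000-8000-000000000000")
assert(builder.hasQueryId)
```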

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala

Lines changed: 8 additions & 0 deletions

@@ -298,6 +298,7 @@ private[ui] class ExecutionPagedTable(
   private val headerInfo: Seq[(String, Boolean, Option[String])] = {
     Seq(
       ("ID", true, None),
+      ("Query ID", true, None),
       ("Description", true, None),
       ("Submitted", true, None),
       ("Duration", true, Some("Time from query submission to completion (or if still executing," +
@@ -372,6 +373,9 @@ private[ui] class ExecutionPagedTable(
       <td>
         {executionUIData.executionId.toString}
       </td>
+      <td>
+        {Option(executionUIData.queryId).getOrElse("N/A")}
+      </td>
       <td>
         {descriptionCell(executionUIData)}
       </td>
@@ -428,6 +432,9 @@ private[ui] class ExecutionPagedTable(
       <td>
         {executionUIData.executionId.toString}
       </td>
+      <td>
+        {Option(executionUIData.queryId).getOrElse("N/A")}
+      </td>
       <td>
         {descriptionCell(executionUIData)}
       </td>
@@ -566,6 +573,7 @@ private[ui] class ExecutionDataSource(
   private def ordering(sortColumn: String, desc: Boolean): Ordering[ExecutionTableRowData] = {
     val ordering: Ordering[ExecutionTableRowData] = sortColumn match {
       case "ID" => Ordering.by(_.executionUIData.executionId)
+      case "Query ID" => Ordering.by(_.executionUIData.queryId)
       case "Description" => Ordering.by(_.executionUIData.description)
       case "Submitted" => Ordering.by(_.executionUIData.submissionTime)
       case "Duration" => Ordering.by(_.duration)

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala

Lines changed: 7 additions & 0 deletions

@@ -74,6 +74,13 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging
       <li>
         <strong>Duration: </strong>{UIUtils.formatDuration(duration)}
       </li>
+      {
+        Option(executionUIData.queryId).map { qId =>
+          <li>
+            <strong>Query ID: </strong>{qId}
+          </li>
+        }.getOrElse(Seq.empty)
+      }
       {
         if (executionUIData.rootExecutionId != executionId) {
           <li>

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala

Lines changed: 6 additions & 2 deletions

@@ -92,6 +92,7 @@ class SQLAppStatusListener(
     val sqlStoreData = kvstore.read(classOf[SQLExecutionUIData], executionId)
     val executionData = new LiveExecutionData(executionId)
     executionData.rootExecutionId = sqlStoreData.rootExecutionId
+    executionData.queryId = sqlStoreData.queryId
     executionData.description = sqlStoreData.description
     executionData.details = sqlStoreData.details
     executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription
@@ -343,7 +344,7 @@ class SQLAppStatusListener(

   private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
     val SparkListenerSQLExecutionStart(executionId, rootExecutionId, description, details,
-      physicalPlanDescription, sparkPlanInfo, time, modifiedConfigs, _, _, _) = event
+      physicalPlanDescription, sparkPlanInfo, time, modifiedConfigs, _, _, queryId) = event

     val planGraph = SparkPlanGraph(sparkPlanInfo)
     val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
@@ -358,6 +359,7 @@ class SQLAppStatusListener(

     val exec = getOrCreateExecution(executionId)
     exec.rootExecutionId = rootExecutionId.getOrElse(executionId)
+    exec.queryId = queryId.orNull
     exec.description = description
     exec.details = details
     exec.physicalPlanDescription = physicalPlanDescription
@@ -487,6 +489,7 @@ class SQLAppStatusListener(
 private class LiveExecutionData(val executionId: Long) extends LiveEntity {

   var rootExecutionId: Long = _
+  var queryId: java.util.UUID = null
   var description: String = null
   var details: String = null
   var physicalPlanDescription: String = null
@@ -525,7 +528,8 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
       errorMessage,
       jobs,
       stages,
-      metricsValues)
+      metricsValues,
+      queryId)
   }

   def addMetrics(newMetrics: collection.Seq[SQLPlanMetric]): Unit = {
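The listener keeps `queryId` nullable on the live entity (`queryId.orNull`) so that event logs written by older Spark versions, whose `SparkListenerSQLExecutionStart` carries no queryId, still replay; the UI pages above then render the missing value as "N/A". A minimal sketch of that null-handling chain:

```scala
import java.util.UUID

// An old event log yields no queryId on the start event.
val fromEvent: Option[UUID] = None
// The live entity stores a nullable field for backward compatibility.
val stored: UUID = fromEvent.orNull
// The UI re-wraps the nullable field and falls back to "N/A".
val rendered = Option(stored).map(_.toString).getOrElse("N/A")
assert(rendered == "N/A")
```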

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala

Lines changed: 8 additions & 2 deletions

@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.ui

 import java.lang.{Long => JLong}
-import java.util.Date
+import java.util.{Date, UUID}

 import scala.collection.mutable.ArrayBuffer

@@ -102,7 +102,13 @@ class SQLExecutionUIData(
    * from the SQL listener instance.
    */
   @JsonDeserialize(keyAs = classOf[JLong])
-  val metricValues: Map[Long, String]) {
+  val metricValues: Map[Long, String],
+  /**
+   * A unique identifier for the query.
+   * For backward compatibility, queryId is null when we parse event logs
+   * generated by old versions of Spark.
+   */
+  val queryId: UUID = null) {

   @JsonIgnore @KVIndex("completionTime")
   private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L)

sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala

Lines changed: 6 additions & 2 deletions

@@ -17,7 +17,7 @@

 package org.apache.spark.status.protobuf.sql

-import java.util.Date
+import java.util.{Date, UUID}

 import scala.jdk.CollectionConverters._

@@ -57,6 +57,9 @@ private[protobuf] class SQLExecutionUIDataSerializer extends ProtobufSerDe[SQLEx
         case (k, v) => builder.putMetricValues(k, v)
       }
     }
+    if (ui.queryId != null) {
+      builder.setQueryId(ui.queryId.toString)
+    }
     builder.build().toByteArray
   }

@@ -92,7 +95,8 @@ private[protobuf] class SQLExecutionUIDataSerializer extends ProtobufSerDe[SQLEx
       errorMessage = errorMessage,
       jobs = jobs,
       stages = ui.getStagesList.asScala.map(_.toInt).toSet,
-      metricValues = metricValues
+      metricValues = metricValues,
+      queryId = if (ui.hasQueryId) UUID.fromString(ui.getQueryId) else null
     )
   }
 }
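The serializer stores the UUID as its canonical string and parses it back with `UUID.fromString`, a lossless round trip; this is the property the updated `KVStoreProtobufSerializerSuite` below verifies through the full protobuf path. A quick standalone check:

```scala
import java.util.UUID

// toString/fromString is lossless for any UUID, so persisting queryId
// as a protobuf string preserves it exactly.
val id = UUID.randomUUID()
assert(UUID.fromString(id.toString) == id)
```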

sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala

Lines changed: 2 additions & 1 deletion

@@ -94,7 +94,8 @@ object SqlResourceSuite {
       1 -> JobExecutionStatus.SUCCEEDED),
     stages = Set[Int](),
     metricValues = getMetricValues(),
-    errorMessage = None
+    errorMessage = None,
+    queryId = java.util.UUID.fromString("efe98ba7-1532-491e-9b4f-4be621cef37c")
   )
 }

sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala

Lines changed: 7 additions & 3 deletions

@@ -51,7 +51,8 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
       errorMessage = normal.errorMessage,
       jobs = normal.jobs,
       stages = normal.stages,
-      metricValues = normal.metricValues
+      metricValues = normal.metricValues,
+      queryId = null
     )
     Seq(normal, withNull).foreach { input =>
       val bytes = serializer.serialize(input)
@@ -69,6 +70,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
       assert(result.jobs == input.jobs)
       assert(result.stages == input.stages)
       assert(result.metricValues == input.metricValues)
+      assert(result.queryId == input.queryId)
     }
   }

@@ -88,7 +90,8 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
       errorMessage = templateData.errorMessage,
       jobs = templateData.jobs,
       stages = templateData.stages,
-      metricValues = Map.empty
+      metricValues = Map.empty,
+      queryId = templateData.queryId
     )
     val bytes1 = serializer.serialize(input1)
     val result1 = serializer.deserialize(bytes1, classOf[SQLExecutionUIData])
@@ -108,7 +111,8 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
       errorMessage = templateData.errorMessage,
       jobs = templateData.jobs,
       stages = templateData.stages,
-      metricValues = null
+      metricValues = null,
+      queryId = templateData.queryId
     )
     val bytes2 = serializer.serialize(input2)
     val result2 = serializer.deserialize(bytes2, classOf[SQLExecutionUIData])
