Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ message SQLExecutionUIData {
repeated int64 stages = 12;
bool metric_values_is_null = 13;
map<int64, string> metric_values = 14;
optional string query_id = 15;
}

message SparkPlanGraphNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ private[ui] class ExecutionPagedTable(
private val headerInfo: Seq[(String, Boolean, Option[String])] = {
Seq(
("ID", true, None),
("Query ID", true, None),
("Description", true, None),
("Submitted", true, None),
("Duration", true, Some("Time from query submission to completion (or if still executing," +
Expand Down Expand Up @@ -372,6 +373,9 @@ private[ui] class ExecutionPagedTable(
<td>
{executionUIData.executionId.toString}
</td>
<td>
{Option(executionUIData.queryId).getOrElse("N/A")}
</td>
<td>
{descriptionCell(executionUIData)}
</td>
Expand Down Expand Up @@ -428,6 +432,9 @@ private[ui] class ExecutionPagedTable(
<td>
{executionUIData.executionId.toString}
</td>
<td>
{Option(executionUIData.queryId).getOrElse("N/A")}
</td>
<td>
{descriptionCell(executionUIData)}
</td>
Expand Down Expand Up @@ -566,6 +573,7 @@ private[ui] class ExecutionDataSource(
private def ordering(sortColumn: String, desc: Boolean): Ordering[ExecutionTableRowData] = {
val ordering: Ordering[ExecutionTableRowData] = sortColumn match {
case "ID" => Ordering.by(_.executionUIData.executionId)
case "Query ID" => Ordering.by(_.executionUIData.queryId)
case "Description" => Ordering.by(_.executionUIData.description)
case "Submitted" => Ordering.by(_.executionUIData.submissionTime)
case "Duration" => Ordering.by(_.duration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging
<li>
<strong>Duration: </strong>{UIUtils.formatDuration(duration)}
</li>
{
Option(executionUIData.queryId).map { qId =>
<li>
<strong>Query ID: </strong>{qId}
</li>
}.getOrElse(Seq.empty)
}
{
if (executionUIData.rootExecutionId != executionId) {
<li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class SQLAppStatusListener(
val sqlStoreData = kvstore.read(classOf[SQLExecutionUIData], executionId)
val executionData = new LiveExecutionData(executionId)
executionData.rootExecutionId = sqlStoreData.rootExecutionId
executionData.queryId = sqlStoreData.queryId
executionData.description = sqlStoreData.description
executionData.details = sqlStoreData.details
executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription
Expand Down Expand Up @@ -343,7 +344,7 @@ class SQLAppStatusListener(

private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
val SparkListenerSQLExecutionStart(executionId, rootExecutionId, description, details,
physicalPlanDescription, sparkPlanInfo, time, modifiedConfigs, _, _, _) = event
physicalPlanDescription, sparkPlanInfo, time, modifiedConfigs, _, _, queryId) = event

val planGraph = SparkPlanGraph(sparkPlanInfo)
val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
Expand All @@ -358,6 +359,7 @@ class SQLAppStatusListener(

val exec = getOrCreateExecution(executionId)
exec.rootExecutionId = rootExecutionId.getOrElse(executionId)
exec.queryId = queryId.orNull
exec.description = description
exec.details = details
exec.physicalPlanDescription = physicalPlanDescription
Expand Down Expand Up @@ -487,6 +489,7 @@ class SQLAppStatusListener(
private class LiveExecutionData(val executionId: Long) extends LiveEntity {

var rootExecutionId: Long = _
var queryId: java.util.UUID = null
var description: String = null
var details: String = null
var physicalPlanDescription: String = null
Expand Down Expand Up @@ -525,7 +528,8 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
errorMessage,
jobs,
stages,
metricsValues)
metricsValues,
queryId)
}

def addMetrics(newMetrics: collection.Seq[SQLPlanMetric]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.ui

import java.lang.{Long => JLong}
import java.util.Date
import java.util.{Date, UUID}

import scala.collection.mutable.ArrayBuffer

Expand Down Expand Up @@ -102,7 +102,13 @@ class SQLExecutionUIData(
* from the SQL listener instance.
*/
@JsonDeserialize(keyAs = classOf[JLong])
val metricValues: Map[Long, String]) {
val metricValues: Map[Long, String],
/**
* A unique identifier for the query.
* For backward compatibility, queryId is null when we parse event logs
* generated by old versions of Spark.
*/
val queryId: UUID = null) {

@JsonIgnore @KVIndex("completionTime")
private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.status.protobuf.sql

import java.util.Date
import java.util.{Date, UUID}

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -57,6 +57,9 @@ private[protobuf] class SQLExecutionUIDataSerializer extends ProtobufSerDe[SQLEx
case (k, v) => builder.putMetricValues(k, v)
}
}
if (ui.queryId != null) {
builder.setQueryId(ui.queryId.toString)
}
builder.build().toByteArray
}

Expand Down Expand Up @@ -92,7 +95,8 @@ private[protobuf] class SQLExecutionUIDataSerializer extends ProtobufSerDe[SQLEx
errorMessage = errorMessage,
jobs = jobs,
stages = ui.getStagesList.asScala.map(_.toInt).toSet,
metricValues = metricValues
metricValues = metricValues,
queryId = if (ui.hasQueryId) UUID.fromString(ui.getQueryId) else null
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ object SqlResourceSuite {
1 -> JobExecutionStatus.SUCCEEDED),
stages = Set[Int](),
metricValues = getMetricValues(),
errorMessage = None
errorMessage = None,
queryId = java.util.UUID.fromString("efe98ba7-1532-491e-9b4f-4be621cef37c")
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
errorMessage = normal.errorMessage,
jobs = normal.jobs,
stages = normal.stages,
metricValues = normal.metricValues
metricValues = normal.metricValues,
queryId = null
)
Seq(normal, withNull).foreach { input =>
val bytes = serializer.serialize(input)
Expand All @@ -69,6 +70,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
assert(result.jobs == input.jobs)
assert(result.stages == input.stages)
assert(result.metricValues == input.metricValues)
assert(result.queryId == input.queryId)
}
}

Expand All @@ -88,7 +90,8 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
errorMessage = templateData.errorMessage,
jobs = templateData.jobs,
stages = templateData.stages,
metricValues = Map.empty
metricValues = Map.empty,
queryId = templateData.queryId
)
val bytes1 = serializer.serialize(input1)
val result1 = serializer.deserialize(bytes1, classOf[SQLExecutionUIData])
Expand All @@ -108,7 +111,8 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
errorMessage = templateData.errorMessage,
jobs = templateData.jobs,
stages = templateData.stages,
metricValues = null
metricValues = null,
queryId = templateData.queryId
)
val bytes2 = serializer.serialize(input2)
val result2 = serializer.deserialize(bytes2, classOf[SQLExecutionUIData])
Expand Down