@@ -18,7 +18,9 @@
package org.apache.spark.sql.catalyst

import scala.collection.JavaConverters._
import scala.collection.immutable.ListMap

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.BoundedPriorityQueue


@@ -41,6 +43,9 @@ object QueryPlanningTracker {
val OPTIMIZATION = "optimization"
val PLANNING = "planning"

val phaseHeader = "=== Spark Phase Timing Statistics ==="
val ruleHeader = "=== Spark Rule Timing Statistics ==="

/**
* Summary for a rule.
* @param totalTimeNs total amount of time, in nanosecs, spent in this rule.
@@ -101,6 +106,8 @@ class QueryPlanningTracker {
// From a phase to its start time and end time, in ms.
private val phasesMap = new java.util.HashMap[String, PhaseSummary]

private val numRules = SQLConf.get.uiRulesShow

/**
* Measure the start and end time of a phase. Note that if this function is called multiple
* times for the same phase, the recorded start time will be the start time of the first call,
@@ -160,4 +167,26 @@
}
}

  def compileStatsString(): String = {
    // Phase timings, sorted by phase name.
    val phaseStr = ListMap(phases.toSeq.sortBy(_._1): _*).view.map {
      case (phase, summary) => "Spark Phase: " + phase + "\nTime(ms): " + summary.durationMs
    }.mkString("", "\n", "\n")

    // Top rules by cumulative time. Rule class names are abbreviated: the first
    // five package segments are reduced to their initial letters, e.g.
    // org.apache.spark.sql.catalyst.analysis.SomeRule -> o.a.s.s.c.analysis.SomeRule.
    val ruleStr = topRulesByTime(numRules).view.map {
      case (rule, summary) =>
        val splitStr = rule.split("\\.")
        "Spark rule: " + splitStr.take(5).map(_.head).mkString(".") + "." +
          splitStr.drop(5).mkString(".") + "\nTime(ms): " + summary.totalTimeNs / 1000000.0 +
          "\nNumber of Invocations: " + summary.numInvocations +
          "\nNumber of Effective Invocations: " + summary.numEffectiveInvocations
    }.mkString("", "\n", "")

    s"""
       |$phaseHeader
       |$phaseStr
       |$ruleHeader
       |$ruleStr
     """.stripMargin
  }
}
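For reference, the text this method emits looks roughly like the following (phase names, rule name, and numbers are illustrative, not from a real run):

=== Spark Phase Timing Statistics ===
Spark Phase: analysis
Time(ms): 12
Spark Phase: optimization
Time(ms): 34

=== Spark Rule Timing Statistics ===
Spark rule: o.a.s.s.c.analysis.SomeRule
Time(ms): 5.2
Number of Invocations: 10
Number of Effective Invocations: 3

Note that prepareCompileData in SqlCompilerResource below parses exactly this layout, so the two must stay in sync.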
@@ -3356,6 +3356,12 @@ object SQLConf {
"'codegen', 'cost' and 'formatted'.")
.createWithDefault("formatted")

  val UI_RULE_SHOW = buildConf("spark.sql.expose.logicalPlanRules")
    .internal()
    .doc("Number of individual Catalyst rules to show in the UI for Spark compile metrics, per query.")
    .intConf
    .createWithDefault(15)

val SOURCES_BINARY_FILE_MAX_LENGTH = buildConf("spark.sql.sources.binaryFile.maxLength")
.doc("The max length of a file that can be read by the binary file data source. " +
"Spark will fail fast and not attempt to read the file if its length exceeds this value. " +
@@ -4263,6 +4269,8 @@ class SQLConf extends Serializable with Logging {

def uiExplainMode: String = getConf(UI_EXPLAIN_MODE)

def uiRulesShow: Int = getConf(UI_RULE_SHOW)

def addSingleFileInAddFile: Boolean = getConf(LEGACY_ADD_SINGLE_FILE_IN_ADD_FILE)

def legacyMsSqlServerNumericMappingEnabled: Boolean =
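The new conf is internal, but it can still be set like any other SQL conf; a minimal sketch (the session and value are illustrative):

// Show the top 25 Catalyst rules instead of the default 15.
spark.conf.set("spark.sql.expose.logicalPlanRules", "25")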
@@ -115,7 +115,8 @@ object SQLExecution {
throw e
} finally {
val endTime = System.nanoTime()
-      val event = SparkListenerSQLExecutionEnd(executionId, System.currentTimeMillis())
+      val event = SparkListenerSQLExecutionEnd(executionId, System.currentTimeMillis(),
+        queryExecution.tracker.compileStatsString())
// Currently only `Dataset.withAction` and `DataFrameWriter.runCommand` specify the `name`
// parameter. The `ExecutionListenerManager` only watches SQL executions with name. We
// can specify the execution name in more places in the future, so that
@@ -84,8 +84,10 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging

val metrics = sqlStore.executionMetrics(executionId)
val graph = sqlStore.planGraph(executionId)
val compileStatsStr = sqlStore.getCompilerStats(executionId)

summary ++
compileStatsDescription(compileStatsStr) ++
planVisualization(request, metrics, graph) ++
physicalPlanDescription(executionUIData.physicalPlanDescription) ++
modifiedConfigs(
@@ -219,6 +221,25 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging
<br/>
}

private def compileStatsDescription(compileStatsStr: String): Seq[Node] = {
<div>
<span style="cursor: pointer;" onclick="clickCompileStatsDetails();">
<span id="compile-stats-details-arrow" class="arrow-closed"></span>
<a>Compilation Stats</a>
</span>
</div>
<div id="compile-stats-details" style="display: none;">
<pre>{compileStatsStr}</pre>
</div>
<script>
function clickCompileStatsDetails() {{
$('#compile-stats-details').toggle();
$('#compile-stats-details-arrow').toggleClass('arrow-open').toggleClass('arrow-closed');
}}
</script>
<br/>
}

private def propertyHeader = Seq("Name", "Value")
private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
}
@@ -393,7 +393,9 @@ class SQLAppStatusListener(
}

private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
-    val SparkListenerSQLExecutionEnd(executionId, time) = event
+    val SparkListenerSQLExecutionEnd(executionId, time, tracker) = event
+    val compileStats = CompilerStats(executionId, tracker)
+    kvstore.write(compileStats)
Option(liveExecutions.get(executionId)).foreach { exec =>
exec.completionTime = Some(new Date(time))
update(exec)
@@ -474,6 +476,7 @@
toDelete.foreach { e =>
kvstore.delete(e.getClass(), e.executionId)
kvstore.delete(classOf[SparkPlanGraphWrapper], e.executionId)
kvstore.delete(classOf[CompilerStats], e.executionId)
}
}

@@ -62,6 +62,10 @@ class SQLAppStatusStore(
store.count(classOf[SparkPlanGraphWrapper])
}

def compilerStatsCount(): Long = {
store.count(classOf[CompilerStats])
}

def executionMetrics(executionId: Long): Map[Long, String] = {
def metricsFromStore(): Option[Map[Long, String]] = {
val exec = store.read(classOf[SQLExecutionUIData], executionId)
@@ -79,6 +83,14 @@
def planGraph(executionId: Long): SparkPlanGraph = {
store.read(classOf[SparkPlanGraphWrapper], executionId).toSparkPlanGraph()
}

def getCompilerStats(executionId: Long): String = {
try {
store.read(classOf[CompilerStats], executionId).queryStats
} catch {
case _: NoSuchElementException => ""
}
}
}

class SQLExecutionUIData(
@@ -148,3 +160,8 @@ case class SQLPlanMetric(
name: String,
accumulatorId: Long,
metricType: String)

case class CompilerStats(
@KVIndexParam val executionId: Long,
val queryStats: String
)
@@ -56,7 +56,7 @@ case class SparkListenerSQLExecutionStart(
}

@DeveloperApi
-case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
+case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long, tracker: String = "")
extends SparkListenerEvent {

// The name of the execution, e.g. `df.collect` will trigger a SQL execution with name "collect".
@@ -42,4 +42,14 @@ private[v1] class ApiSqlRootResource extends ApiRequestContext {
@PathParam("appId") appId: String,
@PathParam("attemptId") attemptId: String): Class[SQLDiagnosticResource] =
classOf[SQLDiagnosticResource]

@Path("applications/{appId}/compiler")
def compilerStat(@PathParam("appId") appId: String): Class[SqlCompilerResource] =
classOf[SqlCompilerResource]

@Path("applications/{appId}/{attemptId}/compiler")
def compilerStat(
@PathParam("appId") appId: String,
@PathParam("attemptId") attemptId: String): Class[SqlCompilerResource] =
classOf[SqlCompilerResource]
}
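Assuming these resources are mounted under the usual /api/v1 prefix of the UI's REST API (the prefix itself is not part of this diff), the new endpoints would be reachable at:

/api/v1/applications/<appId>/compiler?offset=0&length=20
/api/v1/applications/<appId>/<attemptId>/compiler
/api/v1/applications/<appId>/compiler/<executionId>

where offset/length page through executions and the trailing <executionId> form is handled by SqlCompilerResource below.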
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status.api.v1.sql

import javax.ws.rs._
import javax.ws.rs.core.MediaType

import scala.collection.mutable.ListBuffer

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.execution.ui.{SQLAppStatusStore, SQLExecutionUIData}
import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException}

@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class SqlCompilerResource extends BaseAppResource with Logging {

@GET
def compilerStat(
@DefaultValue("0") @QueryParam("offset") offset: Int,
@DefaultValue("20") @QueryParam("length") length: Int): Seq[CompileData] = {
withUI { ui =>
val sqlStore = new SQLAppStatusStore(ui.store.store)
sqlStore.executionsList(offset, length).map { exec =>
      [Review comment — Member] should offset and length be validated to be positive?
      [Reply — Contributor Author] It shouldn't be needed. I added SqlCompilerResource.scala based on SqlResource.scala.
val compileStats = sqlStore.getCompilerStats(exec.executionId)
prepareCompileData(exec, compileStats)
}
}
}

@GET
@Path("{executionId:\\d+}")
def compilerStat(
@PathParam("executionId") execId: Long): CompileData = {
withUI { ui =>
val sqlStore = new SQLAppStatusStore(ui.store.store)
val compileStats = sqlStore.getCompilerStats(execId)
sqlStore
.execution(execId)
.map(prepareCompileData(_, compileStats))
.getOrElse(throw new NotFoundException("unknown query execution id: " + execId))
}
}

  private def prepareCompileData(
      exec: SQLExecutionUIData,
      compileStats: String): CompileData = {

    // compileStatsString() emits two sections: phase timings under the phase
    // header, then rule timings under the rule header. Split them apart first.
    val phaseString = compileStats.split(QueryPlanningTracker.ruleHeader)
    val phaseListStr = phaseString.head.split(QueryPlanningTracker.phaseHeader)(1)
      .split("\\r?\\n")
    val phaseTimes = new ListBuffer[Phase]()
    // Each phase occupies two lines: "Spark Phase: <name>" and "Time(ms): <millis>".
    for (i <- 1 until phaseListStr.length by 2) {
      val phaseStr = phaseListStr(i).split(":")(1).trim
      val time = phaseListStr(i + 1).split(":")(1).trim
      phaseTimes += Phase(phaseStr, time.toLong)
    }

    val rulesListStr = phaseString(1).trim().split("\\r?\\n")
    val rules = new ListBuffer[Rule]()
    // Each rule occupies four lines: name, time, invocations, effective invocations.
    for (i <- 0 until rulesListStr.length by 4) {
      val name = rulesListStr(i).split(":")(1).trim
      val time = rulesListStr(i + 1).split(":")(1).trim
      val invocation = rulesListStr(i + 2).split(": ")(1).trim
      val effective = rulesListStr(i + 3).split(": ")(1).trim
      rules += Rule(name, time.toDouble, invocation.toLong, effective.toLong)
    }

    new CompileData(
      exec.executionId,
      phaseTimes.toSeq,
      rules.toSeq)
  }

new CompileData(
exec.executionId,
phaseTimes.toSeq,
rules.toSeq)
}
}
@@ -33,6 +33,17 @@ class ExecutionData private[spark] (
val nodes: Seq[Node],
val edges: Seq[SparkPlanGraphEdge])

class CompileData private[spark] (
val executionId: Long,
val phases: Seq[Phase],
val rules: Seq[Rule])

case class Rule private[spark](
ruleName: String,
timeMs: Double,
numInvocations: Long,
numEffectiveInvocations: Long)

case class Node private[spark](
nodeId: Long,
nodeName: String,
@@ -41,6 +52,8 @@ case class Node private[spark](

case class Metric private[spark] (name: String, value: String)

case class Phase private[spark](phase: String, timeMs: Long)

class SQLDiagnosticData private[spark] (
val id: Long,
val physicalPlan: String,
@@ -50,3 +63,5 @@ class SQLDiagnosticData private[spark] (
val planChanges: Seq[AdaptivePlanChange])

case class AdaptivePlanChange(updateTime: Date, physicalPlan: String)

case class PhaseTime private[spark] (phase: String, timeMs: Long)
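Serialized through the REST layer, one CompileData entry would then look roughly like this (values are illustrative, and the exact field layout depends on the JSON serializer):

{
  "executionId" : 0,
  "phases" : [ { "phase" : "analysis", "timeMs" : 12 } ],
  "rules" : [ {
    "ruleName" : "o.a.s.s.c.analysis.SomeRule",
    "timeMs" : 5.2,
    "numInvocations" : 10,
    "numEffectiveInvocations" : 3
  } ]
}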
@@ -85,7 +85,8 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession {
|{
| "Event" : "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd",
| "executionId" : 1,
| "time" : 10
| "time" : 10,
| "tracker" : ""
|}
""".stripMargin))
val readBack = JsonProtocol.sparkEventFromJson(json)
@@ -1048,6 +1048,7 @@ class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite {
val statusStore = spark.sharedState.statusStore
assert(statusStore.executionsCount() <= 50)
assert(statusStore.planGraphCount() <= 50)
assert(statusStore.compilerStatsCount() <= 50)
// No live data should be left behind after all executions end.
assert(statusStore.listener.get.noLiveData())
}