diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala index 35551d8ba77dc..c634205b83906 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala @@ -18,7 +18,9 @@ package org.apache.spark.sql.catalyst import scala.collection.JavaConverters._ +import scala.collection.immutable.ListMap +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.BoundedPriorityQueue @@ -41,6 +43,9 @@ object QueryPlanningTracker { val OPTIMIZATION = "optimization" val PLANNING = "planning" + val phaseHeader = "=== Spark Phase Timing Statistics ===" + val ruleHeader = "=== Spark Rule Timing Statistics ===" + /** * Summary for a rule. * @param totalTimeNs total amount of time, in nanosecs, spent in this rule. @@ -101,6 +106,8 @@ class QueryPlanningTracker { // From a phase to its start time and end time, in ms. private val phasesMap = new java.util.HashMap[String, PhaseSummary] + private val numRules = SQLConf.get.uiRulesShow + /** * Measure the start and end time of a phase. Note that if this function is called multiple * times for the same phase, the recorded start time will be the start time of the first call, @@ -160,4 +167,26 @@ class QueryPlanningTracker { } } + def compileStatsString(): String = { + val phaseStr = ListMap(phases.toSeq.sortBy(_._1): _*).view map { + case (phase, summary) => "Spark Phase: " + phase + "\nTime(ms): " + summary.durationMs + } mkString ("", "\n", "\n") + + val ruleStr = topRulesByTime(numRules).view map { + case (rule, summary) => + val splitstr = rule.split("\\.") + "Spark rule: " + splitstr.take(5).map(_.head).mkString("." ) + "." + + splitstr.drop(5).mkString(".") + "\nTime(ms): " + summary.totalTimeNs / 1000000.0 + + "\nNumber of Invocations: " + summary.numInvocations + + "\nNumber of Effective Invocations: " + summary.numEffectiveInvocations + } mkString ("", "\n", "") + + val compileStatStr = s""" + |$phaseHeader + |$phaseStr + |$ruleHeader + |$ruleStr + """.stripMargin + compileStatStr + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 0839a2f487511..7cfb7ad426f27 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3356,6 +3356,12 @@ object SQLConf { "'codegen', 'cost' and 'formatted'.") .createWithDefault("formatted") + val UI_RULE_SHOW = buildConf("spark.sql.expose.logicalPlanRules") + .internal() + .doc("number of individual Catalyst rules to show on UI for Spark compile metrics per query.") + .intConf + .createWithDefault(15) + val SOURCES_BINARY_FILE_MAX_LENGTH = buildConf("spark.sql.sources.binaryFile.maxLength") .doc("The max length of a file that can be read by the binary file data source. " + "Spark will fail fast and not attempt to read the file if its length exceeds this value. 
" + @@ -4263,6 +4269,8 @@ class SQLConf extends Serializable with Logging { def uiExplainMode: String = getConf(UI_EXPLAIN_MODE) + def uiRulesShow: Int = getConf(UI_RULE_SHOW) + def addSingleFileInAddFile: Boolean = getConf(LEGACY_ADD_SINGLE_FILE_IN_ADD_FILE) def legacyMsSqlServerNumericMappingEnabled: Boolean = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 953c370297f01..34c85924b1ed1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -115,7 +115,8 @@ object SQLExecution { throw e } finally { val endTime = System.nanoTime() - val event = SparkListenerSQLExecutionEnd(executionId, System.currentTimeMillis()) + val event = SparkListenerSQLExecutionEnd(executionId, System.currentTimeMillis(), + queryExecution.tracker.compileStatsString()) // Currently only `Dataset.withAction` and `DataFrameWriter.runCommand` specify the `name` // parameter. The `ExecutionListenerManager` only watches SQL executions with name. We // can specify the execution name in more places in the future, so that diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala index 498bb2a6c1c99..b803fdd04b7d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala @@ -84,8 +84,10 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging val metrics = sqlStore.executionMetrics(executionId) val graph = sqlStore.planGraph(executionId) + val compileStatsStr = sqlStore.getCompilerStats(executionId) summary ++ + compileStatsDescription(compileStatsStr) ++ planVisualization(request, metrics, graph) ++ physicalPlanDescription(executionUIData.physicalPlanDescription) ++ modifiedConfigs( @@ -219,6 +221,25 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging
 }
+
+  private def compileStatsDescription(compileStatsStr: String): Seq[Node] = {
+    [collapsible "Compilation Stats" pane rendering {compileStatsStr}; XML markup lost in extraction]
+ } + private def propertyHeader = Seq("Name", "Value") private def propertyRow(kv: (String, String)) = {kv._1}{kv._2} } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 9988df025b6af..0ed3f97f9c16b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -393,7 +393,9 @@ class SQLAppStatusListener( } private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = { - val SparkListenerSQLExecutionEnd(executionId, time) = event + val SparkListenerSQLExecutionEnd(executionId, time, tracker) = event + val compileStats = CompilerStats(executionId, tracker) + kvstore.write(compileStats) Option(liveExecutions.get(executionId)).foreach { exec => exec.completionTime = Some(new Date(time)) update(exec) @@ -474,6 +476,7 @@ class SQLAppStatusListener( toDelete.foreach { e => kvstore.delete(e.getClass(), e.executionId) kvstore.delete(classOf[SparkPlanGraphWrapper], e.executionId) + kvstore.delete(classOf[CompilerStats], e.executionId) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 95035c08a2cbf..9dd3baed07162 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -62,6 +62,10 @@ class SQLAppStatusStore( store.count(classOf[SparkPlanGraphWrapper]) } + def compilerStatsCount(): Long = { + store.count(classOf[CompilerStats]) + } + def executionMetrics(executionId: Long): Map[Long, String] = { def metricsFromStore(): Option[Map[Long, String]] = { val exec = store.read(classOf[SQLExecutionUIData], executionId) @@ -79,6 +83,14 @@ class SQLAppStatusStore( def planGraph(executionId: Long): SparkPlanGraph = { store.read(classOf[SparkPlanGraphWrapper], executionId).toSparkPlanGraph() } + + def getCompilerStats(executionId: Long): String = { + try { + store.read(classOf[CompilerStats], executionId).queryStats + } catch { + case _: NoSuchElementException => "" + } + } } class SQLExecutionUIData( @@ -148,3 +160,8 @@ case class SQLPlanMetric( name: String, accumulatorId: Long, metricType: String) + +case class CompilerStats( + @KVIndexParam val executionId: Long, + val queryStats: String +) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index e3f51cbe3b003..04ff345ee9453 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -56,7 +56,7 @@ case class SparkListenerSQLExecutionStart( } @DeveloperApi -case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long) +case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long, tracker: String = "") extends SparkListenerEvent { // The name of the execution, e.g. `df.collect` will trigger a SQL execution with name "collect". 
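Note: the `tracker` string added to `SparkListenerSQLExecutionEnd` above carries the plain-text summary built by `QueryPlanningTracker.compileStatsString()` (the "=== Spark Phase Timing Statistics ===" and "=== Spark Rule Timing Statistics ===" sections). As a minimal sketch that is not part of this patch (the listener class name is hypothetical), an external `SparkListener` could consume the payload like this:

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd

// Hypothetical example, not included in this change: register with
// spark.sparkContext.addSparkListener(new CompileStatsLoggingListener).
class CompileStatsLoggingListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SparkListenerSQLExecutionEnd(executionId, _, tracker) if tracker.nonEmpty =>
      // `tracker` is the text produced by QueryPlanningTracker.compileStatsString(),
      // the same text later parsed by SqlCompilerResource into phases and rules.
      println(s"Compile stats for SQL execution $executionId:\n$tracker")
    case _ => // not a SQL-execution-end event; ignore
  }
}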
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala index 6c727c4369d87..70ca2700ab96d 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala @@ -42,4 +42,14 @@ private[v1] class ApiSqlRootResource extends ApiRequestContext { @PathParam("appId") appId: String, @PathParam("attemptId") attemptId: String): Class[SQLDiagnosticResource] = classOf[SQLDiagnosticResource] + + @Path("applications/{appId}/compiler") + def compilerStat(@PathParam("appId") appId: String): Class[SqlCompilerResource] = + classOf[SqlCompilerResource] + + @Path("applications/{appId}/{attemptId}/compiler") + def compilerStat( + @PathParam("appId") appId: String, + @PathParam("attemptId") attemptId: String): Class[SqlCompilerResource] = + classOf[SqlCompilerResource] } diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlCompilerResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlCompilerResource.scala new file mode 100644 index 0000000000000..9641d66ec5b79 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlCompilerResource.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.status.api.v1.sql + +import javax.ws.rs._ +import javax.ws.rs.core.MediaType +import scala.collection.mutable.ListBuffer +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.QueryPlanningTracker +import org.apache.spark.sql.execution.ui.{SQLAppStatusStore, SQLExecutionUIData} +import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException} + + + +@Produces(Array(MediaType.APPLICATION_JSON)) +private[v1] class SqlCompilerResource extends BaseAppResource with Logging { + + @GET + def compilerStat( + @DefaultValue("0") @QueryParam("offset") offset: Int, + @DefaultValue("20") @QueryParam("length") length: Int): Seq[CompileData] = { + withUI { ui => + val sqlStore = new SQLAppStatusStore(ui.store.store) + sqlStore.executionsList(offset, length).map { exec => + val compileStats = sqlStore.getCompilerStats(exec.executionId) + prepareCompileData(exec, compileStats) + } + } + } + + @GET + @Path("{executionId:\\d+}") + def compilerStat( + @PathParam("executionId") execId: Long): CompileData = { + withUI { ui => + val sqlStore = new SQLAppStatusStore(ui.store.store) + val compileStats = sqlStore.getCompilerStats(execId) + sqlStore + .execution(execId) + .map(prepareCompileData(_, compileStats)) + .getOrElse(throw new NotFoundException("unknown query execution id: " + execId)) + } + } + + private def prepareCompileData( + exec: SQLExecutionUIData, + compileStats: String): CompileData = { + + val phaseString = compileStats.split(QueryPlanningTracker.ruleHeader) + val phaseListStr = phaseString.head.split(QueryPlanningTracker.phaseHeader)(1). + split("\\r?\\n") + val phaseTimes = new ListBuffer[Phase]() + for(i <- 1 until phaseListStr.length by 2) { + val phaseStr = phaseListStr(i).split(":")(1).trim + val time = phaseListStr(i + 1).split(":")(1).trim + phaseTimes += Phase(phaseStr, time.toLong) + } + + val rulesListStr = phaseString(1).trim().split("\\r?\\n") + val rules = new ListBuffer[Rule]() + for (i <- 0 until rulesListStr.length by 4) { + val name = rulesListStr(i).split(":")(1).trim + val time = rulesListStr(i + 1).split(":")(1).trim + val invocation = rulesListStr(i + 2).split(": ")(1).trim + val effective = rulesListStr(i + 3).split(": ")(1).trim + rules += Rule(name, time.toDouble, invocation.toLong, effective.toLong) + } + + new CompileData( + exec.executionId, + phaseTimes.toSeq, + rules.toSeq) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala index 3cafc10352f01..5900f353a2613 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala @@ -33,6 +33,17 @@ class ExecutionData private[spark] ( val nodes: Seq[Node], val edges: Seq[SparkPlanGraphEdge]) +class CompileData private[spark] ( + val executionId: Long, + val phases: Seq[Phase], + val rules: Seq[Rule]) + +case class Rule private[spark]( + ruleName: String, + timeMs: Double, + numInvocations: Long, + numEffectiveInvocations: Long) + case class Node private[spark]( nodeId: Long, nodeName: String, @@ -41,6 +52,8 @@ case class Node private[spark]( case class Metric private[spark] (name: String, value: String) +case class Phase private[spark](phase: String, timeMs: Long) + class SQLDiagnosticData private[spark] ( val id: Long, val physicalPlan: String, @@ -50,3 +63,5 @@ class SQLDiagnosticData private[spark] ( val planChanges: Seq[AdaptivePlanChange]) case class 
AdaptivePlanChange(updateTime: Date, physicalPlan: String) + +case class PhaseTime private[spark] (phase: String, timeMs: Long) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala index 4fd8341b3f528..799c9b1ee6e20 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala @@ -85,7 +85,8 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession { |{ | "Event" : "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd", | "executionId" : 1, - | "time" : 10 + | "time" : 10, + | "tracker" : "" |} """.stripMargin)) val readBack = JsonProtocol.sparkEventFromJson(json) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 6ce4ab3c32456..2ce7c6d0450d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -1048,6 +1048,7 @@ class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite { val statusStore = spark.sharedState.statusStore assert(statusStore.executionsCount() <= 50) assert(statusStore.planGraphCount() <= 50) + assert(statusStore.compilerStatsCount() <= 50) // No live data should be left behind after all executions end. assert(statusStore.listener.get.noLiveData()) } diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlCompilerResourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlCompilerResourceSuite.scala new file mode 100644 index 0000000000000..1192df60127fb --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlCompilerResourceSuite.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.status.api.v1.sql
+
+import java.net.URL
+import java.text.SimpleDateFormat
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods
+import org.scalatest.PrivateMethodTester
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.history.HistoryServerSuite.getContentAndCode
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.execution.metric.SQLMetricsTestUtils
+import org.apache.spark.sql.internal.SQLConf.{ADAPTIVE_EXECUTION_ENABLED, UI_RULE_SHOW}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Unit tests for the SQL compiler resource public API: run a query and extract its compile metrics.
+ */
+class SqlCompilerResourceSuite
+  extends SharedSparkSession with SQLMetricsTestUtils with SQLHelper with PrivateMethodTester {
+
+  import testImplicits._
+  implicit val formats = new DefaultFormats {
+    override def dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
+  }
+  override def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.ui.enabled", "true")
+  }
+
+  test("Check Compile Stat Rest Api Endpoints") {
+    // Materialize the result DataFrame.
+    withSQLConf(ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      val count = getDF().count()
+      assert(count == 2, s"Expected query count is 2 but received: $count")
+    }
+
+    // Applications launched in local mode do not have an `attemptId`, so only the
+    // endpoints without an attempt id are exercised here.
+    val executionId = callCompilerStatRestEndpointAndVerifyResult()
+    callCompilerStatRestEndpointByExecutionIdAndVerifyResult(executionId)
+  }
+
+  private def callCompilerStatRestEndpointAndVerifyResult(): Long = {
+    val url = new URL(spark.sparkContext.ui.get.webUrl +
+      s"/api/v1/applications/${spark.sparkContext.applicationId}/compiler")
+    val jsonResult = verifyAndGetCompileStatRestResult(url)
+    val compilerStats = JsonMethods.parse(jsonResult).extract[Seq[CompileData]]
+    assert(compilerStats.size > 0,
+      s"Expected query result size greater than 0 but received: ${compilerStats.size}")
+    val compilerStatData = compilerStats.head
+    verifyCompilerStatRestContent(compilerStatData)
+    compilerStatData.executionId
+  }
+
+  private def callCompilerStatRestEndpointByExecutionIdAndVerifyResult(executionId: Long): Unit = {
+    val url = new URL(spark.sparkContext.ui.get.webUrl +
+      s"/api/v1/applications/${spark.sparkContext.applicationId}/compiler/${executionId}")
+    val jsonResult = verifyAndGetCompileStatRestResult(url)
+    val compilerStatData = JsonMethods.parse(jsonResult).extract[CompileData]
+    verifyCompilerStatRestContent(compilerStatData)
+  }
+
+  private def verifyCompilerStatRestContent(compileStats: CompileData): Unit = {
+    assert(compileStats.executionId > -1,
+      s"Expected a valid execution ID, found ${compileStats.executionId}")
+    assert(compileStats.rules.length == UI_RULE_SHOW.defaultValue.get,
+      s"Expected number of Spark rules is ${UI_RULE_SHOW.defaultValue.get}," +
+        s" found ${compileStats.rules.length}")
+    assert(compileStats.phases.length == 3,
+      s"Expected number of phases is 3, found ${compileStats.phases.length}")
+  }
+
+  private def verifyAndGetCompileStatRestResult(url: URL): String = {
+    val (code, resultOpt, error) = getContentAndCode(url)
+    assert(code == 200, s"Expected HTTP response code is 200 but received: $code for url: $url")
+    assert(resultOpt.nonEmpty, s"Rest result should not be empty for url: $url")
+    assert(error.isEmpty, s"Error message should be empty for url: $url")
+    resultOpt.get
+  }
+
+  private def getDF(): DataFrame = {
+    val person: DataFrame =
+      spark.sparkContext.parallelize(
+        Person(0, "mike", 30) ::
+        Person(1, "jim", 20) :: Nil).toDF()
+
+    val salary: DataFrame =
+      spark.sparkContext.parallelize(
+        Salary(0, 2000.0) ::
+        Salary(1, 1000.0) :: Nil).toDF()
+
+    val salaryDF = salary.withColumnRenamed("personId", "id")
+    val ds = person.join(salaryDF, "id")
+      .groupBy("name", "age", "salary").avg("age", "salary")
+      .filter(_.getAs[Int]("age") <= 30)
+      .sort()
+
+    ds.toDF
+  }
+}
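Usage note: the new endpoints registered in `ApiSqlRootResource` can also be exercised against a running application, much as the test suite above does. The sketch below is illustrative only and not part of the patch; the host, port, and application id are placeholders:

import java.net.URL
import scala.io.Source

// Hypothetical usage sketch: fetch the new /compiler endpoint from a live Spark UI.
object CompileStatsRestExample {
  def main(args: Array[String]): Unit = {
    val appId = "local-1700000000000" // placeholder application id
    val base = s"http://localhost:4040/api/v1/applications/$appId"
    // Returns a JSON array that deserializes to Seq[CompileData]
    // (executionId, phases: Seq[Phase], rules: Seq[Rule]).
    val allExecutions = Source.fromURL(new URL(s"$base/compiler")).mkString
    println(allExecutions)
    // A single execution can be addressed by id, mirroring SqlCompilerResourceSuite.
    val firstExecution = Source.fromURL(new URL(s"$base/compiler/0")).mkString
    println(firstExecution)
  }
}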