
Commit a99b6e3

allenma (GitHub Enterprise) authored and committed

[CARMEL-6400] Log detail sql execution metrics (#1162)

* [CARMEL-6400] Log detail sql execution metrics
* Fix code style
* Fix code style
1 parent e3898e8 commit a99b6e3

File tree

7 files changed: +183, -2 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala

Lines changed: 50 additions & 0 deletions

@@ -74,6 +74,20 @@ object QueryPlanningTracker {
     }
   }
 
+  class ExternalSystemCallSummary {
+    var callCount: Int = 0
+    var totalDuration: Long = 0
+
+    def updateCallInfo(duration: Long): Unit = {
+      callCount += 1
+      totalDuration += duration
+    }
+
+    override def toString: String = {
+      s"callCount=$callCount, totalDuration=${totalDuration}ms"
+    }
+  }
+
   /**
    * A thread local variable to implicitly pass the tracker around. This assumes the query planner
    * is single-threaded, and avoids passing the same tracker context in every function call.
@@ -104,6 +118,9 @@ class QueryPlanningTracker {
   // From a phase to its start time and end time, in ms.
   private val phasesMap = new java.util.HashMap[String, PhaseSummary]
 
+  // external system call tracking during planning
+  private val externalSystemCalls = new java.util.HashMap[String, ExternalSystemCallSummary]
+
   val queryPlanningContext = new QueryPlanningContext()
 
   var totalTimeNs = 0L
@@ -171,6 +188,20 @@ class QueryPlanningTracker {
 
   def getViewMap: java.util.HashMap[String, Int] = viewMap
 
+  /**
+   * Track external system call information.
+   * @param callName name of the external call
+   * @param duration duration in ms
+   */
+  def recordExternalSystemCall(callName: String, duration: Long): Unit = {
+    var s = externalSystemCalls.get(callName)
+    if (s eq null) {
+      s = new ExternalSystemCallSummary
+      externalSystemCalls.put(callName, s)
+    }
+    s.updateCallInfo(duration)
+  }
+
   def inheritViewUsage(oldTracker: QueryPlanningTracker): Unit = {
     viewMap.putAll(oldTracker.getViewMap)
   }
@@ -244,4 +275,23 @@ class QueryPlanningTracker {
     """.stripMargin
 
   }
+
+  def formattedPlanPhrases(): String = {
+    val phaseInfo = phases.toSeq.map {
+      case (phase, summary) =>
+        s"$phase: ${summary.toString}, duration(${summary.durationMs}ms)"
+    }.mkString(System.lineSeparator())
+
+    val externalSysCalls = externalSystemCalls.asScala.toSeq.map {
+      case (callName, callSummary) =>
+        s"$callName: ${callSummary.toString}"
+    }.mkString(System.lineSeparator())
+
+    s"""
+       |=== phases ===
+       |$phaseInfo
+       |=== external system calls ===
+       |$externalSysCalls
+       |""".stripMargin
+  }
 }
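The new tracking is simple enough to illustrate outside Spark. The sketch below mirrors the aggregation added here, using only the Scala standard library: each named external call accumulates a count and a total duration, and the report prints one line per call name, just as the "=== external system calls ===" section does. All identifiers other than recordExternalSystemCall are illustrative.

import scala.collection.mutable

object ExternalCallSummaryDemo {
  // Same idea as ExternalSystemCallSummary: count plus accumulated duration per call name.
  final class CallSummary {
    var callCount: Int = 0
    var totalDurationMs: Long = 0L
    def record(durationMs: Long): Unit = { callCount += 1; totalDurationMs += durationMs }
    override def toString: String = s"callCount=$callCount, totalDuration=${totalDurationMs}ms"
  }

  private val calls = mutable.LinkedHashMap.empty[String, CallSummary]

  def recordExternalSystemCall(name: String, durationMs: Long): Unit =
    calls.getOrElseUpdate(name, new CallSummary).record(durationMs)

  def report(): String =
    calls.map { case (name, s) => s"$name: $s" }.mkString(System.lineSeparator())

  def main(args: Array[String]): Unit = {
    recordExternalSystemCall("hiveMetastoreRpc", 12)
    recordExternalSystemCall("hiveMetastoreRpc", 30)
    recordExternalSystemCall("listFiles", 250)
    println(report())
    // hiveMetastoreRpc: callCount=2, totalDuration=42ms
    // listFiles: callCount=1, totalDuration=250ms
  }
}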

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala

Lines changed: 5 additions & 1 deletion

@@ -35,6 +35,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.SparkHadoopWriterUtils
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.execution.streaming.FileStreamSink
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -158,8 +159,11 @@ class InMemoryFileIndex(
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
       output ++= leafFiles
     }
-    logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to list leaf files" +
+    val duration = (System.nanoTime() - startTime) / (1000 * 1000)
+    logInfo(s"It took ${duration} ms to list leaf files" +
       s" for ${paths.length} paths.")
+    QueryPlanningTracker.getCurrent.foreach(t =>
+      t.recordExternalSystemCall("listFiles", duration))
     output
   }
 }
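The call site above records the listing time only when a planning tracker is active on the current thread, so file listings done outside query planning are unaffected. A minimal sketch of that guard pattern follows; Tracker and the thread-local are hypothetical stand-ins for QueryPlanningTracker and its getCurrent, not the real API.

object ListFilesTrackingSketch {
  final class Tracker {
    def recordExternalSystemCall(name: String, durationMs: Long): Unit =
      println(s"recorded $name: ${durationMs}ms")
  }

  // Thread-local "current tracker", standing in for QueryPlanningTracker.getCurrent.
  private val current = new ThreadLocal[Option[Tracker]] {
    override def initialValue(): Option[Tracker] = None
  }
  def getCurrent: Option[Tracker] = current.get()

  def withTracker[T](t: Tracker)(body: => T): T = {
    current.set(Some(t))
    try body finally current.set(None)
  }

  // Time a (fake) listing and record it only if a tracker is active on this thread.
  def listLeafFiles(paths: Seq[String]): Unit = {
    val startTime = System.nanoTime()
    paths.foreach(_ => Thread.sleep(1)) // stand-in for the actual filesystem listing
    val duration = (System.nanoTime() - startTime) / (1000 * 1000)
    getCurrent.foreach(t => t.recordExternalSystemCall("listFiles", duration))
  }

  def main(args: Array[String]): Unit =
    withTracker(new Tracker) {
      listLeafFiles(Seq("/tmp/a", "/tmp/b"))
    }
}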

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkDownloadDataOperation.scala

Lines changed: 1 addition & 0 deletions

@@ -292,6 +292,7 @@ private[hive] class SparkDownloadDataOperation(
 
       setState(OperationState.FINISHED)
       logQueryInfo(s"Finished query [$statementId].")
+      logInfo(s"Plan details:${result.queryExecution.tracker.formattedPlanPhrases()}")
     } catch {
       case NonFatal(e) =>
         logQueryError(s"Error executing query [$statementId]", e)

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala

Lines changed: 2 additions & 0 deletions

@@ -345,6 +345,8 @@ private[hive] class SparkExecuteStatementOperation(
         iter = executeForResults()
         dataTypes = result.schema.fields.map(_.dataType)
         logQueryInfo(s"Finished query [$statementId].")
+        logInfo(s"Query [$statementId] plan details:" +
+          s"${result.queryExecution.tracker.formattedPlanPhrases()}")
       }
     } catch {
       // Actually do need to catch Throwable as some failures don't inherit from Exception and
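For reference, the "Plan details" block written by both operations is simply whatever formattedPlanPhrases() returns. With the phase names Spark's planner normally records, the output is shaped roughly like the following; the phase summaries, counts, and timings are illustrative, only the section headers and field layout come from the code above.

=== phases ===
analysis: <phase summary>, duration(35ms)
optimization: <phase summary>, duration(120ms)
planning: <phase summary>, duration(15ms)
=== external system calls ===
hiveMetastoreRpc: callCount=4, totalDuration=86ms
listFiles: callCount=1, totalDuration=250ms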

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala

Lines changed: 71 additions & 0 deletions

@@ -316,6 +316,24 @@ private[thriftserver] class HiveThriftServer2Listener(
     }
   }
 
+  def logExecutionMetrics(metrics: Option[QueryExecutionMetrics]): Unit = {
+    metrics.foreach(m => {
+      val stageStatsHeader = "stage start finish taskNum slowestTask(taskId execId " +
+        "taskDuration execRunTime launchDelay inputSize shuffleReadSize)"
+      val stats =
+        s"""
+           |Query [${m.groupId}] execution metrics:
+           |Task Num: ${m.taskNum}
+           |Task Time: ${m.totalTaskTime}
+           |Stage stats:
+           |$stageStatsHeader
+           |${m.stageStatsInfo()}
+        """.stripMargin
+      logInfo(stats)
+      }
+    )
+  }
+
   private def onQueryExit(e: SparkListenerThriftServerQueryExit): Unit = synchronized {
     val execInfo = executionList.get(e.id)
     if (execInfo == null) {
@@ -326,6 +344,7 @@ private[thriftserver] class HiveThriftServer2Listener(
       val session = sessionList.get(execInfo.sessionId)
       val metrics = Option(executionMetrics.get(execInfo.groupId))
       l.logQuery(e.id, execInfo, session, e.qlObjects, e.extInfo, metrics)
+      logExecutionMetrics(metrics)
     }
     executionMetrics.remove(execInfo.groupId)
     if (delayCleanExecution && executionList.get(e.id).state == ExecutionState.CLOSED) {
@@ -659,6 +678,7 @@ class QueryExecutionMetrics(val groupId: String, val sessionId: String,
 
     stageStats.foreach { stats =>
       stats.finishTaskNum += 1
+      stats.updateSlowestTaskMetrics(taskEnd)
       if (taskEnd.taskInfo.attemptNumber == 0 && !taskEnd.taskInfo.speculative) {
         val launchDelay = taskEnd.taskInfo.launchTime - stats.startTime
         if (launchDelay > maxLaunchDelay) maxLaunchDelay = launchDelay
@@ -695,6 +715,13 @@ class QueryExecutionMetrics(val groupId: String, val sessionId: String,
     lastExecutionEndTime = exec.time
   }
 
+  def stageStatsInfo(): String = {
+    stageStatsMap.toSeq.map { case ((stageId, attemptId), stageStats) =>
+      s"$stageId-$attemptId ${stageStats.startTime} ${stageStats.endTime} " +
+        s"${stageStats.totalTaskNum} (${stageStats.slowestTaskStatsInfo()})"
+    }.mkString(System.lineSeparator())
+  }
+
   override def toString(): String = {
     String.join(
       DELIMITER,
@@ -728,6 +755,8 @@ class StageStats(val name: String) {
   var maxShuffleWaitTime: Long = 0
   var maxInputWaitTime: Long = 0
 
+  var slowestTask: Option[SparkListenerTaskEnd] = None
+
  def updateTaskWaitResourceTime(waitResourceTime: Long): Unit = {
    if (waitResourceTime > maxWaitResourceTime) {
      maxWaitResourceTime = waitResourceTime
@@ -745,8 +774,50 @@ class StageStats(val name: String) {
       maxInputWaitTime = inputWaitTime
     }
   }
+
+  def updateSlowestTaskMetrics(taskEnd: SparkListenerTaskEnd): Unit = {
+    if (slowestTask.isEmpty || taskEnd.taskInfo.finishTime > slowestTask.get.taskInfo.finishTime) {
+      slowestTask = Some(taskEnd)
+    }
+  }
+
+  def slowestTaskStatsInfo(): String = {
+    def extractTaskStats(end: SparkListenerTaskEnd): TaskStats = {
+      TaskStats(
+        end.taskInfo.finishTime,
+        end.taskInfo.taskId,
+        end.taskInfo.executorId,
+        end.taskInfo.finishTime - end.taskInfo.launchTime,
+        end.taskMetrics.executorRunTime,
+        this.startTime - end.taskInfo.launchTime,
+        end.taskMetrics.inputMetrics.bytesRead,
+        end.taskMetrics.shuffleReadMetrics.totalBytesRead
+      )
+    }
+    slowestTask.map(taskEnd => {
+      val taskStats = extractTaskStats(taskEnd)
+      s"${taskStats.taskId} " +
+        s"${taskStats.execId} " +
+        s"${taskStats.duration} " +
+        s"${taskStats.executorRunTime} " +
+        s"${taskStats.launchDelay} " +
+        s"${taskStats.inputSize} " +
+        s"${taskStats.shuffleReadSize}"
+    }).getOrElse("")
+  }
 }
 
+case class TaskStats(
+    finishTime: Long,
+    taskId: Long,
+    execId: String,
+    duration: Long,
+    executorRunTime: Long,
+    launchDelay: Long,
+    inputSize: Long,
+    shuffleReadSize: Long
+)
+
 case class SqlWithStats(
     queryId: String,
     statement: String,
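A compact, self-contained sketch of the slowest-task bookkeeping this file adds, rendered in the same column order as the stage stats header (taskId execId taskDuration execRunTime launchDelay inputSize shuffleReadSize). TaskEndInfo is a hypothetical stand-in for SparkListenerTaskEnd; as in the commit, the task kept is the one with the largest finish time, and this sketch computes the launch delay as launch time minus stage start.

object SlowestTaskSketch {
  // Stand-in for the fields of SparkListenerTaskEnd used by slowestTaskStatsInfo.
  final case class TaskEndInfo(
      taskId: Long,
      executorId: String,
      launchTime: Long,
      finishTime: Long,
      executorRunTime: Long,
      bytesRead: Long,
      shuffleBytesRead: Long)

  final class StageStats(val stageStartTime: Long) {
    private var slowest: Option[TaskEndInfo] = None

    // Keep the task whose finish time is the largest seen so far.
    def update(end: TaskEndInfo): Unit =
      if (slowest.forall(_.finishTime < end.finishTime)) slowest = Some(end)

    // taskId execId taskDuration execRunTime launchDelay inputSize shuffleReadSize
    def slowestTaskStatsInfo(): String = slowest.map { t =>
      val duration = t.finishTime - t.launchTime
      val launchDelay = t.launchTime - stageStartTime // delay relative to stage start
      s"${t.taskId} ${t.executorId} $duration ${t.executorRunTime} " +
        s"$launchDelay ${t.bytesRead} ${t.shuffleBytesRead}"
    }.getOrElse("")
  }

  def main(args: Array[String]): Unit = {
    val stats = new StageStats(stageStartTime = 1000L)
    stats.update(TaskEndInfo(1, "exec-1", 1010, 1500, 420, 10000, 0))
    stats.update(TaskEndInfo(2, "exec-2", 1020, 2300, 1100, 20000, 4096))
    println(stats.slowestTaskStatsInfo()) // 2 exec-2 1280 1100 20 20000 4096
  }
}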

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala

Lines changed: 51 additions & 0 deletions

@@ -221,6 +221,57 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(sqlWithStats.size == 1)
   }
 
+  test("log execution metrics") {
+    val (_, listener: HiveThriftServer2Listener) = createAppStatusStore(true)
+
+    val executionMetrics = new QueryExecutionMetrics("groupId", "sessionId")
+    executionMetrics.stageStart(createStageInfo(0, 0, 2))
+    executionMetrics.taskStart(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0)))
+    executionMetrics.taskStart(SparkListenerTaskStart(0, 0, createTaskInfo(1, 0)))
+
+    val tm1 = new TaskMetrics
+    tm1.inputMetrics.setBytesRead(10000)
+    tm1.inputMetrics.setRecordsRead(1000)
+
+    val tm2 = new TaskMetrics
+    tm2.inputMetrics.setBytesRead(10000)
+    tm2.inputMetrics.setRecordsRead(1000)
+
+    executionMetrics.taskEnd(SparkListenerTaskEnd(
+      stageId = 0,
+      stageAttemptId = 0,
+      taskType = "",
+      reason = null,
+      createTaskInfo(0, 0),
+      new ExecutorMetrics,
+      tm1))
+    executionMetrics.taskEnd(SparkListenerTaskEnd(
+      stageId = 0,
+      stageAttemptId = 0,
+      taskType = "",
+      reason = null,
+      createTaskInfo(1, 0),
+      new ExecutorMetrics,
+      tm2))
+    executionMetrics.stageComplete(createStageInfo(0, 0, 2))
+
+    executionMetrics.stageStart(createStageInfo(1, 0, 1))
+    executionMetrics.taskStart(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0)))
+    val tm3 = new TaskMetrics
+    tm3.inputMetrics.setBytesRead(10000)
+    tm3.inputMetrics.setRecordsRead(1000)
+    executionMetrics.taskEnd(SparkListenerTaskEnd(
+      stageId = 1,
+      stageAttemptId = 0,
+      taskType = "",
+      reason = null,
+      createTaskInfo(1, 0),
+      new ExecutorMetrics,
+      tm3))
+    executionMetrics.stageComplete(createStageInfo(1, 0, 1))
+    listener.logExecutionMetrics(Some(executionMetrics))
+  }
+
   private def createStageInfo(stageId: Int, attemptId: Int, numTasks: Int): StageInfo = {
     new StageInfo(stageId = stageId,
       attemptId = attemptId,

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 3 additions & 1 deletion

@@ -49,7 +49,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{QueryPlanningTracker, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -331,6 +331,8 @@ private[hive] class HiveClientImpl(
     val end = System.currentTimeMillis()
     HiveCatalogMetrics.updateCallDuration(call, end - start)
     HiveCatalogMetrics.incrementCallCount(call, 1)
+    QueryPlanningTracker.getCurrent.foreach(t =>
+      t.recordExternalSystemCall("hiveMetastoreRpc", end - start))
     t
   }
 
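A minimal sketch of the timing wrapper this hunk extends, assuming nothing about the real HiveClientImpl internals: the metastore call is timed once, and the same duration feeds both the global counters and, when one is active, the per-query tracker, which later surfaces it as the hiveMetastoreRpc entry under "=== external system calls ===". All names below are illustrative.

object TimedMetastoreCallSketch {
  final class Tracker {
    def recordExternalSystemCall(name: String, durationMs: Long): Unit =
      println(s"$name: ${durationMs}ms")
  }
  // Stand-in for QueryPlanningTracker.getCurrent; a real tracker exists only during planning.
  def currentTracker: Option[Tracker] = Some(new Tracker)

  // Measure one metastore call and feed the same duration to both sinks.
  def timed[T](call: String)(body: => T): T = {
    val start = System.currentTimeMillis()
    val result = body
    val end = System.currentTimeMillis()
    println(s"global metrics: $call += ${end - start}ms") // HiveCatalogMetrics in the real code
    currentTracker.foreach(_.recordExternalSystemCall("hiveMetastoreRpc", end - start))
    result
  }

  def main(args: Array[String]): Unit = {
    val table = timed("getTable") { Thread.sleep(5); "db.tbl" }
    println(table)
  }
}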
