
Change aggregation of executor CPU and run time for Qualification tool to speed up query #2620

Merged
11 commits merged on Jun 8, 2021
26 changes: 10 additions & 16 deletions tools/README.md
@@ -83,10 +83,10 @@ It can also print out any potential problems it finds in a separate column, which
currently only includes some UDFs. The tool won't catch all UDFs, and some of the UDFs can be handled with additional steps.
Please refer to [supported_ops.md](../docs/supported_ops.md) for more details on UDF.

There is also an optional column `Executor CPU Time Percent`
that can be reported that is not included in the score. This is an estimate at how much time the tasks spent doing
processing on the CPU vs waiting on IO. This is not always a good indicator because sometimes you may be doing IO that
is encrypted and the CPU has to do work to decrypt it, so the environment you are running on needs to be taken into account.
The output also contains an `Executor CPU Time Percent` column that is not included in the score. This is an estimate
of how much time the tasks spent doing processing on the CPU vs waiting on IO. This is not always a good indicator
because sometimes you may be doing IO that is encrypted and the CPU has to do work to decrypt it, so the environment
you are running on needs to be taken into account.
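As a rough sketch of the estimate described above, the percent can be derived from two aggregated task metrics. The function and metric names here are illustrative, not the tool's exact schema:

```scala
// Sketch: estimate executor CPU time percent from aggregated task metrics.
// Names are illustrative; the real tool derives these from Spark event logs.
def executorCpuTimePercent(executorCpuTimeMs: Long, executorRunTimeMs: Long): Double =
  if (executorRunTimeMs == 0L) 0.0
  else 100.0 * executorCpuTimeMs / executorRunTimeMs
```

A low percentage suggests tasks were mostly waiting on IO; a high one suggests CPU-bound work, which, as noted above, can include decryption overhead rather than useful processing.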

Sample output in csv:
@@ -112,7 +112,10 @@ in the local filesystem, HDFS, S3 or mixed.

### Use from spark-shell
1. Include `rapids-4-spark-tools_2.12-<version>.jar` in the '--jars' option to spark-shell or spark-submit
2. After starting spark-shell:
2. Start spark-shell:
```bash
$SPARK_HOME/bin/spark-shell --driver-memory 5g --jars ~/rapids-4-spark-tools_2.12-<version>.jar
```

For multiple event logs:
@@ -121,7 +124,7 @@
```bash
com.nvidia.spark.rapids.tool.qualification.QualificationMain.main(Array("/path/t
```

### Use from spark-submit
```bash
$SPARK_HOME/bin/spark-submit --class com.nvidia.spark.rapids.tool.qualification.QualificationMain \
$SPARK_HOME/bin/spark-submit --driver-memory 5g --class com.nvidia.spark.rapids.tool.qualification.QualificationMain \
rapids-4-spark-tools_2.12-<version>.jar \
/path/to/eventlog1 /path/to/eventlog2 /directory/with/eventlogs
```
@@ -135,10 +138,7 @@ rapids-4-spark-tools_2.12-<version>.jar \

For usage see below:

-i, --include-exec-cpu-percent Include the executor CPU time percent. It
will take longer with this option and you may
want to limit the number of applications
processed at once.
--no-exec-cpu-percent Do not include the executor CPU time percent.
-f, --filter-criteria <arg> Filter newest or oldest N event logs for processing.
Supported formats are:
To process 10 recent event logs: --filter-criteria "10-newest"
@@ -175,12 +175,6 @@ The output location can be changed using the `--output-directory` option. Defaul

The output format can be changed using the `--output-format` option. Default is csv. The other option is text.

The `--include-exec-cpu-percent` option can be used to include executor CPU time percent which is based on the aggregated task metrics:
`sum(executorCPUTime)/sum(executorRunTime)`. It can show us roughly how much time is spent on CPU.
Note: This option needs lot of memory for large amount of event logs as input and will take longer to process.
We suggest you use around 30GB of driver memory if using this option and to only process around 50 apps at a time.
Note you could run it multiple times over 50 each and output to csv and combined afterwards.

Run `--help` for more information.


@@ -116,6 +116,20 @@ class Analysis(apps: ArrayBuffer[ApplicationInfo], fileWriter: Option[ToolTextFi
}
}

// sql metrics aggregation specific for qualification because
// it aggregates executor time metrics differently
def sqlMetricsAggregationQual(): DataFrame = {
val query = apps
.filter(p => p.allDataFrames.contains(s"sqlDF_${p.index}"))
.map( app => "(" + app.sqlMetricsAggregationSQLQual + ")")
.mkString(" union ")
if (query.nonEmpty) {
apps.head.runQuery(query)
} else {
apps.head.sparkSession.emptyDataFrame
}
}
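The union pattern used by `sqlMetricsAggregationQual` above can be sketched without a SparkSession: each per-app SQL fragment is parenthesized and joined with ` union `. The `App` class and `aggregationSql` below are illustrative stand-ins, not the tool's actual types:

```scala
// Sketch of the per-app query union built in sqlMetricsAggregationQual.
// App and aggregationSql stand in for ApplicationInfo and its query method.
case class App(index: Int, hasSqlDF: Boolean) {
  def aggregationSql: String = s"select $index as appIndex"
}

def buildUnionQuery(apps: Seq[App]): String =
  apps.filter(_.hasSqlDF)                     // skip apps without a sqlDF table
      .map(app => "(" + app.aggregationSql + ")")
      .mkString(" union ")
```

An empty result signals the caller to fall back to an empty DataFrame, as the method above does.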

def sqlMetricsAggregationDurationAndCpuTime(): DataFrame = {
val messageHeader = "\nSQL Duration and Executor CPU Time Percent\n"
val query = apps
@@ -112,7 +112,16 @@ case class StageCase(
completionTime: Option[Long],
failureReason: Option[String],
duration: Option[Long],
durationStr: String, gpuMode: Boolean)
durationStr: String,
gpuMode: Boolean,
executorRunTimeSum: Long,
executorCPUTimeSum: Long)

class StageTaskQualificationSummary(
val stageId: Int,
val stageAttemptId: Int,
var executorRunTime: Long,
var executorCPUTime: Long)
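A minimal sketch of how a per-stage summary like this might be accumulated from task-end events, keyed by `stageId:attemptId`. The handler shape is an assumption, and the class is repeated so the sketch is self-contained:

```scala
import scala.collection.mutable.HashMap

// Repeated from above so the sketch compiles on its own.
class StageTaskQualificationSummary(
    val stageId: Int,
    val stageAttemptId: Int,
    var executorRunTime: Long,
    var executorCPUTime: Long)

val stageTaskSummaries = HashMap.empty[String, StageTaskQualificationSummary]

// Hypothetical task-end handler: fold each task's times into its stage entry
// instead of retaining every per-task record, which is what saves memory.
def onTaskEnd(stageId: Int, attemptId: Int, runTime: Long, cpuTime: Long): Unit = {
  val key = s"$stageId:$attemptId"
  stageTaskSummaries.get(key) match {
    case Some(sum) =>
      sum.executorRunTime += runTime
      sum.executorCPUTime += cpuTime
    case None =>
      stageTaskSummaries(key) =
        new StageTaskQualificationSummary(stageId, attemptId, runTime, cpuTime)
  }
}
```

Two tasks ending in the same stage attempt collapse into a single entry, so memory grows with the number of stages rather than the number of tasks.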

// Note: sr = Shuffle Read; sw = Shuffle Write
// Totally 39 columns
@@ -57,7 +57,7 @@ object Qualification extends Logging {
if (apps.isEmpty) return None
val analysis = new Analysis(apps, None)
if (includeCpuPercent) {
val sqlAggMetricsDF = analysis.sqlMetricsAggregation()
val sqlAggMetricsDF = analysis.sqlMetricsAggregationQual()
sqlAggMetricsDF.cache().createOrReplaceTempView("sqlAggMetricsDF")
// materialize table to cache
sqlAggMetricsDF.count()
@@ -73,11 +73,10 @@ For usage see below:
val numOutputRows: ScallopOption[Int] =
opt[Int](required = false,
descr = "Number of output rows for each Application. Default is 1000.")
val includeExecCpuPercent: ScallopOption[Boolean] =
val noExecCpuPercent: ScallopOption[Boolean] =
opt[Boolean](
required = false,
default = Some(false),
descr = "Include the executor CPU time percent. It will take longer with this option" +
" and you may want to limit the number of applications processed at once.")
descr = "Do not include the executor CPU time percent.")
verify()
}
@@ -53,7 +53,7 @@ object QualificationMain extends Logging {
// Get the event logs required to process
lazy val allPaths = ToolUtils.processAllPaths(filterN, matchEventLogs, eventlogPaths)

val includeCpuPercent = appArgs.includeExecCpuPercent.getOrElse(false)
val includeCpuPercent = !(appArgs.noExecCpuPercent.getOrElse(false))
val numOutputRows = appArgs.numOutputRows.getOrElse(1000)
val dfOpt = Qualification.qualifyApps(allPaths,
numOutputRows, sparkSession, includeCpuPercent, dropTempViews)
@@ -16,14 +16,15 @@

package org.apache.spark.sql.rapids.tool.profiling

import scala.collection.Map
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.io.{Codec, Source}

import com.nvidia.spark.rapids.tool.ToolTextFileWriter
import com.nvidia.spark.rapids.tool.profiling._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.json4s.jackson.JsonMethods.parse
import scala.collection.Map
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.io.{Codec, Source}

import org.apache.spark.deploy.history.EventLogFileReader
import org.apache.spark.internal.Logging
@@ -125,10 +126,16 @@ class ApplicationInfo(
val stageFailureReason: HashMap[Int, Option[String]] = HashMap.empty[Int, Option[String]]

// From SparkListenerTaskStart & SparkListenerTaskEnd
// taskEnd contains task level metrics
var taskStart: ArrayBuffer[SparkListenerTaskStart] = ArrayBuffer[SparkListenerTaskStart]()
// taskStart was not used, so it is commented out for now
// var taskStart: ArrayBuffer[SparkListenerTaskStart] = ArrayBuffer[SparkListenerTaskStart]()
// taskEnd contains task level metrics - only used for profiling
var taskEnd: ArrayBuffer[TaskCase] = ArrayBuffer[TaskCase]()

// this is used to aggregate metrics for qualification to speed up processing and
// minimize memory usage
var stageTaskQualificationEnd: HashMap[String, StageTaskQualificationSummary] =
HashMap.empty[String, StageTaskQualificationSummary]

// From SparkListenerTaskGettingResult
var taskGettingResult: ArrayBuffer[SparkListenerTaskGettingResult] =
ArrayBuffer[SparkListenerTaskGettingResult]()
@@ -201,13 +208,14 @@ class ApplicationInfo(
val fs = FileSystem.get(eventlog.toUri, new Configuration())
var totalNumEvents = 0

val eventsProcessor = new EventsProcessor(forQualification)
Utils.tryWithResource(EventLogFileReader.openEventLog(eventlog, fs)) { in =>
val lines = Source.fromInputStream(in)(Codec.UTF8).getLines().toList
totalNumEvents = lines.size
lines.foreach { line =>
try {
val event = JsonProtocol.sparkEventFromJson(parse(line))
EventsProcessor.processAnyEvent(this, event)
eventsProcessor.processAnyEvent(this, event)
logDebug(line)
}
catch {
@@ -397,10 +405,19 @@ class ApplicationInfo(
case None => ""
}

// only for qualification: set the run time and CPU time;
// could expand later for profiling
val stageAndAttempt = s"${res.stageId}:${res.attemptId}"
val stageTaskExecSum = stageTaskQualificationEnd.get(stageAndAttempt)
val runTime = stageTaskExecSum.map(_.executorRunTime).getOrElse(0L)
val cpuTime = stageTaskExecSum.map(_.executorCPUTime).getOrElse(0L)

val stageNew = res.copy(completionTime = thisEndTime,
failureReason = thisFailureReason,
duration = durationResult,
durationStr = durationString)
durationStr = durationString,
executorRunTimeSum = runTime,
executorCPUTimeSum = cpuTime)
stageSubmittedNew += stageNew
}
allDataFrames += (s"stageDF_$index" -> stageSubmittedNew.toDF)
@@ -410,11 +427,13 @@ }
}

// For taskDF
if (taskEnd.nonEmpty) {
allDataFrames += (s"taskDF_$index" -> taskEnd.toDF)
} else {
logError("task is empty! Exiting...")
System.exit(1)
if (!forQualification) {
if (taskEnd.nonEmpty) {
allDataFrames += (s"taskDF_$index" -> taskEnd.toDF)
} else {
logError("task is empty! Exiting...")
System.exit(1)
}
}

// For sqlMetricsDF
@@ -666,6 +685,21 @@ class ApplicationInfo(
|""".stripMargin
}

// Function to generate a query for getting the executor CPU time and run time
// specifically for how we aggregate for qualification
def sqlMetricsAggregationSQLQual: String = {
s"""select $index as appIndex, '$appId' as appID,
|sq.sqlID, sq.description,
|sum(executorCPUTimeSum) as executorCPUTime,
|sum(executorRunTimeSum) as executorRunTime
|from stageDF_$index s,
|jobDF_$index j, sqlDF_$index sq
|where array_contains(j.stageIds, s.stageId)
|and sq.sqlID=j.sqlID
|group by sq.sqlID,sq.description
|""".stripMargin
}

// Function to generate a query for printing SQL metrics(accumulables)
def generateSQLAccums: String = {
s"""with allaccums as