Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,20 @@ private[spark] class AppStatusStore(
(stage, stageDataWrapper.jobIds.toSeq)
}

def stageExecutorSummary(
stageId: Int,
stageAttemptId: Int,
unsortedQuantiles: Array[Double]): Option[v1.ExecutorMetricsDistributions] = {
val quantiles = unsortedQuantiles.sorted
val summary = executorSummary(stageId, stageAttemptId)
if (summary.isEmpty) {
None
} else {
val executorMetricsSummary = summary.values.flatMap(_.peakMemoryMetrics).toIndexedSeq
Some(new v1.ExecutorMetricsDistributions(quantiles, executorMetricsSummary))
}
}

def taskCount(stageId: Int, stageAttemptId: Int): Long = {
store.count(classOf[TaskDataWrapper], "stage", Array(stageId, stageAttemptId))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,29 @@ private[v1] class StagesResource extends BaseAppResource {
}
}

@GET
@Path("{stageId: \\d+}/{stageAttemptId: \\d+}/executorMetricsDistribution")
def executorSummary(
@PathParam("stageId") stageId: Int,
@PathParam("stageAttemptId") stageAttemptId: Int,
@DefaultValue("0.05,0.25,0.5,0.75,0.95")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change the default value to 0.0,0.25,0.5,0.75,1.0. In a parallel system, the duration of a stage is often determined by the slowest task/executor. To monitor/debug a skew issue, the maximal value (or 100% percentile value) is more useful than the 95% percentile value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change the default value to 0.0,0.25,0.5,0.75,1.0. In a parallel system, the duration of a stage is often determined by the slowest task/executor. To monitor/debug a skew issue, the maximal value (or 100% percentile value) is more useful than the 95% percentile value.

Add this value since

@GET
@Path("{stageId: \\d+}/{stageAttemptId: \\d+}/taskSummary")
def taskSummary(
@PathParam("stageId") stageId: Int,
@PathParam("stageAttemptId") stageAttemptId: Int,
@DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String)
: TaskMetricDistributions = withUI { ui =>

Similar API need keep consistent too....

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. You want to keep it consistent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. You want to keep it consistent.

Emmmm, I found that in ui and restful API the quantiles is not same....

@QueryParam("quantiles") quantileString: String): ExecutorMetricsDistributions = {
withUI { ui =>
val quantiles = quantileString.split(",").map { s =>
try {
s.toDouble
} catch {
case nfe: NumberFormatException =>
throw new BadParameterException("quantiles", "double", s)
}
}

ui.store.stageExecutorSummary(stageId, stageAttemptId, quantiles).getOrElse(
throw new NotFoundException(s"No executor reported metrics yet.")
)
}
}

// Performs pagination on the server side
def doPagination(queryParameters: MultivaluedMap[String, String], stageId: Int,
stageAttemptId: Int, isSearch: Boolean, totalRecords: Int): Seq[TaskData] = {
Expand Down
29 changes: 29 additions & 0 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer,
import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize}

import org.apache.spark.JobExecutionStatus
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, TaskResourceRequest}
Expand Down Expand Up @@ -170,6 +171,19 @@ private[spark] class ExecutorMetricsJsonSerializer
value.isEmpty
}

private[spark] class ExecutorMetricsDistributionJsonSerializer
extends JsonSerializer[ExecutorMetricsDistributions] {
override def serialize(
metrics: ExecutorMetricsDistributions,
jsonGenerator: JsonGenerator,
serializerProvider: SerializerProvider): Unit = {
val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) =>
metric -> metrics.getMetricDistribution(metric)
}
jsonGenerator.writeObject(metricsMap)
}
}

class JobData private[spark](
val jobId: Int,
val name: String,
Expand Down Expand Up @@ -360,6 +374,21 @@ class TaskMetricDistributions private[spark](
val shuffleReadMetrics: ShuffleReadMetricDistributions,
val shuffleWriteMetrics: ShuffleWriteMetricDistributions)

@DeveloperApi
@JsonSerialize(using = classOf[ExecutorMetricsDistributionJsonSerializer])
class ExecutorMetricsDistributions private[spark](
val quantiles: IndexedSeq[Double],
val executorMetrics: IndexedSeq[ExecutorMetrics]) {
private lazy val count = executorMetrics.length
private lazy val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }

/** Returns the distributions for the specified metric. */
def getMetricDistribution(metricName: String): IndexedSeq[Double] = {
val sorted = executorMetrics.map(_.getMetricValue(metricName)).sorted
indices.map(i => sorted(i.toInt).toDouble).toIndexedSeq
}
}

class InputMetricDistributions private[spark](
val bytesRead: IndexedSeq[Double],
val recordsRead: IndexedSeq[Double])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"JVMHeapMemory" : [ 1.40508272E8, 1.40508272E8, 4.99823904E8, 5.08119376E8, 5.08119376E8 ],
"JVMOffHeapMemory" : [ 5.8916608E7, 5.8916608E7, 5.8989648E7, 8.8453064E7, 8.8453064E7 ],
"OnHeapExecutionMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"OffHeapExecutionMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"OnHeapStorageMemory" : [ 5514.0, 5514.0, 5514.0, 5514.0, 5514.0 ],
"OffHeapStorageMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"OnHeapUnifiedMemory" : [ 5514.0, 5514.0, 5514.0, 5514.0, 5514.0 ],
"OffHeapUnifiedMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"DirectPoolMemory" : [ 10246.0, 10246.0, 10440.0, 137861.0, 137861.0 ],
"MappedPoolMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"ProcessTreeJVMVMemory" : [ 8.286560256E9, 8.286560256E9, 9.678606336E9, 9.680105472E9, 9.680105472E9 ],
"ProcessTreeJVMRSSMemory" : [ 4.97471488E8, 4.97471488E8, 7.7697024E8, 8.58959872E8, 8.58959872E8 ],
"ProcessTreePythonVMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"ProcessTreePythonRSSMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"ProcessTreeOtherVMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"ProcessTreeOtherRSSMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"MinorGCCount" : [ 7.0, 7.0, 19.0, 19.0, 19.0 ],
"MinorGCTime" : [ 55.0, 55.0, 118.0, 122.0, 122.0 ],
"MajorGCCount" : [ 2.0, 2.0, 2.0, 3.0, 3.0 ],
"MajorGCTime" : [ 60.0, 60.0, 63.0, 144.0, 144.0 ]
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
"stage task list w/ status & sortBy short names: runtime" ->
"applications/local-1430917381534/stages/0/0/taskList?status=success&sortBy=runtime",

"stage executor peak memory metrics distributions json" ->
"applications/application_1553914137147_0018/stages/0/0/executorMetricsDistribution",

"stage list with accumulable json" -> "applications/local-1426533911241/1/stages",
"stage with accumulable json" -> "applications/local-1426533911241/1/stages/0/0",
"stage task list from multi-attempt app json(1)" ->
Expand Down
8 changes: 8 additions & 0 deletions docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,14 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
<br>Example: <code>?offset=10&amp;length=50&amp;sortBy=runtime&amp;status=running</code>
</td>
</tr>
<tr>
<td><code>/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/executorMetricsDistribution</code></td>
<td>
Summary peak executor metrics of all executors in the given stage attempt.
<br><code>?quantiles</code> summarize the metrics with the given quantiles.
<br>Example: <code>?quantiles=0.01,0.5,0.99</code>
</td>
</tr>
<tr>
<td><code>/applications/[app-id]/executors</code></td>
<td>A list of all active executors for the given application.</td>
Expand Down