diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index b9cc9145feb4..96ea4ac626e9 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -146,6 +146,20 @@ private[spark] class AppStatusStore(
     (stage, stageDataWrapper.jobIds.toSeq)
   }
 
+  def stageExecutorSummary(
+      stageId: Int,
+      stageAttemptId: Int,
+      unsortedQuantiles: Array[Double]): Option[v1.ExecutorMetricsDistributions] = {
+    val quantiles = unsortedQuantiles.sorted
+    val summary = executorSummary(stageId, stageAttemptId)
+    if (summary.isEmpty) {
+      None
+    } else {
+      val executorMetricsSummary = summary.values.flatMap(_.peakMemoryMetrics).toIndexedSeq
+      Some(new v1.ExecutorMetricsDistributions(quantiles, executorMetricsSummary))
+    }
+  }
+
   def taskCount(stageId: Int, stageAttemptId: Int): Long = {
     store.count(classOf[TaskDataWrapper], "stage", Array(stageId, stageAttemptId))
   }
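
A minimal, self-contained sketch of the contract the new `stageExecutorSummary` helper exposes: `None` when no executor summary exists for the stage attempt, otherwise the peak metrics of every executor that has reported them. `PeakMetrics` and `ExecSummary` below are illustrative stand-ins for `v1.ExecutorMetrics` and `v1.ExecutorSummary`, and a plain `Map` replaces the KVStore-backed `executorSummary` lookup:

```scala
// Hedged sketch only; the names here are stand-ins, not Spark types.
object StageExecutorSummarySketch {
  final case class PeakMetrics(jvmHeapMemory: Long)
  final case class ExecSummary(peakMemoryMetrics: Option[PeakMetrics])

  def stageExecutorSummary(
      summary: Map[String, ExecSummary]): Option[IndexedSeq[PeakMetrics]] = {
    if (summary.isEmpty) {
      None
    } else {
      // Executors that have not reported peak metrics yet are silently skipped.
      Some(summary.values.flatMap(_.peakMemoryMetrics).toIndexedSeq)
    }
  }

  def main(args: Array[String]): Unit = {
    val summaries = Map(
      "0" -> ExecSummary(Some(PeakMetrics(140508272L))),
      "1" -> ExecSummary(None))
    println(stageExecutorSummary(summaries)) // Some(Vector(PeakMetrics(140508272)))
    println(stageExecutorSummary(Map.empty)) // None
  }
}
```
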
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
index 84bd430d9abe..127f70d3194a 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
@@ -154,6 +154,29 @@ private[v1] class StagesResource extends BaseAppResource {
     }
   }
 
+  @GET
+  @Path("{stageId: \\d+}/{stageAttemptId: \\d+}/executorMetricsDistribution")
+  def executorSummary(
+      @PathParam("stageId") stageId: Int,
+      @PathParam("stageAttemptId") stageAttemptId: Int,
+      @DefaultValue("0.05,0.25,0.5,0.75,0.95")
+      @QueryParam("quantiles") quantileString: String): ExecutorMetricsDistributions = {
+    withUI { ui =>
+      val quantiles = quantileString.split(",").map { s =>
+        try {
+          s.toDouble
+        } catch {
+          case _: NumberFormatException =>
+            throw new BadParameterException("quantiles", "double", s)
+        }
+      }
+
+      ui.store.stageExecutorSummary(stageId, stageAttemptId, quantiles).getOrElse(
+        throw new NotFoundException("No executor reported metrics yet.")
+      )
+    }
+  }
+
   // Performs pagination on the server side
   def doPagination(queryParameters: MultivaluedMap[String, String], stageId: Int,
       stageAttemptId: Int, isSearch: Boolean, totalRecords: Int): Seq[TaskData] = {
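
The endpoint parses its `quantiles` query parameter eagerly and fails fast on malformed input. A standalone sketch of that parsing, with a plain `IllegalArgumentException` standing in for Spark's internal `BadParameterException`:

```scala
// Standalone sketch of the quantile-string parsing above; the exception type
// is a stand-in for BadParameterException.
object QuantileParsingSketch {
  def parseQuantiles(quantileString: String): Array[Double] = {
    quantileString.split(",").map { s =>
      try {
        s.toDouble
      } catch {
        case _: NumberFormatException =>
          throw new IllegalArgumentException(s"Expected quantiles to be doubles, got: $s")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the endpoint's @DefaultValue.
    println(parseQuantiles("0.05,0.25,0.5,0.75,0.95").mkString(", "))
  }
}
```
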
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 96f5b7b5cf27..f01ac1540a87 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer,
 import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize}
 
 import org.apache.spark.JobExecutionStatus
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, TaskResourceRequest}
@@ -170,6 +171,19 @@ private[spark] class ExecutorMetricsJsonSerializer
     value.isEmpty
 }
 
+private[spark] class ExecutorMetricsDistributionJsonSerializer
+  extends JsonSerializer[ExecutorMetricsDistributions] {
+  override def serialize(
+      metrics: ExecutorMetricsDistributions,
+      jsonGenerator: JsonGenerator,
+      serializerProvider: SerializerProvider): Unit = {
+    val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) =>
+      metric -> metrics.getMetricDistribution(metric)
+    }
+    jsonGenerator.writeObject(metricsMap)
+  }
+}
+
 class JobData private[spark](
     val jobId: Int,
     val name: String,
@@ -360,6 +374,21 @@ class TaskMetricDistributions private[spark](
     val shuffleReadMetrics: ShuffleReadMetricDistributions,
     val shuffleWriteMetrics: ShuffleWriteMetricDistributions)
 
+@DeveloperApi
+@JsonSerialize(using = classOf[ExecutorMetricsDistributionJsonSerializer])
+class ExecutorMetricsDistributions private[spark](
+    val quantiles: IndexedSeq[Double],
+    val executorMetrics: IndexedSeq[ExecutorMetrics]) {
+  private lazy val count = executorMetrics.length
+  private lazy val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
+
+  /** Returns the distribution of the given metric at the configured quantiles. */
+  def getMetricDistribution(metricName: String): IndexedSeq[Double] = {
+    val sorted = executorMetrics.map(_.getMetricValue(metricName)).sorted
+    indices.map(i => sorted(i.toInt).toDouble).toIndexedSeq
+  }
+}
+
 class InputMetricDistributions private[spark](
     val bytesRead: IndexedSeq[Double],
     val recordsRead: IndexedSeq[Double])
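
`getMetricDistribution` uses nearest-rank selection: each quantile `q` maps to index `min((q * count).toLong, count - 1)` into the per-executor values sorted ascending, with no interpolation. A self-contained sketch of just that computation, on three hypothetical peak-heap values:

```scala
// Self-contained sketch of the nearest-rank selection in getMetricDistribution.
object NearestRankSketch {
  def metricDistribution(
      values: IndexedSeq[Long], quantiles: Seq[Double]): IndexedSeq[Double] = {
    val count = values.length
    val sorted = values.sorted
    val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
    indices.map(i => sorted(i.toInt).toDouble).toIndexedSeq
  }

  def main(args: Array[String]): Unit = {
    // Peak JVM heap values of three hypothetical executors.
    val peaks = IndexedSeq(499823904L, 140508272L, 508119376L)
    println(metricDistribution(peaks, Seq(0.05, 0.25, 0.5, 0.75, 0.95)))
    // Vector(1.40508272E8, 1.40508272E8, 4.99823904E8, 5.08119376E8, 5.08119376E8)
  }
}
```
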
diff --git a/core/src/test/resources/HistoryServerExpectations/stage_executor_peak_memory_metrics_distributions_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_executor_peak_memory_metrics_distributions_json_expectation.json
new file mode 100644
index 000000000000..9bae1b9134db
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/stage_executor_peak_memory_metrics_distributions_json_expectation.json
@@ -0,0 +1,22 @@
+{
+ "JVMHeapMemory" : [ 1.40508272E8, 1.40508272E8, 4.99823904E8, 5.08119376E8, 5.08119376E8 ],
+ "JVMOffHeapMemory" : [ 5.8916608E7, 5.8916608E7, 5.8989648E7, 8.8453064E7, 8.8453064E7 ],
+ "OnHeapExecutionMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+ "OffHeapExecutionMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+ "OnHeapStorageMemory" : [ 5514.0, 5514.0, 5514.0, 5514.0, 5514.0 ],
+ "OffHeapStorageMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+ "OnHeapUnifiedMemory" : [ 5514.0, 5514.0, 5514.0, 5514.0, 5514.0 ],
+ "OffHeapUnifiedMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+ "DirectPoolMemory" : [ 10246.0, 10246.0, 10440.0, 137861.0, 137861.0 ],
+ "MappedPoolMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+ "ProcessTreeJVMVMemory" : [ 8.286560256E9, 8.286560256E9, 9.678606336E9, 9.680105472E9, 9.680105472E9 ],
+ "ProcessTreeJVMRSSMemory" : [ 4.97471488E8, 4.97471488E8, 7.7697024E8, 8.58959872E8, 8.58959872E8 ],
+ "ProcessTreePythonVMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+ "ProcessTreePythonRSSMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+ "ProcessTreeOtherVMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+ "ProcessTreeOtherRSSMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+ "MinorGCCount" : [ 7.0, 7.0, 19.0, 19.0, 19.0 ],
+ "MinorGCTime" : [ 55.0, 55.0, 118.0, 122.0, 122.0 ],
+ "MajorGCCount" : [ 2.0, 2.0, 2.0, 3.0, 3.0 ],
+ "MajorGCTime" : [ 60.0, 60.0, 63.0, 144.0, 144.0 ]
+}
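
The repeated first/last values in each row of this expectation file are consistent with the nearest-rank math above if the recorded application ran three executors (an assumption inferred from the pattern, not verified against the event log): the five default quantiles then collapse onto only three distinct indices. A quick check:

```scala
// With count = 3, the default quantiles map to indices 0, 0, 1, 2, 2, which
// is why the first two and last two values of each row coincide.
object ExpectationIndexCheck {
  def main(args: Array[String]): Unit = {
    val count = 3
    val indices = Seq(0.05, 0.25, 0.5, 0.75, 0.95)
      .map { q => math.min((q * count).toLong, count - 1) }
    println(indices) // List(0, 0, 1, 2, 2)
  }
}
```
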
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 08b211806552..32d94083181c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -163,6 +163,9 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
"stage task list w/ status & sortBy short names: runtime" ->
"applications/local-1430917381534/stages/0/0/taskList?status=success&sortBy=runtime",
+ "stage executor peak memory metrics distributions json" ->
+ "applications/application_1553914137147_0018/stages/0/0/executorMetricsDistribution",
+
"stage list with accumulable json" -> "applications/local-1426533911241/1/stages",
"stage with accumulable json" -> "applications/local-1426533911241/1/stages/0/0",
"stage task list from multi-attempt app json(1)" ->
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 5b3278bca031..3be8cd3c2959 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -503,6 +503,14 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
       <br>Example: <code>?offset=10&length=50&sortBy=runtime&status=running</code>
     </td>
   </tr>
+  <tr>
+    <td><code>/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/executorMetricsDistribution</code></td>
+    <td>
+      Summary metrics distributions of all executors in the given stage attempt.
+      <br><code>?quantiles</code> summarize the metrics with the given quantiles.
+      <br>Example: <code>?quantiles=0.01,0.5,0.99</code>
+    </td>
+  </tr>
   <tr>
     <td><code>/applications/[app-id]/executors</code></td>
     <td>A list of all active executors for the given application.</td>