diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsDistributions.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsDistributions.scala
new file mode 100644
index 000000000000..559bfd2015ff
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsDistributions.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import com.fasterxml.jackson.core.JsonGenerator
+import com.fasterxml.jackson.databind.JsonSerializer
+import com.fasterxml.jackson.databind.SerializerProvider
+import com.fasterxml.jackson.databind.annotation.JsonSerialize
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics distributions tracked for executors and the driver.
+ */
+@DeveloperApi
+@JsonSerialize(using = classOf[ExecutorMetricsDistributionJsonSerializer])
+class ExecutorMetricsDistributions private[spark](
+    val quantiles: IndexedSeq[Double],
+    val executorMetrics: IndexedSeq[ExecutorMetrics]) {
+  private lazy val count = executorMetrics.length
+  private lazy val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
+
+  /** Returns the distributions for the specified metric. */
+  def getMetricDistribution(metricName: String): IndexedSeq[Double] = {
+    val sorted = executorMetrics.map(_.getMetricValue(metricName)).sorted
+    indices.map(i => sorted(i.toInt).toDouble).toIndexedSeq
+  }
+}
+
+/**
+ * Serializer for ExecutorMetricsDistributions.
+ * Converts the distributions to a map keyed by metric name.
+ */
+private[spark] class ExecutorMetricsDistributionJsonSerializer
+  extends JsonSerializer[ExecutorMetricsDistributions] {
+  override def serialize(
+      metrics: ExecutorMetricsDistributions,
+      jsonGenerator: JsonGenerator,
+      serializerProvider: SerializerProvider): Unit = {
+    val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) =>
+      metric -> metrics.getMetricDistribution(metric)
+    }
+    jsonGenerator.writeObject(metricsMap)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index affa85b76cf1..a3c9ee6c9493 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.executor.ExecutorMetricsDistributions
 import org.apache.spark.status.api.v1
 import org.apache.spark.ui.scope._
 import org.apache.spark.util.Utils
@@ -365,6 +366,22 @@ private[spark] class AppStatusStore(
     Some(computedQuantiles)
   }
 
+  /**
+   * Summarizes the executor peak memory metrics at the requested quantiles,
+   * or None if no executor has reported peak metrics yet.
+   */
+  def executorMetricSummary(
+      activeOnly: Boolean,
+      unsortedQuantiles: Array[Double]): Option[ExecutorMetricsDistributions] = {
+    val quantiles = unsortedQuantiles.sorted
+    val executors = executorList(activeOnly).flatMap(_.peakMemoryMetrics).toIndexedSeq
+    if (executors.isEmpty) {
+      None
+    } else {
+      Some(new ExecutorMetricsDistributions(quantiles, executors))
+    }
+  }
+
   /**
    * Whether to cache information about a specific metric quantile. We cache quantiles at every 0.05
    * step, which covers the default values used both in the API and in the stages page.
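A note on the quantile math above (not part of the patch): `getMetricDistribution` is a nearest-rank style lookup, not an interpolating one. It sorts the per-executor peak values and indexes them at `min((q * count).toLong, count - 1)`, so with few executors the top quantiles simply repeat the maximum, as in the expectation JSON below. A minimal standalone sketch of the same logic, using three made-up peak values:

```scala
// Standalone sketch of the index math used by getMetricDistribution:
// sort the samples, then take the value at floor(q * n), clamped to n - 1.
object QuantileSketch {
  def distribution(values: Seq[Long], quantiles: Seq[Double]): Seq[Double] = {
    val sorted = values.sorted
    val n = sorted.length
    quantiles.map { q =>
      val i = math.min((q * n).toLong, n - 1L).toInt
      sorted(i).toDouble
    }
  }

  def main(args: Array[String]): Unit = {
    // Three hypothetical per-executor peaks for a single metric.
    val peaks = Seq(7023L, 12537L, 19560L)
    // With n = 3, q = 0.5 maps to index 1 and q >= 0.75 clamps to index 2,
    // so the two highest quantiles both return the maximum.
    println(distribution(peaks, Seq(0.05, 0.25, 0.5, 0.75, 0.95)))
    // List(7023.0, 7023.0, 12537.0, 19560.0, 19560.0)
  }
}
```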
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
index fb64ff5e6024..7d1f7ba4a3a0 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
@@ -25,6 +25,7 @@ import javax.ws.rs.core.{MediaType, Response, StreamingOutput}
 import scala.util.control.NonFatal
 
 import org.apache.spark.{JobExecutionStatus, SparkContext}
+import org.apache.spark.executor.ExecutorMetricsDistributions
 import org.apache.spark.status.api.v1
 import org.apache.spark.util.Utils
 
@@ -52,6 +53,25 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
   @Path("executors")
   def executorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(true))
 
+  @GET
+  @Path("executorMetricsDistribution")
+  def executorMetricsDistribution(
+      @QueryParam("activeOnly") @DefaultValue("true") activeOnly: Boolean,
+      @DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String)
+    : ExecutorMetricsDistributions = withUI { ui =>
+    val quantiles = quantileString.split(",").map { s =>
+      try {
+        s.toDouble
+      } catch {
+        case _: NumberFormatException =>
+          throw new BadParameterException("quantiles", "double", s)
+      }
+    }
+
+    ui.store.executorMetricSummary(activeOnly, quantiles).getOrElse(
+      throw new NotFoundException("No executor reported metrics yet."))
+  }
+
   @GET
   @Path("executors/{executorId}/threads")
   def threadDump(@PathParam("executorId") execId: String): Array[ThreadStackTrace] = withUI { ui =>
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_peak_memory_metrics_distributions_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_peak_memory_metrics_distributions_expectation.json
new file mode 100644
index 000000000000..4c18c12c0b1d
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/executor_peak_memory_metrics_distributions_expectation.json
@@ -0,0 +1,22 @@
+{
+  "JVMHeapMemory" : [ 2.09883992E8, 4.6213568E8, 7.5947948E8, 9.8473656E8, 9.8473656E8 ],
+  "JVMOffHeapMemory" : [ 6.0829472E7, 6.1343616E7, 6.271752E7, 9.1926448E7, 9.1926448E7 ],
+  "OnHeapExecutionMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+  "OffHeapExecutionMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+  "OnHeapStorageMemory" : [ 7023.0, 12537.0, 19560.0, 19560.0, 19560.0 ],
+  "OffHeapStorageMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+  "OnHeapUnifiedMemory" : [ 7023.0, 12537.0, 19560.0, 19560.0, 19560.0 ],
+  "OffHeapUnifiedMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+  "DirectPoolMemory" : [ 10742.0, 10865.0, 12781.0, 157182.0, 157182.0 ],
+  "MappedPoolMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+  "ProcessTreeJVMVMemory" : [ 8.296026112E9, 9.678606336E9, 9.684373504E9, 9.691553792E9, 9.691553792E9 ],
+  "ProcessTreeJVMRSSMemory" : [ 5.26491648E8, 7.03639552E8, 9.64222976E8, 1.210867712E9, 1.210867712E9 ],
+  "ProcessTreePythonVMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+  "ProcessTreePythonRSSMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+  "ProcessTreeOtherVMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+  "ProcessTreeOtherRSSMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+  "MinorGCCount" : [ 7.0, 15.0, 24.0, 27.0, 27.0 ],
+  "MinorGCTime" : [ 55.0, 106.0, 140.0, 145.0, 145.0 ],
+  "MajorGCCount" : [ 2.0, 2.0, 2.0, 3.0, 3.0 ],
+  "MajorGCTime" : [ 60.0, 63.0, 75.0, 144.0, 144.0 ]
+}
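For reference, a hedged sketch of exercising the new endpoint from outside the cluster. The host, port, and app id below are placeholders (a live UI on the default port 4040 is assumed); only the path and query parameters come from the patch:

```scala
import scala.io.Source

object FetchDistributions {
  def main(args: Array[String]): Unit = {
    // activeOnly and quantiles mirror the @QueryParam defaults above; omitting
    // quantiles yields 0.05,0.25,0.5,0.75,0.95.
    val url = "http://localhost:4040/api/v1/applications/app-20200706201101-0003" +
      "/executorMetricsDistribution?activeOnly=false&quantiles=0.25,0.5,0.75"
    // A malformed quantile (e.g. quantiles=abc) surfaces as an HTTP 400 via
    // BadParameterException; no reported executor metrics surfaces as a 404.
    println(Source.fromURL(url).mkString)
  }
}
```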
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 08b211806552..66c51d48d54a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -177,6 +177,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     "executor node excludeOnFailure unexcluding" ->
       "applications/app-20161115172038-0000/executors",
     "executor memory usage" -> "applications/app-20161116163331-0000/executors",
+    "executor peak memory metrics distributions" ->
+      "applications/application_1553914137147_0018/executorMetricsDistribution",
     "executor resource information" -> "applications/application_1555004656427_0144/executors",
     "multiple resource profiles" -> "applications/application_1578436911597_0052/environment",
     "stage list with peak metrics" -> "applications/app-20200706201101-0003/stages",
diff --git a/docs/monitoring.md b/docs/monitoring.md
index c6105188f07e..641352d972f0 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -518,6 +518,15 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
     <td><code>/applications/[app-id]/allexecutors</code></td>
     <td>A list of all(active and dead) executors for the given application.</td>
   </tr>
+  <tr>
+    <td><code>/applications/[app-id]/executorMetricsDistribution</code></td>
+    <td>
+      Distributions of peak memory metrics for executors.
+      <br><code>?activeOnly=[true (default) | false]</code> lists only active executors.
+      <br><code>?quantiles</code> summarizes the metrics with the given quantiles.
+      <br>Example: <code>?activeOnly=false&quantiles=0.01,0.5,0.99</code>
+    </td>
+  </tr>
   <tr>
     <td><code>/applications/[app-id]/storage/rdd</code></td>
     <td>A list of stored RDDs for the given application.</td>
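Finally, a sketch of consuming the documented response shape: one JSON array per metric name, with one value per requested quantile. Jackson is already on Spark's classpath; the history-server URL is an assumption and the app id is borrowed from the test expectation above:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import scala.collection.JavaConverters._
import scala.io.Source

object ReadDistributions {
  def main(args: Array[String]): Unit = {
    // Placeholder history-server host/port; app id matches the expectation file.
    val url = "http://localhost:18080/api/v1/applications/application_1553914137147_0018" +
      "/executorMetricsDistribution?activeOnly=false"
    val root = new ObjectMapper().readTree(Source.fromURL(url).mkString)
    // Walk the top-level object: each field is a metric, each element a quantile value.
    root.fields().asScala.foreach { entry =>
      val values = entry.getValue.elements().asScala.map(_.asDouble()).mkString(", ")
      println(s"${entry.getKey}: [$values]")
    }
  }
}
```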