Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.executor

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.JsonSerializer
import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.databind.annotation.JsonSerialize

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.metrics.ExecutorMetricType

/**
* :: DeveloperApi ::
* Metrics Distributions tracked for executors and the driver.
*/
@DeveloperApi
@JsonSerialize(using = classOf[ExecutorMetricsDistributionJsonSerializer])
class ExecutorMetricsDistributions private[spark](
val quantiles: IndexedSeq[Double],
val executorMetrics: IndexedSeq[ExecutorMetrics]) {
private lazy val count = executorMetrics.length
private lazy val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }

/** Returns the distributions for the specified metric. */
def getMetricDistribution(metricName: String): IndexedSeq[Double] = {
val sorted = executorMetrics.map(_.getMetricValue(metricName)).sorted
indices.map(i => sorted(i.toInt).toDouble).toIndexedSeq
}
}

/**
* Serializer for ExecutorMetricsDistributions
* Convert to map with metric name as key
*/
private[spark] class ExecutorMetricsDistributionJsonSerializer
extends JsonSerializer[ExecutorMetricsDistributions] {
override def serialize(
metrics: ExecutorMetricsDistributions,
jsonGenerator: JsonGenerator,
serializerProvider: SerializerProvider): Unit = {
val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) =>
metric -> metrics.getMetricDistribution(metric)
}
jsonGenerator.writeObject(metricsMap)
}
}
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap

import org.apache.spark.{JobExecutionStatus, SparkConf}
import org.apache.spark.executor.ExecutorMetricsDistributions
import org.apache.spark.status.api.v1
import org.apache.spark.ui.scope._
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -365,6 +366,18 @@ private[spark] class AppStatusStore(
Some(computedQuantiles)
}

/**
* Calculates a summary of the executor metrics for executors, returning the
* requested quantiles for the recorded metrics.
*/
def executorMetricSummary(
activeOnly: Boolean,
unsortedQuantiles: Array[Double]): Option[ExecutorMetricsDistributions] = {
val quantiles = unsortedQuantiles.sorted
val executors = executorList(activeOnly).flatMap(_.peakMemoryMetrics).toIndexedSeq
Some(new ExecutorMetricsDistributions(quantiles, executors))
}

/**
* Whether to cache information about a specific metric quantile. We cache quantiles at every 0.05
* step, which covers the default values used both in the API and in the stages page.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import javax.ws.rs.core.{MediaType, Response, StreamingOutput}
import scala.util.control.NonFatal

import org.apache.spark.{JobExecutionStatus, SparkContext}
import org.apache.spark.executor.ExecutorMetricsDistributions
import org.apache.spark.status.api.v1
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -52,6 +53,25 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
@Path("executors")
def executorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(true))

@GET
@Path("executorMetricsDistribution")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executorPeakMemoryMetricsDistribution?

def executorSummary(
@QueryParam("activeOnly") @DefaultValue("true") activeOnly: Boolean,
@DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 0.05,0.25,0.5,0.75,0.95? In stage page, Spark shows the quantiles of Min/Max

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI. In the corresponding web UI, the quantiles of Min/Max are displayed in the table "Summary Metrics for Completed Tasks" for a given stage page. In a parallel system, the duration of a stage is often determined by the slowest task/executor. To monitor/debug a skew issue, the maximal value (or 100% percentile value) is more useful than the 95% percentile value. On the other hand, 95% percentile value has been used in the past. One wise man once told me: Consistency means to repeat yesterday's mistake.

: ExecutorMetricsDistributions = withUI { ui =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indent

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indent

val quantiles = quantileString.split(",").map { s =>
try {
s.toDouble
} catch {
case nfe: NumberFormatException =>
throw new BadParameterException("quantiles", "double", s)
}
}

ui.store.executorMetricSummary(activeOnly, quantiles).getOrElse(
throw new NotFoundException(s"No executor reported metrics yet."))
}

@GET
@Path("executors/{executorId}/threads")
def threadDump(@PathParam("executorId") execId: String): Array[ThreadStackTrace] = withUI { ui =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"JVMHeapMemory" : [ 2.09883992E8, 4.6213568E8, 7.5947948E8, 9.8473656E8, 9.8473656E8 ],
"JVMOffHeapMemory" : [ 6.0829472E7, 6.1343616E7, 6.271752E7, 9.1926448E7, 9.1926448E7 ],
"OnHeapExecutionMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"OffHeapExecutionMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"OnHeapStorageMemory" : [ 7023.0, 12537.0, 19560.0, 19560.0, 19560.0 ],
"OffHeapStorageMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"OnHeapUnifiedMemory" : [ 7023.0, 12537.0, 19560.0, 19560.0, 19560.0 ],
"OffHeapUnifiedMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"DirectPoolMemory" : [ 10742.0, 10865.0, 12781.0, 157182.0, 157182.0 ],
"MappedPoolMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"ProcessTreeJVMVMemory" : [ 8.296026112E9, 9.678606336E9, 9.684373504E9, 9.691553792E9, 9.691553792E9 ],
"ProcessTreeJVMRSSMemory" : [ 5.26491648E8, 7.03639552E8, 9.64222976E8, 1.210867712E9, 1.210867712E9 ],
"ProcessTreePythonVMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"ProcessTreePythonRSSMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"ProcessTreeOtherVMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"ProcessTreeOtherRSSMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"MinorGCCount" : [ 7.0, 15.0, 24.0, 27.0, 27.0 ],
"MinorGCTime" : [ 55.0, 106.0, 140.0, 145.0, 145.0 ],
"MajorGCCount" : [ 2.0, 2.0, 2.0, 3.0, 3.0 ],
"MajorGCTime" : [ 60.0, 63.0, 75.0, 144.0, 144.0 ]
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
"executor node excludeOnFailure unexcluding" ->
"applications/app-20161115172038-0000/executors",
"executor memory usage" -> "applications/app-20161116163331-0000/executors",
"executor peak memory metrics distributions" ->
"applications/application_1553914137147_0018/executorMetricsDistribution",
"executor resource information" -> "applications/application_1555004656427_0144/executors",
"multiple resource profiles" -> "applications/application_1578436911597_0052/environment",
"stage list with peak metrics" -> "applications/app-20200706201101-0003/stages",
Expand Down
9 changes: 9 additions & 0 deletions docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,15 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
<td><code>/applications/[app-id]/allexecutors</code></td>
<td>A list of all(active and dead) executors for the given application.</td>
</tr>
<tr>
<td><code>/applications/[app-id]/executorMetricsDistribution</code></td>
<td>
Distributions of peak memory metrics for executors.
<br><code>?activeOnly=[true (default) | false]</code> lists only active executors
<br><code>?quantiles</code> summarize the metrics with the given quantiles.
<br>Example: <code>?activeOnly=false&quantiles=0.01,0.5,0.99</code>
</td>
</tr>
<tr>
<td><code>/applications/[app-id]/storage/rdd</code></td>
<td>A list of stored RDDs for the given application.</td>
Expand Down