diff --git a/docs/monitoring.md b/docs/monitoring.md
index 6eaf33135744..2717dd091c75 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -435,6 +435,7 @@ set of sinks to which metrics are reported. The following instances are currentl
* `executor`: A Spark executor.
* `driver`: The Spark driver process (the process in which your SparkContext is created).
* `shuffleService`: The Spark shuffle service.
+* `applicationMaster`: The Spark ApplicationMaster when running on YARN.
Each instance can report to zero or more _sinks_. Sinks are contained in the
`org.apache.spark.metrics.sink` package:
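As a usage illustration (not part of this patch), here is a minimal `metrics.properties` sketch that wires the new `applicationMaster` instance to the built-in `ConsoleSink`; the sink name `console` and the 10-second period are arbitrary choices for the example:

```properties
# Report ApplicationMaster metrics to stdout every 10 seconds.
# "applicationMaster" is the instance name added above; "console" is an
# arbitrary sink name chosen for this example.
applicationMaster.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
applicationMaster.sink.console.period=10
applicationMaster.sink.console.unit=seconds
```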
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 0b265b0cb1b3..1c1f40c028a9 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -421,7 +421,14 @@ To use a custom metrics.properties for the application master and executors, upd
spark.blacklist.application.maxFailedExecutorsPerNode.
+<tr>
+  <td><code>spark.yarn.metrics.namespace</code></td>
+  <td>(none)</td>
+  <td>
+  The root namespace for AM metrics reporting.
+  If it is not set then the YARN application ID is used.
+  </td>
+</tr>
# Important notes
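For context (not part of the patch), a sketch of how the new property might be set so that AM metric names stay stable across submissions; `my_app` is a placeholder value:

```properties
# spark-defaults.conf (equivalently: --conf on spark-submit).
# When unset, the YARN application ID is used as the namespace, and that ID
# changes on every submission.
spark.yarn.metrics.namespace  my_app
```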
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index ecc576910db9..55ed114f8500 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -43,6 +43,7 @@ import org.apache.spark.deploy.yarn.config._
import org.apache.spark.deploy.yarn.security.AMCredentialRenewer
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
+import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -67,6 +68,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
private val securityMgr = new SecurityManager(sparkConf)
+ private var metricsSystem: Option[MetricsSystem] = None
+
// Set system properties for each config entry. This covers two use cases:
// - The default configuration stored by the SparkHadoopUtil class
// - The user application creating a new SparkConf in cluster mode
@@ -309,6 +312,16 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
"Uncaught exception: " + StringUtils.stringifyException(e))
+ } finally {
+ try {
+ metricsSystem.foreach { ms =>
+ ms.report()
+ ms.stop()
+ }
+ } catch {
+ case e: Exception =>
+ logWarning("Exception during stopping of the metric system: ", e)
+ }
}
}
@@ -434,6 +447,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))
allocator.allocateResources()
+ val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
+ val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
+ ms.registerSource(new ApplicationMasterSource(prefix, allocator))
+ ms.start()
+ metricsSystem = Some(ms)
reporterThread = launchReporterThread()
}
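Taken together, the hunks above follow the usual `MetricsSystem` lifecycle: create and register a source, start reporting, and on shutdown flush a final snapshot before stopping. A condensed sketch of that pattern, assuming `sparkConf`, `securityMgr`, `allocator`, and `appId` are in scope as they are in `ApplicationMaster`:

```scala
import org.apache.spark.metrics.MetricsSystem

// Create the per-instance metrics system once the allocator exists.
val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
val prefix = sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
ms.registerSource(new ApplicationMasterSource(prefix, allocator))
ms.start()

// On shutdown: report() pushes one last snapshot to all sinks so that even a
// short-lived AM emits its final gauge values, then stop() shuts the sinks down.
ms.report()
ms.stop()
```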
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
new file mode 100644
index 000000000000..0fec91658260
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+
+private[spark] class ApplicationMasterSource(prefix: String, yarnAllocator: YarnAllocator)
+ extends Source {
+
+ override val sourceName: String = prefix + ".applicationMaster"
+ override val metricRegistry: MetricRegistry = new MetricRegistry()
+
+ metricRegistry.register(MetricRegistry.name("numExecutorsFailed"), new Gauge[Int] {
+ override def getValue: Int = yarnAllocator.getNumExecutorsFailed
+ })
+
+ metricRegistry.register(MetricRegistry.name("numExecutorsRunning"), new Gauge[Int] {
+ override def getValue: Int = yarnAllocator.getNumExecutorsRunning
+ })
+
+ metricRegistry.register(MetricRegistry.name("numReleasedContainers"), new Gauge[Int] {
+ override def getValue: Int = yarnAllocator.getNumReleasedContainers
+ })
+
+ metricRegistry.register(MetricRegistry.name("numLocalityAwareTasks"), new Gauge[Int] {
+ override def getValue: Int = yarnAllocator.numLocalityAwareTasks
+ })
+
+ metricRegistry.register(MetricRegistry.name("numContainersPendingAllocate"), new Gauge[Int] {
+ override def getValue: Int = yarnAllocator.numContainersPendingAllocate
+ })
+
+}
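A small sketch (not part of the patch) of how the `prefix` parameter shapes the source name; the application ID below is made up, and `allocator` stands in for a real `YarnAllocator`:

```scala
// With spark.yarn.metrics.namespace unset, the prefix is the YARN application
// ID, so the source is named "<appId>.applicationMaster"; the gauges are then
// reported under that source (exact final metric names depend on the sink).
val source = new ApplicationMasterSource("application_1530000000000_0001", allocator)
assert(source.sourceName == "application_1530000000000_0001.applicationMaster")
```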
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index fae054e0eea0..40f1222fcd83 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -150,7 +150,7 @@ private[yarn] class YarnAllocator(
private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
// Number of tasks that have locality preferences in active stages
- private var numLocalityAwareTasks: Int = 0
+ private[yarn] var numLocalityAwareTasks: Int = 0
// A container placement strategy based on pending tasks' locality preference
private[yarn] val containerPlacementStrategy =
@@ -158,6 +158,8 @@ private[yarn] class YarnAllocator(
def getNumExecutorsRunning: Int = runningExecutors.size()
+ def getNumReleasedContainers: Int = releasedContainers.size()
+
def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors
def isAllNodeBlacklisted: Boolean = allocatorBlacklistTracker.isAllNodeBlacklisted
@@ -167,6 +169,10 @@ private[yarn] class YarnAllocator(
*/
def getPendingAllocate: Seq[ContainerRequest] = getPendingAtLocation(ANY_HOST)
+ def numContainersPendingAllocate: Int = synchronized {
+ getPendingAllocate.size
+ }
+
/**
* A sequence of pending container requests at the given location that have not yet been
* fulfilled.
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
index 1b48a0ee7ad3..ceac7cda5f8b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
@@ -28,7 +28,7 @@ import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.BlacklistTracker
-import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.{Clock, SystemClock}
/**
* YarnAllocatorBlacklistTracker is responsible for tracking the blacklisted nodes
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 129084a86597..1013fd2cc4a8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -152,6 +152,11 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("100s")
+ private[spark] val YARN_METRICS_NAMESPACE = ConfigBuilder("spark.yarn.metrics.namespace")
+ .doc("The root namespace for AM metrics reporting.")
+ .stringConf
+ .createOptional
+
private[spark] val AM_NODE_LABEL_EXPRESSION = ConfigBuilder("spark.yarn.am.nodeLabelExpression")
.doc("Node label expression for the AM.")
.stringConf
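Because the entry is built with `createOptional`, reading it yields an `Option[String]`. A one-line sketch of the fallback used in `ApplicationMaster` above (`sparkConf` and `appId` assumed in scope):

```scala
// None when spark.yarn.metrics.namespace is unset; fall back to the app ID.
val prefix: String = sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
```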