Skip to content
Closed
1 change: 1 addition & 0 deletions docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ set of sinks to which metrics are reported. The following instances are currentl
* `executor`: A Spark executor.
* `driver`: The Spark driver process (the process in which your SparkContext is created).
* `shuffleService`: The Spark shuffle service.
* `applicationMaster`: The Spark ApplicationMaster when running on YARN.

Each instance can report to zero or more _sinks_. Sinks are contained in the
`org.apache.spark.metrics.sink` package:
Expand Down
9 changes: 8 additions & 1 deletion docs/running-on-yarn.md
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,14 @@ To use a custom metrics.properties for the application master and executors, upd
<code>spark.blacklist.application.maxFailedExecutorsPerNode</code>.
</td>
</tr>

<tr>
<td><code>spark.yarn.metrics.namespace</code></td>
<td>(none)</td>
<td>
The root namespace for AM metrics reporting.
If it is not set then the YARN application ID is used.
</td>
</tr>
</table>

# Important notes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.spark.deploy.yarn.config._
import org.apache.spark.deploy.yarn.security.AMCredentialRenewer
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
Expand All @@ -67,6 +68,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends

private val securityMgr = new SecurityManager(sparkConf)

private var metricsSystem: Option[MetricsSystem] = None

// Set system properties for each config entry. This covers two use cases:
// - The default configuration stored by the SparkHadoopUtil class
// - The user application creating a new SparkConf in cluster mode
Expand Down Expand Up @@ -309,6 +312,16 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
"Uncaught exception: " + StringUtils.stringifyException(e))
} finally {
try {
metricsSystem.foreach { ms =>
ms.report()
ms.stop()
}
} catch {
case e: Exception =>
logWarning("Exception during stopping of the metric system: ", e)
}
}
}

Expand Down Expand Up @@ -434,6 +447,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

allocator.allocateResources()
val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
ms.registerSource(new ApplicationMasterSource(prefix, allocator))
ms.start()
metricsSystem = Some(ms)
reporterThread = launchReporterThread()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.yarn

import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.metrics.source.Source

private[spark] class ApplicationMasterSource(prefix: String, yarnAllocator: YarnAllocator)
extends Source {

override val sourceName: String = prefix + ".applicationMaster"
override val metricRegistry: MetricRegistry = new MetricRegistry()

metricRegistry.register(MetricRegistry.name("numExecutorsFailed"), new Gauge[Int] {
override def getValue: Int = yarnAllocator.getNumExecutorsFailed
})

metricRegistry.register(MetricRegistry.name("numExecutorsRunning"), new Gauge[Int] {
override def getValue: Int = yarnAllocator.getNumExecutorsRunning
})

metricRegistry.register(MetricRegistry.name("numReleasedContainers"), new Gauge[Int] {
override def getValue: Int = yarnAllocator.getNumReleasedContainers
})

metricRegistry.register(MetricRegistry.name("numLocalityAwareTasks"), new Gauge[Int] {
override def getValue: Int = yarnAllocator.numLocalityAwareTasks
})

metricRegistry.register(MetricRegistry.name("numContainersPendingAllocate"), new Gauge[Int] {
override def getValue: Int = yarnAllocator.numContainersPendingAllocate
})

}
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,16 @@ private[yarn] class YarnAllocator(
private var hostToLocalTaskCounts: Map[String, Int] = Map.empty

// Number of tasks that have locality preferences in active stages
private var numLocalityAwareTasks: Int = 0
private[yarn] var numLocalityAwareTasks: Int = 0

// A container placement strategy based on pending tasks' locality preference
private[yarn] val containerPlacementStrategy =
new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver)

def getNumExecutorsRunning: Int = runningExecutors.size()

def getNumReleasedContainers: Int = releasedContainers.size()

def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors

def isAllNodeBlacklisted: Boolean = allocatorBlacklistTracker.isAllNodeBlacklisted
Expand All @@ -167,6 +169,10 @@ private[yarn] class YarnAllocator(
*/
def getPendingAllocate: Seq[ContainerRequest] = getPendingAtLocation(ANY_HOST)

def numContainersPendingAllocate: Int = synchronized {
getPendingAllocate.size
}

/**
* A sequence of pending container requests at the given location that have not yet been
* fulfilled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.BlacklistTracker
import org.apache.spark.util.{Clock, SystemClock, Utils}
import org.apache.spark.util.{Clock, SystemClock}

/**
* YarnAllocatorBlacklistTracker is responsible for tracking the blacklisted nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("100s")

private[spark] val YARN_METRICS_NAMESPACE = ConfigBuilder("spark.yarn.metrics.namespace")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add this configuration to the yarn doc?

.doc("The root namespace for AM metrics reporting.")
.stringConf
.createOptional

private[spark] val AM_NODE_LABEL_EXPRESSION = ConfigBuilder("spark.yarn.am.nodeLabelExpression")
.doc("Node label expression for the AM.")
.stringConf
Expand Down