37 changes: 37 additions & 0 deletions docs/running-on-yarn.md
@@ -121,6 +121,43 @@ To use a custom metrics.properties for the application master and executors, upd
Use lower-case suffixes, e.g. <code>k</code>, <code>m</code>, <code>g</code>, <code>t</code>, and <code>p</code>, for kibi-, mebi-, gibi-, tebi-, and pebibytes, respectively.
</td>
</tr>
<tr>
<td><code>spark.yarn.am.resource.{resource-type}</code></td>
<td><code>(none)</code></td>
<td>
Amount of resource to use for the YARN Application Master in client mode.
In cluster mode, use <code>spark.yarn.driver.resource.&lt;resource-type&gt;</code> instead.
[Review comment — Member: nit: looks like spark.yarn.driver.resource.&lt;resource-type&gt; should be spark.yarn.driver.resource.{resource-type} (yes, I realize resource-type is to be replaced with an actual resource type)]
Please note that this feature can be used only with YARN 3.0+.
For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html
<p/>
Example:
To request GPU resources from YARN, use: <code>spark.yarn.am.resource.yarn.io/gpu</code>
</td>
</tr>
<tr>
<td><code>spark.yarn.driver.resource.{resource-type}</code></td>
<td><code>(none)</code></td>
<td>
Amount of resource to use for the YARN Application Master in cluster mode.
Please note that this feature can be used only with YARN 3.0+.
For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html
<p/>
Example:
To request GPU resources from YARN, use: <code>spark.yarn.driver.resource.yarn.io/gpu</code>
</td>
</tr>
<tr>
<td><code>spark.yarn.executor.resource.{resource-type}</code></td>
<td><code>(none)</code></td>
<td>
Amount of resource to use per executor process.
Please note that this feature can be used only with YARN 3.0+.
For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html
<p/>
Example:
To request GPU resources from YARN, use: <code>spark.yarn.executor.resource.yarn.io/gpu</code>
</td>
</tr>
<tr>
<td><code>spark.yarn.am.cores</code></td>
<td><code>1</code></td>
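Taken together, a client-mode submission that requests GPUs for both the AM and the executors might look like the following sketch (resource names and amounts are illustrative, not part of the diff):

val conf = new SparkConf()
  .set("spark.submit.deployMode", "client")
  .set("spark.yarn.am.resource.yarn.io/gpu", "2")       // 2 GPUs for the Application Master
  .set("spark.yarn.executor.resource.yarn.io/gpu", "1") // 1 GPU per executor process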
@@ -154,6 +154,8 @@ private[spark] class Client(
* available in the alpha API.
*/
def submitApplication(): ApplicationId = {
ResourceRequestHelper.validateResources(sparkConf)

var appId: ApplicationId = null
try {
launcherBackend.connect()
@@ -234,6 +236,13 @@
def createApplicationSubmissionContext(
newApp: YarnClientApplication,
containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
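// In cluster mode the driver runs inside the AM container, so the driver resource
// settings size the AM; in client mode the AM runs alone and uses its own settings.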
val amResources =
if (isClusterMode) {
sparkConf.getAllWithPrefix(config.YARN_DRIVER_RESOURCE_TYPES_PREFIX).toMap
} else {
sparkConf.getAllWithPrefix(config.YARN_AM_RESOURCE_TYPES_PREFIX).toMap
}
logDebug(s"AM resources: $amResources")
val appContext = newApp.getApplicationSubmissionContext
appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
appContext.setQueue(sparkConf.get(QUEUE_NAME))
@@ -256,6 +265,10 @@
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(amMemory + amMemoryOverhead)
capability.setVirtualCores(amCores)
if (amResources.nonEmpty) {
ResourceRequestHelper.setResourceRequests(amResources, capability)
}
logDebug(s"Created resource capability for AM request: $capability")

sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
case Some(expr) =>
@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.yarn

import java.lang.{Long => JLong}
import java.lang.reflect.InvocationTargetException

import scala.collection.mutable
import scala.util.Try

import org.apache.hadoop.yarn.api.records.Resource

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils

/**
* This helper class uses some Hadoop 3 methods from the YARN API,
* so we need to use reflection to avoid compile errors when building against Hadoop 2.x.
*/
private object ResourceRequestHelper extends Logging {
private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
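// Matches a numeric amount followed by an optional unit suffix, e.g. "2", "500m" or "4G".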
private val RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation"

/**
* Validates sparkConf and throws a SparkException if any of the standard resources (memory or
* cores) is defined with the property spark.yarn.x.resource.y.
* We need to reject all combinations of AM / driver / executor and memory / CPU core resources,
* as Spark has its own names for them (memory, cores), but YARN has its own names too:
* (memory, memory-mb, mb) and (cores, vcores, cpu-vcores).
* We have to disable every possible way YARN could receive these resource definitions.
*/
def validateResources(sparkConf: SparkConf): Unit = {
val resourceDefinitions = Seq[(String, String)](
(AM_MEMORY.key, YARN_AM_RESOURCE_TYPES_PREFIX + "memory"),
[Review thread on this line]

Contributor:
I went and looked at the documentation because I remember this being confusing. The documentation mentions both memory and memory-mb as being valid, with the latter being preferred. So it sounds to me like you can use either, and that this code should disallow both.
You even initialize memory-mb in your tests, instead of memory.

Contributor:
Still waiting for a word on this.

Author:
Sure! Did you mean this documentation?
https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html
I think it's required to check all the keys for memory / vcores that YARN deprecates, as those will flow through Spark and eventually reach YARN's ResourceInformation, where it will just blow up, since memory-mb and vcores are the only ones that are not deprecated. The reason this hasn't caused a problem with the current Spark code is that it uses the Resource object and does not use ResourceInformation at all.
So we need to disallow these:

  • cpu-vcores
  • memory
  • mb

What do you think?

Contributor:
I'm not familiar with the YARN code or what it does here.
I'm just worried about users setting cpu/memory resources outside of the proper Spark settings, and also about the inconsistency in your code (using both memory and memory-mb).

Author (@szilard-nemeth, Oct 12, 2018):
These are two separate things:

  1. One is that I don't yet reject all the deprecated standard resources known to YARN (explained in the previous comment), which I will address soon. To be exact, I need to reject not just the deprecated ones, but every possible way to define the standard memory and CPU core resources.
  2. Using memory-mb is the only way to initialize the memory resource with the YARN client, via the method ResourceUtils.reinitializeResources. I played around with this a bit: if I omit the standard resources, specify custom resources, and then call ResourceUtils.reinitializeResources, an internal YARN exception is thrown, as the method relies on the standard resources always being specified too. Unfortunately, invoking this method is the simplest way to build tests on custom resource types, to the best of my knowledge, so I can't really do much about this.

> and also the inconsistency in your code (using both memory and memory-mb).

What did you mean by this? The only use of "memory" in the whole change is to prevent it from being used with the new resource configs.

Contributor:
> What did you mean by this?

I meant you were initializing memory-mb in tests but checking only memory here. That smells like you should be checking memory-mb here.
These kinds of things should have comments in the code so that in the future we know why they are that way.

Author:
Please see my last commit with the updates. I only added some tests, so they are not exhaustive for every combination of Spark resources and YARN standard resources. I can add more test cases if you think it's needed, but I think this is fine as it is.

Author:
Sure, adding some explanatory comments with my next commit.

Author:
I think the code is now complete, please check!

[End of review thread]
(DRIVER_MEMORY.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "memory"),
(EXECUTOR_MEMORY.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory"),
(AM_MEMORY.key, YARN_AM_RESOURCE_TYPES_PREFIX + "mb"),
(DRIVER_MEMORY.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "mb"),
(EXECUTOR_MEMORY.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "mb"),
(AM_MEMORY.key, YARN_AM_RESOURCE_TYPES_PREFIX + "memory-mb"),
(DRIVER_MEMORY.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "memory-mb"),
(EXECUTOR_MEMORY.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory-mb"),
(AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cores"),
(DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cores"),
(EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cores"),
(AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "vcores"),
(DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "vcores"),
(EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "vcores"),
(AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
(DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
(EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cpu-vcores"))
val errorMessage = new mutable.StringBuilder()

resourceDefinitions.foreach { case (sparkName, resourceRequest) =>
if (sparkConf.contains(resourceRequest)) {
errorMessage.append(s"Error: Do not use $resourceRequest, " +
s"please use $sparkName instead!\n")
}
}

if (errorMessage.nonEmpty) {
throw new SparkException(errorMessage.toString())
}
}
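// Illustrative only (not part of the patch): given the validation above, a setting like
//   new SparkConf().set("spark.yarn.executor.resource.memory-mb", "30g")
// fails validation with a SparkException asking for spark.executor.memory instead.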

/**
* Sets the resource amounts, with their corresponding units, on the passed resource object.
* @param resources resource values to set
* @param resource resource object to update
*/
def setResourceRequests(
resources: Map[String, String],
resource: Resource): Unit = {
require(resource != null, "Resource parameter should not be null!")

logDebug(s"Custom resources requested: $resources")
if (!isYarnResourceTypesAvailable()) {
if (resources.nonEmpty) {
logWarning("Ignoring custom resource requests because " +
"the version of YARN does not support it!")
}
return
}

val resInfoClass = Utils.classForName(RESOURCE_INFO_CLASS)
val setResourceInformationMethod =
resource.getClass.getMethod("setResourceInformation", classOf[String], resInfoClass)
resources.foreach { case (name, rawAmount) =>
try {
val AMOUNT_AND_UNIT_REGEX(amountPart, unitPart) = rawAmount
val amount = amountPart.toLong
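// Spark documents lower-case size suffixes, so translate g/t/p to the upper-case
// forms understood by YARN's unit converter; other suffixes pass through unchanged.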
val unit = unitPart match {
case "g" => "G"
case "t" => "T"
case "p" => "P"
case _ => unitPart
}
logDebug(s"Registering resource with name: $name, amount: $amount, unit: $unit")
val resourceInformation = createResourceInformation(name, amount, unit, resInfoClass)
setResourceInformationMethod.invoke(
resource, name, resourceInformation.asInstanceOf[AnyRef])
} catch {
case _: MatchError =>
throw new IllegalArgumentException(s"Resource request for '$name' ('$rawAmount') " +
s"does not match pattern $AMOUNT_AND_UNIT_REGEX.")
case e: InvocationTargetException if e.getCause != null => throw e.getCause
}
}
}
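// Hypothetical usage: setResourceRequests(Map("yarn.io/gpu" -> "2"), resource)
// registers a request for two GPUs on the given Resource object.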

private def createResourceInformation(
resourceName: String,
amount: Long,
unit: String,
resInfoClass: Class[_]): Any = {
val resourceInformation =
if (unit.nonEmpty) {
val resInfoNewInstanceMethod = resInfoClass.getMethod("newInstance",
classOf[String], classOf[String], JLong.TYPE)
resInfoNewInstanceMethod.invoke(null, resourceName, unit, amount.asInstanceOf[JLong])
} else {
val resInfoNewInstanceMethod = resInfoClass.getMethod("newInstance",
classOf[String], JLong.TYPE)
resInfoNewInstanceMethod.invoke(null, resourceName, amount.asInstanceOf[JLong])
}
resourceInformation
}

/**
* Checks whether Hadoop 2.x or 3.x is used as a dependency.
* With Hadoop 3 and later, the ResourceInformation class
* should be available on the classpath.
*/
def isYarnResourceTypesAvailable(): Boolean = {
Try(Utils.classForName(RESOURCE_INFO_CLASS)).isSuccess
}
}
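For readers building directly against Hadoop 3, the reflective calls above correspond to this plain API usage (a sketch assuming Hadoop 3 on the compile classpath; the resource name is hypothetical):

import org.apache.hadoop.yarn.api.records.{Resource, ResourceInformation}

def requestGpus(resource: Resource): Unit = {
  // Non-reflective equivalent of createResourceInformation + setResourceInformation:
  val info = ResourceInformation.newInstance("yarn.io/gpu", "", 2L)
  resource.setResourceInformation("yarn.io/gpu", info)
}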
@@ -140,10 +140,18 @@ private[yarn] class YarnAllocator(
}
// Number of cores per executor.
protected val executorCores = sparkConf.get(EXECUTOR_CORES)
- // Resource capability requested for each executors
- private[yarn] val resource = Resource.newInstance(
-   executorMemory + memoryOverhead + pysparkWorkerMemory,
-   executorCores)

+ private val executorResourceRequests =
+   sparkConf.getAllWithPrefix(config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX).toMap
+
+ // Resource capability requested for each executor
+ private[yarn] val resource: Resource = {
+   val resource = Resource.newInstance(
+     executorMemory + memoryOverhead + pysparkWorkerMemory, executorCores)
+   ResourceRequestHelper.setResourceRequests(executorResourceRequests, resource)
+   logDebug(s"Created resource capability: $resource")
+   resource
+ }

private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
"ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))
Expand Down Expand Up @@ -288,9 +296,16 @@ private[yarn] class YarnAllocator(
s"executorsStarting: ${numExecutorsStarting.get}")

if (missing > 0) {
logInfo(s"Will request $missing executor container(s), each with " +
s"${resource.getVirtualCores} core(s) and " +
s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")
if (log.isInfoEnabled()) {
var requestContainerMessage = s"Will request $missing executor container(s), each with " +
s"${resource.getVirtualCores} core(s) and " +
s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)"
if (ResourceRequestHelper.isYarnResourceTypesAvailable() &&
executorResourceRequests.nonEmpty) {
requestContainerMessage ++= s" with custom resources: " + resource.toString
}
logInfo(requestContainerMessage)
}

// Split the pending container request into three groups: locality matched list, locality
// unmatched list and non-locality list. Take the locality matched container request into
Expand Down Expand Up @@ -456,13 +471,20 @@ private[yarn] class YarnAllocator(
// memory, but use the asked vcore count for matching, effectively disabling matching on vcore
// count.
val matchingResource = Resource.newInstance(allocatedContainer.getResource.getMemory,
resource.getVirtualCores)
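// The custom resources must be applied to the matching capability as well; otherwise
// the allocated container would not match any outstanding request.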

ResourceRequestHelper.setResourceRequests(executorResourceRequests, matchingResource)

logDebug(s"Calling amClient.getMatchingRequests with parameters: " +
s"priority: ${allocatedContainer.getPriority}, " +
s"location: $location, resource: $matchingResource")
val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location,
matchingResource)

// Match the allocation to a request
if (!matchingRequests.isEmpty) {
val containerRequest = matchingRequests.get(0).iterator.next
logDebug(s"Removing container request via AM client: $containerRequest")
amClient.removeContainerRequest(containerRequest)
containersToUse += allocatedContainer
} else {
Expand Down
@@ -345,4 +345,8 @@ package object config {
.booleanConf
.createWithDefault(false)

private[yarn] val YARN_EXECUTOR_RESOURCE_TYPES_PREFIX = "spark.yarn.executor.resource."
private[yarn] val YARN_DRIVER_RESOURCE_TYPES_PREFIX = "spark.yarn.driver.resource."
private[yarn] val YARN_AM_RESOURCE_TYPES_PREFIX = "spark.yarn.am.resource."

}
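As a sketch of how these prefixes are consumed (key and value are hypothetical), SparkConf.getAllWithPrefix strips the prefix and returns the remainder of each matching key:

val conf = new SparkConf().set("spark.yarn.executor.resource.yarn.io/gpu", "2")
// getAllWithPrefix yields Array(("yarn.io/gpu", "2")); .toMap gives Map("yarn.io/gpu" -> "2")
val customResources = conf.getAllWithPrefix(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX).toMap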
@@ -23,6 +23,7 @@ import java.util.Properties

import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap => MutableHashMap}
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -199,6 +200,20 @@
appContext.getMaxAppAttempts should be (42)
}

test("resource request (client mode)") {
val sparkConf = new SparkConf().set("spark.submit.deployMode", "client")
.set(YARN_AM_RESOURCE_TYPES_PREFIX + "fpga", "2")
.set(YARN_AM_RESOURCE_TYPES_PREFIX + "gpu", "3")
testResourceRequest(sparkConf, List("gpu", "fpga"), Seq(("fpga", 2), ("gpu", 3)))
}

test("resource request (cluster mode)") {
val sparkConf = new SparkConf().set("spark.submit.deployMode", "cluster")
.set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "fpga", "4")
.set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "gpu", "5")
testResourceRequest(sparkConf, List("gpu", "fpga"), Seq(("fpga", 4), ("gpu", 5)))
}

test("spark.yarn.jars with multiple paths and globs") {
val libs = Utils.createTempDir()
val single = Utils.createTempDir()
@@ -433,4 +448,30 @@
classpath(env)
}

private def testResourceRequest(
sparkConf: SparkConf,
resources: List[String],
expectedResources: Seq[(String, Long)]): Unit = {
assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
ResourceRequestTestHelper.initializeResourceTypes(resources)

val args = new ClientArguments(Array())

val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])

val client = new Client(args, sparkConf)
client.createApplicationSubmissionContext(
new YarnClientApplication(getNewApplicationResponse, appContext),
containerLaunchContext)

appContext.getAMContainerSpec should be (containerLaunchContext)
appContext.getApplicationType should be ("SPARK")

expectedResources.foreach { case (name, value) =>
ResourceRequestTestHelper.getResourceTypeValue(appContext.getResource, name) should be (value)
}
}

}