docs/running-on-yarn.md — 28 changes: 19 additions & 9 deletions
@@ -499,15 +499,6 @@ To use a custom metrics.properties for the application master and executors, upd
</td>
<td>3.3.0</td>
</tr>
<tr>
<td><code>spark.yarn.executor.failuresValidityInterval</code></td>
<td>(none)</td>
<td>
Defines the validity interval for executor failure tracking.
Executor failures which are older than the validity interval will be ignored.
</td>
<td>2.0.0</td>
</tr>
<tr>
<td><code>spark.yarn.submit.waitAppCompletion</code></td>
<td><code>true</code></td>
@@ -528,6 +519,25 @@ To use a custom metrics.properties for the application master and executors, upd
</td>
<td>1.6.0</td>
</tr>
<tr>
<td><code>spark.yarn.executor.bindAddress</code></td>
<td><code>(executor hostname)</code></td>
<td>
Hostname or IP address where to bind listening sockets in YARN cluster mode.
[Review comment] Why YARN cluster mode? The change affects client mode as well, doesn't it?

[Author] Hmm true, will benefit client mode as well.
<br />It also allows a different address from the local one to be advertised to other
executors or external systems.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.yarn.executor.failuresValidityInterval</code></td>
<td>(none)</td>
<td>
Defines the validity interval for executor failure tracking.
Executor failures which are older than the validity interval will be ignored.
</td>
<td>2.0.0</td>
</tr>
<tr>
<td><code>spark.yarn.executor.nodeLabelExpression</code></td>
<td>(none)</td>
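The `spark.yarn.executor.bindAddress` entry added above can be set like any other Spark configuration. A minimal sketch of a YARN job that binds executor sockets to all interfaces (the app name and address are illustrative, not taken from this change):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Bind executor listen sockets to 0.0.0.0 while still advertising the
// YARN container hostname to the driver and other executors.
val conf = new SparkConf()
  .setAppName("bind-address-example")
  .set("spark.yarn.executor.bindAddress", "0.0.0.0")
val sc = new SparkContext(conf)
```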
@@ -460,8 +460,9 @@ private[spark] class ApplicationMaster(
logInfo {
val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
val executorCores = _sparkConf.get(EXECUTOR_CORES)
val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
"<hostname>", executorMemory, executorCores, appId, securityMgr, localResources,
val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl,
"<executorId>", "<bindAddress>", "<hostname>",
executorMemory, executorCores, appId, securityMgr, localResources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
dummyRunner.launchContextDebugInfo()
}
@@ -49,6 +49,7 @@ private[yarn] class ExecutorRunnable(
sparkConf: SparkConf,
masterAddress: String,
executorId: String,
bindAddress: String,
hostname: String,
executorMemory: Int,
executorCores: Int,
@@ -117,7 +118,7 @@
} catch {
case ex: Exception =>
throw new SparkException(s"Exception while starting container ${container.get.getId}" +
s" on host $hostname", ex)
s" on host $hostname ($bindAddress)", ex)
}
}

@@ -189,6 +190,7 @@
Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
"--driver-url", masterAddress,
"--executor-id", executorId,
"--bind-address", bindAddress,
"--hostname", hostname,
"--cores", executorCores.toString,
"--app-id", appId,
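With the extra constructor parameter and `--bind-address` flag above, the command prepared for the YARN container launch context carries both the bind address and the advertised hostname. An illustrative rendering of such an argument list (all concrete values are hypothetical):

```scala
// Hypothetical example of the arguments assembled for the executor backend;
// none of these values come from a real application.
val exampleArgs = Seq(
  "org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
  "--driver-url", "spark://CoarseGrainedScheduler@driver-host:43210",
  "--executor-id", "1",
  "--bind-address", "0.0.0.0",
  "--hostname", "host1.cluster.local",
  "--cores", "4",
  "--app-id", "application_1700000000000_0001")
```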
@@ -85,6 +85,9 @@ private[yarn] class YarnAllocator(
@GuardedBy("this")
val allocatedContainerToHostMap = new HashMap[ContainerId, String]

@GuardedBy("this")
val allocatedContainerToBindAddressMap = new HashMap[ContainerId, String]

// Containers that we no longer care about. We've either already told the RM to release them or
// will on the next heartbeat. Containers get removed from this map after the RM tells us they've
// completed.
@@ -169,6 +172,8 @@

private val isPythonApp = sparkConf.get(IS_PYTHON_APP)

private val bindAddress = sparkConf.get(EXECUTOR_BIND_ADDRESS)

private val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)

private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
@@ -734,12 +739,13 @@
val rpId = getResourceProfileIdFromPriority(container.getPriority)
executorIdCounter += 1
val executorHostname = container.getNodeId.getHost
val executorBindAddress = bindAddress.getOrElse(executorHostname)
val containerId = container.getId
val executorId = executorIdCounter.toString
val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
assert(container.getResource.getMemorySize >= yarnResourceForRpId.getMemorySize)
logInfo(s"Launching container $containerId on host $executorHostname " +
s"for executor with ID $executorId for ResourceProfile Id $rpId")
s"($executorBindAddress) for executor with ID $executorId for ResourceProfile Id $rpId")

val rp = rpIdToResourceProfile(rpId)
val defaultResources = ResourceProfile.getDefaultProfileExecutorResources(sparkConf)
@@ -763,6 +769,7 @@
sparkConf,
driverUrl,
executorId,
executorBindAddress,
executorHostname,
containerMem,
containerCores,
@@ -812,6 +819,7 @@
new HashSet[ContainerId])
containerSet += containerId
allocatedContainerToHostMap.put(containerId, executorHostname)
allocatedContainerToBindAddressMap.put(containerId, bindAddress.getOrElse(executorHostname))
launchingExecutorContainerIds.remove(containerId)
}
getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
@@ -918,6 +926,7 @@
}

allocatedContainerToHostMap.remove(containerId)
allocatedContainerToBindAddressMap.remove(containerId)
}

containerIdToExecutorIdAndResourceProfileId.remove(containerId).foreach { case (eid, _) =>
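The allocator now tracks the chosen bind address per container alongside the existing host map. A minimal, self-contained sketch of that bookkeeping and the fallback to the container hostname (identifiers are illustrative stand-ins, not the real YARN types):

```scala
import scala.collection.mutable.HashMap

val allocatedContainerToHostMap = new HashMap[String, String]
val allocatedContainerToBindAddressMap = new HashMap[String, String]

val containerId = "container_1700000000000_0001_01_000002"  // stand-in for ContainerId
val executorHostname = "host1"                               // container.getNodeId.getHost
val bindAddress: Option[String] = Some("0.0.0.0")            // from spark.yarn.executor.bindAddress

allocatedContainerToHostMap.put(containerId, executorHostname)
allocatedContainerToBindAddressMap.put(containerId, bindAddress.getOrElse(executorHostname))
```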
@@ -339,6 +339,13 @@ package object config extends Logging {
.stringConf
.createOptional

private[spark] val EXECUTOR_BIND_ADDRESS =
ConfigBuilder("spark.yarn.executor.bindAddress")
.doc("Address where to bind network listen sockets on the executor.")
.version("4.0.0")
.stringConf
.createOptional

/* Unmanaged AM configuration. */

private[spark] val YARN_UNMANAGED_AM = ConfigBuilder("spark.yarn.unmanagedAM.enabled")
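Because the new entry is declared with `createOptional`, it resolves to an `Option[String]` that stays empty unless the user sets the key. A small sketch of that behaviour using plain `SparkConf` (the value is illustrative):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
conf.getOption("spark.yarn.executor.bindAddress")   // None by default
conf.set("spark.yarn.executor.bindAddress", "0.0.0.0")
conf.getOption("spark.yarn.executor.bindAddress")   // Some("0.0.0.0")
```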
@@ -42,6 +42,7 @@ class ExecutorRunnableSuite extends SparkFunSuite {
"yarn",
"exec-1",
"localhost",
"localhost",
1,
1,
"application_123_1",
@@ -172,7 +172,6 @@ class YarnAllocatorSuite extends SparkFunSuite
ContainerStatus.newInstance(containerId, containerState, diagnostics, exitStatus)
}


test("single container allocated") {
// request a single container and receive it
val (handler, _) = createAllocator(1)
@@ -185,6 +184,7 @@

handler.getNumExecutorsRunning should be (1)
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("host1")
val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
hostTocontainer.get("host1").get should contain(container.getId)

@@ -362,7 +362,7 @@
}
}

test("container should not be created if requested number if met") {
test("container should not be created if requested number is met") {
// request a single container and receive it
val (handler, _) = createAllocator(1)
handler.updateResourceRequests()
@@ -868,4 +868,17 @@
handler.getNumExecutorsRunning should be(0)
handler.getNumExecutorsStarting should be(0)
}

test("use requested bind-address") {
val (handler, _) = createAllocator(maxExecutors = 1,
additionalConfigs = Map(EXECUTOR_BIND_ADDRESS.key -> "0.0.0.0"))
handler.updateResourceRequests()

val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container))

handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("0.0.0.0")
}

}