Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/running-on-yarn.md
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,16 @@ To use a custom metrics.properties for the application master and executors, upd
</td>
<td>1.6.0</td>
</tr>
<tr>
<td><code>spark.yarn.executor.bindAddress</code></td>
<td><code>(executor hostname)</code></td>
<td>
Hostname or IP address where to bind listening sockets in YARN cluster and client mode.
<br />It also allows a different address from the local one to be advertised to other
executors or external systems.
</td>
<td>3.5.3</td>
</tr>
<tr>
<td><code>spark.yarn.executor.nodeLabelExpression</code></td>
<td>(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,9 @@ private[spark] class ApplicationMaster(
logInfo {
val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
val executorCores = _sparkConf.get(EXECUTOR_CORES)
val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
"<hostname>", executorMemory, executorCores, appId, securityMgr, localResources,
val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl,
"<executorId>", "<bindAddress>", "<hostname>",
executorMemory, executorCores, appId, securityMgr, localResources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
dummyRunner.launchContextDebugInfo()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ private[yarn] class ExecutorRunnable(
sparkConf: SparkConf,
masterAddress: String,
executorId: String,
bindAddress: String,
hostname: String,
executorMemory: Int,
executorCores: Int,
Expand Down Expand Up @@ -117,7 +118,7 @@ private[yarn] class ExecutorRunnable(
} catch {
case ex: Exception =>
throw new SparkException(s"Exception while starting container ${container.get.getId}" +
s" on host $hostname", ex)
s" on host $hostname ($bindAddress)", ex)
}
}

Expand Down Expand Up @@ -189,6 +190,7 @@ private[yarn] class ExecutorRunnable(
Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
"--driver-url", masterAddress,
"--executor-id", executorId,
"--bind-address", bindAddress,
"--hostname", hostname,
"--cores", executorCores.toString,
"--app-id", appId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ private[yarn] class YarnAllocator(
@GuardedBy("this")
val allocatedContainerToHostMap = new HashMap[ContainerId, String]

@GuardedBy("this")
val allocatedContainerToBindAddressMap = new HashMap[ContainerId, String]

// Containers that we no longer care about. We've either already told the RM to release them or
// will on the next heartbeat. Containers get removed from this map after the RM tells us they've
// completed.
Expand Down Expand Up @@ -169,6 +172,8 @@ private[yarn] class YarnAllocator(

private val isPythonApp = sparkConf.get(IS_PYTHON_APP)

private val bindAddress = sparkConf.get(EXECUTOR_BIND_ADDRESS)

private val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)

private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
Expand Down Expand Up @@ -743,11 +748,13 @@ private[yarn] class YarnAllocator(
val rpId = getResourceProfileIdFromPriority(container.getPriority)
executorIdCounter += 1
val executorHostname = container.getNodeId.getHost
val executorBindAddress = bindAddress.getOrElse(executorHostname)
val containerId = container.getId
val executorId = executorIdCounter.toString
val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
assert(container.getResource.getMemorySize >= yarnResourceForRpId.getMemorySize)
logInfo(s"Launching container $containerId on host $executorHostname " +
s"with bind-address $executorBindAddress " +
s"for executor with ID $executorId for ResourceProfile Id $rpId")

val rp = rpIdToResourceProfile(rpId)
Expand All @@ -772,6 +779,7 @@ private[yarn] class YarnAllocator(
sparkConf,
driverUrl,
executorId,
executorBindAddress,
executorHostname,
containerMem,
containerCores,
Expand Down Expand Up @@ -821,6 +829,7 @@ private[yarn] class YarnAllocator(
new HashSet[ContainerId])
containerSet += containerId
allocatedContainerToHostMap.put(containerId, executorHostname)
allocatedContainerToBindAddressMap.put(containerId, bindAddress.getOrElse(executorHostname))
launchingExecutorContainerIds.remove(containerId)
}
getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
Expand Down Expand Up @@ -927,6 +936,7 @@ private[yarn] class YarnAllocator(
}

allocatedContainerToHostMap.remove(containerId)
allocatedContainerToBindAddressMap.remove(containerId)
}

containerIdToExecutorIdAndResourceProfileId.remove(containerId).foreach { case (eid, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,13 @@ package object config extends Logging {
.stringConf
.createOptional

private[spark] val EXECUTOR_BIND_ADDRESS =
ConfigBuilder("spark.yarn.executor.bindAddress")
.doc("Address where to bind network listen sockets on the executor.")
.version("3.5.3")
.stringConf
.createOptional

/* Unmanaged AM configuration. */

private[spark] val YARN_UNMANAGED_AM = ConfigBuilder("spark.yarn.unmanagedAM.enabled")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class ExecutorRunnableSuite extends SparkFunSuite {
"yarn",
"exec-1",
"localhost",
"localhost",
1,
1,
"application_123_1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ class YarnAllocatorSuite extends SparkFunSuite
ContainerStatus.newInstance(containerId, containerState, diagnostics, exitStatus)
}


test("single container allocated") {
// request a single container and receive it
val (handler, _) = createAllocator(1)
Expand All @@ -185,6 +184,7 @@ class YarnAllocatorSuite extends SparkFunSuite

handler.getNumExecutorsRunning should be (1)
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("host1")
val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
hostTocontainer.get("host1").get should contain(container.getId)

Expand Down Expand Up @@ -362,7 +362,7 @@ class YarnAllocatorSuite extends SparkFunSuite
}
}

test("container should not be created if requested number if met") {
test("container should not be created if requested number is met") {
// request a single container and receive it
val (handler, _) = createAllocator(1)
handler.updateResourceRequests()
Expand Down Expand Up @@ -868,4 +868,17 @@ class YarnAllocatorSuite extends SparkFunSuite
handler.getNumExecutorsRunning should be(0)
handler.getNumExecutorsStarting should be(0)
}

test("use requested bind-address") {
val (handler, _) = createAllocator(maxExecutors = 1,
additionalConfigs = Map(EXECUTOR_BIND_ADDRESS.key -> "0.0.0.0"))
handler.updateResourceRequests()

val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container).toImmutableArraySeq)

handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("0.0.0.0")
}

}