diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index ce7121b806cb..cd2f2a31f708 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -511,6 +511,16 @@ To use a custom metrics.properties for the application master and executors, upd
   <td>1.6.0</td>
 </tr>
+<tr>
+  <td><code>spark.yarn.executor.bindAddress</code></td>
+  <td>(executor hostname)</td>
+  <td>
+    Hostname or IP address where to bind listening sockets in YARN cluster and client mode.
+    It also allows a different address from the local one to be advertised to other
+    executors or external systems.
+  </td>
+  <td>3.5.3</td>
+</tr>
 <tr>
   <td><code>spark.yarn.executor.nodeLabelExpression</code></td>
   <td>(none)</td>
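As a usage note (not part of the patch): an application opts in by setting the new property. A minimal sketch, assuming a YARN deployment and reusing the `0.0.0.0` value that the new allocator test below exercises; the app name is a placeholder:

```scala
import org.apache.spark.SparkConf

// Hypothetical opt-in sketch: bind executor listen sockets to all interfaces
// while the YARN container hostname is still advertised to the driver.
val conf = new SparkConf()
  .setAppName("bind-address-demo") // placeholder app name
  .set("spark.yarn.executor.bindAddress", "0.0.0.0")
```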
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4fa7b66c9e5a..78a511b8e215 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -460,8 +460,9 @@ private[spark] class ApplicationMaster(
logInfo {
val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
val executorCores = _sparkConf.get(EXECUTOR_CORES)
- val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "",
- "", executorMemory, executorCores, appId, securityMgr, localResources,
+ val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl,
+ "", "", "",
+ executorMemory, executorCores, appId, securityMgr, localResources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
dummyRunner.launchContextDebugInfo()
}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index e3fcf5472f54..484fe275ab53 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -49,6 +49,7 @@ private[yarn] class ExecutorRunnable(
sparkConf: SparkConf,
masterAddress: String,
executorId: String,
+ bindAddress: String,
hostname: String,
executorMemory: Int,
executorCores: Int,
@@ -117,7 +118,7 @@ private[yarn] class ExecutorRunnable(
} catch {
case ex: Exception =>
throw new SparkException(s"Exception while starting container ${container.get.getId}" +
- s" on host $hostname", ex)
+ s" on host $hostname ($bindAddress)", ex)
}
}
@@ -189,6 +190,7 @@ private[yarn] class ExecutorRunnable(
Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
"--driver-url", masterAddress,
"--executor-id", executorId,
+ "--bind-address", bindAddress,
"--hostname", hostname,
"--cores", executorCores.toString,
"--app-id", appId,
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 5fccc8c9ff47..fc9e685f62d3 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -85,6 +85,9 @@ private[yarn] class YarnAllocator(
@GuardedBy("this")
val allocatedContainerToHostMap = new HashMap[ContainerId, String]
+ @GuardedBy("this")
+ val allocatedContainerToBindAddressMap = new HashMap[ContainerId, String]
+
// Containers that we no longer care about. We've either already told the RM to release them or
// will on the next heartbeat. Containers get removed from this map after the RM tells us they've
// completed.
@@ -169,6 +172,8 @@ private[yarn] class YarnAllocator(
private val isPythonApp = sparkConf.get(IS_PYTHON_APP)
+ private val bindAddress = sparkConf.get(EXECUTOR_BIND_ADDRESS)
+
private val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
@@ -743,11 +748,13 @@ private[yarn] class YarnAllocator(
val rpId = getResourceProfileIdFromPriority(container.getPriority)
executorIdCounter += 1
val executorHostname = container.getNodeId.getHost
+ val executorBindAddress = bindAddress.getOrElse(executorHostname)
val containerId = container.getId
val executorId = executorIdCounter.toString
val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
assert(container.getResource.getMemorySize >= yarnResourceForRpId.getMemorySize)
logInfo(s"Launching container $containerId on host $executorHostname " +
+ s"with bind-address $executorBindAddress " +
s"for executor with ID $executorId for ResourceProfile Id $rpId")
val rp = rpIdToResourceProfile(rpId)
@@ -772,6 +779,7 @@ private[yarn] class YarnAllocator(
sparkConf,
driverUrl,
executorId,
+ executorBindAddress,
executorHostname,
containerMem,
containerCores,
@@ -821,6 +829,7 @@ private[yarn] class YarnAllocator(
new HashSet[ContainerId])
containerSet += containerId
allocatedContainerToHostMap.put(containerId, executorHostname)
+ allocatedContainerToBindAddressMap.put(containerId, bindAddress.getOrElse(executorHostname))
launchingExecutorContainerIds.remove(containerId)
}
getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
@@ -927,6 +936,7 @@ private[yarn] class YarnAllocator(
}
allocatedContainerToHostMap.remove(containerId)
+ allocatedContainerToBindAddressMap.remove(containerId)
}
containerIdToExecutorIdAndResourceProfileId.remove(containerId).foreach { case (eid, _) =>
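The fallback rule used in both places above is plain `Option.getOrElse`: when the config is unset, the bind address degrades to the container hostname, preserving the pre-patch behaviour. A self-contained sketch with placeholder values:

```scala
// Unset config: bind address falls back to the YARN container hostname.
val unset: Option[String] = None
assert(unset.getOrElse("host1") == "host1")

// Config set by the user: the override wins, the hostname is untouched.
val overridden: Option[String] = Some("0.0.0.0")
assert(overridden.getOrElse("host1") == "0.0.0.0")
```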
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 96ebc03bf163..684189c31e58 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -339,6 +339,13 @@ package object config extends Logging {
.stringConf
.createOptional
+ private[spark] val EXECUTOR_BIND_ADDRESS =
+ ConfigBuilder("spark.yarn.executor.bindAddress")
+ .doc("Address where to bind network listen sockets on the executor.")
+ .version("3.5.3")
+ .stringConf
+ .createOptional
+
/* Unmanaged AM configuration. */
private[spark] val YARN_UNMANAGED_AM = ConfigBuilder("spark.yarn.unmanagedAM.enabled")
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala
index 1ef3c9c410af..70c6c2474b60 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala
@@ -42,6 +42,7 @@ class ExecutorRunnableSuite extends SparkFunSuite {
"yarn",
"exec-1",
"localhost",
+ "localhost",
1,
1,
"application_123_1",
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index f6f2e1b11d58..6267806c7710 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -172,7 +172,6 @@ class YarnAllocatorSuite extends SparkFunSuite
ContainerStatus.newInstance(containerId, containerState, diagnostics, exitStatus)
}
-
test("single container allocated") {
// request a single container and receive it
val (handler, _) = createAllocator(1)
@@ -185,6 +184,7 @@ class YarnAllocatorSuite extends SparkFunSuite
handler.getNumExecutorsRunning should be (1)
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+ handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("host1")
val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
hostTocontainer.get("host1").get should contain(container.getId)
@@ -362,7 +362,7 @@ class YarnAllocatorSuite extends SparkFunSuite
}
}
- test("container should not be created if requested number if met") {
+ test("container should not be created if requested number is met") {
// request a single container and receive it
val (handler, _) = createAllocator(1)
handler.updateResourceRequests()
@@ -868,4 +868,16 @@
handler.getNumExecutorsRunning should be(0)
handler.getNumExecutorsStarting should be(0)
}
+
+ test("use requested bind-address") {
+ val (handler, _) = createAllocator(maxExecutors = 1,
+ additionalConfigs = Map(EXECUTOR_BIND_ADDRESS.key -> "0.0.0.0"))
+ handler.updateResourceRequests()
+
+ val container = createContainer("host1")
+ handler.handleAllocatedContainers(Array(container).toImmutableArraySeq)
+
+ handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+ handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("0.0.0.0")
+ }
}