diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index d149f9196b345..efb87c87c4131 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -514,6 +514,14 @@ To use a custom metrics.properties for the application master and executors, upd
1.6.0 |
+
+ spark.yarn.executor.bindAddress.mode |
+ HOSTNAME |
+
+ Configures which network interface executor listening sockets bind to. Possible values are: HOSTNAME, which binds to the node's hostname, and ALL_IPS, which binds to 0.0.0.0 (all interfaces)
+ |
+ 4.0.0 |
+
spark.yarn.executor.nodeLabelExpression |
(none) |
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 11d22a3225d8a..ea7bb30dbe98e 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -462,8 +462,9 @@ private[spark] class ApplicationMaster(
logInfo {
val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
val executorCores = _sparkConf.get(EXECUTOR_CORES)
- val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "",
- "", executorMemory, executorCores, appId, securityMgr, localResources,
+ val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl,
+ "", "", "",
+ executorMemory, executorCores, appId, securityMgr, localResources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
dummyRunner.launchContextDebugInfo()
}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 983ab5b4341b8..0fa871951a990 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -50,6 +50,7 @@ private[yarn] class ExecutorRunnable(
sparkConf: SparkConf,
masterAddress: String,
executorId: String,
+ bindAddress: String,
hostname: String,
executorMemory: Int,
executorCores: Int,
@@ -120,7 +121,7 @@ private[yarn] class ExecutorRunnable(
} catch {
case ex: Exception =>
throw new SparkException(s"Exception while starting container ${container.get.getId}" +
- s" on host $hostname", ex)
+ s" on host $hostname ($bindAddress)", ex)
}
}
@@ -192,6 +193,7 @@ private[yarn] class ExecutorRunnable(
Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
"--driver-url", masterAddress,
"--executor-id", executorId,
+ "--bind-address", bindAddress,
"--hostname", hostname,
"--cores", executorCores.toString,
"--app-id", appId,
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index c86195d0ef31e..7b52e7de8b96c 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -87,6 +87,9 @@ private[yarn] class YarnAllocator(
@GuardedBy("this")
val allocatedContainerToHostMap = new HashMap[ContainerId, String]
+ @GuardedBy("this")
+ val allocatedContainerToBindAddressMap = new HashMap[ContainerId, String]
+
// Containers that we no longer care about. We've either already told the RM to release them or
// will on the next heartbeat. Containers get removed from this map after the RM tells us they've
// completed.
@@ -173,6 +176,8 @@ private[yarn] class YarnAllocator(
private val minMemoryOverhead = sparkConf.get(EXECUTOR_MIN_MEMORY_OVERHEAD)
+ private val bindAddressMode = sparkConf.get(EXECUTOR_BIND_ADDRESS_MODE)
+
private val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
@@ -757,12 +762,17 @@ private[yarn] class YarnAllocator(
val rpId = getResourceProfileIdFromPriority(container.getPriority)
executorIdCounter += 1
val executorHostname = container.getNodeId.getHost
+ val executorBindAddress = bindAddressMode match {
+ case "ALL_IPS" => "0.0.0.0"
+ case _ => executorHostname
+ }
val containerId = container.getId
val executorId = executorIdCounter.toString
val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
assert(container.getResource.getMemorySize >= yarnResourceForRpId.getMemorySize)
logInfo(log"Launching container ${MDC(LogKeys.CONTAINER_ID, containerId)} " +
- log"on host ${MDC(LogKeys.HOST, executorHostname)} for " +
+ log"on host ${MDC(LogKeys.HOST, executorHostname)} " +
+ log"with bind-address ${MDC(LogKeys.BIND_ADDRESS, executorBindAddress)} for " +
log"executor with ID ${MDC(LogKeys.EXECUTOR_ID, executorId)} for " +
log"ResourceProfile Id ${MDC(LogKeys.RESOURCE_PROFILE_ID, rpId)}")
@@ -788,6 +798,7 @@ private[yarn] class YarnAllocator(
sparkConf,
driverUrl,
executorId,
+ executorBindAddress,
executorHostname,
containerMem,
containerCores,
@@ -838,6 +849,7 @@
new HashSet[ContainerId])
containerSet += containerId
allocatedContainerToHostMap.put(containerId, executorHostname)
+ allocatedContainerToBindAddressMap.put(containerId, executorBindAddress)
launchingExecutorContainerIds.remove(containerId)
}
getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
@@ -952,6 +967,7 @@ private[yarn] class YarnAllocator(
}
allocatedContainerToHostMap.remove(containerId)
+ allocatedContainerToBindAddressMap.remove(containerId)
}
containerIdToExecutorIdAndResourceProfileId.remove(containerId).foreach { case (eid, _) =>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 51e5e0bfb9087..ec75ad897be3f 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.yarn
-import java.util.Properties
+import java.util.{Locale, Properties}
import java.util.concurrent.TimeUnit
import org.apache.spark.internal.Logging
@@ -339,6 +339,17 @@ package object config extends Logging {
.stringConf
.createOptional
+ private[spark] val EXECUTOR_BIND_ADDRESS_MODE =
+ ConfigBuilder("spark.yarn.executor.bindAddress.mode")
+ .doc("Configures which network interface executor listening sockets bind to. " +
+ "Possible values are: HOSTNAME, which binds to the node's hostname, " +
+ "and ALL_IPS, which binds to 0.0.0.0 (all interfaces)")
+ .version("4.0.0")
+ .stringConf
+ .transform(_.toUpperCase(Locale.ROOT))
+ .checkValues(Set("HOSTNAME", "ALL_IPS"))
+ .createWithDefault("HOSTNAME")
+
/* Unmanaged AM configuration. */
private[spark] val YARN_UNMANAGED_AM = ConfigBuilder("spark.yarn.unmanagedAM.enabled")
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala
index 1ef3c9c410af9..70c6c2474b603 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala
@@ -42,6 +42,7 @@ class ExecutorRunnableSuite extends SparkFunSuite {
"yarn",
"exec-1",
"localhost",
+ "localhost",
1,
1,
"application_123_1",
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 09e35a308728c..8b172baffb210 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -173,7 +173,6 @@ class YarnAllocatorSuite extends SparkFunSuite
ContainerStatus.newInstance(containerId, containerState, diagnostics, exitStatus)
}
-
test("single container allocated") {
// request a single container and receive it
val (handler, _) = createAllocator(1)
@@ -186,6 +185,7 @@ class YarnAllocatorSuite extends SparkFunSuite
handler.getNumExecutorsRunning should be (1)
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+ handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("host1")
val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
hostTocontainer.get("host1").get should contain(container.getId)
@@ -364,7 +364,7 @@ class YarnAllocatorSuite extends SparkFunSuite
}
}
- test("container should not be created if requested number if met") {
+ test("container should not be created if requested number is met") {
// request a single container and receive it
val (handler, _) = createAllocator(1)
handler.updateResourceRequests()
@@ -904,4 +904,29 @@ class YarnAllocatorSuite extends SparkFunSuite
handler.getNumExecutorsRunning should be(0)
handler.getNumExecutorsStarting should be(0)
}
+
+ test("use 0.0.0.0 when requested bind-address mode is ALL_IPS") {
+ val (handler, _) = createAllocator(maxExecutors = 1,
+ additionalConfigs = Map(EXECUTOR_BIND_ADDRESS_MODE.key -> "ALL_IPS"))
+ handler.updateResourceRequests()
+
+ val container = createContainer("host1")
+ handler.handleAllocatedContainers(Array(container).toImmutableArraySeq)
+
+ handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+ handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("0.0.0.0")
+ }
+
+ test("use hostname when requested bind-address mode is HOSTNAME") {
+ val (handler, _) = createAllocator(maxExecutors = 1,
+ additionalConfigs = Map(EXECUTOR_BIND_ADDRESS_MODE.key -> "HOSTNAME"))
+ handler.updateResourceRequests()
+
+ val container = createContainer("host1")
+ handler.handleAllocatedContainers(Array(container).toImmutableArraySeq)
+
+ handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+ handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("host1")
+ }
+
}