Commit 71d1c90

[SPARK-10997][CORE] Add "client mode" to netty rpc env.
"Client mode" means the RPC env will not listen for incoming connections. This allows certain processes in the Spark stack (such as Executors or tha YARN client-mode AM) to act as pure clients when using the netty-based RPC backend, reducing the number of sockets needed by the app and also the number of open ports. Client connections are also preferred when endpoints that actually have a listening socket are involved; so, for example, if a Worker connects to a Master and the Master needs to send a message to a Worker endpoint, that client connection will be used, even though the Worker is also listening for incoming connections. With this change, the workaround for SPARK-10987 isn't necessary anymore, and is removed. The AM connects to the driver in "client mode", and that connection is used for all driver <-> AM communication, and so the AM is properly notified when the connection goes down. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #9210 from vanzin/SPARK-10997.
1 parent a930e62 commit 71d1c90
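
To make the commit message concrete, here is a minimal, self-contained Scala sketch of the "client mode" idea. The class and names below (RpcAddress, SketchRpcEnv) are illustrative stand-ins, not Spark's actual private[spark] API: an env created in client mode never binds a server socket, so it exposes no listen address of its own, and callers must be prepared for a null address.

// Illustrative sketch only -- simplified stand-ins for Spark's private RPC classes.
case class RpcAddress(host: String, port: Int)

class SketchRpcEnv(bindHost: String, bindPort: Int, clientMode: Boolean) {
  // A client-mode env skips the bind entirely, so there is no address to report;
  // this mirrors why the commit adds null checks on rpcEnv.address.
  val address: RpcAddress =
    if (clientMode) null else RpcAddress(bindHost, bindPort)
}

object ClientModeSketch extends App {
  val driverEnv   = new SketchRpcEnv("driver-host", 7077, clientMode = false)
  val executorEnv = new SketchRpcEnv("exec-host", 0, clientMode = true)
  println(driverEnv.address)   // RpcAddress(driver-host,7077): listening
  println(executorEnv.address) // null: pure client, no open port
}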

File tree

17 files changed: +266 -190 lines changed


core/src/main/scala/org/apache/spark/SparkEnv.scala
Lines changed: 5 additions & 2 deletions

@@ -252,7 +252,8 @@ object SparkEnv extends Logging {

     // Create the ActorSystem for Akka and get the port it binds to.
     val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
-    val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager)
+    val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
+      clientMode = !isDriver)
     val actorSystem: ActorSystem =
       if (rpcEnv.isInstanceOf[AkkaRpcEnv]) {
         rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
@@ -262,9 +263,11 @@ object SparkEnv extends Logging {
     }

     // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
+    // In the non-driver case, the RPC env's address may be null since it may not be listening
+    // for incoming connections.
     if (isDriver) {
       conf.set("spark.driver.port", rpcEnv.address.port.toString)
-    } else {
+    } else if (rpcEnv.address != null) {
       conf.set("spark.executor.port", rpcEnv.address.port.toString)
     }
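
The guard above matters because the env is now created with clientMode = !isDriver: a client-mode executor env has nothing bound, so there is no port to record. A hedged sketch of just that branch logic, using a hypothetical publishPort helper and the RpcAddress stand-in from the earlier sketch:

// Hypothetical helper, not Spark code; RpcAddress as in the sketch above.
case class RpcAddress(host: String, port: Int)

def publishPort(set: (String, String) => Unit,
                isDriver: Boolean,
                address: RpcAddress): Unit = {
  if (isDriver) {
    set("spark.driver.port", address.port.toString)
  } else if (address != null) {
    set("spark.executor.port", address.port.toString)
  }
  // else: client-mode executor; spark.executor.port is simply left unset
}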

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
Lines changed: 9 additions & 11 deletions

@@ -45,8 +45,6 @@ private[spark] class CoarseGrainedExecutorBackend(
     env: SparkEnv)
   extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

-  Utils.checkHostPort(hostPort, "Expected hostport")
-
   var executor: Executor = null
   @volatile var driver: Option[RpcEndpointRef] = None

@@ -80,9 +78,8 @@ private[spark] class CoarseGrainedExecutorBackend(
   }

   override def receive: PartialFunction[Any, Unit] = {
-    case RegisteredExecutor =>
+    case RegisteredExecutor(hostname) =>
       logInfo("Successfully registered with driver")
-      val (hostname, _) = Utils.parseHostPort(hostPort)
       executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

     case RegisterExecutorFailed(message) =>
@@ -163,7 +160,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       hostname,
       port,
       executorConf,
-      new SecurityManager(executorConf))
+      new SecurityManager(executorConf),
+      clientMode = true)
     val driver = fetcher.setupEndpointRefByURI(driverUrl)
     val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++
       Seq[(String, String)](("spark.app.id", appId))
@@ -188,12 +186,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     val env = SparkEnv.createExecutorEnv(
       driverConf, executorId, hostname, port, cores, isLocal = false)

-    // SparkEnv sets spark.driver.port so it shouldn't be 0 anymore.
-    val boundPort = env.conf.getInt("spark.executor.port", 0)
-    assert(boundPort != 0)
-
-    // Start the CoarseGrainedExecutorBackend endpoint.
-    val sparkHostPort = hostname + ":" + boundPort
+    // SparkEnv will set spark.executor.port if the rpc env is listening for incoming
+    // connections (e.g., if it's using akka). Otherwise, the executor is running in
+    // client mode only, and does not accept incoming connections.
+    val sparkHostPort = env.conf.getOption("spark.executor.port").map { port =>
+      hostname + ":" + port
+    }.orNull
     env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
       env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
     workerUrl.foreach { url =>
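
The new hostPort handling can be tried in isolation. A small REPL-style sketch of the same Option pattern, substituting a plain Scala Map for SparkConf (Map.get returns an Option just as conf.getOption does):

// Plain-Scala stand-in for env.conf.getOption.
val conf = Map("spark.app.id" -> "app-123") // note: no spark.executor.port entry
val hostname = "exec-host"
val sparkHostPort = conf.get("spark.executor.port").map { port =>
  hostname + ":" + port
}.orNull
assert(sparkHostPort == null) // client mode: nothing bound, nothing to advertise
// With the key present, e.g. conf + ("spark.executor.port" -> "4040"),
// the same expression yields "exec-host:4040".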

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
Lines changed: 5 additions & 3 deletions

@@ -43,9 +43,10 @@ private[spark] object RpcEnv {
       host: String,
       port: Int,
       conf: SparkConf,
-      securityManager: SecurityManager): RpcEnv = {
+      securityManager: SecurityManager,
+      clientMode: Boolean = false): RpcEnv = {
     // Using Reflection to create the RpcEnv to avoid to depend on Akka directly
-    val config = RpcEnvConfig(conf, name, host, port, securityManager)
+    val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
     getRpcEnvFactory(conf).create(config)
   }
 }
@@ -139,4 +140,5 @@ private[spark] case class RpcEnvConfig(
     name: String,
     host: String,
     port: Int,
-    securityManager: SecurityManager)
+    securityManager: SecurityManager,
+    clientMode: Boolean)
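
Because clientMode defaults to false, every existing five-argument RpcEnv.create call site keeps compiling unchanged while new callers can opt in. A simplified sketch of that source-compatibility property (signatures trimmed down, not Spark's real ones):

// Simplified stand-ins; the point is the defaulted trailing parameter.
case class EnvConfig(name: String, host: String, port: Int, clientMode: Boolean)

def create(name: String, host: String, port: Int,
           clientMode: Boolean = false): EnvConfig =
  EnvConfig(name, host, port, clientMode)

val legacy = create("driver", "localhost", 7077)              // still compiles: clientMode = false
val client = create("executor", "localhost", 0, clientMode = true)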

core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
Lines changed: 1 addition & 1 deletion

@@ -55,7 +55,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
   private var stopped = false

   def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
-    val addr = new RpcEndpointAddress(nettyEnv.address.host, nettyEnv.address.port, name)
+    val addr = RpcEndpointAddress(nettyEnv.address, name)
     val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
     synchronized {
       if (stopped) {
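
The Dispatcher change leans on a companion apply that builds an endpoint address straight from an RpcAddress instead of unpacking host and port at every call site. A hedged sketch of what such an overload could look like; the real RpcEndpointAddress in this commit also has to cope with a null address for client-mode envs, which this sketch omits:

// Illustrative only: a case-class companion overload taking the whole address.
case class RpcAddress(host: String, port: Int)
case class RpcEndpointAddress(host: String, port: Int, name: String)

object RpcEndpointAddress {
  def apply(addr: RpcAddress, name: String): RpcEndpointAddress =
    RpcEndpointAddress(addr.host, addr.port, name) // delegates to the synthetic apply
}

// Call site then mirrors the new Dispatcher line:
val addr = RpcEndpointAddress(RpcAddress("master-host", 7077), "Worker")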
