Tip
|
Read up RpcEnv — RPC Environment on the concept of RPC Environment in Spark. |
The class org.apache.spark.rpc.netty.NettyRpcEnv is the implementation of RpcEnv using Netty - "an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients".
Netty-based RPC Environment is created by NettyRpcEnvFactory
when spark.rpc is netty
or org.apache.spark.rpc.netty.NettyRpcEnvFactory
.
It uses Java’s built-in serialization (the implementation of JavaSerializerInstance
).
Caution
|
FIXME What other choices of JavaSerializerInstance are available in Spark? |
NettyRpcEnv is only started on the driver. See Client Mode.
The default port to listen to is 7077
.
When NettyRpcEnv starts, the following INFO message is printed out in the logs:
INFO Utils: Successfully started service 'NettyRpcEnv' on port 0.
Tip
|
Set
FIXME: The message above in TransportServer has a space before |
Refer to Client Mode = is this an executor or the driver? for introduction about client mode. This is only for Netty-based RpcEnv.
When created, a Netty-based RpcEnv starts the RPC server and register necessary endpoints for non-client mode, i.e. when client mode is false
.
Caution
|
FIXME What endpoints? |
It means that the required services for remote communication with NettyRpcEnv are only started on the driver (not executors).
EventLoopGroup
uses a daemon thread pool called shuffle-server-ID
, where ID
is a unique integer for NioEventLoopGroup
(NIO
) or EpollEventLoopGroup
(EPOLL
) for the Shuffle server.
Caution
|
FIXME Review Netty’s NioEventLoopGroup .
|
Caution
|
FIXME Where are SO_BACKLOG , SO_RCVBUF , SO_SNDBUF channel options used?
|
NettyRpcEnv’s Dispatcher uses the daemon fixed thread pool with spark.rpc.netty.dispatcher.numThreads threads.
Thread names are formatted as dispatcher-event-loop-ID
, where ID
is a unique, sequentially assigned integer.
It starts the message processing loop on all of the threads.
NettyRpcEnv uses the daemon single-thread scheduled thread pool netty-rpc-env-timeout
.
"netty-rpc-env-timeout" #87 daemon prio=5 os_prio=31 tid=0x00007f887775a000 nid=0xc503 waiting on condition [0x0000000123397000]
NettyRpcEnv uses the daemon cached thread pool with up to spark.rpc.connect.threads threads.
Thread names are formatted as netty-rpc-connection-ID
, where ID
is a unique, sequentially assigned integer.
The Netty-based implementation uses the following properties:
-
spark.rpc.io.mode
(default:NIO
) -NIO
orEPOLL
for low-level IO.NIO
is always available, whileEPOLL
is only available on Linux.NIO
usesio.netty.channel.nio.NioEventLoopGroup
whileEPOLL
io.netty.channel.epoll.EpollEventLoopGroup
. -
spark.shuffle.io.numConnectionsPerPeer
always equals1
-
spark.rpc.io.threads
(default:0
; maximum:8
) - the number of threads to use for the Netty client and server thread pools.-
spark.shuffle.io.serverThreads
(default: the value ofspark.rpc.io.threads
) -
spark.shuffle.io.clientThreads
(default: the value ofspark.rpc.io.threads
)
-
-
spark.rpc.netty.dispatcher.numThreads
(default: the number of processors available to JVM) -
spark.rpc.connect.threads
(default:64
) - used in cluster mode to communicate with a remote RPC endpoint -
spark.port.maxRetries
(default:16
or100
for testing whenspark.testing
is set) controls the maximum number of binding attempts/retries to a port before giving up.
-
endpoint-verifier
(RpcEndpointVerifier
) - a RpcEndpoint for remote RpcEnvs to query whether an RpcEndpoint exists or not. It usesDispatcher
that keeps track of registered endpoints and respondstrue
/false
toCheckExistence
message.
endpoint-verifier
is used to check out whether a given endpoint exists or not before the endpoint’s reference is given back to clients.
One use case is when an AppClient connects to standalone Masters before it registers the application it acts for.
Caution
|
FIXME Who’d like to use endpoint-verifier and how?
|
A message dispatcher is responsible for routing RPC messages to the appropriate endpoint(s).
It uses the daemon fixed thread pool dispatcher-event-loop
with spark.rpc.netty.dispatcher.numThreads
threads for dispatching messages.
"dispatcher-event-loop-0" #26 daemon prio=5 os_prio=31 tid=0x00007f8877153800 nid=0x7103 waiting on condition [0x000000011f78b000]