Conversation

@vanzin (Contributor) commented Oct 8, 2019

The current RPC backend in Spark supports single- and multi-threaded
message delivery to endpoints, but they all share the same underlying
thread pool. So an RPC endpoint that blocks a dispatcher thread can
negatively affect other endpoints.

This can be more pronounced in setups that limit the number of RPC
dispatch threads based on configuration and / or the running
environment. And exposing the RPC layer to other code (for example
with something like SPARK-29396) could make it easy to affect normal
Spark operation with a badly written RPC handler.

This change adds a new RPC endpoint type that tells the RPC env to
create dedicated dispatch threads, so that those effects are minimised.
Other endpoints will still need CPU to process their messages, but
they won't be able to actively block the dispatch thread of these
isolated endpoints.

As part of the change, I've changed the most important Spark endpoints
(the driver, executor and block manager endpoints) to be isolated from
others. This means a couple of extra threads are created on the driver
and executor for these endpoints.

Tested with existing unit tests, which hammer the RPC system extensively,
and also by running applications on a cluster (with a prototype of
SPARK-29396).
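
To make the new endpoint type concrete, here is a stand-alone sketch of what it looks like to implementors. This is a simplification: the real trait is private[spark] and extends RpcEndpoint, and the example endpoint name is made up.

```scala
// Stand-in for the IsolatedRpcEndpoint trait added by this PR (simplified;
// no RpcEnv wiring, and the real trait extends RpcEndpoint).
trait IsolatedRpcEndpoint {
  /** How many threads to use for delivering messages. One by default. */
  def threadCount(): Int = 1
}

// Hypothetical endpoint that opts into two dedicated dispatch threads, so a
// blocking handler on another endpoint cannot stall its message delivery.
class HeartbeatLikeEndpoint extends IsolatedRpcEndpoint {
  override def threadCount(): Int = 2
}
```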

SparkQA commented Oct 9, 2019

Test build #111919 has finished for PR 26059 at commit 3316d5e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class DriverEndpoint extends IsolatedRpcEndpoint with Logging

@squito (Contributor) left a comment

I think this looks good, but want to check my understanding on one point

private[spark] trait IsolatedRpcEndpoint extends RpcEndpoint {

/** How many threads to use for delivering messages. By default, use a single thread. */
def threadCount(): Int = 1
Contributor

I'm trying to wrap my head around what happens if you create an IsolatedRpcEndpoint with threadCount() > 1, given the code in Inbox which checks for inheritance from ThreadSafeRpcEndpoint:

if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {

I guess if you expect one endpoint to be served by multiple threads, it makes sense that you'd want Inbox.enableConcurrent = true, and you'd have to make your endpoint safe for that -- but it's worth a comment here at least.
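
For reference, the interaction being asked about can be modeled with stand-in traits (not Spark's real classes; the actual check lives in org.apache.spark.rpc.netty.Inbox):

```scala
// Minimal stand-ins for the traits under discussion.
trait RpcEndpoint
trait ThreadSafeRpcEndpoint extends RpcEndpoint
trait IsolatedRpcEndpoint extends RpcEndpoint { def threadCount(): Int = 1 }

// Sketch of the Inbox check quoted above: concurrent delivery is allowed
// exactly when the endpoint does not claim to need serial delivery.
def enableConcurrent(endpoint: RpcEndpoint): Boolean =
  !endpoint.isInstanceOf[ThreadSafeRpcEndpoint]

// An isolated endpoint asking for several threads therefore gets concurrent
// delivery and must be internally thread-safe.
class MultiThreadedEndpoint extends IsolatedRpcEndpoint {
  override def threadCount(): Int = 4
}
```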

Member

I have the same question as @squito: how do you deal with ThreadSafeRpcEndpoint?

Though we could set Inbox.enableConcurrent = false when threadCount() > 1, the extra threads would then be wasted.

Contributor Author

I already updated the comment. ThreadSafeRpcEndpoint is irrelevant here. You may even extend both if you want; but if you do, either it does nothing (because the thread pool has a single thread) or you're doing it wrong (because the thread pool has multiple threads but you only want one).

So it's pointless to mix in both traits.
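
The point can be demonstrated with stand-in traits (not Spark's real classes, just a sketch of the two cases described):

```scala
// Minimal stand-ins illustrating why mixing the two traits is pointless.
trait RpcEndpoint
trait ThreadSafeRpcEndpoint extends RpcEndpoint
trait IsolatedRpcEndpoint extends RpcEndpoint { def threadCount(): Int = 1 }

// With the default single thread, delivery is already serial, so the
// ThreadSafeRpcEndpoint mix-in adds nothing.
class RedundantMix extends IsolatedRpcEndpoint with ThreadSafeRpcEndpoint

// With several threads you are explicitly asking for concurrent delivery,
// which contradicts what ThreadSafeRpcEndpoint is meant to signal.
class ContradictoryMix extends IsolatedRpcEndpoint with ThreadSafeRpcEndpoint {
  override def threadCount(): Int = 4
}
```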

@squito (Contributor) commented Oct 11, 2019

lgtm

@vanzin (Contributor, Author) commented Oct 16, 2019

retest this please

SparkQA commented Oct 17, 2019

Test build #112192 has finished for PR 26059 at commit b674b4c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

conf.get(EXECUTOR_ID).map { id =>
val role = if (id == SparkContext.DRIVER_IDENTIFIER) "driver" else "executor"
conf.getInt(s"spark.$role.rpc.netty.dispatcher.numThreads", modNumThreads)
Member

I'm afraid some thread resources could be wasted if a user keeps the original config here and upgrades Spark without realizing this change, since they may previously have sized it with the driver, block manager endpoints, etc. in mind.

Contributor Author

You'll be "wasting" at most two threads, which is not a big deal. If they aren't really needed, they'll just sit there doing nothing. Spark creates many other threads that don't do much; this will just be noise.
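
For context, the lookup in the quoted snippet behaves roughly like this stand-alone sketch (a plain Map stands in for SparkConf, the helper name is made up, and modNumThreads in the diff is the computed default):

```scala
// Sketch of the per-role dispatcher thread-count lookup quoted above.
// Falls back to the computed default when the user has not set the key.
def dispatcherNumThreads(role: String,
                         conf: Map[String, String],
                         default: Int): Int =
  conf.get(s"spark.$role.rpc.netty.dispatcher.numThreads")
    .map(_.toInt)
    .getOrElse(default)
```

A previously tuned value still applies to the shared pool after the upgrade, which is why at most a couple of now-dedicated threads can end up idle.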

setActive(inbox)
}

override def unregister(endpointName: String): Unit = synchronized {
Member

Should this method be idempotent?

Contributor Author

The Dispatcher makes sure to call this only once.

@asfgit asfgit closed this in 2f0a38c Oct 17, 2019
@squito (Contributor) commented Oct 17, 2019

merged to master
