4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

@@ -25,7 +25,7 @@ import scala.concurrent.Future
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config.Network
-import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEnv}
+import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEnv}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -65,7 +65,7 @@ private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
  * Lives in the driver to receive heartbeats from executors..
  */
 private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
-  extends SparkListener with IsolatedRpcEndpoint with Logging {
+  extends SparkListener with IsolatedThreadSafeRpcEndpoint with Logging {
 
   def this(sc: SparkContext) = {
     this(sc, new SystemClock)
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

@@ -56,7 +56,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     env: SparkEnv,
     resourcesFileOpt: Option[String],
     resourceProfile: ResourceProfile)
-  extends IsolatedRpcEndpoint with ExecutorBackend with Logging {
+  extends IsolatedThreadSafeRpcEndpoint with ExecutorBackend with Logging {
 
   import CoarseGrainedExecutorBackend._
 
core/src/main/scala/org/apache/spark/internal/plugin/PluginEndpoint.scala

@@ -19,14 +19,14 @@ package org.apache.spark.internal.plugin
 
 import org.apache.spark.api.plugin.DriverPlugin
 import org.apache.spark.internal.Logging
-import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEnv}
+import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEnv}
 
 case class PluginMessage(pluginName: String, message: AnyRef)
 
 private class PluginEndpoint(
     plugins: Map[String, DriverPlugin],
     override val rpcEnv: RpcEnv)
-  extends IsolatedRpcEndpoint with Logging {
+  extends IsolatedThreadSafeRpcEndpoint with Logging {
 
   override def receive: PartialFunction[Any, Unit] = {
     case PluginMessage(pluginName, message) =>
17 changes: 15 additions & 2 deletions core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala

@@ -153,12 +153,25 @@ private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
 private[spark] trait IsolatedRpcEndpoint extends RpcEndpoint {
 
   /**
-   * How many threads to use for delivering messages. By default, use a single thread.
+   * How many threads to use for delivering messages.
    *
    * Note that requesting more than one thread means that the endpoint should be able to handle
    * messages arriving from many threads at once, and all the things that entails (including
    * messages being delivered to the endpoint out of order).
    */
-  def threadCount(): Int = 1
+  def threadCount(): Int
 
 }
+
+/**
+ * An endpoint that uses a dedicated thread pool for delivering messages and
+ * ensured to be thread-safe.
+ */
+private[spark] trait IsolatedThreadSafeRpcEndpoint extends IsolatedRpcEndpoint {
+
+  /**
+   * Limit the threadCount to 1 so that messages are ensured to be handled in a thread-safe way.
+   */
+  final def threadCount(): Int = 1
+
+}
Review comment (Member): According to the PR description, this final def is the main contribution, @Ngone51 ?

Reply (Member Author): Yes, and also the naming of IsolatedThreadSafeRpcEndpoint.
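
To make the new contract concrete, here is a minimal sketch (not part of this PR) contrasting the two traits. The endpoint classes and their counter fields are hypothetical, and because org.apache.spark.rpc is private[spark], code like this only compiles inside the Spark source tree.

import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.rpc.{IsolatedRpcEndpoint, IsolatedThreadSafeRpcEndpoint, RpcEnv}

// Opts into concurrent delivery: threadCount() is now abstract, so the choice
// is explicit, and any mutable state must tolerate access from several
// dispatcher threads at once (and messages arriving out of order).
private class ConcurrentCounterEndpoint(override val rpcEnv: RpcEnv)
  extends IsolatedRpcEndpoint {

  override def threadCount(): Int = 4

  private val received = new AtomicLong(0)  // thread-safe state is required

  override def receive: PartialFunction[Any, Unit] = {
    case _ => received.incrementAndGet()
  }
}

// Single-threaded delivery is guaranteed by the final threadCount() = 1,
// so plain mutable state is safe here.
private class SerialCounterEndpoint(override val rpcEnv: RpcEnv)
  extends IsolatedThreadSafeRpcEndpoint {

  private var received = 0L  // no synchronization needed

  override def receive: PartialFunction[Any, Unit] = {
    case _ => received += 1
  }
}

Making threadCount() final is what turns the name into a guarantee: a subclass of IsolatedThreadSafeRpcEndpoint cannot accidentally widen its delivery pool and silently lose the single-threaded ordering its state relies on.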
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

@@ -127,7 +127,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("cleanup-decommission-execs")
   }
 
-  class DriverEndpoint extends IsolatedRpcEndpoint with Logging {
+  class DriverEndpoint extends IsolatedThreadSafeRpcEndpoint with Logging {
 
     override val rpcEnv: RpcEnv = CoarseGrainedSchedulerBackend.this.rpcEnv
 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

@@ -33,16 +33,16 @@ import org.apache.spark.{MapOutputTrackerMaster, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.network.shuffle.ExternalBlockStoreClient
-import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
+import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}
 
 /**
- * BlockManagerMasterEndpoint is an [[IsolatedRpcEndpoint]] on the master node to track statuses
- * of all the storage endpoints' block managers.
+ * BlockManagerMasterEndpoint is an [[IsolatedThreadSafeRpcEndpoint]] on the master node to
+ * track statuses of all the storage endpoints' block managers.
  */
 private[spark]
 class BlockManagerMasterEndpoint(
@@ -55,7 +55,7 @@ class BlockManagerMasterEndpoint(
     mapOutputTracker: MapOutputTrackerMaster,
     shuffleManager: ShuffleManager,
     isDriver: Boolean)
-  extends IsolatedRpcEndpoint with Logging {
+  extends IsolatedThreadSafeRpcEndpoint with Logging {
 
   // Mapping from executor id to the block manager's local disk directories.
   private val executorIdToLocalDirs =
core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala

@@ -21,7 +21,7 @@ import scala.concurrent.{ExecutionContext, Future}
 
 import org.apache.spark.{MapOutputTracker, SparkEnv}
 import org.apache.spark.internal.Logging
-import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEnv}
+import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEnv}
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.{ThreadUtils, Utils}
 
@@ -34,7 +34,7 @@ class BlockManagerStorageEndpoint(
     override val rpcEnv: RpcEnv,
     blockManager: BlockManager,
     mapOutputTracker: MapOutputTracker)
-  extends IsolatedRpcEndpoint with Logging {
+  extends IsolatedThreadSafeRpcEndpoint with Logging {
 
   private val asyncThreadPool =
     ThreadUtils.newDaemonCachedThreadPool("block-manager-storage-async-thread-pool", 100)
3 changes: 2 additions & 1 deletion core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala

@@ -962,7 +962,8 @@ abstract class RpcEnvSuite extends SparkFunSuite {
     val singleThreadedEnv = createRpcEnv(
       new SparkConf().set(Network.RPC_NETTY_DISPATCHER_NUM_THREADS, 1), "singleThread", 0)
     try {
-      val blockingEndpoint = singleThreadedEnv.setupEndpoint("blocking", new IsolatedRpcEndpoint {
+      val blockingEndpoint = singleThreadedEnv
+        .setupEndpoint("blocking", new IsolatedThreadSafeRpcEndpoint {
        override val rpcEnv: RpcEnv = singleThreadedEnv
 
        override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
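
For completeness, a hypothetical usage sketch in the same spirit as the test above: register an IsolatedThreadSafeRpcEndpoint with an existing RpcEnv, then message it through the returned ref. EchoEndpoint and the env value are assumptions for illustration, not code from this PR.

import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEnv}

class EchoEndpoint(override val rpcEnv: RpcEnv) extends IsolatedThreadSafeRpcEndpoint {

  // Delivered by a single dedicated thread, so this counter needs no locking.
  private var count = 0

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg: String =>
      count += 1
      context.reply(s"$msg #$count")
  }
}

// Given an already-created RpcEnv named `env`:
//   val ref = env.setupEndpoint("echo", new EchoEndpoint(env))
//   assert(ref.askSync[String]("ping") == "ping #1")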