diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index fe738f414996..825d9ce77947 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -25,7 +25,7 @@ import scala.concurrent.Future
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config.Network
-import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEnv}
+import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEnv}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -65,7 +65,7 @@ private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
  * Lives in the driver to receive heartbeats from executors..
  */
 private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
-  extends SparkListener with IsolatedRpcEndpoint with Logging {
+  extends SparkListener with IsolatedThreadSafeRpcEndpoint with Logging {
 
   def this(sc: SparkContext) = {
     this(sc, new SystemClock)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 4903421f9063..d8f33a06123f 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -56,7 +56,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     env: SparkEnv,
     resourcesFileOpt: Option[String],
     resourceProfile: ResourceProfile)
-  extends IsolatedRpcEndpoint with ExecutorBackend with Logging {
+  extends IsolatedThreadSafeRpcEndpoint with ExecutorBackend with Logging {
 
   import CoarseGrainedExecutorBackend._
 
diff --git a/core/src/main/scala/org/apache/spark/internal/plugin/PluginEndpoint.scala b/core/src/main/scala/org/apache/spark/internal/plugin/PluginEndpoint.scala
index 9a59b6bf678f..989ef8f2edf2 100644
--- a/core/src/main/scala/org/apache/spark/internal/plugin/PluginEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/internal/plugin/PluginEndpoint.scala
@@ -19,14 +19,14 @@ package org.apache.spark.internal.plugin
 
 import org.apache.spark.api.plugin.DriverPlugin
 import org.apache.spark.internal.Logging
-import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEnv}
+import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEnv}
 
 case class PluginMessage(pluginName: String, message: AnyRef)
 
 private class PluginEndpoint(
     plugins: Map[String, DriverPlugin],
     override val rpcEnv: RpcEnv)
-  extends IsolatedRpcEndpoint with Logging {
+  extends IsolatedThreadSafeRpcEndpoint with Logging {
 
   override def receive: PartialFunction[Any, Unit] = {
     case PluginMessage(pluginName, message) =>
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
index 4728759e7fb0..627f17f8862b 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
@@ -153,12 +153,25 @@ private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
 private[spark] trait IsolatedRpcEndpoint extends RpcEndpoint {
 
   /**
-   * How many threads to use for delivering messages. By default, use a single thread.
+   * How many threads to use for delivering messages.
    *
    * Note that requesting more than one thread means that the endpoint should be able to handle
    * messages arriving from many threads at once, and all the things that entails (including
    * messages being delivered to the endpoint out of order).
    */
-  def threadCount(): Int = 1
+  def threadCount(): Int
+
+}
+
+/**
+ * An endpoint that uses a dedicated thread pool for delivering messages and
+ * is guaranteed to be thread-safe.
+ */
+private[spark] trait IsolatedThreadSafeRpcEndpoint extends IsolatedRpcEndpoint {
+
+  /**
+   * Limits the threadCount to 1 so that messages are handled in a thread-safe way.
+   */
+  final def threadCount(): Int = 1
 
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 225dd1d75bfa..2d3cf2ebc4f5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -127,7 +127,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("cleanup-decommission-execs")
   }
 
-  class DriverEndpoint extends IsolatedRpcEndpoint with Logging {
+  class DriverEndpoint extends IsolatedThreadSafeRpcEndpoint with Logging {
 
     override val rpcEnv: RpcEnv = CoarseGrainedSchedulerBackend.this.rpcEnv
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index adeb507941c0..495d91fe0e44 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -33,7 +33,7 @@ import org.apache.spark.{MapOutputTrackerMaster, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.network.shuffle.ExternalBlockStoreClient
-import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
+import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
 import org.apache.spark.shuffle.ShuffleManager
@@ -41,8 +41,8 @@ import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}
 
 /**
- * BlockManagerMasterEndpoint is an [[IsolatedRpcEndpoint]] on the master node to track statuses
- * of all the storage endpoints' block managers.
+ * BlockManagerMasterEndpoint is an [[IsolatedThreadSafeRpcEndpoint]] on the master node to
+ * track statuses of all the storage endpoints' block managers.
  */
 private[spark]
 class BlockManagerMasterEndpoint(
@@ -55,7 +55,7 @@ class BlockManagerMasterEndpoint(
     mapOutputTracker: MapOutputTrackerMaster,
     shuffleManager: ShuffleManager,
     isDriver: Boolean)
-  extends IsolatedRpcEndpoint with Logging {
+  extends IsolatedThreadSafeRpcEndpoint with Logging {
 
   // Mapping from executor id to the block manager's local disk directories.
   private val executorIdToLocalDirs =
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
index 54a72568b18f..71c7a4de4c13 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
@@ -21,7 +21,7 @@ import scala.concurrent.{ExecutionContext, Future}
 
 import org.apache.spark.{MapOutputTracker, SparkEnv}
 import org.apache.spark.internal.Logging
-import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEnv}
+import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEnv}
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.{ThreadUtils, Utils}
 
@@ -34,7 +34,7 @@ class BlockManagerStorageEndpoint(
     override val rpcEnv: RpcEnv,
     blockManager: BlockManager,
     mapOutputTracker: MapOutputTracker)
-  extends IsolatedRpcEndpoint with Logging {
+  extends IsolatedThreadSafeRpcEndpoint with Logging {
 
   private val asyncThreadPool =
     ThreadUtils.newDaemonCachedThreadPool("block-manager-storage-async-thread-pool", 100)
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index c70dde79b3c2..6e5eb7732201 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -962,7 +962,8 @@ abstract class RpcEnvSuite extends SparkFunSuite {
     val singleThreadedEnv = createRpcEnv(
       new SparkConf().set(Network.RPC_NETTY_DISPATCHER_NUM_THREADS, 1), "singleThread", 0)
     try {
-      val blockingEndpoint = singleThreadedEnv.setupEndpoint("blocking", new IsolatedRpcEndpoint {
+      val blockingEndpoint = singleThreadedEnv
+        .setupEndpoint("blocking", new IsolatedThreadSafeRpcEndpoint {
         override val rpcEnv: RpcEnv = singleThreadedEnv
         override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
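
For reviewers, a minimal sketch of what the new split looks like from an endpoint author's
point of view. This is not part of the patch: both traits are private[spark], so code like
this would live inside Spark itself, and the endpoint names, messages, and thread count
below are hypothetical, chosen only to illustrate the contract.

    import java.util.concurrent.atomic.AtomicLong

    import org.apache.spark.rpc.{IsolatedRpcEndpoint, IsolatedThreadSafeRpcEndpoint, RpcEnv}

    // With IsolatedThreadSafeRpcEndpoint, threadCount() is final and fixed to 1,
    // so messages are delivered one at a time and plain mutable state is safe.
    class CounterEndpoint(override val rpcEnv: RpcEnv) extends IsolatedThreadSafeRpcEndpoint {
      private var count = 0L // only ever touched by the single delivery thread

      override def receive: PartialFunction[Any, Unit] = {
        case "inc" => count += 1
      }
    }

    // A bare IsolatedRpcEndpoint must now declare threadCount() explicitly, since
    // the default of 1 moved to the subtrait. Anything above 1 means messages can
    // arrive on several threads at once, possibly out of order, so the endpoint's
    // state must be protected accordingly.
    class ParallelCounterEndpoint(override val rpcEnv: RpcEnv) extends IsolatedRpcEndpoint {
      override def threadCount(): Int = 4 // hypothetical; requires the thread safety below

      private val count = new AtomicLong(0L)

      override def receive: PartialFunction[Any, Unit] = {
        case "inc" => count.incrementAndGet()
      }
    }

Making threadCount() abstract on IsolatedRpcEndpoint forces each endpoint to state its
concurrency explicitly, while the final override in IsolatedThreadSafeRpcEndpoint makes the
single-threaded guarantee impossible to break by accident in a subclass.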