diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala index 7d9d593b36241..c769594640c63 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala @@ -53,6 +53,7 @@ private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) { */ @GuardedBy("this") private var draining = false + private val outboxStoppedEx = new SparkException("Message is dropped because Outbox is stopped") /** * Send a message. If there is no active connection, cache it and launch a new connection. If @@ -68,7 +69,7 @@ private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) { } } if (dropped) { - message.callback.onFailure(new SparkException("Message is dropped because Outbox is stopped")) + message.callback.onFailure(outboxStoppedEx) } else { drainOutbox() } @@ -160,7 +161,7 @@ private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) { } /** - * Stop [[Inbox]] and notify the waiting messages with the cause. + * Stop [[Outbox]] and notify the waiting messages with the cause. */ private def handleNetworkFailure(e: Throwable): Unit = { synchronized { @@ -215,7 +216,7 @@ private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) { // update messages and it's safe to just drain the queue. var message = messages.poll() while (message != null) { - message.callback.onFailure(new SparkException("Message is dropped because Outbox is stopped")) + message.callback.onFailure(outboxStoppedEx) message = messages.poll() } }