From b373c103d623c985e03e5fc6e81d86a2c829bb0f Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 25 Jan 2017 15:47:01 -0800 Subject: [PATCH 1/5] Optimize RequestMessage serialization --- .../apache/spark/rpc/RpcEndpointAddress.scala | 7 +- .../org/apache/spark/rpc/RpcEndpointRef.scala | 3 +- .../apache/spark/rpc/netty/NettyRpcEnv.scala | 112 ++++++++++++++---- .../rpc/netty/NettyRpcHandlerSuite.scala | 2 +- 4 files changed, 97 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointAddress.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointAddress.scala index b9db60a7797d8..213d3b6303a50 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointAddress.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointAddress.scala @@ -17,6 +17,8 @@ package org.apache.spark.rpc +import javax.annotation.Nullable + import org.apache.spark.SparkException /** @@ -25,10 +27,11 @@ import org.apache.spark.SparkException * The `rpcAddress` may be null, in which case the endpoint is registered via a client-only * connection and can only be reached via the client that sent the endpoint reference. * - * @param rpcAddress The socket address of the endpoint. + * @param rpcAddress The socket address of the endpoint. It's `null` when this address pointing to + * an endpoint in a client `NettyRpcEnv`. * @param name Name of the endpoint. */ -private[spark] case class RpcEndpointAddress(val rpcAddress: RpcAddress, val name: String) { +private[spark] case class RpcEndpointAddress(@Nullable rpcAddress: RpcAddress, name: String) { require(name != null, "RpcEndpoint name must be provided.") diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala index 994e18676ec49..f8776d79e65d3 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala @@ -27,8 +27,7 @@ import org.apache.spark.util.RpcUtils /** * A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe. */ -private[spark] abstract class RpcEndpointRef(conf: SparkConf) - extends Serializable with Logging { +private[spark] abstract class RpcEndpointRef(conf: SparkConf) extends Serializable with Logging { private[this] val maxRetries = RpcUtils.numRetries(conf) private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf) diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 1e448b2f1a5c6..3fad65bcf58a2 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -38,7 +38,7 @@ import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.server._ import org.apache.spark.rpc._ import org.apache.spark.serializer.{JavaSerializer, JavaSerializerInstance} -import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, ThreadUtils, Utils} private[netty] class NettyRpcEnv( val conf: SparkConf, @@ -189,7 +189,7 @@ private[netty] class NettyRpcEnv( } } else { // Message to a remote RPC endpoint. - postToOutbox(message.receiver, OneWayOutboxMessage(serialize(message))) + postToOutbox(message.receiver, OneWayOutboxMessage(message.serialize(this))) } } @@ -224,7 +224,7 @@ private[netty] class NettyRpcEnv( }(ThreadUtils.sameThread) dispatcher.postLocalMessage(message, p) } else { - val rpcMessage = RpcOutboxMessage(serialize(message), + val rpcMessage = RpcOutboxMessage(message.serialize(this), onFailure, (client, response) => onSuccess(deserialize[Any](client, response))) postToOutbox(message.receiver, rpcMessage) @@ -480,16 +480,13 @@ private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging { */ private[netty] class NettyRpcEndpointRef( @transient private val conf: SparkConf, - endpointAddress: RpcEndpointAddress, - @transient @volatile private var nettyEnv: NettyRpcEnv) - extends RpcEndpointRef(conf) with Serializable with Logging { + val endpointAddress: RpcEndpointAddress, + @transient @volatile private var nettyEnv: NettyRpcEnv) extends RpcEndpointRef(conf) { @transient @volatile var client: TransportClient = _ - private val _address = if (endpointAddress.rpcAddress != null) endpointAddress else null - private val _name = endpointAddress.name - - override def address: RpcAddress = if (_address != null) _address.rpcAddress else null + override def address: RpcAddress = + if (endpointAddress.rpcAddress != null) endpointAddress.rpcAddress else null private def readObject(in: ObjectInputStream): Unit = { in.defaultReadObject() @@ -501,34 +498,105 @@ private[netty] class NettyRpcEndpointRef( out.defaultWriteObject() } - override def name: String = _name + override def name: String = endpointAddress.name override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = { - nettyEnv.ask(RequestMessage(nettyEnv.address, this, message), timeout) + nettyEnv.ask(new RequestMessage(nettyEnv.address, this, message), timeout) } override def send(message: Any): Unit = { require(message != null, "Message is null") - nettyEnv.send(RequestMessage(nettyEnv.address, this, message)) + nettyEnv.send(new RequestMessage(nettyEnv.address, this, message)) } - override def toString: String = s"NettyRpcEndpointRef(${_address})" - - def toURI: URI = new URI(_address.toString) + override def toString: String = s"NettyRpcEndpointRef(${endpointAddress})" final override def equals(that: Any): Boolean = that match { - case other: NettyRpcEndpointRef => _address == other._address + case other: NettyRpcEndpointRef => endpointAddress == other.endpointAddress case _ => false } - final override def hashCode(): Int = if (_address == null) 0 else _address.hashCode() + final override def hashCode(): Int = + if (endpointAddress == null) 0 else endpointAddress.hashCode() } /** * The message that is sent from the sender to the receiver. + * + * @param senderAddress the sender address. It's `null` if this message is from a client + * `NettyRpcEnv`. + * @param receiver the receiver of this message. + * @param content the message content. */ -private[netty] case class RequestMessage( - senderAddress: RpcAddress, receiver: NettyRpcEndpointRef, content: Any) +private[netty] class RequestMessage( + @Nullable val senderAddress: RpcAddress, + val receiver: NettyRpcEndpointRef, val content: Any) { + + /** Manually serialize [[RequestMessage]] to minimize the size of bytes. */ + def serialize(nettyEnv: NettyRpcEnv): ByteBuffer = { + val bos = new ByteBufferOutputStream() + val out = new DataOutputStream(bos) + try { + if (senderAddress == null) { + out.writeBoolean(false) + } else { + out.writeBoolean(true) + out.writeUTF(senderAddress.host) + out.writeInt(senderAddress.port) + } + val receiverAddress = receiver.endpointAddress + if (receiverAddress.rpcAddress == null) { + out.writeBoolean(false) + } else { + out.writeBoolean(true) + out.writeUTF(receiverAddress.rpcAddress.host) + out.writeInt(receiverAddress.rpcAddress.port) + } + out.writeUTF(receiverAddress.name) + val contentBytes = nettyEnv.serialize(content) + assert(contentBytes.hasArray) + out.write(contentBytes.array(), contentBytes.arrayOffset(), contentBytes.remaining()) + } finally { + out.close() + } + bos.toByteBuffer + } + + override def toString: String = s"RequestMessage($senderAddress, $receiver, $content)" +} + +private[netty] object RequestMessage { + + def apply(nettyEnv: NettyRpcEnv, client: TransportClient, bytes: ByteBuffer): RequestMessage = { + val bis = new ByteBufferInputStream(bytes) + val in = new DataInputStream(bis) + try { + val hasSenderAddress = in.readBoolean() + val senderAddress = if (hasSenderAddress) { + RpcAddress(in.readUTF(), in.readInt()) + } else { + null + } + val endpointAddress = { + val hasRpcAddress = in.readBoolean() + val rpcAddress = if (hasRpcAddress) { + RpcAddress(in.readUTF(), in.readInt()) + } else { + null + } + RpcEndpointAddress(rpcAddress, in.readUTF()) + } + val ref = new NettyRpcEndpointRef(nettyEnv.conf, endpointAddress, nettyEnv) + ref.client = client + new RequestMessage( + senderAddress, + ref, + nettyEnv.deserialize(client, bytes)) + } finally { + in.close() + } + } +} /** * A response that indicates some failure happens in the receiver side. @@ -574,10 +642,10 @@ private[netty] class NettyRpcHandler( val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress] assert(addr != null) val clientAddr = RpcAddress(addr.getHostString, addr.getPort) - val requestMessage = nettyEnv.deserialize[RequestMessage](client, message) + val requestMessage = RequestMessage(nettyEnv, client, message) if (requestMessage.senderAddress == null) { // Create a new message with the socket address of the client as the sender. - RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content) + new RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content) } else { // The remote RpcEnv listens to some port, we should also fire a RemoteProcessConnected for // the listening address diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala index 0c156fef0ae0f..a71d8726e7066 100644 --- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala @@ -34,7 +34,7 @@ class NettyRpcHandlerSuite extends SparkFunSuite { val env = mock(classOf[NettyRpcEnv]) val sm = mock(classOf[StreamManager]) when(env.deserialize(any(classOf[TransportClient]), any(classOf[ByteBuffer]))(any())) - .thenReturn(RequestMessage(RpcAddress("localhost", 12345), null, null)) + .thenReturn(new RequestMessage(RpcAddress("localhost", 12345), null, null)) test("receive") { val dispatcher = mock(classOf[Dispatcher]) From 0872dfc0b2a2ae9cf4c1189505dd0df2d5bb5b11 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 26 Jan 2017 10:39:57 -0800 Subject: [PATCH 2/5] Minor update --- .../org/apache/spark/rpc/RpcEndpointRef.scala | 3 +- .../apache/spark/rpc/netty/NettyRpcEnv.scala | 58 +++++++++---------- 2 files changed, 28 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala index f8776d79e65d3..994e18676ec49 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala @@ -27,7 +27,8 @@ import org.apache.spark.util.RpcUtils /** * A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe. */ -private[spark] abstract class RpcEndpointRef(conf: SparkConf) extends Serializable with Logging { +private[spark] abstract class RpcEndpointRef(conf: SparkConf) + extends Serializable with Logging { private[this] val maxRetries = RpcUtils.numRetries(conf) private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf) diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 3fad65bcf58a2..22383497eebcb 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -537,60 +537,54 @@ private[netty] class RequestMessage( val bos = new ByteBufferOutputStream() val out = new DataOutputStream(bos) try { - if (senderAddress == null) { - out.writeBoolean(false) - } else { - out.writeBoolean(true) - out.writeUTF(senderAddress.host) - out.writeInt(senderAddress.port) - } - val receiverAddress = receiver.endpointAddress - if (receiverAddress.rpcAddress == null) { - out.writeBoolean(false) - } else { - out.writeBoolean(true) - out.writeUTF(receiverAddress.rpcAddress.host) - out.writeInt(receiverAddress.rpcAddress.port) - } - out.writeUTF(receiverAddress.name) + writeRpcAddress(out, senderAddress) + writeRpcAddress(out, receiver.endpointAddress.rpcAddress) + out.writeUTF(receiver.endpointAddress.name) val contentBytes = nettyEnv.serialize(content) assert(contentBytes.hasArray) - out.write(contentBytes.array(), contentBytes.arrayOffset(), contentBytes.remaining()) + out.write(contentBytes.array, contentBytes.arrayOffset, contentBytes.remaining) } finally { out.close() } bos.toByteBuffer } + private def writeRpcAddress(out: DataOutputStream, @Nullable rpcAddress: RpcAddress): Unit = { + if (rpcAddress == null) { + out.writeBoolean(false) + } else { + out.writeBoolean(true) + out.writeUTF(rpcAddress.host) + out.writeInt(rpcAddress.port) + } + } + override def toString: String = s"RequestMessage($senderAddress, $receiver, $content)" } private[netty] object RequestMessage { + private def readRpcAddress(in: DataInputStream): RpcAddress = { + val hasRpcAddress = in.readBoolean() + if (hasRpcAddress) { + RpcAddress(in.readUTF(), in.readInt()) + } else { + null + } + } + def apply(nettyEnv: NettyRpcEnv, client: TransportClient, bytes: ByteBuffer): RequestMessage = { val bis = new ByteBufferInputStream(bytes) val in = new DataInputStream(bis) try { - val hasSenderAddress = in.readBoolean() - val senderAddress = if (hasSenderAddress) { - RpcAddress(in.readUTF(), in.readInt()) - } else { - null - } - val endpointAddress = { - val hasRpcAddress = in.readBoolean() - val rpcAddress = if (hasRpcAddress) { - RpcAddress(in.readUTF(), in.readInt()) - } else { - null - } - RpcEndpointAddress(rpcAddress, in.readUTF()) - } + val senderAddress = readRpcAddress(in) + val endpointAddress = RpcEndpointAddress(readRpcAddress(in), in.readUTF()) val ref = new NettyRpcEndpointRef(nettyEnv.conf, endpointAddress, nettyEnv) ref.client = client new RequestMessage( senderAddress, ref, + // The remaining bytes in `bytes` are the message content. nettyEnv.deserialize(client, bytes)) } finally { in.close() From c4b8ff07e0695d6a30f923c8313be070d4335822 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 26 Jan 2017 10:41:43 -0800 Subject: [PATCH 3/5] make endpointAddress private --- .../main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 22383497eebcb..1c415bfd391e6 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -480,7 +480,7 @@ private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging { */ private[netty] class NettyRpcEndpointRef( @transient private val conf: SparkConf, - val endpointAddress: RpcEndpointAddress, + private val endpointAddress: RpcEndpointAddress, @transient @volatile private var nettyEnv: NettyRpcEnv) extends RpcEndpointRef(conf) { @transient @volatile var client: TransportClient = _ @@ -538,8 +538,8 @@ private[netty] class RequestMessage( val out = new DataOutputStream(bos) try { writeRpcAddress(out, senderAddress) - writeRpcAddress(out, receiver.endpointAddress.rpcAddress) - out.writeUTF(receiver.endpointAddress.name) + writeRpcAddress(out, receiver.address) + out.writeUTF(receiver.name) val contentBytes = nettyEnv.serialize(content) assert(contentBytes.hasArray) out.write(contentBytes.array, contentBytes.arrayOffset, contentBytes.remaining) From 195adc76d97368780672cf2b8c462a5d6d419532 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 26 Jan 2017 13:43:05 -0800 Subject: [PATCH 4/5] Remove nullable --- .../main/scala/org/apache/spark/rpc/RpcEndpointAddress.scala | 4 +--- .../main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointAddress.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointAddress.scala index 213d3b6303a50..fdbccc9e74c37 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointAddress.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointAddress.scala @@ -17,8 +17,6 @@ package org.apache.spark.rpc -import javax.annotation.Nullable - import org.apache.spark.SparkException /** @@ -31,7 +29,7 @@ import org.apache.spark.SparkException * an endpoint in a client `NettyRpcEnv`. * @param name Name of the endpoint. */ -private[spark] case class RpcEndpointAddress(@Nullable rpcAddress: RpcAddress, name: String) { +private[spark] case class RpcEndpointAddress(rpcAddress: RpcAddress, name: String) { require(name != null, "RpcEndpoint name must be provided.") diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 1c415bfd391e6..64a34226f35de 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -529,7 +529,7 @@ private[netty] class NettyRpcEndpointRef( * @param content the message content. */ private[netty] class RequestMessage( - @Nullable val senderAddress: RpcAddress, + val senderAddress: RpcAddress, val receiver: NettyRpcEndpointRef, val content: Any) { /** Manually serialize [[RequestMessage]] to minimize the size of bytes. */ @@ -549,7 +549,7 @@ private[netty] class RequestMessage( bos.toByteBuffer } - private def writeRpcAddress(out: DataOutputStream, @Nullable rpcAddress: RpcAddress): Unit = { + private def writeRpcAddress(out: DataOutputStream, rpcAddress: RpcAddress): Unit = { if (rpcAddress == null) { out.writeBoolean(false) } else { From 05e9a0c517d1357e4ffc23bed6bdecb5325552b9 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 26 Jan 2017 22:21:05 -0800 Subject: [PATCH 5/5] Address --- .../apache/spark/rpc/netty/NettyRpcEnv.scala | 23 +++++++++---- .../spark/rpc/netty/NettyRpcEnvSuite.scala | 33 ++++++++++++++++++- 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 64a34226f35de..ff5e39a8dcbc8 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -37,7 +37,7 @@ import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.server._ import org.apache.spark.rpc._ -import org.apache.spark.serializer.{JavaSerializer, JavaSerializerInstance} +import org.apache.spark.serializer.{JavaSerializer, JavaSerializerInstance, SerializationStream} import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, ThreadUtils, Utils} private[netty] class NettyRpcEnv( @@ -253,6 +253,13 @@ private[netty] class NettyRpcEnv( javaSerializerInstance.serialize(content) } + /** + * Returns [[SerializationStream]] that forwards the serialized bytes to `out`. + */ + private[netty] def serializeStream(out: OutputStream): SerializationStream = { + javaSerializerInstance.serializeStream(out) + } + private[netty] def deserialize[T: ClassTag](client: TransportClient, bytes: ByteBuffer): T = { NettyRpcEnv.currentClient.withValue(client) { deserialize { () => @@ -530,9 +537,10 @@ private[netty] class NettyRpcEndpointRef( */ private[netty] class RequestMessage( val senderAddress: RpcAddress, - val receiver: NettyRpcEndpointRef, val content: Any) { + val receiver: NettyRpcEndpointRef, + val content: Any) { - /** Manually serialize [[RequestMessage]] to minimize the size of bytes. */ + /** Manually serialize [[RequestMessage]] to minimize the size. */ def serialize(nettyEnv: NettyRpcEnv): ByteBuffer = { val bos = new ByteBufferOutputStream() val out = new DataOutputStream(bos) @@ -540,9 +548,12 @@ private[netty] class RequestMessage( writeRpcAddress(out, senderAddress) writeRpcAddress(out, receiver.address) out.writeUTF(receiver.name) - val contentBytes = nettyEnv.serialize(content) - assert(contentBytes.hasArray) - out.write(contentBytes.array, contentBytes.arrayOffset, contentBytes.remaining) + val s = nettyEnv.serializeStream(out) + try { + s.writeObject(content) + } finally { + s.close() + } } finally { out.close() } diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala index 0409aa3a5dee1..2b1bce4d208f6 100644 --- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala @@ -17,10 +17,13 @@ package org.apache.spark.rpc.netty +import org.scalatest.mock.MockitoSugar + import org.apache.spark._ +import org.apache.spark.network.client.TransportClient import org.apache.spark.rpc._ -class NettyRpcEnvSuite extends RpcEnvSuite { +class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar { override def createRpcEnv( conf: SparkConf, @@ -53,4 +56,32 @@ class NettyRpcEnvSuite extends RpcEnvSuite { } } + test("RequestMessage serialization") { + def assertRequestMessageEquals(expected: RequestMessage, actual: RequestMessage): Unit = { + assert(expected.senderAddress === actual.senderAddress) + assert(expected.receiver === actual.receiver) + assert(expected.content === actual.content) + } + + val nettyEnv = env.asInstanceOf[NettyRpcEnv] + val client = mock[TransportClient] + val senderAddress = RpcAddress("locahost", 12345) + val receiverAddress = RpcEndpointAddress("localhost", 54321, "test") + val receiver = new NettyRpcEndpointRef(nettyEnv.conf, receiverAddress, nettyEnv) + + val msg = new RequestMessage(senderAddress, receiver, "foo") + assertRequestMessageEquals( + msg, + RequestMessage(nettyEnv, client, msg.serialize(nettyEnv))) + + val msg2 = new RequestMessage(null, receiver, "foo") + assertRequestMessageEquals( + msg2, + RequestMessage(nettyEnv, client, msg2.serialize(nettyEnv))) + + val msg3 = new RequestMessage(senderAddress, receiver, null) + assertRequestMessageEquals( + msg3, + RequestMessage(nettyEnv, client, msg3.serialize(nettyEnv))) + } }