diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java index 1ea6c109512..42b02795343 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java @@ -38,6 +38,7 @@ import org.apache.dubbo.remoting.transport.CodecSupport; import org.apache.dubbo.remoting.transport.ExceedPayloadLimitException; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -148,19 +149,22 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro byte status = header[3]; res.setStatus(status); try { - ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); if (status == Response.OK) { Object data; - if (res.isHeartbeat()) { - data = decodeHeartbeatData(channel, in); - } else if (res.isEvent()) { - data = decodeEventData(channel, in); + if (res.isEvent()) { + byte[] eventPayload = CodecSupport.getPayload(is); + if (CodecSupport.isHeartBeat(eventPayload, proto)) { + // heart beat response data is always null; + data = null; + } else { + data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto)); + } } else { - data = decodeResponseData(channel, in, getRequestData(id)); + data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(id)); } res.setResult(data); } else { - res.setErrorMessage(in.readUTF()); + res.setErrorMessage(CodecSupport.deserialize(channel.getUrl(), is, proto).readUTF()); } } catch (Throwable t) { res.setStatus(Response.CLIENT_ERROR); @@ -176,14 +180,17 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro req.setEvent(true); } try { - ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); Object data; - if (req.isHeartbeat()) { - data = decodeHeartbeatData(channel, in); - } else if (req.isEvent()) { - data = decodeEventData(channel, in); + if (req.isEvent()) { + byte[] eventPayload = CodecSupport.getPayload(is); + if (CodecSupport.isHeartBeat(eventPayload, proto)) { + // heart beat response data is always null; + data = null; + } else { + data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto)); + } } else { - data = decodeRequestData(channel, in); + data = decodeRequestData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto)); } req.setData(data); } catch (Throwable t) { @@ -231,16 +238,23 @@ protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) int savedWriteIndex = buffer.writerIndex(); buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); - ObjectOutput out = serialization.serialize(channel.getUrl(), bos); - if (req.isEvent()) { - encodeEventData(channel, out, req.getData()); + + if (req.isHeartbeat()) { + // heartbeat request data is always null + bos.write(CodecSupport.getNullBytesOf(serialization)); } else { - encodeRequestData(channel, out, req.getData(), req.getVersion()); - } - out.flushBuffer(); - if (out instanceof Cleanable) { - ((Cleanable) out).cleanup(); + ObjectOutput out = serialization.serialize(channel.getUrl(), bos); + if (req.isEvent()) { + encodeEventData(channel, out, req.getData()); + } else { + encodeRequestData(channel, out, req.getData(), req.getVersion()); + } + out.flushBuffer(); + if (out instanceof Cleanable) { + ((Cleanable) out).cleanup(); + } } + bos.flush(); bos.close(); int len = bos.writtenBytes(); @@ -274,21 +288,33 @@ protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response re buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); - ObjectOutput out = serialization.serialize(channel.getUrl(), bos); + // encode response data or error message. if (status == Response.OK) { - if (res.isHeartbeat()) { - encodeEventData(channel, out, res.getResult()); - } else { - encodeResponseData(channel, out, res.getResult(), res.getVersion()); + if(res.isHeartbeat()){ + // heartbeat response data is always null + bos.write(CodecSupport.getNullBytesOf(serialization)); + }else { + ObjectOutput out = serialization.serialize(channel.getUrl(), bos); + if (res.isEvent()) { + encodeEventData(channel, out, res.getResult()); + } else { + encodeResponseData(channel, out, res.getResult(), res.getVersion()); + } + out.flushBuffer(); + if (out instanceof Cleanable) { + ((Cleanable) out).cleanup(); + } } } else { + ObjectOutput out = serialization.serialize(channel.getUrl(), bos); out.writeUTF(res.getErrorMessage()); + out.flushBuffer(); + if (out instanceof Cleanable) { + ((Cleanable) out).cleanup(); + } } - out.flushBuffer(); - if (out instanceof Cleanable) { - ((Cleanable) out).cleanup(); - } + bos.flush(); bos.close(); diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java index 8c74fe564f6..dd6d270fec2 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/CodecSupport.java @@ -22,11 +22,14 @@ import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.serialize.ObjectInput; +import org.apache.dubbo.common.serialize.ObjectOutput; import org.apache.dubbo.common.serialize.Serialization; import org.apache.dubbo.remoting.Constants; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -41,6 +44,8 @@ public class CodecSupport { private static Map ID_SERIALIZATION_MAP = new HashMap(); private static Map ID_SERIALIZATIONNAME_MAP = new HashMap(); private static Map SERIALIZATIONNAME_ID_MAP = new HashMap(); + // Cache null object serialize results, for heartbeat request/response serialize use. + private static Map ID_NULLBYTES_MAP = new HashMap(); static { Set supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions(); @@ -92,4 +97,58 @@ public static ObjectInput deserialize(URL url, InputStream is, byte proto) throw Serialization s = getSerialization(url, proto); return s.deserialize(url, is); } + + /** + * Get the null object serialize result byte[] of Serialization from the cache, + * if not, generate it first. + * + * @param s Serialization Instances + * @return serialize result of null object + */ + public static byte[] getNullBytesOf(Serialization s) { + return ID_NULLBYTES_MAP.computeIfAbsent(s.getContentTypeId(), k -> { + //Pre-generated Null object bytes + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] nullBytes = new byte[0]; + try { + ObjectOutput out = s.serialize(null, baos); + out.writeObject(null); + out.flushBuffer(); + nullBytes = baos.toByteArray(); + baos.close(); + } catch (Exception e) { + logger.warn("Serialization extension " + s.getClass().getName() + " not support serializing null object, return an empty bytes instead."); + } + return nullBytes; + }); + } + + /** + * Read all payload to byte[] + * + * @param is + * @return + * @throws IOException + */ + public static byte[] getPayload(InputStream is) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] buffer = new byte[1024]; + int len; + while ((len = is.read(buffer)) > -1) { + baos.write(buffer, 0, len); + } + baos.flush(); + return baos.toByteArray(); + } + + /** + * Check if payload is null object serialize result byte[] of serialization + * + * @param payload + * @param proto + * @return + */ + public static boolean isHeartBeat(byte[] payload, byte proto) { + return Arrays.equals(payload, getNullBytesOf(getSerializationById(proto))); + } } diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java index 889f2cebad7..ef51a3fca53 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java @@ -33,6 +33,7 @@ import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.RpcInvocation; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -79,8 +80,14 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro if (status == Response.OK) { Object data; if (res.isEvent()) { - ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); - data = decodeEventData(channel, in); + byte[] eventPayload = CodecSupport.getPayload(is); + if (CodecSupport.isHeartBeat(eventPayload, proto)) { + // heart beat response data is always null; + data = null; + } else { + ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto); + data = decodeEventData(channel, in); + } } else { DecodeableRpcResult result; if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) { @@ -118,8 +125,14 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro try { Object data; if (req.isEvent()) { - ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); - data = decodeEventData(channel, in); + byte[] eventPayload = CodecSupport.getPayload(is); + if (CodecSupport.isHeartBeat(eventPayload, proto)) { + // heart beat response data is always null; + data = null; + } else { + ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto); + data = decodeEventData(channel, in); + } } else { DecodeableRpcInvocation inv; if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {