Skip to content

Commit

Permalink
Pick from apache#7077 f4b225e
Browse files Browse the repository at this point in the history
  • Loading branch information
AlbumenJ committed May 26, 2021
1 parent c15d28a commit de9cb4a
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.dubbo.remoting.transport.CodecSupport;
import org.apache.dubbo.remoting.transport.ExceedPayloadLimitException;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

Expand Down Expand Up @@ -148,19 +149,22 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
byte status = header[3];
res.setStatus(status);
try {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
} else if (res.isEvent()) {
data = decodeEventData(channel, in);
if (res.isEvent()) {
byte[] eventPayload = CodecSupport.getPayload(is);
if (CodecSupport.isHeartBeat(eventPayload, proto)) {
// heart beat response data is always null;
data = null;
} else {
data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto));
}
} else {
data = decodeResponseData(channel, in, getRequestData(id));
data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(id));
}
res.setResult(data);
} else {
res.setErrorMessage(in.readUTF());
res.setErrorMessage(CodecSupport.deserialize(channel.getUrl(), is, proto).readUTF());
}
} catch (Throwable t) {
res.setStatus(Response.CLIENT_ERROR);
Expand All @@ -176,14 +180,17 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
req.setEvent(true);
}
try {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
Object data;
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
} else if (req.isEvent()) {
data = decodeEventData(channel, in);
if (req.isEvent()) {
byte[] eventPayload = CodecSupport.getPayload(is);
if (CodecSupport.isHeartBeat(eventPayload, proto)) {
// heart beat response data is always null;
data = null;
} else {
data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto));
}
} else {
data = decodeRequestData(channel, in);
data = decodeRequestData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
}
req.setData(data);
} catch (Throwable t) {
Expand Down Expand Up @@ -231,16 +238,23 @@ protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req)
int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());

if (req.isHeartbeat()) {
// heartbeat request data is always null
bos.write(CodecSupport.getNullBytesOf(serialization));
} else {
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
}

bos.flush();
bos.close();
int len = bos.writtenBytes();
Expand Down Expand Up @@ -274,21 +288,33 @@ protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response re

buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);

// encode response data or error message.
if (status == Response.OK) {
if (res.isHeartbeat()) {
encodeEventData(channel, out, res.getResult());
} else {
encodeResponseData(channel, out, res.getResult(), res.getVersion());
if(res.isHeartbeat()){
// heartbeat response data is always null
bos.write(CodecSupport.getNullBytesOf(serialization));
}else {
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (res.isEvent()) {
encodeEventData(channel, out, res.getResult());
} else {
encodeResponseData(channel, out, res.getResult(), res.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
}
} else {
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
out.writeUTF(res.getErrorMessage());
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}

bos.flush();
bos.close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.serialize.ObjectInput;
import org.apache.dubbo.common.serialize.ObjectOutput;
import org.apache.dubbo.common.serialize.Serialization;
import org.apache.dubbo.remoting.Constants;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
Expand All @@ -41,6 +44,8 @@ public class CodecSupport {
private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();
private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new HashMap<Byte, String>();
private static Map<String, Byte> SERIALIZATIONNAME_ID_MAP = new HashMap<String, Byte>();
// Cache null object serialize results, for heartbeat request/response serialize use.
private static Map<Byte, byte[]> ID_NULLBYTES_MAP = new HashMap<Byte, byte[]>();

static {
Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
Expand Down Expand Up @@ -92,4 +97,58 @@ public static ObjectInput deserialize(URL url, InputStream is, byte proto) throw
Serialization s = getSerialization(url, proto);
return s.deserialize(url, is);
}

/**
* Get the null object serialize result byte[] of Serialization from the cache,
* if not, generate it first.
*
* @param s Serialization Instances
* @return serialize result of null object
*/
public static byte[] getNullBytesOf(Serialization s) {
return ID_NULLBYTES_MAP.computeIfAbsent(s.getContentTypeId(), k -> {
//Pre-generated Null object bytes
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] nullBytes = new byte[0];
try {
ObjectOutput out = s.serialize(null, baos);
out.writeObject(null);
out.flushBuffer();
nullBytes = baos.toByteArray();
baos.close();
} catch (Exception e) {
logger.warn("Serialization extension " + s.getClass().getName() + " not support serializing null object, return an empty bytes instead.");
}
return nullBytes;
});
}

/**
* Read all payload to byte[]
*
* @param is
* @return
* @throws IOException
*/
public static byte[] getPayload(InputStream is) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int len;
while ((len = is.read(buffer)) > -1) {
baos.write(buffer, 0, len);
}
baos.flush();
return baos.toByteArray();
}

/**
* Check if payload is null object serialize result byte[] of serialization
*
* @param payload
* @param proto
* @return
*/
public static boolean isHeartBeat(byte[] payload, byte proto) {
return Arrays.equals(payload, getNullBytesOf(getSerializationById(proto)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcInvocation;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

Expand Down Expand Up @@ -79,8 +80,14 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
if (status == Response.OK) {
Object data;
if (res.isEvent()) {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
data = decodeEventData(channel, in);
byte[] eventPayload = CodecSupport.getPayload(is);
if (CodecSupport.isHeartBeat(eventPayload, proto)) {
// heart beat response data is always null;
data = null;
} else {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
data = decodeEventData(channel, in);
}
} else {
DecodeableRpcResult result;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
Expand Down Expand Up @@ -118,8 +125,14 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
try {
Object data;
if (req.isEvent()) {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
data = decodeEventData(channel, in);
byte[] eventPayload = CodecSupport.getPayload(is);
if (CodecSupport.isHeartBeat(eventPayload, proto)) {
// heart beat response data is always null;
data = null;
} else {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
data = decodeEventData(channel, in);
}
} else {
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
Expand Down

0 comments on commit de9cb4a

Please sign in to comment.