Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Heartbeat skip serialize and deserialize #7077

Merged
merged 4 commits into from
Feb 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.dubbo.remoting.transport.CodecSupport;
import org.apache.dubbo.remoting.transport.ExceedPayloadLimitException;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

Expand Down Expand Up @@ -148,19 +149,22 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
byte status = header[3];
res.setStatus(status);
try {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
} else if (res.isEvent()) {
data = decodeEventData(channel, in);
if (res.isEvent()) {
byte[] eventPayload = CodecSupport.getPayload(is);
if (CodecSupport.isHeartBeat(eventPayload, proto)) {
// heart beat response data is always null;
data = null;
} else {
data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto));
}
} else {
data = decodeResponseData(channel, in, getRequestData(id));
data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(id));
}
res.setResult(data);
} else {
res.setErrorMessage(in.readUTF());
res.setErrorMessage(CodecSupport.deserialize(channel.getUrl(), is, proto).readUTF());
}
} catch (Throwable t) {
res.setStatus(Response.CLIENT_ERROR);
Expand All @@ -176,14 +180,17 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
req.setEvent(true);
}
try {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
Object data;
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
} else if (req.isEvent()) {
data = decodeEventData(channel, in);
if (req.isEvent()) {
byte[] eventPayload = CodecSupport.getPayload(is);
if (CodecSupport.isHeartBeat(eventPayload, proto)) {
// heart beat response data is always null;
data = null;
} else {
data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto));
}
} else {
data = decodeRequestData(channel, in);
data = decodeRequestData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
}
req.setData(data);
} catch (Throwable t) {
Expand Down Expand Up @@ -231,16 +238,23 @@ protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req)
int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());

if (req.isHeartbeat()) {
// heartbeat request data is always null
bos.write(CodecSupport.getNullBytesOf(serialization));
} else {
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
}

bos.flush();
bos.close();
int len = bos.writtenBytes();
Expand Down Expand Up @@ -274,21 +288,33 @@ protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response re

buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);

// encode response data or error message.
if (status == Response.OK) {
if (res.isHeartbeat()) {
encodeEventData(channel, out, res.getResult());
} else {
encodeResponseData(channel, out, res.getResult(), res.getVersion());
if(res.isHeartbeat()){
// heartbeat response data is always null
bos.write(CodecSupport.getNullBytesOf(serialization));
}else {
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (res.isEvent()) {
encodeEventData(channel, out, res.getResult());
} else {
encodeResponseData(channel, out, res.getResult(), res.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
}
} else {
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
out.writeUTF(res.getErrorMessage());
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}

bos.flush();
bos.close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.serialize.ObjectInput;
import org.apache.dubbo.common.serialize.ObjectOutput;
import org.apache.dubbo.common.serialize.Serialization;
import org.apache.dubbo.remoting.Constants;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
Expand All @@ -41,6 +44,8 @@ public class CodecSupport {
private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();
private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new HashMap<Byte, String>();
private static Map<String, Byte> SERIALIZATIONNAME_ID_MAP = new HashMap<String, Byte>();
// Cache null object serialize results, for heartbeat request/response serialize use.
private static Map<Byte, byte[]> ID_NULLBYTES_MAP = new HashMap<Byte, byte[]>();
AlbumenJ marked this conversation as resolved.
Show resolved Hide resolved

static {
Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
Expand Down Expand Up @@ -92,4 +97,58 @@ public static ObjectInput deserialize(URL url, InputStream is, byte proto) throw
Serialization s = getSerialization(url, proto);
return s.deserialize(url, is);
}

/**
* Get the null object serialize result byte[] of Serialization from the cache,
* if not, generate it first.
*
* @param s Serialization Instances
* @return serialize result of null object
*/
public static byte[] getNullBytesOf(Serialization s) {
return ID_NULLBYTES_MAP.computeIfAbsent(s.getContentTypeId(), k -> {
//Pre-generated Null object bytes
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] nullBytes = new byte[0];
try {
ObjectOutput out = s.serialize(null, baos);
out.writeObject(null);
out.flushBuffer();
nullBytes = baos.toByteArray();
baos.close();
} catch (Exception e) {
logger.warn("Serialization extension " + s.getClass().getName() + " not support serializing null object, return an empty bytes instead.");
}
return nullBytes;
});
}

/**
* Read all payload to byte[]
*
* @param is
* @return
* @throws IOException
*/
public static byte[] getPayload(InputStream is) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int len;
while ((len = is.read(buffer)) > -1) {
baos.write(buffer, 0, len);
}
baos.flush();
return baos.toByteArray();
}

/**
* Check if payload is null object serialize result byte[] of serialization
*
* @param payload
* @param proto
* @return
*/
public static boolean isHeartBeat(byte[] payload, byte proto) {
return Arrays.equals(payload, getNullBytesOf(getSerializationById(proto)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcInvocation;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

Expand Down Expand Up @@ -79,8 +80,14 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
if (status == Response.OK) {
Object data;
if (res.isEvent()) {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
data = decodeEventData(channel, in);
byte[] eventPayload = CodecSupport.getPayload(is);
if (CodecSupport.isHeartBeat(eventPayload, proto)) {
// heart beat response data is always null;
data = null;
} else {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
data = decodeEventData(channel, in);
}
} else {
DecodeableRpcResult result;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
Expand Down Expand Up @@ -118,8 +125,14 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
try {
Object data;
if (req.isEvent()) {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
data = decodeEventData(channel, in);
byte[] eventPayload = CodecSupport.getPayload(is);
if (CodecSupport.isHeartBeat(eventPayload, proto)) {
// heart beat response data is always null;
data = null;
} else {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);
data = decodeEventData(channel, in);
}
} else {
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
Expand Down