Skip to content

Commit

Permalink
#1903: merge issue 1903's fix from 2.7.0 to 2.6.x (#2668)
Browse files Browse the repository at this point in the history
  • Loading branch information
beiwei30 authored Oct 23, 2018
1 parent dca9574 commit a531a5c
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byt

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
ObjectInput in = s.deserialize(channel.getUrl(), is);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
Expand All @@ -152,8 +150,9 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
// get status.
byte status = header[3];
res.setStatus(status);
if (status == Response.OK) {
try {
try {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
Expand All @@ -163,12 +162,12 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
data = decodeResponseData(channel, in, getRequestData(id));
}
res.setResult(data);
} catch (Throwable t) {
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
} else {
res.setErrorMessage(in.readUTF());
}
} else {
res.setErrorMessage(in.readUTF());
} catch (Throwable t) {
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
return res;
} else {
Expand All @@ -180,6 +179,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
Object data;
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.serialize.ObjectInput;
import com.alibaba.dubbo.common.serialize.Serialization;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -75,4 +77,9 @@ public static Serialization getSerialization(URL url, Byte id) throws IOExceptio
return serialization;
}

public static ObjectInput deserialize(URL url, InputStream is, byte proto) throws IOException {
Serialization s = getSerialization(url, proto);
return s.deserialize(url, is);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,23 @@ public void test_Decode_Error_Response_Object() throws IOException {
Assert.assertEquals(90, obj.getStatus());
}

@Test
public void testInvalidSerializaitonId() throws Exception {
byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte)0x8F, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
Object obj = decode(header);
Assert.assertTrue(obj instanceof Request);
Request request = (Request) obj;
Assert.assertTrue(request.isBroken());
Assert.assertTrue(request.getData() instanceof IOException);
header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte)0x1F, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};

obj = decode(header);
Assert.assertTrue(obj instanceof Response);
Response response = (Response) obj;
Assert.assertEquals(response.getStatus(), Response.CLIENT_ERROR);
Assert.assertTrue(response.getErrorMessage().contains("IOException"));
}

@Test
public void test_Decode_Check_Payload() throws IOException {
byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,6 @@ protected Object decode(Channel channel, InputStream is, int readable, byte[] he

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
ObjectInput in = s.deserialize(channel.getUrl(), is);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
Expand All @@ -143,8 +141,9 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
// get status.
byte status = header[3];
res.setStatus(status);
if (status == Response.OK) {
try {
try {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
Expand All @@ -154,12 +153,12 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
data = decodeResponseData(channel, in, getRequestData(id));
}
res.setResult(data);
} catch (Throwable t) {
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
} else {
res.setErrorMessage(in.readUTF());
}
} else {
res.setErrorMessage(in.readUTF());
} catch (Throwable t) {
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
return res;
} else {
Expand All @@ -171,6 +170,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
Object data;
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@
package com.alibaba.dubbo.rpc.protocol.dubbo;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.common.io.Bytes;
import com.alibaba.dubbo.common.io.UnsafeByteArrayInputStream;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.serialize.ObjectInput;
import com.alibaba.dubbo.common.serialize.ObjectOutput;
import com.alibaba.dubbo.common.serialize.Serialization;
import com.alibaba.dubbo.common.utils.ReflectUtils;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.remoting.Channel;
Expand Down Expand Up @@ -63,7 +61,6 @@ public class DubboCodec extends ExchangeCodec implements Codec2 {
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
Expand All @@ -75,13 +72,14 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
// get status.
byte status = header[3];
res.setStatus(status);
if (status == Response.OK) {
try {
try {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
data = decodeHeartbeatData(channel, in);
} else if (res.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
data = decodeEventData(channel, in);
} else {
DecodeableRpcResult result;
if (channel.getUrl().getParameter(
Expand All @@ -98,15 +96,15 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
data = result;
}
res.setResult(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " + t.getMessage(), t);
}
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
} else {
res.setErrorMessage(in.readUTF());
}
} else {
res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " + t.getMessage(), t);
}
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
return res;
} else {
Expand All @@ -119,10 +117,11 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
}
try {
Object data;
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
data = decodeHeartbeatData(channel, in);
} else if (req.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
data = decodeEventData(channel, in);
} else {
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(
Expand All @@ -149,11 +148,6 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
}
}

private ObjectInput deserialize(Serialization serialization, URL url, InputStream is)
throws IOException {
return serialization.deserialize(url, is);
}

private byte[] readMessageData(InputStream is) throws IOException {
if (is.available() > 0) {
byte[] result = new byte[is.available()];
Expand Down

0 comments on commit a531a5c

Please sign in to comment.