From a531a5cb9ace8f06a6192d65756d828ab8767f47 Mon Sep 17 00:00:00 2001 From: Ian Luo Date: Tue, 23 Oct 2018 12:11:34 +0800 Subject: [PATCH] #1903: merge issue 1903's fix from 2.7.0 to 2.6.x (#2668) --- .../exchange/codec/ExchangeCodec.java | 18 ++++----- .../remoting/transport/CodecSupport.java | 7 ++++ .../remoting/codec/ExchangeCodecTest.java | 17 +++++++++ .../codec/DeprecatedExchangeCodec.java | 18 ++++----- .../dubbo/rpc/protocol/dubbo/DubboCodec.java | 38 ++++++++----------- 5 files changed, 58 insertions(+), 40 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java index c1b8e096226..f1e2bb859ae 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/codec/ExchangeCodec.java @@ -139,8 +139,6 @@ protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byt protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); - Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); - ObjectInput in = s.deserialize(channel.getUrl(), is); // get request id. long id = Bytes.bytes2long(header, 4); if ((flag & FLAG_REQUEST) == 0) { @@ -152,8 +150,9 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro // get status. byte status = header[3]; res.setStatus(status); - if (status == Response.OK) { - try { + try { + ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); + if (status == Response.OK) { Object data; if (res.isHeartbeat()) { data = decodeHeartbeatData(channel, in); @@ -163,12 +162,12 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro data = decodeResponseData(channel, in, getRequestData(id)); } res.setResult(data); - } catch (Throwable t) { - res.setStatus(Response.CLIENT_ERROR); - res.setErrorMessage(StringUtils.toString(t)); + } else { + res.setErrorMessage(in.readUTF()); } - } else { - res.setErrorMessage(in.readUTF()); + } catch (Throwable t) { + res.setStatus(Response.CLIENT_ERROR); + res.setErrorMessage(StringUtils.toString(t)); } return res; } else { @@ -180,6 +179,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro req.setEvent(Request.HEARTBEAT_EVENT); } try { + ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); Object data; if (req.isHeartbeat()) { data = decodeHeartbeatData(channel, in); diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java index a9b0c1a63a2..3262e12ed15 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/CodecSupport.java @@ -22,9 +22,11 @@ import com.alibaba.dubbo.common.extension.ExtensionLoader; import com.alibaba.dubbo.common.logger.Logger; import com.alibaba.dubbo.common.logger.LoggerFactory; +import com.alibaba.dubbo.common.serialize.ObjectInput; import com.alibaba.dubbo.common.serialize.Serialization; import java.io.IOException; +import java.io.InputStream; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -75,4 +77,9 @@ public static Serialization getSerialization(URL url, Byte id) throws IOExceptio return serialization; } + public static ObjectInput deserialize(URL url, InputStream is, byte proto) throws IOException { + Serialization s = getSerialization(url, proto); + return s.deserialize(url, is); + } + } diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/codec/ExchangeCodecTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/codec/ExchangeCodecTest.java index e372e14f650..163c72c8434 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/codec/ExchangeCodecTest.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/codec/ExchangeCodecTest.java @@ -147,6 +147,23 @@ public void test_Decode_Error_Response_Object() throws IOException { Assert.assertEquals(90, obj.getStatus()); } + @Test + public void testInvalidSerializaitonId() throws Exception { + byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte)0x8F, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + Object obj = decode(header); + Assert.assertTrue(obj instanceof Request); + Request request = (Request) obj; + Assert.assertTrue(request.isBroken()); + Assert.assertTrue(request.getData() instanceof IOException); + header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte)0x1F, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + + obj = decode(header); + Assert.assertTrue(obj instanceof Response); + Response response = (Response) obj; + Assert.assertEquals(response.getStatus(), Response.CLIENT_ERROR); + Assert.assertTrue(response.getErrorMessage().contains("IOException")); + } + @Test public void test_Decode_Check_Payload() throws IOException { byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}; diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java index 2b44ada3941..bdbd280611e 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/transport/codec/DeprecatedExchangeCodec.java @@ -130,8 +130,6 @@ protected Object decode(Channel channel, InputStream is, int readable, byte[] he protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); - Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); - ObjectInput in = s.deserialize(channel.getUrl(), is); // get request id. long id = Bytes.bytes2long(header, 4); if ((flag & FLAG_REQUEST) == 0) { @@ -143,8 +141,9 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro // get status. byte status = header[3]; res.setStatus(status); - if (status == Response.OK) { - try { + try { + ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); + if (status == Response.OK) { Object data; if (res.isHeartbeat()) { data = decodeHeartbeatData(channel, in); @@ -154,12 +153,12 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro data = decodeResponseData(channel, in, getRequestData(id)); } res.setResult(data); - } catch (Throwable t) { - res.setStatus(Response.CLIENT_ERROR); - res.setErrorMessage(StringUtils.toString(t)); + } else { + res.setErrorMessage(in.readUTF()); } - } else { - res.setErrorMessage(in.readUTF()); + } catch (Throwable t) { + res.setStatus(Response.CLIENT_ERROR); + res.setErrorMessage(StringUtils.toString(t)); } return res; } else { @@ -171,6 +170,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro req.setEvent(Request.HEARTBEAT_EVENT); } try { + ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); Object data; if (req.isHeartbeat()) { data = decodeHeartbeatData(channel, in); diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java index 80e42090510..b1591918061 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java @@ -17,7 +17,6 @@ package com.alibaba.dubbo.rpc.protocol.dubbo; import com.alibaba.dubbo.common.Constants; -import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.Version; import com.alibaba.dubbo.common.io.Bytes; import com.alibaba.dubbo.common.io.UnsafeByteArrayInputStream; @@ -25,7 +24,6 @@ import com.alibaba.dubbo.common.logger.LoggerFactory; import com.alibaba.dubbo.common.serialize.ObjectInput; import com.alibaba.dubbo.common.serialize.ObjectOutput; -import com.alibaba.dubbo.common.serialize.Serialization; import com.alibaba.dubbo.common.utils.ReflectUtils; import com.alibaba.dubbo.common.utils.StringUtils; import com.alibaba.dubbo.remoting.Channel; @@ -63,7 +61,6 @@ public class DubboCodec extends ExchangeCodec implements Codec2 { @Override protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); - Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); // get request id. long id = Bytes.bytes2long(header, 4); if ((flag & FLAG_REQUEST) == 0) { @@ -75,13 +72,14 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro // get status. byte status = header[3]; res.setStatus(status); - if (status == Response.OK) { - try { + try { + ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); + if (status == Response.OK) { Object data; if (res.isHeartbeat()) { - data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); + data = decodeHeartbeatData(channel, in); } else if (res.isEvent()) { - data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); + data = decodeEventData(channel, in); } else { DecodeableRpcResult result; if (channel.getUrl().getParameter( @@ -98,15 +96,15 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro data = result; } res.setResult(data); - } catch (Throwable t) { - if (log.isWarnEnabled()) { - log.warn("Decode response failed: " + t.getMessage(), t); - } - res.setStatus(Response.CLIENT_ERROR); - res.setErrorMessage(StringUtils.toString(t)); + } else { + res.setErrorMessage(in.readUTF()); } - } else { - res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF()); + } catch (Throwable t) { + if (log.isWarnEnabled()) { + log.warn("Decode response failed: " + t.getMessage(), t); + } + res.setStatus(Response.CLIENT_ERROR); + res.setErrorMessage(StringUtils.toString(t)); } return res; } else { @@ -119,10 +117,11 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro } try { Object data; + ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); if (req.isHeartbeat()) { - data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); + data = decodeHeartbeatData(channel, in); } else if (req.isEvent()) { - data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); + data = decodeEventData(channel, in); } else { DecodeableRpcInvocation inv; if (channel.getUrl().getParameter( @@ -149,11 +148,6 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro } } - private ObjectInput deserialize(Serialization serialization, URL url, InputStream is) - throws IOException { - return serialization.deserialize(url, is); - } - private byte[] readMessageData(InputStream is) throws IOException { if (is.available() > 0) { byte[] result = new byte[is.available()];