Skip to content

Commit

Permalink
Reject if response do not match any request (#11882)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlbumenJ authored Mar 24, 2023
1 parent 7d9e955 commit 7b1199a
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.util.Date;

import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_TIMEOUT_SERVER;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_EXCEED_PAYLOAD_LIMIT;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_RESPONSE;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_SKIP_UNUSED_STREAM;
Expand Down Expand Up @@ -171,7 +174,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);
}
} else {
data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(id));
data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(channel, res, id));
}
res.setResult(data);
} else {
Expand Down Expand Up @@ -213,16 +216,21 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
}
}

protected Object getRequestData(long id) {
protected Object getRequestData(Channel channel, Response response, long id) {
DefaultFuture future = DefaultFuture.getFuture(id);
if (future == null) {
return null;
}
Request req = future.getRequest();
if (req == null) {
return null;
if (future != null) {
Request req = future.getRequest();
if (req != null) {
return req.getData();
}
}
return req.getData();

logger.warn(PROTOCOL_TIMEOUT_SERVER, "", "", "The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response status is " + response.getStatus() + ", response id is " + response.getId()
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");
throw new IllegalArgumentException("Failed to find any request match the response, response id: " + id);
}

protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Expand Down Expand Up @@ -431,7 +439,7 @@ protected Object decodeEventData(Channel channel, ObjectInput in, byte[] eventBy
try {
if (eventBytes != null) {
int dataLen = eventBytes.length;
int threshold = ConfigurationUtils.getSystemConfiguration(channel.getUrl().getScopeModel()).getInt("deserialization.event.size", 50);
int threshold = ConfigurationUtils.getSystemConfiguration(channel.getUrl().getScopeModel()).getInt("deserialization.event.size", 15);
if (dataLen > threshold) {
throw new IllegalArgumentException("Event data too long, actual size " + threshold + ", threshold " + threshold + " rejected for security consideration.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.exchange.codec.ExchangeCodec;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
import org.apache.dubbo.remoting.telnet.codec.TelnetCodec;
import org.apache.dubbo.rpc.model.FrameworkModel;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -140,6 +142,8 @@ void test_Decode_Error_MagicNum() throws IOException {

@Test
void test_Decode_Error_Length() throws IOException {
DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, null);

byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, SERIALIZATION_BYTE, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
Person person = new Person();
byte[] request = getRequestBytes(person, header);
Expand All @@ -151,6 +155,8 @@ void test_Decode_Error_Length() throws IOException {
Assertions.assertEquals(person, obj.getResult());
//only decode necessary bytes
Assertions.assertEquals(request.length, buffer.readerIndex());

future.cancel();
}

@Test
Expand Down Expand Up @@ -229,6 +235,8 @@ void test_Decode_MigicCodec_Contain_ExchangeHeader() throws IOException {

@Test
void test_Decode_Return_Response_Person() throws IOException {
DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, null);

//00000010-response/oneway/hearbeat=false/hessian |20-stats=ok|id=0|length=0
byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, SERIALIZATION_BYTE, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
Person person = new Person();
Expand All @@ -238,6 +246,8 @@ void test_Decode_Return_Response_Person() throws IOException {
Assertions.assertEquals(20, obj.getStatus());
Assertions.assertEquals(person, obj.getResult());
System.out.println(obj);

future.cancel();
}

@Test //The status input has a problem, and the read information is wrong when the serialization is serialized.
Expand Down Expand Up @@ -329,6 +339,8 @@ void test_Decode_Error_Request_Object() throws IOException {

@Test
void test_Header_Response_NoSerializationFlag() throws IOException {
DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, null);

//00000010-response/oneway/hearbeat=false/noset |20-stats=ok|id=0|length=0
byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, SERIALIZATION_BYTE, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
Person person = new Person();
Expand All @@ -338,10 +350,14 @@ void test_Header_Response_NoSerializationFlag() throws IOException {
Assertions.assertEquals(20, obj.getStatus());
Assertions.assertEquals(person, obj.getResult());
System.out.println(obj);

future.cancel();
}

@Test
void test_Header_Response_Heartbeat() throws IOException {
DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000, null);

//00000010-response/oneway/hearbeat=true |20-stats=ok|id=0|length=0
byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, SERIALIZATION_BYTE, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
Person person = new Person();
Expand All @@ -351,6 +367,8 @@ void test_Header_Response_Heartbeat() throws IOException {
Assertions.assertEquals(20, obj.getStatus());
Assertions.assertEquals(person, obj.getResult());
System.out.println(obj);

future.cancel();
}

@Test
Expand All @@ -376,6 +394,8 @@ void test_Encode_Request() throws IOException {

@Test
void test_Encode_Response() throws IOException {
DefaultFuture future = DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(1001), 100000, null);

ChannelBuffer encodeBuffer = ChannelBuffers.dynamicBuffer(1024);
Channel channel = getClientSideChannel(url);
Response response = new Response();
Expand All @@ -401,6 +421,7 @@ void test_Encode_Response() throws IOException {
// encode response verson ??
// Assertions.assertEquals(response.getProtocolVersion(), obj.getVersion());

future.cancel();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
DecodeableRpcResult result;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is,
(Invocation) getRequestData(id), proto);
(Invocation) getRequestData(channel, res, id), proto);
result.decode();
} else {
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
(Invocation) getRequestData(id), proto);
(Invocation) getRequestData(channel, res, id), proto);
}
data = result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.dubbo.remoting.buffer.ChannelBuffers;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
import org.apache.dubbo.remoting.exchange.support.MultiMessage;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.RpcInvocation;
Expand All @@ -32,7 +33,9 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import static org.apache.dubbo.rpc.Constants.INPUT_KEY;
import static org.apache.dubbo.rpc.Constants.OUTPUT_KEY;
Expand All @@ -45,23 +48,25 @@ void test() throws Exception {
ChannelBuffer buffer = ChannelBuffers.buffer(2048);
Channel channel = new MockChannel();
Assertions.assertEquals(Codec2.DecodeResult.NEED_MORE_INPUT, dubboCountCodec.decode(channel, buffer));
List<DefaultFuture> futures = new ArrayList<>();

for (int i = 0; i < 10; i++) {
Request request = new Request(1);
Request request = new Request(i);
futures.add(DefaultFuture.newFuture(channel, request, 1000, null));
RpcInvocation rpcInvocation = new RpcInvocation(null, "echo", DemoService.class.getName(), "", new Class<?>[]{String.class}, new String[]{"yug"});
request.setData(rpcInvocation);
dubboCountCodec.encode(channel, buffer, request);
}

for (int i = 0; i < 10; i++) {
Response response = new Response(1);
Response response = new Response(i);
AppResponse appResponse = new AppResponse(i);
response.setResult(appResponse);
dubboCountCodec.encode(channel, buffer, response);
}

MultiMessage multiMessage = (MultiMessage) dubboCountCodec.decode(channel, buffer);
Assertions.assertEquals(multiMessage.size(), 20);
Assertions.assertEquals(20, multiMessage.size());
int requestCount = 0;
int responseCount = 0;
Iterator iterator = multiMessage.iterator();
Expand All @@ -79,6 +84,8 @@ void test() throws Exception {
}
Assertions.assertEquals(requestCount, 10);
Assertions.assertEquals(responseCount, 10);

futures.forEach(DefaultFuture::cancel);
}

}

0 comments on commit 7b1199a

Please sign in to comment.