Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: select channel handles based on protocol versions #6634

Merged
merged 86 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
86 commits
Select commit Hold shift + click to select a range
39d88c1
load SeataSerializer by version
Bughue Dec 25, 2023
f1bacdd
Unwinding RpcMessage and Encoder/Decoder dependencies
Bughue Dec 25, 2023
901b886
license
Bughue Dec 26, 2023
ee3c3aa
license
Bughue Dec 26, 2023
b518004
style
Bughue Dec 26, 2023
883adfb
test ProtocolConstants.VERSION
Bughue Dec 26, 2023
492672c
version serialize
Bughue Dec 27, 2023
94cb0cf
v0
Bughue Dec 28, 2023
6dc20a2
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-com…
Bughue Dec 29, 2023
a151eba
v0
Bughue Dec 29, 2023
289ed97
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Dec 29, 2023
8c197af
v0
Bughue Dec 29, 2023
d8c19a4
v0
Bughue Dec 29, 2023
958ef77
v0
Bughue Dec 29, 2023
2a0d836
v0
Bughue Dec 29, 2023
59e9f8c
v0
Bughue Dec 29, 2023
227464b
v0
Bughue Dec 29, 2023
c27c63b
license
Bughue Dec 29, 2023
0afa845
license
Bughue Dec 29, 2023
4959cbe
license
Bughue Dec 29, 2023
dd7c314
style
Bughue Dec 29, 2023
b7bdb6d
style
Bughue Dec 29, 2023
dfd9bf2
style
Bughue Dec 29, 2023
48a6090
style
Bughue Jan 2, 2024
7d1a6b3
test
Bughue Jan 2, 2024
33e6e4e
test
Bughue Jan 2, 2024
379ba28
test
Bughue Jan 2, 2024
a9b4d5d
test
Bughue Jan 3, 2024
95573d3
fix test
Bughue Jan 4, 2024
181a421
inner class
Bughue Jan 4, 2024
d2804fb
inner class
Bughue Jan 4, 2024
8da65ff
Merge branch '2.x' into dev-mlv-rpc-msg
xingfudeshi Jan 18, 2024
91c171d
style
Bughue Jan 22, 2024
342d306
single pattern
Bughue Jan 22, 2024
439d132
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-ser…
Bughue Jan 22, 2024
74c48d5
conflit
Bughue Jan 22, 2024
72d47a1
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Jan 22, 2024
93c972b
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-ser…
Bughue Feb 4, 2024
0f8c187
resolve conflict
Bughue Feb 4, 2024
c1c7f3f
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Feb 4, 2024
cf4fe41
resolve conflict
Bughue Feb 4, 2024
9607bcd
resolve conflict
Bughue Feb 4, 2024
5b433cd
import
Bughue Feb 7, 2024
704669a
style
Bughue Feb 28, 2024
91418a1
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Mar 4, 2024
00a4e8e
style
Bughue Mar 4, 2024
c728968
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Mar 4, 2024
f17581c
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-com…
Bughue Mar 4, 2024
12d99d5
Merge branch 'dev-mlv-rpc-msg' of github.com:Bughue/seata into dev-ml…
Bughue Mar 5, 2024
8943b39
Merge branch 'dev-mlv-serialize-version' of github.com:Bughue/seata i…
Bughue Mar 5, 2024
7396cf9
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-com…
Bughue Mar 5, 2024
3f7a3c8
fix test
Bughue Mar 5, 2024
2558ba3
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-com…
Bughue Mar 6, 2024
ef146b7
optimize version
Bughue Mar 7, 2024
df9ea50
optimize version
Bughue Mar 7, 2024
af647e7
optimize version
Bughue Mar 7, 2024
2a92556
Merge branch 'dev-mlv-rpc-msg' of github.com:Bughue/seata into dev-ml…
Bughue Mar 7, 2024
3841379
optimize version
Bughue Mar 8, 2024
d876d10
optimize version
Bughue Mar 8, 2024
3f0abf0
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Mar 8, 2024
637e3da
Merge branch 'dev-mlv-rpc-msg' of github.com:Bughue/seata into dev-ml…
Bughue Mar 8, 2024
6f8c019
optimize version
Bughue Mar 8, 2024
c0dfb51
optimize version
Bughue Mar 8, 2024
c2f68aa
Merge branch 'dev-mlv-rpc-msg' of github.com:Bughue/seata into dev-ml…
Bughue Mar 8, 2024
b5bb8be
optimize version
Bughue Mar 8, 2024
2c4543c
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-com…
Bughue Apr 22, 2024
9d30454
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-ser…
Bughue Apr 30, 2024
2f5feb0
merge
Bughue Apr 30, 2024
a807053
Merge branch 'dev-mlv-serialize-version' of github.com:Bughue/seata i…
Bughue Apr 30, 2024
0ebf2ce
merge
Bughue Apr 30, 2024
7add8d9
check style
Bughue Apr 30, 2024
aeb24c8
check style
Bughue Apr 30, 2024
9f6d80f
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-com…
Bughue Jun 24, 2024
237aeb5
style
Bughue Jun 24, 2024
15dc0d6
optimize: select channel handles based on protocol versions
funky-eyes Jun 27, 2024
c5652de
optimize: select channel handles based on protocol versions
funky-eyes Jun 27, 2024
91c65c1
optimize: select channel handles based on protocol versions
funky-eyes Jun 27, 2024
e0f3d37
optimize: select channel handles based on protocol versions
funky-eyes Jun 27, 2024
0cf1328
optimize: select channel handles based on protocol versions
funky-eyes Jun 27, 2024
d7b734f
Merge branch '2.x' into 240627
funky-eyes Jun 27, 2024
76fc55f
optimize: select channel handles based on protocol versions
funky-eyes Jun 27, 2024
f8c9ef3
optimize: select channel handles based on protocol versions
funky-eyes Jun 27, 2024
5dd79e2
optimize: select channel handles based on protocol versions
funky-eyes Jun 27, 2024
a9ed614
remove
funky-eyes Jun 27, 2024
e23efb6
optimize: select channel handles based on protocol versions
funky-eyes Jun 27, 2024
f4b6b72
optimize: select channel handles based on protocol versions
funky-eyes Jun 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Add changes here for all PR submitted to the 2.x branch.
<!-- Please add the `changes` to the following location(feature/bugfix/optimize/test) based on the type of PR -->

### feature:

- [[#6226](https://github.com/apache/incubator-seata/pull/6226)] multi-version seata protocol support

### bugfix:
- [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix @Async annotation not working in ClusterWatcherManager
Expand All @@ -16,8 +16,10 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] split the task thread pool for committing and rollbacking statuses
- [[#6208](https://github.com/apache/incubator-seata/pull/6208)] optimize : load SeataSerializer by version
- [[#6209](https://github.com/apache/incubator-seata/pull/6209)] Eliminate RpcMessage and Encoder/Decoder dependencies
- [[#6634](https://github.com/apache/incubator-seata/pull/6634)] select channel handles based on protocol versions
- [[#6523](https://github.com/apache/incubator-seata/pull/6523)] upgrade alibaba/druid version to 1.2.20


### refactor:
- [[#6534](https://github.com/apache/incubator-seata/pull/6534)] optimize: send async response

Expand All @@ -36,6 +38,7 @@ Thanks to these contributors for their code commits. Please report an unintended
- [liuqiufeng](https://github.com/liuqiufeng)
- [God-Gan](https://github.com/God-Gan)
- [Bughue](https://github.com/Bughue)
- [funky-eyes](https://github.com/funky-eyes)
- [tanyaofei](https://github.com/tanyaofei)

Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
4 changes: 4 additions & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<!-- 请根据PR的类型添加 `变更记录` 到以下对应位置(feature/bugfix/optimize/test) 下 -->

### feature:
- [[#6226](https://github.com/apache/incubator-seata/pull/6226)] 支持seata私有协议多版本兼容

### bugfix:
- [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix @Async注解ClusterWatcherManager中不生效的问题
Expand All @@ -15,8 +16,10 @@
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池
- [[#6208](https://github.com/apache/incubator-seata/pull/6208)] 支持多版本的Seata序列化
- [[#6209](https://github.com/apache/incubator-seata/pull/6209)] 解开 RpcMessage 和 Encoder/Decoder 的互相依赖
- [[#6634](https://github.com/apache/incubator-seata/pull/6634)] 根据协议版本指定channel handle
- [[#6523](https://github.com/apache/incubator-seata/pull/6523)] 升级 alibaba/druid 的版本到1.2.20


### refactor:
- [[#6534](https://github.com/apache/incubator-seata/pull/6534)] 优化: 发送异步响应

Expand All @@ -34,6 +37,7 @@
- [liuqiufeng](https://github.com/liuqiufeng)
- [God-Gan](https://github.com/God-Gan)
- [Bughue](https://github.com/Bughue)
- [funky-eyes](https://github.com/funky-eyes)
- [tanyaofei](https://github.com/tanyaofei)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,8 @@ class ClientHandler extends ChannelDuplexHandler {

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
RpcMessage rpcMessage = null;
if (msg instanceof ProtocolRpcMessage) {
rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg();
processMessage(ctx, rpcMessage);
if (msg instanceof RpcMessage) {
processMessage(ctx, (RpcMessage)msg);
} else {
LOGGER.error("rpcMessage type error");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public Object sendSyncRequest(String resourceId, String clientId, Object msg, bo
if (channel == null) {
throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId);
}
RpcMessage rpcMessage = buildRequestMessage(channel, msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
}

Expand All @@ -78,7 +78,7 @@ public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutExcepti
if (channel == null) {
throw new RuntimeException("client is not connected");
}
RpcMessage rpcMessage = buildRequestMessage(channel, msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
}

Expand All @@ -87,7 +87,7 @@ public void sendAsyncRequest(Channel channel, Object msg) {
if (channel == null) {
throw new RuntimeException("client is not connected");
}
RpcMessage rpcMessage = buildRequestMessage(channel, msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
super.sendAsync(channel, rpcMessage);
}

Expand All @@ -98,7 +98,7 @@ public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg
clientChannel = ChannelManager.getSameClientChannel(channel);
}
if (clientChannel != null) {
RpcMessage rpcMsg = buildResponseMessage(channel, rpcMessage, msg, msg instanceof HeartbeatMessage
RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage
? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE
: ProtocolConstants.MSGTYPE_RESPONSE);
super.sendAsync(clientChannel, rpcMsg);
Expand All @@ -108,21 +108,6 @@ public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg
}


private RpcMessage buildResponseMessage(Channel channel, RpcMessage fromRpcMessage, Object msg, byte messageType) {
RpcMessage rpcMessage = super.buildResponseMessage(fromRpcMessage, msg, messageType);
RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel);
rpcMessage.setOtherSideVersion(rpcContext.getVersion());
return rpcMessage;
}

protected RpcMessage buildRequestMessage(Channel channel, Object msg, byte messageType) {
RpcMessage rpcMessage = super.buildRequestMessage(msg, messageType);
RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel);
rpcMessage.setOtherSideVersion(rpcContext.getVersion());
return rpcMessage;
}


@Override
public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
Expand Down Expand Up @@ -179,10 +164,8 @@ class ServerHandler extends ChannelDuplexHandler {
*/
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
RpcMessage rpcMessage = null;
if (msg instanceof ProtocolRpcMessage) {
rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg();
processMessage(ctx, rpcMessage);
if (msg instanceof RpcMessage) {
processMessage(ctx, (RpcMessage)msg);
} else {
LOGGER.error("rpcMessage type error");
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@

import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.seata.core.exception.DecodeException;
import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.rpc.netty.v0.ProtocolDecoderV0;
import org.apache.seata.core.rpc.netty.v0.ProtocolEncoderV0;
import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1;
import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -52,17 +55,26 @@
* <li>Body Length: Full Length - Head Length</li>
* </p>
*/
public class CompatibleProtocolDecoder extends LengthFieldBasedFrameDecoder {
public class MultiProtocolDecoder extends LengthFieldBasedFrameDecoder {

private static final Logger LOGGER = LoggerFactory.getLogger(CompatibleProtocolDecoder.class);
private static Map<Byte, ProtocolDecoder> protocolDecoderMap;
private static final Logger LOGGER = LoggerFactory.getLogger(MultiProtocolDecoder.class);
private final Map<Byte, ProtocolDecoder> protocolDecoderMap;

public CompatibleProtocolDecoder() {
private final Map<Byte, ProtocolEncoder> protocolEncoderMap;

private final ChannelHandler[] channelHandlers;

public MultiProtocolDecoder(ChannelHandler... channelHandlers) {
// default is 8M
this(ProtocolConstants.MAX_FRAME_LENGTH);
this(ProtocolConstants.MAX_FRAME_LENGTH, channelHandlers);
}

public CompatibleProtocolDecoder(int maxFrameLength) {
public MultiProtocolDecoder() {
// default is 8M
this(ProtocolConstants.MAX_FRAME_LENGTH, null);
}

public MultiProtocolDecoder(int maxFrameLength, ChannelHandler[] channelHandlers) {
/*
int maxFrameLength,
int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3
Expand All @@ -71,10 +83,13 @@ int lengthFieldLength, FullLength is int(4B). so values is 4
int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0
*/
super(maxFrameLength, 3, 4, -7, 0);
protocolDecoderMap = ImmutableMap.<Byte, ProtocolDecoder>builder()
.put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0())
.put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1())
.build();
this.protocolDecoderMap =
ImmutableMap.<Byte, ProtocolDecoder>builder().put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0())
.put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1()).build();
this.protocolEncoderMap =
ImmutableMap.<Byte, ProtocolEncoder>builder().put(ProtocolConstants.VERSION_0, new ProtocolEncoderV0())
.put(ProtocolConstants.VERSION_1, new ProtocolEncoderV1()).build();
this.channelHandlers = channelHandlers;
}

@Override
Expand All @@ -93,16 +108,23 @@ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception

if (decoded instanceof ByteBuf) {
frame = (ByteBuf) decoded;
ProtocolDecoder decoder = protocolDecoderMap.get(version);
ProtocolEncoder encoder = protocolEncoderMap.get(version);
try {
ProtocolDecoder decoder = protocolDecoderMap.get(version);
if (decoder == null) {
if (decoder == null || encoder == null) {
throw new UnsupportedOperationException("Unsupported version: " + version);
}
return decoder.decodeFrame(frame);
} finally {
if (version != ProtocolConstants.VERSION_0) {
frame.release();
}
ctx.pipeline().addLast((ChannelHandler)decoder);
ctx.pipeline().addLast((ChannelHandler)encoder);
if (channelHandlers != null) {
ctx.pipeline().addLast(channelHandlers);
}
ctx.pipeline().remove(this);
}
}
} catch (Exception exx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.seata.common.exception.FrameworkException;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.core.rpc.RemotingBootstrap;
import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1;
import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -128,12 +130,12 @@ public void start() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(
new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
pipeline
.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
nettyClientConfig.getChannelMaxWriteIdleSeconds(),
nettyClientConfig.getChannelMaxAllIdleSeconds()))
.addLast(new CompatibleProtocolDecoder())
.addLast(new CompatibleProtocolEncoder());
.addLast(new ProtocolDecoderV1())
.addLast(new ProtocolEncoderV1());
if (channelHandlers != null) {
addChannelPipelineLast(ch, channelHandlers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,10 @@ public void start() {
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
.addLast(new CompatibleProtocolDecoder())
.addLast(new CompatibleProtocolEncoder());
if (channelHandlers != null) {
addChannelPipelineLast(ch, channelHandlers);
}

MultiProtocolDecoder multiProtocolDecoder = new MultiProtocolDecoder(channelHandlers);
ch.pipeline()
.addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
.addLast(multiProtocolDecoder);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
package org.apache.seata.core.rpc.netty;

import io.netty.buffer.ByteBuf;
import org.apache.seata.core.protocol.RpcMessage;

/**
* the protocol decoder
*
**/
public interface ProtocolDecoder {

ProtocolRpcMessage decodeFrame(ByteBuf in);
RpcMessage decodeFrame(ByteBuf in);

}
Loading
Loading