From 39d88c1d17c15698f2825ff54f664dfba07469cf Mon Sep 17 00:00:00 2001
From: "minghua.xie"
+ *
+ * (> 0.7.0)
+ * 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
+ * +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+ * | magic |Proto| Full length | Head | Msg |Seria|Compr| RequestId |
+ * | code |colVer| (head+body) | Length |Type |lizer|ess | |
+ * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
+ *
+ * (<= 0.7.0)
+ * 0 1 2 3 4 6 8 10 12 14
+ * +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+ * | 0xdada | flag | typecode/ | requestid |
+ * | | | bodylength| |
+ * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+
+ *
+ *
+ *
+ *+ *
+ *
+ * seata-version < 0.7 + * Only used in TC receives a request from RM/TM. + * 0 1 2 3 4 6 8 10 12 14 16 + * +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+ + * | 0xdada | flag | typecode/ | requestid | | + * | | | bodylength| | | + * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+ + + * | ... ... | + * + + + * | body | + * + + + * | ... ... | + * +-----------------------------------------------------------------------------------------------+ + * + *+ *
+ *
+ * seata-version < 0.7 + * Only used in TC send a request to RM/TM. + * 0 1 2 3 4 6 8 10 12 14 16 + * +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+ + * | 0xdada | flag | typecode/ | requestid | | + * | | | bodylength| | | + * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+ + + * | ... ... | + * + + + * | body | + * + + + * | ... ... | + * +-----------------------------------------------------------------------------------------------+ + * + *+ *
+ *
+ * 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 + * +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+ + * | magic |Proto| Full length | Head | Msg |Seria|Compr| RequestId | + * | code |colVer| (head+body) | Length |Type |lizer|ess | | + * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+ + * | | + * | Head Map [Optional] | + * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+ + * | | + * | body | + * | | + * | ... ... | + * +-----------------------------------------------------------------------------------------------+ + *+ *
+ *
- * 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 - * +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+ - * | magic |Proto| Full length | Head | Msg |Seria|Compr| RequestId | - * | code |colVer| (head+body) | Length |Type |lizer|ess | | - * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+ - * | | - * | Head Map [Optional] | - * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+ - * | | - * | body | - * | | - * | ... ... | - * +-----------------------------------------------------------------------------------------------+ - *- *
- *
- *+ * Compatible Protocol Encoder *
*
@@ -62,6 +64,13 @@ public class ProtocolDecoderV1 implements ProtocolDecoder { private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoderV1.class); + private final SetsupportDeSerializerTypes; + + public ProtocolDecoderV1() { + supportDeSerializerTypes = SerializerServiceLoader.getSupportedSerializers(); + if (supportDeSerializerTypes.isEmpty()) { + throw new IllegalArgumentException("No serializer found"); + } } @Override public ProtocolRpcMessage decodeFrame(ByteBuf frame) { @@ -106,8 +115,13 @@ public ProtocolRpcMessage decodeFrame(ByteBuf frame) { frame.readBytes(bs); Compressor compressor = CompressorFactory.getCompressor(compressorType); bs = compressor.decompress(bs); - Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), version); - rpcMessage.setBody(serializer.deserialize(bs)); + SerializerType protocolType = SerializerType.getByCode(rpcMessage.getCodec()); + if (this.supportDeSerializerTypes.contains(protocolType)) { + Serializer serializer = SerializerServiceLoader.load(protocolType, ProtocolConstants.VERSION_1); + rpcMessage.setBody(serializer.deserialize(bs)); + } else { + throw new IllegalArgumentException("SerializerType not match"); + } } } From 7add8d907c1822db31fdd04a84fbe2eb013b3156 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Tue, 30 Apr 2024 11:42:28 +0800 Subject: [PATCH 51/62] check style --- .../seata/core/rpc/netty/CompatibleProtocolEncoder.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java index eb0a162cd88..e588b92b8ea 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java @@ -21,7 +21,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import org.apache.seata.common.util.StringUtils; -import org.apache.seata.core.protocol.AbstractIdentifyRequest; import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.protocol.Version; @@ -39,7 +38,6 @@ * Head Length: include head data from magic code to head map. *Body Length: Full Length - Head Length * - * */ public class CompatibleProtocolEncoder extends MessageToByteEncoder { @@ -61,7 +59,7 @@ public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) { if (msg instanceof RpcMessage) { RpcMessage rpcMessage = (RpcMessage) msg; String sdkVersion = rpcMessage.getOtherSideVersion(); - if(StringUtils.isBlank(sdkVersion)){ + if (StringUtils.isBlank(sdkVersion)) { sdkVersion = Version.getCurrent(); } byte protocolVersion = Version.calcProtocolVersion(sdkVersion); From aeb24c8023fa358e717d84420ef5fb31cca37ff0 Mon Sep 17 00:00:00 2001 From: "minghua.xie"Date: Tue, 30 Apr 2024 11:49:41 +0800 Subject: [PATCH 52/62] check style --- .../apache/seata/core/rpc/processor/server/RegTmProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java b/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java index b63ae2d84bc..6090232c6c4 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java +++ b/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java @@ -90,7 +90,7 @@ private void onRegTmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) { remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response); if (isSuccess && LOGGER.isInfoEnabled()) { LOGGER.info("TM register success,message:{},channel:{},client version:{},client protocol-version:{}" - , message, ctx.channel(), message.getVersion(), rpcMessage.getSdkVersion()); + , message, ctx.channel(), message.getVersion(), rpcMessage.getOtherSideVersion()); } } From 237aeb5713f71d37ff2b77f49af543f6f1273409 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Mon, 24 Jun 2024 12:01:01 +0800 Subject: [PATCH 53/62] style --- .../org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java index 25a8a3a2b05..9ca49779443 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java @@ -16,8 +16,8 @@ */ package org.apache.seata.core.rpc.netty.v1; +import java.util.List; import java.util.Map; -import java.util.Set; import io.netty.buffer.ByteBuf; import org.apache.seata.core.compressor.Compressor; @@ -64,7 +64,7 @@ public class ProtocolDecoderV1 implements ProtocolDecoder { private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoderV1.class); - private final Set supportDeSerializerTypes; + private final List supportDeSerializerTypes; public ProtocolDecoderV1() { supportDeSerializerTypes = SerializerServiceLoader.getSupportedSerializers(); From 15dc0d6e67dd77d8e17f70d79250fb84f3139f5a Mon Sep 17 00:00:00 2001 From: jianbin Date: Thu, 27 Jun 2024 10:57:29 +0800 Subject: [PATCH 54/62] optimize: select channel handles based on protocol versions --- .../seata/core/protocol/RpcMessage.java | 10 --- .../netty/AbstractNettyRemotingServer.java | 10 +-- .../rpc/netty/AbstractProtocolDecoder.java | 47 +++++++++++ .../rpc/netty/CompatibleProtocolEncoder.java | 79 ------------------- ...Decoder.java => MultiProtocolDecoder.java} | 42 +++++++--- .../core/rpc/netty/NettyClientBootstrap.java | 6 +- .../core/rpc/netty/NettyServerBootstrap.java | 8 +- .../core/rpc/netty/v0/ProtocolDecoderV0.java | 8 +- .../core/rpc/netty/v0/ProtocolEncoderV0.java | 17 +++- .../core/rpc/netty/v1/ProtocolDecoderV1.java | 37 ++++++++- .../core/rpc/netty/v1/ProtocolEncoderV1.java | 18 ++++- .../rpc/processor/server/RegTmProcessor.java | 4 +- .../core/rpc/netty/v1/ProtocolV1Client.java | 7 +- .../core/rpc/netty/v1/ProtocolV1Server.java | 11 +-- 14 files changed, 168 insertions(+), 136 deletions(-) create mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/AbstractProtocolDecoder.java delete mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java rename core/src/main/java/org/apache/seata/core/rpc/netty/{CompatibleProtocolDecoder.java => MultiProtocolDecoder.java} (77%) diff --git a/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java b/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java index 6a7e4d9da50..4f0963b20f7 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java +++ b/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java @@ -35,8 +35,6 @@ public class RpcMessage implements Serializable { private Map headMap = new HashMap<>(); private Object body; - private String otherSideVersion; - /** * Gets id. * @@ -171,14 +169,6 @@ public void setMessageType(byte messageType) { this.messageType = messageType; } - public String getOtherSideVersion() { - return otherSideVersion; - } - - public void setOtherSideVersion(String otherSideVersion) { - this.otherSideVersion = otherSideVersion; - } - @Override public String toString() { return StringUtils.toString(this); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java index 9be9e79c3b2..5ca26300c90 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -109,17 +109,11 @@ public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg private RpcMessage buildResponseMessage(Channel channel, RpcMessage fromRpcMessage, Object msg, byte messageType) { - RpcMessage rpcMessage = super.buildResponseMessage(fromRpcMessage, msg, messageType); - RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel); - rpcMessage.setOtherSideVersion(rpcContext.getVersion()); - return rpcMessage; + return super.buildResponseMessage(fromRpcMessage, msg, messageType); } protected RpcMessage buildRequestMessage(Channel channel, Object msg, byte messageType) { - RpcMessage rpcMessage = super.buildRequestMessage(msg, messageType); - RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel); - rpcMessage.setOtherSideVersion(rpcContext.getVersion()); - return rpcMessage; + return super.buildRequestMessage(msg, messageType); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractProtocolDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractProtocolDecoder.java new file mode 100644 index 00000000000..b1c238089e9 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractProtocolDecoder.java @@ -0,0 +1,47 @@ +package org.apache.seata.core.rpc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import org.apache.seata.core.exception.DecodeException; +import org.apache.seata.core.protocol.ProtocolConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractProtocolDecoder extends LengthFieldBasedFrameDecoder implements ProtocolDecoder { + + protected Logger logger = LoggerFactory.getLogger(getClass()); + + public AbstractProtocolDecoder() { + /* + int maxFrameLength, + int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3 + int lengthFieldLength, FullLength is int(4B). so values is 4 + int lengthAdjustment, FullLength include all data and read 7 bytes before, so the left length is (FullLength-7). so values is -7 + int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0 + */ + super(ProtocolConstants.MAX_FRAME_LENGTH, 3, 4); + } + + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + Object decoded; + try { + decoded = super.decode(ctx, in); + if (decoded instanceof ByteBuf) { + ByteBuf frame = (ByteBuf)decoded; + try { + return decodeFrame(frame); + } finally { + frame.release(); + } + } + } catch (Exception exx) { + logger.error("Decode frame error, cause: {}", exx.getMessage()); + throw new DecodeException(exx); + } + return decoded; + } + + +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java deleted file mode 100644 index e588b92b8ea..00000000000 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.seata.core.rpc.netty; - -import com.google.common.collect.ImmutableMap; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToByteEncoder; -import org.apache.seata.common.util.StringUtils; -import org.apache.seata.core.protocol.ProtocolConstants; -import org.apache.seata.core.protocol.RpcMessage; -import org.apache.seata.core.protocol.Version; -import org.apache.seata.core.rpc.netty.v0.ProtocolEncoderV0; -import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -/** - * Compatible Protocol Encoder - * - *
Full Length: include all data - *Head Length: include head data from magic code to head map. - *Body Length: Full Length - Head Length - * - */ -public class CompatibleProtocolEncoder extends MessageToByteEncoder { - - private static final Logger LOGGER = LoggerFactory.getLogger(CompatibleProtocolEncoder.class); - - private static MapprotocolEncoderMap; - - public CompatibleProtocolEncoder() { - super(); - protocolEncoderMap = ImmutableMap. builder() - .put(ProtocolConstants.VERSION_0, new ProtocolEncoderV0()) - .put(ProtocolConstants.VERSION_1, new ProtocolEncoderV1()) - .build(); - } - - @Override - public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) { - try { - if (msg instanceof RpcMessage) { - RpcMessage rpcMessage = (RpcMessage) msg; - String sdkVersion = rpcMessage.getOtherSideVersion(); - if (StringUtils.isBlank(sdkVersion)) { - sdkVersion = Version.getCurrent(); - } - byte protocolVersion = Version.calcProtocolVersion(sdkVersion); - ProtocolEncoder encoder = protocolEncoderMap.get(protocolVersion); - if (encoder == null) { - throw new UnsupportedOperationException("Unsupported protocolVersion: " + protocolVersion); - } - - encoder.encode(rpcMessage, out); - } else { - throw new UnsupportedOperationException("Not support this class:" + msg.getClass()); - } - } catch (Throwable e) { - LOGGER.error("Encode request error!", e); - } - } -} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java similarity index 77% rename from core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java rename to core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java index d066984c23b..ab0fb137625 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java @@ -18,12 +18,15 @@ import com.google.common.collect.ImmutableMap; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.apache.seata.core.exception.DecodeException; import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.rpc.netty.v0.ProtocolDecoderV0; +import org.apache.seata.core.rpc.netty.v0.ProtocolEncoderV0; import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1; +import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,17 +55,26 @@ * Body Length: Full Length - Head Length * */ -public class CompatibleProtocolDecoder extends LengthFieldBasedFrameDecoder { +public class MultiProtocolDecoder extends LengthFieldBasedFrameDecoder { - private static final Logger LOGGER = LoggerFactory.getLogger(CompatibleProtocolDecoder.class); - private static MapprotocolDecoderMap; + private static final Logger LOGGER = LoggerFactory.getLogger(MultiProtocolDecoder.class); + private final Map protocolDecoderMap; - public CompatibleProtocolDecoder() { + private final Map protocolEncoderMap; + + private final ChannelHandler[] channelHandlers; + + public MultiProtocolDecoder(ChannelHandler... channelHandlers) { // default is 8M - this(ProtocolConstants.MAX_FRAME_LENGTH); + this(ProtocolConstants.MAX_FRAME_LENGTH, channelHandlers); } - public CompatibleProtocolDecoder(int maxFrameLength) { + public MultiProtocolDecoder() { + // default is 8M + this(ProtocolConstants.MAX_FRAME_LENGTH, null); + } + + public MultiProtocolDecoder(int maxFrameLength, ChannelHandler[] channelHandlers) { /* int maxFrameLength, int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3 @@ -71,10 +83,15 @@ int lengthFieldLength, FullLength is int(4B). so values is 4 int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0 */ super(maxFrameLength, 3, 4, -7, 0); - protocolDecoderMap = ImmutableMap. builder() + this.protocolDecoderMap = ImmutableMap. builder() .put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0()) .put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1()) .build(); + this.protocolEncoderMap = ImmutableMap. builder() + .put(ProtocolConstants.VERSION_0, new ProtocolEncoderV0()) + .put(ProtocolConstants.VERSION_1, new ProtocolEncoderV1()) + .build(); + this.channelHandlers = channelHandlers; } @Override @@ -93,9 +110,10 @@ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception if (decoded instanceof ByteBuf) { frame = (ByteBuf) decoded; + ProtocolDecoder decoder = protocolDecoderMap.get(version); + ProtocolEncoder encoder = protocolEncoderMap.get(version); try { - ProtocolDecoder decoder = protocolDecoderMap.get(version); - if (decoder == null) { + if (decoder == null || encoder == null) { throw new UnsupportedOperationException("Unsupported version: " + version); } return decoder.decodeFrame(frame); @@ -103,6 +121,12 @@ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception if (version != ProtocolConstants.VERSION_0) { frame.release(); } + ctx.pipeline().addLast((ChannelHandler)decoder); + ctx.pipeline().addLast((ChannelHandler)encoder); + if (channelHandlers != null) { + ctx.pipeline().addLast(channelHandlers); + } + ctx.pipeline().remove(this); } } } catch (Exception exx) { diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java index 4867f86bcf8..847ba0aa466 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java @@ -35,6 +35,8 @@ import org.apache.seata.common.exception.FrameworkException; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.core.rpc.RemotingBootstrap; +import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1; +import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,8 +134,8 @@ public void initChannel(SocketChannel ch) { new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), nettyClientConfig.getChannelMaxWriteIdleSeconds(), nettyClientConfig.getChannelMaxAllIdleSeconds())) - .addLast(new CompatibleProtocolDecoder()) - .addLast(new CompatibleProtocolEncoder()); + .addLast(new ProtocolDecoderV1()) + .addLast(new ProtocolEncoderV1()); if (channelHandlers != null) { addChannelPipelineLast(ch, channelHandlers); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java index b847b2a96dd..35b0bdfc675 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java @@ -158,12 +158,10 @@ public void start() { .childHandler(new ChannelInitializer () { @Override public void initChannel(SocketChannel ch) { + MultiProtocolDecoder multiProtocolDecoder = + new MultiProtocolDecoder(channelHandlers); ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) - .addLast(new CompatibleProtocolDecoder()) - .addLast(new CompatibleProtocolEncoder()); - if (channelHandlers != null) { - addChannelPipelineLast(ch, channelHandlers); - } + .addLast(multiProtocolDecoder); } }); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java index 42e112a2f61..ca0776d044c 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java @@ -20,7 +20,7 @@ import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.ProtocolConstants; -import org.apache.seata.core.rpc.netty.ProtocolDecoder; +import org.apache.seata.core.rpc.netty.AbstractProtocolDecoder; import org.apache.seata.core.serializer.Serializer; import org.apache.seata.core.serializer.SerializerServiceLoader; import org.apache.seata.core.serializer.SerializerType; @@ -53,10 +53,13 @@ * * @see ProtocolEncoderV0 */ -public class ProtocolDecoderV0 implements ProtocolDecoder { +public class ProtocolDecoderV0 extends AbstractProtocolDecoder { private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoderV0.class); + + public ProtocolDecoderV0() { + } @Override public ProtocolRpcMessageV0 decodeFrame(ByteBuf in) { @@ -128,5 +131,4 @@ public ProtocolRpcMessageV0 decodeFrame(ByteBuf in) { return rpcMessage; } - } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java index 3fc447b2818..4f7163a9658 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java @@ -17,6 +17,8 @@ package org.apache.seata.core.rpc.netty.v0; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.MessageTypeAware; import org.apache.seata.core.protocol.ProtocolConstants; @@ -54,7 +56,7 @@ * * @see ProtocolDecoderV0 */ -public class ProtocolEncoderV0 implements ProtocolEncoder { +public class ProtocolEncoderV0 extends MessageToByteEncoder implements ProtocolEncoder { private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolEncoderV0.class); @@ -103,4 +105,17 @@ public void encode(RpcMessage message, ByteBuf out) { LOGGER.error("Encode request error!", e); } } + + @Override + protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { + try { + if (msg instanceof RpcMessage) { + encode((RpcMessage)msg, out); + } else { + throw new UnsupportedOperationException("Not support this class:" + msg.getClass()); + } + } catch (Throwable e) { + LOGGER.error("Encode request error!", e); + } + } } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java index 9ca49779443..8d6eb3abfc6 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java @@ -20,8 +20,11 @@ import java.util.Map; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.apache.seata.core.compressor.Compressor; import org.apache.seata.core.compressor.CompressorFactory; +import org.apache.seata.core.exception.DecodeException; import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.rpc.netty.ProtocolDecoder; @@ -61,16 +64,25 @@ * @see ProtocolEncoderV1 * @since 0.7.0 */ -public class ProtocolDecoderV1 implements ProtocolDecoder { +public class ProtocolDecoderV1 extends LengthFieldBasedFrameDecoder implements ProtocolDecoder{ private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoderV1.class); private final List supportDeSerializerTypes; public ProtocolDecoderV1() { + /* + int maxFrameLength, + int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3 + int lengthFieldLength, FullLength is int(4B). so values is 4 + int lengthAdjustment, FullLength include all data and read 7 bytes before, so the left length is (FullLength-7). so values is -7 + int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0 + */ + super(ProtocolConstants.MAX_FRAME_LENGTH, 3, 4, -7, 0); supportDeSerializerTypes = SerializerServiceLoader.getSupportedSerializers(); if (supportDeSerializerTypes.isEmpty()) { throw new IllegalArgumentException("No serializer found"); - } } + } + } @Override public ProtocolRpcMessage decodeFrame(ByteBuf frame) { @@ -127,4 +139,25 @@ public ProtocolRpcMessage decodeFrame(ByteBuf frame) { return rpcMessage; } + + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + Object decoded; + try { + decoded = super.decode(ctx, in); + if (decoded instanceof ByteBuf) { + ByteBuf frame = (ByteBuf)decoded; + try { + return decodeFrame(frame); + } finally { + frame.release(); + } + } + } catch (Exception exx) { + LOGGER.error("Decode frame error, cause: {}", exx.getMessage()); + throw new DecodeException(exx); + } + return decoded; + } + } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java index 14cbcdb55da..baa6531bbe4 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java @@ -17,6 +17,8 @@ package org.apache.seata.core.rpc.netty.v1; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; import org.apache.seata.core.rpc.netty.ProtocolEncoder; import org.apache.seata.core.serializer.Serializer; import org.apache.seata.core.compressor.Compressor; @@ -57,7 +59,7 @@ * @see ProtocolDecoderV1 * @since 0.7.0 */ -public class ProtocolEncoderV1 implements ProtocolEncoder { +public class ProtocolEncoderV1 extends MessageToByteEncoder implements ProtocolEncoder { private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolEncoderV1.class); @@ -119,4 +121,18 @@ public void encode(RpcMessage message, ByteBuf out) { throw e; } } + + @Override + protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { + try { + if (msg instanceof RpcMessage) { + this.encode((RpcMessage)msg, out); + } else { + throw new UnsupportedOperationException("Not support this class:" + msg.getClass()); + } + } catch (Throwable e) { + LOGGER.error("Encode request error!", e); + } + } + } diff --git a/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java b/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java index 6090232c6c4..0afee867f8b 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java +++ b/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java @@ -89,8 +89,8 @@ private void onRegTmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) { } remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response); if (isSuccess && LOGGER.isInfoEnabled()) { - LOGGER.info("TM register success,message:{},channel:{},client version:{},client protocol-version:{}" - , message, ctx.channel(), message.getVersion(), rpcMessage.getOtherSideVersion()); + LOGGER.info("TM register success,message:{},channel:{},client version:{}", message, ctx.channel(), + message.getVersion()); } } diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java index 3f52ed63c55..a052f3718bc 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java @@ -45,8 +45,7 @@ import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.protocol.transaction.BranchCommitRequest; import org.apache.seata.core.serializer.SerializerType; -import org.apache.seata.core.rpc.netty.CompatibleProtocolDecoder; -import org.apache.seata.core.rpc.netty.CompatibleProtocolEncoder; +import org.apache.seata.core.rpc.netty.MultiProtocolDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,9 +81,7 @@ public void connect(String host, int port, int connectTimeout) { @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); - pipeline.addLast(new CompatibleProtocolEncoder()); - pipeline.addLast(new CompatibleProtocolDecoder(8 * 1024 * 1024)); - pipeline.addLast(new ClientChannelHandler(ProtocolV1Client.this)); + pipeline.addLast(new MultiProtocolDecoder(new ClientChannelHandler(ProtocolV1Client.this))); } }); // Bind and start to accept incoming connections. diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java index 20a2fdbcc57..6f997c9fbbd 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java @@ -35,12 +35,7 @@ import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import org.apache.seata.common.thread.NamedThreadFactory; -import org.apache.seata.common.thread.NamedThreadFactory; -import org.apache.seata.core.rpc.netty.CompatibleProtocolDecoder; -import org.apache.seata.core.rpc.netty.CompatibleProtocolEncoder; - -import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; +import org.apache.seata.core.rpc.netty.MultiProtocolDecoder; /** */ @@ -73,9 +68,7 @@ public void start() { @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); - pipeline.addLast(new CompatibleProtocolDecoder(8 * 1024 * 1024)); - pipeline.addLast(new CompatibleProtocolEncoder()); - pipeline.addLast(new ServerChannelHandler()); + pipeline.addLast(new MultiProtocolDecoder(new ServerChannelHandler())); } }); From c5652de476b62d058553ef1d82cdbba98d480b3e Mon Sep 17 00:00:00 2001 From: jianbin Date: Thu, 27 Jun 2024 11:06:17 +0800 Subject: [PATCH 55/62] optimize: select channel handles based on protocol versions --- .../rpc/netty/AbstractProtocolDecoder.java | 64 ++++++++++++------- .../core/rpc/netty/MultiProtocolDecoder.java | 16 ++--- .../core/rpc/netty/NettyClientBootstrap.java | 4 +- .../core/rpc/netty/NettyServerBootstrap.java | 7 +- .../core/rpc/netty/v0/MessageCodecV0.java | 44 ------------- .../core/rpc/netty/v1/ProtocolDecoderV1.java | 2 +- 6 files changed, 53 insertions(+), 84 deletions(-) delete mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/v0/MessageCodecV0.java diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractProtocolDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractProtocolDecoder.java index b1c238089e9..82735451964 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractProtocolDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractProtocolDecoder.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.seata.core.rpc.netty; import io.netty.buffer.ByteBuf; @@ -10,38 +26,38 @@ public abstract class AbstractProtocolDecoder extends LengthFieldBasedFrameDecoder implements ProtocolDecoder { - protected Logger logger = LoggerFactory.getLogger(getClass()); + protected Logger logger = LoggerFactory.getLogger(getClass()); - public AbstractProtocolDecoder() { - /* + public AbstractProtocolDecoder() { + /* int maxFrameLength, int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3 int lengthFieldLength, FullLength is int(4B). so values is 4 int lengthAdjustment, FullLength include all data and read 7 bytes before, so the left length is (FullLength-7). so values is -7 int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0 */ - super(ProtocolConstants.MAX_FRAME_LENGTH, 3, 4); - } + super(ProtocolConstants.MAX_FRAME_LENGTH, 3, 4); + } - @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - Object decoded; - try { - decoded = super.decode(ctx, in); - if (decoded instanceof ByteBuf) { - ByteBuf frame = (ByteBuf)decoded; - try { - return decodeFrame(frame); - } finally { - frame.release(); - } - } - } catch (Exception exx) { - logger.error("Decode frame error, cause: {}", exx.getMessage()); - throw new DecodeException(exx); - } - return decoded; - } + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + Object decoded; + try { + decoded = super.decode(ctx, in); + if (decoded instanceof ByteBuf) { + ByteBuf frame = (ByteBuf)decoded; + try { + return decodeFrame(frame); + } finally { + frame.release(); + } + } + } catch (Exception exx) { + logger.error("Decode frame error, cause: {}", exx.getMessage()); + throw new DecodeException(exx); + } + return decoded; + } } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java index ab0fb137625..9bd95503697 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java @@ -83,15 +83,13 @@ int lengthFieldLength, FullLength is int(4B). so values is 4 int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0 */ super(maxFrameLength, 3, 4, -7, 0); - this.protocolDecoderMap = ImmutableMap. builder() - .put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0()) - .put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1()) - .build(); - this.protocolEncoderMap = ImmutableMap. builder() - .put(ProtocolConstants.VERSION_0, new ProtocolEncoderV0()) - .put(ProtocolConstants.VERSION_1, new ProtocolEncoderV1()) - .build(); - this.channelHandlers = channelHandlers; + this.protocolDecoderMap = + ImmutableMap. builder().put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0()) + .put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1()).build(); + this.protocolEncoderMap = + ImmutableMap. builder().put(ProtocolConstants.VERSION_0, new ProtocolEncoderV0()) + .put(ProtocolConstants.VERSION_1, new ProtocolEncoderV1()).build(); + this.channelHandlers = channelHandlers; } @Override diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java index 847ba0aa466..4aaafc0acb0 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java @@ -130,8 +130,8 @@ public void start() { @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast( - new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), + pipeline + .addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), nettyClientConfig.getChannelMaxWriteIdleSeconds(), nettyClientConfig.getChannelMaxAllIdleSeconds())) .addLast(new ProtocolDecoderV1()) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java index 35b0bdfc675..c7b2aa57c21 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java @@ -158,11 +158,10 @@ public void start() { .childHandler(new ChannelInitializer () { @Override public void initChannel(SocketChannel ch) { - MultiProtocolDecoder multiProtocolDecoder = - new MultiProtocolDecoder(channelHandlers); - ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) + MultiProtocolDecoder multiProtocolDecoder = new MultiProtocolDecoder(channelHandlers); + ch.pipeline() + .addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) .addLast(multiProtocolDecoder); - } }); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/MessageCodecV0.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/MessageCodecV0.java deleted file mode 100644 index ab1d4f74716..00000000000 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/MessageCodecV0.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.seata.core.rpc.netty.v0; - -import io.netty.buffer.ByteBuf; -import org.apache.seata.core.protocol.MessageTypeAware; - -/** - * The interface Message codec. - * - */ -public interface MessageCodecV0 extends MessageTypeAware { - - /** - * Encode byte [ ]. - * - * @return the byte [ ] - */ - byte[] encode(); - - /** - * Decode boolean. - * - * @param in the in - * @return the boolean - */ - boolean decode(ByteBuf in); - - boolean decode(ByteBuf in, T req); -} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java index 8d6eb3abfc6..1ef84ea86b0 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java @@ -64,7 +64,7 @@ * @see ProtocolEncoderV1 * @since 0.7.0 */ -public class ProtocolDecoderV1 extends LengthFieldBasedFrameDecoder implements ProtocolDecoder{ +public class ProtocolDecoderV1 extends LengthFieldBasedFrameDecoder implements ProtocolDecoder { private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoderV1.class); private final List supportDeSerializerTypes; From 91c65c18cc27c4b5029bb407711193f426534d4e Mon Sep 17 00:00:00 2001 From: jianbin Date: Thu, 27 Jun 2024 11:10:34 +0800 Subject: [PATCH 56/62] optimize: select channel handles based on protocol versions --- .../seata/core/rpc/netty/AbstractNettyRemotingServer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java index 5ca26300c90..62e2aa26b4b 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -109,11 +109,11 @@ public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg private RpcMessage buildResponseMessage(Channel channel, RpcMessage fromRpcMessage, Object msg, byte messageType) { - return super.buildResponseMessage(fromRpcMessage, msg, messageType); + return super.buildResponseMessage(fromRpcMessage, msg, messageType); } protected RpcMessage buildRequestMessage(Channel channel, Object msg, byte messageType) { - return super.buildRequestMessage(msg, messageType); + return super.buildRequestMessage(msg, messageType); } From e0f3d378af96d963bb802ccd450a0dcfb5a87727 Mon Sep 17 00:00:00 2001 From: jianbin Date: Thu, 27 Jun 2024 13:49:44 +0800 Subject: [PATCH 57/62] optimize: select channel handles based on protocol versions --- .../netty/AbstractNettyRemotingClient.java | 3 +-- .../netty/AbstractNettyRemotingServer.java | 21 +++++-------------- .../seata/core/rpc/netty/ProtocolDecoder.java | 3 ++- .../core/rpc/netty/v0/ProtocolDecoderV0.java | 7 ++++--- .../core/rpc/netty/v1/ProtocolDecoderV1.java | 6 +++--- .../rpc/netty/v1/ClientChannelHandler.java | 4 ++-- .../core/rpc/netty/v1/ProtocolV1Client.java | 6 ++++-- .../netty/v1/ProtocolV1SerializerTest.java | 8 +++---- .../core/rpc/netty/v1/ProtocolV1Server.java | 10 +++------ .../rpc/netty/v1/ServerChannelHandler.java | 6 ++---- 10 files changed, 30 insertions(+), 44 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 2901eb8d3f5..be97b5ade40 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -410,8 +410,7 @@ class ClientHandler extends ChannelDuplexHandler { @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { RpcMessage rpcMessage = null; - if (msg instanceof ProtocolRpcMessage) { - rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg(); + if (msg instanceof RpcMessage) { processMessage(ctx, rpcMessage); } else { LOGGER.error("rpcMessage type error"); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java index 62e2aa26b4b..39edcaca8ad 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -69,7 +69,7 @@ public Object sendSyncRequest(String resourceId, String clientId, Object msg, bo if (channel == null) { throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId); } - RpcMessage rpcMessage = buildRequestMessage(channel, msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); + RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout()); } @@ -78,7 +78,7 @@ public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutExcepti if (channel == null) { throw new RuntimeException("client is not connected"); } - RpcMessage rpcMessage = buildRequestMessage(channel, msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); + RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout()); } @@ -87,7 +87,7 @@ public void sendAsyncRequest(Channel channel, Object msg) { if (channel == null) { throw new RuntimeException("client is not connected"); } - RpcMessage rpcMessage = buildRequestMessage(channel, msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY); + RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY); super.sendAsync(channel, rpcMessage); } @@ -98,7 +98,7 @@ public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg clientChannel = ChannelManager.getSameClientChannel(channel); } if (clientChannel != null) { - RpcMessage rpcMsg = buildResponseMessage(channel, rpcMessage, msg, msg instanceof HeartbeatMessage + RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage ? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE : ProtocolConstants.MSGTYPE_RESPONSE); super.sendAsync(clientChannel, rpcMsg); @@ -107,16 +107,6 @@ public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg } } - - private RpcMessage buildResponseMessage(Channel channel, RpcMessage fromRpcMessage, Object msg, byte messageType) { - return super.buildResponseMessage(fromRpcMessage, msg, messageType); - } - - protected RpcMessage buildRequestMessage(Channel channel, Object msg, byte messageType) { - return super.buildRequestMessage(msg, messageType); - } - - @Override public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) { Pair pair = new Pair<>(processor, executor); @@ -174,8 +164,7 @@ class ServerHandler extends ChannelDuplexHandler { @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { RpcMessage rpcMessage = null; - if (msg instanceof ProtocolRpcMessage) { - rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg(); + if (msg instanceof RpcMessage) { processMessage(ctx, rpcMessage); } else { LOGGER.error("rpcMessage type error"); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java index 42a7c75c04f..d28506fd841 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java @@ -17,6 +17,7 @@ package org.apache.seata.core.rpc.netty; import io.netty.buffer.ByteBuf; +import org.apache.seata.core.protocol.RpcMessage; /** * the protocol decoder @@ -24,6 +25,6 @@ **/ public interface ProtocolDecoder { - ProtocolRpcMessage decodeFrame(ByteBuf in); + RpcMessage decodeFrame(ByteBuf in); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java index ca0776d044c..76e9beb29d6 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java @@ -20,6 +20,7 @@ import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.ProtocolConstants; +import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.rpc.netty.AbstractProtocolDecoder; import org.apache.seata.core.serializer.Serializer; import org.apache.seata.core.serializer.SerializerServiceLoader; @@ -62,7 +63,7 @@ public ProtocolDecoderV0() { } @Override - public ProtocolRpcMessageV0 decodeFrame(ByteBuf in) { + public RpcMessage decodeFrame(ByteBuf in) { ProtocolRpcMessageV0 rpcMessage = new ProtocolRpcMessageV0(); if (in.readableBytes() < ProtocolConstantsV0.HEAD_LENGTH) { throw new IllegalArgumentException("Nothing to decode."); @@ -96,7 +97,7 @@ public ProtocolRpcMessageV0 decodeFrame(ByteBuf in) { rpcMessage.setBody(HeartbeatMessage.PONG); } - return rpcMessage; + return rpcMessage.protocolMsg2RpcMsg(); } if (bodyLength > 0 && in.readableBytes() < bodyLength) { @@ -128,7 +129,7 @@ public ProtocolRpcMessageV0 decodeFrame(ByteBuf in) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Receive:" + rpcMessage.getBody() + ", messageId:" + msgId); } - return rpcMessage; + return rpcMessage.protocolMsg2RpcMsg(); } } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java index 1ef84ea86b0..2d2d01fce55 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java @@ -27,8 +27,8 @@ import org.apache.seata.core.exception.DecodeException; import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.ProtocolConstants; +import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.rpc.netty.ProtocolDecoder; -import org.apache.seata.core.rpc.netty.ProtocolRpcMessage; import org.apache.seata.core.serializer.Serializer; import org.apache.seata.core.serializer.SerializerServiceLoader; import org.apache.seata.core.serializer.SerializerType; @@ -85,7 +85,7 @@ int lengthFieldLength, FullLength is int(4B). so values is 4 } @Override - public ProtocolRpcMessage decodeFrame(ByteBuf frame) { + public RpcMessage decodeFrame(ByteBuf frame) { byte b0 = frame.readByte(); byte b1 = frame.readByte(); if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0 @@ -137,7 +137,7 @@ public ProtocolRpcMessage decodeFrame(ByteBuf frame) { } } - return rpcMessage; + return rpcMessage.protocolMsg2RpcMsg(); } @Override diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java index e35c124e306..14709e5f06c 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java @@ -50,8 +50,8 @@ public void channelInactive(final ChannelHandlerContext ctx) throws Exception { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg instanceof ProtocolRpcMessage) { - RpcMessage rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg(); + if (msg instanceof RpcMessage) { + RpcMessage rpcMessage = (RpcMessage)msg; int msgId = rpcMessage.getId(); DefaultPromise future = (DefaultPromise) client.futureMap.remove(msgId); if (future != null) { diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java index a052f3718bc..7b8694e182a 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java @@ -45,7 +45,6 @@ import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.protocol.transaction.BranchCommitRequest; import org.apache.seata.core.serializer.SerializerType; -import org.apache.seata.core.rpc.netty.MultiProtocolDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +80,10 @@ public void connect(String host, int port, int connectTimeout) { @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); - pipeline.addLast(new MultiProtocolDecoder(new ClientChannelHandler(ProtocolV1Client.this))); + pipeline + .addLast(new ProtocolDecoderV1()) + .addLast(new ProtocolEncoderV1()); + pipeline.addLast(new ClientChannelHandler(ProtocolV1Client.this)); } }); // Bind and start to accept incoming connections. diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java index 5ee0df7dee2..33a693311ac 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java @@ -27,8 +27,8 @@ import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.core.model.BranchType; +import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.protocol.transaction.BranchCommitRequest; -import org.apache.seata.core.rpc.netty.ProtocolRpcMessage; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -66,7 +66,7 @@ public void testAll() { body.setXid("xid-1234"); // test run times - int runTimes = 100000; + int runTimes = 100; final int threads = 50; final CountDownLatch cnt = new CountDownLatch(runTimes); @@ -80,7 +80,7 @@ public void testAll() { while (tag.getAndIncrement() < runTimes) { try { Future future = client.sendRpc(head, body); - ProtocolRpcMessage resp = (ProtocolRpcMessage) future.get(10, TimeUnit.SECONDS); + RpcMessage resp = (RpcMessage)future.get(10, TimeUnit.SECONDS); if (resp != null) { success.incrementAndGet(); } @@ -93,7 +93,7 @@ public void testAll() { }); } - cnt.await(); + cnt.await(10,TimeUnit.SECONDS); LOGGER.info("success {}/{}", success.get(), runTimes); Assertions.assertEquals(success.get(), runTimes); } catch (InterruptedException e) { diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java index 6f997c9fbbd..264ebd8eaf2 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java @@ -75,13 +75,9 @@ protected void initChannel(Channel channel) throws Exception { String host = "0.0.0.0"; ChannelFuture future = serverBootstrap.bind(new InetSocketAddress(host, port)); - ChannelFuture channelFuture = future.addListener(new ChannelFutureListener() { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - throw new RuntimeException("Server start fail !", future.cause()); - } + ChannelFuture channelFuture = future.addListener((ChannelFutureListener)future1 -> { + if (!future1.isSuccess()) { + throw new RuntimeException("Server start fail !", future1.cause()); } }); diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java index 8b468d0e8ff..9a8fff05824 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java @@ -39,10 +39,8 @@ public class ServerChannelHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { Channel channel = ctx.channel(); - - if (msg instanceof ProtocolRpcMessage) { - RpcMessage rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg(); - channel.writeAndFlush(rpcMessage); + if (msg instanceof RpcMessage) { + channel.writeAndFlush(msg); } else { LOGGER.error("rpcMessage type error"); } From 0cf1328de09488f38acea547aab8a37075ebad09 Mon Sep 17 00:00:00 2001 From: jianbin Date: Thu, 27 Jun 2024 14:00:12 +0800 Subject: [PATCH 58/62] optimize: select channel handles based on protocol versions --- changes/en-us/2.x.md | 5 ++++- changes/zh-cn/2.x.md | 3 +++ .../seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java | 3 ++- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 7941cbbf60a..abd08255b13 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -3,7 +3,7 @@ Add changes here for all PR submitted to the 2.x branch. ### feature: - +- [[#6226](https://github.com/apache/incubator-seata/pull/6226)] multi-version seata protocol support ### bugfix: - [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix @Async annotation not working in ClusterWatcherManager @@ -14,6 +14,8 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6499](https://github.com/apache/incubator-seata/pull/6499)] split the task thread pool for committing and rollbacking statuses - [[#6208](https://github.com/apache/incubator-seata/pull/6208)] optimize : load SeataSerializer by version - [[#6209](https://github.com/apache/incubator-seata/pull/6209)] Eliminate RpcMessage and Encoder/Decoder dependencies +- [[#6634](https://github.com/apache/incubator-seata/pull/6634)] select channel handles based on protocol versions + ### refactor: - [[#6534](https://github.com/apache/incubator-seata/pull/6534)] optimize: send async response @@ -35,5 +37,6 @@ Thanks to these contributors for their code commits. Please report an unintended - [YeonCheolGit](https://github.com/YeonCheolGit) - [liuqiufeng](https://github.com/liuqiufeng) - [Bughue](https://github.com/Bughue) +- [funky-eyes](https://github.com/funky-eyes) Also, we receive many valuable issues, questions and advices from our community. Thanks for you all. diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 25ff7a0587c..474d813b106 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -3,6 +3,7 @@ ### feature: +- [[#6226](https://github.com/apache/incubator-seata/pull/6226)] 支持seata私有协议多版本兼容 ### bugfix: - [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix @Async注解ClusterWatcherManager中不生效的问题 @@ -13,6 +14,7 @@ - [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池 - [[#6208](https://github.com/apache/incubator-seata/pull/6208)] 支持多版本的Seata序列化 - [[#6209](https://github.com/apache/incubator-seata/pull/6209)] 解开 RpcMessage 和 Encoder/Decoder 的互相依赖 +- [[#6634](https://github.com/apache/incubator-seata/pull/6634)] 根据协议版本指定channel handle ### refactor: - [[#6534](https://github.com/apache/incubator-seata/pull/6534)] 优化: 发送异步响应 @@ -31,5 +33,6 @@ - [YeonCheolGit](https://github.com/YeonCheolGit) - [liuqiufeng](https://github.com/liuqiufeng) - [Bughue](https://github.com/Bughue) +- [funky-eyes](https://github.com/funky-eyes) 同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。 diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java index 33a693311ac..ac75db20c5f 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java @@ -66,7 +66,7 @@ public void testAll() { body.setXid("xid-1234"); // test run times - int runTimes = 100; + int runTimes = 100000; final int threads = 50; final CountDownLatch cnt = new CountDownLatch(runTimes); @@ -96,6 +96,7 @@ public void testAll() { cnt.await(10,TimeUnit.SECONDS); LOGGER.info("success {}/{}", success.get(), runTimes); Assertions.assertEquals(success.get(), runTimes); + service1.shutdown(); } catch (InterruptedException e) { LOGGER.error("Thread interrupted", e); } finally { From 76fc55f5ff2ecc69e04237916bebc358924c4cd0 Mon Sep 17 00:00:00 2001 From: jianbin Date: Thu, 27 Jun 2024 14:08:12 +0800 Subject: [PATCH 59/62] optimize: select channel handles based on protocol versions --- .../seata/core/rpc/netty/AbstractNettyRemotingClient.java | 3 +-- .../seata/core/rpc/netty/AbstractNettyRemotingServer.java | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java index be97b5ade40..248e8f48f6d 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -409,9 +409,8 @@ class ClientHandler extends ChannelDuplexHandler { @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { - RpcMessage rpcMessage = null; if (msg instanceof RpcMessage) { - processMessage(ctx, rpcMessage); + processMessage(ctx, (RpcMessage)msg); } else { LOGGER.error("rpcMessage type error"); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java index 39edcaca8ad..cf66c0b4562 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -163,9 +163,8 @@ class ServerHandler extends ChannelDuplexHandler { */ @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { - RpcMessage rpcMessage = null; if (msg instanceof RpcMessage) { - processMessage(ctx, rpcMessage); + processMessage(ctx, (RpcMessage)msg); } else { LOGGER.error("rpcMessage type error"); } From a9ed6148924e9ca5e57e3689876b35ff4df7251f Mon Sep 17 00:00:00 2001 From: jianbin Date: Thu, 27 Jun 2024 14:38:55 +0800 Subject: [PATCH 60/62] remove --- .../rpc/netty/CompatibleProtocolDecoder.java | 155 ------------------ .../rpc/netty/CompatibleProtocolEncoder.java | 79 --------- 2 files changed, 234 deletions(-) delete mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java delete mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java deleted file mode 100644 index d066984c23b..00000000000 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.seata.core.rpc.netty; - -import com.google.common.collect.ImmutableMap; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import org.apache.seata.core.exception.DecodeException; -import org.apache.seata.core.protocol.ProtocolConstants; -import org.apache.seata.core.rpc.netty.v0.ProtocolDecoderV0; -import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -/** - * - * (> 0.7.0) - * 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 - * +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+ - * | magic |Proto| Full length | Head | Msg |Seria|Compr| RequestId | - * | code |colVer| (head+body) | Length |Type |lizer|ess | | - * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+ - * - * (<= 0.7.0) - * 0 1 2 3 4 6 8 10 12 14 - * +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+ - * | 0xdada | flag | typecode/ | requestid | - * | | | bodylength| | - * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+ - * - *- *- *
Full Length: include all data - *Head Length: include head data from magic code to head map. - *Body Length: Full Length - Head Length - * - */ -public class CompatibleProtocolDecoder extends LengthFieldBasedFrameDecoder { - - private static final Logger LOGGER = LoggerFactory.getLogger(CompatibleProtocolDecoder.class); - private static MapprotocolDecoderMap; - - public CompatibleProtocolDecoder() { - // default is 8M - this(ProtocolConstants.MAX_FRAME_LENGTH); - } - - public CompatibleProtocolDecoder(int maxFrameLength) { - /* - int maxFrameLength, - int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3 - int lengthFieldLength, FullLength is int(4B). so values is 4 - int lengthAdjustment, FullLength include all data and read 7 bytes before, so the left length is (FullLength-7). so values is -7 - int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0 - */ - super(maxFrameLength, 3, 4, -7, 0); - protocolDecoderMap = ImmutableMap. builder() - .put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0()) - .put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1()) - .build(); - } - - @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - ByteBuf frame; - Object decoded; - byte version; - try { - if (isV0(in)) { - decoded = in; - version = ProtocolConstants.VERSION_0; - } else { - decoded = super.decode(ctx, in); - version = decideVersion(decoded); - } - - if (decoded instanceof ByteBuf) { - frame = (ByteBuf) decoded; - try { - ProtocolDecoder decoder = protocolDecoderMap.get(version); - if (decoder == null) { - throw new UnsupportedOperationException("Unsupported version: " + version); - } - return decoder.decodeFrame(frame); - } finally { - if (version != ProtocolConstants.VERSION_0) { - frame.release(); - } - } - } - } catch (Exception exx) { - LOGGER.error("Decode frame error, cause: {}", exx.getMessage()); - throw new DecodeException(exx); - } - return decoded; - } - - protected byte decideVersion(Object in) { - if (in instanceof ByteBuf) { - ByteBuf frame = (ByteBuf) in; - frame.markReaderIndex(); - byte b0 = frame.readByte(); - byte b1 = frame.readByte(); - if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0 - || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) { - throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1); - } - - byte version = frame.readByte(); - frame.resetReaderIndex(); - return version; - } - return -1; - } - - - protected boolean isV0(ByteBuf in) { - boolean isV0 = false; - in.markReaderIndex(); - byte b0 = in.readByte(); - byte b1 = in.readByte(); - // v1/v2/v3 : b2 = version - // v0 : 1st byte in FLAG(2byte:0x10/0x20/0x40/0x80) - byte b2 = in.readByte(); - if (ProtocolConstants.MAGIC_CODE_BYTES[0] == b0 - && ProtocolConstants.MAGIC_CODE_BYTES[1] == b1 - && 0 == b2) { - isV0 = true; - } - - in.resetReaderIndex(); - return isV0; - } - - protected boolean isV0(byte version) { - return version == ProtocolConstants.VERSION_0; - } -} \ No newline at end of file diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java deleted file mode 100644 index e588b92b8ea..00000000000 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.seata.core.rpc.netty; - -import com.google.common.collect.ImmutableMap; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToByteEncoder; -import org.apache.seata.common.util.StringUtils; -import org.apache.seata.core.protocol.ProtocolConstants; -import org.apache.seata.core.protocol.RpcMessage; -import org.apache.seata.core.protocol.Version; -import org.apache.seata.core.rpc.netty.v0.ProtocolEncoderV0; -import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -/** - * Compatible Protocol Encoder - * - *
Full Length: include all data - *Head Length: include head data from magic code to head map. - *Body Length: Full Length - Head Length - * - */ -public class CompatibleProtocolEncoder extends MessageToByteEncoder { - - private static final Logger LOGGER = LoggerFactory.getLogger(CompatibleProtocolEncoder.class); - - private static MapprotocolEncoderMap; - - public CompatibleProtocolEncoder() { - super(); - protocolEncoderMap = ImmutableMap. builder() - .put(ProtocolConstants.VERSION_0, new ProtocolEncoderV0()) - .put(ProtocolConstants.VERSION_1, new ProtocolEncoderV1()) - .build(); - } - - @Override - public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) { - try { - if (msg instanceof RpcMessage) { - RpcMessage rpcMessage = (RpcMessage) msg; - String sdkVersion = rpcMessage.getOtherSideVersion(); - if (StringUtils.isBlank(sdkVersion)) { - sdkVersion = Version.getCurrent(); - } - byte protocolVersion = Version.calcProtocolVersion(sdkVersion); - ProtocolEncoder encoder = protocolEncoderMap.get(protocolVersion); - if (encoder == null) { - throw new UnsupportedOperationException("Unsupported protocolVersion: " + protocolVersion); - } - - encoder.encode(rpcMessage, out); - } else { - throw new UnsupportedOperationException("Not support this class:" + msg.getClass()); - } - } catch (Throwable e) { - LOGGER.error("Encode request error!", e); - } - } -} From e23efb6ac210ee63180b973b79e2eb5204396bbe Mon Sep 17 00:00:00 2001 From: jianbin Date: Thu, 27 Jun 2024 16:02:28 +0800 Subject: [PATCH 61/62] optimize: select channel handles based on protocol versions --- .../core/rpc/netty/v0/ProtocolDecoderV0.java | 1 - .../core/rpc/netty/v1/ProtocolDecoderV1.java | 35 ++----------------- 2 files changed, 2 insertions(+), 34 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java index d051317d12f..635dbde3c98 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java @@ -59,7 +59,6 @@ public class ProtocolDecoderV0 extends AbstractProtocolDecoder { private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoderV0.class); public ProtocolDecoderV0() { - } @Override diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java index ce48b3ce8ca..8d4e8391487 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java @@ -20,15 +20,12 @@ import java.util.Map; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.apache.seata.core.compressor.Compressor; import org.apache.seata.core.compressor.CompressorFactory; -import org.apache.seata.core.exception.DecodeException; import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.protocol.RpcMessage; -import org.apache.seata.core.rpc.netty.ProtocolDecoder; +import org.apache.seata.core.rpc.netty.AbstractProtocolDecoder; import org.apache.seata.core.serializer.Serializer; import org.apache.seata.core.serializer.SerializerServiceLoader; import org.apache.seata.core.serializer.SerializerType; @@ -64,20 +61,12 @@ * @see ProtocolEncoderV1 * @since 0.7.0 */ -public class ProtocolDecoderV1 extends LengthFieldBasedFrameDecoder implements ProtocolDecoder { +public class ProtocolDecoderV1 extends AbstractProtocolDecoder { private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoderV1.class); private final List supportDeSerializerTypes; public ProtocolDecoderV1() { - /* - int maxFrameLength, - int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3 - int lengthFieldLength, FullLength is int(4B). so values is 4 - int lengthAdjustment, FullLength include all data and read 7 bytes before, so the left length is (FullLength-7). so values is -7 - int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0 - */ - super(ProtocolConstants.MAX_FRAME_LENGTH, 3, 4, -7, 0); supportDeSerializerTypes = SerializerServiceLoader.getSupportedSerializers(); if (supportDeSerializerTypes.isEmpty()) { throw new IllegalArgumentException("No serializer found"); @@ -140,24 +129,4 @@ public RpcMessage decodeFrame(ByteBuf frame) { return rpcMessage.protocolMsg2RpcMsg(); } - @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - Object decoded; - try { - decoded = super.decode(ctx, in); - if (decoded instanceof ByteBuf) { - ByteBuf frame = (ByteBuf)decoded; - try { - return decodeFrame(frame); - } finally { - frame.release(); - } - } - } catch (Exception exx) { - LOGGER.error("Decode frame error, cause: {}", exx.getMessage()); - throw new DecodeException(exx); - } - return decoded; - } - } From f4b6b72e3fa7dad2f4ae78117780982579f7874d Mon Sep 17 00:00:00 2001 From: jianbin Date: Thu, 27 Jun 2024 16:28:45 +0800 Subject: [PATCH 62/62] optimize: select channel handles based on protocol versions --- .../rpc/netty/AbstractProtocolDecoder.java | 63 ------------------- .../core/rpc/netty/v0/ProtocolDecoderV0.java | 35 ++++++++++- .../core/rpc/netty/v1/ProtocolDecoderV1.java | 35 ++++++++++- 3 files changed, 66 insertions(+), 67 deletions(-) delete mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/AbstractProtocolDecoder.java diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractProtocolDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractProtocolDecoder.java deleted file mode 100644 index 82735451964..00000000000 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractProtocolDecoder.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.seata.core.rpc.netty; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import org.apache.seata.core.exception.DecodeException; -import org.apache.seata.core.protocol.ProtocolConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class AbstractProtocolDecoder extends LengthFieldBasedFrameDecoder implements ProtocolDecoder { - - protected Logger logger = LoggerFactory.getLogger(getClass()); - - public AbstractProtocolDecoder() { - /* - int maxFrameLength, - int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3 - int lengthFieldLength, FullLength is int(4B). so values is 4 - int lengthAdjustment, FullLength include all data and read 7 bytes before, so the left length is (FullLength-7). so values is -7 - int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0 - */ - super(ProtocolConstants.MAX_FRAME_LENGTH, 3, 4); - } - - @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - Object decoded; - try { - decoded = super.decode(ctx, in); - if (decoded instanceof ByteBuf) { - ByteBuf frame = (ByteBuf)decoded; - try { - return decodeFrame(frame); - } finally { - frame.release(); - } - } - } catch (Exception exx) { - logger.error("Decode frame error, cause: {}", exx.getMessage()); - throw new DecodeException(exx); - } - return decoded; - } - - -} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java index 635dbde3c98..d14ce91ceaf 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java @@ -17,11 +17,14 @@ package org.apache.seata.core.rpc.netty.v0; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import org.apache.seata.core.exception.DecodeException; import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.protocol.RpcMessage; -import org.apache.seata.core.rpc.netty.AbstractProtocolDecoder; +import org.apache.seata.core.rpc.netty.ProtocolDecoder; import org.apache.seata.core.serializer.Serializer; import org.apache.seata.core.serializer.SerializerServiceLoader; import org.apache.seata.core.serializer.SerializerType; @@ -54,11 +57,19 @@ * * @see ProtocolEncoderV0 */ -public class ProtocolDecoderV0 extends AbstractProtocolDecoder { +public class ProtocolDecoderV0 extends LengthFieldBasedFrameDecoder implements ProtocolDecoder { private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoderV0.class); public ProtocolDecoderV0() { + /* + int maxFrameLength, + int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3 + int lengthFieldLength, FullLength is int(4B). so values is 4 + int lengthAdjustment, FullLength include all data and read 7 bytes before, so the left length is (FullLength-7). so values is -7 + int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0 + */ + super(ProtocolConstants.MAX_FRAME_LENGTH, 3, 4, -7, 0); } @Override @@ -131,4 +142,24 @@ public RpcMessage decodeFrame(ByteBuf in) { return rpcMessage.protocolMsg2RpcMsg(); } + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + Object decoded; + try { + decoded = super.decode(ctx, in); + if (decoded instanceof ByteBuf) { + ByteBuf frame = (ByteBuf)decoded; + try { + return decodeFrame(frame); + } finally { + frame.release(); + } + } + } catch (Exception exx) { + LOGGER.error("Decode frame error, cause: {}", exx.getMessage()); + throw new DecodeException(exx); + } + return decoded; + } + } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java index 8d4e8391487..ce48b3ce8ca 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java @@ -20,12 +20,15 @@ import java.util.Map; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.apache.seata.core.compressor.Compressor; import org.apache.seata.core.compressor.CompressorFactory; +import org.apache.seata.core.exception.DecodeException; import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.protocol.RpcMessage; -import org.apache.seata.core.rpc.netty.AbstractProtocolDecoder; +import org.apache.seata.core.rpc.netty.ProtocolDecoder; import org.apache.seata.core.serializer.Serializer; import org.apache.seata.core.serializer.SerializerServiceLoader; import org.apache.seata.core.serializer.SerializerType; @@ -61,12 +64,20 @@ * @see ProtocolEncoderV1 * @since 0.7.0 */ -public class ProtocolDecoderV1 extends AbstractProtocolDecoder { +public class ProtocolDecoderV1 extends LengthFieldBasedFrameDecoder implements ProtocolDecoder { private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoderV1.class); private final List supportDeSerializerTypes; public ProtocolDecoderV1() { + /* + int maxFrameLength, + int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3 + int lengthFieldLength, FullLength is int(4B). so values is 4 + int lengthAdjustment, FullLength include all data and read 7 bytes before, so the left length is (FullLength-7). so values is -7 + int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0 + */ + super(ProtocolConstants.MAX_FRAME_LENGTH, 3, 4, -7, 0); supportDeSerializerTypes = SerializerServiceLoader.getSupportedSerializers(); if (supportDeSerializerTypes.isEmpty()) { throw new IllegalArgumentException("No serializer found"); @@ -129,4 +140,24 @@ public RpcMessage decodeFrame(ByteBuf frame) { return rpcMessage.protocolMsg2RpcMsg(); } + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + Object decoded; + try { + decoded = super.decode(ctx, in); + if (decoded instanceof ByteBuf) { + ByteBuf frame = (ByteBuf)decoded; + try { + return decodeFrame(frame); + } finally { + frame.release(); + } + } + } catch (Exception exx) { + LOGGER.error("Decode frame error, cause: {}", exx.getMessage()); + throw new DecodeException(exx); + } + return decoded; + } + }