From 1e9be901390fa3b58734e4c21067eec0040b0829 Mon Sep 17 00:00:00 2001 From: slievrly Date: Mon, 19 Feb 2024 19:24:35 +0800 Subject: [PATCH 1/2] optimize: optimize serialization/deserialization of protocol codec --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 1 + .../core/protocol/ProtocolConstants.java | 6 ++-- .../core/rpc/netty/v1/ProtocolV1Decoder.java | 19 +++++++------ .../serializer/SerializerServiceLoader.java | 28 +++++++++++++++++++ 5 files changed, 44 insertions(+), 11 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 4d8d2bd8f41..de03d320578 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -90,6 +90,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6254](https://github.com/apache/incubator-seata/pull/6254)] optimize Hessian Serialize - [[#6332](https://github.com/apache/incubator-seata/pull/6332)] remove mysql dependency from the distribution package - [[#6343](https://github.com/apache/incubator-seata/pull/6343)] compatible with tm module and rm-datasource module +- [[#6357](https://github.com/apache/incubator-seata/pull/6357)] optimize serialization/deserialization of protocol codec ### security: - [[#6069](https://github.com/apache/incubator-seata/pull/6069)] Upgrade Guava dependencies to fix security vulnerabilities diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 4e40ab6da04..8cbffac8712 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -89,6 +89,7 @@ - [[#6332](https://github.com/apache/incubator-seata/pull/6332)] 分发包中移除 mysql 依赖 - [[#6343](https://github.com/apache/incubator-seata/pull/6343)] 兼容 TM 模块和 rm-datasource 模块 - [[#6349](https://github.com/apache/incubator-seata/pull/6349)] 迁移 dockerhub 仓库 +- [[#6357](https://github.com/apache/incubator-seata/pull/6357)] 优化协议编解码的序列化/反序列化 ### security: diff --git a/core/src/main/java/org/apache/seata/core/protocol/ProtocolConstants.java b/core/src/main/java/org/apache/seata/core/protocol/ProtocolConstants.java index 4ab61d1337c..98039180c76 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/ProtocolConstants.java +++ b/core/src/main/java/org/apache/seata/core/protocol/ProtocolConstants.java @@ -17,9 +17,10 @@ package org.apache.seata.core.protocol; import org.apache.seata.config.ConfigurationFactory; -import org.apache.seata.core.serializer.SerializerType; import org.apache.seata.core.compressor.CompressorType; import org.apache.seata.core.constants.ConfigurationKeys; +import org.apache.seata.core.serializer.SerializerServiceLoader; +import org.apache.seata.core.serializer.SerializerType; /** * @since 0.7.0 @@ -75,8 +76,7 @@ public interface ProtocolConstants { * * @see SerializerType#SEATA */ - byte CONFIGURED_CODEC = SerializerType.getByName(ConfigurationFactory.getInstance() - .getConfig(ConfigurationKeys.SERIALIZE_FOR_RPC, SerializerType.SEATA.name())).getCode(); + byte CONFIGURED_CODEC = SerializerServiceLoader.getSupportedSerializers().iterator().next().getCode(); /** * Configured compressor by user, default is NONE diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java index 31944d36673..77e758af651 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java @@ -17,15 +17,13 @@ package org.apache.seata.core.rpc.netty.v1; import java.util.Map; +import java.util.Set; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import org.apache.seata.config.Configuration; -import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.compressor.Compressor; import org.apache.seata.core.compressor.CompressorFactory; -import org.apache.seata.core.constants.ConfigurationKeys; import org.apache.seata.core.exception.DecodeException; import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.ProtocolConstants; @@ -65,8 +63,9 @@ public class ProtocolV1Decoder extends LengthFieldBasedFrameDecoder { private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolV1Decoder.class); - private static final Configuration CONFIG = ConfigurationFactory.getInstance(); - private SerializerType serializerType; + + private final Set supportDeSerializerTypes; + public ProtocolV1Decoder() { // default is 8M @@ -82,8 +81,10 @@ int lengthFieldLength, FullLength is int(4B). so values is 4 int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0 */ super(maxFrameLength, 3, 4, -7, 0); - String serializerName = CONFIG.getConfig(ConfigurationKeys.SERIALIZE_FOR_RPC, SerializerType.SEATA.name()); - this.serializerType = SerializerType.getByName(serializerName); + supportDeSerializerTypes = SerializerServiceLoader.getSupportedSerializers(); + if (supportDeSerializerTypes.isEmpty()) { + throw new IllegalArgumentException("No serializer found"); + } } @Override @@ -150,7 +151,7 @@ public Object decodeFrame(ByteBuf frame) { Compressor compressor = CompressorFactory.getCompressor(compressorType); bs = compressor.decompress(bs); SerializerType protocolType = SerializerType.getByCode(rpcMessage.getCodec()); - if (this.serializerType.equals(protocolType)) { + if (this.supportDeSerializerTypes.contains(protocolType)) { Serializer serializer = SerializerServiceLoader.load(protocolType); rpcMessage.setBody(serializer.deserialize(bs)); } else { @@ -161,4 +162,6 @@ public Object decodeFrame(ByteBuf frame) { return rpcMessage; } + + } diff --git a/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java b/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java index d13856b6173..4768e9062af 100644 --- a/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java +++ b/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java @@ -16,9 +16,17 @@ */ package org.apache.seata.core.serializer; +import java.util.HashSet; +import java.util.Set; + import org.apache.seata.common.loader.EnhancedServiceLoader; import org.apache.seata.common.loader.EnhancedServiceNotFoundException; import org.apache.seata.common.util.ReflectionUtil; +import org.apache.seata.config.Configuration; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.core.constants.ConfigurationKeys; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The Service Loader for the interface {@link Serializer} @@ -26,6 +34,9 @@ */ public final class SerializerServiceLoader { + private static final Logger LOGGER = LoggerFactory.getLogger(SerializerServiceLoader.class); + private static final Configuration CONFIG = ConfigurationFactory.getInstance(); + private SerializerServiceLoader() { } @@ -50,4 +61,21 @@ public static Serializer load(SerializerType type) throws EnhancedServiceNotFoun } return EnhancedServiceLoader.load(Serializer.class, type.name()); } + + public static Set getSupportedSerializers() { + Set supportedSerializers = new HashSet<>(); + String serializerNames = CONFIG.getConfig(ConfigurationKeys.SERIALIZE_FOR_RPC, SerializerType.SEATA.name()); + String[] serializerNameArray = serializerNames.split(","); + for (String serializerName : serializerNameArray) { + try { + SerializerType serializerType = SerializerType.getByName(serializerName); + if (serializerType != null) { + supportedSerializers.add(serializerType); + } + } catch (IllegalArgumentException ignore) { + LOGGER.warn("Invalid serializer name: " + serializerName); + } + } + return supportedSerializers; + } } From eb13bab96ce665f6ffa038d36d1866eb92326182 Mon Sep 17 00:00:00 2001 From: slievrly Date: Tue, 23 Apr 2024 00:52:54 +0800 Subject: [PATCH 2/2] add serializer default value --- .../serializer/SerializerServiceLoader.java | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java b/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java index 4768e9062af..f824c09a8cd 100644 --- a/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java +++ b/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java @@ -16,8 +16,10 @@ */ package org.apache.seata.core.serializer; +import java.util.Arrays; import java.util.HashSet; import java.util.Set; +import java.util.stream.Collectors; import org.apache.seata.common.loader.EnhancedServiceLoader; import org.apache.seata.common.loader.EnhancedServiceNotFoundException; @@ -28,6 +30,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.seata.core.serializer.SerializerType.HESSIAN; +import static org.apache.seata.core.serializer.SerializerType.KRYO; +import static org.apache.seata.core.serializer.SerializerType.PROTOBUF; +import static org.apache.seata.core.serializer.SerializerType.SEATA; + /** * The Service Loader for the interface {@link Serializer} * @@ -37,6 +44,10 @@ public final class SerializerServiceLoader { private static final Logger LOGGER = LoggerFactory.getLogger(SerializerServiceLoader.class); private static final Configuration CONFIG = ConfigurationFactory.getInstance(); + private static final SerializerType[] DEFAULT_SERIALIZER_TYPE = new SerializerType[] {SEATA, PROTOBUF, KRYO, HESSIAN}; + + private static final String SPLIT_CHAR = ","; + private SerializerServiceLoader() { } @@ -64,14 +75,13 @@ public static Serializer load(SerializerType type) throws EnhancedServiceNotFoun public static Set getSupportedSerializers() { Set supportedSerializers = new HashSet<>(); - String serializerNames = CONFIG.getConfig(ConfigurationKeys.SERIALIZE_FOR_RPC, SerializerType.SEATA.name()); - String[] serializerNameArray = serializerNames.split(","); + String defaultSupportSerializers = Arrays.stream(DEFAULT_SERIALIZER_TYPE).map(SerializerType::name).collect(Collectors.joining(SPLIT_CHAR)); + String serializerNames = CONFIG.getConfig(ConfigurationKeys.SERIALIZE_FOR_RPC, defaultSupportSerializers); + String[] serializerNameArray = serializerNames.split(SPLIT_CHAR); for (String serializerName : serializerNameArray) { try { SerializerType serializerType = SerializerType.getByName(serializerName); - if (serializerType != null) { - supportedSerializers.add(serializerType); - } + supportedSerializers.add(serializerType); } catch (IllegalArgumentException ignore) { LOGGER.warn("Invalid serializer name: " + serializerName); }