Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: optimize serialization/deserialization of protocol codec #6357

Merged
merged 5 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6254](https://github.com/apache/incubator-seata/pull/6254)] optimize Hessian Serialize
- [[#6332](https://github.com/apache/incubator-seata/pull/6332)] remove mysql dependency from the distribution package
- [[#6343](https://github.com/apache/incubator-seata/pull/6343)] compatible with tm module and rm-datasource module
- [[#6357](https://github.com/apache/incubator-seata/pull/6357)] optimize serialization/deserialization of protocol codec
- [[#6345](https://github.com/apache/incubator-seata/pull/6345)] compatible with tcc module
- [[#6356](https://github.com/apache/incubator-seata/pull/6356)] remove authentication from the health check page
- [[#6360](https://github.com/apache/incubator-seata/pull/6360)] optimize 401 issues for some links
Expand All @@ -125,6 +126,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6412](https://github.com/apache/incubator-seata/pull/6412)] optimize core compatible module
- [[#6429](https://github.com/apache/incubator-seata/pull/6429)] remove repetitive words


### security:
- [[#6069](https://github.com/apache/incubator-seata/pull/6069)] Upgrade Guava dependencies to fix security vulnerabilities
- [[#6145](https://github.com/apache/incubator-seata/pull/6145)] upgrade jettison to 1.5.4
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
- [[#6332](https://github.com/apache/incubator-seata/pull/6332)] 分发包中移除 mysql 依赖
- [[#6343](https://github.com/apache/incubator-seata/pull/6343)] 兼容 TM 模块和 rm-datasource 模块
- [[#6349](https://github.com/apache/incubator-seata/pull/6349)] 迁移 dockerhub 仓库
- [[#6357](https://github.com/apache/incubator-seata/pull/6357)] 优化协议编解码的序列化/反序列化
- [[#6356](https://github.com/apache/incubator-seata/pull/6356)] 去除健康检查页面的鉴权
- [[#6360](https://github.com/apache/incubator-seata/pull/6360)] 优化部分链接 401 的问题
- [[#6350](https://github.com/apache/incubator-seata/pull/6350)] 移除 enableDegrade 配置
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
package org.apache.seata.core.protocol;

import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.serializer.SerializerType;
import org.apache.seata.core.compressor.CompressorType;
import org.apache.seata.core.constants.ConfigurationKeys;
import org.apache.seata.core.serializer.SerializerServiceLoader;
import org.apache.seata.core.serializer.SerializerType;

/**
* @since 0.7.0
Expand Down Expand Up @@ -75,8 +76,7 @@ public interface ProtocolConstants {
*
* @see SerializerType#SEATA
*/
byte CONFIGURED_CODEC = SerializerType.getByName(ConfigurationFactory.getInstance()
.getConfig(ConfigurationKeys.SERIALIZE_FOR_RPC, SerializerType.SEATA.name())).getCode();
byte CONFIGURED_CODEC = SerializerServiceLoader.getSupportedSerializers().iterator().next().getCode();

/**
* Configured compressor by user, default is NONE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@
package org.apache.seata.core.rpc.netty.v1;

import java.util.Map;
import java.util.Set;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.compressor.Compressor;
import org.apache.seata.core.compressor.CompressorFactory;
import org.apache.seata.core.constants.ConfigurationKeys;
import org.apache.seata.core.exception.DecodeException;
import org.apache.seata.core.protocol.HeartbeatMessage;
import org.apache.seata.core.protocol.ProtocolConstants;
Expand Down Expand Up @@ -65,8 +63,9 @@
public class ProtocolV1Decoder extends LengthFieldBasedFrameDecoder {

private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolV1Decoder.class);
private static final Configuration CONFIG = ConfigurationFactory.getInstance();
private SerializerType serializerType;

private final Set<SerializerType> supportDeSerializerTypes;


public ProtocolV1Decoder() {
// default is 8M
Expand All @@ -82,8 +81,10 @@ int lengthFieldLength, FullLength is int(4B). so values is 4
int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0
*/
super(maxFrameLength, 3, 4, -7, 0);
String serializerName = CONFIG.getConfig(ConfigurationKeys.SERIALIZE_FOR_RPC, SerializerType.SEATA.name());
this.serializerType = SerializerType.getByName(serializerName);
supportDeSerializerTypes = SerializerServiceLoader.getSupportedSerializers();
if (supportDeSerializerTypes.isEmpty()) {
throw new IllegalArgumentException("No serializer found");
}
}

@Override
Expand Down Expand Up @@ -150,7 +151,7 @@ public Object decodeFrame(ByteBuf frame) {
Compressor compressor = CompressorFactory.getCompressor(compressorType);
bs = compressor.decompress(bs);
SerializerType protocolType = SerializerType.getByCode(rpcMessage.getCodec());
if (this.serializerType.equals(protocolType)) {
if (this.supportDeSerializerTypes.contains(protocolType)) {
Serializer serializer = SerializerServiceLoader.load(protocolType);
rpcMessage.setBody(serializer.deserialize(bs));
} else {
Expand All @@ -161,4 +162,6 @@ public Object decodeFrame(ByteBuf frame) {

return rpcMessage;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,38 @@
*/
package org.apache.seata.core.serializer;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.seata.common.loader.EnhancedServiceLoader;
import org.apache.seata.common.loader.EnhancedServiceNotFoundException;
import org.apache.seata.common.util.ReflectionUtil;
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.constants.ConfigurationKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.seata.core.serializer.SerializerType.HESSIAN;
import static org.apache.seata.core.serializer.SerializerType.KRYO;
import static org.apache.seata.core.serializer.SerializerType.PROTOBUF;
import static org.apache.seata.core.serializer.SerializerType.SEATA;

/**
* The Service Loader for the interface {@link Serializer}
*
*/
public final class SerializerServiceLoader {

private static final Logger LOGGER = LoggerFactory.getLogger(SerializerServiceLoader.class);
private static final Configuration CONFIG = ConfigurationFactory.getInstance();

private static final SerializerType[] DEFAULT_SERIALIZER_TYPE = new SerializerType[] {SEATA, PROTOBUF, KRYO, HESSIAN};

private static final String SPLIT_CHAR = ",";

private SerializerServiceLoader() {
}

Expand All @@ -50,4 +72,20 @@ public static Serializer load(SerializerType type) throws EnhancedServiceNotFoun
}
return EnhancedServiceLoader.load(Serializer.class, type.name());
}

public static Set<SerializerType> getSupportedSerializers() {
Set<SerializerType> supportedSerializers = new HashSet<>();
String defaultSupportSerializers = Arrays.stream(DEFAULT_SERIALIZER_TYPE).map(SerializerType::name).collect(Collectors.joining(SPLIT_CHAR));
String serializerNames = CONFIG.getConfig(ConfigurationKeys.SERIALIZE_FOR_RPC, defaultSupportSerializers);
String[] serializerNameArray = serializerNames.split(SPLIT_CHAR);
for (String serializerName : serializerNameArray) {
try {
SerializerType serializerType = SerializerType.getByName(serializerName);
supportedSerializers.add(serializerType);
} catch (IllegalArgumentException ignore) {
LOGGER.warn("Invalid serializer name: " + serializerName);
}
}
return supportedSerializers;
}
}
Loading