Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: load SeataSerializer by version #6208

Merged
merged 31 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
39d88c1
load SeataSerializer by version
Bughue Dec 25, 2023
901b886
license
Bughue Dec 26, 2023
b518004
style
Bughue Dec 26, 2023
883adfb
test ProtocolConstants.VERSION
Bughue Dec 26, 2023
492672c
version serialize
Bughue Dec 27, 2023
d8c19a4
v0
Bughue Dec 29, 2023
958ef77
v0
Bughue Dec 29, 2023
2a0d836
v0
Bughue Dec 29, 2023
227464b
v0
Bughue Dec 29, 2023
0afa845
license
Bughue Dec 29, 2023
4959cbe
license
Bughue Dec 29, 2023
dfd9bf2
style
Bughue Dec 29, 2023
48a6090
style
Bughue Jan 2, 2024
181a421
inner class
Bughue Jan 4, 2024
d2804fb
inner class
Bughue Jan 4, 2024
342d306
single pattern
Bughue Jan 22, 2024
439d132
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-ser…
Bughue Jan 22, 2024
74c48d5
conflit
Bughue Jan 22, 2024
93c972b
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-ser…
Bughue Feb 4, 2024
0f8c187
resolve conflict
Bughue Feb 4, 2024
704669a
style
Bughue Feb 28, 2024
9d30454
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-ser…
Bughue Apr 30, 2024
2f5feb0
merge
Bughue Apr 30, 2024
89c0674
double lock check
Bughue May 7, 2024
b649852
double lock check
Bughue May 7, 2024
3cc6ca3
double lock check
Bughue May 7, 2024
2d37bf1
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-ser…
Bughue Jun 21, 2024
9d199a1
style
Bughue Jun 21, 2024
dd9d808
style
Bughue Jun 21, 2024
5868eef
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-ser…
Bughue Jun 21, 2024
ac90469
style
Bughue Jun 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Add changes here for all PR submitted to the 2.x branch.

### optimize:
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] split the task thread pool for committing and rollbacking statuses
- [[#6208](https://github.com/apache/incubator-seata/pull/6208)] optimize : load SeataSerializer by version

### refactor:
- [[#6534](https://github.com/apache/incubator-seata/pull/6534)] optimize: send async response
Expand All @@ -31,5 +32,6 @@ Thanks to these contributors for their code commits. Please report an unintended
- [tuwenlin](https://github.com/tuwenlin)
- [YeonCheolGit](https://github.com/YeonCheolGit)
- [liuqiufeng](https://github.com/liuqiufeng)
- [Bughue](https://github.com/Bughue)

Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
3 changes: 3 additions & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

### optimize:
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池
- [[#6208](https://github.com/apache/incubator-seata/pull/6208)] 支持多版本的Seata序列化


### refactor:
- [[#6534](https://github.com/apache/incubator-seata/pull/6534)] 优化: 发送异步响应
Expand All @@ -27,5 +29,6 @@
- [tuwenlin](https://github.com/tuwenlin)
- [YeonCheolGit](https://github.com/YeonCheolGit)
- [liuqiufeng](https://github.com/liuqiufeng)
- [Bughue](https://github.com/Bughue)

同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,20 @@ public interface ProtocolConstants {
*/
byte[] MAGIC_CODE_BYTES = {(byte) 0xda, (byte) 0xda};

/**
* Old protocol version
*/
byte VERSION_0 = 0;

/**
* Protocol version
*/
byte VERSION_1 = 1;

/**
* Protocol version
*/
byte VERSION = 1;
byte VERSION = VERSION_1;

/**
* Max frame length
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public Object decodeFrame(ByteBuf frame) {
bs = compressor.decompress(bs);
SerializerType protocolType = SerializerType.getByCode(rpcMessage.getCodec());
if (this.supportDeSerializerTypes.contains(protocolType)) {
Serializer serializer = SerializerServiceLoader.load(protocolType);
Serializer serializer = SerializerServiceLoader.load(protocolType, ProtocolConstants.VERSION_1);
rpcMessage.setBody(serializer.deserialize(bs));
} else {
throw new IllegalArgumentException("SerializerType not match: " + protocolType.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
&& messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
// heartbeat has no body
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()));
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), ProtocolConstants.VERSION_1);
bodyBytes = serializer.serialize(rpcMessage.getBody());
Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());
bodyBytes = compressor.compress(bodyBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,26 @@
import static org.apache.seata.core.serializer.SerializerType.PROTOBUF;
import static org.apache.seata.core.serializer.SerializerType.SEATA;

import java.util.HashMap;
import java.util.Map;

/**
* The Service Loader for the interface {@link Serializer}
*
*/
public final class SerializerServiceLoader {

private static final Logger LOGGER = LoggerFactory.getLogger(SerializerServiceLoader.class);
private static final Configuration CONFIG = ConfigurationFactory.getInstance();

private static final SerializerType[] DEFAULT_SERIALIZER_TYPE = new SerializerType[] {SEATA, PROTOBUF, KRYO, HESSIAN};
private static final SerializerType[] DEFAULT_SERIALIZER_TYPE = new SerializerType[]{SEATA, PROTOBUF, KRYO, HESSIAN};

private final static Map<String, Serializer> SERIALIZER_MAP = new HashMap<>();

private static final String SPLIT_CHAR = ",";

private SerializerServiceLoader() {
}


private static final String PROTOBUF_SERIALIZER_CLASS_NAME = "org.apache.seata.serializer.protobuf.ProtobufSerializer";

/**
Expand All @@ -61,7 +64,7 @@ private SerializerServiceLoader() {
* @return the service of {@link Serializer}
* @throws EnhancedServiceNotFoundException the enhanced service not found exception
*/
public static Serializer load(SerializerType type) throws EnhancedServiceNotFoundException {
public static Serializer load(SerializerType type, byte version) throws EnhancedServiceNotFoundException {
if (type == SerializerType.PROTOBUF) {
try {
ReflectionUtil.getClassByName(PROTOBUF_SERIALIZER_CLASS_NAME);
Expand All @@ -70,9 +73,28 @@ public static Serializer load(SerializerType type) throws EnhancedServiceNotFoun
"Please manually reference 'org.apache.seata:seata-serializer-protobuf' dependency ", e);
}
}
return EnhancedServiceLoader.load(Serializer.class, type.name());

String key = serialzerKey(type, version);
Serializer serializer = SERIALIZER_MAP.get(key);
if (serializer == null) {
if (type == SerializerType.SEATA) {
serializer = EnhancedServiceLoader.load(Serializer.class, type.name(), new Object[]{version});
} else {
serializer = EnhancedServiceLoader.load(Serializer.class, type.name());
}
SERIALIZER_MAP.put(key, serializer);
}
return serializer;
}

private static String serialzerKey(SerializerType type, byte version) {
if (type == SerializerType.SEATA) {
return type.name() + version;
}
return type.name();
}


public static List<SerializerType> getSupportedSerializers() {
List<SerializerType> supportedSerializers = new ArrayList<>();
String defaultSupportSerializers = Arrays.stream(DEFAULT_SERIALIZER_TYPE).map(SerializerType::name).collect(Collectors.joining(SPLIT_CHAR));
Expand All @@ -93,4 +115,4 @@ public static SerializerType getDefaultSerializerType() {
return getSupportedSerializers().get(0);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@

/**
* The type Message codec factory.
*
*/
public class MessageCodecFactory {

Expand All @@ -95,8 +94,8 @@ public class MessageCodecFactory {
* @param abstractMessage the abstract message
* @return the message codec
*/
public static MessageSeataCodec getMessageCodec(AbstractMessage abstractMessage) {
return getMessageCodec(abstractMessage.getTypeCode());
public static MessageSeataCodec getMessageCodec(AbstractMessage abstractMessage, byte version) {
return getMessageCodec(abstractMessage.getTypeCode(), version);
}

/**
Expand All @@ -105,14 +104,14 @@ public static MessageSeataCodec getMessageCodec(AbstractMessage abstractMessage)
* @param typeCode the type code
* @return the msg instance by code
*/
public static MessageSeataCodec getMessageCodec(short typeCode) {
public static MessageSeataCodec getMessageCodec(short typeCode, byte version) {
MessageSeataCodec msgCodec = null;
switch (typeCode) {
case MessageType.TYPE_SEATA_MERGE:
msgCodec = new MergedWarpMessageCodec();
msgCodec = new MergedWarpMessageCodec(version);
break;
case MessageType.TYPE_SEATA_MERGE_RESULT:
msgCodec = new MergeResultMessageCodec();
msgCodec = new MergeResultMessageCodec(version);
break;
case MessageType.TYPE_REG_CLT:
msgCodec = new RegisterTMRequestCodec();
Expand All @@ -136,7 +135,7 @@ public static MessageSeataCodec getMessageCodec(short typeCode) {
msgCodec = new GlobalReportRequestCodec();
break;
case MessageType.TYPE_BATCH_RESULT_MSG:
msgCodec = new BatchResultMessageCodec();
msgCodec = new BatchResultMessageCodec(version);
break;
default:
break;
Expand All @@ -147,15 +146,15 @@ public static MessageSeataCodec getMessageCodec(short typeCode) {
}

try {
msgCodec = getMergeRequestMessageSeataCodec(typeCode);
msgCodec = getMergeRequestMessageSeataCodec(typeCode, version);
} catch (Exception exx) {
}

if (msgCodec != null) {
return msgCodec;
}

msgCodec = getMergeResponseMessageSeataCodec(typeCode);
msgCodec = getMergeResponseMessageSeataCodec(typeCode, version);

return msgCodec;
}
Expand All @@ -166,7 +165,7 @@ public static MessageSeataCodec getMessageCodec(short typeCode) {
* @param typeCode the type code
* @return the merge request instance by code
*/
protected static MessageSeataCodec getMergeRequestMessageSeataCodec(int typeCode) {
protected static MessageSeataCodec getMergeRequestMessageSeataCodec(int typeCode, byte version) {
switch (typeCode) {
case MessageType.TYPE_GLOBAL_BEGIN:
return new GlobalBeginRequestCodec();
Expand Down Expand Up @@ -195,7 +194,7 @@ protected static MessageSeataCodec getMergeRequestMessageSeataCodec(int typeCode
* @param typeCode the type code
* @return the merge response instance by code
*/
protected static MessageSeataCodec getMergeResponseMessageSeataCodec(int typeCode) {
protected static MessageSeataCodec getMergeResponseMessageSeataCodec(int typeCode, byte version) {
switch (typeCode) {
case MessageType.TYPE_GLOBAL_BEGIN_RESULT:
return new GlobalBeginResponseCodec();
Expand Down
Loading
Loading