Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: load SeataSerializer by version #6208

Merged
merged 31 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
39d88c1
load SeataSerializer by version
Bughue Dec 25, 2023
901b886
license
Bughue Dec 26, 2023
b518004
style
Bughue Dec 26, 2023
883adfb
test ProtocolConstants.VERSION
Bughue Dec 26, 2023
492672c
version serialize
Bughue Dec 27, 2023
d8c19a4
v0
Bughue Dec 29, 2023
958ef77
v0
Bughue Dec 29, 2023
2a0d836
v0
Bughue Dec 29, 2023
227464b
v0
Bughue Dec 29, 2023
0afa845
license
Bughue Dec 29, 2023
4959cbe
license
Bughue Dec 29, 2023
dfd9bf2
style
Bughue Dec 29, 2023
48a6090
style
Bughue Jan 2, 2024
181a421
inner class
Bughue Jan 4, 2024
d2804fb
inner class
Bughue Jan 4, 2024
342d306
single pattern
Bughue Jan 22, 2024
439d132
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-ser…
Bughue Jan 22, 2024
74c48d5
conflit
Bughue Jan 22, 2024
93c972b
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-ser…
Bughue Feb 4, 2024
0f8c187
resolve conflict
Bughue Feb 4, 2024
704669a
style
Bughue Feb 28, 2024
9d30454
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-ser…
Bughue Apr 30, 2024
2f5feb0
merge
Bughue Apr 30, 2024
89c0674
double lock check
Bughue May 7, 2024
b649852
double lock check
Bughue May 7, 2024
3cc6ca3
double lock check
Bughue May 7, 2024
2d37bf1
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-ser…
Bughue Jun 21, 2024
9d199a1
style
Bughue Jun 21, 2024
dd9d808
style
Bughue Jun 21, 2024
5868eef
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-ser…
Bughue Jun 21, 2024
ac90469
style
Bughue Jun 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion core/src/main/java/io/seata/core/protocol/ProtocolConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,20 @@ public interface ProtocolConstants {
*/
byte[] MAGIC_CODE_BYTES = {(byte) 0xda, (byte) 0xda};

/**
* Old protocol version
*/
byte VERSION_0 = 0;

/**
* Protocol version
*/
byte VERSION_1 = 1;

/**
* Protocol version
*/
byte VERSION = 1;
byte VERSION = VERSION_1;

/**
* Max frame length
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public Object decodeFrame(ByteBuf frame) {
frame.readBytes(bs);
Compressor compressor = CompressorFactory.getCompressor(compressorType);
bs = compressor.decompress(bs);
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()));
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), ProtocolConstants.VERSION_1);
rpcMessage.setBody(serializer.deserialize(bs));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
&& messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
// heartbeat has no body
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()));
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), ProtocolConstants.VERSION_1);
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
bodyBytes = serializer.serialize(rpcMessage.getBody());
Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());
bodyBytes = compressor.compress(bodyBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@
import io.seata.common.loader.EnhancedServiceNotFoundException;
import io.seata.common.util.ReflectionUtil;

import java.util.HashMap;
import java.util.Map;

/**
* The Service Loader for the interface {@link Serializer}
*
*/
public final class SerializerServiceLoader {

private SerializerServiceLoader() {
}

private final static Map<String, Serializer> SERIALIZER_MAP = new HashMap<>();

private static final String PROTOBUF_SERIALIZER_CLASS_NAME = "io.seata.serializer.protobuf.ProtobufSerializer";

Expand All @@ -39,7 +42,7 @@ private SerializerServiceLoader() {
* @return the service of {@link Serializer}
* @throws EnhancedServiceNotFoundException the enhanced service not found exception
*/
public static Serializer load(SerializerType type) throws EnhancedServiceNotFoundException {
public static Serializer load(SerializerType type, byte version) throws EnhancedServiceNotFoundException {
if (type == SerializerType.PROTOBUF) {
try {
ReflectionUtil.getClassByName(PROTOBUF_SERIALIZER_CLASS_NAME);
Expand All @@ -48,6 +51,24 @@ public static Serializer load(SerializerType type) throws EnhancedServiceNotFoun
"Please manually reference 'io.seata:seata-serializer-protobuf' dependency ", e);
}
}
return EnhancedServiceLoader.load(Serializer.class, type.name());

String key = serialzerKey(type, version);
Serializer serializer = SERIALIZER_MAP.get(key);
if (serializer == null) {
if (type == SerializerType.SEATA) {
serializer = EnhancedServiceLoader.load(Serializer.class, type.name(), new Object[]{version});
Bughue marked this conversation as resolved.
Show resolved Hide resolved
} else {
serializer = EnhancedServiceLoader.load(Serializer.class, type.name());
}
SERIALIZER_MAP.put(key, serializer);
}
return serializer;
}

private static String serialzerKey(SerializerType type, byte version) {
if (type == SerializerType.SEATA) {
return type.name() + version;
}
return type.name();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@

/**
* The type Message codec factory.
*
*/
public class MessageCodecFactory {

Expand All @@ -95,8 +94,8 @@ public class MessageCodecFactory {
* @param abstractMessage the abstract message
* @return the message codec
*/
public static MessageSeataCodec getMessageCodec(AbstractMessage abstractMessage) {
return getMessageCodec(abstractMessage.getTypeCode());
public static MessageSeataCodec getMessageCodec(AbstractMessage abstractMessage, byte version) {
return getMessageCodec(abstractMessage.getTypeCode(), version);
}

/**
Expand All @@ -105,14 +104,14 @@ public static MessageSeataCodec getMessageCodec(AbstractMessage abstractMessage)
* @param typeCode the type code
* @return the msg instance by code
*/
public static MessageSeataCodec getMessageCodec(short typeCode) {
public static MessageSeataCodec getMessageCodec(short typeCode, byte version) {
MessageSeataCodec msgCodec = null;
switch (typeCode) {
case MessageType.TYPE_SEATA_MERGE:
msgCodec = new MergedWarpMessageCodec();
msgCodec = new MergedWarpMessageCodec(version);
break;
case MessageType.TYPE_SEATA_MERGE_RESULT:
msgCodec = new MergeResultMessageCodec();
msgCodec = new MergeResultMessageCodec(version);
break;
case MessageType.TYPE_REG_CLT:
msgCodec = new RegisterTMRequestCodec();
Expand All @@ -136,7 +135,7 @@ public static MessageSeataCodec getMessageCodec(short typeCode) {
msgCodec = new GlobalReportRequestCodec();
break;
case MessageType.TYPE_BATCH_RESULT_MSG:
msgCodec = new BatchResultMessageCodec();
msgCodec = new BatchResultMessageCodec(version);
break;
default:
break;
Expand All @@ -147,15 +146,15 @@ public static MessageSeataCodec getMessageCodec(short typeCode) {
}

try {
msgCodec = getMergeRequestMessageSeataCodec(typeCode);
msgCodec = getMergeRequestMessageSeataCodec(typeCode, version);
} catch (Exception exx) {
}

if (msgCodec != null) {
return msgCodec;
}

msgCodec = getMergeResponseMessageSeataCodec(typeCode);
msgCodec = getMergeResponseMessageSeataCodec(typeCode, version);

return msgCodec;
}
Expand All @@ -166,7 +165,7 @@ public static MessageSeataCodec getMessageCodec(short typeCode) {
* @param typeCode the type code
* @return the merge request instance by code
*/
protected static MessageSeataCodec getMergeRequestMessageSeataCodec(int typeCode) {
protected static MessageSeataCodec getMergeRequestMessageSeataCodec(int typeCode, byte version) {
switch (typeCode) {
case MessageType.TYPE_GLOBAL_BEGIN:
return new GlobalBeginRequestCodec();
Expand Down Expand Up @@ -195,7 +194,7 @@ protected static MessageSeataCodec getMergeRequestMessageSeataCodec(int typeCode
* @param typeCode the type code
* @return the merge response instance by code
*/
protected static MessageSeataCodec getMergeResponseMessageSeataCodec(int typeCode) {
protected static MessageSeataCodec getMergeResponseMessageSeataCodec(int typeCode, byte version) {
switch (typeCode) {
case MessageType.TYPE_GLOBAL_BEGIN_RESULT:
return new GlobalBeginResponseCodec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,72 +16,41 @@
*/
package io.seata.serializer.seata;

import java.nio.ByteBuffer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.seata.common.loader.LoadLevel;
import io.seata.common.util.BufferUtils;
import io.seata.core.protocol.AbstractMessage;
import io.seata.common.loader.Scope;
import io.seata.core.protocol.ProtocolConstants;
import io.seata.core.serializer.Serializer;
import io.seata.serializer.seata.protocol.v0.SeataSerializerV0;
import io.seata.serializer.seata.protocol.v1.SeataSerializerV1;
Bughue marked this conversation as resolved.
Show resolved Hide resolved

/**
* The Seata codec.
*
*/
@LoadLevel(name = "SEATA")
@LoadLevel(name = "SEATA", scope = Scope.PROTOTYPE)
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
public class SeataSerializer implements Serializer {

@Override
public <T> byte[] serialize(T t) {
if (!(t instanceof AbstractMessage)) {
throw new IllegalArgumentException("AbstractMessage isn't available.");
}
AbstractMessage abstractMessage = (AbstractMessage)t;
//typecode
short typecode = abstractMessage.getTypeCode();
//msg codec
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typecode);
//get empty ByteBuffer
ByteBuf out = Unpooled.buffer(1024);
//msg encode
messageCodec.encode(t, out);
byte[] body = new byte[out.readableBytes()];
out.readBytes(body);
Serializer versionSeataSerializer;

//typecode + body
ByteBuffer byteBuffer = ByteBuffer.allocate(2 + body.length);
byteBuffer.putShort(typecode);
byteBuffer.put(body);

BufferUtils.flip(byteBuffer);
byte[] content = new byte[byteBuffer.limit()];
byteBuffer.get(content);
return content;
public SeataSerializer(Byte version) {
if (version == ProtocolConstants.VERSION_0) {
versionSeataSerializer = SeataSerializerV0.getInstance();
} else if (version == ProtocolConstants.VERSION_1) {
versionSeataSerializer = SeataSerializerV1.getInstance();
}
if (versionSeataSerializer == null) {
throw new IllegalArgumentException("version is not supported");
}
}

@Override
public <T> byte[] serialize(T t) {
return versionSeataSerializer.serialize(t);
}

@Override
public <T> T deserialize(byte[] bytes) {
if (bytes == null || bytes.length == 0) {
throw new IllegalArgumentException("Nothing to decode.");
}
if (bytes.length < 2) {
throw new IllegalArgumentException("The byte[] isn't available for decode.");
}
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
//typecode
short typecode = byteBuffer.getShort();
//msg body
byte[] body = new byte[byteBuffer.remaining()];
byteBuffer.get(body);
ByteBuffer in = ByteBuffer.wrap(body);
//new Messgae
AbstractMessage abstractMessage = MessageCodecFactory.getMessage(typecode);
//get messageCodec
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typecode);
//decode
messageCodec.decode(abstractMessage, in);
return (T)abstractMessage;
return versionSeataSerializer.deserialize(bytes);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
*/
public class BatchResultMessageCodec extends AbstractMessageCodec {

private byte version;

public BatchResultMessageCodec(byte version) {
this.version = version;
}
@Override
public Class<?> getMessageClassType() {
return BatchResultMessage.class;
Expand All @@ -53,7 +58,7 @@ public <T> void encode(T t, ByteBuf out) {
for (final AbstractMessage msg : msgs) {
final ByteBuf subBuffer = Unpooled.buffer(1024);
short typeCode = msg.getTypeCode();
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typeCode);
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typeCode, version);
messageCodec.encode(msg, subBuffer);
buffer.writeShort(msg.getTypeCode());
buffer.writeBytes(subBuffer);
Expand Down Expand Up @@ -107,7 +112,7 @@ protected void decode(BatchResultMessage batchResultMessage, ByteBuffer byteBuff
for (int idx = 0; idx < msgNum; idx++) {
short typeCode = byteBuffer.getShort();
AbstractMessage abstractResultMessage = MessageCodecFactory.getMessage(typeCode);
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typeCode);
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typeCode, version);
messageCodec.decode(abstractResultMessage, byteBuffer);
msgs.add((AbstractResultMessage) abstractResultMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
*/
public class MergeResultMessageCodec extends AbstractMessageCodec {

private byte version;

public MergeResultMessageCodec(byte version) {
this.version = version;
}
@Override
public Class<?> getMessageClassType() {
return MergeResultMessage.class;
Expand All @@ -48,7 +53,7 @@ public <T> void encode(T t, ByteBuf out) {
short typeCode = msg.getTypeCode();
//put typeCode
out.writeShort(typeCode);
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typeCode);
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typeCode, version);
messageCodec.encode(msg, out);
}

Expand Down Expand Up @@ -91,7 +96,7 @@ protected void decode(MergeResultMessage mergeResultMessage, ByteBuffer byteBuff
for (int idx = 0; idx < msgNum; idx++) {
short typeCode = byteBuffer.getShort();
AbstractMessage abstractResultMessage = MessageCodecFactory.getMessage(typeCode);
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typeCode);
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typeCode, version);
messageCodec.decode(abstractResultMessage, byteBuffer);
msgs[idx] = (AbstractResultMessage)abstractResultMessage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
*/
public class MergedWarpMessageCodec extends AbstractMessageCodec {

private byte version;

public MergedWarpMessageCodec(byte version) {
this.version = version;
}
@Override
public Class<?> getMessageClassType() {
return MergedWarpMessage.class;
Expand All @@ -51,7 +56,7 @@ public <T> void encode(T t, ByteBuf out) {
for (final AbstractMessage msg : msgs) {
final ByteBuf subBuffer = Unpooled.buffer(1024);
short typeCode = msg.getTypeCode();
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typeCode);
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typeCode, version);
messageCodec.encode(msg, subBuffer);
buffer.writeShort(msg.getTypeCode());
buffer.writeBytes(subBuffer);
Expand Down Expand Up @@ -97,7 +102,7 @@ private void doDecode(MergedWarpMessage mergedWarpMessage, ByteBuffer byteBuffer
for (int idx = 0; idx < msgNum; idx++) {
short typeCode = byteBuffer.getShort();
AbstractMessage abstractMessage = MessageCodecFactory.getMessage(typeCode);
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typeCode);
MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typeCode, version);
messageCodec.decode(abstractMessage, byteBuffer);
msgs.add(abstractMessage);
}
Expand Down
Loading
Loading