-
Notifications
You must be signed in to change notification settings - Fork 8.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feature: multi-version seata protocol support #6226
Changes from all commits
39d88c1
f1bacdd
901b886
ee3c3aa
b518004
883adfb
492672c
94cb0cf
6dc20a2
a151eba
289ed97
8c197af
d8c19a4
958ef77
2a0d836
59e9f8c
227464b
c27c63b
0afa845
4959cbe
dd7c314
b7bdb6d
dfd9bf2
48a6090
7d1a6b3
33e6e4e
379ba28
a9b4d5d
95573d3
181a421
d2804fb
8da65ff
91c171d
342d306
439d132
74c48d5
72d47a1
93c972b
0f8c187
c1c7f3f
cf4fe41
9607bcd
5b433cd
704669a
91418a1
00a4e8e
c728968
f17581c
12d99d5
8943b39
7396cf9
3f7a3c8
2558ba3
ef146b7
df9ea50
af647e7
2a92556
3841379
d876d10
3f0abf0
637e3da
6f8c019
c0dfb51
c2f68aa
b5bb8be
2c4543c
9d30454
2f5feb0
a807053
0ebf2ce
7add8d9
aeb24c8
9f6d80f
237aeb5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -69,7 +69,7 @@ public Object sendSyncRequest(String resourceId, String clientId, Object msg, bo | |
if (channel == null) { | ||
throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId); | ||
} | ||
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); | ||
RpcMessage rpcMessage = buildRequestMessage(channel, msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); | ||
return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout()); | ||
} | ||
|
||
|
@@ -78,7 +78,7 @@ public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutExcepti | |
if (channel == null) { | ||
throw new RuntimeException("client is not connected"); | ||
} | ||
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); | ||
RpcMessage rpcMessage = buildRequestMessage(channel, msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); | ||
return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout()); | ||
} | ||
|
||
|
@@ -87,26 +87,42 @@ public void sendAsyncRequest(Channel channel, Object msg) { | |
if (channel == null) { | ||
throw new RuntimeException("client is not connected"); | ||
} | ||
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY); | ||
RpcMessage rpcMessage = buildRequestMessage(channel, msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY); | ||
super.sendAsync(channel, rpcMessage); | ||
} | ||
|
||
@Override | ||
public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg) { | ||
final Channel clientChannel = msg instanceof HeartbeatMessage | ||
? channel | ||
: ChannelManager.getSameClientChannel(channel); | ||
|
||
if (clientChannel == null) { | ||
throw new RuntimeException("Not found client channel to response | channel: " + channel); | ||
Channel clientChannel = channel; | ||
if (!(msg instanceof HeartbeatMessage)) { | ||
clientChannel = ChannelManager.getSameClientChannel(channel); | ||
} | ||
|
||
RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage | ||
if (clientChannel != null) { | ||
RpcMessage rpcMsg = buildResponseMessage(channel, rpcMessage, msg, msg instanceof HeartbeatMessage | ||
? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE | ||
: ProtocolConstants.MSGTYPE_RESPONSE); | ||
super.sendAsync(clientChannel, rpcMsg); | ||
super.sendAsync(clientChannel, rpcMsg); | ||
} else { | ||
throw new RuntimeException("channel is error."); | ||
} | ||
} | ||
|
||
|
||
private RpcMessage buildResponseMessage(Channel channel, RpcMessage fromRpcMessage, Object msg, byte messageType) { | ||
RpcMessage rpcMessage = super.buildResponseMessage(fromRpcMessage, msg, messageType); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 为什么不能直接通过channel上下文中的版本去创建对应协议版本的子类? |
||
RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel); | ||
rpcMessage.setOtherSideVersion(rpcContext.getVersion()); | ||
return rpcMessage; | ||
} | ||
|
||
protected RpcMessage buildRequestMessage(Channel channel, Object msg, byte messageType) { | ||
RpcMessage rpcMessage = super.buildRequestMessage(msg, messageType); | ||
RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel); | ||
rpcMessage.setOtherSideVersion(rpcContext.getVersion()); | ||
return rpcMessage; | ||
} | ||
|
||
|
||
@Override | ||
public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) { | ||
Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.seata.core.rpc.netty; | ||
|
||
import com.google.common.collect.ImmutableMap; | ||
import io.netty.buffer.ByteBuf; | ||
import io.netty.channel.ChannelHandlerContext; | ||
import io.netty.handler.codec.LengthFieldBasedFrameDecoder; | ||
import org.apache.seata.core.exception.DecodeException; | ||
import org.apache.seata.core.protocol.ProtocolConstants; | ||
import org.apache.seata.core.rpc.netty.v0.ProtocolDecoderV0; | ||
import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.Map; | ||
|
||
/** | ||
* <pre> | ||
* (> 0.7.0) | ||
* 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | ||
* +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+ | ||
* | magic |Proto| Full length | Head | Msg |Seria|Compr| RequestId | | ||
* | code |colVer| (head+body) | Length |Type |lizer|ess | | | ||
* +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+ | ||
* | ||
* (<= 0.7.0) | ||
* 0 1 2 3 4 6 8 10 12 14 | ||
* +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+ | ||
* | 0xdada | flag | typecode/ | requestid | | ||
* | | | bodylength| | | ||
* +-----------+-----------+-----------+-----------+-----------+-----------+-----------+ | ||
* | ||
* </pre> | ||
* <p> | ||
* <li>Full Length: include all data </li> | ||
* <li>Head Length: include head data from magic code to head map. </li> | ||
* <li>Body Length: Full Length - Head Length</li> | ||
* </p> | ||
*/ | ||
public class CompatibleProtocolDecoder extends LengthFieldBasedFrameDecoder { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(CompatibleProtocolDecoder.class); | ||
private static Map<Byte, ProtocolDecoder> protocolDecoderMap; | ||
|
||
public CompatibleProtocolDecoder() { | ||
// default is 8M | ||
this(ProtocolConstants.MAX_FRAME_LENGTH); | ||
} | ||
|
||
public CompatibleProtocolDecoder(int maxFrameLength) { | ||
/* | ||
int maxFrameLength, | ||
int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3 | ||
int lengthFieldLength, FullLength is int(4B). so values is 4 | ||
int lengthAdjustment, FullLength include all data and read 7 bytes before, so the left length is (FullLength-7). so values is -7 | ||
int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0 | ||
*/ | ||
super(maxFrameLength, 3, 4, -7, 0); | ||
protocolDecoderMap = ImmutableMap.<Byte, ProtocolDecoder>builder() | ||
.put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0()) | ||
.put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1()) | ||
.build(); | ||
} | ||
|
||
@Override | ||
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { | ||
ByteBuf frame; | ||
Object decoded; | ||
byte version; | ||
try { | ||
if (isV0(in)) { | ||
decoded = in; | ||
version = ProtocolConstants.VERSION_0; | ||
} else { | ||
decoded = super.decode(ctx, in); | ||
version = decideVersion(decoded); | ||
} | ||
|
||
if (decoded instanceof ByteBuf) { | ||
frame = (ByteBuf) decoded; | ||
try { | ||
ProtocolDecoder decoder = protocolDecoderMap.get(version); | ||
if (decoder == null) { | ||
throw new UnsupportedOperationException("Unsupported version: " + version); | ||
} | ||
return decoder.decodeFrame(frame); | ||
} finally { | ||
if (version != ProtocolConstants.VERSION_0) { | ||
frame.release(); | ||
} | ||
} | ||
} | ||
} catch (Exception exx) { | ||
LOGGER.error("Decode frame error, cause: {}", exx.getMessage()); | ||
throw new DecodeException(exx); | ||
} | ||
return decoded; | ||
} | ||
|
||
protected byte decideVersion(Object in) { | ||
if (in instanceof ByteBuf) { | ||
ByteBuf frame = (ByteBuf) in; | ||
frame.markReaderIndex(); | ||
byte b0 = frame.readByte(); | ||
byte b1 = frame.readByte(); | ||
if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0 | ||
|| ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) { | ||
throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1); | ||
} | ||
|
||
byte version = frame.readByte(); | ||
frame.resetReaderIndex(); | ||
return version; | ||
} | ||
return -1; | ||
} | ||
|
||
|
||
protected boolean isV0(ByteBuf in) { | ||
boolean isV0 = false; | ||
in.markReaderIndex(); | ||
byte b0 = in.readByte(); | ||
byte b1 = in.readByte(); | ||
// v1/v2/v3 : b2 = version | ||
// v0 : 1st byte in FLAG(2byte:0x10/0x20/0x40/0x80) | ||
byte b2 = in.readByte(); | ||
if (ProtocolConstants.MAGIC_CODE_BYTES[0] == b0 | ||
&& ProtocolConstants.MAGIC_CODE_BYTES[1] == b1 | ||
&& 0 == b2) { | ||
isV0 = true; | ||
} | ||
|
||
in.resetReaderIndex(); | ||
return isV0; | ||
} | ||
|
||
protected boolean isV0(byte version) { | ||
return version == ProtocolConstants.VERSION_0; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.seata.core.rpc.netty; | ||
|
||
import com.google.common.collect.ImmutableMap; | ||
import io.netty.buffer.ByteBuf; | ||
import io.netty.channel.ChannelHandlerContext; | ||
import io.netty.handler.codec.MessageToByteEncoder; | ||
import org.apache.seata.common.util.StringUtils; | ||
import org.apache.seata.core.protocol.ProtocolConstants; | ||
import org.apache.seata.core.protocol.RpcMessage; | ||
import org.apache.seata.core.protocol.Version; | ||
import org.apache.seata.core.rpc.netty.v0.ProtocolEncoderV0; | ||
import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.Map; | ||
|
||
/** | ||
* Compatible Protocol Encoder | ||
* <p> | ||
* <li>Full Length: include all data </li> | ||
* <li>Head Length: include head data from magic code to head map. </li> | ||
* <li>Body Length: Full Length - Head Length</li> | ||
* </p> | ||
*/ | ||
public class CompatibleProtocolEncoder extends MessageToByteEncoder { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should be renamed MultiProtocolEncoder |
||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(CompatibleProtocolEncoder.class); | ||
|
||
private static Map<Byte, ProtocolEncoder> protocolEncoderMap; | ||
|
||
public CompatibleProtocolEncoder() { | ||
super(); | ||
protocolEncoderMap = ImmutableMap.<Byte, ProtocolEncoder>builder() | ||
.put(ProtocolConstants.VERSION_0, new ProtocolEncoderV0()) | ||
.put(ProtocolConstants.VERSION_1, new ProtocolEncoderV1()) | ||
.build(); | ||
} | ||
|
||
@Override | ||
public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) { | ||
try { | ||
if (msg instanceof RpcMessage) { | ||
RpcMessage rpcMessage = (RpcMessage) msg; | ||
String sdkVersion = rpcMessage.getOtherSideVersion(); | ||
if (StringUtils.isBlank(sdkVersion)) { | ||
sdkVersion = Version.getCurrent(); | ||
} | ||
byte protocolVersion = Version.calcProtocolVersion(sdkVersion); | ||
ProtocolEncoder encoder = protocolEncoderMap.get(protocolVersion); | ||
if (encoder == null) { | ||
throw new UnsupportedOperationException("Unsupported protocolVersion: " + protocolVersion); | ||
} | ||
|
||
encoder.encode(rpcMessage, out); | ||
} else { | ||
throw new UnsupportedOperationException("Not support this class:" + msg.getClass()); | ||
} | ||
} catch (Throwable e) { | ||
LOGGER.error("Encode request error!", e); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
为什么要在rpcmessage中增加版本字段?
Why add a version field to rpcmessage?