diff --git a/core/src/main/java/org/apache/seata/core/protocol/MessageType.java b/core/src/main/java/org/apache/seata/core/protocol/MessageType.java index 58adbfbce6f..decf4389ac4 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/MessageType.java +++ b/core/src/main/java/org/apache/seata/core/protocol/MessageType.java @@ -22,6 +22,10 @@ */ public interface MessageType { + /** + * The constant VERSION_NOT_SUPPORT. + */ + short VERSION_NOT_SUPPORT = -1; /** * The constant TYPE_GLOBAL_BEGIN. */ diff --git a/core/src/main/java/org/apache/seata/core/protocol/Version.java b/core/src/main/java/org/apache/seata/core/protocol/Version.java index f32bb163a69..94c174ad669 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/Version.java +++ b/core/src/main/java/org/apache/seata/core/protocol/Version.java @@ -94,6 +94,10 @@ public static boolean isAboveOrEqualVersion230(String version) { return isAboveOrEqualVersion(version, VERSION_2_3_0); } + public static boolean isV0(String version) { + return !isAboveOrEqualVersion(version, VERSION_0_7_1); + } + public static boolean isAboveOrEqualVersion(String clientVersion, String divideVersion) { boolean isAboveOrEqualVersion = false; try { diff --git a/core/src/main/java/org/apache/seata/core/protocol/VersionNotSupportMessage.java b/core/src/main/java/org/apache/seata/core/protocol/VersionNotSupportMessage.java new file mode 100644 index 00000000000..85c5a612a55 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/protocol/VersionNotSupportMessage.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.protocol; + +/** + * The type Version not support message. + * + */ +public class VersionNotSupportMessage extends AbstractMessage { + @Override + public short getTypeCode() { + return MessageType.VERSION_NOT_SUPPORT; + } +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/MsgVersionHelper.java b/core/src/main/java/org/apache/seata/core/rpc/MsgVersionHelper.java new file mode 100644 index 00000000000..571997cd4a9 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/MsgVersionHelper.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc; + +import io.netty.channel.Channel; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.core.protocol.MessageType; +import org.apache.seata.core.protocol.MessageTypeAware; +import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.protocol.Version; + +import java.util.Arrays; +import java.util.List; + +/** + * the type ServerSkipMsgHelper + **/ +public class MsgVersionHelper { + + private static final List SKIP_MSG_CODE_V0 = Arrays.asList(MessageType.TYPE_RM_DELETE_UNDOLOG); + + public static boolean versionNotSupport(Channel channel, RpcMessage rpcMessage) { + if (rpcMessage == null || rpcMessage.getBody() == null || channel == null) { + return false; + } + Object msg = rpcMessage.getBody(); + String version = Version.getChannelVersion(channel); + if (StringUtils.isBlank(version) || msg == null) { + return false; + } + boolean isV0 = Version.isV0(version); + if (!isV0 || !(msg instanceof MessageTypeAware)) { + return false; + } + short typeCode = ((MessageTypeAware) msg).getTypeCode(); + return SKIP_MSG_CODE_V0.contains(typeCode); + } +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java index 3d09108ac43..d4d4202b89b 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemoting.java @@ -43,7 +43,9 @@ import org.apache.seata.core.protocol.MessageTypeAware; import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.protocol.VersionNotSupportMessage; import org.apache.seata.core.rpc.Disposable; +import org.apache.seata.core.rpc.MsgVersionHelper; import org.apache.seata.core.rpc.hook.RpcHook; import org.apache.seata.core.rpc.processor.Pair; import org.apache.seata.core.rpc.processor.RemotingProcessor; @@ -173,6 +175,12 @@ protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMi LOGGER.warn("sendSync nothing, caused by null channel."); return null; } + if (MsgVersionHelper.versionNotSupport(channel, rpcMessage)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Message sending will be skipped as the client version does not support it,{}", rpcMessage); + } + return new VersionNotSupportMessage(); + } MessageFuture messageFuture = new MessageFuture(); messageFuture.setRequestMessage(rpcMessage); @@ -216,6 +224,12 @@ protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMi * @param rpcMessage rpc message */ protected void sendAsync(Channel channel, RpcMessage rpcMessage) { + if (MsgVersionHelper.versionNotSupport(channel, rpcMessage)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Message sending will be skipped as the client version does not support it,{}", rpcMessage); + } + return; + } channelWritableCheck(channel, rpcMessage.getBody()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java index 1ebd36fe453..8f0e9b27aa5 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java @@ -36,6 +36,7 @@ import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; +import org.apache.seata.core.protocol.Version; import org.apache.seata.discovery.registry.FileRegistryServiceImpl; import org.apache.seata.discovery.registry.RegistryFactory; import org.apache.seata.discovery.registry.RegistryService; @@ -270,12 +271,13 @@ void invalidateObject(final String serverAddress, final Channel channel) throws nettyClientKeyPool.invalidateObject(poolKeyMap.get(serverAddress), channel); } - void registerChannel(final String serverAddress, final Channel channel) { + void registerChannel(final String serverAddress, final Channel channel, String version) { Channel channelToServer = channels.get(serverAddress); if (channelToServer != null && channelToServer.isActive()) { return; } channels.put(serverAddress, channel); + Version.putChannelVersion(channel, version); } private Channel doConnect(String serverAddress) { diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java index 0b9fd0e12bc..872cfa2b2bb 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java @@ -178,7 +178,7 @@ public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object r if (LOGGER.isInfoEnabled()) { LOGGER.info("register RM success. client version:{}, server version:{},channel:{}", registerRMRequest.getVersion(), registerRMResponse.getVersion(), channel); } - getClientChannelManager().registerChannel(serverAddress, channel); + getClientChannelManager().registerChannel(serverAddress, channel, registerRMRequest.getVersion()); String dbKey = getMergedResourceKeys(); if (registerRMRequest.getResourceIds() != null) { if (!registerRMRequest.getResourceIds().equals(dbKey)) { diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java index 28993b61f77..4873f8c3470 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java @@ -218,7 +218,7 @@ public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object r if (LOGGER.isInfoEnabled()) { LOGGER.info("register TM success. client version:{}, server version:{},channel:{}", registerTMRequest.getVersion(), registerTMResponse.getVersion(), channel); } - getClientChannelManager().registerChannel(serverAddress, channel); + getClientChannelManager().registerChannel(serverAddress, channel, registerTMRequest.getVersion()); } @Override diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/ChannelManagerTestHelper.java b/test/src/test/java/org/apache/seata/core/rpc/netty/ChannelManagerTestHelper.java index 0ed3179eccc..2989f4e8337 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/ChannelManagerTestHelper.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/ChannelManagerTestHelper.java @@ -37,4 +37,6 @@ public static Channel getChannel(TmNettyRemotingClient client) { private static NettyClientChannelManager getChannelManager(AbstractNettyRemotingClient remotingClient) { return remotingClient.getClientChannelManager(); } + + } diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/MsgVersionHelperTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/MsgVersionHelperTest.java new file mode 100644 index 00000000000..d7a073c38f0 --- /dev/null +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/MsgVersionHelperTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty; + +import io.netty.channel.Channel; +import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.common.ConfigurationTestHelper; +import org.apache.seata.common.XID; +import org.apache.seata.common.util.NetUtil; +import org.apache.seata.common.util.UUIDGenerator; +import org.apache.seata.core.protocol.ProtocolConstants; +import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.protocol.Version; +import org.apache.seata.core.protocol.VersionNotSupportMessage; +import org.apache.seata.core.protocol.transaction.UndoLogDeleteRequest; +import org.apache.seata.core.rpc.MsgVersionHelper; +import org.apache.seata.server.coordinator.DefaultCoordinator; +import org.apache.seata.server.session.SessionHolder; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * MsgVersionHelper Test + **/ +public class MsgVersionHelperTest { + private static final Logger LOGGER = LoggerFactory.getLogger(MsgVersionHelperTest.class); + + @BeforeAll + public static void init(){ + ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, "8091"); + } + @AfterAll + public static void after() { + ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL); + } + + public static ThreadPoolExecutor initMessageExecutor() { + return new ThreadPoolExecutor(5, 5, 500, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(20000), new ThreadPoolExecutor.CallerRunsPolicy()); + } + @Test + public void testSendMsgWithResponse() throws Exception { + ThreadPoolExecutor workingThreads = initMessageExecutor(); + NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads); + new Thread(() -> { + SessionHolder.init(null); + nettyRemotingServer.setHandler(DefaultCoordinator.getInstance(nettyRemotingServer)); + // set registry + XID.setIpAddress(NetUtil.getLocalIp()); + XID.setPort(8091); + // init snowflake for transactionId, branchId + UUIDGenerator.init(1L); + nettyRemotingServer.init(); + }).start(); + Thread.sleep(3000); + + String applicationId = "app 1"; + String transactionServiceGroup = "default_tx_group"; + TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup); + tmNettyRemotingClient.init(); + + String serverAddress = "0.0.0.0:8091"; + Channel channel = TmNettyRemotingClient.getInstance().getClientChannelManager().acquireChannel(serverAddress); + + RpcMessage rpcMessage = buildUndoLogDeleteMsg(ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY); + Assertions.assertFalse(MsgVersionHelper.versionNotSupport(channel, rpcMessage)); + TmNettyRemotingClient.getInstance().sendAsync(channel,rpcMessage); + + + Version.putChannelVersion(channel,"0.7.0"); + Assertions.assertTrue(MsgVersionHelper.versionNotSupport(channel, rpcMessage)); + TmNettyRemotingClient.getInstance().sendAsync(channel,rpcMessage); + Object response = TmNettyRemotingClient.getInstance().sendSync(channel, rpcMessage, 100); + Assertions.assertTrue(response instanceof VersionNotSupportMessage); + + nettyRemotingServer.destroy(); + tmNettyRemotingClient.destroy(); + } + + private RpcMessage buildUndoLogDeleteMsg(byte messageType) { + RpcMessage rpcMessage = new RpcMessage(); + rpcMessage.setId(100); + rpcMessage.setMessageType(messageType); + rpcMessage.setCodec(ProtocolConstants.CONFIGURED_CODEC); + rpcMessage.setCompressor(ProtocolConstants.CONFIGURED_COMPRESSOR); + rpcMessage.setBody(new UndoLogDeleteRequest()); + return rpcMessage; + } +}