From d31840961a358af82abc872ecd2a8ef402d23351 Mon Sep 17 00:00:00 2001 From: jimin Date: Thu, 28 Mar 2024 10:11:11 +0800 Subject: [PATCH] refactor: optimize Configuration Cache (#6420) --- changes/en-us/2.x.md | 2 + changes/zh-cn/2.x.md | 3 +- .../config/consul/ConsulConfiguration.java | 6 +-- .../CachedConfigurationChangeListener.java | 28 ++++++++++ .../seata/config/ConfigurationCache.java | 51 ++++--------------- .../config/ConfigurationChangeListener.java | 8 +-- .../seata/config/FileConfiguration.java | 2 +- .../seata/config/ConfigurationCacheTests.java | 40 +-------------- .../seata/config/FileConfigurationTest.java | 29 +++++++---- .../config/ProConfigurationFactoryTest.java | 2 +- .../RegistryConfigurationFactoryTest.java | 1 - .../config/YamlConfigurationFactoryTest.java | 2 +- .../src/test/resources/file-test-pro.conf | 3 ++ .../src/test/resources/file-test-yaml.conf | 4 ++ .../config/CustomConfigurationForTest.java | 1 - .../seata/config/etcd3/EtcdConfiguration.java | 2 +- .../core/rpc/netty/NettyClientConfig.java | 2 +- .../core/rpc/netty/NettyRemotingServer.java | 4 ++ .../core/rpc/netty/NettyServerBootstrap.java | 4 ++ .../core/rpc/netty/NettyServerConfig.java | 10 ++-- .../core/rpc/netty/RmNettyRemotingClient.java | 25 ++++----- .../core/rpc/netty/TmNettyRemotingClient.java | 12 +++-- .../core/rpc/netty/NettyClientTestSuite.java | 25 +++++++++ .../core/rpc/netty/RmNettyClientTest.java | 34 ++++++++----- .../core/rpc/netty/TmNettyClientTest.java | 50 ++++++++++-------- .../discovery/registry/RegistryService.java | 11 ++-- ...GlobalTransactionalInterceptorHandler.java | 20 ++++---- .../GlobalTransactionalInterceptorParser.java | 7 +-- pom.xml | 10 ++++ .../datasource/exec/LockRetryController.java | 10 ++-- .../db/lock/DataBaseDistributedLocker.java | 7 ++- .../annotation/GlobalTransactionScanner.java | 17 +++---- .../seata/common/ConfigurationTestHelper.java | 17 +++++-- .../saga/engine/db/AbstractServerTest.java | 6 ++- 34 files changed, 252 insertions(+), 203 deletions(-) create mode 100644 config/seata-config-core/src/main/java/org/apache/seata/config/CachedConfigurationChangeListener.java create mode 100644 core/src/test/java/org/apache/seata/core/rpc/netty/NettyClientTestSuite.java diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 91632d04d8a..be4b6ce8079 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -113,6 +113,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6387](https://github.com/apache/incubator-seata/pull/6387)] optimize tcc use compatible - [[#6402](https://github.com/apache/incubator-seata/pull/6402)] optimize rm-datasource use compatible - [[#6419](https://github.com/apache/incubator-seata/pull/6419)] optimize integration-tx-api compatible +- [[#6405](https://github.com/apache/incubator-seata/pull/6405)] fix kotlin compile failure - [[#6412](https://github.com/apache/incubator-seata/pull/6412)] optimize core compatible module - [[#6429](https://github.com/apache/incubator-seata/pull/6429)] remove repetitive words @@ -140,6 +141,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6280](https://github.com/apache/incubator-seata/pull/6280)] refactor Saga designer using diagram-js - [[#6269](https://github.com/apache/incubator-seata/pull/6269)] standardize Seata Exception +- [[#6420](https://github.com/apache/incubator-seata/pull/6420)] refactor Configuration Cache Thanks to these contributors for their code commits. Please report an unintended omission. diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index a32b723df5c..61229c5ce66 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -115,10 +115,10 @@ - [[#6387](https://github.com/apache/incubator-seata/pull/6387)] 优化tcc使用兼容 - [[#6402](https://github.com/apache/incubator-seata/pull/6402)] 优化rm-datasource向下兼容 - [[#6419](https://github.com/apache/incubator-seata/pull/6419)] 优化integration-tx-api向下兼容 +- [[#6405](https://github.com/apache/incubator-seata/pull/6405)] 修复 kotlin 编译失败 - [[#6412](https://github.com/apache/incubator-seata/pull/6412)] 优化 core 兼容模块 - [[#6429](https://github.com/apache/incubator-seata/pull/6429)] 移除重复注释 - ### security: - [[#6069](https://github.com/apache/incubator-seata/pull/6069)] 升级Guava依赖版本,修复安全漏洞 - [[#6144](https://github.com/apache/incubator-seata/pull/6144)] 升级Nacos依赖版本至1.4.6 @@ -141,6 +141,7 @@ - [[#6280](https://github.com/apache/incubator-seata/pull/6280)] 使用diagram-js重构Saga设计器 - [[#6269](https://github.com/apache/incubator-seata/pull/6269)] 统一Seata异常规范 +- [[#6420](https://github.com/apache/incubator-seata/pull/6420)] 优化配置缓存 非常感谢以下 contributors 的代码贡献。若有无意遗漏,请报告。 diff --git a/config/seata-config-consul/src/main/java/org/apache/seata/config/consul/ConsulConfiguration.java b/config/seata-config-consul/src/main/java/org/apache/seata/config/consul/ConsulConfiguration.java index 31fe5acfdac..1a81de7d674 100644 --- a/config/seata-config-consul/src/main/java/org/apache/seata/config/consul/ConsulConfiguration.java +++ b/config/seata-config-consul/src/main/java/org/apache/seata/config/consul/ConsulConfiguration.java @@ -20,9 +20,9 @@ import java.net.InetSocketAddress; import java.util.Enumeration; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.Set; -import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -346,7 +346,7 @@ public void onChangeEvent(ConfigurationChangeEvent event) { for (ConfigurationChangeListener changeListener : entry.getValue()) { event.setDataId(key).setNewValue(valueNew); ConfigurationChangeListener listener = ((ConsulListener) changeListener).getTargetListener(); - listener.onChangeEvent(event); + listener.onProcessEvent(event); } } } @@ -354,7 +354,7 @@ public void onChangeEvent(ConfigurationChangeEvent event) { } else { // The old config change listener,it would be deleted in next edition event.setDataId(dataId).setNewValue(value); - listener.onChangeEvent(event); + listener.onProcessEvent(event); } } } diff --git a/config/seata-config-core/src/main/java/org/apache/seata/config/CachedConfigurationChangeListener.java b/config/seata-config-core/src/main/java/org/apache/seata/config/CachedConfigurationChangeListener.java new file mode 100644 index 00000000000..074eadc5861 --- /dev/null +++ b/config/seata-config-core/src/main/java/org/apache/seata/config/CachedConfigurationChangeListener.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.config; + +public interface CachedConfigurationChangeListener extends ConfigurationChangeListener { + + ConfigurationCache CONFIGURATION_CACHE = ConfigurationCache.getInstance(); + + @Override + default void afterEvent(ConfigurationChangeEvent event) { + ConfigurationChangeListener listener = (ConfigurationChangeListener)CONFIGURATION_CACHE; + listener.onProcessEvent(event); + } +} diff --git a/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationCache.java b/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationCache.java index 889e59bc745..6bc8573c204 100644 --- a/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationCache.java +++ b/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationCache.java @@ -21,8 +21,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.apache.seata.common.util.CollectionUtils; + import org.apache.seata.common.util.DurationUtil; import org.apache.seata.common.util.StringUtils; @@ -37,45 +38,7 @@ public class ConfigurationCache implements ConfigurationChangeListener { private static final Map CONFIG_CACHE = new ConcurrentHashMap<>(); - private Map> configListenersMap = new HashMap<>(); - - public static void addConfigListener(String dataId, ConfigurationChangeListener... listeners) { - if (StringUtils.isBlank(dataId)) { - return; - } - synchronized (ConfigurationCache.class) { - HashSet listenerHashSet = - getInstance().configListenersMap.computeIfAbsent(dataId, key -> new HashSet<>()); - if (!listenerHashSet.contains(getInstance())) { - ConfigurationFactory.getInstance().addConfigListener(dataId, getInstance()); - listenerHashSet.add(getInstance()); - } - if (null != listeners && listeners.length > 0) { - for (ConfigurationChangeListener listener : listeners) { - if (!listenerHashSet.contains(listener)) { - listenerHashSet.add(listener); - ConfigurationFactory.getInstance().addConfigListener(dataId, listener); - } - } - } - } - } - - public static void removeConfigListener(String dataId, ConfigurationChangeListener... listeners) { - if (StringUtils.isBlank(dataId)) { - return; - } - synchronized (ConfigurationCache.class) { - final HashSet listenerSet = getInstance().configListenersMap.get(dataId); - if (CollectionUtils.isNotEmpty(listenerSet)) { - for (ConfigurationChangeListener listener : listeners) { - if (listenerSet.remove(listener)) { - ConfigurationFactory.getInstance().removeConfigListener(dataId, listener); - } - } - } - } - } + private static final Set DATA_ID_CACHED = new HashSet<>(); public static ConfigurationCache getInstance() { return ConfigurationCacheInstance.INSTANCE; @@ -91,11 +54,12 @@ public void onChangeEvent(ConfigurationChangeEvent event) { } else { Object newValue = new ObjectWrapper(event.getNewValue(), null).convertData(oldWrapper.getType()); if (!Objects.equals(oldWrapper.getData(), newValue)) { - CONFIG_CACHE.put(event.getDataId(), new ObjectWrapper(newValue, oldWrapper.getType(),oldWrapper.getLastDefaultValue())); + CONFIG_CACHE.put(event.getDataId(), + new ObjectWrapper(newValue, oldWrapper.getType(), oldWrapper.getLastDefaultValue())); } } } else { - CONFIG_CACHE.remove(event.getDataId()); + CONFIG_CACHE.remove(event.getDataId(), oldWrapper); } } @@ -115,6 +79,9 @@ public Configuration proxy(Configuration originalConfiguration) throws Exception } if (null == wrapper || (null != defaultValue && !Objects.equals(defaultValue, wrapper.lastDefaultValue))) { + if (DATA_ID_CACHED.add(rawDataId)) { + originalConfiguration.addConfigListener(rawDataId, this); + } Object result = method.invoke(originalConfiguration, args); // The wrapper.data only exists in the cache when it is not null. if (result != null) { diff --git a/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeListener.java b/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeListener.java index 338f0a06e19..3c69735e9c3 100644 --- a/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeListener.java +++ b/config/seata-config-core/src/main/java/org/apache/seata/config/ConfigurationChangeListener.java @@ -58,9 +58,9 @@ public interface ConfigurationChangeListener { */ default void onProcessEvent(ConfigurationChangeEvent event) { getExecutorService().submit(() -> { - beforeEvent(); + beforeEvent(event); onChangeEvent(event); - afterEvent(); + afterEvent(event); }); } @@ -83,14 +83,14 @@ default ExecutorService getExecutorService() { /** * Before event. */ - default void beforeEvent() { + default void beforeEvent(ConfigurationChangeEvent event) { } /** * After event. */ - default void afterEvent() { + default void afterEvent(ConfigurationChangeEvent event) { } } diff --git a/config/seata-config-core/src/main/java/org/apache/seata/config/FileConfiguration.java b/config/seata-config-core/src/main/java/org/apache/seata/config/FileConfiguration.java index f0feed1435b..08e8be181fa 100644 --- a/config/seata-config-core/src/main/java/org/apache/seata/config/FileConfiguration.java +++ b/config/seata-config-core/src/main/java/org/apache/seata/config/FileConfiguration.java @@ -394,7 +394,7 @@ public void onChangeEvent(ConfigurationChangeEvent event) { event.setDataId(dataId).setNewValue(currentConfig).setOldValue(oldConfig); for (ConfigurationChangeListener listener : dataIdMap.get(dataId)) { - listener.onChangeEvent(event); + listener.onProcessEvent(event); } } } diff --git a/config/seata-config-core/src/test/java/org/apache/seata/config/ConfigurationCacheTests.java b/config/seata-config-core/src/test/java/org/apache/seata/config/ConfigurationCacheTests.java index c26ab4eb3b7..010753092c0 100644 --- a/config/seata-config-core/src/test/java/org/apache/seata/config/ConfigurationCacheTests.java +++ b/config/seata-config-core/src/test/java/org/apache/seata/config/ConfigurationCacheTests.java @@ -16,17 +16,12 @@ */ package org.apache.seata.config; -import org.apache.seata.common.util.CollectionUtils; +import java.time.Duration; + import org.apache.seata.common.util.DurationUtil; -import org.apache.seata.common.util.ReflectionUtil; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.lang.reflect.Field; -import java.time.Duration; -import java.util.HashSet; -import java.util.Map; - public class ConfigurationCacheTests { @@ -70,37 +65,6 @@ public void testChangeValue() throws Exception { Assertions.assertNull(test); } - // FIXME: 2023/2/19 wait bugfix - // @Test - public void testConfigListener() throws Exception { - Configuration configuration = new FileConfiguration("registry"); - configuration = ConfigurationCache.getInstance().proxy(configuration); - - // get config listeners map - Field configListenersMapField = ReflectionUtil.getField(ConfigurationCache.class, "configListenersMap"); - Map> configListenersMap = (Map>)configListenersMapField.get(ConfigurationCache.getInstance()); - - boolean value = configuration.getBoolean("service.disableGlobalTransaction"); - TestListener listener = new TestListener(); - ConfigurationCache.addConfigListener("service.disableGlobalTransaction", listener); - // check listener if exist - HashSet listeners = configListenersMap.get("service.disableGlobalTransaction"); - Assertions.assertTrue(CollectionUtils.isNotEmpty(listeners)); - // change value,trigger listener - System.setProperty("service.disableGlobalTransaction", String.valueOf(!value)); - // remove null - ConfigurationCache.removeConfigListener(null); - // check listener if exist - listeners = configListenersMap.get("service.disableGlobalTransaction"); - Assertions.assertTrue(CollectionUtils.isNotEmpty(listeners)); - // remove listener - ConfigurationCache.removeConfigListener("service.disableGlobalTransaction", listener); - // check listener if exist - listeners = configListenersMap.get("service.disableGlobalTransaction"); - // is empty - Assertions.assertTrue(CollectionUtils.isEmpty(listeners)); - } public static class TestListener implements ConfigurationChangeListener { diff --git a/config/seata-config-core/src/test/java/org/apache/seata/config/FileConfigurationTest.java b/config/seata-config-core/src/test/java/org/apache/seata/config/FileConfigurationTest.java index 1be77f1a112..5c754e0b8b2 100644 --- a/config/seata-config-core/src/test/java/org/apache/seata/config/FileConfigurationTest.java +++ b/config/seata-config-core/src/test/java/org/apache/seata/config/FileConfigurationTest.java @@ -18,6 +18,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -40,19 +41,27 @@ void tearDown() { void addConfigListener() throws InterruptedException { Configuration fileConfig = ConfigurationFactory.getInstance(); CountDownLatch countDownLatch = new CountDownLatch(1); - boolean value = fileConfig.getBoolean("service.disableGlobalTransaction"); - ConfigurationCache.addConfigListener("service.disableGlobalTransaction", (event) -> { - Assertions.assertEquals(Boolean.parseBoolean(event.getNewValue()), !Boolean.parseBoolean(event.getOldValue())); - countDownLatch.countDown(); + String dataId = "service.disableGlobalTransaction"; + boolean value = fileConfig.getBoolean(dataId); + fileConfig.addConfigListener(dataId, new CachedConfigurationChangeListener() { + @Override + public void onChangeEvent(ConfigurationChangeEvent event) { + Assertions.assertEquals(Boolean.parseBoolean(event.getNewValue()), + !Boolean.parseBoolean(event.getOldValue())); + countDownLatch.countDown(); + } }); - System.setProperty("service.disableGlobalTransaction", String.valueOf(!value)); - countDownLatch.await(5, TimeUnit.SECONDS); + System.setProperty(dataId, String.valueOf(!value)); + countDownLatch.await(2, TimeUnit.SECONDS); System.setProperty("file.listener.enabled", "false"); - System.setProperty("service.disableGlobalTransaction", String.valueOf(value)); - Thread.sleep(2000); - boolean currentValue = fileConfig.getBoolean("service.disableGlobalTransaction"); + //wait for loop safety, loop time is LISTENER_CONFIG_INTERVAL=1s + Thread.sleep(1500); + System.setProperty(dataId, String.valueOf(value)); + //sleep for a period of time to simulate waiting for a cache refresh.Actually, it doesn't trigger. + Thread.sleep(1000); + boolean currentValue = fileConfig.getBoolean(dataId); Assertions.assertNotEquals(value, currentValue); - System.setProperty("service.disableGlobalTransaction", String.valueOf(!value)); + System.setProperty(dataId, String.valueOf(!value)); } @Test diff --git a/config/seata-config-core/src/test/java/org/apache/seata/config/ProConfigurationFactoryTest.java b/config/seata-config-core/src/test/java/org/apache/seata/config/ProConfigurationFactoryTest.java index aaa685dffad..a1c107d1397 100644 --- a/config/seata-config-core/src/test/java/org/apache/seata/config/ProConfigurationFactoryTest.java +++ b/config/seata-config-core/src/test/java/org/apache/seata/config/ProConfigurationFactoryTest.java @@ -33,7 +33,7 @@ void getInstance() { Assertions.assertNull(ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig("config.file.testNull")); Assertions.assertNull(ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig("config.file.testExist")); Configuration instance = ConfigurationFactory.getInstance(); - Assertions.assertEquals(instance.getConfig("service.disableGlobalTransaction"), "true"); + Assertions.assertEquals(instance.getConfig("client.undo.compress.enable"), "true"); Assertions.assertEquals(instance.getConfig("service.default.grouplist"), "127.0.0.1:8092"); } diff --git a/config/seata-config-core/src/test/java/org/apache/seata/config/RegistryConfigurationFactoryTest.java b/config/seata-config-core/src/test/java/org/apache/seata/config/RegistryConfigurationFactoryTest.java index b04a7b8fdd6..c04ddc9700b 100644 --- a/config/seata-config-core/src/test/java/org/apache/seata/config/RegistryConfigurationFactoryTest.java +++ b/config/seata-config-core/src/test/java/org/apache/seata/config/RegistryConfigurationFactoryTest.java @@ -30,7 +30,6 @@ void getInstance() { ConfigurationFactory.reload(); Assertions.assertEquals(ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig("config.file.name"),"file-test.conf"); Configuration instance = ConfigurationFactory.getInstance(); - Assertions.assertEquals(instance.getConfig("service.disableGlobalTransaction"),"true"); Assertions.assertEquals(instance.getConfig("service.default.grouplist"), "127.0.0.1:8091"); } diff --git a/config/seata-config-core/src/test/java/org/apache/seata/config/YamlConfigurationFactoryTest.java b/config/seata-config-core/src/test/java/org/apache/seata/config/YamlConfigurationFactoryTest.java index 2c8958d73c7..f50d31808ac 100644 --- a/config/seata-config-core/src/test/java/org/apache/seata/config/YamlConfigurationFactoryTest.java +++ b/config/seata-config-core/src/test/java/org/apache/seata/config/YamlConfigurationFactoryTest.java @@ -33,7 +33,7 @@ public void getInstance() { Assertions.assertNull(ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig("config.file.testNull")); Assertions.assertNull(ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig("config.file.testExist")); Configuration instance = ConfigurationFactory.getInstance(); - Assertions.assertEquals(instance.getConfig("service.disableGlobalTransaction"), "true"); + Assertions.assertEquals(instance.getConfig("transport.heartbeat"), "true"); Assertions.assertEquals(instance.getConfig("service.default.grouplist"), "127.0.0.1:8093"); } diff --git a/config/seata-config-core/src/test/resources/file-test-pro.conf b/config/seata-config-core/src/test/resources/file-test-pro.conf index 49b0f12968c..63098d6ac43 100644 --- a/config/seata-config-core/src/test/resources/file-test-pro.conf +++ b/config/seata-config-core/src/test/resources/file-test-pro.conf @@ -22,4 +22,7 @@ service { default.grouplist = "127.0.0.1:8092" #disable seata disableGlobalTransaction = true +} +client { + undo.compress.enable=true } \ No newline at end of file diff --git a/config/seata-config-core/src/test/resources/file-test-yaml.conf b/config/seata-config-core/src/test/resources/file-test-yaml.conf index 7ec3ed688c2..120200b1e68 100644 --- a/config/seata-config-core/src/test/resources/file-test-yaml.conf +++ b/config/seata-config-core/src/test/resources/file-test-yaml.conf @@ -22,4 +22,8 @@ service { default.grouplist = "127.0.0.1:8093" #disable seata disableGlobalTransaction = true +} + +transport { + heartbeat = true } \ No newline at end of file diff --git a/config/seata-config-custom/src/test/java/org/apache/seata/config/CustomConfigurationForTest.java b/config/seata-config-custom/src/test/java/org/apache/seata/config/CustomConfigurationForTest.java index fc20d198e26..1fcbfdfb519 100644 --- a/config/seata-config-custom/src/test/java/org/apache/seata/config/CustomConfigurationForTest.java +++ b/config/seata-config-custom/src/test/java/org/apache/seata/config/CustomConfigurationForTest.java @@ -60,7 +60,6 @@ public boolean removeConfig(String dataId, long timeoutMills) { @Override public void addConfigListener(String dataId, ConfigurationChangeListener listener) { - throw new UnsupportedOperationException(); } @Override diff --git a/config/seata-config-etcd3/src/main/java/org/apache/seata/config/etcd3/EtcdConfiguration.java b/config/seata-config-etcd3/src/main/java/org/apache/seata/config/etcd3/EtcdConfiguration.java index b7a9106b9fa..c81d06e5afa 100644 --- a/config/seata-config-etcd3/src/main/java/org/apache/seata/config/etcd3/EtcdConfiguration.java +++ b/config/seata-config-etcd3/src/main/java/org/apache/seata/config/etcd3/EtcdConfiguration.java @@ -394,7 +394,7 @@ public void onNext(WatchResponse watchResponse) { List keyValues = getResponse.getKvs(); if (CollectionUtils.isNotEmpty(keyValues)) { event.setDataId(dataId).setNewValue(keyValues.get(0).getValue().toString(UTF_8)); - listener.onChangeEvent(event); + listener.onProcessEvent(event); } } catch (Exception e) { LOGGER.error("error occurred while getting value{}", e.getMessage(), e); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java index 972998c945b..f0e047ad58d 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java @@ -51,7 +51,7 @@ public class NettyClientConfig extends NettyBaseConfig { private static final int MAX_CHECK_ALIVE_RETRY = 300; private static final int CHECK_ALIVE_INTERVAL = 10; private static final String SOCKET_ADDRESS_START_CHAR = "/"; - private static final long MAX_ACQUIRE_CONN_MILLS = 60 * 1000L; + private static final long MAX_ACQUIRE_CONN_MILLS = 10 * 1000L; private static final String RPC_DISPATCH_THREAD_PREFIX = "rpcDispatch"; private static final int DEFAULT_MAX_POOL_ACTIVE = 1; private static final int DEFAULT_MIN_POOL_IDLE = 0; diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java index 715230131ed..3e6ec63c15e 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java @@ -68,6 +68,10 @@ public NettyRemotingServer(ThreadPoolExecutor messageExecutor) { super(messageExecutor, new NettyServerConfig()); } + public NettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) { + super(messageExecutor, nettyServerConfig); + } + /** * Sets transactionMessageHandler. * diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java index 493992b1f44..6c9a325882f 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java @@ -77,6 +77,10 @@ public NettyServerBootstrap(NettyServerConfig nettyServerConfig) { new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(), nettyServerConfig.getServerWorkerThreads())); } + + if (nettyServerConfig.getServerListenPort() > 0) { + setListenPort(nettyServerConfig.getServerListenPort()); + } } /** diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerConfig.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerConfig.java index 1d945e26718..9dd373ba0b2 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerConfig.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerConfig.java @@ -49,7 +49,7 @@ public class NettyServerConfig extends NettyBaseConfig { ConfigurationKeys.TRANSPORT_PREFIX + "writeBufferHighWaterMark", String.valueOf(67108864))); private int writeBufferLowWaterMark = Integer.parseInt(System.getProperty( ConfigurationKeys.TRANSPORT_PREFIX + "writeBufferLowWaterMark", String.valueOf(1048576))); - private static final int DEFAULT_LISTEN_PORT = 8091; + private int serverListenPort = 0; private static final long RPC_TC_REQUEST_TIMEOUT = CONFIG.getLong(ConfigurationKeys.RPC_TC_REQUEST_TIMEOUT, DEFAULT_RPC_TC_REQUEST_TIMEOUT); private int serverChannelMaxIdleTimeSeconds = Integer.parseInt(System.getProperty( ConfigurationKeys.TRANSPORT_PREFIX + "serverChannelMaxIdleTimeSeconds", String.valueOf(30))); @@ -217,8 +217,12 @@ public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { * * @return the listen port */ - public int getDefaultListenPort() { - return DEFAULT_LISTEN_PORT; + public int getServerListenPort() { + return serverListenPort; + } + + public void setServerListenPort(int port) { + this.serverListenPort = port; } /** diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java index 61f23687d98..92cbafd0a5d 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java @@ -16,6 +16,14 @@ */ package org.apache.seata.core.rpc.netty; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + import io.netty.channel.Channel; import io.netty.util.concurrent.EventExecutorGroup; import org.apache.seata.common.DefaultValues; @@ -23,9 +31,9 @@ import org.apache.seata.common.exception.FrameworkException; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.common.util.StringUtils; -import org.apache.seata.config.ConfigurationCache; +import org.apache.seata.config.CachedConfigurationChangeListener; +import org.apache.seata.config.Configuration; import org.apache.seata.config.ConfigurationChangeEvent; -import org.apache.seata.config.ConfigurationChangeListener; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.constants.ConfigurationKeys; import org.apache.seata.core.model.Resource; @@ -43,14 +51,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; - import static org.apache.seata.common.Constants.DBKEYS_SPLIT_CHAR; /** @@ -92,9 +92,10 @@ private RmNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutor ThreadPoolExecutor messageExecutor) { super(nettyClientConfig, eventExecutorGroup, messageExecutor, TransactionRole.RMROLE); // set enableClientBatchSendRequest - this.enableClientBatchSendRequest = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_RM_CLIENT_BATCH_SEND_REQUEST, + Configuration configuration = ConfigurationFactory.getInstance(); + this.enableClientBatchSendRequest = configuration.getBoolean(ConfigurationKeys.ENABLE_RM_CLIENT_BATCH_SEND_REQUEST, ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_CLIENT_BATCH_SEND_REQUEST,DefaultValues.DEFAULT_ENABLE_RM_CLIENT_BATCH_SEND_REQUEST)); - ConfigurationCache.addConfigListener(ConfigurationKeys.ENABLE_RM_CLIENT_BATCH_SEND_REQUEST, new ConfigurationChangeListener() { + configuration.addConfigListener(ConfigurationKeys.ENABLE_RM_CLIENT_BATCH_SEND_REQUEST, new CachedConfigurationChangeListener() { @Override public void onChangeEvent(ConfigurationChangeEvent event) { String dataId = event.getDataId(); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java index f9efc611679..68ff739bbb0 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java @@ -21,17 +21,19 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; + import io.netty.channel.Channel; import io.netty.util.concurrent.EventExecutorGroup; +import org.apache.commons.lang.StringUtils; import org.apache.seata.common.DefaultValues; import org.apache.seata.common.exception.FrameworkException; import org.apache.seata.common.loader.EnhancedServiceLoader; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.common.thread.RejectedPolicies; import org.apache.seata.common.util.NetUtil; -import org.apache.seata.config.ConfigurationCache; +import org.apache.seata.config.CachedConfigurationChangeListener; +import org.apache.seata.config.Configuration; import org.apache.seata.config.ConfigurationChangeEvent; -import org.apache.seata.config.ConfigurationChangeListener; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.auth.AuthSigner; import org.apache.seata.core.constants.ConfigurationKeys; @@ -41,7 +43,6 @@ import org.apache.seata.core.protocol.RegisterTMResponse; import org.apache.seata.core.rpc.processor.client.ClientHeartbeatProcessor; import org.apache.seata.core.rpc.processor.client.ClientOnResponseProcessor; -import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,9 +69,10 @@ private TmNettyRemotingClient(NettyClientConfig nettyClientConfig, super(nettyClientConfig, eventExecutorGroup, messageExecutor, NettyPoolKey.TransactionRole.TMROLE); this.signer = EnhancedServiceLoader.load(AuthSigner.class); // set enableClientBatchSendRequest - this.enableClientBatchSendRequest = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST, + Configuration configuration = ConfigurationFactory.getInstance(); + this.enableClientBatchSendRequest = configuration.getBoolean(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST, DefaultValues.DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST); - ConfigurationCache.addConfigListener(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST, new ConfigurationChangeListener() { + configuration.addConfigListener(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST, new CachedConfigurationChangeListener() { @Override public void onChangeEvent(ConfigurationChangeEvent event) { String dataId = event.getDataId(); diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/NettyClientTestSuite.java b/core/src/test/java/org/apache/seata/core/rpc/netty/NettyClientTestSuite.java new file mode 100644 index 00000000000..45387fc1559 --- /dev/null +++ b/core/src/test/java/org/apache/seata/core/rpc/netty/NettyClientTestSuite.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty; + +import org.junit.platform.suite.api.SelectClasses; +import org.junit.platform.suite.api.Suite; + +@Suite +@SelectClasses({TmNettyClientTest.class, RmNettyClientTest.class}) +public class NettyClientTestSuite { +} diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java b/core/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java index 14266b15082..1b365501b53 100644 --- a/core/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java +++ b/core/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java @@ -16,45 +16,47 @@ */ package org.apache.seata.core.rpc.netty; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.seata.common.ConfigurationKeys; import org.apache.seata.common.exception.FrameworkException; +import org.apache.seata.config.CachedConfigurationChangeListener; import org.apache.seata.config.ConfigurationCache; +import org.apache.seata.config.ConfigurationChangeEvent; +import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.model.Resource; import org.apache.seata.core.model.ResourceManager; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Field; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; /** * Rm RPC client test. - * */ +@Order(2) class RmNettyClientTest { - + Logger logger = LoggerFactory.getLogger(getClass()); - + @BeforeAll public static void beforeAll() { RmNettyRemotingClient.getInstance().destroy(); - System.setProperty(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST, "true"); } @AfterAll public static void afterAll() { RmNettyRemotingClient.getInstance().destroy(); - System.setProperty(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST, "false"); } @Test @@ -85,12 +87,16 @@ public void testCheckFailFast() throws Exception { Mockito.when(resourceManager.getManagedResources()).thenReturn(resourceMap); newClient.setResourceManager(resourceManager); System.setProperty("file.listener.enabled", "true"); - ConfigurationCache.addConfigListener(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST, - event -> logger.info("dataId:{}, value: {}, oldValue: {}", event.getDataId(), event.getNewValue(), - event.getOldValue())); + ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST, new CachedConfigurationChangeListener() { + @Override + public void onChangeEvent(ConfigurationChangeEvent event) { + logger.info("dataId:{}, value: {}, oldValue: {}", event.getDataId(), event.getNewValue(), event.getOldValue()); + } + }); System.setProperty(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST, "true"); - Thread.sleep(2000); + ConfigurationCache.clear(); Assertions.assertThrows(FrameworkException.class, newClient::init); + System.setProperty(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST, "false"); } private AtomicBoolean getInitializeStatus(final RmNettyRemotingClient rmNettyRemotingClient) { diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java b/core/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java index ca620f73b87..924711438f8 100644 --- a/core/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java +++ b/core/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java @@ -16,37 +16,40 @@ */ package org.apache.seata.core.rpc.netty; +import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelOption; import io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.commons.pool.impl.GenericKeyedObjectPool; import org.apache.seata.common.ConfigurationKeys; import org.apache.seata.common.exception.FrameworkException; +import org.apache.seata.config.CachedConfigurationChangeListener; import org.apache.seata.config.ConfigurationCache; -import org.apache.commons.pool.impl.GenericKeyedObjectPool; +import org.apache.seata.config.ConfigurationChangeEvent; +import org.apache.seata.config.ConfigurationFactory; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Field; -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - /** * The type Tm rpc client test. - * */ +@Order(1) public class TmNettyClientTest { Logger logger = LoggerFactory.getLogger(getClass()); - private static final ThreadPoolExecutor - workingThreads = new ThreadPoolExecutor(100, 500, 500, TimeUnit.SECONDS, + private static final ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(100, 500, 500, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20000), new ThreadPoolExecutor.CallerRunsPolicy()); /** @@ -79,18 +82,19 @@ public void testGetInstance() throws Exception { @Test public void testInit() throws Exception { String applicationId = "app 1"; - String transactionServiceGroup = "group A"; - TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup); - + String transactionServiceGroup = "default_tx_group"; + TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, + transactionServiceGroup); + System.setProperty(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST, "false"); + ConfigurationCache.clear(); tmNettyRemotingClient.init(); - //check if attr of tmNettyClient object has been set success Field clientBootstrapField = getDeclaredField(tmNettyRemotingClient, "clientBootstrap"); clientBootstrapField.setAccessible(true); NettyClientBootstrap clientBootstrap = (NettyClientBootstrap)clientBootstrapField.get(tmNettyRemotingClient); Field bootstrapField = getDeclaredField(clientBootstrap, "bootstrap"); bootstrapField.setAccessible(true); - Bootstrap bootstrap = (Bootstrap) bootstrapField.get(clientBootstrap); + Bootstrap bootstrap = (Bootstrap)bootstrapField.get(clientBootstrap); Assertions.assertNotNull(bootstrap); Field optionsField = getDeclaredField(bootstrap, "options"); @@ -134,20 +138,26 @@ public void setApplicationId() throws Exception { @AfterAll public static void afterAll() { TmNettyRemotingClient.getInstance().destroy(); - System.setProperty(ConfigurationKeys.ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST, "false"); } @Test public void testCheckFailFast() throws Exception { TmNettyRemotingClient.getInstance().destroy(); + System.setProperty(ConfigurationKeys.ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST, "false"); TmNettyRemotingClient tmClient = TmNettyRemotingClient.getInstance("fail_fast", "default_tx_group"); System.setProperty("file.listener.enabled", "true"); - ConfigurationCache.addConfigListener(ConfigurationKeys.ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST, - event -> logger.info("dataId:{}, value: {}, oldValue: {}", event.getDataId(), event.getNewValue(), - event.getOldValue())); + ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST, + new CachedConfigurationChangeListener() { + @Override + public void onChangeEvent(ConfigurationChangeEvent event) { + logger.info("dataId:{}, value: {}, oldValue: {}", event.getDataId(), event.getNewValue(), + event.getOldValue()); + } + }); System.setProperty(ConfigurationKeys.ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST, "true"); - Thread.sleep(2000); + ConfigurationCache.clear(); Assertions.assertThrows(FrameworkException.class, tmClient::init); + System.setProperty(ConfigurationKeys.ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST, "false"); } /** diff --git a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java index d1d0a4f3e01..fd9561434f6 100644 --- a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java +++ b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java @@ -17,16 +17,16 @@ package org.apache.seata.discovery.registry; import java.net.InetSocketAddress; -import java.util.Set; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Collection; -import java.util.ArrayList; -import java.util.Collections; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import org.apache.seata.config.ConfigurationCache; + import org.apache.seata.config.ConfigurationFactory; /** @@ -113,7 +113,6 @@ public interface RegistryService { default String getServiceGroup(String key) { key = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key; if (!SERVICE_GROUP_NAME.contains(key)) { - ConfigurationCache.addConfigListener(key); SERVICE_GROUP_NAME.add(key); } return ConfigurationFactory.getInstance().getConfig(key); diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java index e390b794332..288bd5dd782 100644 --- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java @@ -27,9 +27,9 @@ import org.apache.seata.common.exception.ShouldNeverHappenException; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.common.util.StringUtils; -import org.apache.seata.config.ConfigurationCache; +import org.apache.seata.config.CachedConfigurationChangeListener; +import org.apache.seata.config.Configuration; import org.apache.seata.config.ConfigurationChangeEvent; -import org.apache.seata.config.ConfigurationChangeListener; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.constants.ConfigurationKeys; import org.apache.seata.core.event.EventBus; @@ -70,7 +70,7 @@ * The type Global transactional interceptor handler. * */ -public class GlobalTransactionalInterceptorHandler extends AbstractProxyInvocationHandler implements ConfigurationChangeListener { +public class GlobalTransactionalInterceptorHandler extends AbstractProxyInvocationHandler implements CachedConfigurationChangeListener { private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptorHandler.class); @@ -116,20 +116,18 @@ private void initDefaultGlobalTransactionTimeout() { public GlobalTransactionalInterceptorHandler(FailureHandler failureHandler, Set methodsToProxy) { this.failureHandler = failureHandler == null ? FailureHandlerHolder.getFailureHandler() : failureHandler; this.methodsToProxy = methodsToProxy; - this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, + Configuration configuration = ConfigurationFactory.getInstance(); + this.disable = configuration.getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION); - - boolean degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK, + boolean degradeCheck = configuration.getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK, DEFAULT_TM_DEGRADE_CHECK); - degradeCheckPeriod = ConfigurationFactory.getInstance() - .getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD); - degradeCheckAllowTimes = ConfigurationFactory.getInstance() - .getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES); + degradeCheckPeriod = configuration.getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD); + degradeCheckAllowTimes = configuration.getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES); EVENT_BUS.register(this); if (degradeCheck && degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) { startDegradeCheck(); } - ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this); + configuration.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this); this.initDefaultGlobalTransactionTimeout(); } diff --git a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/parser/GlobalTransactionalInterceptorParser.java b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/parser/GlobalTransactionalInterceptorParser.java index 7a4afb964c7..d7edfa458e0 100644 --- a/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/parser/GlobalTransactionalInterceptorParser.java +++ b/integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/parser/GlobalTransactionalInterceptorParser.java @@ -19,11 +19,12 @@ import java.lang.reflect.Method; import java.util.HashSet; import java.util.Set; + import org.apache.seata.common.ConfigurationKeys; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.ReflectionUtil; -import org.apache.seata.config.ConfigurationCache; -import org.apache.seata.config.ConfigurationChangeListener; +import org.apache.seata.config.CachedConfigurationChangeListener; +import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.integration.tx.api.interceptor.handler.GlobalTransactionalInterceptorHandler; import org.apache.seata.integration.tx.api.interceptor.handler.ProxyInvocationHandler; import org.apache.seata.spring.annotation.GlobalLock; @@ -50,7 +51,7 @@ public ProxyInvocationHandler parserInterfaceToProxy(Object target, String objec if (existsAnnotation(serviceInterface) || existsAnnotation(interfacesIfJdk)) { ProxyInvocationHandler proxyInvocationHandler = createProxyInvocationHandler(); - ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener) proxyInvocationHandler); + ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (CachedConfigurationChangeListener) proxyInvocationHandler); return proxyInvocationHandler; } diff --git a/pom.xml b/pom.xml index a9feab80b9e..c558031e116 100644 --- a/pom.xml +++ b/pom.xml @@ -88,6 +88,16 @@ junit-platform-launcher test + + org.junit.platform + junit-platform-suite-api + test + + + org.junit.platform + junit-platform-suite-engine + test + org.mockito mockito-core diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/LockRetryController.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/LockRetryController.java index 454a55955de..e81bdf1281c 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/LockRetryController.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/exec/LockRetryController.java @@ -18,10 +18,9 @@ import org.apache.seata.common.DefaultValues; import org.apache.seata.common.util.NumberUtils; +import org.apache.seata.config.CachedConfigurationChangeListener; import org.apache.seata.config.Configuration; -import org.apache.seata.config.ConfigurationCache; import org.apache.seata.config.ConfigurationChangeEvent; -import org.apache.seata.config.ConfigurationChangeListener; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.constants.ConfigurationKeys; import org.apache.seata.core.context.GlobalLockConfigHolder; @@ -35,10 +34,11 @@ public class LockRetryController { private static final GlobalConfig LISTENER = new GlobalConfig(); + private static final Configuration CONFIG = ConfigurationFactory.getInstance(); static { - ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_LOCK_RETRY_INTERVAL, LISTENER); - ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_LOCK_RETRY_TIMES, LISTENER); + CONFIG.addConfigListener(ConfigurationKeys.CLIENT_LOCK_RETRY_INTERVAL, LISTENER); + CONFIG.addConfigListener(ConfigurationKeys.CLIENT_LOCK_RETRY_TIMES, LISTENER); } private int lockRetryInterval; @@ -98,7 +98,7 @@ int getLockRetryTimes() { return LISTENER.getGlobalLockRetryTimes(); } - static class GlobalConfig implements ConfigurationChangeListener { + static class GlobalConfig implements CachedConfigurationChangeListener { private volatile int globalLockRetryInterval; diff --git a/server/src/main/java/org/apache/seata/server/storage/db/lock/DataBaseDistributedLocker.java b/server/src/main/java/org/apache/seata/server/storage/db/lock/DataBaseDistributedLocker.java index f702334247e..5321532d5b9 100644 --- a/server/src/main/java/org/apache/seata/server/storage/db/lock/DataBaseDistributedLocker.java +++ b/server/src/main/java/org/apache/seata/server/storage/db/lock/DataBaseDistributedLocker.java @@ -32,10 +32,9 @@ import org.apache.seata.common.loader.Scope; import org.apache.seata.common.util.IOUtil; import org.apache.seata.common.util.StringUtils; +import org.apache.seata.config.CachedConfigurationChangeListener; import org.apache.seata.config.Configuration; -import org.apache.seata.config.ConfigurationCache; import org.apache.seata.config.ConfigurationChangeEvent; -import org.apache.seata.config.ConfigurationChangeListener; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.constants.ConfigurationKeys; import org.apache.seata.core.constants.ServerTableColumnsName; @@ -94,7 +93,7 @@ public DataBaseDistributedLocker() { if (StringUtils.isBlank(distributedLockTable)) { demotion = true; - ConfigurationCache.addConfigListener(DISTRIBUTED_LOCK_DB_TABLE, new ConfigurationChangeListener() { + configuration.addConfigListener(DISTRIBUTED_LOCK_DB_TABLE, new CachedConfigurationChangeListener() { @Override public void onChangeEvent(ConfigurationChangeEvent event) { String newValue = event.getNewValue(); @@ -102,7 +101,7 @@ public void onChangeEvent(ConfigurationChangeEvent event) { distributedLockTable = newValue; init(); demotion = false; - ConfigurationCache.removeConfigListener(DISTRIBUTED_LOCK_DB_TABLE, this); + ConfigurationFactory.getInstance().removeConfigListener(DISTRIBUTED_LOCK_DB_TABLE, this); } } }); diff --git a/spring/src/main/java/org/apache/seata/spring/annotation/GlobalTransactionScanner.java b/spring/src/main/java/org/apache/seata/spring/annotation/GlobalTransactionScanner.java index b6d7c226867..cc4c643a6cd 100644 --- a/spring/src/main/java/org/apache/seata/spring/annotation/GlobalTransactionScanner.java +++ b/spring/src/main/java/org/apache/seata/spring/annotation/GlobalTransactionScanner.java @@ -23,14 +23,17 @@ import java.util.LinkedHashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; + import javax.annotation.Nullable; import com.google.common.collect.ImmutableSet; +import org.aopalliance.aop.Advice; +import org.aopalliance.intercept.MethodInterceptor; +import org.apache.commons.lang.ArrayUtils; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.StringUtils; -import org.apache.seata.config.ConfigurationCache; +import org.apache.seata.config.CachedConfigurationChangeListener; import org.apache.seata.config.ConfigurationChangeEvent; -import org.apache.seata.config.ConfigurationChangeListener; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.constants.ConfigurationKeys; import org.apache.seata.core.rpc.ShutdownHook; @@ -54,9 +57,6 @@ import org.apache.seata.tm.TMClient; import org.apache.seata.tm.api.FailureHandler; import org.apache.seata.tm.api.FailureHandlerHolder; -import org.aopalliance.aop.Advice; -import org.aopalliance.intercept.MethodInterceptor; -import org.apache.commons.lang.ArrayUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.Advisor; @@ -86,7 +86,7 @@ * */ public class GlobalTransactionScanner extends AbstractAutoProxyCreator - implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean { + implements CachedConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean { private static final long serialVersionUID = 1L; @@ -473,8 +473,7 @@ public void afterPropertiesSet() { if (LOGGER.isInfoEnabled()) { LOGGER.info("Global transaction is disabled."); } - ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, - (ConfigurationChangeListener) this); + ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (CachedConfigurationChangeListener) this); return; } if (initialized.compareAndSet(false, true)) { @@ -561,7 +560,7 @@ public void onChangeEvent(ConfigurationChangeEvent event) { LOGGER.info("{} config changed, old value:true, new value:{}", ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, event.getNewValue()); initClient(); - ConfigurationCache.removeConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, this); + ConfigurationFactory.getInstance().removeConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, this); } } } diff --git a/test/src/test/java/org/apache/seata/common/ConfigurationTestHelper.java b/test/src/test/java/org/apache/seata/common/ConfigurationTestHelper.java index 308d7211f3b..d8708605e0a 100644 --- a/test/src/test/java/org/apache/seata/common/ConfigurationTestHelper.java +++ b/test/src/test/java/org/apache/seata/common/ConfigurationTestHelper.java @@ -16,14 +16,15 @@ */ package org.apache.seata.common; -import org.apache.seata.config.ConfigurationCache; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.seata.config.CachedConfigurationChangeListener; +import org.apache.seata.config.ConfigurationChangeEvent; import org.apache.seata.config.ConfigurationFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - /** * the type ConfigurationTestHelper **/ @@ -38,7 +39,13 @@ public static void removeConfig(String dataId) { public static void putConfig(String dataId, String content) { CountDownLatch countDownLatch = new CountDownLatch(1); - ConfigurationCache.addConfigListener(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, event -> countDownLatch.countDown()); + ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, + new CachedConfigurationChangeListener() { + @Override + public void onChangeEvent(ConfigurationChangeEvent event) { + countDownLatch.countDown(); + } + }); if (content == null) { System.clearProperty(dataId); ConfigurationFactory.getInstance().removeConfig(dataId); diff --git a/test/src/test/java/org/apache/seata/saga/engine/db/AbstractServerTest.java b/test/src/test/java/org/apache/seata/saga/engine/db/AbstractServerTest.java index bb26a62dca6..670261d7415 100644 --- a/test/src/test/java/org/apache/seata/saga/engine/db/AbstractServerTest.java +++ b/test/src/test/java/org/apache/seata/saga/engine/db/AbstractServerTest.java @@ -25,6 +25,7 @@ import org.apache.seata.common.util.NetUtil; import org.apache.seata.core.rpc.ShutdownHook; import org.apache.seata.core.rpc.netty.NettyRemotingServer; +import org.apache.seata.core.rpc.netty.NettyServerConfig; import org.apache.seata.server.ParameterParser; import org.apache.seata.server.UUIDGenerator; import org.apache.seata.server.coordinator.DefaultCoordinator; @@ -55,7 +56,9 @@ public void run() { //initialize the metrics MetricsManager.get().init(); - nettyServer = new NettyRemotingServer(workingThreads); + NettyServerConfig nettyServerConfig = new NettyServerConfig(); + nettyServerConfig.setServerListenPort(8091); + nettyServer = new NettyRemotingServer(workingThreads, nettyServerConfig); UUIDGenerator.init(parameterParser.getServerNode()); //log store mode : file、db SessionHolder.init(); @@ -63,6 +66,7 @@ public void run() { DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyServer); coordinator.init(); nettyServer.setHandler(coordinator); + // register ShutdownHook ShutdownHook.getInstance().addDisposable(coordinator);