From 4264174d0f94320883b0c15e3f676fb20ca17dfd Mon Sep 17 00:00:00 2001 From: yuanxiao01 Date: Thu, 29 Aug 2024 14:44:31 +0800 Subject: [PATCH 1/8] fall back to any of available cluster address when query cluster address is empty --- .../consul/ConsulRegistryServiceImpl.java | 5 ++- .../discovery/registry/RegistryService.java | 36 ++++++++++++++----- .../etcd3/EtcdRegistryServiceImpl.java | 5 ++- .../eureka/EurekaRegistryServiceImpl.java | 5 ++- .../nacos/NacosRegistryServiceImpl.java | 4 ++- .../NamingserverRegistryServiceImpl.java | 22 +++--------- .../redis/RedisRegistryServiceImpl.java | 5 ++- .../sofa/SofaRegistryServiceImpl.java | 5 ++- .../zk/ZookeeperRegisterServiceImpl.java | 5 ++- .../zk/ZookeeperRegisterServiceImplTest.java | 25 ++++++------- 10 files changed, 73 insertions(+), 44 deletions(-) diff --git a/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java b/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java index e44ac33b3aa..72fa626b1f4 100644 --- a/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java +++ b/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java @@ -75,6 +75,8 @@ public class ConsulRegistryServiceImpl implements RegistryService lookup(String key) throws Exception { + transactionServiceGroup = key; final String cluster = getServiceGroup(key); if (cluster == null) { String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key; @@ -311,7 +314,7 @@ private void refreshCluster(String cluster, List services) { clusterAddressMap.put(cluster, addresses); - removeOfflineAddressesIfNecessary(cluster, addresses); + removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, addresses); } /** diff --git a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java index fd9561434f6..2d42619d7ac 100644 --- a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java +++ b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java @@ -16,6 +16,9 @@ */ package org.apache.seata.discovery.registry; +import org.apache.seata.common.util.CollectionUtils; +import org.apache.seata.config.ConfigurationFactory; + import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; @@ -27,8 +30,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import org.apache.seata.config.ConfigurationFactory; - /** * The interface Registry service. * @@ -54,7 +55,7 @@ public interface RegistryService { /** * Service node health check */ - Map> CURRENT_ADDRESS_MAP = new ConcurrentHashMap<>(); + Map>> CURRENT_ADDRESS_MAP = new ConcurrentHashMap<>(); /** * Register. * @@ -119,12 +120,28 @@ default String getServiceGroup(String key) { } default List aliveLookup(String transactionServiceGroup) { - return CURRENT_ADDRESS_MAP.computeIfAbsent(getServiceGroup(transactionServiceGroup), k -> new ArrayList<>()); + Map> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, + k -> new ConcurrentHashMap<>()); + + String clusterName = getServiceGroup(transactionServiceGroup); + List inetSocketAddresses = clusterAddressMap.get(clusterName); + if (CollectionUtils.isNotEmpty(inetSocketAddresses)) { + return inetSocketAddresses; + } + + // fall back to addresses of any cluster + return clusterAddressMap.values().stream().findAny().orElse(Collections.emptyList()); } default List refreshAliveLookup(String transactionServiceGroup, List aliveAddress) { - return CURRENT_ADDRESS_MAP.put(getServiceGroup(transactionServiceGroup), aliveAddress); + + Map> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, + key -> new ConcurrentHashMap<>()); + + String clusterName = getServiceGroup(transactionServiceGroup); + + return clusterAddressMap.put(clusterName, aliveAddress); } @@ -137,15 +154,18 @@ default List refreshAliveLookup(String transactionServiceGrou * @param clusterName * @param newAddressed */ - default void removeOfflineAddressesIfNecessary(String clusterName, Collection newAddressed) { + default void removeOfflineAddressesIfNecessary(String transactionGroupService, String clusterName, Collection newAddressed) { + + Map> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionGroupService, + key -> new ConcurrentHashMap<>()); - List currentAddresses = CURRENT_ADDRESS_MAP.getOrDefault(clusterName, Collections.emptyList()); + List currentAddresses = clusterAddressMap.getOrDefault(clusterName, new ArrayList<>()); List inetSocketAddresses = currentAddresses .stream().filter(newAddressed::contains).collect( Collectors.toList()); - CURRENT_ADDRESS_MAP.put(clusterName, inetSocketAddresses); + clusterAddressMap.put(clusterName, inetSocketAddresses); } } diff --git a/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java b/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java index 70e3dfe8520..96d3cf05110 100644 --- a/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java +++ b/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java @@ -75,6 +75,8 @@ public class EtcdRegistryServiceImpl implements RegistryService private static final int MAP_INITIAL_CAPACITY = 8; private static final int THREAD_POOL_SIZE = 2; private ExecutorService executorService; + + private String transactionServiceGroup; /** * TTL for lease */ @@ -181,6 +183,7 @@ public void unsubscribe(String cluster, Watch.Listener listener) throws Exceptio @Override public List lookup(String key) throws Exception { + transactionServiceGroup = key; final String cluster = getServiceGroup(key); if (cluster == null) { String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key; @@ -252,7 +255,7 @@ private void refreshCluster(String cluster) throws Exception { }).collect(Collectors.toList()); clusterAddressMap.put(cluster, new Pair<>(getResponse.getHeader().getRevision(), instanceList)); - removeOfflineAddressesIfNecessary(cluster, instanceList); + removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, instanceList); } /** diff --git a/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java b/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java index 80d640c57c9..5ab5191234d 100644 --- a/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java +++ b/discovery/seata-discovery-eureka/src/main/java/org/apache/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java @@ -75,6 +75,8 @@ public class EurekaRegistryServiceImpl implements RegistryService lookup(String key) throws Exception { + transactionServiceGroup = key; String clusterName = getServiceGroup(key); if (clusterName == null) { String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key; @@ -169,7 +172,7 @@ private void refreshCluster(String clusterName) { .collect(Collectors.toList()); CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); - removeOfflineAddressesIfNecessary(clusterName, newAddressList); + removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList); } } diff --git a/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java b/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java index 7ddda0d0e7e..1f6abccba4a 100644 --- a/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java +++ b/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java @@ -84,6 +84,8 @@ public class NacosRegistryServiceImpl implements RegistryService private static final Pattern DEFAULT_SLB_REGISTRY_PATTERN = Pattern.compile("(?!.*internal)(?=.*seata).*mse.aliyuncs.com"); private static volatile Boolean useSLBWay; + private String transactionServiceGroup; + private NacosRegistryServiceImpl() { String configForNacosSLB = FILE_CONFIG.getConfig(getNacosUrlPatternOfSLB()); Pattern patternOfNacosRegistryForSLB = StringUtils.isBlank(configForNacosSLB) @@ -193,7 +195,7 @@ public List lookup(String key) throws Exception { .collect(Collectors.toList()); CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); - removeOfflineAddressesIfNecessary(clusterName, newAddressList); + removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList); } }); } diff --git a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java index 2781f4687bc..899e8bd4a55 100644 --- a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java +++ b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java @@ -16,7 +16,6 @@ */ package org.apache.seata.discovery.registry.namingserver; - import java.io.IOException; import java.net.InetSocketAddress; import java.rmi.RemoteException; @@ -41,22 +40,22 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.HttpStatus; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.ContentType; import org.apache.http.protocol.HTTP; +import org.apache.http.util.EntityUtils; import org.apache.seata.common.metadata.Node; import org.apache.seata.common.metadata.namingserver.Instance; import org.apache.seata.common.metadata.namingserver.MetaResponse; import org.apache.seata.common.thread.NamedThreadFactory; +import org.apache.seata.common.util.HttpClientUtil; import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; import org.apache.seata.config.Configuration; import org.apache.seata.config.ConfigurationFactory; -import org.apache.seata.common.util.HttpClientUtil; import org.apache.seata.discovery.registry.RegistryService; -import org.apache.http.HttpStatus; -import org.apache.http.StatusLine; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -322,17 +321,6 @@ public void unsubscribe(String vGroup) throws Exception { isSubscribed = false; } - @Override - public List aliveLookup(String transactionServiceGroup) { - return CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, k -> new ArrayList<>()); - } - - @Override - public List refreshAliveLookup(String transactionServiceGroup, - List aliveAddress) { - return CURRENT_ADDRESS_MAP.put(transactionServiceGroup, aliveAddress); - } - /** * @param key vGroup name * @return List available instance list diff --git a/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java b/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java index 769799f6930..6c18a57e72f 100644 --- a/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java +++ b/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java @@ -74,6 +74,8 @@ public class RedisRegistryServiceImpl implements RegistryService private static final long KEY_TTL = 5L; private static final long KEY_REFRESH_PERIOD = 2000L; + private String transactionServiceGroup; + private ScheduledExecutorService threadPoolExecutorForSubscribe = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("RedisRegistryService-subscribe", 1)); private ScheduledExecutorService threadPoolExecutorForUpdateMap = new ScheduledThreadPoolExecutor(1, @@ -219,6 +221,7 @@ public void unsubscribe(String cluster, RedisListener listener) { @Override public List lookup(String key) { + transactionServiceGroup = key; String clusterName = getServiceGroup(key); if (clusterName == null) { String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key; @@ -280,7 +283,7 @@ private void removeServerAddressByPushEmptyProtection(String notifyCluserName, S } socketAddresses.remove(inetSocketAddress); - removeOfflineAddressesIfNecessary(notifyCluserName, socketAddresses); + removeOfflineAddressesIfNecessary(transactionServiceGroup, notifyCluserName, socketAddresses); } @Override diff --git a/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java b/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java index a7e4e087fa4..fa0b428dda1 100644 --- a/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java +++ b/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java @@ -81,6 +81,8 @@ public class SofaRegistryServiceImpl implements RegistryService lookup(String key) throws Exception { + transactionServiceGroup = key; String clusterName = getServiceGroup(key); if (clusterName == null) { String missingDataId = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key; @@ -174,7 +177,7 @@ public List lookup(String key) throws Exception { List newAddressList = flatData(instances); CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); - removeOfflineAddressesIfNecessary(clusterName, newAddressList); + removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList); } respondRegistries.countDown(); }); diff --git a/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java b/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java index 9b112b1efb5..84e70abe041 100644 --- a/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java +++ b/discovery/seata-discovery-zk/src/main/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java @@ -78,6 +78,8 @@ public class ZookeeperRegisterServiceImpl implements RegistryService REGISTERED_PATH_SET = Collections.synchronizedSet(new HashSet<>(REGISTERED_PATH_SET_SIZE)); + private String transactionServiceGroup; + private ZookeeperRegisterServiceImpl() { } @@ -175,6 +177,7 @@ public void unsubscribe(String cluster, IZkChildListener listener) throws Except */ @Override public List lookup(String key) throws Exception { + transactionServiceGroup = key; String clusterName = getServiceGroup(key); if (clusterName == null) { @@ -309,7 +312,7 @@ private void refreshClusterAddressMap(String clusterName, List instances } CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); - removeOfflineAddressesIfNecessary(clusterName, newAddressList); + removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, newAddressList); } private String getClusterName() { diff --git a/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java b/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java index cd7afa792f7..9cbfd3efd65 100644 --- a/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java +++ b/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java @@ -18,9 +18,7 @@ import java.lang.reflect.Field; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -128,18 +126,21 @@ public void testLookUp() throws Exception { @Test public void testRemoveOfflineAddressesIfNecessaryNoRemoveCase() { - service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); - service.removeOfflineAddressesIfNecessary("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + Map> addresses = service.CURRENT_ADDRESS_MAP.computeIfAbsent("default_tx_group", k -> new HashMap<>()); + addresses.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + service.removeOfflineAddressesIfNecessary("default_tx_group","cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); - Assertions.assertEquals(1, service.CURRENT_ADDRESS_MAP.get("cluster").size()); + Assertions.assertEquals(1, service.CURRENT_ADDRESS_MAP.get("default_tx_group").get("cluster").size()); } @Test public void testRemoveOfflineAddressesIfNecessaryRemoveCase() { - service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); - service.removeOfflineAddressesIfNecessary("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); + Map> addresses = service.CURRENT_ADDRESS_MAP.computeIfAbsent("default_tx_group", k -> new HashMap<>()); + addresses.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); - Assertions.assertEquals(0, service.CURRENT_ADDRESS_MAP.get("cluster").size()); + service.removeOfflineAddressesIfNecessary("default_tx_group", "cluster", Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); + + Assertions.assertEquals(0, service.CURRENT_ADDRESS_MAP.get("default_tx_group").get("cluster").size()); } @Test @@ -148,7 +149,8 @@ public void testAliveLookup() { System.setProperty("txServiceGroup", "default_tx_group"); System.setProperty("service.vgroupMapping.default_tx_group", "cluster"); - service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + Map> addresses = service.CURRENT_ADDRESS_MAP.computeIfAbsent("default_tx_group", k -> new HashMap<>()); + addresses.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); List result = service.aliveLookup("default_tx_group"); Assertions.assertEquals(result, Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); @@ -161,12 +163,11 @@ public void tesRefreshAliveLookup() { System.setProperty("txServiceGroup", "default_tx_group"); System.setProperty("service.vgroupMapping.default_tx_group", "cluster"); - service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); service.refreshAliveLookup("default_tx_group", Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); - Assertions.assertEquals(service.CURRENT_ADDRESS_MAP.get("cluster"), + Assertions.assertEquals(service.CURRENT_ADDRESS_MAP.get("default_tx_group").get("cluster"), Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); } } From 048bd73ddfbfe0f5fda2924aeaf026dfb5687cf6 Mon Sep 17 00:00:00 2001 From: yuanxiao01 Date: Thu, 29 Aug 2024 14:53:34 +0800 Subject: [PATCH 2/8] add register info --- changes/en-us/2.x.md | 2 ++ changes/zh-cn/2.x.md | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 87ce714321c..950430fe168 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -70,6 +70,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6787](https://github.com/apache/incubator-seata/pull/6787)] upgrade elliptic to 6.5.7 - [[#6783](https://github.com/apache/incubator-seata/pull/6783)] rename the server naming/v1 api to vgroup/v1 - [[#6793](https://github.com/apache/incubator-seata/pull/6793)] fix npmjs conflicts +- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] fall back to any of available cluster address when query cluster address is empty ### refactor: @@ -112,6 +113,7 @@ Thanks to these contributors for their code commits. Please report an unintended - [imashimaro](https://github.com/hmj776521114) - [lyl2008dsg](https://github.com/lyl2008dsg) - [l81893521](https://github.com/l81893521) +- [laywin](https://github.com/laywin) Also, we receive many valuable issues, questions and advices from our community. Thanks for you all. diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 11ee91bcba5..564fa32d105 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -71,7 +71,7 @@ - [[#6787](https://github.com/apache/incubator-seata/pull/6787)] 升级 elliptic 至 6.5.7 版本 - [[#6783](https://github.com/apache/incubator-seata/pull/6783)] 将server事务分组修改接口改为/vgroup/v1 - [[#6793](https://github.com/apache/incubator-seata/pull/6793)] 修复 npmjs 依赖冲突问题 - +- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] 当查询的集群地址为空时,获取可用的任意集群地址 ### refactor: @@ -116,6 +116,7 @@ - [imashimaro](https://github.com/hmj776521114) - [lyl2008dsg](https://github.com/lyl2008dsg) - [l81893521](https://github.com/l81893521) +- [laywin](https://github.com/laywin) From d8934e385f7fd41f2c4df3cf56f8a522c2b9c654 Mon Sep 17 00:00:00 2001 From: yuanxiao01 Date: Thu, 29 Aug 2024 16:12:03 +0800 Subject: [PATCH 3/8] do some optimize --- .../discovery/registry/RegistryService.java | 8 +++-- .../NamingserverRegistryServiceImpl.java | 33 +++++++++++++++++-- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java index 2d42619d7ac..1cda7d2b1d1 100644 --- a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java +++ b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java @@ -130,7 +130,8 @@ default List aliveLookup(String transactionServiceGroup) { } // fall back to addresses of any cluster - return clusterAddressMap.values().stream().findAny().orElse(Collections.emptyList()); + return clusterAddressMap.values().stream().filter(CollectionUtils::isNotEmpty) + .findAny().orElse(Collections.emptyList()); } default List refreshAliveLookup(String transactionServiceGroup, @@ -165,7 +166,10 @@ default void removeOfflineAddressesIfNecessary(String transactionGroupService, S .stream().filter(newAddressed::contains).collect( Collectors.toList()); - clusterAddressMap.put(clusterName, inetSocketAddresses); + // prevent empty update + if (CollectionUtils.isNotEmpty(inetSocketAddresses)) { + clusterAddressMap.put(clusterName, inetSocketAddresses); + } } } diff --git a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java index 899e8bd4a55..178593c4b4c 100644 --- a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java +++ b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java @@ -19,12 +19,13 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.rmi.RemoteException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; +import java.util.HashMap; +import java.util.Objects; import java.util.Map; +import java.util.ArrayList; import java.util.Arrays; -import java.util.Objects; +import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; @@ -50,6 +51,7 @@ import org.apache.seata.common.metadata.namingserver.Instance; import org.apache.seata.common.metadata.namingserver.MetaResponse; import org.apache.seata.common.thread.NamedThreadFactory; +import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.HttpClientUtil; import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; @@ -401,6 +403,31 @@ public String getNamespace() { return namespace; } + @Override + public List aliveLookup(String transactionServiceGroup) { + Map> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, + k -> new ConcurrentHashMap<>()); + + List inetSocketAddresses = clusterAddressMap.get(transactionServiceGroup); + if (CollectionUtils.isNotEmpty(inetSocketAddresses)) { + return inetSocketAddresses; + } + + // fall back to addresses of any cluster + return clusterAddressMap.values().stream().filter(CollectionUtils::isNotEmpty) + .findAny().orElse(Collections.emptyList()); + } + + @Override + public List refreshAliveLookup(String transactionServiceGroup, + List aliveAddress) { + Map> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, + key -> new ConcurrentHashMap<>()); + + + return clusterAddressMap.put(transactionServiceGroup, aliveAddress); + } + /** * get one namingserver url From 44d9b310e55c16d94d4dda8520d6e309577e18d9 Mon Sep 17 00:00:00 2001 From: yuanxiao01 Date: Thu, 29 Aug 2024 16:57:19 +0800 Subject: [PATCH 4/8] fix style --- .../apache/seata/discovery/registry/RegistryService.java | 6 +++--- .../namingserver/NamingserverRegistryServiceImpl.java | 4 ++-- .../registry/zk/ZookeeperRegisterServiceImplTest.java | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java index 1cda7d2b1d1..c98378c3a07 100644 --- a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java +++ b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java @@ -121,7 +121,7 @@ default String getServiceGroup(String key) { default List aliveLookup(String transactionServiceGroup) { Map> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, - k -> new ConcurrentHashMap<>()); + k -> new ConcurrentHashMap<>()); String clusterName = getServiceGroup(transactionServiceGroup); List inetSocketAddresses = clusterAddressMap.get(clusterName); @@ -138,7 +138,7 @@ default List refreshAliveLookup(String transactionServiceGrou List aliveAddress) { Map> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, - key -> new ConcurrentHashMap<>()); + key -> new ConcurrentHashMap<>()); String clusterName = getServiceGroup(transactionServiceGroup); @@ -158,7 +158,7 @@ default List refreshAliveLookup(String transactionServiceGrou default void removeOfflineAddressesIfNecessary(String transactionGroupService, String clusterName, Collection newAddressed) { Map> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionGroupService, - key -> new ConcurrentHashMap<>()); + key -> new ConcurrentHashMap<>()); List currentAddresses = clusterAddressMap.getOrDefault(clusterName, new ArrayList<>()); diff --git a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java index 178593c4b4c..a479fe69d15 100644 --- a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java +++ b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java @@ -406,7 +406,7 @@ public String getNamespace() { @Override public List aliveLookup(String transactionServiceGroup) { Map> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, - k -> new ConcurrentHashMap<>()); + k -> new ConcurrentHashMap<>()); List inetSocketAddresses = clusterAddressMap.get(transactionServiceGroup); if (CollectionUtils.isNotEmpty(inetSocketAddresses)) { @@ -422,7 +422,7 @@ public List aliveLookup(String transactionServiceGroup) { public List refreshAliveLookup(String transactionServiceGroup, List aliveAddress) { Map> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, - key -> new ConcurrentHashMap<>()); + key -> new ConcurrentHashMap<>()); return clusterAddressMap.put(transactionServiceGroup, aliveAddress); diff --git a/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java b/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java index 9cbfd3efd65..200d4508427 100644 --- a/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java +++ b/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java @@ -134,13 +134,13 @@ public void testRemoveOfflineAddressesIfNecessaryNoRemoveCase() { } @Test - public void testRemoveOfflineAddressesIfNecessaryRemoveCase() { + public void testRemovePreventEmptyPushCase() { Map> addresses = service.CURRENT_ADDRESS_MAP.computeIfAbsent("default_tx_group", k -> new HashMap<>()); addresses.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); service.removeOfflineAddressesIfNecessary("default_tx_group", "cluster", Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); - Assertions.assertEquals(0, service.CURRENT_ADDRESS_MAP.get("default_tx_group").get("cluster").size()); + Assertions.assertEquals(1, service.CURRENT_ADDRESS_MAP.get("default_tx_group").get("cluster").size()); } @Test From 57ac72f39d1071fb97031b83b96643a980efe47b Mon Sep 17 00:00:00 2001 From: yuanxiao01 Date: Thu, 29 Aug 2024 17:13:08 +0800 Subject: [PATCH 5/8] triger ci --- .../discovery/registry/zk/ZookeeperRegisterServiceImplTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java b/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java index 200d4508427..dffd1a279a3 100644 --- a/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java +++ b/discovery/seata-discovery-zk/src/test/java/org/apache/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java @@ -136,6 +136,7 @@ public void testRemoveOfflineAddressesIfNecessaryNoRemoveCase() { @Test public void testRemovePreventEmptyPushCase() { Map> addresses = service.CURRENT_ADDRESS_MAP.computeIfAbsent("default_tx_group", k -> new HashMap<>()); + addresses.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); service.removeOfflineAddressesIfNecessary("default_tx_group", "cluster", Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); From e83972fe6e706363fa05d68fa15a53b5a0a2449a Mon Sep 17 00:00:00 2001 From: yuanxiao01 Date: Thu, 29 Aug 2024 17:45:53 +0800 Subject: [PATCH 6/8] optimize --- .../org/apache/seata/discovery/registry/RegistryService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java index c98378c3a07..779c84f6fe7 100644 --- a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java +++ b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java @@ -160,7 +160,7 @@ default void removeOfflineAddressesIfNecessary(String transactionGroupService, S Map> clusterAddressMap = CURRENT_ADDRESS_MAP.computeIfAbsent(transactionGroupService, key -> new ConcurrentHashMap<>()); - List currentAddresses = clusterAddressMap.getOrDefault(clusterName, new ArrayList<>()); + List currentAddresses = clusterAddressMap.getOrDefault(clusterName, Collections.emptyList()); List inetSocketAddresses = currentAddresses .stream().filter(newAddressed::contains).collect( From e3fe63f713d933e43f56786f7c3bfb62e27af2f9 Mon Sep 17 00:00:00 2001 From: yuanxiao01 Date: Thu, 29 Aug 2024 17:49:47 +0800 Subject: [PATCH 7/8] optimize --- .../org/apache/seata/discovery/registry/RegistryService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java index 779c84f6fe7..efc997b20bf 100644 --- a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java +++ b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java @@ -20,7 +20,6 @@ import org.apache.seata.config.ConfigurationFactory; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; From dec1ec47bd792e5207438667452ea757f2e28b14 Mon Sep 17 00:00:00 2001 From: yuanxiao01 Date: Fri, 30 Aug 2024 09:54:18 +0800 Subject: [PATCH 8/8] modify register info --- changes/en-us/2.x.md | 2 +- changes/zh-cn/2.x.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 0e4fd4afac0..a779856f936 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -28,6 +28,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6778](https://github.com/apache/incubator-seata/pull/6778)] fix namingserver node term - [[#6765](https://github.com/apache/incubator-seata/pull/6765)] fix MySQL driver loading by replacing custom classloader with system classloader for better compatibility and simplified process - [[#6781](https://github.com/apache/incubator-seata/pull/6781)] the issue where the TC occasionally fails to go offline from the NamingServer +- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] fall back to any of available cluster address when query cluster address is empty ### optimize: @@ -71,7 +72,6 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6783](https://github.com/apache/incubator-seata/pull/6783)] rename the server naming/v1 api to vgroup/v1 - [[#6793](https://github.com/apache/incubator-seata/pull/6793)] fix npmjs conflicts - [[#6794](https://github.com/apache/incubator-seata/pull/6794)] optimize NacosMockTest UT case -- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] fall back to any of available cluster address when query cluster address is empty ### refactor: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 90b94426179..a915329758b 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -29,6 +29,7 @@ - [[#6778](https://github.com/apache/incubator-seata/pull/6778)] 修复namingserver的节点term为0问题 - [[#6765](https://github.com/apache/incubator-seata/pull/6765)] 改进MySQL驱动加载机制,将自定义类加载器替换为系统类加载器,更兼容简化流程 - [[#6781](https://github.com/apache/incubator-seata/pull/6781)] 修复tc下线时,由于定时任务没有先关闭,导致下线后还会被注册上,需要靠namingserver的健康检查来下线的bug +- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] 当查询的集群地址为空时,获取可用的任意集群地址 ### optimize: @@ -72,7 +73,6 @@ - [[#6783](https://github.com/apache/incubator-seata/pull/6783)] 将server事务分组修改接口改为/vgroup/v1 - [[#6793](https://github.com/apache/incubator-seata/pull/6793)] 修复 npmjs 依赖冲突问题 - [[#6794](https://github.com/apache/incubator-seata/pull/6794)] 优化 NacosMockTest 单测问题 -- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] 当查询的集群地址为空时,获取可用的任意集群地址 ### refactor: