diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java
index 389a2c610a53ef..79e57ba22a481e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java
@@ -151,10 +151,10 @@ private void relocateAndBalanceGroup() {
                 continue;
             }
 
-            Set<Long> unavailableBeIds = getUnavailableBeIdsInGroup(infoService, colocateIndex, groupId);
-            List<Long> availableBeIds = getAvailableBeIdsInGroup(db.getClusterName(), infoService, unavailableBeIds);
+            Set<Long> unavailableBeIdsInGroup = getUnavailableBeIdsInGroup(infoService, colocateIndex, groupId);
+            List<Long> availableBeIds = getAvailableBeIds(db.getClusterName(), infoService);
             List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
-            if (relocateAndBalance(groupId, unavailableBeIds, availableBeIds, colocateIndex, infoService, statistic, balancedBackendsPerBucketSeq)) {
+            if (relocateAndBalance(groupId, unavailableBeIdsInGroup, availableBeIds, colocateIndex, infoService, statistic, balancedBackendsPerBucketSeq)) {
                 colocateIndex.addBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
                 ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
                 catalog.getEditLog().logColocateBackendsPerBucketSeq(info);
@@ -484,27 +484,44 @@ private List getBeSeqIndexes(List flatBackendsPerBucketSeq, long
     private Set<Long> getUnavailableBeIdsInGroup(SystemInfoService infoService, ColocateTableIndex colocateIndex, GroupId groupId) {
         Set<Long> backends = colocateIndex.getBackendsByGroup(groupId);
         Set<Long> unavailableBeIds = Sets.newHashSet();
-        long currTime = System.currentTimeMillis();
         for (Long backendId : backends) {
-            Backend be = infoService.getBackend(backendId);
-            if (be == null) {
+            if (!checkBackendAvailable(backendId, infoService)) {
                 unavailableBeIds.add(backendId);
-            } else if (!be.isAvailable()) {
-                // 1. BE is dead for a long time
-                // 2. BE is under decommission
-                if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > Config.tablet_repair_delay_factor_second * 1000 * 2)
-                        || be.isDecommissioned()) {
-                    unavailableBeIds.add(backendId);
-                }
             }
         }
         return unavailableBeIds;
     }
 
-    private List<Long> getAvailableBeIdsInGroup(String cluster, SystemInfoService infoService, Set<Long> unavailableBeIds) {
-        List<Long> allBackendIds = infoService.getClusterBackendIds(cluster, true);
-        return allBackendIds.stream()
-                .filter(id -> !unavailableBeIds.contains(id))
-                .collect(Collectors.toList());
+    private List<Long> getAvailableBeIds(String cluster, SystemInfoService infoService) {
+        // get all backends to allBackendIds, and check be availability using checkBackendAvailable
+        // backend stopped for a short period of time is still considered available
+        List<Long> allBackendIds = infoService.getClusterBackendIds(cluster, false);
+        List<Long> availableBeIds = Lists.newArrayList();
+        for (Long backendId : allBackendIds) {
+            if (checkBackendAvailable(backendId, infoService)) {
+                availableBeIds.add(backendId);
+            }
+        }
+        return availableBeIds;
+    }
+
+    /**
+     * check backend available
+     * backend stopped for a short period of time is still considered available
+     */
+    private boolean checkBackendAvailable(Long backendId, SystemInfoService infoService) {
+        long currTime = System.currentTimeMillis();
+        Backend be = infoService.getBackend(backendId);
+        if (be == null) {
+            return false;
+        } else if (!be.isAvailable()) {
+            // 1. BE is dead for a long time
+            // 2. BE is under decommission
+            if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > Config.tablet_repair_delay_factor_second * 1000 * 2)
+                    || be.isDecommissioned()) {
+                return false;
+            }
+        }
+        return true;
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java
index ee59267ae97e86..47c2f58165d602 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java
@@ -362,18 +362,75 @@ public void testGetUnavailableBeIdsInGroup(@Mocked ColocateTableIndex colocateTa
     }
 
     @Test
-    public void testGetAvailableBeIdsInGroup(@Mocked SystemInfoService infoService) {
-        List<Long> clusterAliveBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L);
+    public void testGetAvailableBeIds(@Mocked SystemInfoService infoService,
+                                      @Mocked Backend myBackend2,
+                                      @Mocked Backend myBackend3,
+                                      @Mocked Backend myBackend4,
+                                      @Mocked Backend myBackend5) {
+        List<Long> clusterBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L);
         new Expectations(){
             {
-                infoService.getClusterBackendIds("cluster1", true);
-                result = clusterAliveBackendIds;
+                infoService.getClusterBackendIds("cluster1", false);
+                result = clusterBackendIds;
+                minTimes = 0;
+
+                infoService.getBackend(1L);
+                result = null;
+                minTimes = 0;
+
+                // backend2 is available
+                infoService.getBackend(2L);
+                result = myBackend2;
+                minTimes = 0;
+                myBackend2.isAvailable();
+                result = true;
+                minTimes = 0;
+
+                // backend3 not available, and dead for a long time
+                infoService.getBackend(3L);
+                result = myBackend3;
+                minTimes = 0;
+                myBackend3.isAvailable();
+                result = false;
+                minTimes = 0;
+                myBackend3.isAlive();
+                result = false;
+                minTimes = 0;
+                myBackend3.getLastUpdateMs();
+                result = System.currentTimeMillis() - Config.tablet_repair_delay_factor_second * 1000 * 20;
+                minTimes = 0;
+
+                // backend4 available, not alive but dead for a short time
+                infoService.getBackend(4L);
+                result = myBackend4;
+                minTimes = 0;
+                myBackend4.isAvailable();
+                result = false;
+                minTimes = 0;
+                myBackend4.isAlive();
+                result = false;
+                minTimes = 0;
+                myBackend4.getLastUpdateMs();
+                result = System.currentTimeMillis();
+                minTimes = 0;
+
+                // backend5 not available, and in decommission
+                infoService.getBackend(5L);
+                result = myBackend5;
+                minTimes = 0;
+                myBackend5.isAvailable();
+                result = false;
+                minTimes = 0;
+                myBackend5.isAlive();
+                result = true;
+                minTimes = 0;
+                myBackend5.isDecommissioned();
+                result = true;
                 minTimes = 0;
             }
         };
 
-        Set<Long> unavailableBeIds = Sets.newHashSet(4L, 5L, 6L);
-        List<Long> availableBeIds = Deencapsulation.invoke(balancer, "getAvailableBeIdsInGroup","cluster1", infoService, unavailableBeIds);
-        Assert.assertArrayEquals(new long[]{1L, 2L, 3L}, availableBeIds.stream().mapToLong(i->i).sorted().toArray());
+        List<Long> availableBeIds = Deencapsulation.invoke(balancer, "getAvailableBeIds","cluster1", infoService);
+        Assert.assertArrayEquals(new long[]{2L, 4L}, availableBeIds.stream().mapToLong(i->i).sorted().toArray());
     }
 }