diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java index f27b9fdf0cc870..358fc1023b297a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java @@ -26,6 +26,7 @@ import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.qe.ConnectContext; @@ -71,7 +72,8 @@ public class CloudReplica extends Replica { private Map<String, List<Long>> memClusterToBackends = new ConcurrentHashMap<String, List<Long>>(); // clusterId, secondaryBe, changeTimestamp - private Map<String, List<Long>> secondaryClusterToBackends = new ConcurrentHashMap<String, List<Long>>(); + private Map<String, Pair<Long, Long>> secondaryClusterToBackends + = new ConcurrentHashMap<String, Pair<Long, Long>>(); public CloudReplica() { } @@ -369,13 +371,17 @@ public Backend getPrimaryBackend(String clusterId) { } public Backend getSecondaryBackend(String clusterId) { - List<Long> backendIds = secondaryClusterToBackends.get(clusterId); - if (backendIds == null || backendIds.isEmpty()) { + Pair<Long, Long> secondBeAndChangeTimestamp = secondaryClusterToBackends.get(clusterId); + if (secondBeAndChangeTimestamp == null) { return null; } - - long backendId = backendIds.get(0); - return Env.getCurrentSystemInfo().getBackend(backendId); + long beId = secondBeAndChangeTimestamp.key(); + long changeTimestamp = secondBeAndChangeTimestamp.value(); + if (LOG.isDebugEnabled()) { + LOG.debug("in secondaryClusterToBackends clusterId {}, beId {}, changeTimestamp {}, replica info {}", + clusterId, beId, changeTimestamp, this); + } + return Env.getCurrentSystemInfo().getBackend(secondBeAndChangeTimestamp.first); } public long hashReplicaToBe(String clusterId, boolean isBackGround) throws ComputeGroupException { @@ 
-583,11 +589,35 @@ public void updateClusterToPrimaryBe(String cluster, long beId) { } private void updateClusterToSecondaryBe(String cluster, long beId) { - secondaryClusterToBackends.put(cluster, Lists.newArrayList(beId)); + long changeTimestamp = System.currentTimeMillis(); + if (LOG.isDebugEnabled()) { + LOG.debug("add to secondary clusterId {}, beId {}, changeTimestamp {}, replica info {}", + cluster, beId, changeTimestamp, this); + } + secondaryClusterToBackends.put(cluster, Pair.of(beId, changeTimestamp)); } public void clearClusterToBe(String cluster) { primaryClusterToBackends.remove(cluster); secondaryClusterToBackends.remove(cluster); } + + // ATTN: This func is only used by redundant tablet report clean in bes. + // Only the master node will do the diff logic, + // so just only need to clean up secondaryClusterToBackends on the master node. + public void checkAndClearSecondaryClusterToBe(String clusterId, long expireTimestamp) { + Pair<Long, Long> secondBeAndChangeTimestamp = secondaryClusterToBackends.get(clusterId); + if (secondBeAndChangeTimestamp == null) { + return; + } + long beId = secondBeAndChangeTimestamp.key(); + long changeTimestamp = secondBeAndChangeTimestamp.value(); + + if (changeTimestamp < expireTimestamp) { + LOG.debug("remove clusterId {} secondary beId {} changeTimestamp {} expireTimestamp {} replica info {}", + clusterId, beId, changeTimestamp, expireTimestamp, this); + secondaryClusterToBackends.remove(clusterId); + return; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java index 6eb5567f7e83db..b93d4fe2cff464 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java @@ -548,6 +548,8 @@ private boolean completeRouteInfo() { for (Tablet tablet : index.getTablets()) { for (Replica r 
: tablet.getReplicas()) { CloudReplica replica = (CloudReplica) r; + // clean secondary map + replica.checkAndClearSecondaryClusterToBe(cluster, needRehashDeadTime); InfightTablet taskKey = new InfightTablet(tablet.getId(), cluster); // colocate table no need to update primary backends if (isColocated) { diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy index 151de976a8303c..caac5b73cfdbba 100644 --- a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy +++ b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy @@ -44,15 +44,36 @@ suite('test_clean_tablet_when_rebalance', 'docker') { def choseDeadBeIndex = 1 def table = "test_clean_tablet_when_rebalance" - def testCase = { deadTime, mergedCacheDir -> - boolean beDeadLong = deadTime > rehashTime ? true : false - logger.info("begin exec beDeadLong {}", beDeadLong) - + def selectTriggerRehash = { -> for (int i = 0; i < 5; i++) { sql """ select count(*) from $table """ } + } + + def getTabletInHostFromBe = { bes -> + def ret = [:] + bes.each { be -> + // {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3} + def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data + def tablets = data.tablets.collect { it.tablet_id as String } + tablets.each { + if (ret[it] != null) { + ret[it].add(data.host) + } else { + ret[it] = [data.host] + } + } + } + ret + } + + def testCase = { deadTime, mergedCacheDir -> + boolean beDeadLong = deadTime > rehashTime ? 
true : false + logger.info("begin exec beDeadLong {}", beDeadLong) + + selectTriggerRehash.call() def beforeGetFromFe = getTabletAndBeHostFromFe(table) def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) @@ -64,38 +85,65 @@ suite('test_clean_tablet_when_rebalance', 'docker') { cluster.stopBackends(choseDeadBeIndex) dockerAwaitUntil(50) { - def bes = sql_return_maparray("SHOW TABLETS FROM ${table}") - .collect { it.BackendId } - .unique() - logger.info("bes {}", bes) + def showTablets = sql_return_maparray("SHOW TABLETS FROM ${table}") + def bes = showTablets + .collect { it.BackendId } + .unique() + logger.info("before start bes {}, tablets {}", bes, showTablets) bes.size() == 2 } + // rehash + selectTriggerRehash.call() + // curl be, tablets in 2 bes if (beDeadLong) { setFeConfig('enable_cloud_partition_balance', false) setFeConfig('enable_cloud_table_balance', false) setFeConfig('enable_cloud_global_balance', false) } - sleep(deadTime * 1000) + // wait report logic + sleep(deadTime * 1000) cluster.startBackends(choseDeadBeIndex) + def afterGetFromFe = getTabletAndBeHostFromFe(table) + def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + logger.info("after stop one be, rehash fe tablets {}, be tablets {}", afterGetFromFe, afterGetFromBe) dockerAwaitUntil(50) { - def bes = sql_return_maparray("SHOW TABLETS FROM ${table}") + def showTablets = sql_return_maparray("SHOW TABLETS FROM ${table}") + def bes = showTablets .collect { it.BackendId } .unique() - logger.info("bes {}", bes) + logger.info("after start bes {}, tablets {}", bes, showTablets) bes.size() == (beDeadLong ? 
2 : 3) } - for (int i = 0; i < 5; i++) { - sleep(2000) - sql """ - select count(*) from $table - """ + + selectTriggerRehash.call() + // wait report logic + // tablet report clean not work, before sleep, in fe secondary not been clear + afterGetFromFe = getTabletAndBeHostFromFe(table) + afterGetFromBe = getTabletInHostFromBe(cluster.getAllBackends()) + logger.info("before sleep rehash time, fe tablets {}, be tablets {}", afterGetFromFe, afterGetFromBe) + def redundancyTablet = null + afterGetFromFe.each { + assertTrue(afterGetFromBe.containsKey(it.Key)) + if (afterGetFromBe[it.Key].size() == 2) { + redundancyTablet = it.Key + logger.info("find tablet {} redundancy in {}", it.Key, afterGetFromBe[it.Key]) + } + assertTrue(afterGetFromBe[it.Key].contains(it.Value[1])) } - def afterGetFromFe = getTabletAndBeHostFromFe(table) - def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) - logger.info("after fe tablets {}, be tablets {}", afterGetFromFe, afterGetFromBe) + + sleep(rehashTime * 1000 + 10 * 1000) + // tablet report clean will work, after sleep, in fe secondary been clear + + afterGetFromFe = getTabletAndBeHostFromFe(table) + afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + if (!beDeadLong) { + def checkAfterGetFromBe = getTabletInHostFromBe(cluster.getAllBackends()) + assertEquals(1, checkAfterGetFromBe[redundancyTablet].size()) + } + logger.info("after sleep rehash time, fe tablets {}, be tablets {}", afterGetFromFe, afterGetFromBe) afterGetFromFe.each { assertTrue(afterGetFromBe.containsKey(it.Key)) assertEquals(afterGetFromBe[it.Key], it.Value[1]) @@ -163,7 +211,8 @@ suite('test_clean_tablet_when_rebalance', 'docker') { ); """ sql """ - insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 'v3') + insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 'v3'), + (4, 4,'v4'), (5, 5, 'v5'), (6, 6, 'v6'), (100, 100, 'v100'), (7, 7, 'v7') """ def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) 
// 'rehash_tablet_after_be_dead_seconds=100'