Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -71,7 +72,8 @@ public class CloudReplica extends Replica {
private Map<String, List<Long>> memClusterToBackends = new ConcurrentHashMap<String, List<Long>>();

// clusterId -> (secondary backend id, timestamp in milliseconds at which that backend was recorded)
private Map<String, Pair<Long, Long>> secondaryClusterToBackends = new ConcurrentHashMap<>();

// No-argument constructor; it performs no work of its own.
public CloudReplica() {
}
Expand Down Expand Up @@ -369,13 +371,17 @@ public Backend getPrimaryBackend(String clusterId) {
}

public Backend getSecondaryBackend(String clusterId) {
List<Long> backendIds = secondaryClusterToBackends.get(clusterId);
if (backendIds == null || backendIds.isEmpty()) {
Pair<Long, Long> secondBeAndChangeTimestamp = secondaryClusterToBackends.get(clusterId);
if (secondBeAndChangeTimestamp == null) {
return null;
}

long backendId = backendIds.get(0);
return Env.getCurrentSystemInfo().getBackend(backendId);
long beId = secondBeAndChangeTimestamp.key();
long changeTimestamp = secondBeAndChangeTimestamp.value();
if (LOG.isDebugEnabled()) {
LOG.debug("in secondaryClusterToBackends clusterId {}, beId {}, changeTimestamp {}, replica info {}",
clusterId, beId, changeTimestamp, this);
}
return Env.getCurrentSystemInfo().getBackend(secondBeAndChangeTimestamp.first);
}

public long hashReplicaToBe(String clusterId, boolean isBackGround) throws ComputeGroupException {
Expand Down Expand Up @@ -583,11 +589,35 @@ public void updateClusterToPrimaryBe(String cluster, long beId) {
}

/**
 * Records {@code beId} as the secondary backend of {@code cluster}, stamped with the current
 * wall-clock time in milliseconds. Any previous entry for the cluster is replaced.
 *
 * @param cluster id of the cluster the secondary backend belongs to
 * @param beId    id of the backend to record as secondary
 */
private void updateClusterToSecondaryBe(String cluster, long beId) {
    long changeTimestamp = System.currentTimeMillis();
    if (LOG.isDebugEnabled()) {
        LOG.debug("add to secondary clusterId {}, beId {}, changeTimestamp {}, replica info {}",
                cluster, beId, changeTimestamp, this);
    }
    secondaryClusterToBackends.put(cluster, Pair.of(beId, changeTimestamp));
}

// Drops the entries keyed by the given cluster from both the primary and the secondary
// cluster-to-backend maps. Removing a key that is absent is a no-op.
public void clearClusterToBe(String cluster) {
primaryClusterToBackends.remove(cluster);
secondaryClusterToBackends.remove(cluster);
}

/**
 * Removes the secondary-backend entry of {@code clusterId} when it has expired, i.e. when its
 * change timestamp is strictly earlier than {@code expireTimestamp}.
 *
 * <p>ATTN: this method is only used for cleaning up redundant tablets via the tablet report on
 * the backends. Only the master node runs the diff logic, so {@code secondaryClusterToBackends}
 * only needs to be cleaned up on the master node.
 *
 * @param clusterId       id of the cluster whose secondary entry is checked
 * @param expireTimestamp entries whose change timestamp is below this value are removed
 */
public void checkAndClearSecondaryClusterToBe(String clusterId, long expireTimestamp) {
    Pair<Long, Long> secondBeAndChangeTimestamp = secondaryClusterToBackends.get(clusterId);
    if (secondBeAndChangeTimestamp == null) {
        return;
    }
    long beId = secondBeAndChangeTimestamp.key();
    long changeTimestamp = secondBeAndChangeTimestamp.value();
    if (changeTimestamp >= expireTimestamp) {
        return;
    }

    // Conditional removal: only drop the entry that was just inspected. If another thread has
    // stored a newer entry for this cluster in the meantime, that entry is left in place.
    boolean removed = secondaryClusterToBackends.remove(clusterId, secondBeAndChangeTimestamp);
    if (removed && LOG.isDebugEnabled()) {
        LOG.debug("remove clusterId {} secondary beId {} changeTimestamp {} expireTimestamp {} replica info {}",
                clusterId, beId, changeTimestamp, expireTimestamp, this);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,8 @@ private boolean completeRouteInfo() {
for (Tablet tablet : index.getTablets()) {
for (Replica r : tablet.getReplicas()) {
CloudReplica replica = (CloudReplica) r;
// clean secondary map
replica.checkAndClearSecondaryClusterToBe(cluster, needRehashDeadTime);
InfightTablet taskKey = new InfightTablet(tablet.getId(), cluster);
// colocate table no need to update primary backends
if (isColocated) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,36 @@ suite('test_clean_tablet_when_rebalance', 'docker') {
def choseDeadBeIndex = 1
def table = "test_clean_tablet_when_rebalance"

def testCase = { deadTime, mergedCacheDir ->
boolean beDeadLong = deadTime > rehashTime ? true : false
logger.info("begin exec beDeadLong {}", beDeadLong)

// Issues the same count(*) query against the test table five times in a row.
def selectTriggerRehash = { ->
    5.times {
        sql """
select count(*) from $table
"""
    }
}

// Queries every given backend's /tablets_json endpoint and returns a map from
// tablet id (as a String) to the list of hosts that reported that tablet.
def getTabletInHostFromBe = { bes ->
    def hostsByTablet = [:]
    for (be in bes) {
        // Sample response:
        // {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3}
        def payload = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data
        for (tablet in payload.tablets) {
            def tabletId = tablet.tablet_id as String
            def hosts = hostsByTablet[tabletId]
            if (hosts == null) {
                // First backend seen for this tablet: start a new host list.
                hostsByTablet[tabletId] = [payload.host]
            } else {
                hosts.add(payload.host)
            }
        }
    }
    hostsByTablet
}

def testCase = { deadTime, mergedCacheDir ->
boolean beDeadLong = deadTime > rehashTime ? true : false
logger.info("begin exec beDeadLong {}", beDeadLong)

selectTriggerRehash.call()

def beforeGetFromFe = getTabletAndBeHostFromFe(table)
def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
Expand All @@ -64,38 +85,65 @@ suite('test_clean_tablet_when_rebalance', 'docker') {

cluster.stopBackends(choseDeadBeIndex)
dockerAwaitUntil(50) {
def bes = sql_return_maparray("SHOW TABLETS FROM ${table}")
.collect { it.BackendId }
.unique()
logger.info("bes {}", bes)
def showTablets = sql_return_maparray("SHOW TABLETS FROM ${table}")
def bes = showTablets
.collect { it.BackendId }
.unique()
logger.info("before start bes {}, tablets {}", bes, showTablets)
bes.size() == 2
}
// rehash
selectTriggerRehash.call()
// curl be, tablets in 2 bes

if (beDeadLong) {
setFeConfig('enable_cloud_partition_balance', false)
setFeConfig('enable_cloud_table_balance', false)
setFeConfig('enable_cloud_global_balance', false)
}
sleep(deadTime * 1000)

// wait report logic
sleep(deadTime * 1000)
cluster.startBackends(choseDeadBeIndex)
def afterGetFromFe = getTabletAndBeHostFromFe(table)
def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
logger.info("after stop one be, rehash fe tablets {}, be tablets {}", afterGetFromFe, afterGetFromBe)

dockerAwaitUntil(50) {
def bes = sql_return_maparray("SHOW TABLETS FROM ${table}")
def showTablets = sql_return_maparray("SHOW TABLETS FROM ${table}")
def bes = showTablets
.collect { it.BackendId }
.unique()
logger.info("bes {}", bes)
logger.info("after start bes {}, tablets {}", bes, showTablets)
bes.size() == (beDeadLong ? 2 : 3)
}
for (int i = 0; i < 5; i++) {
sleep(2000)
sql """
select count(*) from $table
"""

selectTriggerRehash.call()
// wait report logic
// before the sleep, the tablet-report cleanup has not taken effect yet: the secondary backend mapping in the FE has not been cleared
afterGetFromFe = getTabletAndBeHostFromFe(table)
afterGetFromBe = getTabletInHostFromBe(cluster.getAllBackends())
logger.info("before sleep rehash time, fe tablets {}, be tablets {}", afterGetFromFe, afterGetFromBe)
def redundancyTablet = null
afterGetFromFe.each {
assertTrue(afterGetFromBe.containsKey(it.Key))
if (afterGetFromBe[it.Key].size() == 2) {
redundancyTablet = it.Key
logger.info("find tablet {} redundancy in {}", it.Key, afterGetFromBe[it.Key])
}
assertTrue(afterGetFromBe[it.Key].contains(it.Value[1]))
}
def afterGetFromFe = getTabletAndBeHostFromFe(table)
def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
logger.info("after fe tablets {}, be tablets {}", afterGetFromFe, afterGetFromBe)

sleep(rehashTime * 1000 + 10 * 1000)
// after the sleep, the tablet-report cleanup takes effect: the secondary backend mapping in the FE has been cleared

afterGetFromFe = getTabletAndBeHostFromFe(table)
afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
if (!beDeadLong) {
def checkAfterGetFromBe = getTabletInHostFromBe(cluster.getAllBackends())
assertEquals(1, checkAfterGetFromBe[redundancyTablet].size())
}
logger.info("after sleep rehash time, fe tablets {}, be tablets {}", afterGetFromFe, afterGetFromBe)
afterGetFromFe.each {
assertTrue(afterGetFromBe.containsKey(it.Key))
assertEquals(afterGetFromBe[it.Key], it.Value[1])
Expand Down Expand Up @@ -163,7 +211,8 @@ suite('test_clean_tablet_when_rebalance', 'docker') {
);
"""
sql """
insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 'v3')
insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 'v3'),
(4, 4,'v4'), (5, 5, 'v5'), (6, 6, 'v6'), (100, 100, 'v100'), (7, 7, 'v7')
"""
def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2)
// 'rehash_tablet_after_be_dead_seconds=100'
Expand Down
Loading