Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
Expand All @@ -55,6 +56,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -418,11 +420,22 @@ public void checkInflghtWarmUpCacheAsync() {
for (Map.Entry<Long, List<InfightTask>> entry : beToInfightTasks.entrySet()) {
LOG.info("before pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size());
Backend destBackend = cloudSystemInfoService.getBackend(entry.getKey());
if (DebugPointUtil.isEnable("CloudTabletRebalancer.checkInflghtWarmUpCacheAsync.beNull")) {
LOG.info("debug point CloudTabletRebalancer.checkInflghtWarmUpCacheAsync.beNull, be {}", destBackend);
destBackend = null;
}
if (destBackend == null || (!destBackend.isAlive() && destBackend.getLastUpdateMs() < needRehashDeadTime)) {
List<InfightTablet> toRemove = new LinkedList<>();
for (InfightTask task : entry.getValue()) {
for (InfightTablet key : tabletToInfightTask.keySet()) {
tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), key.clusterId));
toRemove.add(new InfightTablet(task.pickedTablet.getId(), key.clusterId));
}
}
for (InfightTablet key : toRemove) {
if (LOG.isDebugEnabled()) {
LOG.debug("remove tablet {}-{}", key.getClusterId(), key.getTabletId());
}
tabletToInfightTask.remove(key);
}
continue;
}
Expand All @@ -447,6 +460,9 @@ public void checkInflghtWarmUpCacheAsync() {
LOG.info("{} pre cache timeout, forced to change the mapping", result.getKey());
}
updateClusterToBeMap(task.pickedTablet, task.destBe, clusterId, infos);
if (LOG.isDebugEnabled()) {
LOG.debug("remove tablet {}-{}", clusterId, task.pickedTablet.getId());
}
tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), clusterId));
}
}
Expand Down
292 changes: 174 additions & 118 deletions regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -21,142 +21,198 @@ suite('test_rebalance_in_cloud', 'multi_cluster,docker') {
if (!isCloudMode()) {
return;
}
def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
'enable_cloud_warm_up_for_rebalance=false',
'cloud_tablet_rebalancer_interval_second=1',
'cloud_balance_tablet_percent_per_run=0.5',
'cloud_pre_heating_time_limit_sec=1',
'sys_log_verbose_modules=org',

def clusterOptions = [
new ClusterOptions(),
new ClusterOptions(),
]
options.setFeNum(3)
options.setBeNum(1)
options.cloudMode = true
options.connectToFollower = true
options.enableDebugPoints()

docker(options) {
sql """
CREATE TABLE table100 (
class INT,
id INT,
score INT SUM
)
AGGREGATE KEY(class, id)
DISTRIBUTED BY HASH(class) BUCKETS 48
"""

sql """
CREATE TABLE table_p2 ( k1 int(11) NOT NULL, k2 varchar(20) NOT NULL, k3 int sum NOT NULL )
AGGREGATE KEY(k1, k2)
PARTITION BY RANGE(k1) (
PARTITION p1992 VALUES [("-2147483648"), ("19930101")),
PARTITION p1993 VALUES [("19930101"), ("19940101")),
PARTITION p1994 VALUES [("19940101"), ("19950101")),
PARTITION p1995 VALUES [("19950101"), ("19960101")),
PARTITION p1996 VALUES [("19960101"), ("19970101")),
PARTITION p1997 VALUES [("19970101"), ("19980101")),
PARTITION p1998 VALUES [("19980101"), ("19990101")))
DISTRIBUTED BY HASH(k1) BUCKETS 3
"""
GetDebugPoint().enableDebugPointForAllFEs("CloudReplica.getBackendIdImpl.primaryClusterToBackends");
sql """set global forward_to_master=false"""

// add a be
cluster.addBackend(1, null)

dockerAwaitUntil(30) {
def bes = sql """show backends"""
log.info("bes: {}", bes)
bes.size() == 2
}
for (options in clusterOptions) {
options.setFeNum(3)
options.setBeNum(1)
options.cloudMode = true
options.connectToFollower = true
options.enableDebugPoints()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
'cloud_tablet_rebalancer_interval_second=1',
'cloud_balance_tablet_percent_per_run=0.5',

dockerAwaitUntil(5) {
def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table100"""
log.info("replica distribution table100: {}", ret)
ret.size() == 2
}
'sys_log_verbose_modules=org',
]
}
clusterOptions[0].feConfigs += ['enable_cloud_warm_up_for_rebalance=true', 'cloud_pre_heating_time_limit_sec=300']
clusterOptions[1].feConfigs += ['enable_cloud_warm_up_for_rebalance=false']

def result = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM table100; """
assertEquals(2, result.size())
int replicaNum = 0

for (def row : result) {
log.info("replica distribution: ${row} ".toString())
replicaNum = Integer.valueOf((String) row.ReplicaNum)
if (replicaNum == 0) {
// due to the debug point, the observer does not hash replicas
} else {
assertTrue(replicaNum <= 25 && replicaNum >= 23)

for (int i = 0; i < clusterOptions.size(); i++) {
log.info("begin warm up {}", i == 0 ? "ON" : "OFF")
docker(clusterOptions[i]) {
sql """
CREATE TABLE table100 (
class INT,
id INT,
score INT SUM
)
AGGREGATE KEY(class, id)
DISTRIBUTED BY HASH(class) BUCKETS 48
"""

sql """
CREATE TABLE table_p2 ( k1 int(11) NOT NULL, k2 varchar(20) NOT NULL, k3 int sum NOT NULL )
AGGREGATE KEY(k1, k2)
PARTITION BY RANGE(k1) (
PARTITION p1992 VALUES [("-2147483648"), ("19930101")),
PARTITION p1993 VALUES [("19930101"), ("19940101")),
PARTITION p1994 VALUES [("19940101"), ("19950101")),
PARTITION p1995 VALUES [("19950101"), ("19960101")),
PARTITION p1996 VALUES [("19960101"), ("19970101")),
PARTITION p1997 VALUES [("19970101"), ("19980101")),
PARTITION p1998 VALUES [("19980101"), ("19990101")))
DISTRIBUTED BY HASH(k1) BUCKETS 3
"""
GetDebugPoint().enableDebugPointForAllFEs("CloudReplica.getBackendIdImpl.primaryClusterToBackends");
sql """set global forward_to_master=false"""

// add a be
cluster.addBackend(1, null)

dockerAwaitUntil(30) {
def bes = sql """show backends"""
log.info("bes: {}", bes)
bes.size() == 2
}
}

dockerAwaitUntil(5) {
def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1992)"""
log.info("replica distribution table_p2: {}", ret)
ret.size() == 2
}
dockerAwaitUntil(5) {
def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table100"""
log.info("replica distribution table100: {}", ret)
ret.size() == 2
}

def result = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM table100; """
assertEquals(2, result.size())
int replicaNum = 0

result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1992) """
assertEquals(2, result.size())
for (def row : result) {
replicaNum = Integer.valueOf((String) row.ReplicaNum)
log.info("replica distribution: ${row} ".toString())
if (replicaNum != 0) {
assertTrue(replicaNum <= 2 && replicaNum >= 1)
for (def row : result) {
log.info("replica distribution: ${row} ".toString())
replicaNum = Integer.valueOf((String) row.ReplicaNum)
if (replicaNum == 0) {
// due to the debug point, the observer does not hash replicas
} else {
assertTrue(replicaNum <= 25 && replicaNum >= 23)
}
}
}

result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1993) """
assertEquals(2, result.size())
for (def row : result) {
replicaNum = Integer.valueOf((String) row.ReplicaNum)
log.info("replica distribution: ${row} ".toString())
if (replicaNum != 0) {
assertTrue(replicaNum <= 2 && replicaNum >= 1)
dockerAwaitUntil(5) {
def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1992)"""
log.info("replica distribution table_p2: {}", ret)
ret.size() == 2
}
}

result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1994) """
assertEquals(2, result.size())
for (def row : result) {
replicaNum = Integer.valueOf((String) row.ReplicaNum)
log.info("replica distribution: ${row} ".toString())
if (replicaNum != 0) {
assertTrue(replicaNum <= 2 && replicaNum >= 1)

result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1992) """
assertEquals(2, result.size())
for (def row : result) {
replicaNum = Integer.valueOf((String) row.ReplicaNum)
log.info("replica distribution: ${row} ".toString())
if (replicaNum != 0) {
assertTrue(replicaNum <= 2 && replicaNum >= 1)
}
}
}

result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1995) """
assertEquals(2, result.size())
for (def row : result) {
replicaNum = Integer.valueOf((String) row.ReplicaNum)
log.info("replica distribution: ${row} ".toString())
if (replicaNum != 0) {
assertTrue(replicaNum <= 2 && replicaNum >= 1)
result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1993) """
assertEquals(2, result.size())
for (def row : result) {
replicaNum = Integer.valueOf((String) row.ReplicaNum)
log.info("replica distribution: ${row} ".toString())
if (replicaNum != 0) {
assertTrue(replicaNum <= 2 && replicaNum >= 1)
}
}
}

result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1996) """
assertEquals(2, result.size())
for (def row : result) {
replicaNum = Integer.valueOf((String) row.ReplicaNum)
log.info("replica distribution: ${row} ".toString())
if (replicaNum != 0) {
assertTrue(replicaNum <= 2 && replicaNum >= 1)
result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1994) """
assertEquals(2, result.size())
for (def row : result) {
replicaNum = Integer.valueOf((String) row.ReplicaNum)
log.info("replica distribution: ${row} ".toString())
if (replicaNum != 0) {
assertTrue(replicaNum <= 2 && replicaNum >= 1)
}
}
}

result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1997) """
assertEquals(2, result.size())
for (def row : result) {
replicaNum = Integer.valueOf((String) row.ReplicaNum)
log.info("replica distribution: ${row} ".toString())
if (replicaNum != 0) {
assertTrue(replicaNum <= 2 && replicaNum >= 1)
result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1995) """
assertEquals(2, result.size())
for (def row : result) {
replicaNum = Integer.valueOf((String) row.ReplicaNum)
log.info("replica distribution: ${row} ".toString())
if (replicaNum != 0) {
assertTrue(replicaNum <= 2 && replicaNum >= 1)
}
}

result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1996) """
assertEquals(2, result.size())
for (def row : result) {
replicaNum = Integer.valueOf((String) row.ReplicaNum)
log.info("replica distribution: ${row} ".toString())
if (replicaNum != 0) {
assertTrue(replicaNum <= 2 && replicaNum >= 1)
}
}

result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1997) """
assertEquals(2, result.size())
for (def row : result) {
replicaNum = Integer.valueOf((String) row.ReplicaNum)
log.info("replica distribution: ${row} ".toString())
if (replicaNum != 0) {
assertTrue(replicaNum <= 2 && replicaNum >= 1)
}
}

if (i == 1) {
// just test warm up
return
}

GetDebugPoint().enableDebugPointForAllFEs("CloudTabletRebalancer.checkInflghtWarmUpCacheAsync.beNull");
// add a be
cluster.addBackend(1, null)
// warm up
sql """admin set frontend config("enable_cloud_warm_up_for_rebalance"="true")"""

// test that the rebalance thread still works
dockerAwaitUntil(30) {
def bes = sql """show backends"""
log.info("bes: {}", bes)
bes.size() == 3
}

dockerAwaitUntil(5) {
def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table100"""
log.info("replica distribution table100: {}", ret)
ret.size() == 3
}

result = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM table100; """
assertEquals(3, result.size())
log.info("replica distribution: ${result} ".toString())

// verify that no rebalance happens for 10s, due to the debug point
for (int j = 0; j < 10; j++) {
assertTrue(result.any { row ->
Integer.valueOf((String) row.ReplicaNum) == 0
})
sleep(1 * 1000)
}
GetDebugPoint().disableDebugPointForAllFEs("CloudTabletRebalancer.checkInflghtWarmUpCacheAsync.beNull");
dockerAwaitUntil(10) {
def ret = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM table100"""
log.info("replica distribution table100: {}", ret)
ret.any { row ->
Integer.valueOf((String) row.ReplicaNum) == 16
}
}
}
logger.info("Successfully run {} times", i + 1)
}
}
Loading