Skip to content

Commit

Permalink
[Fix](cloud) Fix cluster status inconsistent with bes and add config …
Browse files Browse the repository at this point in the history
…disable auto
  • Loading branch information
deardeng committed Sep 13, 2024
1 parent 4466541 commit 6fb7c29
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3042,6 +3042,15 @@ public static int metaServiceRpcRetryTimes() {
@ConfField(mutable = true, description = {"存算分离模式下,当tablet分布的be异常,是否立即映射tablet到新的be上,默认true"})
public static boolean enable_immediate_be_assign = true;

@ConfField(mutable = true, description = {"存算分离模式下是否启用自动启停功能,默认true",
"Whether to enable the automatic start-stop feature in cloud model, default is true."})
public static boolean enable_auto_start_for_cloud_cluster = true;

@ConfField(mutable = true, description = {"存算分离模式下自动启停等待cluster唤醒退避重试次数,默认300次大约5分钟",
"The automatic start-stop wait time for cluster wake-up backoff retry count in the cloud "
+ "model is set to 300 times, which is approximately 5 minutes by default."})
public static int auto_start_wait_to_resume_times = 300;

// ATTN: DONOT add any config not related to cloud mode here
// ATTN: DONOT add any config not related to cloud mode here
// ATTN: DONOT add any config not related to cloud mode here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,13 @@ private void checkDiffNode(Map<String, ClusterPB> remoteClusterIdToPB,
if (LOG.isDebugEnabled()) {
LOG.debug("current cluster status {} {}", currentClusterStatus, newClusterStatus);
}
if (!currentClusterStatus.equals(newClusterStatus)) {
boolean needChange = false;
Set<String> clusterStatusInMem = cloudSystemInfoService.getClusterStatus(currentBes);
if (clusterStatusInMem.size() != 1) {
LOG.warn("cluster {}, multi be nodes cluster status inconsistent, fix it {}", cid, clusterStatusInMem);
needChange = true;
}
if (!currentClusterStatus.equals(newClusterStatus) || needChange) {
// cluster's status changed
LOG.info("cluster_status corresponding to cluster_id has been changed,"
+ " cluster_id : {} , current_cluster_status : {}, new_cluster_status :{}",
Expand Down Expand Up @@ -426,8 +432,8 @@ private void checkCloudFes() {
}
return nodeMap;
});
LOG.info("diffFrontends nodes: {}, current: {}, toAdd: {}, toDel: {}",
expectedFes, currentFes, toAdd, toDel);
LOG.info("diffFrontends nodes: {}, current: {}, toAdd: {}, toDel: {}, enable auto start: {}",
expectedFes, currentFes, toAdd, toDel, Config.enable_auto_start_for_cloud_cluster);
if (toAdd.isEmpty() && toDel.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("runAfterCatalogReady getObserverFes nothing todo");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -567,10 +568,18 @@ public String getCloudStatusById(final String clusterId) {
}
}

public Set<String> getClusterStatus(List<Backend> backends) {
// ATTN: found bug, In the same cluster, the cluster status in the tags of BE nodes is inconsistent.
// Using a set to collect the cluster statuses from the BE nodes.
return backends.stream().map(Backend::getCloudClusterStatus).collect(Collectors.toSet());
}

public String getCloudStatusByIdNoLock(final String clusterId) {
return clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>())
.stream().map(Backend::getCloudClusterStatus).findFirst()
.orElse(String.valueOf(Cloud.ClusterStatus.UNKNOWN));
List<Backend> bes = clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>());
Optional<String> hasNormal = bes.stream().map(Backend::getCloudClusterStatus)
.filter(status -> status.equals(String.valueOf(Cloud.ClusterStatus.NORMAL))).findAny();
return hasNormal.orElseGet(() -> bes.stream().map(Backend::getCloudClusterStatus).findFirst()
.orElse(String.valueOf(Cloud.ClusterStatus.UNKNOWN)));
}

public void updateClusterNameToId(final String newName,
Expand Down Expand Up @@ -949,6 +958,9 @@ public String waitForAutoStart(String clusterName) throws DdlException {
if (Config.isNotCloudMode()) {
return null;
}
if (!Config.enable_auto_start_for_cloud_cluster) {
return null;
}
clusterName = getClusterNameAutoStart(clusterName);
if (Strings.isNullOrEmpty(clusterName)) {
LOG.warn("auto start in cloud mode, but clusterName empty {}", clusterName);
Expand Down Expand Up @@ -999,7 +1011,7 @@ public String waitForAutoStart(String clusterName) throws DdlException {
}
}
// wait 5 mins
int retryTimes = 5 * 60;
int retryTimes = Config.auto_start_wait_to_resume_times < 0 ? 300 : Config.auto_start_wait_to_resume_times;
int retryTime = 0;
StopWatch stopWatch = new StopWatch();
stopWatch.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ public void setMaster(int clusterId, String token, long epoch) {
// Set cloud_instance_id and meta_service_endpoint even if there are empty
// Be can knowns that fe is working in cloud mode.
// Set the cloud instance ID for cloud deployment identification
tMasterInfo.setCloudInstanceId(Config.cloud_instance_id);
if (!Strings.isNullOrEmpty(Config.cloud_instance_id)) {
tMasterInfo.setCloudInstanceId(Config.cloud_instance_id);
}
// Set the endpoint for the metadata service in cloud mode
tMasterInfo.setMetaServiceEndpoint(Config.meta_service_endpoint);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.awaitility.Awaitility;
import org.apache.doris.regression.util.Http
import static java.util.concurrent.TimeUnit.SECONDS;

suite('test_auto_start_in_cloud', 'multi_cluster') {
suite('test_auto_start_in_cloud', 'multi_cluster, docker') {
if (!isCloudMode()) {
return;
}
Expand Down Expand Up @@ -168,5 +168,29 @@ suite('test_auto_start_in_cloud', 'multi_cluster') {

future1.get()
future2.get()

tag = getCloudBeTagByName(clusterName)
logger.info("tag check = {}", tag)
jsonObject = jsonSlurper.parseText(tag)
String cluster_status = jsonObject.cloud_cluster_status
assertEquals("NORMAL", cluster_status)

// add 1 nodes, check it status NORMAL
cluster.addBackend(1, null)
dockerAwaitUntil(5) {
def result = sql """SHOW BACKENDS"""
result.size() == 4
}

def bes = sql_return_maparray "SHOW BACKENDS"
bes.each {
tag = it.Tag
if (!tag.contains(clusterName)) {
return
}
jsonObject = jsonSlurper.parseText(tag)
String cluster_status = jsonObject.cloud_cluster_status
assertEquals("NORMAL", cluster_status)
}
}
}

0 comments on commit 6fb7c29

Please sign in to comment.