From 2c5eb895a15a5869e6c883e2d406c15f8988a216 Mon Sep 17 00:00:00 2001 From: yujun777 Date: Mon, 25 Mar 2024 12:27:47 +0800 Subject: [PATCH 1/4] decommission be check replica num --- .../org/apache/doris/alter/SystemHandler.java | 74 ++++++++++++++++++- ..._decommission_with_replica_num_fail.groovy | 59 +++++++++++++++ 2 files changed, 131 insertions(+), 2 deletions(-) create mode 100644 regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index e503e093787971..438d5b6bbb3050 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -33,24 +33,31 @@ import org.apache.doris.analysis.ModifyFrontendHostNameClause; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MysqlCompatibleDatabase; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.ReplicaAllocation; +import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.NetUtils; import org.apache.doris.ha.FrontendNodeType; +import org.apache.doris.resource.Tag; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.system.SystemInfoService.HostInfo; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.lang3.NotImplementedException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.List; +import java.util.Map; /* * SystemHandler is for @@ -235,7 +242,8 @@ public static List checkDecommission(List hostInfos) decommissionBackends.add(backend); } - // TODO(cmy): check if replication num can be met + checkDecommissionWithReplicaAllocation(decommissionBackends); + // TODO(cmy): check remaining space return decommissionBackends; @@ -258,12 +266,74 @@ public static List checkDecommissionByIds(List ids) decommissionBackends.add(backend); } - // TODO(cmy): check if replication num can be met + checkDecommissionWithReplicaAllocation(decommissionBackends); + // TODO(cmy): check remaining space return decommissionBackends; } + private static void checkDecommissionWithReplicaAllocation(List decommissionBackends) + throws DdlException { + if (Config.isCloudMode()) { + return; + } + + Map tagAvailBackendNums = Maps.newHashMap(); + for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) { + long beId = backend.getId(); + if (!backend.isScheduleAvailable() + || decommissionBackends.stream().anyMatch(be -> be.getId() == beId)) { + continue; + } + + Tag tag = backend.getLocationTag(); + if (tag != null) { + tagAvailBackendNums.put(tag, tagAvailBackendNums.getOrDefault(tag, 0) + 1); + } + } + + Env env = Env.getCurrentEnv(); + List dbIds = env.getInternalCatalog().getDbIds(); + for (Long dbId : dbIds) { + Database db = env.getInternalCatalog().getDbNullable(dbId); + if (db == null) { + continue; + } + + if (db instanceof MysqlCompatibleDatabase) { + continue; + } + + for (Table table : db.getTables()) { + table.readLock(); + try { + if (!table.needSchedule()) { + continue; + } + + OlapTable tbl = (OlapTable) table; + for (Partition partition : tbl.getAllPartitions()) { + ReplicaAllocation replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId()); + for (Map.Entry entry : replicaAlloc.getAllocMap().entrySet()) { + Tag tag = entry.getKey(); + int replicaNum = (int) entry.getValue(); + int backendNum = tagAvailBackendNums.get(tag); + if (replicaNum > backendNum) { + throw new DdlException("Partition " + partition.getName() + " of table " + db.getName() + + "." + tbl.getName() + " 's replication allocation " + + replicaAlloc + " > available backend num " + backendNum + + " with tag " + tag + ", need decrease the partition's replication num."); + } + } + } + } finally { + table.readUnlock(); + } + } + } + } + @Override public synchronized void cancel(CancelStmt stmt) throws DdlException { CancelAlterSystemStmt cancelAlterSystemStmt = (CancelAlterSystemStmt) stmt; diff --git a/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy b/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy new file mode 100644 index 00000000000000..892a0583c592b2 --- /dev/null +++ b/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite('test_decommission_with_replica_num_fail') { + if (isCloudMode()) { + return + } + + def tbl = 'test_decommission_with_replica_num_fail' + def backends = sql_return_maparray('show backends') + def replicaNum = 0 + def targetBackend = null + for (def be : backends) { + def alive = be.Alive == "true" + def decommissioned = be.SystemDecommissioned == "true" + if (alive && !decommissioned) { + replicaNum++ + targetBackend = be + } + } + assertTrue(replicaNum > 0) + + sql "DROP TABLE IF EXISTS ${tbl} FORCE" + sql """ + CREATE TABLE ${tbl} + ( + k1 int, + k2 int + ) + DISTRIBUTED BY HASH(k1) BUCKETS 6 + PROPERTIES + ( + "replication_num" = "${replicaNum}" + ); + """ + try { + test { + sql "ALTER SYSTEM DECOMMISSION BACKEND '${targetBackend.Host}:${targetBackend.HeartbeatPort}'" + exception "need decrease the partition's replication num" + } + } finally { + sql "CANCEL DECOMMISSION BACKEND '${targetBackend.Host}:${targetBackend.HeartbeatPort}'" + } + sql "DROP TABLE IF EXISTS ${tbl} FORCE" +} From f61b351a94a5de32d7695631b676f801e0ccc1e2 Mon Sep 17 00:00:00 2001 From: yujun777 Date: Mon, 25 Mar 2024 12:31:28 +0800 Subject: [PATCH 2/4] update --- .../src/main/java/org/apache/doris/alter/SystemHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index 438d5b6bbb3050..c7050563cfa7fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -275,7 +275,7 @@ public static List checkDecommissionByIds(List ids) private static void checkDecommissionWithReplicaAllocation(List decommissionBackends) throws DdlException { - if (Config.isCloudMode()) { + if (Config.isCloudMode() || decommissionBackends.isEmpty()) { return; } From f3e5e291a0e2d717cbf110d139730b9098b4b362 Mon Sep 17 00:00:00 2001 From: yujun777 Date: Mon, 25 Mar 2024 18:19:25 +0800 Subject: [PATCH 3/4] update --- .../main/java/org/apache/doris/alter/SystemHandler.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index c7050563cfa7fb..9fccf6f6c240d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -58,6 +58,8 @@ import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; /* * SystemHandler is for @@ -279,6 +281,8 @@ private static void checkDecommissionWithReplicaAllocation(List decommi return; } + Set decommissionTags = decommissionBackends.stream().map(be -> be.getLocationTag()) + .collect(Collectors.toSet()); Map tagAvailBackendNums = Maps.newHashMap(); for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) { long beId = backend.getId(); @@ -317,8 +321,11 @@ private static void checkDecommissionWithReplicaAllocation(List decommi ReplicaAllocation replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId()); for (Map.Entry entry : replicaAlloc.getAllocMap().entrySet()) { Tag tag = entry.getKey(); + if (!decommissionTags.contains(tag)) { + continue; + } int replicaNum = (int) entry.getValue(); - int backendNum = tagAvailBackendNums.get(tag); + int backendNum = tagAvailBackendNums.getOrDefault(tag, 0); if (replicaNum > backendNum) { throw new DdlException("Partition " + partition.getName() + " of table " + db.getName() + "." + tbl.getName() + " 's replication allocation " From c9a682005e1c4377d06f85218cadf076825cf5af Mon Sep 17 00:00:00 2001 From: yujun777 Date: Wed, 27 Mar 2024 09:12:41 +0800 Subject: [PATCH 4/4] update --- .../org/apache/doris/alter/SystemHandler.java | 13 ++++++---- ..._decommission_with_replica_num_fail.groovy | 6 ++--- .../suites/node_p0/test_backend.groovy | 26 ++++++++++++------- 3 files changed, 28 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index 9fccf6f6c240d3..57e00f5ab1468c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -42,6 +42,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.NetUtils; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.resource.Tag; @@ -277,7 +278,8 @@ public static List checkDecommissionByIds(List ids) private static void checkDecommissionWithReplicaAllocation(List decommissionBackends) throws DdlException { - if (Config.isCloudMode() || decommissionBackends.isEmpty()) { + if (Config.isCloudMode() || decommissionBackends.isEmpty() + || DebugPointUtil.isEnable("SystemHandler.decommission_no_check_replica_num")) { return; } @@ -327,10 +329,11 @@ private static void checkDecommissionWithReplicaAllocation(List decommi int replicaNum = (int) entry.getValue(); int backendNum = tagAvailBackendNums.getOrDefault(tag, 0); if (replicaNum > backendNum) { - throw new DdlException("Partition " + partition.getName() + " of table " + db.getName() - + "." + tbl.getName() + " 's replication allocation " - + replicaAlloc + " > available backend num " + backendNum - + " with tag " + tag + ", need decrease the partition's replication num."); + throw new DdlException("After decommission, partition " + partition.getName() + + " of table " + db.getName() + "." + tbl.getName() + + " 's replication allocation { " + replicaAlloc + + " } > available backend num " + backendNum + " on tag " + tag + + ", otherwise need to decrease the partition's replication num."); } } } diff --git a/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy b/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy index 892a0583c592b2..ff19adae27dad2 100644 --- a/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy +++ b/regression-test/suites/alter_p0/test_decommission_with_replica_num_fail.groovy @@ -25,8 +25,8 @@ suite('test_decommission_with_replica_num_fail') { def replicaNum = 0 def targetBackend = null for (def be : backends) { - def alive = be.Alive == "true" - def decommissioned = be.SystemDecommissioned == "true" + def alive = be.Alive.toBoolean() + def decommissioned = be.SystemDecommissioned.toBoolean() if (alive && !decommissioned) { replicaNum++ targetBackend = be @@ -50,7 +50,7 @@ suite('test_decommission_with_replica_num_fail') { try { test { sql "ALTER SYSTEM DECOMMISSION BACKEND '${targetBackend.Host}:${targetBackend.HeartbeatPort}'" - exception "need decrease the partition's replication num" + exception "otherwise need to decrease the partition's replication num" } } finally { sql "CANCEL DECOMMISSION BACKEND '${targetBackend.Host}:${targetBackend.HeartbeatPort}'" diff --git a/regression-test/suites/node_p0/test_backend.groovy b/regression-test/suites/node_p0/test_backend.groovy index 1fe6f802e907bf..cce111b0a19076 100644 --- a/regression-test/suites/node_p0/test_backend.groovy +++ b/regression-test/suites/node_p0/test_backend.groovy @@ -41,11 +41,12 @@ suite("test_backend", "nonConcurrent") { } if (context.config.jdbcUser.equals("root")) { + def beId1 = null try { + GetDebugPoint().enableDebugPointForAllFEs("SystemHandler.decommission_no_check_replica_num"); try_sql """admin set frontend config("drop_backend_after_decommission" = "false")""" def result = sql_return_maparray """SHOW BACKENDS;""" logger.info("show backends result:${result}") - def beId1 = null for (def res : result) { beId1 = res.BackendId break @@ -58,16 +59,23 @@ suite("test_backend", "nonConcurrent") { assertTrue(res.SystemDecommissioned.toBoolean()) } } - result = sql """CANCEL DECOMMISSION BACKEND "${beId1}" """ - logger.info("CANCEL DECOMMISSION BACKEND ${result}") - result = sql_return_maparray """SHOW BACKENDS;""" - for (def res : result) { - if (res.BackendId == "${beId1}") { - assertFalse(res.SystemDecommissioned.toBoolean()) + } finally { + try { + if (beId1 != null) { + def result = sql """CANCEL DECOMMISSION BACKEND "${beId1}" """ + logger.info("CANCEL DECOMMISSION BACKEND ${result}") + + result = sql_return_maparray """SHOW BACKENDS;""" + for (def res : result) { + if (res.BackendId == "${beId1}") { + assertFalse(res.SystemDecommissioned.toBoolean()) + } + } } + } finally { + GetDebugPoint().disableDebugPointForAllFEs('SystemHandler.decommission_no_check_replica_num'); + try_sql """admin set frontend config("drop_backend_after_decommission" = "true")""" } - } finally { - try_sql """admin set frontend config("drop_backend_after_decommission" = "true")""" } } }