Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,34 @@
import org.apache.doris.analysis.ModifyFrontendHostNameClause;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MysqlCompatibleDatabase;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/*
* SystemHandler is for
Expand Down Expand Up @@ -220,12 +230,81 @@ public static List<Backend> checkDecommission(List<HostInfo> hostInfos)
decommissionBackends.add(backend);
}

// TODO(cmy): check if replication num can be met
checkDecommissionWithReplicaAllocation(decommissionBackends);

// TODO(cmy): check remaining space

return decommissionBackends;
}

private static void checkDecommissionWithReplicaAllocation(List<Backend> decommissionBackends)
throws DdlException {
if (decommissionBackends.isEmpty()
|| DebugPointUtil.isEnable("SystemHandler.decommission_no_check_replica_num")) {
return;
}

Set<Tag> decommissionTags = decommissionBackends.stream().map(be -> be.getLocationTag())
.collect(Collectors.toSet());
Map<Tag, Integer> tagAvailBackendNums = Maps.newHashMap();
for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) {
long beId = backend.getId();
if (!backend.isScheduleAvailable()
|| decommissionBackends.stream().anyMatch(be -> be.getId() == beId)) {
continue;
}

Tag tag = backend.getLocationTag();
if (tag != null) {
tagAvailBackendNums.put(tag, tagAvailBackendNums.getOrDefault(tag, 0) + 1);
}
}

Env env = Env.getCurrentEnv();
List<Long> dbIds = env.getInternalCatalog().getDbIds();
for (Long dbId : dbIds) {
Database db = env.getInternalCatalog().getDbNullable(dbId);
if (db == null) {
continue;
}

if (db instanceof MysqlCompatibleDatabase) {
continue;
}

for (Table table : db.getTables()) {
table.readLock();
try {
if (!table.isManagedTable()) {
continue;
}

OlapTable tbl = (OlapTable) table;
for (Partition partition : tbl.getAllPartitions()) {
ReplicaAllocation replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
for (Map.Entry<Tag, Short> entry : replicaAlloc.getAllocMap().entrySet()) {
Tag tag = entry.getKey();
if (!decommissionTags.contains(tag)) {
continue;
}
int replicaNum = (int) entry.getValue();
int backendNum = tagAvailBackendNums.getOrDefault(tag, 0);
if (replicaNum > backendNum) {
throw new DdlException("After decommission, partition " + partition.getName()
+ " of table " + db.getFullName() + "." + tbl.getName()
+ " 's replication allocation { " + replicaAlloc
+ " } > available backend num " + backendNum + " on tag " + tag
+ ", otherwise need to decrease the partition's replication num.");
}
}
}
} finally {
table.readUnlock();
}
}
}
}

@Override
public synchronized void cancel(CancelStmt stmt) throws DdlException {
CancelAlterSystemStmt cancelAlterSystemStmt = (CancelAlterSystemStmt) stmt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,9 @@ class Suite implements GroovyInterceptable {
assert p.exitValue() == 0
}

List<String> getFrontendIpHttpPort() {
return sql_return_maparray("show frontends").collect { it.Host + ":" + it.HttpPort };
}

void getBackendIpHttpPort(Map<String, String> backendId_to_backendIP, Map<String, String> backendId_to_backendHttpPort) {
List<List<Object>> backends = sql("show backends");
Expand Down
2 changes: 2 additions & 0 deletions regression-test/pipeline/p0/conf/fe.conf
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ enable_map_type=true
enable_struct_type=true
enable_feature_binlog=true

enable_debug_points=true

# enable mtmv
enable_mtmv = true

Expand Down
1 change: 1 addition & 0 deletions regression-test/pipeline/p1/conf/be.conf
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,4 @@ enable_set_in_bitmap_value=true
enable_feature_binlog=true
max_sys_mem_available_low_water_mark_bytes=69206016
enable_merge_on_write_correctness_check=false
enable_debug_points=true
2 changes: 2 additions & 0 deletions regression-test/pipeline/p1/conf/fe.conf
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ remote_fragment_exec_timeout_ms=60000
fuzzy_test_type=p1
use_fuzzy_session_variable=true

enable_debug_points=true

# enable mtmv
enable_mtmv = true

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite('test_decommission_with_replica_num_fail') {
def tbl = 'test_decommission_with_replica_num_fail'
def backends = sql_return_maparray('show backends')
def replicaNum = 0
def targetBackend = null
for (def be : backends) {
def alive = be.Alive.toBoolean()
def decommissioned = be.SystemDecommissioned.toBoolean()
if (alive && !decommissioned) {
replicaNum++
targetBackend = be
}
}
assertTrue(replicaNum > 0)

sql "DROP TABLE IF EXISTS ${tbl} FORCE"
sql """
CREATE TABLE ${tbl}
(
k1 int,
k2 int
)
DISTRIBUTED BY HASH(k1) BUCKETS 6
PROPERTIES
(
"replication_num" = "${replicaNum}"
);
"""
try {
test {
sql "ALTER SYSTEM DECOMMISSION BACKEND '${targetBackend.Host}:${targetBackend.HeartbeatPort}'"
exception "otherwise need to decrease the partition's replication num"
}
} finally {
sql "CANCEL DECOMMISSION BACKEND '${targetBackend.Host}:${targetBackend.HeartbeatPort}'"
}
sql "DROP TABLE IF EXISTS ${tbl} FORCE"
}
40 changes: 40 additions & 0 deletions regression-test/suites/node_p0/test_backend.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,44 @@ suite("test_backend") {
result = sql """SHOW BACKENDS;"""
logger.info("result:${result}")
}

if (context.config.jdbcUser.equals("root")) {
def decommissionBe = null
try {
GetDebugPoint().enableDebugPointForAllFEs("SystemHandler.decommission_no_check_replica_num");
try_sql """admin set frontend config("drop_backend_after_decommission" = "false")"""
def result = sql_return_maparray """SHOW BACKENDS;"""
logger.info("show backends result:${result}")
for (def res : result) {
decommissionBe = res
break
}
sql """CANCEL DECOMMISSION BACKEND "${decommissionBe.Host}:${decommissionBe.HeartbeatPort}" """
result = sql """ALTER SYSTEM DECOMMISSION BACKEND "${decommissionBe.Host}:${decommissionBe.HeartbeatPort}" """
logger.info("ALTER SYSTEM DECOMMISSION BACKEND ${result}")
result = sql_return_maparray """SHOW BACKENDS;"""
for (def res : result) {
if (res.BackendId == "${decommissionBe.BackendId}") {
assertTrue(res.SystemDecommissioned.toBoolean())
}
}
} finally {
try {
if (decommissionBe != null) {
def result = sql """CANCEL DECOMMISSION BACKEND "${decommissionBe.Host}:${decommissionBe.HeartbeatPort}" """
logger.info("CANCEL DECOMMISSION BACKEND ${result}")

result = sql_return_maparray """SHOW BACKENDS;"""
for (def res : result) {
if (res.BackendId == "${decommissionBe.BackendId}") {
assertFalse(res.SystemDecommissioned.toBoolean())
}
}
}
} finally {
GetDebugPoint().disableDebugPointForAllFEs('SystemHandler.decommission_no_check_replica_num');
try_sql """admin set frontend config("drop_backend_after_decommission" = "true")"""
}
}
}
}