From 235a08f12fda3257ce4cb530fe2b1ace1b3f2cd4 Mon Sep 17 00:00:00 2001 From: liulijia Date: Thu, 16 Jul 2020 13:07:09 +0800 Subject: [PATCH 1/2] Delete bad replicas when no BE can be used to create new replica. for #4104 --- .../org/apache/doris/catalog/OlapTable.java | 4 +- .../java/org/apache/doris/catalog/Tablet.java | 45 ++++++++++++------- .../org/apache/doris/clone/TabletChecker.java | 4 +- .../apache/doris/clone/TabletSchedCtx.java | 4 +- .../apache/doris/clone/TabletScheduler.java | 4 +- .../doris/common/proc/StatisticProcDir.java | 18 ++++---- .../apache/doris/master/ReportHandler.java | 4 +- 7 files changed, 47 insertions(+), 36 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index f8f442337ee4cf..10603eb673d30f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1229,7 +1229,7 @@ public void checkStableAndNormal(String clusterName) throws DdlException { } public boolean isStable(SystemInfoService infoService, TabletScheduler tabletScheduler, String clusterName) { - int availableBackendsNum = infoService.getClusterBackendIds(clusterName, true).size(); + List aliveBeIdsInCluster = infoService.getClusterBackendIds(clusterName, true); for (Partition partition : idToPartition.values()) { long visibleVersion = partition.getVisibleVersion(); long visibleVersionHash = partition.getVisibleVersionHash(); @@ -1242,7 +1242,7 @@ public boolean isStable(SystemInfoService infoService, TabletScheduler tabletSch Pair statusPair = tablet.getHealthStatusWithPriority( infoService, clusterName, visibleVersion, visibleVersionHash, replicationNum, - availableBackendsNum); + aliveBeIdsInCluster); if (statusPair.first != TabletStatus.HEALTHY) { LOG.info("table {} is not stable because tablet {} status is {}. 
replicas: {}", id, tablet.getId(), statusPair.first, tablet.getReplicas()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index d8eb69144e7760..a96fac080d7459 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -17,6 +17,11 @@ package org.apache.doris.catalog; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import com.google.gson.annotations.SerializedName; import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.clone.TabletSchedCtx; import org.apache.doris.clone.TabletSchedCtx.Priority; @@ -25,13 +30,6 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; -import com.google.gson.annotations.SerializedName; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -42,6 +40,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; /** * This class represents the olap tablet related metadata. @@ -407,7 +406,7 @@ public long getDataSize(boolean singleReplica) { public Pair getHealthStatusWithPriority( SystemInfoService systemInfoService, String clusterName, long visibleVersion, long visibleVersionHash, int replicationNum, - int availableBackendsNum) { + List aliveBeIdsInCluster) { int alive = 0; int aliveAndVersionComplete = 0; @@ -453,15 +452,16 @@ public Pair getHealthStatusWithPriority( } // 1. 
alive replicas are not enough - if (alive < replicationNum && replicas.size() >= availableBackendsNum - && availableBackendsNum >= replicationNum && replicationNum > 1) { + int aliveBackendsNum = aliveBeIdsInCluster.size(); + if (alive < replicationNum && replicas.size() >= aliveBackendsNum + && aliveBackendsNum >= replicationNum && replicationNum > 1) { // there is no enough backend for us to create a new replica, so we have to delete an existing replica, // so there can be available backend for us to create a new replica. // And if there is only one replica, we will not handle it(maybe need human interference) // condition explain: // 1. alive < replicationNum: replica is missing or bad - // 2. replicas.size() >= availableBackendsNum: the existing replicas occupies all available backends - // 3. availableBackendsNum >= replicationNum: make sure after deleting, there will be at least one backend for new replica. + // 2. replicas.size() >= aliveBackendsNum: the existing replicas occupy all available backends + // 3. aliveBackendsNum >= replicationNum: make sure after deleting, there will be at least one backend for new replica. // 4. replicationNum > 1: if replication num is set to 1, do not delete any replica, for safety reason return Pair.create(TabletStatus.FORCE_REDUNDANT, TabletSchedCtx.Priority.VERY_HIGH); } else if (alive < (replicationNum / 2) + 1) { @@ -484,10 +484,23 @@ public Pair getHealthStatusWithPriority( } // 3. 
replica is under relocating - if (stable < (replicationNum / 2) + 1) { - return Pair.create(TabletStatus.REPLICA_RELOCATING, TabletSchedCtx.Priority.NORMAL); - } else if (stable < replicationNum) { - return Pair.create(TabletStatus.REPLICA_RELOCATING, TabletSchedCtx.Priority.LOW); + if (stable < replicationNum) { + List replicaBeIds = replicas.stream() + .map(Replica::getBackendId).collect(Collectors.toList()); + List availableBeIds = aliveBeIdsInCluster.stream() + .filter(systemInfoService::checkBackendAvailable) + .collect(Collectors.toList()); + if (replicaBeIds.containsAll(availableBeIds) + && availableBeIds.size() >= replicationNum + && replicationNum > 1) { // No BE can be chosen to create a new replica + return Pair.create(TabletStatus.FORCE_REDUNDANT, + stable < (replicationNum / 2) + 1 ? TabletSchedCtx.Priority.NORMAL : TabletSchedCtx.Priority.LOW); + } + if (stable < (replicationNum / 2) + 1) { + return Pair.create(TabletStatus.REPLICA_RELOCATING, TabletSchedCtx.Priority.NORMAL); + } else if (stable < replicationNum) { + return Pair.create(TabletStatus.REPLICA_RELOCATING, TabletSchedCtx.Priority.LOW); + } } // 4. 
healthy replicas in cluster are not enough diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java index 850fe04f8faccd..952f5648f3aa09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java @@ -206,7 +206,7 @@ private void checkTablets() { db.readLock(); try { - int availableBackendsNum = infoService.getClusterBackendIds(db.getClusterName(), true).size(); + List aliveBeIdsInCluster = infoService.getClusterBackendIds(db.getClusterName(), true); for (Table table : db.getTables()) { if (!table.needSchedule()) { continue; @@ -239,7 +239,7 @@ private void checkTablets() { partition.getVisibleVersion(), partition.getVisibleVersionHash(), olapTbl.getPartitionInfo().getReplicationNum(partition.getId()), - availableBackendsNum); + aliveBeIdsInCluster); if (statusWithPrio.first == TabletStatus.HEALTHY) { // Only set last status check time when status is healthy. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index 059306fe5589d6..98e1dab33f587a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -799,11 +799,11 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) throw new SchedException(Status.UNRECOVERABLE, "tablet does not exist"); } - int availableBackendsNum = infoService.getClusterBackendIds(db.getClusterName(), true).size(); + List aliveBeIdsInCluster = infoService.getClusterBackendIds(db.getClusterName(), true); short replicationNum = olapTable.getPartitionInfo().getReplicationNum(partitionId); Pair pair = tablet.getHealthStatusWithPriority( infoService, db.getClusterName(), visibleVersion, visibleVersionHash, replicationNum, - availableBackendsNum); + aliveBeIdsInCluster); if (pair.first == TabletStatus.HEALTHY) { throw new SchedException(Status.FINISHED, "tablet is healthy"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 4ecfe56cfd4cf9..83fa29c44408f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -495,13 +495,13 @@ private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) statusPair = Pair.create(st, Priority.HIGH); tabletCtx.setColocateGroupBackendIds(backendsSet); } else { - int availableBackendsNum = infoService.getClusterBackendIds(db.getClusterName(), true).size(); + List aliveBeIdsInCluster = infoService.getClusterBackendIds(db.getClusterName(), true); statusPair = tablet.getHealthStatusWithPriority( infoService, tabletCtx.getCluster(), partition.getVisibleVersion(), partition.getVisibleVersionHash(), 
tbl.getPartitionInfo().getReplicationNum(partition.getId()), - availableBackendsNum); + aliveBeIdsInCluster); } if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE && tableState != OlapTableState.NORMAL) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java index e75a3fee69aca1..29f48c7dbe4f5a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java @@ -17,6 +17,10 @@ package org.apache.doris.common.proc; +import com.google.common.base.Preconditions; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Multimap; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.MaterializedIndex; @@ -34,19 +38,13 @@ import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.thrift.TTaskType; - -import com.google.common.base.Preconditions; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Multimap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - public class StatisticProcDir implements ProcDirInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("DbId").add("DbName").add("TableNum").add("PartitionNum") @@ -108,7 +106,7 @@ public ProcResult fetchResult() throws AnalysisException { } ++totalDbNum; - int availableBackendsNum = infoService.getClusterBackendIds(db.getClusterName(), true).size(); + List aliveBeIdsInCluster = 
infoService.getClusterBackendIds(db.getClusterName(), true); db.readLock(); try { int dbTableNum = 0; @@ -137,7 +135,7 @@ public ProcResult fetchResult() throws AnalysisException { Pair res = tablet.getHealthStatusWithPriority( infoService, db.getClusterName(), partition.getVisibleVersion(), partition.getVisibleVersionHash(), - replicationNum, availableBackendsNum); + replicationNum, aliveBeIdsInCluster); // here we treat REDUNDANT as HEALTHY, for user friendly. if (res.first != TabletStatus.HEALTHY && res.first != TabletStatus.REDUNDANT diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index c8c1d187e61a9f..8d4081260206d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -1027,10 +1027,10 @@ private static void addReplica(long tabletId, TTabletInfo backendTabletInfo, lon return; } - int availableBackendsNum = infoService.getClusterBackendIds(db.getClusterName(), true).size(); + List aliveBeIdsInCluster = infoService.getClusterBackendIds(db.getClusterName(), true); Pair status = tablet.getHealthStatusWithPriority(infoService, db.getClusterName(), visibleVersion, visibleVersionHash, - replicationNum, availableBackendsNum); + replicationNum, aliveBeIdsInCluster); if (status.first == TabletStatus.VERSION_INCOMPLETE || status.first == TabletStatus.REPLICA_MISSING) { long lastFailedVersion = -1L; From 755717df4f24e472e2ff643e40fc8e97d94549ef Mon Sep 17 00:00:00 2001 From: liulijia Date: Tue, 21 Jul 2020 17:28:30 +0800 Subject: [PATCH 2/2] Refactor import order --- .../main/java/org/apache/doris/catalog/Tablet.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index a96fac080d7459..87e9309d985ca7 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -17,11 +17,6 @@ package org.apache.doris.catalog; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; -import com.google.gson.annotations.SerializedName; import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.clone.TabletSchedCtx; import org.apache.doris.clone.TabletSchedCtx.Priority; @@ -30,6 +25,13 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import com.google.gson.annotations.SerializedName; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger;