From ef4fba649cb563efe64874addcbac11204925cfc Mon Sep 17 00:00:00 2001 From: abmdocrt Date: Sun, 4 Aug 2024 18:33:55 +0800 Subject: [PATCH] [Fix](group commit) Fix multiple cluster group commit BE select strategy (#38644) --- .../apache/doris/load/GroupCommitManager.java | 64 ++++++++++++------- 1 file changed, 41 insertions(+), 23 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index 20f7b9ed9be9b8..1009c4257b85f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -58,10 +58,10 @@ public class GroupCommitManager { private Set blockedTableIds = new HashSet<>(); - // Table id to BE id map. Only for group commit. - private Map tableToBeMap = new ConcurrentHashMap<>(); - // BE id to pressure map. Only for group commit. - private Map tablePressureMap = new ConcurrentHashMap<>(); + // Encoded to BE id map. Only for group commit. + private final Map tableToBeMap = new ConcurrentHashMap<>(); + // Table id to pressure map. Only for group commit. + private final Map tableToPressureMap = new ConcurrentHashMap<>(); public boolean isBlock(long tableId) { return blockedTableIds.contains(tableId); @@ -243,13 +243,13 @@ public long selectBackendForGroupCommitInternal(long tableId, String cluster, bo private long selectBackendForCloudGroupCommitInternal(long tableId, String cluster) throws DdlException, LoadException { - LOG.debug("cloud group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(), - tablePressureMap.toString()); + LOG.debug("cloud group commit select be info, tableToBeMap {}, tablePressureMap {}", + tableToBeMap.toString(), tableToPressureMap.toString()); if (Strings.isNullOrEmpty(cluster)) { ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR); } - Long cachedBackendId = getCachedBackend(tableId); + Long cachedBackendId = getCachedBackend(cluster, tableId); if (cachedBackendId != null) { return cachedBackendId; } @@ -261,7 +261,7 @@ private long selectBackendForCloudGroupCommitInternal(long tableId, String clust throw new LoadException("No alive backend"); } // If the cached backend is not active or decommissioned, select a random new backend. - Long randomBackendId = getRandomBackend(tableId, backends); + Long randomBackendId = getRandomBackend(cluster, tableId, backends); if (randomBackendId != null) { return randomBackendId; } @@ -274,8 +274,8 @@ private long selectBackendForCloudGroupCommitInternal(long tableId, String clust private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadException { LOG.debug("group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(), - tablePressureMap.toString()); - Long cachedBackendId = getCachedBackend(tableId); + tableToPressureMap.toString()); + Long cachedBackendId = getCachedBackend(null, tableId); if (cachedBackendId != null) { return cachedBackendId; } @@ -293,7 +293,7 @@ private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadE } // If the cached backend is not active or decommissioned, select a random new backend. - Long randomBackendId = getRandomBackend(tableId, backends); + Long randomBackendId = getRandomBackend(null, tableId, backends); if (randomBackendId != null) { return randomBackendId; } @@ -305,31 +305,41 @@ private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadE } @Nullable - private Long getCachedBackend(long tableId) { + private Long getCachedBackend(String cluster, long tableId) { OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); - if (tableToBeMap.containsKey(tableId)) { - if (tablePressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) { - Backend backend = Env.getCurrentSystemInfo().getBackend(tableToBeMap.get(tableId)); + if (tableToBeMap.containsKey(encode(cluster, tableId))) { + if (tableToPressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) { + // There are multiple threads getting cached backends for the same table. + // Maybe one thread removes the tableId from the tableToBeMap. + // Another thread gets the same tableId but can not find this tableId. + // So another thread needs to get the random backend. + Long backendId = tableToBeMap.get(encode(cluster, tableId)); + Backend backend; + if (backendId != null) { + backend = Env.getCurrentSystemInfo().getBackend(backendId); + } else { + return null; + } if (backend.isActive() && !backend.isDecommissioned()) { return backend.getId(); } else { - tableToBeMap.remove(tableId); + tableToBeMap.remove(encode(cluster, tableId)); } } else { - tableToBeMap.remove(tableId); + tableToBeMap.remove(encode(cluster, tableId)); } } return null; } @Nullable - private Long getRandomBackend(long tableId, List backends) { + private Long getRandomBackend(String cluster, long tableId, List backends) { OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); Collections.shuffle(backends); for (Backend backend : backends) { if (backend.isActive() && !backend.isDecommissioned()) { - tableToBeMap.put(tableId, backend.getId()); - tablePressureMap.put(tableId, + tableToBeMap.put(encode(cluster, tableId), backend.getId()); + tableToPressureMap.put(tableId, new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1)); return backend.getId(); } @@ -337,6 +347,14 @@ private Long getRandomBackend(long tableId, List backends) { return null; } + private String encode(String cluster, long tableId) { + if (cluster == null) { + return String.valueOf(tableId); + } else { + return cluster + tableId; + } + } + public void updateLoadData(long tableId, long receiveData) { if (tableId == -1) { LOG.warn("invalid table id: " + tableId); @@ -359,10 +377,10 @@ public void updateLoadData(long tableId, long receiveData) { } public void updateLoadDataInternal(long tableId, long receiveData) { - if (tablePressureMap.containsKey(tableId)) { - tablePressureMap.get(tableId).add(receiveData); + if (tableToPressureMap.containsKey(tableId)) { + tableToPressureMap.get(tableId).add(receiveData); LOG.info("Update load data for table{}, receiveData {}, tablePressureMap {}", tableId, receiveData, - tablePressureMap.toString()); + tableToPressureMap.toString()); } else { LOG.warn("can not find backend id: {}", tableId); }