Skip to content

Commit

Permalink
[Fix](group commit) Fix multiple cluster group commit BE select strat…
Browse files Browse the repository at this point in the history
…egy (apache#38644)
  • Loading branch information
Yukang-Lian authored Aug 4, 2024
1 parent 69857b0 commit ef4fba6
Showing 1 changed file with 41 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ public class GroupCommitManager {

private Set<Long> blockedTableIds = new HashSet<>();

// Table id to BE id map. Only for group commit.
private Map<Long, Long> tableToBeMap = new ConcurrentHashMap<>();
// BE id to pressure map. Only for group commit.
private Map<Long, SlidingWindowCounter> tablePressureMap = new ConcurrentHashMap<>();
// Encoded <Cluster and Table id> to BE id map. Only for group commit.
private final Map<String, Long> tableToBeMap = new ConcurrentHashMap<>();
// Table id to pressure map. Only for group commit.
private final Map<Long, SlidingWindowCounter> tableToPressureMap = new ConcurrentHashMap<>();

public boolean isBlock(long tableId) {
return blockedTableIds.contains(tableId);
Expand Down Expand Up @@ -243,13 +243,13 @@ public long selectBackendForGroupCommitInternal(long tableId, String cluster, bo

private long selectBackendForCloudGroupCommitInternal(long tableId, String cluster)
throws DdlException, LoadException {
LOG.debug("cloud group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(),
tablePressureMap.toString());
LOG.debug("cloud group commit select be info, tableToBeMap {}, tablePressureMap {}",
tableToBeMap.toString(), tableToPressureMap.toString());
if (Strings.isNullOrEmpty(cluster)) {
ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR);
}

Long cachedBackendId = getCachedBackend(tableId);
Long cachedBackendId = getCachedBackend(cluster, tableId);
if (cachedBackendId != null) {
return cachedBackendId;
}
Expand All @@ -261,7 +261,7 @@ private long selectBackendForCloudGroupCommitInternal(long tableId, String clust
throw new LoadException("No alive backend");
}
// If the cached backend is not active or decommissioned, select a random new backend.
Long randomBackendId = getRandomBackend(tableId, backends);
Long randomBackendId = getRandomBackend(cluster, tableId, backends);
if (randomBackendId != null) {
return randomBackendId;
}
Expand All @@ -274,8 +274,8 @@ private long selectBackendForCloudGroupCommitInternal(long tableId, String clust

private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadException {
LOG.debug("group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(),
tablePressureMap.toString());
Long cachedBackendId = getCachedBackend(tableId);
tableToPressureMap.toString());
Long cachedBackendId = getCachedBackend(null, tableId);
if (cachedBackendId != null) {
return cachedBackendId;
}
Expand All @@ -293,7 +293,7 @@ private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadE
}

// If the cached backend is not active or decommissioned, select a random new backend.
Long randomBackendId = getRandomBackend(tableId, backends);
Long randomBackendId = getRandomBackend(null, tableId, backends);
if (randomBackendId != null) {
return randomBackendId;
}
Expand All @@ -305,38 +305,56 @@ private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadE
}

@Nullable
private Long getCachedBackend(long tableId) {
private Long getCachedBackend(String cluster, long tableId) {
OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
if (tableToBeMap.containsKey(tableId)) {
if (tablePressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) {
Backend backend = Env.getCurrentSystemInfo().getBackend(tableToBeMap.get(tableId));
if (tableToBeMap.containsKey(encode(cluster, tableId))) {
if (tableToPressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) {
// There are multiple threads getting cached backends for the same table.
// Maybe one thread removes the tableId from the tableToBeMap.
// Another thread gets the same tableId but can not find this tableId.
// So another thread needs to get the random backend.
Long backendId = tableToBeMap.get(encode(cluster, tableId));
Backend backend;
if (backendId != null) {
backend = Env.getCurrentSystemInfo().getBackend(backendId);
} else {
return null;
}
if (backend.isActive() && !backend.isDecommissioned()) {
return backend.getId();
} else {
tableToBeMap.remove(tableId);
tableToBeMap.remove(encode(cluster, tableId));
}
} else {
tableToBeMap.remove(tableId);
tableToBeMap.remove(encode(cluster, tableId));
}
}
return null;
}

@Nullable
private Long getRandomBackend(long tableId, List<Backend> backends) {
private Long getRandomBackend(String cluster, long tableId, List<Backend> backends) {
OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
Collections.shuffle(backends);
for (Backend backend : backends) {
if (backend.isActive() && !backend.isDecommissioned()) {
tableToBeMap.put(tableId, backend.getId());
tablePressureMap.put(tableId,
tableToBeMap.put(encode(cluster, tableId), backend.getId());
tableToPressureMap.put(tableId,
new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1));
return backend.getId();
}
}
return null;
}

private String encode(String cluster, long tableId) {
if (cluster == null) {
return String.valueOf(tableId);
} else {
return cluster + tableId;
}
}

public void updateLoadData(long tableId, long receiveData) {
if (tableId == -1) {
LOG.warn("invalid table id: " + tableId);
Expand All @@ -359,10 +377,10 @@ public void updateLoadData(long tableId, long receiveData) {
}

public void updateLoadDataInternal(long tableId, long receiveData) {
if (tablePressureMap.containsKey(tableId)) {
tablePressureMap.get(tableId).add(receiveData);
if (tableToPressureMap.containsKey(tableId)) {
tableToPressureMap.get(tableId).add(receiveData);
LOG.info("Update load data for table{}, receiveData {}, tablePressureMap {}", tableId, receiveData,
tablePressureMap.toString());
tableToPressureMap.toString());
} else {
LOG.warn("can not find backend id: {}", tableId);
}
Expand Down

0 comments on commit ef4fba6

Please sign in to comment.