diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java index 6a99b10f82e862..407375af43b70b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java @@ -56,6 +56,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -536,23 +537,68 @@ public void releaseClients() { beToAddr = null; } + private String getBackendEndpoint(long beId) { + if (beToAddr != null) { + TNetworkAddress addr = beToAddr.get(beId); + if (addr != null) { + String host = addr.getHostname(); + if (host == null) { + host = "unknown"; + } + return host + ":" + addr.getPort(); + } + } + if (beToThriftAddress != null) { + String addr = beToThriftAddress.get(beId); + if (addr != null) { + return addr; + } + } + return "unknown"; + } + private final void clearJobOnBEs() { try { initClients(); - for (Map.Entry entry : beToClient.entrySet()) { + // Iterate with explicit iterator so we can remove invalidated clients during iteration. + Iterator> iter = beToClient.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + long beId = entry.getKey(); + Client client = entry.getValue(); TWarmUpTabletsRequest request = new TWarmUpTabletsRequest(); request.setType(TWarmUpTabletsRequestType.CLEAR_JOB); request.setJobId(jobId); if (this.isEventDriven()) { TWarmUpEventType event = getTWarmUpEventType(); if (event == null) { - throw new IllegalArgumentException("Unknown SyncEvent " + syncEvent); + // If event type is unknown, skip this BE but continue others. + LOG.warn("Unknown SyncEvent {}, skip CLEAR_JOB for BE {} ({})", + syncEvent, beId, getBackendEndpoint(beId)); + continue; } request.setEvent(event); } - LOG.info("send warm up request to BE {}. job_id={}, request_type=CLEAR_JOB", - entry.getKey(), jobId); - entry.getValue().warmUpTablets(request); + LOG.info("send warm up request to BE {} ({}). job_id={}, request_type=CLEAR_JOB", + beId, getBackendEndpoint(beId), jobId); + try { + client.warmUpTablets(request); + } catch (Exception e) { + // If RPC to this BE fails, invalidate this client and remove it from map, + // then continue to next BE so that one bad BE won't block others. + LOG.warn("send warm up request to BE {} ({}) failed: {}", + beId, getBackendEndpoint(beId), e.getMessage()); + try { + TNetworkAddress addr = beToAddr == null ? null : beToAddr.get(beId); + if (addr != null) { + ClientPool.backendPool.invalidateObject(addr, client); + } + } catch (Exception ie) { + LOG.warn("invalidate client for BE {} failed: {}", beId, ie.getMessage()); + } + // remove from local map so releaseClients won't try to return an invalidated client + iter.remove(); + } } } catch (Exception e) { LOG.warn("send warm up request failed. job_id={}, request_type=CLEAR_JOB, exception={}", @@ -653,8 +699,8 @@ private void runEventDrivenJob() throws Exception { throw new IllegalArgumentException("Unknown SyncEvent " + syncEvent); } request.setEvent(event); - LOG.debug("send warm up request to BE {}. job_id={}, event={}, request_type=SET_JOB(EVENT)", - entry.getKey(), jobId, syncEvent); + LOG.debug("send warm up request to BE {} ({}). job_id={}, event={}, request_type=SET_JOB(EVENT)", + entry.getKey(), getBackendEndpoint(entry.getKey()), jobId, syncEvent); TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request); if (response.getStatus().getStatusCode() != TStatusCode.OK) { if (!response.getStatus().getErrorMsgs().isEmpty()) { @@ -698,9 +744,10 @@ private void runRunningJob() throws Exception { request.setJobId(jobId); request.setBatchId(lastBatchId + 1); request.setJobMetas(buildJobMetas(entry.getKey(), request.batch_id)); - LOG.info("send warm up request to BE {}. job_id={}, batch_id={}" + LOG.info("send warm up request to BE {} ({}). job_id={}, batch_id={}" + ", job_size={}, request_type=SET_JOB", - entry.getKey(), jobId, request.batch_id, request.job_metas.size()); + entry.getKey(), getBackendEndpoint(entry.getKey()), + jobId, request.batch_id, request.job_metas.size()); TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request); if (response.getStatus().getStatusCode() != TStatusCode.OK) { if (!response.getStatus().getErrorMsgs().isEmpty()) { @@ -715,8 +762,9 @@ private void runRunningJob() throws Exception { for (Map.Entry entry : beToClient.entrySet()) { TWarmUpTabletsRequest request = new TWarmUpTabletsRequest(); request.setType(TWarmUpTabletsRequestType.GET_CURRENT_JOB_STATE_AND_LEASE); - LOG.info("send warm up request to BE {}. job_id={}, request_type=GET_CURRENT_JOB_STATE_AND_LEASE", - entry.getKey(), jobId); + LOG.info("send warm up request to BE {} ({}). job_id={}" + + ", request_type=GET_CURRENT_JOB_STATE_AND_LEASE", + entry.getKey(), getBackendEndpoint(entry.getKey()), jobId); TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request); if (response.getStatus().getStatusCode() != TStatusCode.OK) { if (!response.getStatus().getErrorMsgs().isEmpty()) { @@ -754,9 +802,10 @@ private void runRunningJob() throws Exception { if (!request.job_metas.isEmpty()) { // check all batches is done or not allBatchesDone = false; - LOG.info("send warm up request to BE {}. job_id={}, batch_id={}" + LOG.info("send warm up request to BE {} ({}). job_id={}, batch_id={}" + ", job_size={}, request_type=SET_BATCH", - entry.getKey(), jobId, request.batch_id, request.job_metas.size()); + entry.getKey(), getBackendEndpoint(entry.getKey()), + jobId, request.batch_id, request.job_metas.size()); TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request); if (response.getStatus().getStatusCode() != TStatusCode.OK) { if (!response.getStatus().getErrorMsgs().isEmpty()) {