Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -536,23 +537,68 @@ public void releaseClients() {
beToAddr = null;
}

private String getBackendEndpoint(long beId) {
if (beToAddr != null) {
TNetworkAddress addr = beToAddr.get(beId);
if (addr != null) {
String host = addr.getHostname();
if (host == null) {
host = "unknown";
}
return host + ":" + addr.getPort();
}
}
if (beToThriftAddress != null) {
String addr = beToThriftAddress.get(beId);
if (addr != null) {
return addr;
}
}
return "unknown";
}

private final void clearJobOnBEs() {
try {
initClients();
for (Map.Entry<Long, Client> entry : beToClient.entrySet()) {
// Iterate with explicit iterator so we can remove invalidated clients during iteration.
Iterator<Map.Entry<Long, Client>> iter = beToClient.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<Long, Client> entry = iter.next();
long beId = entry.getKey();
Client client = entry.getValue();
TWarmUpTabletsRequest request = new TWarmUpTabletsRequest();
request.setType(TWarmUpTabletsRequestType.CLEAR_JOB);
request.setJobId(jobId);
if (this.isEventDriven()) {
TWarmUpEventType event = getTWarmUpEventType();
if (event == null) {
throw new IllegalArgumentException("Unknown SyncEvent " + syncEvent);
// If event type is unknown, skip this BE but continue others.
LOG.warn("Unknown SyncEvent {}, skip CLEAR_JOB for BE {} ({})",
syncEvent, beId, getBackendEndpoint(beId));
continue;
}
request.setEvent(event);
}
LOG.info("send warm up request to BE {}. job_id={}, request_type=CLEAR_JOB",
entry.getKey(), jobId);
entry.getValue().warmUpTablets(request);
LOG.info("send warm up request to BE {} ({}). job_id={}, request_type=CLEAR_JOB",
beId, getBackendEndpoint(beId), jobId);
try {
client.warmUpTablets(request);
} catch (Exception e) {
// If RPC to this BE fails, invalidate this client and remove it from map,
// then continue to next BE so that one bad BE won't block others.
LOG.warn("send warm up request to BE {} ({}) failed: {}",
beId, getBackendEndpoint(beId), e.getMessage());
try {
TNetworkAddress addr = beToAddr == null ? null : beToAddr.get(beId);
if (addr != null) {
ClientPool.backendPool.invalidateObject(addr, client);
}
} catch (Exception ie) {
LOG.warn("invalidate client for BE {} failed: {}", beId, ie.getMessage());
}
// remove from local map so releaseClients won't try to return an invalidated client
iter.remove();
}
}
} catch (Exception e) {
LOG.warn("send warm up request failed. job_id={}, request_type=CLEAR_JOB, exception={}",
Expand Down Expand Up @@ -653,8 +699,8 @@ private void runEventDrivenJob() throws Exception {
throw new IllegalArgumentException("Unknown SyncEvent " + syncEvent);
}
request.setEvent(event);
LOG.debug("send warm up request to BE {}. job_id={}, event={}, request_type=SET_JOB(EVENT)",
entry.getKey(), jobId, syncEvent);
LOG.debug("send warm up request to BE {} ({}). job_id={}, event={}, request_type=SET_JOB(EVENT)",
entry.getKey(), getBackendEndpoint(entry.getKey()), jobId, syncEvent);
TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request);
if (response.getStatus().getStatusCode() != TStatusCode.OK) {
if (!response.getStatus().getErrorMsgs().isEmpty()) {
Expand Down Expand Up @@ -698,9 +744,10 @@ private void runRunningJob() throws Exception {
request.setJobId(jobId);
request.setBatchId(lastBatchId + 1);
request.setJobMetas(buildJobMetas(entry.getKey(), request.batch_id));
LOG.info("send warm up request to BE {}. job_id={}, batch_id={}"
LOG.info("send warm up request to BE {} ({}). job_id={}, batch_id={}"
+ ", job_size={}, request_type=SET_JOB",
entry.getKey(), jobId, request.batch_id, request.job_metas.size());
entry.getKey(), getBackendEndpoint(entry.getKey()),
jobId, request.batch_id, request.job_metas.size());
TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request);
if (response.getStatus().getStatusCode() != TStatusCode.OK) {
if (!response.getStatus().getErrorMsgs().isEmpty()) {
Expand All @@ -715,8 +762,9 @@ private void runRunningJob() throws Exception {
for (Map.Entry<Long, Client> entry : beToClient.entrySet()) {
TWarmUpTabletsRequest request = new TWarmUpTabletsRequest();
request.setType(TWarmUpTabletsRequestType.GET_CURRENT_JOB_STATE_AND_LEASE);
LOG.info("send warm up request to BE {}. job_id={}, request_type=GET_CURRENT_JOB_STATE_AND_LEASE",
entry.getKey(), jobId);
LOG.info("send warm up request to BE {} ({}). job_id={}"
+ ", request_type=GET_CURRENT_JOB_STATE_AND_LEASE",
entry.getKey(), getBackendEndpoint(entry.getKey()), jobId);
TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request);
if (response.getStatus().getStatusCode() != TStatusCode.OK) {
if (!response.getStatus().getErrorMsgs().isEmpty()) {
Expand Down Expand Up @@ -754,9 +802,10 @@ private void runRunningJob() throws Exception {
if (!request.job_metas.isEmpty()) {
// check all batches is done or not
allBatchesDone = false;
LOG.info("send warm up request to BE {}. job_id={}, batch_id={}"
LOG.info("send warm up request to BE {} ({}). job_id={}, batch_id={}"
+ ", job_size={}, request_type=SET_BATCH",
entry.getKey(), jobId, request.batch_id, request.job_metas.size());
entry.getKey(), getBackendEndpoint(entry.getKey()),
jobId, request.batch_id, request.job_metas.size());
TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request);
if (response.getStatus().getStatusCode() != TStatusCode.OK) {
if (!response.getStatus().getErrorMsgs().isEmpty()) {
Expand Down
Loading