Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -52,7 +53,7 @@ public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int taskQueueSize)
simultaneouslyRunningTaskNum,
simultaneouslyRunningTaskNum, 0,
TimeUnit.DAYS, new LinkedBlockingQueue<>(taskQueueSize),
new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
new BlockedPolicy("Analysis Job Executor Block Policy", Integer.MAX_VALUE),
"Analysis Job Executor", true);
cancelExpiredTask();
} else {
Expand Down Expand Up @@ -88,9 +89,9 @@ protected void tryToCancel() {
}
}

public void submitTask(BaseAnalysisTask task) {
public Future<?> submitTask(BaseAnalysisTask task) {
AnalysisTaskWrapper taskWrapper = new AnalysisTaskWrapper(this, task);
executors.submit(taskWrapper);
return executors.submit(taskWrapper);
}

public void putJob(AnalysisTaskWrapper wrapper) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -134,7 +136,15 @@ protected void processOneJob(TableIf table, Set<Pair<String, String>> columns,
}
AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns, priority);
LOG.debug("Auto analyze job : {}", analyzeJob.toString());
executeSystemAnalysisJob(analyzeJob);
try {
executeSystemAnalysisJob(analyzeJob);
} catch (Exception e) {
StringJoiner stringJoiner = new StringJoiner(",", "[", "]");
for (Pair<String, String> pair : columns) {
stringJoiner.add(pair.toString());
}
LOG.warn("Fail to auto analyze table {}, columns [{}]", table.getName(), stringJoiner.toString());
}
}

protected void appendPartitionColumns(TableIf table, Set<Pair<String, String>> columns) throws DdlException {
Expand Down Expand Up @@ -205,7 +215,7 @@ protected AnalysisInfo createAnalyzeJobForTbl(
// Analysis job created by the system
@VisibleForTesting
protected void executeSystemAnalysisJob(AnalysisInfo jobInfo)
throws DdlException {
throws DdlException, ExecutionException, InterruptedException {
Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false);
Expand All @@ -215,7 +225,14 @@ protected void executeSystemAnalysisJob(AnalysisInfo jobInfo)
}
Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, analysisTasks.values());
Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTasks);
analysisTasks.values().forEach(analysisTaskExecutor::submitTask);
Future<?>[] futures = new Future[analysisTasks.values().size()];
int i = 0;
for (BaseAnalysisTask task : analysisTasks.values()) {
futures[i++] = analysisTaskExecutor.submitTask(task);
}
for (Future future : futures) {
future.get();
}
}

protected AnalysisInfo getNeedAnalyzeColumns(AnalysisInfo jobInfo) {
Expand Down