Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Kikyou1997 committed Nov 21, 2023
1 parent d255757 commit a82f59c
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand All @@ -47,8 +48,6 @@ public class AnalysisJob {

protected int totalTaskCount;

protected int queryFinishedTaskCount;

protected StmtExecutor stmtExecutor;

protected boolean killed;
Expand All @@ -63,8 +62,8 @@ public AnalysisJob(AnalysisInfo jobInfo, Collection<? extends BaseAnalysisTask>
for (BaseAnalysisTask task : queryingTask) {
task.job = this;
}
this.queryingTask = new HashSet<>(queryingTask);
this.queryFinished = new HashSet<>();
this.queryingTask = Collections.synchronizedSet(new HashSet<>(queryingTask));
this.queryFinished = Collections.synchronizedSet(new HashSet<>());
this.buf = new ArrayList<>();
totalTaskCount = queryingTask.size();
start = System.currentTimeMillis();
Expand All @@ -86,12 +85,14 @@ public synchronized void rowCountDone(BaseAnalysisTask task) {
}

protected void markOneTaskDone() {
queryFinishedTaskCount += 1;
if (queryFinishedTaskCount == totalTaskCount) {
writeBuf();
updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
+ (System.currentTimeMillis() - start) / 1000);
deregisterJob();
if (queryingTask.isEmpty()) {
try {
writeBuf();
updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
+ (System.currentTimeMillis() - start) / 1000);
} finally {
deregisterJob();
}
} else if (buf.size() >= StatisticsUtil.getInsertMergeCount()) {
writeBuf();
}
Expand Down Expand Up @@ -175,9 +176,12 @@ protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exce
}

public void taskFailed(BaseAnalysisTask task, String reason) {
updateTaskState(AnalysisState.FAILED, reason);
cancel();
deregisterJob();
try {
updateTaskState(AnalysisState.FAILED, reason);
cancel();
} finally {
deregisterJob();
}
}

public void cancel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1073,4 +1073,8 @@ public void constructJob(AnalysisInfo jobInfo, Collection<? extends BaseAnalysis
public void removeJob(long id) {
idToAnalysisJob.remove(id);
}

public boolean hasUnFinished() {
return !analysisJobIdToTaskMap.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public class StatisticConstants {

public static final int ANALYZE_TIMEOUT_IN_SEC = 43200;

public static final int SUBMIT_JOB_LIMIT = 5;

static {
SYSTEM_DBS.add(SystemInfoService.DEFAULT_CLUSTER
+ ClusterNamespace.CLUSTER_DELIMITER + FeConstants.INTERNAL_DB_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {

public StatisticsAutoCollector() {
super("Automatic Analyzer",
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes),
TimeUnit.SECONDS.toMillis(Config.full_auto_analyze_simultaneously_running_task_num),
new AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public abstract class StatisticsCollector extends MasterDaemon {

protected final AnalysisTaskExecutor analysisTaskExecutor;

protected int submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT;

public StatisticsCollector(String name, long intervalMs, AnalysisTaskExecutor analysisTaskExecutor) {
super(name, intervalMs);
Expand All @@ -54,8 +55,8 @@ protected void runAfterCatalogReady() {
if (Env.isCheckpointThread()) {
return;
}

if (!analysisTaskExecutor.idle()) {
submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT;
if (Env.getCurrentEnv().getAnalysisManager().hasUnFinished()) {
LOG.info("Analyze tasks those submitted in last time is not finished, skip");
return;
}
Expand All @@ -72,7 +73,9 @@ protected void createSystemAnalysisJob(AnalysisInfo jobInfo)
// No statistics need to be collected or updated
return;
}

if (submittedThisRound-- < 0) {
return;
}
Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false);
Expand Down

0 comments on commit a82f59c

Please sign in to comment.