Skip to content

Commit

Permalink
[fix](stats) Fix creating too many tasks on new env (apache#27362)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kikyou1997 authored and gnehil committed Dec 4, 2023
1 parent 9a0ac8a commit 6f51d1d
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand All @@ -45,10 +46,6 @@ public class AnalysisJob {

protected List<ColStatsData> buf;

protected int totalTaskCount;

protected int queryFinishedTaskCount;

protected StmtExecutor stmtExecutor;

protected boolean killed;
Expand All @@ -63,10 +60,9 @@ public AnalysisJob(AnalysisInfo jobInfo, Collection<? extends BaseAnalysisTask>
for (BaseAnalysisTask task : queryingTask) {
task.job = this;
}
this.queryingTask = new HashSet<>(queryingTask);
this.queryFinished = new HashSet<>();
this.queryingTask = Collections.synchronizedSet(new HashSet<>(queryingTask));
this.queryFinished = Collections.synchronizedSet(new HashSet<>());
this.buf = new ArrayList<>();
totalTaskCount = queryingTask.size();
start = System.currentTimeMillis();
this.jobInfo = jobInfo;
this.analysisManager = Env.getCurrentEnv().getAnalysisManager();
Expand All @@ -86,12 +82,14 @@ public synchronized void rowCountDone(BaseAnalysisTask task) {
}

// Called after a task finishes; flushes buffered column stats and, once the
// last task is done, finalizes and deregisters the job.
// NOTE(review): queryingTask is a Collections.synchronizedSet (see constructor);
// tasks are presumably removed from it before this is called — confirm in appendBuf.
protected void markOneTaskDone() {
// Empty queryingTask means every task has completed, so finalize the job.
if (queryingTask.isEmpty()) {
try {
// Flush any remaining buffered stats, then record success with elapsed time.
writeBuf();
updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
+ (System.currentTimeMillis() - start) / 1000);
} finally {
// Always deregister, even if the flush/state update throws; a leaked
// job would otherwise keep blocking future scheduling.
deregisterJob();
}
} else if (buf.size() >= StatisticsUtil.getInsertMergeCount()) {
// Not done yet, but the buffer hit the merge threshold — flush early
// to bound memory usage.
writeBuf();
}
Expand Down Expand Up @@ -175,9 +173,12 @@ protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exce
}

/**
 * Handles the failure of a single analysis task: marks the job FAILED with
 * the given reason and cancels the remaining tasks.
 *
 * @param task   the task that failed (currently unused here; the whole job fails)
 * @param reason human-readable failure cause, recorded in the job state
 */
public void taskFailed(BaseAnalysisTask task, String reason) {
try {
updateTaskState(AnalysisState.FAILED, reason);
cancel();
} finally {
// Deregister unconditionally so the job cannot leak out of the manager
// even if the state update or cancel throws.
deregisterJob();
}
}

public void cancel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1072,4 +1072,8 @@ public void constructJob(AnalysisInfo jobInfo, Collection<? extends BaseAnalysis
/**
 * Stops tracking the analysis job with the given id.
 * Removing an id that is not present is a harmless no-op ({@code Map.remove}).
 *
 * @param id the job id to drop from {@code idToAnalysisJob}
 */
public void removeJob(long id) {
this.idToAnalysisJob.remove(id);
}

/**
 * Reports whether any previously submitted analysis job still has tasks
 * being tracked.
 *
 * @return {@code true} if at least one job has unfinished tasks in
 *         {@code analysisJobIdToTaskMap}
 */
public boolean hasUnFinished() {
boolean allJobsDone = analysisJobIdToTaskMap.isEmpty();
return !allJobsDone;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public class StatisticConstants {

public static final int ANALYZE_TIMEOUT_IN_SEC = 43200;

public static final int SUBMIT_JOB_LIMIT = 5;

static {
SYSTEM_DBS.add(SystemInfoService.DEFAULT_CLUSTER
+ ClusterNamespace.CLUSTER_DELIMITER + FeConstants.INTERNAL_DB_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public abstract class StatisticsCollector extends MasterDaemon {

protected final AnalysisTaskExecutor analysisTaskExecutor;

protected int submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT;

public StatisticsCollector(String name, long intervalMs, AnalysisTaskExecutor analysisTaskExecutor) {
super(name, intervalMs);
Expand All @@ -53,8 +54,8 @@ protected void runAfterCatalogReady() {
if (Env.isCheckpointThread()) {
return;
}

if (!analysisTaskExecutor.idle()) {
submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT;
if (Env.getCurrentEnv().getAnalysisManager().hasUnFinished()) {
LOG.info("Analyze tasks those submitted in last time is not finished, skip");
return;
}
Expand All @@ -71,7 +72,9 @@ protected void createSystemAnalysisJob(AnalysisInfo jobInfo)
// No statistics need to be collected or updated
return;
}

if (submittedThisRound-- < 0) {
return;
}
Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ public void initTest(@Mocked AnalysisInfo jobInfo, @Mocked OlapAnalysisTask task
}

@Test
public void testAppendBufTest1(@Mocked AnalysisInfo analysisInfo, @Mocked OlapAnalysisTask olapAnalysisTask) {
public void testAppendBufTest1(@Mocked AnalysisInfo analysisInfo,
@Mocked OlapAnalysisTask olapAnalysisTask,
@Mocked OlapAnalysisTask olapAnalysisTask2) {
AtomicInteger writeBufInvokeTimes = new AtomicInteger();
new MockUp<AnalysisJob>() {
@Mock
Expand All @@ -63,9 +65,9 @@ public void deregisterJob() {
AnalysisJob job = new AnalysisJob(analysisInfo, Arrays.asList(olapAnalysisTask));
job.queryingTask = new HashSet<>();
job.queryingTask.add(olapAnalysisTask);
job.queryingTask.add(olapAnalysisTask2);
job.queryFinished = new HashSet<>();
job.buf = new ArrayList<>();
job.totalTaskCount = 20;

// not all task finished nor cached limit exceed, shouldn't write
job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
Expand Down Expand Up @@ -97,7 +99,6 @@ public void deregisterJob() {
job.queryingTask.add(olapAnalysisTask);
job.queryFinished = new HashSet<>();
job.buf = new ArrayList<>();
job.totalTaskCount = 1;

job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
// all task finished, should write and deregister this job
Expand Down Expand Up @@ -132,7 +133,6 @@ public void deregisterJob() {
for (int i = 0; i < StatisticsUtil.getInsertMergeCount(); i++) {
job.buf.add(colStatsData);
}
job.totalTaskCount = 100;

job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
// cache limit exceed, should write them
Expand Down

0 comments on commit 6f51d1d

Please sign in to comment.