Skip to content

Commit

Permalink
[minor](stats) Throw error when sync analyze failed (apache#27846)
Browse files Browse the repository at this point in the history
pick from master apache#27845
  • Loading branch information
Kikyou1997 authored and gnehil committed Dec 4, 2023
1 parent eb96303 commit 827407b
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ protected void writeBuf() {
if (killed) {
return;
}
// buf could be empty when nothing need to do, for example user submit an analysis task for table with no data
// buf could be empty when nothing need to do,r for example user submit an analysis task for table with no data
// change
if (!buf.isEmpty()) {
String insertStmt = "INSERT INTO " + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME + " VALUES ";
Expand All @@ -128,28 +128,17 @@ protected void writeBuf() {
values.add(data.toSQL(true));
}
insertStmt += values.toString();
int retryTimes = 0;
while (retryTimes < StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
if (killed) {
return;
}
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
stmtExecutor = new StmtExecutor(r.connectContext, insertStmt);
executeWithExceptionOnFail(stmtExecutor);
break;
} catch (Exception t) {
LOG.warn("Failed to write buf: " + insertStmt, t);
retryTimes++;
if (retryTimes >= StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
updateTaskState(AnalysisState.FAILED, t.getMessage());
return;
}
}
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
stmtExecutor = new StmtExecutor(r.connectContext, insertStmt);
executeWithExceptionOnFail(stmtExecutor);
} catch (Exception t) {
throw new RuntimeException("Failed to analyze: " + t.getMessage());
}
}
updateTaskState(AnalysisState.FINISHED, "");
syncLoadStats();
queryFinished.clear();
buf.clear();
}

protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ protected void prepareExecution() {

protected void executeWithRetry() {
int retriedTimes = 0;
while (retriedTimes <= StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
while (retriedTimes < StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
if (killed) {
break;
}
Expand All @@ -193,7 +193,7 @@ protected void executeWithRetry() {
throw new RuntimeException(t);
}
LOG.warn("Failed to execute analysis task, retried times: {}", retriedTimes++, t);
if (retriedTimes > StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
if (retriedTimes >= StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
job.taskFailed(this, t.getMessage());
throw new RuntimeException(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,20 @@ protected void runAfterCatalogReady() {
}

public synchronized void clear() {
if (!init()) {
return;
try {
if (!init()) {
return;
}
clearStats(colStatsTbl);
clearStats(histStatsTbl);
} finally {
colStatsTbl = null;
histStatsTbl = null;
idToCatalog = null;
idToDb = null;
idToTbl = null;
idToMVIdx = null;
}
clearStats(colStatsTbl);
clearStats(histStatsTbl);
}

private void clearStats(OlapTable statsTbl) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public void updateTaskState(AnalysisState state, String msg) {

@Mock
protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception {
throw new RuntimeException();
// DO NOTHING
}

@Mock
Expand All @@ -218,7 +218,7 @@ protected void syncLoadStats() {
job.queryFinished = new HashSet<>();
job.queryFinished.add(task2);
job.writeBuf();
Assertions.assertEquals(1, job.queryFinished.size());
Assertions.assertEquals(0, job.queryFinished.size());
}

}

0 comments on commit 827407b

Please sign in to comment.