Skip to content

Commit

Permalink
[ARCTIC-1245][AMS] If the Optimize_task is in the commit process, the…
Browse files Browse the repository at this point in the history
… MySQL connection is disconnected, let it retry next time (#1249)

* fix close_wait problem in limit query

* Retry when commit fails due to MySQL disconnection

---------

Co-authored-by: Chao He <hechao@rd.netease.com>
  • Loading branch information
2 people authored and baiyangtx committed Mar 22, 2023
1 parent 26c9d41 commit afbd1c0
Showing 1 changed file with 19 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ public class TableOptimizeItem extends IJDBCService {
private volatile double quotaCache;
private volatile String groupNameCache;

private BasicOptimizeCommit optimizeCommit;
private long commitTime;
private static final long INIT_COMMIT_TIME = 0L;

/**
* -1: not initialized
* 0: not committed
Expand Down Expand Up @@ -209,7 +213,7 @@ public void tryTriggerCommit() {
if (waitCommit.get()) {
return;
}
if (!allTasksPrepared()) {
if (!allTasksPrepared() && !isRetryCommit()) {
return;
}
boolean success = ServiceContainer.getOptimizeService().triggerOptimizeCommit(this);
Expand Down Expand Up @@ -863,7 +867,7 @@ private void optimizeTasksCommitted(BasicOptimizeCommit optimizeCommit,

sqlSession.commit(true);
}

this.commitTime = INIT_COMMIT_TIME;
updateTableOptimizeStatus();
}

Expand Down Expand Up @@ -1028,11 +1032,14 @@ public void commitOptimizeTasks() throws Exception {
}

try {
// If it is a retry task, do optimizeTasksCommitted()
if (isRetryCommit()) {
optimizeTasksCommitted(optimizeCommit, commitTime);
}
Map<String, List<OptimizeTaskItem>> tasksToCommit = getOptimizeTasksToCommit();
long taskCount = tasksToCommit.values().stream().mapToLong(Collection::size).sum();
if (MapUtils.isNotEmpty(tasksToCommit)) {
LOG.info("{} get {} tasks of {} partitions to commit", tableIdentifier, taskCount, tasksToCommit.size());
BasicOptimizeCommit optimizeCommit;
if (com.netease.arctic.utils.TableTypeUtil.isIcebergTableFormat(getArcticTable())) {
optimizeCommit = new IcebergOptimizeCommit(getArcticTable(true), tasksToCommit);
} else if (TableTypeUtil.isHive(getArcticTable())) {
Expand All @@ -1044,7 +1051,7 @@ public void commitOptimizeTasks() throws Exception {

boolean committed = optimizeCommit.commit(tableOptimizeRuntime.getCurrentSnapshotId());
if (committed) {
long commitTime = System.currentTimeMillis();
commitTime = System.currentTimeMillis();
optimizeTasksCommitted(optimizeCommit, commitTime);
} else {
LOG.warn("{} commit failed, clear optimize tasks", tableIdentifier);
Expand All @@ -1058,6 +1065,14 @@ public void commitOptimizeTasks() throws Exception {
}
}

/**
* Check if the task currently being submitted needs to be retried
* @return boolean
*/
private boolean isRetryCommit() {
return commitTime != INIT_COMMIT_TIME;
}

/**
* Get all optimize tasks.
*
Expand Down

0 comments on commit afbd1c0

Please sign in to comment.