Skip to content

Commit

Permalink
[AMORO-3110] Avoid optimization status blocking in running (#3113)
Browse files Browse the repository at this point in the history
* [AMORO-3110] Avoid optimization status blocking in running

* format

* Ignore the exception when it occurs and do not perform a rollback
  • Loading branch information
rfyu authored Sep 14, 2024
1 parent 43ee3c9 commit 9877c20
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,6 @@ public void acceptResult(TaskRuntime taskRuntime) {
persistProcessCompleted(false);
}
}
} catch (Exception e) {
LOG.error("accept result error:", e);
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.ibatis.session.TransactionIsolationLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.function.Consumer;
Expand All @@ -31,6 +33,8 @@

public abstract class PersistentBase {

private static final Logger LOG = LoggerFactory.getLogger(PersistentBase.class);

protected PersistentBase() {}

@VisibleForTesting
Expand Down Expand Up @@ -69,6 +73,18 @@ protected final <T> void doAs(Class<T> mapperClz, Consumer<T> consumer) {
}
}

protected final <T> void doAsIgnoreError(Class<T> mapperClz, Consumer<T> consumer) {
try (NestedSqlSession session = beginSession()) {
try {
T mapper = getMapper(session, mapperClz);
consumer.accept(mapper);
session.commit();
} catch (Throwable t) {
LOG.error("Ignore error in doAsIgnoreError", t);
}
}
}

protected final void doAsTransaction(Runnable... operations) {
try (NestedSqlSession session = beginSession()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ private boolean updateConfigInternal(Map<String, String> properties) {
}

public void addTaskQuota(TaskRuntime.TaskQuota taskQuota) {
doAs(OptimizingMapper.class, mapper -> mapper.insertTaskQuota(taskQuota));
doAsIgnoreError(OptimizingMapper.class, mapper -> mapper.insertTaskQuota(taskQuota));
taskQuotas.add(taskQuota);
long validTime = System.currentTimeMillis() - AmoroServiceConstants.QUOTA_LOOK_BACK_TIME;
this.taskQuotas.removeIf(task -> task.checkExpired(validTime));
Expand Down

0 comments on commit 9877c20

Please sign in to comment.