From 292111f6e728ff9559fce39145b029c357358890 Mon Sep 17 00:00:00 2001 From: rfyu <39233058+rfyu@users.noreply.github.com> Date: Sat, 14 Sep 2024 16:42:54 +0800 Subject: [PATCH] [AMORO-3111]Avoid optimization state blocking in planning (#3115) * Avoid optimization state blocking in planning * modify the optimizing status in memory * fix conflict and rollback configuration --- .../server/optimizing/OptimizingQueue.java | 3 +++ .../amoro/server/table/TableRuntime.java | 22 +++++++++++++------ 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index 47a3f401e8..bd1e5023a0 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -225,6 +225,9 @@ private void triggerAsyncPlanning( CompletableFuture.supplyAsync(() -> planInternal(tableRuntime), planExecutor) .whenComplete( (process, throwable) -> { + if (throwable != null) { + LOG.error("Failed to plan table {}", tableRuntime.getTableIdentifier(), throwable); + } long currentTime = System.currentTimeMillis(); scheduleLock.lock(); try { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java index e70adfa80b..475ab28f1d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java @@ -188,13 +188,21 @@ public void beginPlanning() { } public void planFailed() { - invokeConsistency( - () -> { - OptimizingStatus originalStatus = optimizingStatus; - updateOptimizingStatus(OptimizingStatus.PENDING); - persistUpdatingRuntime(); - tableHandler.handleTableChanged(this, originalStatus); - }); + try { + invokeConsistency( + () -> { + OptimizingStatus originalStatus = optimizingStatus; + updateOptimizingStatus(OptimizingStatus.PENDING); + persistUpdatingRuntime(); + tableHandler.handleTableChanged(this, originalStatus); + }); + } catch (Exception e) { + OptimizingStatus originalStatus = optimizingStatus; + updateOptimizingStatus(OptimizingStatus.PENDING); + LOG.warn( + "Persistent database failed, only the optimizing state in the memory was changed.", e); + tableHandler.handleTableChanged(this, originalStatus); + } } public void beginProcess(OptimizingProcess optimizingProcess) {