diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/TaskRuntime.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/TaskRuntime.java index f522e7185a..3c38fc52a7 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/TaskRuntime.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/TaskRuntime.java @@ -43,26 +43,26 @@ public class TaskRuntime extends StatedPersistentBase { private long tableId; private String partition; private OptimizingTaskId taskId; - @StatedPersistentBase.StateField + @StateField private Status status = Status.PLANNED; private final TaskStatusMachine statusMachine = new TaskStatusMachine(); - @StatedPersistentBase.StateField + @StateField private int retry = 0; - @StatedPersistentBase.StateField + @StateField private long startTime = ArcticServiceConstants.INVALID_TIME; - @StatedPersistentBase.StateField + @StateField private long endTime = ArcticServiceConstants.INVALID_TIME; - @StatedPersistentBase.StateField + @StateField private long costTime = 0; - @StatedPersistentBase.StateField + @StateField private OptimizingQueue.OptimizingThread optimizingThread; - @StatedPersistentBase.StateField + @StateField private String failReason; private TaskOwner owner; private RewriteFilesInput input; - @StatedPersistentBase.StateField + @StateField private RewriteFilesOutput output; - @StatedPersistentBase.StateField + @StateField private MetricsSummary summary; private Map properties; diff --git a/ams/server/src/main/java/com/netease/arctic/server/table/RuntimeHandlerChain.java b/ams/server/src/main/java/com/netease/arctic/server/table/RuntimeHandlerChain.java index 61c62574f5..bffad88587 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/table/RuntimeHandlerChain.java +++ b/ams/server/src/main/java/com/netease/arctic/server/table/RuntimeHandlerChain.java @@ -3,11 +3,15 @@ import com.netease.arctic.server.optimizing.OptimizingStatus; import com.netease.arctic.table.ArcticTable; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; import java.util.Objects; public abstract class RuntimeHandlerChain { + + private static final Logger LOG = LoggerFactory.getLogger(RuntimeHandlerChain.class); private RuntimeHandlerChain next; @@ -30,21 +34,21 @@ public final void startHandler(List tableRuntimeMetaList) { } public final void fireStatusChanged(TableRuntime tableRuntime, OptimizingStatus originalStatus) { - handleStatusChanged(tableRuntime, originalStatus); + doSilently(() -> handleStatusChanged(tableRuntime, originalStatus)); if (next != null) { next.fireStatusChanged(tableRuntime, originalStatus); } } public final void fireConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) { - handleConfigChanged(tableRuntime, originalConfig); + doSilently(() -> handleConfigChanged(tableRuntime, originalConfig)); if (next != null) { next.fireConfigChanged(tableRuntime, originalConfig); } } public final void fireTableAdded(ArcticTable table, TableRuntime tableRuntime) { - handleTableAdded(table, tableRuntime); + doSilently(() -> handleTableAdded(table, tableRuntime)); if (next != null) { next.fireTableAdded(table, tableRuntime); } @@ -54,14 +58,22 @@ public final void fireTableRemoved(TableRuntime tableRuntime) { if (next != null) { next.fireTableRemoved(tableRuntime); } - handleTableRemoved(tableRuntime); + doSilently(() -> handleTableRemoved(tableRuntime)); } public final void dispose() { if (next != null) { next.doDispose(); } - doDispose(); + doSilently(this::doDispose); + } + + private void doSilently(Runnable runnable) { + try { + runnable.run(); + } catch (Throwable t) { + LOG.error("failed to handle, ignore and continue", t); + } } protected abstract void handleStatusChanged(TableRuntime tableRuntime, OptimizingStatus originalStatus); diff --git a/ams/server/src/main/java/com/netease/arctic/server/table/TableRuntime.java b/ams/server/src/main/java/com/netease/arctic/server/table/TableRuntime.java index b7dcd02de3..312dfae523 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/table/TableRuntime.java +++ b/ams/server/src/main/java/com/netease/arctic/server/table/TableRuntime.java @@ -62,22 +62,38 @@ public class TableRuntime extends StatedPersistentBase { private final TableRuntimeHandler tableHandler; private final ServerTableIdentifier tableIdentifier; private final List taskQuotas = Collections.synchronizedList(new ArrayList<>()); + // for unKeyedTable or base table + @StateField private volatile long currentSnapshotId = ArcticServiceConstants.INVALID_SNAPSHOT_ID; + @StateField private volatile long lastOptimizedSnapshotId = ArcticServiceConstants.INVALID_SNAPSHOT_ID; + @StateField private volatile long lastOptimizedChangeSnapshotId = ArcticServiceConstants.INVALID_SNAPSHOT_ID; // for change table + @StateField private volatile long currentChangeSnapshotId = ArcticServiceConstants.INVALID_SNAPSHOT_ID; + @StateField private volatile OptimizingStatus optimizingStatus = OptimizingStatus.IDLE; + @StateField private volatile long currentStatusStartTime = System.currentTimeMillis(); + @StateField private volatile long lastMajorOptimizingTime; + @StateField private volatile long lastFullOptimizingTime; + @StateField private volatile long lastMinorOptimizingTime; + @StateField private volatile String optimizerGroup; + @StateField private volatile OptimizingProcess optimizingProcess; + @StateField private volatile TableConfiguration tableConfiguration; + @StateField private volatile long processId; + @StateField private volatile OptimizingEvaluator.PendingInput pendingInput; + private final ReentrantLock blockerLock = new ReentrantLock(); protected TableRuntime(