Skip to content

Commit

Permalink
[AMS-Refactor] make TableRuntime's fileds as StateField (#1495)
Browse files Browse the repository at this point in the history
* add StatedPersistentBase.StateField for fields of TableRuntime and tableHandler should not throw exception

* replace @StatedPersistentBase.StateField with @StateField
  • Loading branch information
wangtaohz authored May 31, 2023
1 parent 309c639 commit 2cbe12e
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,26 @@ public class TaskRuntime extends StatedPersistentBase {
private long tableId;
private String partition;
private OptimizingTaskId taskId;
@StatedPersistentBase.StateField
@StateField
private Status status = Status.PLANNED;
private final TaskStatusMachine statusMachine = new TaskStatusMachine();
@StatedPersistentBase.StateField
@StateField
private int retry = 0;
@StatedPersistentBase.StateField
@StateField
private long startTime = ArcticServiceConstants.INVALID_TIME;
@StatedPersistentBase.StateField
@StateField
private long endTime = ArcticServiceConstants.INVALID_TIME;
@StatedPersistentBase.StateField
@StateField
private long costTime = 0;
@StatedPersistentBase.StateField
@StateField
private OptimizingQueue.OptimizingThread optimizingThread;
@StatedPersistentBase.StateField
@StateField
private String failReason;
private TaskOwner owner;
private RewriteFilesInput input;
@StatedPersistentBase.StateField
@StateField
private RewriteFilesOutput output;
@StatedPersistentBase.StateField
@StateField
private MetricsSummary summary;
private Map<String, String> properties;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
import com.netease.arctic.server.optimizing.OptimizingStatus;
import com.netease.arctic.table.ArcticTable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Objects;

public abstract class RuntimeHandlerChain {

private static final Logger LOG = LoggerFactory.getLogger(RuntimeHandlerChain.class);

private RuntimeHandlerChain next;

Expand All @@ -30,21 +34,21 @@ public final void startHandler(List<TableRuntimeMeta> tableRuntimeMetaList) {
}

public final void fireStatusChanged(TableRuntime tableRuntime, OptimizingStatus originalStatus) {
handleStatusChanged(tableRuntime, originalStatus);
doSilently(() -> handleStatusChanged(tableRuntime, originalStatus));
if (next != null) {
next.fireStatusChanged(tableRuntime, originalStatus);
}
}

public final void fireConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) {
handleConfigChanged(tableRuntime, originalConfig);
doSilently(() -> handleConfigChanged(tableRuntime, originalConfig));
if (next != null) {
next.fireConfigChanged(tableRuntime, originalConfig);
}
}

public final void fireTableAdded(ArcticTable table, TableRuntime tableRuntime) {
handleTableAdded(table, tableRuntime);
doSilently(() -> handleTableAdded(table, tableRuntime));
if (next != null) {
next.fireTableAdded(table, tableRuntime);
}
Expand All @@ -54,14 +58,22 @@ public final void fireTableRemoved(TableRuntime tableRuntime) {
if (next != null) {
next.fireTableRemoved(tableRuntime);
}
handleTableRemoved(tableRuntime);
doSilently(() -> handleTableRemoved(tableRuntime));
}

public final void dispose() {
if (next != null) {
next.doDispose();
}
doDispose();
doSilently(this::doDispose);
}

private void doSilently(Runnable runnable) {
try {
runnable.run();
} catch (Throwable t) {
LOG.error("failed to handle, ignore and continue", t);
}
}

protected abstract void handleStatusChanged(TableRuntime tableRuntime, OptimizingStatus originalStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,38 @@ public class TableRuntime extends StatedPersistentBase {
private final TableRuntimeHandler tableHandler;
private final ServerTableIdentifier tableIdentifier;
private final List<TaskRuntime.TaskQuota> taskQuotas = Collections.synchronizedList(new ArrayList<>());

// for unKeyedTable or base table
@StateField
private volatile long currentSnapshotId = ArcticServiceConstants.INVALID_SNAPSHOT_ID;
@StateField
private volatile long lastOptimizedSnapshotId = ArcticServiceConstants.INVALID_SNAPSHOT_ID;
@StateField
private volatile long lastOptimizedChangeSnapshotId = ArcticServiceConstants.INVALID_SNAPSHOT_ID;
// for change table
@StateField
private volatile long currentChangeSnapshotId = ArcticServiceConstants.INVALID_SNAPSHOT_ID;
@StateField
private volatile OptimizingStatus optimizingStatus = OptimizingStatus.IDLE;
@StateField
private volatile long currentStatusStartTime = System.currentTimeMillis();
@StateField
private volatile long lastMajorOptimizingTime;
@StateField
private volatile long lastFullOptimizingTime;
@StateField
private volatile long lastMinorOptimizingTime;
@StateField
private volatile String optimizerGroup;
@StateField
private volatile OptimizingProcess optimizingProcess;
@StateField
private volatile TableConfiguration tableConfiguration;
@StateField
private volatile long processId;
@StateField
private volatile OptimizingEvaluator.PendingInput pendingInput;

private final ReentrantLock blockerLock = new ReentrantLock();

protected TableRuntime(
Expand Down

0 comments on commit 2cbe12e

Please sign in to comment.