Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMS-Refactor] make TableRuntime's fileds as StateField #1495

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,26 @@ public class TaskRuntime extends StatedPersistentBase {
private long tableId;
private String partition;
private OptimizingTaskId taskId;
@StatedPersistentBase.StateField
@StateField
private Status status = Status.PLANNED;
private final TaskStatusMachine statusMachine = new TaskStatusMachine();
@StatedPersistentBase.StateField
@StateField
private int retry = 0;
@StatedPersistentBase.StateField
@StateField
private long startTime = ArcticServiceConstants.INVALID_TIME;
@StatedPersistentBase.StateField
@StateField
private long endTime = ArcticServiceConstants.INVALID_TIME;
@StatedPersistentBase.StateField
@StateField
private long costTime = 0;
@StatedPersistentBase.StateField
@StateField
private OptimizingQueue.OptimizingThread optimizingThread;
@StatedPersistentBase.StateField
@StateField
private String failReason;
private TaskOwner owner;
private RewriteFilesInput input;
@StatedPersistentBase.StateField
@StateField
private RewriteFilesOutput output;
@StatedPersistentBase.StateField
@StateField
private MetricsSummary summary;
private Map<String, String> properties;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
import com.netease.arctic.server.optimizing.OptimizingStatus;
import com.netease.arctic.table.ArcticTable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Objects;

public abstract class RuntimeHandlerChain {

private static final Logger LOG = LoggerFactory.getLogger(RuntimeHandlerChain.class);

private RuntimeHandlerChain next;

Expand All @@ -30,21 +34,21 @@ public final void startHandler(List<TableRuntimeMeta> tableRuntimeMetaList) {
}

public final void fireStatusChanged(TableRuntime tableRuntime, OptimizingStatus originalStatus) {
handleStatusChanged(tableRuntime, originalStatus);
doSilently(() -> handleStatusChanged(tableRuntime, originalStatus));
if (next != null) {
next.fireStatusChanged(tableRuntime, originalStatus);
}
}

public final void fireConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) {
handleConfigChanged(tableRuntime, originalConfig);
doSilently(() -> handleConfigChanged(tableRuntime, originalConfig));
if (next != null) {
next.fireConfigChanged(tableRuntime, originalConfig);
}
}

public final void fireTableAdded(ArcticTable table, TableRuntime tableRuntime) {
handleTableAdded(table, tableRuntime);
doSilently(() -> handleTableAdded(table, tableRuntime));
if (next != null) {
next.fireTableAdded(table, tableRuntime);
}
Expand All @@ -54,14 +58,22 @@ public final void fireTableRemoved(TableRuntime tableRuntime) {
if (next != null) {
next.fireTableRemoved(tableRuntime);
}
handleTableRemoved(tableRuntime);
doSilently(() -> handleTableRemoved(tableRuntime));
}

public final void dispose() {
if (next != null) {
next.doDispose();
}
doDispose();
doSilently(this::doDispose);
}

private void doSilently(Runnable runnable) {
try {
runnable.run();
} catch (Throwable t) {
LOG.error("failed to handle, ignore and continue", t);
}
}

protected abstract void handleStatusChanged(TableRuntime tableRuntime, OptimizingStatus originalStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,38 @@ public class TableRuntime extends StatedPersistentBase {
private final TableRuntimeHandler tableHandler;
private final ServerTableIdentifier tableIdentifier;
private final List<TaskRuntime.TaskQuota> taskQuotas = Collections.synchronizedList(new ArrayList<>());

// for unKeyedTable or base table
@StateField
private volatile long currentSnapshotId = ArcticServiceConstants.INVALID_SNAPSHOT_ID;
@StateField
private volatile long lastOptimizedSnapshotId = ArcticServiceConstants.INVALID_SNAPSHOT_ID;
@StateField
private volatile long lastOptimizedChangeSnapshotId = ArcticServiceConstants.INVALID_SNAPSHOT_ID;
// for change table
@StateField
private volatile long currentChangeSnapshotId = ArcticServiceConstants.INVALID_SNAPSHOT_ID;
@StateField
private volatile OptimizingStatus optimizingStatus = OptimizingStatus.IDLE;
@StateField
private volatile long currentStatusStartTime = System.currentTimeMillis();
@StateField
private volatile long lastMajorOptimizingTime;
@StateField
private volatile long lastFullOptimizingTime;
@StateField
private volatile long lastMinorOptimizingTime;
@StateField
private volatile String optimizerGroup;
@StateField
private volatile OptimizingProcess optimizingProcess;
@StateField
private volatile TableConfiguration tableConfiguration;
@StateField
private volatile long processId;
@StateField
private volatile OptimizingEvaluator.PendingInput pendingInput;

private final ReentrantLock blockerLock = new ReentrantLock();

protected TableRuntime(
Expand Down