diff --git a/ams/server/src/main/java/com/netease/arctic/server/table/TableConfiguration.java b/ams/server/src/main/java/com/netease/arctic/server/table/TableConfiguration.java index 122fa22f39..92b9fe6bf0 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/table/TableConfiguration.java +++ b/ams/server/src/main/java/com/netease/arctic/server/table/TableConfiguration.java @@ -15,6 +15,7 @@ public class TableConfiguration { private long changeDataTTLMinutes; private boolean cleanOrphanEnabled; private long orphanExistingMinutes; + private boolean deleteDanglingDeleteFilesEnabled; private OptimizingConfig optimizingConfig; public TableConfiguration() { @@ -74,6 +75,15 @@ public TableConfiguration setOrphanExistingMinutes(long orphanExistingMinutes) { return this; } + public boolean isDeleteDanglingDeleteFilesEnabled() { + return deleteDanglingDeleteFilesEnabled; + } + + public TableConfiguration setDeleteDanglingDeleteFilesEnabled(boolean deleteDanglingDeleteFilesEnabled) { + this.deleteDanglingDeleteFilesEnabled = deleteDanglingDeleteFilesEnabled; + return this; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -85,7 +95,15 @@ public boolean equals(Object o) { TableConfiguration that = (TableConfiguration) o; return expireSnapshotEnabled == that.expireSnapshotEnabled && snapshotTTLMinutes == that.snapshotTTLMinutes && changeDataTTLMinutes == that.changeDataTTLMinutes && cleanOrphanEnabled == that.cleanOrphanEnabled && - orphanExistingMinutes == that.orphanExistingMinutes && Objects.equal(optimizingConfig, that.optimizingConfig); + orphanExistingMinutes == that.orphanExistingMinutes && + deleteDanglingDeleteFilesEnabled == that.deleteDanglingDeleteFilesEnabled && + Objects.equal(optimizingConfig, that.optimizingConfig); + } + + @Override + public int hashCode() { + return Objects.hashCode(expireSnapshotEnabled, snapshotTTLMinutes, changeDataTTLMinutes, cleanOrphanEnabled, + orphanExistingMinutes, deleteDanglingDeleteFilesEnabled, optimizingConfig); } public static TableConfiguration parseConfig(Map properties) { @@ -109,6 +127,10 @@ public static TableConfiguration parseConfig(Map properties) { properties, TableProperties.MIN_ORPHAN_FILE_EXISTING_TIME, TableProperties.MIN_ORPHAN_FILE_EXISTING_TIME_DEFAULT)) + .setDeleteDanglingDeleteFilesEnabled(CompatiblePropertyUtil.propertyAsBoolean( + properties, + TableProperties.ENABLE_DANGLING_DELETE_FILES_CLEAN, + TableProperties.ENABLE_DANGLING_DELETE_FILES_CLEAN_DEFAULT)) .setOptimizingConfig(OptimizingConfig.parseOptimizingConfig(properties)); } } diff --git a/ams/server/src/main/java/com/netease/arctic/server/table/executor/OrphanFilesCleaningExecutor.java b/ams/server/src/main/java/com/netease/arctic/server/table/executor/OrphanFilesCleaningExecutor.java index 41c3ad336e..6a6e1b1f0e 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/table/executor/OrphanFilesCleaningExecutor.java +++ b/ams/server/src/main/java/com/netease/arctic/server/table/executor/OrphanFilesCleaningExecutor.java @@ -30,9 +30,7 @@ import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.KeyedTable; import com.netease.arctic.table.TableIdentifier; -import com.netease.arctic.table.TableProperties; import com.netease.arctic.table.UnkeyedTable; -import com.netease.arctic.utils.CompatiblePropertyUtil; import com.netease.arctic.utils.TableFileUtil; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.ManifestFile; @@ -89,37 +87,30 @@ public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration or public void execute(TableRuntime tableRuntime) { try { LOG.info("{} clean orphan files", tableRuntime.getTableIdentifier()); - ArcticTable arcticTable = loadTable(tableRuntime); - - boolean needOrphanClean = CompatiblePropertyUtil.propertyAsBoolean( - arcticTable.properties(), - TableProperties.ENABLE_ORPHAN_CLEAN, - TableProperties.ENABLE_ORPHAN_CLEAN_DEFAULT); + TableConfiguration tableConfiguration = tableRuntime.getTableConfiguration(); - if (!needOrphanClean) { + if (!tableConfiguration.isCleanOrphanEnabled()) { return; } - long keepTime = CompatiblePropertyUtil.propertyAsLong( - arcticTable.properties(), - TableProperties.MIN_ORPHAN_FILE_EXISTING_TIME, - TableProperties.MIN_ORPHAN_FILE_EXISTING_TIME_DEFAULT) * 60 * 1000; + long keepTime = tableConfiguration.getOrphanExistingMinutes() * 60 * 1000; LOG.info("{} clean orphan files, keepTime={}", tableRuntime.getTableIdentifier(), keepTime); // clear data files + ArcticTable arcticTable = loadTable(tableRuntime); cleanContentFiles(arcticTable, System.currentTimeMillis() - keepTime); - arcticTable = loadTable(tableRuntime); + // it may cost a long time to clean content files, so refresh the table to the current snapshot before cleaning + // the metadata files + arcticTable.refresh(); // clear metadata files cleanMetadata(arcticTable, System.currentTimeMillis() - keepTime); - boolean needCleanDanglingDeleteFiles = CompatiblePropertyUtil.propertyAsBoolean(arcticTable.properties(), - TableProperties.ENABLE_DANGLING_DELETE_FILES_CLEAN, - TableProperties.ENABLE_DANGLING_DELETE_FILES_CLEAN_DEFAULT); - - if (!needCleanDanglingDeleteFiles) { + if (!tableConfiguration.isDeleteDanglingDeleteFilesEnabled()) { return; } + // refresh to the current snapshot before clean dangling delete files + arcticTable.refresh(); // clear dangling delete files cleanDanglingDeleteFiles(arcticTable); } catch (Throwable t) { diff --git a/ams/server/src/main/java/com/netease/arctic/server/table/executor/SnapshotsExpiringExecutor.java b/ams/server/src/main/java/com/netease/arctic/server/table/executor/SnapshotsExpiringExecutor.java index 4814568318..d407d5fb77 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/table/executor/SnapshotsExpiringExecutor.java +++ b/ams/server/src/main/java/com/netease/arctic/server/table/executor/SnapshotsExpiringExecutor.java @@ -29,9 +29,7 @@ import com.netease.arctic.server.utils.IcebergTableUtil; import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.KeyedTable; -import com.netease.arctic.table.TableProperties; import com.netease.arctic.table.UnkeyedTable; -import com.netease.arctic.utils.CompatiblePropertyUtil; import com.netease.arctic.utils.TableFileUtil; import com.netease.arctic.utils.TablePropertyUtil; import org.apache.commons.collections.CollectionUtils; @@ -68,7 +66,7 @@ public class SnapshotsExpiringExecutor extends BaseTableExecutor { private static final int DATA_FILE_LIST_SPLIT = 3000; - // 1 days + // 1 hour private static final long INTERVAL = 60 * 60 * 1000L; public SnapshotsExpiringExecutor(TableManager tableRuntimes, int poolSize) { @@ -93,15 +91,12 @@ public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration or @Override public void execute(TableRuntime tableRuntime) { try { - ArcticTable arcticTable = loadTable(tableRuntime); - boolean needClean = CompatiblePropertyUtil.propertyAsBoolean( - arcticTable.properties(), - TableProperties.ENABLE_TABLE_EXPIRE, - TableProperties.ENABLE_TABLE_EXPIRE_DEFAULT); - if (!needClean) { + TableConfiguration tableConfiguration = tableRuntime.getTableConfiguration(); + if (!tableConfiguration.isExpireSnapshotEnabled()) { return; } + ArcticTable arcticTable = loadTable(tableRuntime); expireArcticTable(arcticTable, tableRuntime); } catch (Throwable t) { LOG.error("unexpected expire error of table {} ", tableRuntime.getTableIdentifier(), t); @@ -110,16 +105,11 @@ public void execute(TableRuntime tableRuntime) { public static void expireArcticTable(ArcticTable arcticTable, TableRuntime tableRuntime) { long startTime = System.currentTimeMillis(); + TableConfiguration tableConfiguration = tableRuntime.getTableConfiguration(); LOG.info("{} start expire", tableRuntime.getTableIdentifier()); - long changeDataTTL = CompatiblePropertyUtil.propertyAsLong( - arcticTable.properties(), - TableProperties.CHANGE_DATA_TTL, - TableProperties.CHANGE_DATA_TTL_DEFAULT) * 60 * 1000; - long baseSnapshotsKeepTime = CompatiblePropertyUtil.propertyAsLong( - arcticTable.properties(), - TableProperties.BASE_SNAPSHOT_KEEP_MINUTES, - TableProperties.BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT) * 60 * 1000; + long changeDataTTL = tableConfiguration.getChangeDataTTLMinutes() * 60 * 1000; + long baseSnapshotsKeepTime = tableConfiguration.getSnapshotTTLMinutes() * 60 * 1000; Set hiveLocations = new HashSet<>(); if (TableTypeUtil.isHive(arcticTable)) { diff --git a/ams/server/src/test/java/com/netease/arctic/server/table/executor/TestSnapshotExpire.java b/ams/server/src/test/java/com/netease/arctic/server/table/executor/TestSnapshotExpire.java index 725fad4c71..ba19601527 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/table/executor/TestSnapshotExpire.java +++ b/ams/server/src/test/java/com/netease/arctic/server/table/executor/TestSnapshotExpire.java @@ -30,6 +30,7 @@ import com.netease.arctic.server.optimizing.OptimizingProcess; import com.netease.arctic.server.optimizing.OptimizingStatus; import com.netease.arctic.server.table.ServerTableIdentifier; +import com.netease.arctic.server.table.TableConfiguration; import com.netease.arctic.server.table.TableRuntime; import com.netease.arctic.server.utils.IcebergTableUtil; import com.netease.arctic.table.KeyedTable; @@ -258,6 +259,8 @@ public void testNotExpireFlinkLatestCommit4ChangeTable() { Mockito.when(tableRuntime.getTableIdentifier()).thenReturn( ServerTableIdentifier.of(AmsUtil.toTableIdentifier(testKeyedTable.id()))); Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE); + Mockito.when(tableRuntime.getTableConfiguration()).thenReturn( + TableConfiguration.parseConfig(testKeyedTable.properties())); Assert.assertEquals(5, Iterables.size(testKeyedTable.changeTable().snapshots())); SnapshotsExpiringExecutor.expireArcticTable( @@ -302,6 +305,8 @@ public void testNotExpireFlinkLatestCommit4All() { Mockito.when(tableRuntime.getTableIdentifier()).thenReturn( ServerTableIdentifier.of(AmsUtil.toTableIdentifier(table.id()))); Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE); + Mockito.when(tableRuntime.getTableConfiguration()).thenReturn( + TableConfiguration.parseConfig(table.properties())); Assert.assertEquals(4, Iterables.size(table.snapshots())); SnapshotsExpiringExecutor.expireArcticTable( @@ -326,6 +331,8 @@ public void testNotExpireOptimizeCommit4All() { Mockito.when(tableRuntime.getTableIdentifier()).thenReturn( ServerTableIdentifier.of(AmsUtil.toTableIdentifier(table.id()))); Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE); + Mockito.when(tableRuntime.getTableConfiguration()).thenReturn( + TableConfiguration.parseConfig(table.properties())); SnapshotsExpiringExecutor.expireArcticTable(table, tableRuntime); Assert.assertEquals(1, Iterables.size(table.snapshots()));