Skip to content

Commit

Permalink
[AMORO-1943] Use TableConfiguration instead of TableProperties
Browse files Browse the repository at this point in the history
…for AMS (#1944)

* using TableConfiguration instead of TableProperties

* fix unit test
  • Loading branch information
wangtaohz authored Sep 11, 2023
1 parent b4c01ef commit 80239e1
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class TableConfiguration {
private long changeDataTTLMinutes;
private boolean cleanOrphanEnabled;
private long orphanExistingMinutes;
private boolean deleteDanglingDeleteFilesEnabled;
private OptimizingConfig optimizingConfig;

public TableConfiguration() {
Expand Down Expand Up @@ -74,6 +75,15 @@ public TableConfiguration setOrphanExistingMinutes(long orphanExistingMinutes) {
return this;
}

/**
 * Whether cleaning of dangling delete files is enabled for this table.
 *
 * @return {@code true} if dangling delete files should be removed during orphan-file cleaning
 */
public boolean isDeleteDanglingDeleteFilesEnabled() {
  return this.deleteDanglingDeleteFilesEnabled;
}

/**
 * Enables or disables cleaning of dangling delete files for this table.
 *
 * @param deleteDanglingDeleteFilesEnabled whether dangling delete files should be removed
 * @return this configuration, for fluent chaining
 */
public TableConfiguration setDeleteDanglingDeleteFilesEnabled(
    boolean deleteDanglingDeleteFilesEnabled) {
  this.deleteDanglingDeleteFilesEnabled = deleteDanglingDeleteFilesEnabled;
  return this;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -85,7 +95,15 @@ public boolean equals(Object o) {
TableConfiguration that = (TableConfiguration) o;
return expireSnapshotEnabled == that.expireSnapshotEnabled && snapshotTTLMinutes == that.snapshotTTLMinutes &&
changeDataTTLMinutes == that.changeDataTTLMinutes && cleanOrphanEnabled == that.cleanOrphanEnabled &&
orphanExistingMinutes == that.orphanExistingMinutes && Objects.equal(optimizingConfig, that.optimizingConfig);
orphanExistingMinutes == that.orphanExistingMinutes &&
deleteDanglingDeleteFilesEnabled == that.deleteDanglingDeleteFilesEnabled &&
Objects.equal(optimizingConfig, that.optimizingConfig);
}

/**
 * Hash code over the same fields compared by {@code equals}, computed via Guava's
 * {@code Objects.hashCode} varargs helper (consistent with the {@code Objects.equal}
 * usage in {@code equals}).
 */
@Override
public int hashCode() {
  // Field order must stay stable so equal instances keep producing equal hashes.
  final int hash = Objects.hashCode(
      expireSnapshotEnabled,
      snapshotTTLMinutes,
      changeDataTTLMinutes,
      cleanOrphanEnabled,
      orphanExistingMinutes,
      deleteDanglingDeleteFilesEnabled,
      optimizingConfig);
  return hash;
}

public static TableConfiguration parseConfig(Map<String, String> properties) {
Expand All @@ -109,6 +127,10 @@ public static TableConfiguration parseConfig(Map<String, String> properties) {
properties,
TableProperties.MIN_ORPHAN_FILE_EXISTING_TIME,
TableProperties.MIN_ORPHAN_FILE_EXISTING_TIME_DEFAULT))
.setDeleteDanglingDeleteFilesEnabled(CompatiblePropertyUtil.propertyAsBoolean(
properties,
TableProperties.ENABLE_DANGLING_DELETE_FILES_CLEAN,
TableProperties.ENABLE_DANGLING_DELETE_FILES_CLEAN_DEFAULT))
.setOptimizingConfig(OptimizingConfig.parseOptimizingConfig(properties));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.KeyedTable;
import com.netease.arctic.table.TableIdentifier;
import com.netease.arctic.table.TableProperties;
import com.netease.arctic.table.UnkeyedTable;
import com.netease.arctic.utils.CompatiblePropertyUtil;
import com.netease.arctic.utils.TableFileUtil;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ManifestFile;
Expand Down Expand Up @@ -89,37 +87,30 @@ public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration or
public void execute(TableRuntime tableRuntime) {
try {
LOG.info("{} clean orphan files", tableRuntime.getTableIdentifier());
ArcticTable arcticTable = loadTable(tableRuntime);

boolean needOrphanClean = CompatiblePropertyUtil.propertyAsBoolean(
arcticTable.properties(),
TableProperties.ENABLE_ORPHAN_CLEAN,
TableProperties.ENABLE_ORPHAN_CLEAN_DEFAULT);
TableConfiguration tableConfiguration = tableRuntime.getTableConfiguration();

if (!needOrphanClean) {
if (!tableConfiguration.isCleanOrphanEnabled()) {
return;
}

long keepTime = CompatiblePropertyUtil.propertyAsLong(
arcticTable.properties(),
TableProperties.MIN_ORPHAN_FILE_EXISTING_TIME,
TableProperties.MIN_ORPHAN_FILE_EXISTING_TIME_DEFAULT) * 60 * 1000;
long keepTime = tableConfiguration.getOrphanExistingMinutes() * 60 * 1000;

LOG.info("{} clean orphan files, keepTime={}", tableRuntime.getTableIdentifier(), keepTime);
// clear data files
ArcticTable arcticTable = loadTable(tableRuntime);
cleanContentFiles(arcticTable, System.currentTimeMillis() - keepTime);

arcticTable = loadTable(tableRuntime);
// it may cost a long time to clean content files, so refresh the table to the current snapshot before cleaning
// the metadata files
arcticTable.refresh();
// clear metadata files
cleanMetadata(arcticTable, System.currentTimeMillis() - keepTime);

boolean needCleanDanglingDeleteFiles = CompatiblePropertyUtil.propertyAsBoolean(arcticTable.properties(),
TableProperties.ENABLE_DANGLING_DELETE_FILES_CLEAN,
TableProperties.ENABLE_DANGLING_DELETE_FILES_CLEAN_DEFAULT);

if (!needCleanDanglingDeleteFiles) {
if (!tableConfiguration.isDeleteDanglingDeleteFilesEnabled()) {
return;
}
// refresh to the current snapshot before clean dangling delete files
arcticTable.refresh();
// clear dangling delete files
cleanDanglingDeleteFiles(arcticTable);
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@
import com.netease.arctic.server.utils.IcebergTableUtil;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.KeyedTable;
import com.netease.arctic.table.TableProperties;
import com.netease.arctic.table.UnkeyedTable;
import com.netease.arctic.utils.CompatiblePropertyUtil;
import com.netease.arctic.utils.TableFileUtil;
import com.netease.arctic.utils.TablePropertyUtil;
import org.apache.commons.collections.CollectionUtils;
Expand Down Expand Up @@ -68,7 +66,7 @@ public class SnapshotsExpiringExecutor extends BaseTableExecutor {

private static final int DATA_FILE_LIST_SPLIT = 3000;

// 1 days
// 1 hour
private static final long INTERVAL = 60 * 60 * 1000L;

public SnapshotsExpiringExecutor(TableManager tableRuntimes, int poolSize) {
Expand All @@ -93,15 +91,12 @@ public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration or
@Override
public void execute(TableRuntime tableRuntime) {
try {
ArcticTable arcticTable = loadTable(tableRuntime);
boolean needClean = CompatiblePropertyUtil.propertyAsBoolean(
arcticTable.properties(),
TableProperties.ENABLE_TABLE_EXPIRE,
TableProperties.ENABLE_TABLE_EXPIRE_DEFAULT);
if (!needClean) {
TableConfiguration tableConfiguration = tableRuntime.getTableConfiguration();
if (!tableConfiguration.isExpireSnapshotEnabled()) {
return;
}

ArcticTable arcticTable = loadTable(tableRuntime);
expireArcticTable(arcticTable, tableRuntime);
} catch (Throwable t) {
LOG.error("unexpected expire error of table {} ", tableRuntime.getTableIdentifier(), t);
Expand All @@ -110,16 +105,11 @@ public void execute(TableRuntime tableRuntime) {

public static void expireArcticTable(ArcticTable arcticTable, TableRuntime tableRuntime) {
long startTime = System.currentTimeMillis();
TableConfiguration tableConfiguration = tableRuntime.getTableConfiguration();
LOG.info("{} start expire", tableRuntime.getTableIdentifier());

long changeDataTTL = CompatiblePropertyUtil.propertyAsLong(
arcticTable.properties(),
TableProperties.CHANGE_DATA_TTL,
TableProperties.CHANGE_DATA_TTL_DEFAULT) * 60 * 1000;
long baseSnapshotsKeepTime = CompatiblePropertyUtil.propertyAsLong(
arcticTable.properties(),
TableProperties.BASE_SNAPSHOT_KEEP_MINUTES,
TableProperties.BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT) * 60 * 1000;
long changeDataTTL = tableConfiguration.getChangeDataTTLMinutes() * 60 * 1000;
long baseSnapshotsKeepTime = tableConfiguration.getSnapshotTTLMinutes() * 60 * 1000;

Set<String> hiveLocations = new HashSet<>();
if (TableTypeUtil.isHive(arcticTable)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.netease.arctic.server.optimizing.OptimizingProcess;
import com.netease.arctic.server.optimizing.OptimizingStatus;
import com.netease.arctic.server.table.ServerTableIdentifier;
import com.netease.arctic.server.table.TableConfiguration;
import com.netease.arctic.server.table.TableRuntime;
import com.netease.arctic.server.utils.IcebergTableUtil;
import com.netease.arctic.table.KeyedTable;
Expand Down Expand Up @@ -258,6 +259,8 @@ public void testNotExpireFlinkLatestCommit4ChangeTable() {
Mockito.when(tableRuntime.getTableIdentifier()).thenReturn(
ServerTableIdentifier.of(AmsUtil.toTableIdentifier(testKeyedTable.id())));
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
Mockito.when(tableRuntime.getTableConfiguration()).thenReturn(
TableConfiguration.parseConfig(testKeyedTable.properties()));

Assert.assertEquals(5, Iterables.size(testKeyedTable.changeTable().snapshots()));
SnapshotsExpiringExecutor.expireArcticTable(
Expand Down Expand Up @@ -302,6 +305,8 @@ public void testNotExpireFlinkLatestCommit4All() {
Mockito.when(tableRuntime.getTableIdentifier()).thenReturn(
ServerTableIdentifier.of(AmsUtil.toTableIdentifier(table.id())));
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
Mockito.when(tableRuntime.getTableConfiguration()).thenReturn(
TableConfiguration.parseConfig(table.properties()));

Assert.assertEquals(4, Iterables.size(table.snapshots()));
SnapshotsExpiringExecutor.expireArcticTable(
Expand All @@ -326,6 +331,8 @@ public void testNotExpireOptimizeCommit4All() {
Mockito.when(tableRuntime.getTableIdentifier()).thenReturn(
ServerTableIdentifier.of(AmsUtil.toTableIdentifier(table.id())));
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
Mockito.when(tableRuntime.getTableConfiguration()).thenReturn(
TableConfiguration.parseConfig(table.properties()));
SnapshotsExpiringExecutor.expireArcticTable(table, tableRuntime);
Assert.assertEquals(1, Iterables.size(table.snapshots()));

Expand Down

0 comments on commit 80239e1

Please sign in to comment.