diff --git a/ams/server/src/main/java/com/netease/arctic/server/table/executor/OrphanFilesCleaningExecutor.java b/ams/server/src/main/java/com/netease/arctic/server/table/executor/OrphanFilesCleaningExecutor.java index 5382f71d9e..1a6ece7c1b 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/table/executor/OrphanFilesCleaningExecutor.java +++ b/ams/server/src/main/java/com/netease/arctic/server/table/executor/OrphanFilesCleaningExecutor.java @@ -74,7 +74,7 @@ protected boolean enabled(TableRuntime tableRuntime) { } @Override - protected void execute(TableRuntime tableRuntime) { + public void execute(TableRuntime tableRuntime) { try { LOG.info("{} clean orphan files", tableRuntime.getTableIdentifier()); ArcticTable arcticTable = tableRuntime.loadTable(); diff --git a/ams/server/src/main/java/com/netease/arctic/server/table/executor/SnapshotsExpiringExecutor.java b/ams/server/src/main/java/com/netease/arctic/server/table/executor/SnapshotsExpiringExecutor.java index 4c69f006bf..213e0738c5 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/table/executor/SnapshotsExpiringExecutor.java +++ b/ams/server/src/main/java/com/netease/arctic/server/table/executor/SnapshotsExpiringExecutor.java @@ -160,7 +160,8 @@ public static void expireArcticTable(ArcticTable arcticTable, TableRuntime table changeExclude.addAll(finalHiveLocations); long latestChangeFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(changeTable); - // avoid getting expired snapshots when deleting ttl files + // keep ttl <= snapshot keep time to avoid getting expired snapshots(but can't get) when deleting files. + // if ttl > snapshot keep time, some files will not be deleted correctly. long changeOlderThan = changeDataTTL <= changeSnapshotsKeepTime ? startTime - changeSnapshotsKeepTime : startTime - changeDataTTL; LOG.info("{} change table expire with latestFlinkCommitTime={}, olderThan={}", arcticTable.id(), @@ -214,7 +215,6 @@ public static long fetchLatestFlinkCommittedSnapshotTime(UnkeyedTable table) { */ public static long fetchOptimizingSnapshotTime(UnkeyedTable table, TableRuntime tableRuntime) { if (tableRuntime.getOptimizingStatus() != OptimizingStatus.IDLE) { - long currentSnapshotId = tableRuntime.getCurrentSnapshotId(); for (Snapshot snapshot : table.snapshots()) { if (snapshot.snapshotId() == currentSnapshotId) { @@ -256,7 +256,7 @@ public static void expireSnapshots( LOG.info("to delete {} files, success delete {} files", toDeleteFiles.get(), deleteFiles.get()); } - private static Snapshot getClosestExpireSnapshot(UnkeyedTable changeTable, long ttl) { + public static Snapshot getClosestExpireSnapshot(UnkeyedTable changeTable, long ttl) { if (changeTable.snapshots() == null) { return null; } @@ -266,7 +266,7 @@ private static Snapshot getClosestExpireSnapshot(UnkeyedTable changeTable, long .orElse(null); } - private static List getClosestExpireDataFiles(UnkeyedTable changeTable, Snapshot closestExpireSnapshot) { + public static List getClosestExpireDataFiles(UnkeyedTable changeTable, Snapshot closestExpireSnapshot) { List changeTTLDataFiles = new ArrayList<>(); long recentExpireSnapshotId = closestExpireSnapshot.snapshotId(); try (CloseableIterable fileScanTasks = changeTable.newScan() diff --git a/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java b/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java index 37b8176fff..eb618231bb 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java +++ 
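// The SnapshotsExpiringExecutor hunk above documents a retention rule: the change store's data TTL
// should stay at or below its snapshot keep time, because the expire threshold is taken from whichever
// retention window is longer. A minimal sketch of that threshold computation, reusing the variable
// names from the hunk (the concrete values here are illustrative assumptions):
long startTime = System.currentTimeMillis();
long changeDataTTL = 60 * 60 * 1000L;                 // assumed: 1 hour change data TTL, in millis
long changeSnapshotsKeepTime = 2 * 60 * 60 * 1000L;   // assumed: 2 hour snapshot keep time, in millis

// Expiring against the larger window keeps snapshots alive at least as long as the data files they
// reference; with ttl > keep time, the snapshots needed to locate the TTL'd files could already be gone.
long changeOlderThan = changeDataTTL <= changeSnapshotsKeepTime
    ? startTime - changeSnapshotsKeepTime
    : startTime - changeDataTTL;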
b/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java @@ -46,7 +46,7 @@ public static Set getAllContentFilePath(UnkeyedTable internalTable) { Set validFilesPath = new HashSet<>(); TableEntriesScan manifestReader = TableEntriesScan.builder(internalTable) - .includeFileContent(FileContent.DATA, FileContent.POSITION_DELETES) + .includeFileContent(FileContent.DATA, FileContent.POSITION_DELETES, FileContent.EQUALITY_DELETES) .allEntries().build(); for (IcebergFileEntry entry : manifestReader.entries()) { validFilesPath.add(TableFileUtil.getUriPath(entry.getFile().path().toString())); diff --git a/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/ExecutorTestUtil.java b/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/ExecutorTestUtil.java new file mode 100644 index 0000000000..91e0191a48 --- /dev/null +++ b/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/ExecutorTestUtil.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
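// The IcebergTableUtil change above widens getAllContentFilePath to collect equality delete files in
// addition to data and position delete files. That set is what the orphan-file cleaner treats as
// "still referenced", which is why the change matters; the relationship is sketched below (the listing
// helper and the table handle are illustrative assumptions, not code from this patch):
Set<String> validFilesPath = IcebergTableUtil.getAllContentFilePath(baseTable);
for (String candidate : listDataFilesUnderTableLocation(baseTable)) {   // assumed listing helper
  if (!validFilesPath.contains(TableFileUtil.getUriPath(candidate))) {
    // Before this fix, equality delete files were missing from validFilesPath and would be removed
    // here as orphans even though manifests still referenced them.
    baseTable.io().deleteFile(candidate);
  }
}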
+ */ + +package com.netease.arctic.server.persistence.excutors; + +import com.netease.arctic.data.ChangeAction; +import com.netease.arctic.hive.io.HiveDataTestHelpers; +import com.netease.arctic.hive.utils.HiveTableUtil; +import com.netease.arctic.io.DataTestHelpers; +import com.netease.arctic.table.ArcticTable; +import com.netease.arctic.table.UnkeyedTable; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ReachableFileUtil; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.junit.Assert; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.List; + +import static com.netease.arctic.hive.catalog.HiveTableTestHelper.HIVE_TABLE_SCHEMA; + +public class ExecutorTestUtil { + public static List createRecords(int start, int length) { + ImmutableList.Builder builder = ImmutableList.builder(); + for (int i = start; i < start + length; i++) { + builder.add(DataTestHelpers.createRecord( + i, "name" + i, 0L, + LocalDateTime.of(2022, 1, i % 2 + 1, 12, 0, 0).toString())); + } + return builder.build(); + } + + public static List createHiveRecords(int start, int length) { + ImmutableList.Builder builder = ImmutableList.builder(); + for (int i = start; i < start + length; i++) { + String opTime = LocalDateTime.of(2022, 1, i % 2 + 1, 12, 0, 0).toString(); + builder.add(DataTestHelpers.createRecord(HIVE_TABLE_SCHEMA, + i, "name" + i, 0L, opTime, opTime + "Z", new BigDecimal("0"), opTime.substring(0, 10) + )); + } + return builder.build(); + } + + public static List writeAndCommitBaseStore(UnkeyedTable table) { + // write 4 file,100 records to 2 partitions(2022-1-1\2022-1-2) + List dataFiles = DataTestHelpers.writeBaseStore(table, 0, createRecords(1,100), false); + AppendFiles appendFiles = table.newAppend(); + dataFiles.forEach(appendFiles::appendFile); + appendFiles.commit(); + return dataFiles; + } + + + public static List writeAndCommitBaseAndHive( + ArcticTable table, long txId, boolean writeHive) { + String hiveSubDir = HiveTableUtil.newHiveSubdirectory(txId); + List dataFiles = HiveDataTestHelpers.writeBaseStore( + table, txId, ExecutorTestUtil.createHiveRecords(1,100), false, writeHive, hiveSubDir); + UnkeyedTable baseTable = table.isKeyedTable() ? + table.asKeyedTable().baseTable() : table.asUnkeyedTable(); + AppendFiles baseAppend = baseTable.newAppend(); + dataFiles.forEach(baseAppend::appendFile); + baseAppend.commit(); + return dataFiles; + } + + public static void writeAndCommitBaseAndChange(ArcticTable table) { + List baseFiles = DataTestHelpers.writeBaseStore( + table, 1, createRecords(1, 100), false); + AppendFiles appendFiles = table.isKeyedTable() ? + table.asKeyedTable().baseTable().newAppend() : table.asUnkeyedTable().newAppend(); + baseFiles.forEach(appendFiles::appendFile); + appendFiles.commit(); + if (table.isKeyedTable()) { + DataTestHelpers.writeAndCommitChangeStore( + table.asKeyedTable(), 2, ChangeAction.INSERT, createRecords(101, 100) + ); + } + } + + public static void assertMetadataExists(ArcticTable table) { + if (table.isKeyedTable()){ + checkMetadataExistence(table.asKeyedTable().changeTable()); + } + UnkeyedTable baseTable = table.isKeyedTable() ? 
table.asKeyedTable().baseTable() : table.asUnkeyedTable(); + checkMetadataExistence(baseTable); + } + + public static void checkMetadataExistence(UnkeyedTable table) { + for (Snapshot snapshot : table.snapshots()) { + Assert.assertTrue(table.io().exists(snapshot.manifestListLocation())); + for (ManifestFile allManifest : snapshot.allManifests(table.io())) { + Assert.assertTrue(table.io().exists(allManifest.path())); + } + } + for (String metadataFile : ReachableFileUtil.metadataFileLocations(table, false)) { + Assert.assertTrue(table.io().exists(metadataFile)); + } + Assert.assertTrue(table.io().exists(ReachableFileUtil.versionHintLocation(table))); + } + +} diff --git a/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestHiveCommitSync.java b/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestHiveCommitSync.java new file mode 100644 index 0000000000..e056bf66e5 --- /dev/null +++ b/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestHiveCommitSync.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
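// The ExecutorTestUtil helpers above are shared by the executor tests that follow. A short,
// hypothetical usage sketch of how a test body combines them (the table handle comes from
// TableTestBase in this patch; the rest of the wiring is an assumption for illustration):
ArcticTable table = getArcticTable();
// write and commit 100 records into the base store, spread over the 2022-01-01 / 2022-01-02 partitions
List<DataFile> committed = ExecutorTestUtil.writeAndCommitBaseStore(
    table.isKeyedTable() ? table.asKeyedTable().baseTable() : table.asUnkeyedTable());
// after whatever maintenance action is under test, the table metadata should still be reachable
ExecutorTestUtil.assertMetadataExists(table);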
+ */ + +package com.netease.arctic.server.persistence.excutors; + +import com.netease.arctic.ams.api.TableFormat; +import com.netease.arctic.catalog.TableTestBase; +import com.netease.arctic.hive.HiveTableProperties; +import com.netease.arctic.hive.TestHMS; +import com.netease.arctic.hive.catalog.HiveCatalogTestHelper; +import com.netease.arctic.hive.catalog.HiveTableTestHelper; +import com.netease.arctic.hive.table.SupportHive; +import com.netease.arctic.hive.utils.HivePartitionUtil; +import com.netease.arctic.server.table.executor.HiveCommitSyncExecutor; +import com.netease.arctic.table.ArcticTable; +import com.netease.arctic.table.UnkeyedTable; +import com.netease.arctic.utils.TableFileUtil; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.util.StructLikeMap; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static com.netease.arctic.utils.TablePropertyUtil.EMPTY_STRUCT; + +@RunWith(Parameterized.class) +public class TestHiveCommitSync extends TableTestBase { + @ClassRule + public static TestHMS TEST_HMS = new TestHMS(); + + public TestHiveCommitSync(boolean ifKeyed, boolean ifPartitioned) { + super(new HiveCatalogTestHelper(TableFormat.MIXED_HIVE, TEST_HMS.getHiveConf()), + new HiveTableTestHelper(ifKeyed, ifPartitioned)); + } + + @Parameterized.Parameters(name = "ifKeyed = {0}, ifPartitioned = {1}") + public static Object[][] parameters() { + return new Object[][]{ + {true, true}, + {true, false}, + {false, true}, + {false, false}}; + } + + + @Test + public void testUnPartitionTableSyncInIceberg() throws Exception { + Assume.assumeFalse(isPartitionedTable()); + UnkeyedTable baseTable = isKeyedTable() ? + getArcticTable().asKeyedTable().baseTable() : getArcticTable().asUnkeyedTable(); + StructLikeMap> partitionProperty = baseTable.partitionProperty(); + Assert.assertEquals(0, partitionProperty.size()); + String newLocation = createEmptyLocationForHive(getArcticTable()); + baseTable.updatePartitionProperties(null) + .set(EMPTY_STRUCT, HiveTableProperties.PARTITION_PROPERTIES_KEY_HIVE_LOCATION, newLocation).commit(); + String hiveLocation = ((SupportHive) getArcticTable()).getHMSClient().run(client -> { + Table hiveTable = client.getTable(getArcticTable().id().getDatabase(), + getArcticTable().id().getTableName()); + return hiveTable.getSd().getLocation(); + }); + Assert.assertNotEquals(newLocation, hiveLocation); + + HiveCommitSyncExecutor.syncIcebergToHive(getArcticTable()); + hiveLocation = ((SupportHive) getArcticTable()).getHMSClient().run(client -> { + Table hiveTable = client.getTable(getArcticTable().id().getDatabase(), + getArcticTable().id().getTableName()); + return hiveTable.getSd().getLocation(); + }); + Assert.assertEquals(newLocation, hiveLocation); + } + + @Test + public void testUnPartitionTableSyncNotInIceberg() throws Exception { + Assume.assumeFalse(isPartitionedTable()); + UnkeyedTable baseTable = isKeyedTable() ? 
+ getArcticTable().asKeyedTable().baseTable() : getArcticTable().asUnkeyedTable(); + StructLikeMap> partitionProperty = baseTable.partitionProperty(); + Assert.assertEquals(0, partitionProperty.size()); + + String oldHiveLocation = ((SupportHive) getArcticTable()).getHMSClient().run(client -> { + Table hiveTable = client.getTable(getArcticTable().id().getDatabase(), + getArcticTable().id().getTableName()); + return hiveTable.getSd().getLocation(); + }); + + HiveCommitSyncExecutor.syncIcebergToHive(getArcticTable()); + String newHiveLocation = ((SupportHive) getArcticTable()).getHMSClient().run(client -> { + Table hiveTable = client.getTable(getArcticTable().id().getDatabase(), + getArcticTable().id().getTableName()); + return hiveTable.getSd().getLocation(); + }); + Assert.assertEquals(oldHiveLocation, newHiveLocation); + } + + @Test + public void testSyncOnlyInIceberg() throws Exception { + Assume.assumeTrue(isPartitionedTable()); + UnkeyedTable baseTable = isKeyedTable() ? + getArcticTable().asKeyedTable().baseTable() : getArcticTable().asUnkeyedTable(); + StructLikeMap> partitionProperty = baseTable.partitionProperty(); + Assert.assertEquals(0, partitionProperty.size()); + List dataFiles = ExecutorTestUtil.writeAndCommitBaseAndHive(getArcticTable(), 1, true); + String partitionLocation = TableFileUtil.getFileDir(dataFiles.get(0).path().toString()); + baseTable.updatePartitionProperties(null) + .set(dataFiles.get(0).partition(), HiveTableProperties.PARTITION_PROPERTIES_KEY_HIVE_LOCATION, partitionLocation) + .commit(); + + List partitionValues = + HivePartitionUtil.partitionValuesAsList(dataFiles.get(0).partition(), getArcticTable().spec().partitionType()); + Assert.assertThrows(NoSuchObjectException.class, () -> ((SupportHive) getArcticTable()).getHMSClient().run(client -> + client.getPartition(getArcticTable().id().getDatabase(), + getArcticTable().id().getTableName(), partitionValues))); + + HiveCommitSyncExecutor.syncIcebergToHive(getArcticTable()); + Partition hivePartition = ((SupportHive) getArcticTable()).getHMSClient().run(client -> + client.getPartition(getArcticTable().id().getDatabase(), + getArcticTable().id().getTableName(), partitionValues)); + Assert.assertEquals(partitionLocation, hivePartition.getSd().getLocation()); + } + + @Test + public void testSyncOnlyInHiveCreateByArctic() throws Exception { + Assume.assumeTrue(isPartitionedTable()); + UnkeyedTable baseTable = isKeyedTable() ? 
+ getArcticTable().asKeyedTable().baseTable() : getArcticTable().asUnkeyedTable(); + StructLikeMap> partitionProperty = baseTable.partitionProperty(); + Assert.assertEquals(0, partitionProperty.size()); + + List dataFiles = ExecutorTestUtil.writeAndCommitBaseAndHive(getArcticTable(), 1, true); + String partitionLocation = TableFileUtil.getFileDir(dataFiles.get(0).path().toString()); + List partitionValues = + HivePartitionUtil.partitionValuesAsList(dataFiles.get(0).partition(), getArcticTable().spec().partitionType()); + ((SupportHive) getArcticTable()).getHMSClient().run(client -> + { + Table hiveTable = client.getTable(getArcticTable().id().getDatabase(), getArcticTable().id().getTableName()); + StorageDescriptor tableSd = hiveTable.getSd(); + PrincipalPrivilegeSet privilegeSet = hiveTable.getPrivileges(); + int lastAccessTime = (int) (System.currentTimeMillis() / 1000); + Partition p = new Partition(); + p.setValues(partitionValues); + p.setDbName(hiveTable.getDbName()); + p.setTableName(hiveTable.getTableName()); + p.setCreateTime(lastAccessTime); + p.setLastAccessTime(lastAccessTime); + StorageDescriptor sd = tableSd.deepCopy(); + sd.setLocation(partitionLocation); + p.setSd(sd); + + int files = dataFiles.size(); + long totalSize = dataFiles.stream().map(ContentFile::fileSizeInBytes).reduce(0L, Long::sum); + p.putToParameters("transient_lastDdlTime", lastAccessTime + ""); + p.putToParameters("totalSize", totalSize + ""); + p.putToParameters("numFiles", files + ""); + p.putToParameters(HiveTableProperties.ARCTIC_TABLE_FLAG, "true"); + if (privilegeSet != null) { + p.setPrivileges(privilegeSet.deepCopy()); + } + + return client.addPartition(p); + }); + + Partition hivePartition = ((SupportHive) getArcticTable()).getHMSClient().run(client -> + client.getPartition(getArcticTable().id().getDatabase(), + getArcticTable().id().getTableName(), partitionValues)); + Assert.assertEquals(partitionLocation, hivePartition.getSd().getLocation()); + + HiveCommitSyncExecutor.syncIcebergToHive(getArcticTable()); + + Assert.assertThrows(NoSuchObjectException.class, () -> ((SupportHive) getArcticTable()).getHMSClient().run(client -> + client.getPartition(getArcticTable().id().getDatabase(), + getArcticTable().id().getTableName(), partitionValues))); + } + + @Test + public void testSyncOnlyInHiveCreateNotByArctic() throws Exception { + Assume.assumeTrue(isPartitionedTable()); + UnkeyedTable baseTable = isKeyedTable() ? 
+ getArcticTable().asKeyedTable().baseTable() : getArcticTable().asUnkeyedTable(); + StructLikeMap> partitionProperty = baseTable.partitionProperty(); + Assert.assertEquals(0, partitionProperty.size()); + + List dataFiles = ExecutorTestUtil.writeAndCommitBaseAndHive(getArcticTable(), 1, true); + String partitionLocation = TableFileUtil.getFileDir(dataFiles.get(0).path().toString()); + List partitionValues = + HivePartitionUtil.partitionValuesAsList(dataFiles.get(0).partition(), getArcticTable().spec().partitionType()); + ((SupportHive) getArcticTable()).getHMSClient().run(client -> + { + Table hiveTable = client.getTable(getArcticTable().id().getDatabase(), getArcticTable().id().getTableName()); + StorageDescriptor tableSd = hiveTable.getSd(); + PrincipalPrivilegeSet privilegeSet = hiveTable.getPrivileges(); + int lastAccessTime = (int) (System.currentTimeMillis() / 1000); + Partition p = new Partition(); + p.setValues(partitionValues); + p.setDbName(hiveTable.getDbName()); + p.setTableName(hiveTable.getTableName()); + p.setCreateTime(lastAccessTime); + p.setLastAccessTime(lastAccessTime); + StorageDescriptor sd = tableSd.deepCopy(); + sd.setLocation(partitionLocation); + p.setSd(sd); + + int files = dataFiles.size(); + long totalSize = dataFiles.stream().map(ContentFile::fileSizeInBytes).reduce(0L, Long::sum); + p.putToParameters("transient_lastDdlTime", lastAccessTime + ""); + p.putToParameters("totalSize", totalSize + ""); + p.putToParameters("numFiles", files + ""); + if (privilegeSet != null) { + p.setPrivileges(privilegeSet.deepCopy()); + } + + return client.addPartition(p); + }); + + Partition hivePartition = ((SupportHive) getArcticTable()).getHMSClient().run(client -> + client.getPartition(getArcticTable().id().getDatabase(), + getArcticTable().id().getTableName(), partitionValues)); + Assert.assertEquals(partitionLocation, hivePartition.getSd().getLocation()); + + HiveCommitSyncExecutor.syncIcebergToHive(getArcticTable()); + + hivePartition = ((SupportHive) getArcticTable()).getHMSClient().run(client -> + client.getPartition(getArcticTable().id().getDatabase(), + getArcticTable().id().getTableName(), partitionValues)); + Assert.assertEquals(partitionLocation, hivePartition.getSd().getLocation()); + } + + @Test + public void testSyncInBoth() throws Exception { + Assume.assumeTrue(isPartitionedTable()); + UnkeyedTable baseTable = isKeyedTable() ? 
+ getArcticTable().asKeyedTable().baseTable() : getArcticTable().asUnkeyedTable(); + StructLikeMap> partitionProperty = baseTable.partitionProperty(); + Assert.assertEquals(0, partitionProperty.size()); + + List dataFiles = ExecutorTestUtil.writeAndCommitBaseAndHive(getArcticTable(), 1, true); + String partitionLocation = TableFileUtil.getFileDir(dataFiles.get(0).path().toString()); + List partitionValues = + HivePartitionUtil.partitionValuesAsList(dataFiles.get(0).partition(), getArcticTable().spec().partitionType()); + ((SupportHive) getArcticTable()).getHMSClient().run(client -> + { + Table hiveTable = client.getTable(getArcticTable().id().getDatabase(), getArcticTable().id().getTableName()); + StorageDescriptor tableSd = hiveTable.getSd(); + PrincipalPrivilegeSet privilegeSet = hiveTable.getPrivileges(); + int lastAccessTime = (int) (System.currentTimeMillis() / 1000); + Partition p = new Partition(); + p.setValues(partitionValues); + p.setDbName(hiveTable.getDbName()); + p.setTableName(hiveTable.getTableName()); + p.setCreateTime(lastAccessTime); + p.setLastAccessTime(lastAccessTime); + StorageDescriptor sd = tableSd.deepCopy(); + sd.setLocation(partitionLocation); + p.setSd(sd); + + int files = dataFiles.size(); + long totalSize = dataFiles.stream().map(ContentFile::fileSizeInBytes).reduce(0L, Long::sum); + p.putToParameters("transient_lastDdlTime", lastAccessTime + ""); + p.putToParameters("totalSize", totalSize + ""); + p.putToParameters("numFiles", files + ""); + if (privilegeSet != null) { + p.setPrivileges(privilegeSet.deepCopy()); + } + + return client.addPartition(p); + }); + + Partition hivePartition = ((SupportHive) getArcticTable()).getHMSClient().run(client -> + client.getPartition(getArcticTable().id().getDatabase(), + getArcticTable().id().getTableName(), partitionValues)); + Assert.assertEquals(partitionLocation, hivePartition.getSd().getLocation()); + + List newDataFiles = ExecutorTestUtil.writeAndCommitBaseAndHive(getArcticTable(), 2, true); + String newPartitionLocation = TableFileUtil.getFileDir(newDataFiles.get(0).path().toString()); + baseTable.updatePartitionProperties(null) + .set(newDataFiles.get(0).partition(), HiveTableProperties.PARTITION_PROPERTIES_KEY_HIVE_LOCATION, newPartitionLocation) + .commit(); + Assert.assertNotEquals(newPartitionLocation, hivePartition.getSd().getLocation()); + + HiveCommitSyncExecutor.syncIcebergToHive(getArcticTable()); + + hivePartition = ((SupportHive) getArcticTable()).getHMSClient().run(client -> + client.getPartition(getArcticTable().id().getDatabase(), + getArcticTable().id().getTableName(), partitionValues)); + Assert.assertEquals(newPartitionLocation, hivePartition.getSd().getLocation()); + } + + private String createEmptyLocationForHive(ArcticTable arcticTable) { + // create a new empty location for hive + String newLocation = ((SupportHive) arcticTable).hiveLocation() + "/ts_" + System.currentTimeMillis(); + OutputFile file = arcticTable.io().newOutputFile(newLocation + "/.keep"); + try { + file.createOrOverwrite().close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + return newLocation; + } +} diff --git a/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestOrphanFileCleanHive.java b/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestOrphanFileCleanHive.java new file mode 100644 index 0000000000..741c05427b --- /dev/null +++ b/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestOrphanFileCleanHive.java @@ -0,0 +1,111 @@ +/* + 
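// TestHiveCommitSync above exercises HiveCommitSyncExecutor.syncIcebergToHive for the possible states
// of a partition: recorded only in Iceberg partition properties, present only in the HMS (created by
// Arctic or not), and present in both. The core round trip the partitioned tests share, condensed from
// the test bodies above:
StructLike partitionData = dataFiles.get(0).partition();                       // from a committed DataFile
String partitionLocation = TableFileUtil.getFileDir(dataFiles.get(0).path().toString());
List<String> partitionValues = HivePartitionUtil.partitionValuesAsList(
    partitionData, getArcticTable().spec().partitionType());

// 1. record the Hive location of the partition in Iceberg partition properties
baseTable.updatePartitionProperties(null)
    .set(partitionData, HiveTableProperties.PARTITION_PROPERTIES_KEY_HIVE_LOCATION, partitionLocation)
    .commit();
// 2. push the Iceberg view of partitions into the Hive metastore
HiveCommitSyncExecutor.syncIcebergToHive(getArcticTable());
// 3. the HMS partition should now point at the same location
Partition hivePartition = ((SupportHive) getArcticTable()).getHMSClient().run(client ->
    client.getPartition(getArcticTable().id().getDatabase(),
        getArcticTable().id().getTableName(), partitionValues));
Assert.assertEquals(partitionLocation, hivePartition.getSd().getLocation());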
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.server.persistence.excutors; + +import com.netease.arctic.ams.api.TableFormat; +import com.netease.arctic.catalog.TableTestBase; +import com.netease.arctic.hive.TestHMS; +import com.netease.arctic.hive.catalog.HiveCatalogTestHelper; +import com.netease.arctic.hive.catalog.HiveTableTestHelper; +import com.netease.arctic.hive.table.SupportHive; +import com.netease.arctic.server.dashboard.utils.AmsUtil; +import com.netease.arctic.server.table.ServerTableIdentifier; +import com.netease.arctic.server.table.TableRuntime; +import com.netease.arctic.server.table.executor.OrphanFilesCleaningExecutor; +import com.netease.arctic.table.UnkeyedTable; +import org.apache.iceberg.io.OutputFile; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; + +import static com.netease.arctic.server.table.executor.OrphanFilesCleaningExecutor.DATA_FOLDER_NAME; + +@RunWith(Parameterized.class) +public class TestOrphanFileCleanHive extends TableTestBase { + + @ClassRule + public static TestHMS TEST_HMS = new TestHMS(); + + public TestOrphanFileCleanHive(boolean ifKeyed, boolean ifPartitioned) { + super(new HiveCatalogTestHelper(TableFormat.MIXED_HIVE, TEST_HMS.getHiveConf()), + new HiveTableTestHelper(ifKeyed, ifPartitioned)); + } + + @Parameterized.Parameters(name = "ifKeyed = {0}, ifPartitioned = {1}") + public static Object[][] parameters() { + return new Object[][]{ + {true, true}, + {true, false}, + {false, true}, + {false, false}}; + } + + private static OrphanFilesCleaningExecutor orphanFilesCleaningExecutor; + + @Before + public void mock() { + orphanFilesCleaningExecutor = Mockito.mock(OrphanFilesCleaningExecutor.class); + TableRuntime tableRuntime = Mockito.mock(TableRuntime.class); + Mockito.when(tableRuntime.loadTable()).thenReturn(getArcticTable()); + Mockito.when(tableRuntime.getTableIdentifier()).thenReturn( + ServerTableIdentifier.of(AmsUtil.toTableIdentifier(getArcticTable().id()))); + Mockito.doCallRealMethod().when(orphanFilesCleaningExecutor).execute(tableRuntime); + } + + @Test + public void orphanDataFileClean() throws IOException { + UnkeyedTable baseTable = isKeyedTable() ? 
+ getArcticTable().asKeyedTable().baseTable() : getArcticTable().asUnkeyedTable(); + String baseOrphanFilePath = baseTable.location() + + File.separator + DATA_FOLDER_NAME + File.separator + "orphan.parquet"; + String hiveOrphanFilePath = ((SupportHive) getArcticTable()).hiveLocation() + + File.separator + DATA_FOLDER_NAME + File.separator + "orphan.parquet"; + OutputFile baseOrphanDataFile = getArcticTable().io().newOutputFile(baseOrphanFilePath); + baseOrphanDataFile.createOrOverwrite().close(); + OutputFile changeOrphanDataFile = getArcticTable().io().newOutputFile(hiveOrphanFilePath); + changeOrphanDataFile.createOrOverwrite().close(); + Assert.assertTrue(getArcticTable().io().exists(baseOrphanFilePath)); + Assert.assertTrue(getArcticTable().io().exists(hiveOrphanFilePath)); + orphanFilesCleaningExecutor.cleanContentFiles(getArcticTable(), System.currentTimeMillis()); + Assert.assertFalse(getArcticTable().io().exists(baseOrphanFilePath)); + Assert.assertTrue(getArcticTable().io().exists(hiveOrphanFilePath)); + } + + @Test + public void orphanMetadataFileClean() throws IOException { + UnkeyedTable baseTable = isKeyedTable() ? + getArcticTable().asKeyedTable().baseTable() : getArcticTable().asUnkeyedTable(); + String orphanFilePath = baseTable.location() + File.separator + "metadata" + + File.separator + "orphan.avro"; + OutputFile baseOrphanDataFile = getArcticTable().io().newOutputFile(orphanFilePath); + baseOrphanDataFile.createOrOverwrite().close(); + Assert.assertTrue(getArcticTable().io().exists(orphanFilePath)); + orphanFilesCleaningExecutor.cleanMetadata(getArcticTable(), System.currentTimeMillis()); + Assert.assertFalse(getArcticTable().io().exists(orphanFilePath)); + ExecutorTestUtil.assertMetadataExists(getArcticTable()); + } + +} diff --git a/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestOrphanFileCleanIceberg.java b/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestOrphanFileCleanIceberg.java new file mode 100644 index 0000000000..8ea3b379c7 --- /dev/null +++ b/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestOrphanFileCleanIceberg.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
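// The orphan-file tests in this patch (TestOrphanFileCleanHive above, TestOrphanFileCleanIceberg and
// TestOrphanFileCleanMix below) all drive OrphanFilesCleaningExecutor through the same Mockito setup:
// the executor is a mock whose execute(TableRuntime) is switched back to the real implementation, and
// TableRuntime is stubbed just enough for execute() to load the test table. A condensed sketch of that
// pattern, using only calls that appear in these tests:
OrphanFilesCleaningExecutor executor = Mockito.mock(OrphanFilesCleaningExecutor.class);
TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
Mockito.when(tableRuntime.loadTable()).thenReturn(getArcticTable());
Mockito.when(tableRuntime.getTableIdentifier()).thenReturn(
    ServerTableIdentifier.of(AmsUtil.toTableIdentifier(getArcticTable().id())));
// only execute() runs for real; everything else on the mock stays inert, so the tests avoid
// constructing the executor's scheduling machinery
Mockito.doCallRealMethod().when(executor).execute(tableRuntime);
executor.execute(tableRuntime);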
+ */ + +package com.netease.arctic.server.persistence.excutors; + +import com.netease.arctic.BasicTableTestHelper; +import com.netease.arctic.ams.api.TableFormat; +import com.netease.arctic.catalog.BasicCatalogTestHelper; +import com.netease.arctic.catalog.TableTestBase; +import com.netease.arctic.server.dashboard.utils.AmsUtil; +import com.netease.arctic.server.table.ServerTableIdentifier; +import com.netease.arctic.server.table.TableRuntime; +import com.netease.arctic.server.table.executor.OrphanFilesCleaningExecutor; +import org.apache.iceberg.io.OutputFile; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; + +import static com.netease.arctic.server.table.executor.OrphanFilesCleaningExecutor.DATA_FOLDER_NAME; + +@RunWith(Parameterized.class) +public class TestOrphanFileCleanIceberg extends TableTestBase { + + public TestOrphanFileCleanIceberg(boolean ifKeyed, boolean ifPartitioned) { + super(new BasicCatalogTestHelper(TableFormat.ICEBERG), + new BasicTableTestHelper(ifKeyed, ifPartitioned)); + } + + @Parameterized.Parameters(name = "ifKeyed = {0}, ifPartitioned = {1}") + public static Object[][] parameters() { + return new Object[][]{ + {false, true}, + {false, false}}; + } + + private static OrphanFilesCleaningExecutor orphanFilesCleaningExecutor; + + @Before + public void mock() { + orphanFilesCleaningExecutor = Mockito.mock(OrphanFilesCleaningExecutor.class); + TableRuntime tableRuntime = Mockito.mock(TableRuntime.class); + Mockito.when(tableRuntime.loadTable()).thenReturn(getArcticTable()); + Mockito.when(tableRuntime.getTableIdentifier()).thenReturn( + ServerTableIdentifier.of(AmsUtil.toTableIdentifier(getArcticTable().id()))); + Mockito.doCallRealMethod().when(orphanFilesCleaningExecutor).execute(tableRuntime); + } + + @Test + public void orphanDataFileClean() throws IOException { + String orphanFilePath = getArcticTable().asUnkeyedTable().location() + + File.separator + DATA_FOLDER_NAME + File.separator + "orphan.parquet"; + OutputFile baseOrphanDataFile = getArcticTable().io().newOutputFile(orphanFilePath); + baseOrphanDataFile.createOrOverwrite().close(); + Assert.assertTrue(getArcticTable().io().exists(orphanFilePath)); + orphanFilesCleaningExecutor.cleanContentFiles(getArcticTable(), System.currentTimeMillis()); + Assert.assertFalse(getArcticTable().io().exists(orphanFilePath)); + } + + @Test + public void orphanMetadataFileClean() throws IOException { + String orphanFilePath = getArcticTable().asUnkeyedTable().location() + File.separator + "metadata" + + File.separator + "orphan.avro"; + OutputFile baseOrphanDataFile = getArcticTable().io().newOutputFile(orphanFilePath); + baseOrphanDataFile.createOrOverwrite().close(); + Assert.assertTrue(getArcticTable().io().exists(orphanFilePath)); + orphanFilesCleaningExecutor.cleanMetadata(getArcticTable(), System.currentTimeMillis()); + Assert.assertFalse(getArcticTable().io().exists(orphanFilePath)); + ExecutorTestUtil.assertMetadataExists(getArcticTable()); + } + +} diff --git a/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestOrphanFileCleanMix.java b/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestOrphanFileCleanMix.java new file mode 100644 index 0000000000..70e7959edc --- /dev/null +++ b/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestOrphanFileCleanMix.java @@ -0,0 
+1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.server.persistence.excutors; + +import com.netease.arctic.BasicTableTestHelper; +import com.netease.arctic.ams.api.TableFormat; +import com.netease.arctic.catalog.BasicCatalogTestHelper; +import com.netease.arctic.catalog.TableTestBase; +import com.netease.arctic.data.ChangeAction; +import com.netease.arctic.io.DataTestHelpers; +import com.netease.arctic.server.dashboard.utils.AmsUtil; +import com.netease.arctic.server.table.ServerTableIdentifier; +import com.netease.arctic.server.table.TableRuntime; +import com.netease.arctic.server.table.executor.OrphanFilesCleaningExecutor; +import com.netease.arctic.table.KeyedTable; +import com.netease.arctic.table.TableProperties; +import com.netease.arctic.table.UnkeyedTable; +import com.netease.arctic.utils.TableFileUtil; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.io.OutputFile; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static com.netease.arctic.server.persistence.excutors.ExecutorTestUtil.createRecords; +import static com.netease.arctic.server.persistence.excutors.ExecutorTestUtil.writeAndCommitBaseAndChange; +import static com.netease.arctic.server.table.executor.OrphanFilesCleaningExecutor.DATA_FOLDER_NAME; +import static com.netease.arctic.server.table.executor.OrphanFilesCleaningExecutor.FLINK_JOB_ID; + +@RunWith(Parameterized.class) +public class TestOrphanFileCleanMix extends TableTestBase { + + public TestOrphanFileCleanMix(boolean ifKeyed, boolean ifPartitioned) { + super(new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG), + new BasicTableTestHelper(ifKeyed, ifPartitioned)); + } + + @Parameterized.Parameters(name = "ifKeyed = {0}, ifPartitioned = {1}") + public static Object[][] parameters() { + return new Object[][]{ + {true, true}, + {true, false}, + {false, true}, + {false, false}}; + } + + private static OrphanFilesCleaningExecutor orphanFilesCleaningExecutor; + private static TableRuntime tableRuntime; + + @Before + public void mock() { + orphanFilesCleaningExecutor = Mockito.mock(OrphanFilesCleaningExecutor.class); + tableRuntime = Mockito.mock(TableRuntime.class); + Mockito.when(tableRuntime.loadTable()).thenReturn(getArcticTable()); + Mockito.when(tableRuntime.getTableIdentifier()).thenReturn( + ServerTableIdentifier.of(AmsUtil.toTableIdentifier(getArcticTable().id()))); + 
Mockito.doCallRealMethod().when(orphanFilesCleaningExecutor).execute(tableRuntime); + } + + @Test + public void orphanDataFileClean() throws IOException { + writeAndCommitBaseAndChange(getArcticTable()); + + UnkeyedTable baseTable = isKeyedTable() ? + getArcticTable().asKeyedTable().baseTable() : getArcticTable().asUnkeyedTable(); + String baseOrphanFileDir = baseTable.location() + + File.separator + DATA_FOLDER_NAME + File.separator + "testLocation"; + String baseOrphanFilePath = baseOrphanFileDir + File.separator + "orphan.parquet"; + OutputFile baseOrphanDataFile = getArcticTable().io().newOutputFile(baseOrphanFilePath); + baseOrphanDataFile.createOrOverwrite().close(); + + String changeOrphanFilePath = isKeyedTable() ? getArcticTable().asKeyedTable().changeTable().location() + + File.separator + DATA_FOLDER_NAME + File.separator + "orphan.parquet" : ""; + if (isKeyedTable()) { + OutputFile changeOrphanDataFile = getArcticTable().io().newOutputFile(changeOrphanFilePath); + changeOrphanDataFile.createOrOverwrite().close(); + Assert.assertTrue(getArcticTable().io().exists(changeOrphanFilePath)); + } + + Assert.assertTrue(getArcticTable().io().exists(baseOrphanFileDir)); + Assert.assertTrue(getArcticTable().io().exists(baseOrphanFilePath)); + + + orphanFilesCleaningExecutor.execute(tableRuntime); + Assert.assertTrue(getArcticTable().io().exists(baseOrphanFileDir)); + Assert.assertTrue(getArcticTable().io().exists(baseOrphanFilePath)); + if (isKeyedTable()) { + Assert.assertTrue(getArcticTable().io().exists(changeOrphanFilePath)); + } + + + getArcticTable().updateProperties() + .set(TableProperties.MIN_ORPHAN_FILE_EXISTING_TIME, "0") + .set(TableProperties.ENABLE_ORPHAN_CLEAN, "true") + .commit(); + orphanFilesCleaningExecutor.execute(tableRuntime); + + Assert.assertFalse(getArcticTable().io().exists(baseOrphanFileDir)); + Assert.assertFalse(getArcticTable().io().exists(baseOrphanFilePath)); + if (isKeyedTable()) { + Assert.assertFalse(getArcticTable().io().exists(changeOrphanFilePath)); + } + for (FileScanTask task : baseTable.newScan().planFiles()) { + Assert.assertTrue(getArcticTable().io().exists(task.file().path().toString())); + } + if (isKeyedTable()) { + for (FileScanTask task : getArcticTable().asKeyedTable().changeTable().newScan().planFiles()) { + Assert.assertTrue(getArcticTable().io().exists(task.file().path().toString())); + } + } + } + + @Test + public void orphanChangeDataFileInBaseClean() throws IOException { + Assume.assumeTrue(isKeyedTable()); + KeyedTable testKeyedTable = getArcticTable().asKeyedTable(); + List dataFiles = DataTestHelpers.writeChangeStore( + testKeyedTable, 1, ChangeAction.INSERT, createRecords(1, 100), false); + Set pathAll = new HashSet<>(); + Set fileInBaseStore = new HashSet<>(); + Set fileOnlyInChangeLocation = new HashSet<>(); + + AppendFiles appendFiles = testKeyedTable.asKeyedTable().baseTable().newAppend(); + + for (int i = 0; i < dataFiles.size(); i++) { + DataFile dataFile = dataFiles.get(i); + pathAll.add(TableFileUtil.getUriPath(dataFile.path().toString())); + if (i == 0) { + appendFiles.appendFile(dataFile).commit(); + fileInBaseStore.add(TableFileUtil.getUriPath(dataFile.path().toString())); + } else { + fileOnlyInChangeLocation.add(TableFileUtil.getUriPath(dataFile.path().toString())); + } + } + pathAll.forEach(path -> Assert.assertTrue(testKeyedTable.io().exists(path))); + + orphanFilesCleaningExecutor.cleanContentFiles(testKeyedTable, System.currentTimeMillis()); + fileInBaseStore.forEach(path -> 
Assert.assertTrue(testKeyedTable.io().exists(path))); + fileOnlyInChangeLocation.forEach(path -> Assert.assertFalse(testKeyedTable.io().exists(path))); + } + + @Test + public void orphanMetadataFileClean() throws IOException { + ExecutorTestUtil.writeAndCommitBaseAndHive(getArcticTable(), 1, false); + + UnkeyedTable baseTable = isKeyedTable() ? + getArcticTable().asKeyedTable().baseTable() : getArcticTable().asUnkeyedTable(); + String baseOrphanFilePath = baseTable.location() + File.separator + "metadata" + + File.separator + "orphan.avro"; + + String changeOrphanFilePath = isKeyedTable() ? getArcticTable().asKeyedTable().changeTable().location() + + File.separator + "metadata" + File.separator + "orphan.avro" : ""; + + OutputFile baseOrphanDataFile = getArcticTable().io().newOutputFile(baseOrphanFilePath); + baseOrphanDataFile.createOrOverwrite().close(); + + String changeInvalidMetadataJson = isKeyedTable() ? getArcticTable().asKeyedTable().changeTable().location() + + File.separator + "metadata" + File.separator + "v0.metadata.json" : ""; + if (isKeyedTable()) { + OutputFile changeOrphanDataFile = getArcticTable().io().newOutputFile(changeOrphanFilePath); + changeOrphanDataFile.createOrOverwrite().close(); + getArcticTable().io().newOutputFile(changeInvalidMetadataJson).createOrOverwrite().close(); + } + + Assert.assertTrue(getArcticTable().io().exists(baseOrphanFilePath)); + if (isKeyedTable()) { + Assert.assertTrue(getArcticTable().io().exists(changeOrphanFilePath)); + Assert.assertTrue(getArcticTable().io().exists(changeInvalidMetadataJson)); + } + + orphanFilesCleaningExecutor.cleanMetadata(getArcticTable(), System.currentTimeMillis()); + Assert.assertFalse(getArcticTable().io().exists(baseOrphanFilePath)); + if (isKeyedTable()) { + Assert.assertFalse(getArcticTable().io().exists(changeOrphanFilePath)); + Assert.assertFalse(getArcticTable().io().exists(changeInvalidMetadataJson)); + } + ExecutorTestUtil.assertMetadataExists(getArcticTable()); + } + + @Test + public void notDeleteFlinkTemporaryFile() throws IOException { + ExecutorTestUtil.writeAndCommitBaseAndHive(getArcticTable(), 1, false); + String flinkJobId = "flinkJobTest"; + String fakeFlinkJobId = "fakeFlinkJobTest"; + + UnkeyedTable baseTable = isKeyedTable() ? + getArcticTable().asKeyedTable().baseTable() : getArcticTable().asUnkeyedTable(); + String baseOrphanFilePath = baseTable.location() + File.separator + "metadata" + + File.separator + flinkJobId + "orphan.avro"; + + String changeOrphanFilePath = isKeyedTable() ? getArcticTable().asKeyedTable().changeTable().location() + + File.separator + "metadata" + File.separator + flinkJobId + "orphan.avro" : ""; + String fakeChangeOrphanFilePath = isKeyedTable() ? getArcticTable().asKeyedTable().changeTable().location() + + File.separator + "metadata" + File.separator + fakeFlinkJobId + "orphan.avro" : ""; + String changeInvalidMetadataJson = isKeyedTable() ? 
getArcticTable().asKeyedTable().changeTable().location() + + File.separator + "metadata" + File.separator + "v0.metadata.json" : ""; + + OutputFile baseOrphanDataFile = getArcticTable().io().newOutputFile(baseOrphanFilePath); + baseOrphanDataFile.createOrOverwrite().close(); + if (isKeyedTable()) { + OutputFile changeOrphanDataFile = getArcticTable().io().newOutputFile(changeOrphanFilePath); + changeOrphanDataFile.createOrOverwrite().close(); + OutputFile fakeChangeOrphanDataFile = getArcticTable().io().newOutputFile(fakeChangeOrphanFilePath); + fakeChangeOrphanDataFile.createOrOverwrite().close(); + + getArcticTable().io().newOutputFile(changeInvalidMetadataJson).createOrOverwrite().close(); + AppendFiles appendFiles = getArcticTable().asKeyedTable().changeTable().newAppend(); + appendFiles.set(FLINK_JOB_ID, fakeFlinkJobId); + appendFiles.commit(); + // set flink.job-id to change table + AppendFiles appendFiles2 = getArcticTable().asKeyedTable().changeTable().newAppend(); + appendFiles2.set(FLINK_JOB_ID, flinkJobId); + appendFiles2.commit(); + } + + Assert.assertTrue(getArcticTable().io().exists(baseOrphanFilePath)); + if (isKeyedTable()) { + Assert.assertTrue(getArcticTable().io().exists(changeOrphanFilePath)); + Assert.assertTrue(getArcticTable().io().exists(fakeChangeOrphanFilePath)); + Assert.assertTrue(getArcticTable().io().exists(changeInvalidMetadataJson)); + } + + orphanFilesCleaningExecutor.cleanMetadata(getArcticTable(), System.currentTimeMillis()); + Assert.assertFalse(getArcticTable().io().exists(baseOrphanFilePath)); + if (isKeyedTable()) { + // files whose file name starts with flink.job-id should not be deleted + Assert.assertTrue(getArcticTable().io().exists(changeOrphanFilePath)); + Assert.assertFalse(getArcticTable().io().exists(fakeChangeOrphanFilePath)); + Assert.assertFalse(getArcticTable().io().exists(changeInvalidMetadataJson)); + } + + ExecutorTestUtil.assertMetadataExists(getArcticTable()); + } + +} diff --git a/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestSnapshotExpireHive.java b/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestSnapshotExpireHive.java new file mode 100644 index 0000000000..2128ddd614 --- /dev/null +++ b/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestSnapshotExpireHive.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
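// notDeleteFlinkTemporaryFile above pins down the protection rule for Flink temporary files: after
// cleaning, only the metadata file whose name starts with the flink.job-id recorded in the latest
// change-table snapshot summary survives, while the file prefixed with the stale ("fake") job id is
// removed. A name-based check consistent with what the test asserts (an illustration of the expected
// behaviour, not the executor's actual code):
static boolean isProtectedFlinkTempFile(String metadataFileName, String latestCommittedFlinkJobId) {
  return latestCommittedFlinkJobId != null && metadataFileName.startsWith(latestCommittedFlinkJobId);
}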
+ */ + +package com.netease.arctic.server.persistence.excutors; + +import com.netease.arctic.ams.api.TableFormat; +import com.netease.arctic.catalog.TableTestBase; +import com.netease.arctic.hive.TestHMS; +import com.netease.arctic.hive.catalog.HiveCatalogTestHelper; +import com.netease.arctic.hive.catalog.HiveTableTestHelper; +import com.netease.arctic.server.table.executor.SnapshotsExpiringExecutor; +import com.netease.arctic.table.UnkeyedTable; +import com.netease.arctic.utils.TableFileUtil; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@RunWith(Parameterized.class) +public class TestSnapshotExpireHive extends TableTestBase { + + @ClassRule + public static TestHMS TEST_HMS = new TestHMS(); + + public TestSnapshotExpireHive(boolean ifKeyed, boolean ifPartitioned) { + super(new HiveCatalogTestHelper(TableFormat.MIXED_HIVE, TEST_HMS.getHiveConf()), + new HiveTableTestHelper(ifKeyed, ifPartitioned)); + } + + @Parameterized.Parameters(name = "ifKeyed = {0}, ifPartitioned = {1}") + public static Object[][] parameters() { + return new Object[][]{ + {true, true}, + {true, false}, + {false, true}, + {false, false}}; + } + + @Test + public void testExpireTableFiles() { + List hiveFiles = ExecutorTestUtil.writeAndCommitBaseAndHive(getArcticTable(), 1, true); + List s2Files = ExecutorTestUtil.writeAndCommitBaseAndHive(getArcticTable(), 1, false); + + DeleteFiles deleteHiveFiles = isKeyedTable() ? + getArcticTable().asKeyedTable().baseTable().newDelete() : getArcticTable().asUnkeyedTable().newDelete(); + for (DataFile hiveFile : hiveFiles) { + Assert.assertTrue(getArcticTable().io().exists(hiveFile.path().toString())); + deleteHiveFiles.deleteFile(hiveFile); + } + deleteHiveFiles.commit(); + + DeleteFiles deleteIcebergFiles = isKeyedTable() ? + getArcticTable().asKeyedTable().baseTable().newDelete() : getArcticTable().asUnkeyedTable().newDelete(); + for (DataFile s2File : s2Files) { + Assert.assertTrue(getArcticTable().io().exists(s2File.path().toString())); + deleteIcebergFiles.deleteFile(s2File); + } + deleteIcebergFiles.commit(); + + List s3Files = ExecutorTestUtil.writeAndCommitBaseAndHive(getArcticTable(), 1, false); + s3Files.forEach(file -> Assert.assertTrue(getArcticTable().io().exists(file.path().toString()))); + + Set hiveLocation = new HashSet<>(); + String partitionHiveLocation = hiveFiles.get(0).path().toString(); + hiveLocation.add(TableFileUtil.getUriPath(TableFileUtil.getFileDir(partitionHiveLocation))); + if (isPartitionedTable()) { + String anotherHiveLocation = partitionHiveLocation.contains("op_time_day=2022-01-01") ? + partitionHiveLocation.replace("op_time_day=2022-01-01", "op_time_day=2022-01-02") : + partitionHiveLocation.replace("op_time_day=2022-01-02", "op_time_day=2022-01-01"); + hiveLocation.add(TableFileUtil.getUriPath(TableFileUtil.getFileDir(anotherHiveLocation))); + } + UnkeyedTable unkeyedTable = isKeyedTable() ? 
+ getArcticTable().asKeyedTable().baseTable() : getArcticTable().asUnkeyedTable(); + SnapshotsExpiringExecutor.expireSnapshots(unkeyedTable, System.currentTimeMillis(), hiveLocation); + Assert.assertEquals(1, Iterables.size(unkeyedTable.snapshots())); + + hiveFiles.forEach(file -> Assert.assertTrue(getArcticTable().io().exists(file.path().toString()))); + s2Files.forEach(file -> Assert.assertFalse(getArcticTable().io().exists(file.path().toString()))); + s3Files.forEach(file -> Assert.assertTrue(getArcticTable().io().exists(file.path().toString()))); + } + +} diff --git a/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestSnapshotExpireIceberg.java b/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestSnapshotExpireIceberg.java new file mode 100644 index 0000000000..ce90644022 --- /dev/null +++ b/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestSnapshotExpireIceberg.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
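// TestSnapshotExpireHive.testExpireTableFiles above checks that expireSnapshots removes data files that
// were only referenced by now-expired snapshots, but never touches files living inside Hive partition
// locations, which are passed in as an exclusion set. The call shape used by the test, with the
// exclusion set built from the directories of the Hive-owned files:
Set<String> hiveLocations = new HashSet<>();
hiveLocations.add(TableFileUtil.getUriPath(TableFileUtil.getFileDir(hiveFile.path().toString())));
SnapshotsExpiringExecutor.expireSnapshots(unkeyedTable, System.currentTimeMillis(), hiveLocations);
// expired-but-excluded Hive files still exist afterwards; purely Iceberg-managed expired files are gone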
+ */ + +package com.netease.arctic.server.persistence.excutors; + +import com.netease.arctic.BasicTableTestHelper; +import com.netease.arctic.ams.api.TableFormat; +import com.netease.arctic.catalog.BasicCatalogTestHelper; +import com.netease.arctic.catalog.TableTestBase; +import com.netease.arctic.server.dashboard.utils.AmsUtil; +import com.netease.arctic.server.optimizing.OptimizingStatus; +import com.netease.arctic.server.table.ServerTableIdentifier; +import com.netease.arctic.server.table.TableRuntime; +import com.netease.arctic.server.table.executor.SnapshotsExpiringExecutor; +import com.netease.arctic.table.TableProperties; +import com.netease.arctic.table.UnkeyedTable; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.Mockito; + +import java.util.HashSet; +import java.util.List; + +import static com.netease.arctic.server.persistence.excutors.ExecutorTestUtil.writeAndCommitBaseStore; + +@RunWith(Parameterized.class) +public class TestSnapshotExpireIceberg extends TableTestBase { + + public TestSnapshotExpireIceberg(boolean ifKeyed, boolean ifPartitioned) { + super(new BasicCatalogTestHelper(TableFormat.ICEBERG), + new BasicTableTestHelper(ifKeyed, ifPartitioned)); + } + + @Parameterized.Parameters(name = "ifKeyed = {0}, ifPartitioned = {1}") + public static Object[][] parameters() { + return new Object[][]{ + {false, true}, + {false, false}}; + } + + + @Test + public void testExpireTableFiles() { + UnkeyedTable table = getArcticTable().asUnkeyedTable(); + List dataFiles = writeAndCommitBaseStore(table); + + DeleteFiles deleteFiles = table.newDelete(); + for (DataFile dataFile : dataFiles) { + Assert.assertTrue(table.io().exists(dataFile.path().toString())); + deleteFiles.deleteFile(dataFile); + } + deleteFiles.commit(); + + List newDataFiles = writeAndCommitBaseStore(table); + Assert.assertEquals(3, Iterables.size(table.snapshots())); + SnapshotsExpiringExecutor.expireSnapshots(table, System.currentTimeMillis(), + new HashSet<>()); + Assert.assertEquals(1, Iterables.size(table.snapshots())); + + dataFiles.forEach(file -> Assert.assertFalse(table.io().exists(file.path().toString()))); + newDataFiles.forEach(file -> Assert.assertTrue(table.io().exists(file.path().toString()))); + } + + @Test + public void testNotExpireFlinkLatestCommit() { + UnkeyedTable table = getArcticTable().asUnkeyedTable(); + writeAndCommitBaseStore(table); + Assert.assertEquals(Long.MAX_VALUE, + SnapshotsExpiringExecutor.fetchLatestFlinkCommittedSnapshotTime(table)); + + AppendFiles appendFiles = table.newAppend(); + appendFiles.set(SnapshotsExpiringExecutor.FLINK_MAX_COMMITTED_CHECKPOINT_ID, "100"); + appendFiles.commit(); + long checkpointTime = table.currentSnapshot().timestampMillis(); + Assert.assertEquals(checkpointTime, + SnapshotsExpiringExecutor.fetchLatestFlinkCommittedSnapshotTime(table)); + + AppendFiles appendFiles2 = table.newAppend(); + appendFiles2.set(SnapshotsExpiringExecutor.FLINK_MAX_COMMITTED_CHECKPOINT_ID, "101"); + appendFiles2.commit(); + long checkpointTime2 = table.currentSnapshot().timestampMillis(); + Assert.assertEquals(checkpointTime2, + SnapshotsExpiringExecutor.fetchLatestFlinkCommittedSnapshotTime(table)); + + writeAndCommitBaseStore(table); + Assert.assertEquals(checkpointTime2, + 
SnapshotsExpiringExecutor.fetchLatestFlinkCommittedSnapshotTime(table)); + + table.updateProperties().set(TableProperties.BASE_SNAPSHOT_KEEP_MINUTES, "0").commit(); + TableRuntime tableRuntime = Mockito.mock(TableRuntime.class); + Mockito.when(tableRuntime.getTableIdentifier()).thenReturn( + ServerTableIdentifier.of(AmsUtil.toTableIdentifier(table.id()))); + Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE); + + Assert.assertEquals(4, Iterables.size(table.snapshots())); + SnapshotsExpiringExecutor.expireArcticTable( + table, tableRuntime); + + Assert.assertEquals(2, Iterables.size(table.snapshots())); + } + + @Test + public void testNotExpireOptimizeCommit() { + UnkeyedTable testTable = getArcticTable().asUnkeyedTable(); + // commit snapshot + testTable.newAppend().commit(); + testTable.newAppend().commit(); + testTable.updateProperties().set(TableProperties.BASE_SNAPSHOT_KEEP_MINUTES, "0").commit(); + + TableRuntime tableRuntime = Mockito.mock(TableRuntime.class); + Mockito.when(tableRuntime.getTableIdentifier()).thenReturn( + ServerTableIdentifier.of(AmsUtil.toTableIdentifier(testTable.id()))); + Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE); + SnapshotsExpiringExecutor.expireArcticTable(testTable, tableRuntime); + Assert.assertEquals(1, Iterables.size(testTable.snapshots())); + + testTable.newAppend().commit(); + + // mock tableRuntime which has optimizing task not committed + long optimizeSnapshotId = testTable.currentSnapshot().snapshotId(); + Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.COMMITTING); + Mockito.when(tableRuntime.getCurrentSnapshotId()).thenReturn(optimizeSnapshotId); + + + testTable.newAppend().commit(); + testTable.newAppend().commit(); + + SnapshotsExpiringExecutor.expireArcticTable(testTable, tableRuntime); + Assert.assertEquals(3, Iterables.size(testTable.snapshots())); + } + +} diff --git a/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestSnapshotExpireMix.java b/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestSnapshotExpireMix.java new file mode 100644 index 0000000000..f1fb67c958 --- /dev/null +++ b/ams/server/src/test/java/com/netease/arctic/server/persistence/excutors/TestSnapshotExpireMix.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
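// testNotExpireFlinkLatestCommit and testNotExpireOptimizeCommit above document two guards in
// SnapshotsExpiringExecutor: snapshots at or after the latest Flink-committed checkpoint are kept, and
// the snapshot an in-flight optimizing process is based on is kept. A sketch of the Flink guard that
// matches the asserted behaviour (an assumed implementation, not necessarily the executor's code):
// snapshots carrying the flink.max-committed-checkpoint-id summary property mark Flink commits, and
// Long.MAX_VALUE means there is nothing to protect.
static long latestFlinkCommittedSnapshotTime(UnkeyedTable table) {
  long latest = Long.MAX_VALUE;
  for (Snapshot snapshot : table.snapshots()) {   // iterated in commit order, so the last match wins
    if (snapshot.summary().containsKey(SnapshotsExpiringExecutor.FLINK_MAX_COMMITTED_CHECKPOINT_ID)) {
      latest = snapshot.timestampMillis();
    }
  }
  return latest;
}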
+ */
+
+package com.netease.arctic.server.persistence.excutors;
+
+import com.netease.arctic.BasicTableTestHelper;
+import com.netease.arctic.ams.api.TableFormat;
+import com.netease.arctic.catalog.BasicCatalogTestHelper;
+import com.netease.arctic.catalog.TableTestBase;
+import com.netease.arctic.data.ChangeAction;
+import com.netease.arctic.io.DataTestHelpers;
+import com.netease.arctic.op.UpdatePartitionProperties;
+import com.netease.arctic.server.dashboard.utils.AmsUtil;
+import com.netease.arctic.server.optimizing.OptimizingStatus;
+import com.netease.arctic.server.table.ServerTableIdentifier;
+import com.netease.arctic.server.table.TableRuntime;
+import com.netease.arctic.server.table.executor.SnapshotsExpiringExecutor;
+import com.netease.arctic.server.utils.IcebergTableUtil;
+import com.netease.arctic.table.KeyedTable;
+import com.netease.arctic.table.TableProperties;
+import com.netease.arctic.table.UnkeyedTable;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
+@RunWith(Parameterized.class)
+public class TestSnapshotExpireMix extends TableTestBase {
+
+  private final List<DataFile> changeTableFiles = new ArrayList<>();
+
+  public TestSnapshotExpireMix(boolean ifKeyed, boolean ifPartitioned) {
+    super(new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
+        new BasicTableTestHelper(ifKeyed, ifPartitioned));
+  }
+
+  @Parameterized.Parameters(name = "ifKeyed = {0}, ifPartitioned = {1}")
+  public static Object[][] parameters() {
+    return new Object[][]{
+        {true, true},
+        {true, false},
+        {false, true},
+        {false, false}};
+  }
+
+  @Test
+  public void testDeleteChangeFiles() throws Exception {
+    Assume.assumeTrue(isKeyedTable());
+    Assume.assumeTrue(isPartitionedTable());
+    KeyedTable testKeyedTable = getArcticTable().asKeyedTable();
+    List<DataFile> s1Files = insertChangeDataFiles(testKeyedTable, 1);
+    List<StructLike> partitions =
+        new ArrayList<>(s1Files.stream().collect(Collectors.groupingBy(ContentFile::partition)).keySet());
+    Assert.assertEquals(2, partitions.size());
+
+    UpdatePartitionProperties updateProperties = testKeyedTable.baseTable().updatePartitionProperties(null);
+    updateProperties.set(partitions.get(0), TableProperties.PARTITION_OPTIMIZED_SEQUENCE, "3");
+    updateProperties.set(partitions.get(1), TableProperties.PARTITION_OPTIMIZED_SEQUENCE, "0");
+    updateProperties.commit();
+    List<DataFile> existedDataFiles = new ArrayList<>();
+    try (CloseableIterable<FileScanTask> fileScanTasks = testKeyedTable.changeTable().newScan().planFiles()) {
+      fileScanTasks.forEach(fileScanTask -> existedDataFiles.add(fileScanTask.file()));
+    }
+    Assert.assertEquals(4, existedDataFiles.size());
+
+    SnapshotsExpiringExecutor.deleteChangeFile(
+        testKeyedTable, changeTableFiles, testKeyedTable.changeTable().currentSnapshot().sequenceNumber());
+    List<DataFile> currentDataFiles = new ArrayList<>();
+    try (CloseableIterable<FileScanTask> fileScanTasks = testKeyedTable.changeTable().newScan().planFiles()) {
+      fileScanTasks.forEach(fileScanTask -> currentDataFiles.add(fileScanTask.file()));
+    }
+    Assert.assertEquals(2, currentDataFiles.size());
+    changeTableFiles.forEach(file -> Assert.assertTrue(testKeyedTable.io().exists(file.path().toString())));
+  }
+
+  @Test
+  public void testExpireTableFiles() throws Exception {
+    Assume.assumeTrue(isKeyedTable());
+    Assume.assumeTrue(isPartitionedTable());
+    KeyedTable testKeyedTable = getArcticTable().asKeyedTable();
+    List<DataFile> s1Files = insertChangeDataFiles(testKeyedTable, 1);
+    List<StructLike> partitions =
+        new ArrayList<>(s1Files.stream().collect(Collectors.groupingBy(ContentFile::partition)).keySet());
+    Assert.assertEquals(2, partitions.size());
+
+    UpdatePartitionProperties updateProperties = testKeyedTable.baseTable().updatePartitionProperties(null);
+    updateProperties.set(partitions.get(0), TableProperties.PARTITION_OPTIMIZED_SEQUENCE, "3");
+    updateProperties.set(partitions.get(1), TableProperties.PARTITION_OPTIMIZED_SEQUENCE, "1");
+    updateProperties.commit();
+    s1Files.forEach(file -> Assert.assertTrue(testKeyedTable.io().exists(file.path().toString())));
+    SnapshotsExpiringExecutor.deleteChangeFile(
+        testKeyedTable, changeTableFiles, testKeyedTable.changeTable().currentSnapshot().sequenceNumber());
+    Assert.assertEquals(2, Iterables.size(testKeyedTable.changeTable().snapshots()));
+
+    insertChangeDataFiles(testKeyedTable, 2);
+    SnapshotsExpiringExecutor.expireSnapshots(testKeyedTable.changeTable(), System.currentTimeMillis(), new HashSet<>());
+    Assert.assertEquals(1, Iterables.size(testKeyedTable.changeTable().snapshots()));
+    s1Files.forEach(file -> Assert.assertFalse(testKeyedTable.io().exists(file.path().toString())));
+  }
+
+  @Test
+  public void testExpiredChangeTableFilesInBase() throws Exception {
+    Assume.assumeTrue(isKeyedTable());
+    Assume.assumeTrue(isPartitionedTable());
+    KeyedTable testKeyedTable = getArcticTable().asKeyedTable();
+    List<DataFile> s1Files = insertChangeDataFiles(testKeyedTable, 1);
+    testKeyedTable.baseTable().newAppend().appendFile(s1Files.get(0)).commit();
+    List<StructLike> partitions =
+        new ArrayList<>(s1Files.stream().collect(Collectors.groupingBy(ContentFile::partition)).keySet());
+    Assert.assertEquals(2, partitions.size());
+
+    UpdatePartitionProperties updateProperties = testKeyedTable.baseTable().updatePartitionProperties(null);
+    updateProperties.set(partitions.get(0), TableProperties.PARTITION_OPTIMIZED_SEQUENCE, "3");
+    updateProperties.set(partitions.get(1), TableProperties.PARTITION_OPTIMIZED_SEQUENCE, "1");
+    updateProperties.commit();
+    Assert.assertTrue(testKeyedTable.io().exists((String) s1Files.get(0).path()));
+    SnapshotsExpiringExecutor.deleteChangeFile(
+        testKeyedTable, changeTableFiles, testKeyedTable.changeTable().currentSnapshot().sequenceNumber());
+    Assert.assertEquals(2, Iterables.size(testKeyedTable.changeTable().snapshots()));
+
+    Set<String> exclude = IcebergTableUtil.getAllContentFilePath(testKeyedTable.baseTable());
+    insertChangeDataFiles(testKeyedTable, 2);
+    SnapshotsExpiringExecutor.expireSnapshots(testKeyedTable.changeTable(), System.currentTimeMillis(), exclude);
+    Assert.assertEquals(1, Iterables.size(testKeyedTable.changeTable().snapshots()));
+    Assert.assertTrue(testKeyedTable.io().exists((String) s1Files.get(0).path()));
+    Assert.assertFalse(testKeyedTable.io().exists((String) s1Files.get(1).path()));
+  }
+
+  @Test
+  public void testNotExpireFlinkLatestCommit() throws IOException {
+    Assume.assumeTrue(isKeyedTable());
+    KeyedTable testKeyedTable = getArcticTable().asKeyedTable();
+    insertChangeDataFiles(testKeyedTable, 1);
+    insertChangeDataFiles(testKeyedTable, 2);
+    Assert.assertEquals(Long.MAX_VALUE,
+        SnapshotsExpiringExecutor.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable()));
+
+    AppendFiles appendFiles = testKeyedTable.changeTable().newAppend();
+    appendFiles.set(SnapshotsExpiringExecutor.FLINK_MAX_COMMITTED_CHECKPOINT_ID, "100");
+    appendFiles.commit();
+    long checkpointTime = testKeyedTable.changeTable().currentSnapshot().timestampMillis();
+    Assert.assertEquals(checkpointTime,
+        SnapshotsExpiringExecutor.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable()));
+
+    AppendFiles appendFiles2 = testKeyedTable.changeTable().newAppend();
+    appendFiles2.set(SnapshotsExpiringExecutor.FLINK_MAX_COMMITTED_CHECKPOINT_ID, "101");
+    appendFiles2.commit();
+    long checkpointTime2 = testKeyedTable.changeTable().currentSnapshot().timestampMillis();
+    Assert.assertEquals(checkpointTime2,
+        SnapshotsExpiringExecutor.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable()));
+
+    insertChangeDataFiles(testKeyedTable, 2);
+    Assert.assertEquals(checkpointTime2,
+        SnapshotsExpiringExecutor.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable()));
+
+    testKeyedTable.updateProperties().set(TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES, "0").commit();
+    testKeyedTable.updateProperties().set(TableProperties.CHANGE_DATA_TTL, "0").commit();
+    TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
+    Mockito.when(tableRuntime.getTableIdentifier()).thenReturn(
+        ServerTableIdentifier.of(AmsUtil.toTableIdentifier(testKeyedTable.id())));
+    Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
+
+    Assert.assertEquals(5, Iterables.size(testKeyedTable.changeTable().snapshots()));
+    SnapshotsExpiringExecutor.expireArcticTable(
+        testKeyedTable, tableRuntime);
+
+    Assert.assertEquals(2, Iterables.size(testKeyedTable.changeTable().snapshots()));
+  }
+
+
+  @Test
+  public void testGetClosestFiles() throws IOException {
+    Assume.assumeTrue(isKeyedTable());
+    KeyedTable testKeyedTable = getArcticTable().asKeyedTable();
+    insertChangeDataFiles(testKeyedTable, 1);
+    insertChangeDataFiles(testKeyedTable, 2);
+    testKeyedTable.changeTable().newAppend().commit();
+
+    Set<DataFile> willDelete = new HashSet<>();
+    testKeyedTable.changeTable().newScan().planFiles().forEach(task -> willDelete.add(task.file()));
+    Assert.assertEquals(8, willDelete.size());
+    Assert.assertEquals(3, Iterables.size(testKeyedTable.changeTable().snapshots()));
+
+    long secondCommitTime = testKeyedTable.changeTable().currentSnapshot().timestampMillis();
+    Snapshot secondSnapshot = testKeyedTable.changeTable().currentSnapshot();
+
+    insertChangeDataFiles(testKeyedTable, 3);
+    Assert.assertEquals(12, Iterables.size(testKeyedTable.changeTable().newScan().planFiles()));
+    Snapshot closestExpireSnapshot = SnapshotsExpiringExecutor.getClosestExpireSnapshot(
+        testKeyedTable.changeTable(), secondCommitTime);
+    Snapshot closestExpireSnapshot2 = SnapshotsExpiringExecutor.getClosestExpireSnapshot(
+        testKeyedTable.changeTable(), secondCommitTime + 1);
+    Assert.assertEquals(secondSnapshot, closestExpireSnapshot);
+    Assert.assertEquals(secondSnapshot, closestExpireSnapshot2);
+
+    Set<CharSequence> dataFilesPath = new HashSet<>(SnapshotsExpiringExecutor.getClosestExpireDataFiles(
+        testKeyedTable.changeTable(), closestExpireSnapshot))
+        .stream().map(ContentFile::path).collect(Collectors.toSet());
+    Set<CharSequence> willDeletePath =
+        willDelete.stream().map(ContentFile::path).collect(Collectors.toSet());
+
+    Assert.assertTrue(willDeletePath.equals(dataFilesPath));
+  }
+
+  @Test
+  public void testNotExpireOptimizeCommit() {
+    Assume.assumeFalse(isKeyedTable());
+    UnkeyedTable testTable = getArcticTable().asUnkeyedTable();
+    testTable.newAppend().commit();
+    testTable.newAppend().commit();
+    testTable.updateProperties().set(TableProperties.BASE_SNAPSHOT_KEEP_MINUTES, "0").commit();
+
+    TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
+    Mockito.when(tableRuntime.getTableIdentifier()).thenReturn(
+        ServerTableIdentifier.of(AmsUtil.toTableIdentifier(testTable.id())));
+    Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);
+    SnapshotsExpiringExecutor.expireArcticTable(testTable, tableRuntime);
+    Assert.assertEquals(1, Iterables.size(testTable.snapshots()));
+
+    testTable.newAppend().commit();
+
+    // mock tableRuntime which has optimizing task not committed
+    long optimizeSnapshotId = testTable.currentSnapshot().snapshotId();
+    Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.COMMITTING);
+    Mockito.when(tableRuntime.getCurrentSnapshotId()).thenReturn(optimizeSnapshotId);
+
+
+    testTable.newAppend().commit();
+    testTable.newAppend().commit();
+
+    SnapshotsExpiringExecutor.expireArcticTable(testTable, tableRuntime);
+    Assert.assertEquals(3, Iterables.size(testTable.snapshots()));
+  }
+
+  private List<DataFile> insertChangeDataFiles(KeyedTable testKeyedTable, long transactionId) throws IOException {
+    List<DataFile> changeInsertFiles = DataTestHelpers.writeAndCommitChangeStore(
+        testKeyedTable, transactionId, ChangeAction.INSERT, ExecutorTestUtil.createRecords(1, 100));
+    changeTableFiles.addAll(changeInsertFiles);
+    return changeInsertFiles;
+  }
+
+}
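Note (not part of the patch): the testNotExpireFlinkLatestCommit and testExpiredChangeTableFilesInBase cases above pin down two guarantees — snapshots at or newer than the latest Flink-committed checkpoint survive expiration even with a zero keep time, and files still referenced elsewhere are excluded from physical deletion. A minimal sketch of how a caller could combine the public helpers exercised by these tests to get the same behaviour outside the JUnit harness; the class name, the single-table usage, and the keepTimeMillis parameter are assumptions made for this illustration, not code from this change:

import com.netease.arctic.server.table.executor.SnapshotsExpiringExecutor;
import com.netease.arctic.server.utils.IcebergTableUtil;
import com.netease.arctic.table.UnkeyedTable;

import java.util.Set;

public class ExpireWithFlinkGuardSketch {

  /**
   * Expires snapshots older than keepTimeMillis, but never past the latest
   * Flink-committed snapshot, mirroring what the tests above assert.
   */
  public static void expire(UnkeyedTable table, long keepTimeMillis) {
    // Long.MAX_VALUE is returned when no snapshot carries a Flink checkpoint id,
    // so the keep-time boundary applies unchanged in that case.
    long flinkCommitTime = SnapshotsExpiringExecutor.fetchLatestFlinkCommittedSnapshotTime(table);
    long olderThan = Math.min(System.currentTimeMillis() - keepTimeMillis, flinkCommitTime);

    // Paths that must survive even if their snapshots expire, as the tests do
    // when they pass the base store's content file paths as the exclude set.
    Set<String> exclude = IcebergTableUtil.getAllContentFilePath(table);
    SnapshotsExpiringExecutor.expireSnapshots(table, olderThan, exclude);
  }
}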