diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OrphanFilesCleanService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OrphanFilesCleanService.java
index 62edbe05c4..02aac39323 100644
--- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OrphanFilesCleanService.java
+++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OrphanFilesCleanService.java
@@ -19,19 +19,17 @@
 package com.netease.arctic.ams.server.service.impl;
 
 import com.netease.arctic.ams.api.Constants;
-import com.netease.arctic.ams.api.DataFileInfo;
 import com.netease.arctic.ams.server.service.IOrphanFilesCleanService;
 import com.netease.arctic.ams.server.service.ServiceContainer;
 import com.netease.arctic.ams.server.utils.CatalogUtil;
 import com.netease.arctic.ams.server.utils.HiveLocationUtils;
 import com.netease.arctic.ams.server.utils.ScheduledTasks;
 import com.netease.arctic.ams.server.utils.ThreadPool;
+import com.netease.arctic.ams.server.utils.UnKeyedTableUtil;
 import com.netease.arctic.catalog.ArcticCatalog;
 import com.netease.arctic.catalog.CatalogLoader;
 import com.netease.arctic.io.ArcticFileIO;
 import com.netease.arctic.table.ArcticTable;
-import com.netease.arctic.table.BaseTable;
-import com.netease.arctic.table.ChangeTable;
 import com.netease.arctic.table.KeyedTable;
 import com.netease.arctic.table.TableIdentifier;
 import com.netease.arctic.table.TableProperties;
@@ -40,11 +38,9 @@
 import com.netease.arctic.utils.TableFileUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +52,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 public class OrphanFilesCleanService implements IOrphanFilesCleanService {
   private static final Logger LOG = LoggerFactory.getLogger(OrphanFilesCleanService.class);
@@ -125,61 +120,50 @@ public void run() {
 
   public static void clean(ArcticTable arcticTable, long lastTime, boolean execute, String mode, boolean metadata) {
+    Set<String> validFiles = new HashSet<>();
+    // For cleaning data files, the valid files of both the base store and the change store are needed,
+    // so collect them in advance to avoid fetching them twice
+    if (!metadata) {
+      validFiles = getValidDataFiles(arcticTable);
+    }
     if (arcticTable.isKeyedTable()) {
       KeyedTable keyedArcticTable = arcticTable.asKeyedTable();
       if (Constants.INNER_TABLE_BASE.equals(mode)) {
-        clearInternalTable(keyedArcticTable, keyedArcticTable.baseTable(), lastTime, execute, metadata);
+        clearInternalTable(keyedArcticTable, keyedArcticTable.baseTable(), lastTime, execute, metadata, validFiles);
       } else if (Constants.INNER_TABLE_CHANGE.equals(mode)) {
         if (keyedArcticTable.primaryKeySpec().primaryKeyExisted()) {
-          clearInternalTable(keyedArcticTable, keyedArcticTable.changeTable(), lastTime, execute, metadata);
+          clearInternalTable(keyedArcticTable, keyedArcticTable.changeTable(), lastTime, execute, metadata, validFiles);
         } else {
           throw new IllegalStateException("no pk table, only support mode=all/base");
         }
       } else if ("all".equals(mode)) {
-        clearInternalTable(keyedArcticTable, keyedArcticTable.baseTable(), lastTime, execute, metadata);
-        clearInternalTable(keyedArcticTable, keyedArcticTable.changeTable(), lastTime, execute, metadata);
+        clearInternalTable(keyedArcticTable, keyedArcticTable.baseTable(), lastTime, execute, metadata, validFiles);
+        clearInternalTable(keyedArcticTable, keyedArcticTable.changeTable(), lastTime, execute, metadata, validFiles);
       } else {
         throw new IllegalStateException("only support mode=all/base/change");
       }
     } else {
-      clearInternalTable(arcticTable, arcticTable.asUnkeyedTable(), lastTime, execute, metadata);
+      clearInternalTable(arcticTable, arcticTable.asUnkeyedTable(), lastTime, execute, metadata, validFiles);
     }
   }
 
   private static void clearInternalTable(ArcticTable table, UnkeyedTable internalTable, long lastTime,
-                                         boolean execute, boolean metadata) {
+                                         boolean execute, boolean metadata, Set<String> exclude) {
     if (metadata) {
       clearInternalTableMetadata(table, internalTable, lastTime, execute);
     } else {
-      clearInternalTableDataFiles(table, internalTable, lastTime, execute);
+      clearInternalTableDataFiles(table, internalTable, lastTime, execute, exclude);
     }
   }
 
   private static void clearInternalTableDataFiles(ArcticTable table, UnkeyedTable internalTable, long lastTime,
-                                                  boolean execute) {
-    Set<String> validFiles = getValidDataFiles(table.id(), table.io(), internalTable);
-    LOG.info("{} table get {} valid files", table.id(), validFiles.size());
+                                                  boolean execute, Set<String> exclude) {
     int deleteFilesCnt = 0;
-    Set<String> exclude = new HashSet<>();
-    if (internalTable instanceof BaseTable) {
-      List<DataFileInfo> dataFilesInfo = ServiceContainer.getFileInfoCacheService()
-          .getOptimizeDatafiles(table.id().buildTableIdentifier(), Constants.INNER_TABLE_CHANGE);
-      exclude = dataFilesInfo.stream().map(DataFileInfo::getPath).collect(Collectors.toSet());
-    } else if (internalTable instanceof ChangeTable) {
-      List<DataFileInfo> dataFilesInfo = ServiceContainer.getFileInfoCacheService()
-          .getOptimizeDatafiles(table.id().buildTableIdentifier(), Constants.INNER_TABLE_BASE);
-      exclude = dataFilesInfo.stream().map(DataFileInfo::getPath).collect(Collectors.toSet());
-    }
-
-    // add hive location to exclude
-    exclude.addAll(HiveLocationUtils.getHiveLocation(table));
-
     String dataLocation = internalTable.location() + File.separator + DATA_FOLDER_NAME;
     if (table.io().exists(dataLocation)) {
       for (FileStatus fileStatus : table.io().list(dataLocation)) {
         deleteFilesCnt += deleteInvalidDataFiles(table.io(), fileStatus,
-            validFiles,
             lastTime,
             exclude,
             execute);
@@ -212,7 +196,6 @@ private static String formatTime(long timestamp) {
 
   private static int deleteInvalidDataFiles(ArcticFileIO io,
                                             FileStatus fileStatus,
-                                            Set<String> validFiles,
                                             Long lastTime,
                                             Set<String> exclude,
                                             boolean execute) {
@@ -222,7 +205,7 @@ private static int deleteInvalidDataFiles(ArcticFileIO io,
       LOG.info("start orphan files clean in {}", location);
       int deleteFileCnt = 0;
       for (FileStatus file : io.list(location)) {
-        deleteFileCnt += deleteInvalidDataFiles(io, file, validFiles, lastTime, exclude, execute);
+        deleteFileCnt += deleteInvalidDataFiles(io, file, lastTime, exclude, execute);
       }
       LOG.info("delete[{}] {} files in {}", execute, deleteFileCnt, location);
@@ -247,17 +230,15 @@ private static int deleteInvalidDataFiles(ArcticFileIO io,
         return 0;
       }
     } else {
-      if (!validFiles.contains(location) &&
+      if (execute &&
+          !exclude.contains(location) &&
+          !exclude.contains(TableFileUtils.getUriPath(new Path(location).getParent().toString())) &&
           fileStatus.getModificationTime() < lastTime) {
-        if (execute &&
-            !exclude.contains(location) &&
-            !exclude.contains(new Path(location).getParent().toString())) {
-          io.deleteFile(location);
-        }
+        io.deleteFile(location);
         return 1;
-      } else {
-        return 0;
       }
+
+      return 0;
     }
   }
@@ -313,34 +294,28 @@ private static Set<String> getValidMetadataFiles(TableIdentifier tableIdentifier
     return validFiles;
   }
 
-  private static Set<String> getValidDataFiles(TableIdentifier tableIdentifier, ArcticFileIO io,
-                                               UnkeyedTable internalTable) {
+  private static Set<String> getValidDataFiles(ArcticTable arcticTable) {
     Set<String> validFiles = new HashSet<>();
-    Iterable<Snapshot> snapshots = internalTable.snapshots();
-    int size = Iterables.size(snapshots);
-    LOG.info("{} get {} snapshots to scan", tableIdentifier, size);
-    int cnt = 0;
-    for (Snapshot snapshot : snapshots) {
-      cnt++;
-      int before = validFiles.size();
-      io.doAs(() -> {
-        // valid data files
-
-        try (CloseableIterable<FileScanTask> fileScanTasks =
-                 internalTable.newScan().useSnapshot(snapshot.snapshotId()).planFiles()) {
-          for (FileScanTask scanTask : fileScanTasks) {
-            if (scanTask.file() != null) {
-              validFiles.add(TableFileUtils.getUriPath(scanTask.file().path().toString()));
-              scanTask.deletes().forEach(file -> validFiles.add(TableFileUtils.getUriPath(file.path().toString())));
-            }
-          }
-          return null;
-        }
-      });
-      LOG.info("{} scan snapshot {}: {} and get {} files, complete {}/{}", tableIdentifier, snapshot.snapshotId(),
-          formatTime(snapshot.timestampMillis()), validFiles.size() - before, cnt, size);
+    if (arcticTable.isKeyedTable()) {
+      Set<String> baseValidFiles = UnKeyedTableUtil.getAllContentFilePath(arcticTable.asKeyedTable().baseTable());
+      LOG.info("{} get {} valid files in the base store", arcticTable.id(), baseValidFiles.size());
+      Set<String> changeValidFiles = UnKeyedTableUtil.getAllContentFilePath(arcticTable.asKeyedTable().changeTable());
+      LOG.info("{} get {} valid files in the change store", arcticTable.id(), changeValidFiles.size());
+      validFiles.addAll(baseValidFiles);
+      validFiles.addAll(changeValidFiles);
+    } else {
+      Set<String> baseValidFiles = UnKeyedTableUtil.getAllContentFilePath(arcticTable.asUnkeyedTable());
+      LOG.info("{} get {} valid files in the base store", arcticTable.id(), baseValidFiles.size());
+      validFiles.addAll(baseValidFiles);
     }
+    LOG.info("{} get {} valid files", arcticTable.id(), validFiles.size());
+
+    // add hive locations to the valid files so they are never cleaned
+    Set<String> hiveValidLocations = HiveLocationUtils.getHiveLocation(arcticTable);
+    LOG.info("{} get {} valid locations in Hive", arcticTable.id(), hiveValidLocations.size());
+    validFiles.addAll(hiveValidLocations);
+
     return validFiles;
   }
diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java
index 2e92971474..b93c71a1e0 100644
--- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java
+++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java
@@ -18,7 +18,6 @@
 
 package com.netease.arctic.ams.server.service.impl;
 
-import com.netease.arctic.ams.api.Constants;
 import com.netease.arctic.ams.api.DataFileInfo;
 import com.netease.arctic.ams.server.service.ITableExpireService;
 import com.netease.arctic.ams.server.service.ServiceContainer;
@@ -28,10 +27,9 @@
 import com.netease.arctic.ams.server.utils.HiveLocationUtils;
 import com.netease.arctic.ams.server.utils.ScheduledTasks;
 import com.netease.arctic.ams.server.utils.ThreadPool;
+import com.netease.arctic.ams.server.utils.UnKeyedTableUtil;
 import com.netease.arctic.catalog.ArcticCatalog;
 import com.netease.arctic.catalog.CatalogLoader;
-import com.netease.arctic.data.DefaultKeyedFile;
-import com.netease.arctic.data.PrimaryKeyedFile;
 import com.netease.arctic.hive.utils.TableTypeUtil;
 import com.netease.arctic.table.ArcticTable;
 import com.netease.arctic.table.KeyedTable;
@@ -129,27 +127,29 @@ public void run() {
           LOG.warn("[{}] Base table is null: {} ", traceId, tableIdentifier);
           return null;
         }
-        List<DataFileInfo> changeFilesInfo = ServiceContainer.getFileInfoCacheService()
-            .getOptimizeDatafiles(tableIdentifier.buildTableIdentifier(), Constants.INNER_TABLE_CHANGE);
-        Set<String> baseExclude = changeFilesInfo.stream().map(DataFileInfo::getPath).collect(Collectors.toSet());
-        baseExclude.addAll(finalHiveLocation);
-        expireSnapshots(baseTable, startTime - baseSnapshotsKeepTime, baseExclude);
-        long baseCleanedTime = System.currentTimeMillis();
-        LOG.info("[{}] {} base expire cost {} ms", traceId, arcticTable.id(), baseCleanedTime - startTime);
-
         UnkeyedTable changeTable = keyedArcticTable.changeTable();
         if (changeTable == null) {
           LOG.warn("[{}] Change table is null: {}", traceId, tableIdentifier);
           return null;
         }
+
+        // get valid files in the change store which shouldn't be physically deleted when expiring snapshots
+        // in the base store
+        Set<String> baseExclude = UnKeyedTableUtil.getAllContentFilePath(changeTable);
+        baseExclude.addAll(finalHiveLocation);
+        expireSnapshots(baseTable, startTime - baseSnapshotsKeepTime, baseExclude);
+        long baseCleanedTime = System.currentTimeMillis();
+        LOG.info("[{}] {} base expire cost {} ms", traceId, arcticTable.id(), baseCleanedTime - startTime);
+
         // delete ttl files
         List<DataFileInfo> changeDataFiles = ServiceContainer.getFileInfoCacheService()
             .getChangeTableTTLDataFiles(keyedArcticTable.id().buildTableIdentifier(),
                 System.currentTimeMillis() - changeDataTTL);
         deleteChangeFile(keyedArcticTable, changeDataFiles);
-        List<DataFileInfo> baseFilesInfo = ServiceContainer.getFileInfoCacheService()
-            .getOptimizeDatafiles(tableIdentifier.buildTableIdentifier(), Constants.INNER_TABLE_BASE);
-        Set<String> changeExclude = baseFilesInfo.stream().map(DataFileInfo::getPath).collect(Collectors.toSet());
+
+        // get valid files in the base store which shouldn't be physically deleted when expiring snapshots
+        // in the change store
+        Set<String> changeExclude = UnKeyedTableUtil.getAllContentFilePath(baseTable);
         changeExclude.addAll(finalHiveLocation);
         expireSnapshots(changeTable, startTime - changeSnapshotsKeepTime, changeExclude);
         return null;
@@ -237,7 +237,8 @@ public static void expireSnapshots(UnkeyedTable arcticInternalTable,
         .retainLast(1).expireOlderThan(olderThan)
         .deleteWith(file -> {
           try {
-            if (!exclude.contains(file) && !exclude.contains(new Path(file).getParent().toString())) {
+            String filePath = TableFileUtils.getUriPath(file);
+            if (!exclude.contains(filePath) && !exclude.contains(new Path(filePath).getParent().toString())) {
               arcticInternalTable.io().deleteFile(file);
             }
             parentDirectory.add(new Path(file).getParent().toString());
diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/utils/ContentFileUtil.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/utils/ContentFileUtil.java
index de69979d83..f87511c0e1 100644
--- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/utils/ContentFileUtil.java
+++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/utils/ContentFileUtil.java
@@ -27,6 +27,9 @@
 import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.PartitionSpec;
 
+/**
+ * Tools for handling Iceberg ContentFiles.
+ */
 public class ContentFileUtil {
 
   public static ContentFileWithSequence<?> buildContentFile(DataFileInfo dataFileInfo,
diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/utils/HiveLocationUtils.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/utils/HiveLocationUtils.java
index 39ac503a49..68babf68f8 100644
--- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/utils/HiveLocationUtils.java
+++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/utils/HiveLocationUtils.java
@@ -21,6 +21,7 @@
 import com.netease.arctic.hive.table.SupportHive;
 import com.netease.arctic.hive.utils.TableTypeUtil;
 import com.netease.arctic.table.ArcticTable;
+import com.netease.arctic.utils.TableFileUtils;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.slf4j.Logger;
@@ -45,7 +46,7 @@ public static Set<String> getHiveLocation(ArcticTable table) {
       try {
         Table hiveTable = ((SupportHive) table).getHMSClient().run(client ->
             client.getTable(table.id().getDatabase(), table.id().getTableName()));
-        hiveLocations.add(hiveTable.getSd().getLocation());
+        hiveLocations.add(TableFileUtils.getUriPath(hiveTable.getSd().getLocation()));
       } catch (Exception e) {
         LOG.error("{} get hive table error", table.id(), e);
         throw new IllegalStateException("get hive table error", e);
@@ -55,7 +56,7 @@ public static Set<String> getHiveLocation(ArcticTable table) {
         List<Partition> partitions = ((SupportHive) table).getHMSClient().run(client ->
             client.listPartitions(table.id().getDatabase(), table.id().getTableName(), Short.MAX_VALUE));
         for (Partition partition : partitions) {
-          hiveLocations.add(partition.getSd().getLocation());
+          hiveLocations.add(TableFileUtils.getUriPath(partition.getSd().getLocation()));
         }
       } catch (Exception e) {
         LOG.error("{} get hive partitions error", table.id(), e);
diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/utils/UnKeyedTableUtil.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/utils/UnKeyedTableUtil.java
index 39ceeb34c0..e077fe2211 100644
--- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/utils/UnKeyedTableUtil.java
+++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/utils/UnKeyedTableUtil.java
@@ -20,9 +20,28 @@
 
 import com.netease.arctic.ams.server.model.TableOptimizeRuntime;
 import com.netease.arctic.table.UnkeyedTable;
+import com.netease.arctic.utils.ManifestEntryFields;
+import com.netease.arctic.utils.TableFileUtils;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
 public class UnKeyedTableUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(UnKeyedTableUtil.class);
+
   public static long getSnapshotId(UnkeyedTable internalTable) {
     internalTable.refresh();
     Snapshot currentSnapshot = internalTable.currentSnapshot();
@@ -37,4 +56,32 @@ public static Snapshot getCurrentSnapshot(UnkeyedTable internalTable) {
     internalTable.refresh();
     return internalTable.currentSnapshot();
   }
+
+  public static Set<String> getAllContentFilePath(UnkeyedTable internalTable) {
+    Set<String> validFilesPath = new HashSet<>();
+
+    Table manifestTable =
+        MetadataTableUtils.createMetadataTableInstance(((HasTableOperations) internalTable).operations(),
+            internalTable.name(), metadataTableName(internalTable.name(), MetadataTableType.ALL_ENTRIES),
+            MetadataTableType.ALL_ENTRIES);
+    try (CloseableIterable<Record> entries = IcebergGenerics.read(manifestTable).build()) {
+      for (Record entry : entries) {
+        ManifestEntryFields.Status status =
+            ManifestEntryFields.Status.of((int) entry.get(ManifestEntryFields.STATUS.fieldId()));
+        if (status == ManifestEntryFields.Status.ADDED || status == ManifestEntryFields.Status.EXISTING) {
+          GenericRecord dataFile = (GenericRecord) entry.get(ManifestEntryFields.DATA_FILE_ID);
+          String filePath = (String) dataFile.getField(DataFile.FILE_PATH.name());
+          validFilesPath.add(TableFileUtils.getUriPath(filePath));
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("close manifest file error", e);
+    }
+
+    return validFilesPath;
+  }
+
+  private static String metadataTableName(String tableName, MetadataTableType type) {
+    return tableName + (tableName.contains("/") ? "#" : ".") + type;
+  }
 }
diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/AmsTestBase.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/AmsTestBase.java
index 5f8999299f..c3a11b7f89 100644
--- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/AmsTestBase.java
+++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/AmsTestBase.java
@@ -49,11 +49,10 @@
 import com.netease.arctic.ams.server.service.TestDDLTracerService;
 import com.netease.arctic.ams.server.service.TestFileInfoCacheService;
 import com.netease.arctic.ams.server.service.TestOptimizerService;
-import com.netease.arctic.ams.server.service.impl.ContainerMetaService;
-import com.netease.arctic.ams.server.service.impl.TestTableBlockerService;
 import com.netease.arctic.ams.server.service.impl.AdaptHiveService;
 import com.netease.arctic.ams.server.service.impl.ArcticTransactionService;
 import com.netease.arctic.ams.server.service.impl.CatalogMetadataService;
+import com.netease.arctic.ams.server.service.impl.ContainerMetaService;
 import com.netease.arctic.ams.server.service.impl.DDLTracerService;
 import com.netease.arctic.ams.server.service.impl.FileInfoCacheService;
 import com.netease.arctic.ams.server.service.impl.JDBCMetaService;
@@ -61,9 +60,11 @@
 import com.netease.arctic.ams.server.service.impl.OptimizerService;
 import com.netease.arctic.ams.server.service.impl.PlatformFileInfoService;
 import com.netease.arctic.ams.server.service.impl.TableBlockerService;
+import com.netease.arctic.ams.server.service.impl.TestTableBlockerService;
 import com.netease.arctic.ams.server.util.DerbyTestUtil;
 import com.netease.arctic.ams.server.utils.CatalogUtil;
 import com.netease.arctic.ams.server.utils.JDBCSqlSessionFactoryProvider;
+import com.netease.arctic.ams.server.utils.UnKeyedTableUtilTest;
 import com.netease.arctic.catalog.ArcticCatalog;
 import com.netease.arctic.catalog.CatalogLoader;
 import com.netease.arctic.hive.HMSMockServer;
@@ -123,7 +124,8 @@
     TestTableBlockerService.class,
     SupportHiveTestGroup.class,
     TestArcticTransactionService.class,
-    TestOptimizerService.class
+    TestOptimizerService.class,
+    UnKeyedTableUtilTest.class
 })
 @PrepareForTest({
     JDBCSqlSessionFactoryProvider.class,
diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java
index 631a601a45..f8e37420e6 100644
--- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java
+++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java
@@ -22,6 +22,7 @@
 import com.netease.arctic.ams.api.DataFileInfo;
 import com.netease.arctic.ams.server.service.impl.TableExpireService;
 import com.netease.arctic.ams.server.util.DataFileInfoUtils;
+import com.netease.arctic.ams.server.utils.UnKeyedTableUtil;
 import com.netease.arctic.data.ChangeAction;
 import com.netease.arctic.io.writer.GenericChangeTaskWriter;
 import com.netease.arctic.io.writer.GenericTaskWriters;
@@ -49,6 +50,7 @@
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public class TestExpiredFileClean extends TableTestBase {
@@ -99,6 +101,29 @@ public void testExpireTableFiles() throws Exception {
     Assert.assertFalse(testKeyedTable.io().exists((String) s1Files.get(0).path()));
   }
 
+  @Test
+  public void testExpiredChangeTableFilesInBase() throws Exception {
+    List<DataFile> s1Files = insertChangeDataFiles(1);
+    testKeyedTable.baseTable().newAppend().appendFile(s1Files.get(0)).commit();
+    List<StructLike> partitions = new ArrayList<>(s1Files.stream().collect(Collectors.groupingBy(ContentFile::partition)).keySet());
+    Assert.assertEquals(2, partitions.size());
+
+    UpdatePartitionProperties updateProperties = testKeyedTable.baseTable().updatePartitionProperties(null);
+    updateProperties.set(partitions.get(0), TableProperties.PARTITION_MAX_TRANSACTION_ID, "3");
+    updateProperties.set(partitions.get(1), TableProperties.PARTITION_MAX_TRANSACTION_ID, "1");
+    updateProperties.commit();
+    Assert.assertTrue(testKeyedTable.io().exists((String) s1Files.get(0).path()));
+    TableExpireService.deleteChangeFile(testKeyedTable, changeTableFilesInfo);
+    Assert.assertEquals(2, Iterables.size(testKeyedTable.changeTable().snapshots()));
+
+    Set<String> exclude = UnKeyedTableUtil.getAllContentFilePath(testKeyedTable.baseTable());
+    insertChangeDataFiles(2);
+    TableExpireService.expireSnapshots(testKeyedTable.changeTable(), System.currentTimeMillis(), exclude);
+    Assert.assertEquals(1, Iterables.size(testKeyedTable.changeTable().snapshots()));
+    Assert.assertTrue(testKeyedTable.io().exists((String) s1Files.get(0).path()));
+    Assert.assertFalse(testKeyedTable.io().exists((String) s1Files.get(1).path()));
+  }
+
   private List<DataFile> insertChangeDataFiles(long transactionId) throws IOException {
     GenericChangeTaskWriter writer = GenericTaskWriters.builderFor(testKeyedTable)
         .withChangeAction(ChangeAction.INSERT)
diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileCleanSupportHive.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileCleanSupportHive.java
index b10fc367d0..23ee5779f8 100644
--- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileCleanSupportHive.java
+++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileCleanSupportHive.java
@@ -69,7 +69,7 @@ public void testExpireTableFiles() throws Exception {
 
     Set<String> hiveLocation = new HashSet<>();
     if (TableTypeUtil.isHive(testUnPartitionKeyedHiveTable)) {
-      hiveLocation.add(TableFileUtils.getFileDir(hiveFiles.get(0).path().toString()));
+      hiveLocation.add(TableFileUtils.getUriPath(TableFileUtils.getFileDir(hiveFiles.get(0).path().toString())));
     }
     TableExpireService.expireSnapshots(testUnPartitionKeyedHiveTable.baseTable(), System.currentTimeMillis(), hiveLocation);
     Assert.assertEquals(1, Iterables.size(testUnPartitionKeyedHiveTable.baseTable().snapshots()));
diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestOrphanFileClean.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestOrphanFileClean.java
index fcb9cee325..d1a9e72a25 100644
--- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestOrphanFileClean.java
+++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestOrphanFileClean.java
@@ -24,12 +24,19 @@
 import com.netease.arctic.ams.server.service.impl.FileInfoCacheService;
 import com.netease.arctic.ams.server.service.impl.OrphanFilesCleanService;
 import com.netease.arctic.ams.server.utils.JDBCSqlSessionFactoryProvider;
+import com.netease.arctic.io.writer.GenericChangeTaskWriter;
+import com.netease.arctic.io.writer.GenericTaskWriters;
+import com.netease.arctic.utils.TableFileUtils;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.WriteResult;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -39,7 +46,9 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import static com.netease.arctic.ams.server.service.impl.OrphanFilesCleanService.DATA_FOLDER_NAME;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
@@ -88,6 +97,42 @@ public void orphanDataFileClean() throws IOException {
     }
   }
 
+  @Test
+  public void orphanChangeDataFileInBaseClean() throws IOException {
+    GenericChangeTaskWriter writer = GenericTaskWriters.builderFor(testKeyedTable)
+        .withTransactionId(1L).buildChangeWriter();
+    for (Record record : baseRecords(1, 10, testKeyedTable.schema())) {
+      writer.write(record);
+    }
+    Set<String> pathAll = new HashSet<>();
+    Set<String> fileInBaseStore = new HashSet<>();
+    Set<String> fileOnlyInChangeLocation = new HashSet<>();
+    WriteResult result = writer.complete();
+    AppendFiles appendFiles = testKeyedTable.asKeyedTable().baseTable().newAppend();
+
+    for (int i = 0; i < result.dataFiles().length; i++) {
+      DataFile dataFile = result.dataFiles()[i];
+      pathAll.add(TableFileUtils.getUriPath(dataFile.path().toString()));
+      if (i == 0) {
+        appendFiles.appendFile(dataFile).commit();
+        fileInBaseStore.add(TableFileUtils.getUriPath(dataFile.path().toString()));
+      } else {
+        fileOnlyInChangeLocation.add(TableFileUtils.getUriPath(dataFile.path().toString()));
+      }
+    }
+    for (String s : pathAll) {
+      Assert.assertTrue(testKeyedTable.io().exists(s));
+    }
+
+    OrphanFilesCleanService.clean(testKeyedTable, System.currentTimeMillis(), true, "all", false);
+    for (String s : fileInBaseStore) {
+      Assert.assertTrue(testKeyedTable.io().exists(s));
+    }
+    for (String s : fileOnlyInChangeLocation) {
+      Assert.assertFalse(testKeyedTable.io().exists(s));
+    }
+  }
+
   @Test
   public void orphanMetadataFileClean() throws IOException {
     insertTableBaseDataFiles(testKeyedTable);
diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/utils/UnKeyedTableUtilTest.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/utils/UnKeyedTableUtilTest.java
new file mode 100644
index 0000000000..132d603932
--- /dev/null
+++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/utils/UnKeyedTableUtilTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.ams.server.utils;
+
+import com.netease.arctic.ams.api.properties.TableFormat;
+import com.netease.arctic.catalog.TableTestBase;
+import com.netease.arctic.io.DataTestHelpers;
+import com.netease.arctic.io.writer.GenericBaseTaskWriter;
+import com.netease.arctic.io.writer.GenericTaskWriters;
+import com.netease.arctic.io.writer.SortedPosDeleteWriter;
+import com.netease.arctic.utils.TableFileUtils;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class UnKeyedTableUtilTest extends TableTestBase {
+  public UnKeyedTableUtilTest() {
+    super(TableFormat.MIXED_ICEBERG, true, true);
+  }
+
+  @Test
+  public void testGetAllContentFilePath() throws Exception {
+    GenericBaseTaskWriter writer = GenericTaskWriters.builderFor(getArcticTable().asKeyedTable())
+        .withTransactionId(1L).buildBaseWriter();
+
+    for (Record record : writeRecords()) {
+      writer.write(record);
+    }
+
+    // DataFiles
+    Set<String> s1FilePath = new HashSet<>();
+    WriteResult result = writer.complete();
+    AppendFiles appendFiles = getArcticTable().asKeyedTable().baseTable().newAppend();
+    for (DataFile dataFile : result.dataFiles()) {
+      appendFiles.appendFile(dataFile);
+      s1FilePath.add(TableFileUtils.getUriPath(dataFile.path().toString()));
+    }
+    appendFiles.commit();
+
+    // DeleteFiles
+    DataFile dataFile = result.dataFiles()[0];
+    SortedPosDeleteWriter<Record> posDeleteWriter = GenericTaskWriters.builderFor(getArcticTable().asKeyedTable())
+        .withTransactionId(1L).buildBasePosDeleteWriter(2, 1, dataFile.partition());
+
+    posDeleteWriter.delete(dataFile.path(), 1);
+    posDeleteWriter.delete(dataFile.path(), 3);
+    posDeleteWriter.delete(dataFile.path(), 5);
+    List<DeleteFile> posDeleteResult = posDeleteWriter.complete();
+    Assert.assertEquals(1, posDeleteResult.size());
+    RowDelta rowDelta = getArcticTable().asKeyedTable().baseTable().newRowDelta();
+    for (DeleteFile deleteFile : posDeleteResult) {
+      rowDelta.addDeletes(deleteFile);
+      s1FilePath.add(TableFileUtils.getUriPath(deleteFile.path().toString()));
+    }
+    rowDelta.commit();
+
+    Assert.assertEquals(s1FilePath, UnKeyedTableUtil.getAllContentFilePath(getArcticTable().asKeyedTable().baseTable()));
+  }
+
+  @Test
+  public void testGetAllContentFilePathWithDelete() throws Exception {
+    GenericBaseTaskWriter writer = GenericTaskWriters.builderFor(getArcticTable().asKeyedTable())
+        .withTransactionId(1L).buildBaseWriter();
+
+    for (Record record : writeRecords()) {
+      writer.write(record);
+    }
+
+    Set<String> s1FilePath = new HashSet<>();
+    WriteResult result = writer.complete();
+    AppendFiles appendFiles = getArcticTable().asKeyedTable().baseTable().newAppend();
+    for (DataFile dataFile : result.dataFiles()) {
+      appendFiles.appendFile(dataFile);
+      s1FilePath.add(TableFileUtils.getUriPath(dataFile.path().toString()));
+    }
+    appendFiles.commit();
+
+    DeleteFiles deleteFile = getArcticTable().asKeyedTable().baseTable().newDelete();
+    deleteFile.deleteFile(result.dataFiles()[0]).commit();
+    Assert.assertEquals(s1FilePath, UnKeyedTableUtil.getAllContentFilePath(getArcticTable().asKeyedTable().baseTable()));
+  }
+
+  @Test
+  public void testGetAllContentFilePathWithExpire() throws Exception {
+    GenericBaseTaskWriter writer = GenericTaskWriters.builderFor(getArcticTable().asKeyedTable())
+        .withTransactionId(1L).buildBaseWriter();
+
+    for (Record record : writeRecords()) {
+      writer.write(record);
+    }
+
+    Set<String> s1FilePath = new HashSet<>();
+    WriteResult result = writer.complete();
+    AppendFiles appendFiles = getArcticTable().asKeyedTable().baseTable().newAppend();
+    for (DataFile dataFile : result.dataFiles()) {
+      appendFiles.appendFile(dataFile);
+      s1FilePath.add(TableFileUtils.getUriPath(dataFile.path().toString()));
+    }
+    appendFiles.commit();
+
+    DeleteFiles deleteFile = getArcticTable().asKeyedTable().baseTable().newDelete();
+    deleteFile.deleteFile(result.dataFiles()[0]).commit();
+
+    Assert.assertEquals(s1FilePath, UnKeyedTableUtil.getAllContentFilePath(getArcticTable().asKeyedTable().baseTable()));
+    getArcticTable().asKeyedTable().baseTable().newAppend().commit();
+    getArcticTable().asKeyedTable().baseTable().expireSnapshots()
+        .retainLast(1).expireOlderThan(System.currentTimeMillis()).cleanExpiredFiles(true).commit();
+
+    s1FilePath.remove(TableFileUtils.getUriPath(result.dataFiles()[0].path().toString()));
+    Assert.assertEquals(s1FilePath, UnKeyedTableUtil.getAllContentFilePath(getArcticTable().asKeyedTable().baseTable()));
+  }
+
+  private List<Record> writeRecords() {
+    ImmutableList.Builder<Record> builder = ImmutableList.builder();
+    builder.add(DataTestHelpers.createRecord(2, "lily", 0, "2022-01-01T12:00:00"));
+    builder.add(DataTestHelpers.createRecord(3, "jake", 0, "2022-02-01T23:00:00"));
+    builder.add(DataTestHelpers.createRecord(4, "sam", 0, "2022-02-01T06:00:00"));
+    builder.add(DataTestHelpers.createRecord(5, "john", 0, "2022-01-01T12:00:00"));
+
+    return builder.build();
+  }
+}