From 9d2751196e75362228145d71d198182d7bf53b3a Mon Sep 17 00:00:00 2001 From: wangtao Date: Wed, 1 Mar 2023 16:12:53 +0800 Subject: [PATCH 1/4] the last snapshot with flink.max-committed-checkpoint-id should not be expired --- .../service/impl/TableExpireService.java | 24 +++++++++++++++---- .../server/optimize/TestExpiredFileClean.java | 20 ++++++++++++++++ 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java index 3fe659bb13..05e2dc0b73 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java @@ -45,6 +45,8 @@ import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.StructLikeMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,7 +139,8 @@ public void run() { // in the base store Set baseExclude = UnKeyedTableUtil.getAllContentFilePath(changeTable); baseExclude.addAll(finalHiveLocation); - expireSnapshots(baseTable, startTime - baseSnapshotsKeepTime, baseExclude); + long baseOlderThan = getExpireTime(baseTable, startTime - baseSnapshotsKeepTime); + expireSnapshots(baseTable, baseOlderThan, baseExclude); long baseCleanedTime = System.currentTimeMillis(); LOG.info("[{}] {} base expire cost {} ms", traceId, arcticTable.id(), baseCleanedTime - startTime); @@ -151,14 +154,16 @@ public void run() { // in the change store Set changeExclude = UnKeyedTableUtil.getAllContentFilePath(baseTable); changeExclude.addAll(finalHiveLocation); - expireSnapshots(changeTable, startTime - changeSnapshotsKeepTime, changeExclude); + long changeOlderThan = getExpireTime(changeTable, startTime - changeSnapshotsKeepTime); + expireSnapshots(changeTable, changeOlderThan, changeExclude); return null; }); LOG.info("[{}] {} expire cost total {} ms", traceId, arcticTable.id(), System.currentTimeMillis() - startTime); } else { UnkeyedTable unKeyedArcticTable = arcticTable.asUnkeyedTable(); - expireSnapshots(unKeyedArcticTable, startTime - baseSnapshotsKeepTime, hiveLocation); + long baseOlderThan = getExpireTime(unKeyedArcticTable, startTime - baseSnapshotsKeepTime); + expireSnapshots(unKeyedArcticTable, baseOlderThan, hiveLocation); long baseCleanedTime = System.currentTimeMillis(); LOG.info("[{}] {} unKeyedTable expire cost {} ms", traceId, arcticTable.id(), baseCleanedTime - startTime); } @@ -168,6 +173,17 @@ public void run() { } } + public static long getExpireTime(UnkeyedTable table, long olderThan) { + ArrayList snapshots = Lists.newArrayList(table.snapshots()); + for (int i = snapshots.size() - 1; i >= 0; i--) { + Snapshot snapshot = snapshots.get(i); + if (snapshot.summary().containsKey("flink.max-committed-checkpoint-id")) { + return Math.min(snapshot.timestampMillis(), olderThan); + } + } + return olderThan; + } + public static void deleteChangeFile(KeyedTable keyedTable, List changeDataFiles) { if (CollectionUtils.isEmpty(changeDataFiles)) { return; @@ -229,7 +245,7 @@ public static void deleteChangeFile(KeyedTable keyedTable, List ch public static void expireSnapshots(UnkeyedTable arcticInternalTable, long olderThan, Set exclude) { - 
LOG.debug("start expire snapshots, the exclude is {}", exclude); + LOG.debug("start expire snapshots older than {}, the exclude is {}", olderThan, exclude); final AtomicInteger toDeleteFiles = new AtomicInteger(0); final AtomicInteger deleteFiles = new AtomicInteger(0); Set parentDirectory = new HashSet<>(); diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java index 90e1819eaa..f80bea586d 100644 --- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java +++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java @@ -124,6 +124,26 @@ public void testExpiredChangeTableFilesInBase() throws Exception { Assert.assertFalse(testKeyedTable.io().exists((String) s1Files.get(1).path())); } + @Test + public void testGetExpireTime() throws IOException { + insertChangeDataFiles(1); + insertChangeDataFiles(2); + long olderThan = System.currentTimeMillis() - 100; + Assert.assertEquals(olderThan, TableExpireService.getExpireTime(testKeyedTable.changeTable(), olderThan)); + + AppendFiles appendFiles = testKeyedTable.changeTable().newAppend(); + appendFiles.set("flink.max-committed-checkpoint-id", "100"); + appendFiles.commit(); + long checkpointTime = testKeyedTable.changeTable().currentSnapshot().timestampMillis(); + Assert.assertTrue(checkpointTime > olderThan); + Assert.assertEquals(olderThan, TableExpireService.getExpireTime(testKeyedTable.changeTable(), olderThan)); + + insertChangeDataFiles(2); + olderThan = testKeyedTable.changeTable().currentSnapshot().timestampMillis() + 100; + Assert.assertTrue(checkpointTime < olderThan); + Assert.assertEquals(checkpointTime, TableExpireService.getExpireTime(testKeyedTable.changeTable(), olderThan)); + } + private List insertChangeDataFiles(long transactionId) throws IOException { GenericChangeTaskWriter writer = GenericTaskWriters.builderFor(testKeyedTable) .withChangeAction(ChangeAction.INSERT) From a55ae2bf25c6354fac5919775545c8eefa2c3f5f Mon Sep 17 00:00:00 2001 From: wangtao Date: Wed, 1 Mar 2023 20:54:57 +0800 Subject: [PATCH 2/4] 1.refactor TableExpireServicec for unit test 2.refactor method fetchLatestFlinkCommittedSnapshotTime --- .../service/impl/TableExpireService.java | 147 +++++++++--------- .../server/optimize/TestExpiredFileClean.java | 27 +++- 2 files changed, 96 insertions(+), 78 deletions(-) diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java index 05e2dc0b73..cced41fe3f 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java @@ -46,7 +46,6 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Snapshot; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.StructLikeMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,10 +91,7 @@ public static class TableExpireTask implements ScheduledTasks.Task { @Override public void run() { - long startTime = System.currentTimeMillis(); - final String traceId = UUID.randomUUID().toString(); try { - LOG.info("[{}] {} start expire", traceId, tableIdentifier); 
ArcticCatalog catalog = CatalogLoader.load(ServiceContainer.getTableMetastoreHandler(), tableIdentifier.getCatalog()); ArcticTable arcticTable = catalog.loadTable(tableIdentifier); @@ -105,83 +101,94 @@ public void run() { if (!needClean) { return; } - long changeDataTTL = Long.parseLong(arcticTable.properties() - .getOrDefault(TableProperties.CHANGE_DATA_TTL, - TableProperties.CHANGE_DATA_TTL_DEFAULT)) * 60 * 1000; - long baseSnapshotsKeepTime = Long.parseLong(arcticTable.properties() - .getOrDefault(TableProperties.BASE_SNAPSHOT_KEEP_MINUTES, - TableProperties.BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) * 60 * 1000; - long changeSnapshotsKeepTime = Long.parseLong(arcticTable.properties() - .getOrDefault(TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES, - TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) * 60 * 1000; - - Set hiveLocation = new HashSet<>(); - if (TableTypeUtil.isHive(arcticTable)) { - hiveLocation = HiveLocationUtils.getHiveLocation(arcticTable); - } + expireArcticTable(arcticTable); + } catch (Throwable t) { + LOG.error("unexpected expire error of table {} ", tableIdentifier, t); + } + } + } - if (arcticTable.isKeyedTable()) { - KeyedTable keyedArcticTable = arcticTable.asKeyedTable(); - Set finalHiveLocation = hiveLocation; - keyedArcticTable.io().doAs(() -> { - UnkeyedTable baseTable = keyedArcticTable.baseTable(); - if (baseTable == null) { - LOG.warn("[{}] Base table is null: {} ", traceId, tableIdentifier); - return null; - } - UnkeyedTable changeTable = keyedArcticTable.changeTable(); - if (changeTable == null) { - LOG.warn("[{}] Change table is null: {}", traceId, tableIdentifier); - return null; - } + public static void expireArcticTable(ArcticTable arcticTable) { + TableIdentifier tableIdentifier = arcticTable.id(); + final String traceId = UUID.randomUUID().toString(); + long startTime = System.currentTimeMillis(); + LOG.info("[{}] {} start expire", traceId, tableIdentifier); - // get valid files in the change store which shouldn't physically delete when expire the snapshot - // in the base store - Set baseExclude = UnKeyedTableUtil.getAllContentFilePath(changeTable); - baseExclude.addAll(finalHiveLocation); - long baseOlderThan = getExpireTime(baseTable, startTime - baseSnapshotsKeepTime); - expireSnapshots(baseTable, baseOlderThan, baseExclude); - long baseCleanedTime = System.currentTimeMillis(); - LOG.info("[{}] {} base expire cost {} ms", traceId, arcticTable.id(), baseCleanedTime - startTime); + long changeDataTTL = Long.parseLong(arcticTable.properties() + .getOrDefault(TableProperties.CHANGE_DATA_TTL, + TableProperties.CHANGE_DATA_TTL_DEFAULT)) * 60 * 1000; + long baseSnapshotsKeepTime = Long.parseLong(arcticTable.properties() + .getOrDefault(TableProperties.BASE_SNAPSHOT_KEEP_MINUTES, + TableProperties.BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) * 60 * 1000; + long changeSnapshotsKeepTime = Long.parseLong(arcticTable.properties() + .getOrDefault(TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES, + TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) * 60 * 1000; - // delete ttl files - List changeDataFiles = ServiceContainer.getFileInfoCacheService() - .getChangeTableTTLDataFiles(keyedArcticTable.id().buildTableIdentifier(), - System.currentTimeMillis() - changeDataTTL); - deleteChangeFile(keyedArcticTable, changeDataFiles); + Set hiveLocation = new HashSet<>(); + if (TableTypeUtil.isHive(arcticTable)) { + hiveLocation = HiveLocationUtils.getHiveLocation(arcticTable); + } - // get valid files in the base store which shouldn't physically delete when expire the snapshot 
- // in the change store - Set changeExclude = UnKeyedTableUtil.getAllContentFilePath(baseTable); - changeExclude.addAll(finalHiveLocation); - long changeOlderThan = getExpireTime(changeTable, startTime - changeSnapshotsKeepTime); - expireSnapshots(changeTable, changeOlderThan, changeExclude); - return null; - }); - LOG.info("[{}] {} expire cost total {} ms", traceId, arcticTable.id(), - System.currentTimeMillis() - startTime); - } else { - UnkeyedTable unKeyedArcticTable = arcticTable.asUnkeyedTable(); - long baseOlderThan = getExpireTime(unKeyedArcticTable, startTime - baseSnapshotsKeepTime); - expireSnapshots(unKeyedArcticTable, baseOlderThan, hiveLocation); - long baseCleanedTime = System.currentTimeMillis(); - LOG.info("[{}] {} unKeyedTable expire cost {} ms", traceId, arcticTable.id(), baseCleanedTime - startTime); + if (arcticTable.isKeyedTable()) { + KeyedTable keyedArcticTable = arcticTable.asKeyedTable(); + Set finalHiveLocation = hiveLocation; + keyedArcticTable.io().doAs(() -> { + UnkeyedTable baseTable = keyedArcticTable.baseTable(); + if (baseTable == null) { + LOG.warn("[{}] Base table is null: {} ", traceId, tableIdentifier); + return null; } - } catch (Throwable t) { - LOG.error("[" + traceId + "] unexpected expire error of table " + tableIdentifier, t); - } + UnkeyedTable changeTable = keyedArcticTable.changeTable(); + if (changeTable == null) { + LOG.warn("[{}] Change table is null: {}", traceId, tableIdentifier); + return null; + } + + // get valid files in the change store which shouldn't physically delete when expire the snapshot + // in the base store + Set baseExclude = UnKeyedTableUtil.getAllContentFilePath(changeTable); + baseExclude.addAll(finalHiveLocation); + long latestBaseFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(baseTable); + expireSnapshots(baseTable, Math.min(latestBaseFlinkCommitTime, startTime - baseSnapshotsKeepTime), + baseExclude); + long baseCleanedTime = System.currentTimeMillis(); + LOG.info("[{}] {} base expire cost {} ms", traceId, arcticTable.id(), baseCleanedTime - startTime); + + // delete ttl files + List changeDataFiles = ServiceContainer.getFileInfoCacheService() + .getChangeTableTTLDataFiles(keyedArcticTable.id().buildTableIdentifier(), + System.currentTimeMillis() - changeDataTTL); + deleteChangeFile(keyedArcticTable, changeDataFiles); + + // get valid files in the base store which shouldn't physically delete when expire the snapshot + // in the change store + Set changeExclude = UnKeyedTableUtil.getAllContentFilePath(baseTable); + changeExclude.addAll(finalHiveLocation); + long latestChangeFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(changeTable); + expireSnapshots(changeTable, Math.min(latestChangeFlinkCommitTime, startTime - changeSnapshotsKeepTime), + changeExclude); + return null; + }); + LOG.info("[{}] {} expire cost total {} ms", traceId, arcticTable.id(), + System.currentTimeMillis() - startTime); + } else { + UnkeyedTable unKeyedArcticTable = arcticTable.asUnkeyedTable(); + long latestFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(unKeyedArcticTable); + expireSnapshots(unKeyedArcticTable, Math.min(latestFlinkCommitTime, startTime - baseSnapshotsKeepTime), + hiveLocation); + long baseCleanedTime = System.currentTimeMillis(); + LOG.info("[{}] {} unKeyedTable expire cost {} ms", traceId, arcticTable.id(), baseCleanedTime - startTime); } } - public static long getExpireTime(UnkeyedTable table, long olderThan) { - ArrayList snapshots = Lists.newArrayList(table.snapshots()); - for (int i = 
snapshots.size() - 1; i >= 0; i--) { - Snapshot snapshot = snapshots.get(i); + public static long fetchLatestFlinkCommittedSnapshotTime(UnkeyedTable table) { + long latestCommitTime = Long.MAX_VALUE; + for (Snapshot snapshot : table.snapshots()) { if (snapshot.summary().containsKey("flink.max-committed-checkpoint-id")) { - return Math.min(snapshot.timestampMillis(), olderThan); + latestCommitTime = snapshot.timestampMillis(); } } - return olderThan; + return latestCommitTime; } public static void deleteChangeFile(KeyedTable keyedTable, List changeDataFiles) { diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java index f80bea586d..083291ec25 100644 --- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java +++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java @@ -125,23 +125,34 @@ public void testExpiredChangeTableFilesInBase() throws Exception { } @Test - public void testGetExpireTime() throws IOException { + public void testNotExpireFlinkLatestCommit() throws IOException { insertChangeDataFiles(1); insertChangeDataFiles(2); - long olderThan = System.currentTimeMillis() - 100; - Assert.assertEquals(olderThan, TableExpireService.getExpireTime(testKeyedTable.changeTable(), olderThan)); + Assert.assertEquals(Long.MAX_VALUE, + TableExpireService.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable())); AppendFiles appendFiles = testKeyedTable.changeTable().newAppend(); appendFiles.set("flink.max-committed-checkpoint-id", "100"); appendFiles.commit(); long checkpointTime = testKeyedTable.changeTable().currentSnapshot().timestampMillis(); - Assert.assertTrue(checkpointTime > olderThan); - Assert.assertEquals(olderThan, TableExpireService.getExpireTime(testKeyedTable.changeTable(), olderThan)); + Assert.assertEquals(checkpointTime, + TableExpireService.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable())); + + AppendFiles appendFiles2 = testKeyedTable.changeTable().newAppend(); + appendFiles2.set("flink.max-committed-checkpoint-id", "101"); + appendFiles2.commit(); + long checkpointTime2 = testKeyedTable.changeTable().currentSnapshot().timestampMillis(); + Assert.assertEquals(checkpointTime2, + TableExpireService.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable())); insertChangeDataFiles(2); - olderThan = testKeyedTable.changeTable().currentSnapshot().timestampMillis() + 100; - Assert.assertTrue(checkpointTime < olderThan); - Assert.assertEquals(checkpointTime, TableExpireService.getExpireTime(testKeyedTable.changeTable(), olderThan)); + Assert.assertEquals(checkpointTime2, + TableExpireService.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable())); + + testKeyedTable.updateProperties().set(TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES, "0").commit(); + TableExpireService.expireArcticTable(testKeyedTable); + + Assert.assertEquals(2, Iterables.size(testKeyedTable.changeTable().snapshots())); } private List insertChangeDataFiles(long transactionId) throws IOException { From 1c6652e2e34c8bfa08cad153293b4b82ced433f5 Mon Sep 17 00:00:00 2001 From: wangtao Date: Thu, 2 Mar 2023 15:05:41 +0800 Subject: [PATCH 3/4] remove the use of traceId change to hiveLocations remove baseTable == null, changeTable == null change to baseExcludePaths add some comment for fetchLatestFlinkCommittedSnapshotTime import a static variable 
for flink.max-committed-checkpoint-id --- .../service/impl/SupportHiveSyncService.java | 25 +++++----- .../service/impl/TableExpireService.java | 47 ++++++++++--------- .../server/optimize/TestExpiredFileClean.java | 4 +- .../service/TestSupportHiveSyncService.java | 12 ++--- 4 files changed, 43 insertions(+), 45 deletions(-) diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/SupportHiveSyncService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/SupportHiveSyncService.java index a846168fa0..c7c4c66579 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/SupportHiveSyncService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/SupportHiveSyncService.java @@ -54,7 +54,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.UUID; import java.util.stream.Collectors; public class SupportHiveSyncService implements ISupportHiveSyncService { @@ -89,33 +88,32 @@ public static class SupportHiveSyncTask implements ScheduledTasks.Task { @Override public void run() { long startTime = System.currentTimeMillis(); - final String traceId = UUID.randomUUID().toString(); try { - LOG.info("[{}] {} start hive sync", traceId, tableIdentifier); + LOG.info("{} start hive sync", tableIdentifier); ArcticCatalog catalog = CatalogLoader.load(ServiceContainer.getTableMetastoreHandler(), tableIdentifier.getCatalog()); ArcticTable arcticTable = catalog.loadTable(tableIdentifier); if (!TableTypeUtil.isHive(arcticTable)) { - LOG.debug("[{}] {} is not a support hive table", traceId, tableIdentifier); + LOG.debug("{} is not a support hive table", tableIdentifier); return; } - syncIcebergToHive(arcticTable, traceId); + syncIcebergToHive(arcticTable); } catch (Exception e) { - LOG.error("[{}] {} hive sync failed", traceId, tableIdentifier, e); + LOG.error("{} hive sync failed", tableIdentifier, e); } finally { - LOG.info("[{}] {} hive sync finished, cost {}ms", traceId, tableIdentifier, + LOG.info("{} hive sync finished, cost {}ms", tableIdentifier, System.currentTimeMillis() - startTime); } } - public static void syncIcebergToHive(ArcticTable arcticTable, String traceId) throws Exception { + public static void syncIcebergToHive(ArcticTable arcticTable) throws Exception { UnkeyedTable baseTable = arcticTable.isKeyedTable() ? arcticTable.asKeyedTable().baseTable() : arcticTable.asUnkeyedTable(); StructLikeMap> partitionProperty = baseTable.partitionProperty(); if (arcticTable.spec().isUnpartitioned()) { - syncNoPartitionTable(arcticTable, partitionProperty, traceId); + syncNoPartitionTable(arcticTable, partitionProperty); } else { syncPartitionTable(arcticTable, partitionProperty); } @@ -126,11 +124,10 @@ public static void syncIcebergToHive(ArcticTable arcticTable, String traceId) th * because only arctic update hive table location for unPartitioned table. 
*/ private static void syncNoPartitionTable(ArcticTable arcticTable, - StructLikeMap> partitionProperty, - String traceId) { + StructLikeMap> partitionProperty) { Map property = partitionProperty.get(TablePropertyUtil.EMPTY_STRUCT); if (property == null || property.get(HiveTableProperties.PARTITION_PROPERTIES_KEY_HIVE_LOCATION) == null) { - LOG.debug("[{}] {} has no hive location in partition property", traceId, arcticTable.id()); + LOG.debug("{} has no hive location in partition property", arcticTable.id()); return; } @@ -142,7 +139,7 @@ private static void syncNoPartitionTable(ArcticTable arcticTable, return hiveTable.getSd().getLocation(); }); } catch (Exception e) { - LOG.error("[{}] {} get hive location failed", traceId, arcticTable.id(), e); + LOG.error("{} get hive location failed", arcticTable.id(), e); return; } @@ -155,7 +152,7 @@ private static void syncNoPartitionTable(ArcticTable arcticTable, return null; }); } catch (Exception e) { - LOG.error("[{}] {} alter hive location failed", traceId, arcticTable.id(), e); + LOG.error("{} alter hive location failed", arcticTable.id(), e); } } } diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java index cced41fe3f..8a287ab870 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java @@ -57,13 +57,16 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; public class TableExpireService implements ITableExpireService { private static final Logger LOG = LoggerFactory.getLogger(TableExpireService.class); private static final long EXPIRE_INTERVAL = 3600_000; // 1 hour + /** + * the same with org.apache.iceberg.flink.sink.IcebergFilesCommitter#MAX_COMMITTED_CHECKPOINT_ID + */ + public static final String FLINK_MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id"; private ScheduledTasks cleanTasks; @@ -110,9 +113,8 @@ public void run() { public static void expireArcticTable(ArcticTable arcticTable) { TableIdentifier tableIdentifier = arcticTable.id(); - final String traceId = UUID.randomUUID().toString(); long startTime = System.currentTimeMillis(); - LOG.info("[{}] {} start expire", traceId, tableIdentifier); + LOG.info("{} start expire", tableIdentifier); long changeDataTTL = Long.parseLong(arcticTable.properties() .getOrDefault(TableProperties.CHANGE_DATA_TTL, @@ -124,35 +126,27 @@ public static void expireArcticTable(ArcticTable arcticTable) { .getOrDefault(TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES, TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) * 60 * 1000; - Set hiveLocation = new HashSet<>(); + Set hiveLocations = new HashSet<>(); if (TableTypeUtil.isHive(arcticTable)) { - hiveLocation = HiveLocationUtils.getHiveLocation(arcticTable); + hiveLocations = HiveLocationUtils.getHiveLocation(arcticTable); } if (arcticTable.isKeyedTable()) { KeyedTable keyedArcticTable = arcticTable.asKeyedTable(); - Set finalHiveLocation = hiveLocation; + Set finalHiveLocations = hiveLocations; keyedArcticTable.io().doAs(() -> { UnkeyedTable baseTable = keyedArcticTable.baseTable(); - if (baseTable == null) { - LOG.warn("[{}] Base table is null: {} ", traceId, tableIdentifier); - return null; - } UnkeyedTable 
changeTable = keyedArcticTable.changeTable(); - if (changeTable == null) { - LOG.warn("[{}] Change table is null: {}", traceId, tableIdentifier); - return null; - } // get valid files in the change store which shouldn't physically delete when expire the snapshot // in the base store - Set baseExclude = UnKeyedTableUtil.getAllContentFilePath(changeTable); - baseExclude.addAll(finalHiveLocation); + Set baseExcludePaths = UnKeyedTableUtil.getAllContentFilePath(changeTable); + baseExcludePaths.addAll(finalHiveLocations); long latestBaseFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(baseTable); expireSnapshots(baseTable, Math.min(latestBaseFlinkCommitTime, startTime - baseSnapshotsKeepTime), - baseExclude); + baseExcludePaths); long baseCleanedTime = System.currentTimeMillis(); - LOG.info("[{}] {} base expire cost {} ms", traceId, arcticTable.id(), baseCleanedTime - startTime); + LOG.info("{} base expire cost {} ms", arcticTable.id(), baseCleanedTime - startTime); // delete ttl files List changeDataFiles = ServiceContainer.getFileInfoCacheService() @@ -163,28 +157,35 @@ public static void expireArcticTable(ArcticTable arcticTable) { // get valid files in the base store which shouldn't physically delete when expire the snapshot // in the change store Set changeExclude = UnKeyedTableUtil.getAllContentFilePath(baseTable); - changeExclude.addAll(finalHiveLocation); + changeExclude.addAll(finalHiveLocations); long latestChangeFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(changeTable); expireSnapshots(changeTable, Math.min(latestChangeFlinkCommitTime, startTime - changeSnapshotsKeepTime), changeExclude); return null; }); - LOG.info("[{}] {} expire cost total {} ms", traceId, arcticTable.id(), + LOG.info("{} expire cost total {} ms", arcticTable.id(), System.currentTimeMillis() - startTime); } else { UnkeyedTable unKeyedArcticTable = arcticTable.asUnkeyedTable(); long latestFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(unKeyedArcticTable); expireSnapshots(unKeyedArcticTable, Math.min(latestFlinkCommitTime, startTime - baseSnapshotsKeepTime), - hiveLocation); + hiveLocations); long baseCleanedTime = System.currentTimeMillis(); - LOG.info("[{}] {} unKeyedTable expire cost {} ms", traceId, arcticTable.id(), baseCleanedTime - startTime); + LOG.info("{} unKeyedTable expire cost {} ms", arcticTable.id(), baseCleanedTime - startTime); } } + /** + * When committing a snapshot, Flink will write a checkpoint id into the snapshot summary. + * The latest snapshot with checkpoint id should not be expired or the flink job can't recover from state. 
+ * + * @param table - + * @return commit time of snapshot with the latest flink checkpointId in summary + */ public static long fetchLatestFlinkCommittedSnapshotTime(UnkeyedTable table) { long latestCommitTime = Long.MAX_VALUE; for (Snapshot snapshot : table.snapshots()) { - if (snapshot.summary().containsKey("flink.max-committed-checkpoint-id")) { + if (snapshot.summary().containsKey(FLINK_MAX_COMMITTED_CHECKPOINT_ID)) { latestCommitTime = snapshot.timestampMillis(); } } diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java index 083291ec25..5066860f46 100644 --- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java +++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java @@ -132,14 +132,14 @@ public void testNotExpireFlinkLatestCommit() throws IOException { TableExpireService.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable())); AppendFiles appendFiles = testKeyedTable.changeTable().newAppend(); - appendFiles.set("flink.max-committed-checkpoint-id", "100"); + appendFiles.set(TableExpireService.FLINK_MAX_COMMITTED_CHECKPOINT_ID, "100"); appendFiles.commit(); long checkpointTime = testKeyedTable.changeTable().currentSnapshot().timestampMillis(); Assert.assertEquals(checkpointTime, TableExpireService.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable())); AppendFiles appendFiles2 = testKeyedTable.changeTable().newAppend(); - appendFiles2.set("flink.max-committed-checkpoint-id", "101"); + appendFiles2.set(TableExpireService.FLINK_MAX_COMMITTED_CHECKPOINT_ID, "101"); appendFiles2.commit(); long checkpointTime2 = testKeyedTable.changeTable().currentSnapshot().timestampMillis(); Assert.assertEquals(checkpointTime2, diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/service/TestSupportHiveSyncService.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/service/TestSupportHiveSyncService.java index 3112ccb8af..f7ad15f580 100644 --- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/service/TestSupportHiveSyncService.java +++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/service/TestSupportHiveSyncService.java @@ -65,7 +65,7 @@ public void testUnPartitionTableSyncInIceberg() throws Exception { }); Assert.assertNotEquals(newLocation, hiveLocation); - SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testUnPartitionKeyedHiveTable, "UnitTest"); + SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testUnPartitionKeyedHiveTable); hiveLocation = ((SupportHive) testUnPartitionKeyedHiveTable).getHMSClient().run(client -> { Table hiveTable = client.getTable(testUnPartitionKeyedHiveTable.id().getDatabase(), testUnPartitionKeyedHiveTable.id().getTableName()); @@ -85,7 +85,7 @@ public void testUnPartitionTableSyncNotInIceberg() throws Exception { return hiveTable.getSd().getLocation(); }); - SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testUnPartitionKeyedHiveTable, "UnitTest"); + SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testUnPartitionKeyedHiveTable); String newHiveLocation = ((SupportHive) testUnPartitionKeyedHiveTable).getHMSClient().run(client -> { Table hiveTable = client.getTable(testUnPartitionKeyedHiveTable.id().getDatabase(), testUnPartitionKeyedHiveTable.id().getTableName()); @@ -110,7 +110,7 @@ public void 
testSyncOnlyInIceberg() throws Exception { client.getPartition(testKeyedHiveTable.id().getDatabase(), testKeyedHiveTable.id().getTableName(), partitionValues))); - SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable, "UnitTest"); + SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable); Partition hivePartition = ((SupportHive) testKeyedHiveTable).getHMSClient().run(client -> client.getPartition(testKeyedHiveTable.id().getDatabase(), testKeyedHiveTable.id().getTableName(), partitionValues)); @@ -160,7 +160,7 @@ public void testSyncOnlyInHiveCreateByArctic() throws Exception { testKeyedHiveTable.id().getTableName(), partitionValues)); Assert.assertEquals(partitionLocation, hivePartition.getSd().getLocation()); - SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable, "UnitTest"); + SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable); Assert.assertThrows(NoSuchObjectException.class, () -> ((SupportHive) testKeyedHiveTable).getHMSClient().run(client -> client.getPartition(testKeyedHiveTable.id().getDatabase(), @@ -209,7 +209,7 @@ public void testSyncOnlyInHiveCreateNotByArctic() throws Exception { testKeyedHiveTable.id().getTableName(), partitionValues)); Assert.assertEquals(partitionLocation, hivePartition.getSd().getLocation()); - SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable, "UnitTest"); + SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable); hivePartition = ((SupportHive) testKeyedHiveTable).getHMSClient().run(client -> client.getPartition(testKeyedHiveTable.id().getDatabase(), @@ -266,7 +266,7 @@ public void testSyncInBoth() throws Exception { .commit(); Assert.assertNotEquals(newPartitionLocation, hivePartition.getSd().getLocation()); - SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable, "UnitTest"); + SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable); hivePartition = ((SupportHive) testKeyedHiveTable).getHMSClient().run(client -> client.getPartition(testKeyedHiveTable.id().getDatabase(), From d971b80e71a6503664d19b019b5c570a0022073a Mon Sep 17 00:00:00 2001 From: wangtao Date: Sat, 4 Mar 2023 16:57:42 +0800 Subject: [PATCH 4/4] add some comment for optimizer --- .../arctic/optimizer/operator/executor/Executor.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/Executor.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/Executor.java index bd96ac80c8..c400484f8e 100644 --- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/Executor.java +++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/Executor.java @@ -19,6 +19,12 @@ package com.netease.arctic.optimizer.operator.executor; public interface Executor { + /** + * Execute and return the execute result. + * + * @return OptimizeTaskResult - only the Prepared task. + * @throws Exception - exception + */ OptimizeTaskResult execute() throws Exception; void close();
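
For reference, the core behavior these patches introduce (never expire the most recent snapshot carrying flink.max-committed-checkpoint-id) boils down to the sketch below. This is illustrative code, not taken from the Arctic repository: SimpleSnapshot is a hypothetical stand-in for org.apache.iceberg.Snapshot, and the snapshot list is assumed to be ordered oldest to newest, matching the iteration order of table.snapshots().

import java.util.List;
import java.util.Map;

public class FlinkCommitAwareExpiry {

  // Same summary key written by Flink's Iceberg committer
  // (org.apache.iceberg.flink.sink.IcebergFilesCommitter), as noted in the patch.
  public static final String FLINK_MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";

  // Hypothetical stand-in for org.apache.iceberg.Snapshot: a commit time plus its summary map.
  public static class SimpleSnapshot {
    final long timestampMillis;
    final Map<String, String> summary;

    public SimpleSnapshot(long timestampMillis, Map<String, String> summary) {
      this.timestampMillis = timestampMillis;
      this.summary = summary;
    }
  }

  // Commit time of the latest snapshot that carries a Flink checkpoint id, or
  // Long.MAX_VALUE when no such snapshot exists. Later matches overwrite earlier ones,
  // so the last Flink-committed snapshot in chronological order wins.
  public static long fetchLatestFlinkCommittedSnapshotTime(List<SimpleSnapshot> snapshots) {
    long latestCommitTime = Long.MAX_VALUE;
    for (SimpleSnapshot snapshot : snapshots) {
      if (snapshot.summary.containsKey(FLINK_MAX_COMMITTED_CHECKPOINT_ID)) {
        latestCommitTime = snapshot.timestampMillis;
      }
    }
    return latestCommitTime;
  }

  // Caps the TTL-based bound before snapshot expiration: only snapshots strictly older
  // than the returned timestamp are eligible for removal.
  public static long capExpireBound(List<SimpleSnapshot> snapshots, long startTime, long keepTimeMillis) {
    return Math.min(fetchLatestFlinkCommittedSnapshotTime(snapshots), startTime - keepTimeMillis);
  }
}

Because the sentinel is Long.MAX_VALUE, Math.min falls back to the plain TTL bound (startTime - keepTimeMillis) when no Flink-committed snapshot exists; when one does exist, the bound can only move earlier, so the snapshot a restarted Flink job needs to recover its last committed checkpoint is kept.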