diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/SupportHiveSyncService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/SupportHiveSyncService.java
index a846168fa0..c7c4c66579 100644
--- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/SupportHiveSyncService.java
+++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/SupportHiveSyncService.java
@@ -54,7 +54,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.UUID;
 import java.util.stream.Collectors;
 
 public class SupportHiveSyncService implements ISupportHiveSyncService {
@@ -89,33 +88,32 @@ public static class SupportHiveSyncTask implements ScheduledTasks.Task {
     @Override
     public void run() {
       long startTime = System.currentTimeMillis();
-      final String traceId = UUID.randomUUID().toString();
       try {
-        LOG.info("[{}] {} start hive sync", traceId, tableIdentifier);
+        LOG.info("{} start hive sync", tableIdentifier);
         ArcticCatalog catalog =
             CatalogLoader.load(ServiceContainer.getTableMetastoreHandler(), tableIdentifier.getCatalog());
         ArcticTable arcticTable = catalog.loadTable(tableIdentifier);
         if (!TableTypeUtil.isHive(arcticTable)) {
-          LOG.debug("[{}] {} is not a support hive table", traceId, tableIdentifier);
+          LOG.debug("{} is not a support hive table", tableIdentifier);
           return;
         }
-        syncIcebergToHive(arcticTable, traceId);
+        syncIcebergToHive(arcticTable);
       } catch (Exception e) {
-        LOG.error("[{}] {} hive sync failed", traceId, tableIdentifier, e);
+        LOG.error("{} hive sync failed", tableIdentifier, e);
       } finally {
-        LOG.info("[{}] {} hive sync finished, cost {}ms", traceId, tableIdentifier,
+        LOG.info("{} hive sync finished, cost {}ms", tableIdentifier,
             System.currentTimeMillis() - startTime);
       }
     }
 
-    public static void syncIcebergToHive(ArcticTable arcticTable, String traceId) throws Exception {
+    public static void syncIcebergToHive(ArcticTable arcticTable) throws Exception {
       UnkeyedTable baseTable = arcticTable.isKeyedTable() ?
           arcticTable.asKeyedTable().baseTable() : arcticTable.asUnkeyedTable();
       StructLikeMap<Map<String, String>> partitionProperty = baseTable.partitionProperty();
 
       if (arcticTable.spec().isUnpartitioned()) {
-        syncNoPartitionTable(arcticTable, partitionProperty, traceId);
+        syncNoPartitionTable(arcticTable, partitionProperty);
       } else {
         syncPartitionTable(arcticTable, partitionProperty);
       }
@@ -126,11 +124,10 @@ public static void syncIcebergToHive(ArcticTable arcticTable, String traceId) th
      * because only arctic update hive table location for unPartitioned table.
      */
     private static void syncNoPartitionTable(ArcticTable arcticTable,
-                                             StructLikeMap<Map<String, String>> partitionProperty,
-                                             String traceId) {
+                                             StructLikeMap<Map<String, String>> partitionProperty) {
       Map<String, String> property = partitionProperty.get(TablePropertyUtil.EMPTY_STRUCT);
       if (property == null || property.get(HiveTableProperties.PARTITION_PROPERTIES_KEY_HIVE_LOCATION) == null) {
-        LOG.debug("[{}] {} has no hive location in partition property", traceId, arcticTable.id());
+        LOG.debug("{} has no hive location in partition property", arcticTable.id());
         return;
       }
@@ -142,7 +139,7 @@ private static void syncNoPartitionTable(ArcticTable arcticTable,
           return hiveTable.getSd().getLocation();
         });
       } catch (Exception e) {
-        LOG.error("[{}] {} get hive location failed", traceId, arcticTable.id(), e);
+        LOG.error("{} get hive location failed", arcticTable.id(), e);
         return;
       }
@@ -155,7 +152,7 @@ private static void syncNoPartitionTable(ArcticTable arcticTable,
           return null;
         });
       } catch (Exception e) {
-        LOG.error("[{}] {} alter hive location failed", traceId, arcticTable.id(), e);
+        LOG.error("{} alter hive location failed", arcticTable.id(), e);
       }
     }
   }
diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java
index 3fe659bb13..8a287ab870 100644
--- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java
+++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.util.StructLikeMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,13 +57,16 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 public class TableExpireService implements ITableExpireService {
   private static final Logger LOG = LoggerFactory.getLogger(TableExpireService.class);
   private static final long EXPIRE_INTERVAL = 3600_000; // 1 hour
+  /**
+   * The same as org.apache.iceberg.flink.sink.IcebergFilesCommitter#MAX_COMMITTED_CHECKPOINT_ID.
+   */
+  public static final String FLINK_MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
 
   private ScheduledTasks cleanTasks;
@@ -90,10 +94,7 @@ public static class TableExpireTask implements ScheduledTasks.Task {
     @Override
     public void run() {
-      long startTime = System.currentTimeMillis();
-      final String traceId = UUID.randomUUID().toString();
       try {
-        LOG.info("[{}] {} start expire", traceId, tableIdentifier);
         ArcticCatalog catalog =
             CatalogLoader.load(ServiceContainer.getTableMetastoreHandler(), tableIdentifier.getCatalog());
         ArcticTable arcticTable = catalog.loadTable(tableIdentifier);
@@ -103,69 +104,92 @@ public void run() {
         if (!needClean) {
           return;
         }
-        long changeDataTTL = Long.parseLong(arcticTable.properties()
-            .getOrDefault(TableProperties.CHANGE_DATA_TTL,
-                TableProperties.CHANGE_DATA_TTL_DEFAULT)) * 60 * 1000;
-        long baseSnapshotsKeepTime = Long.parseLong(arcticTable.properties()
-            .getOrDefault(TableProperties.BASE_SNAPSHOT_KEEP_MINUTES,
-                TableProperties.BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) * 60 * 1000;
-        long changeSnapshotsKeepTime = Long.parseLong(arcticTable.properties()
-            .getOrDefault(TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES,
-                TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) * 60 * 1000;
-
-        Set<String> hiveLocation = new HashSet<>();
-        if (TableTypeUtil.isHive(arcticTable)) {
-          hiveLocation = HiveLocationUtils.getHiveLocation(arcticTable);
-        }
+        expireArcticTable(arcticTable);
+      } catch (Throwable t) {
+        LOG.error("unexpected expire error of table {} ", tableIdentifier, t);
+      }
+    }
+  }
 
-        if (arcticTable.isKeyedTable()) {
-          KeyedTable keyedArcticTable = arcticTable.asKeyedTable();
-          Set<String> finalHiveLocation = hiveLocation;
-          keyedArcticTable.io().doAs(() -> {
-            UnkeyedTable baseTable = keyedArcticTable.baseTable();
-            if (baseTable == null) {
-              LOG.warn("[{}] Base table is null: {} ", traceId, tableIdentifier);
-              return null;
-            }
-            UnkeyedTable changeTable = keyedArcticTable.changeTable();
-            if (changeTable == null) {
-              LOG.warn("[{}] Change table is null: {}", traceId, tableIdentifier);
-              return null;
-            }
+  public static void expireArcticTable(ArcticTable arcticTable) {
+    TableIdentifier tableIdentifier = arcticTable.id();
+    long startTime = System.currentTimeMillis();
+    LOG.info("{} start expire", tableIdentifier);
 
-            // get valid files in the change store which shouldn't physically delete when expire the snapshot
-            // in the base store
-            Set<String> baseExclude = UnKeyedTableUtil.getAllContentFilePath(changeTable);
-            baseExclude.addAll(finalHiveLocation);
-            expireSnapshots(baseTable, startTime - baseSnapshotsKeepTime, baseExclude);
-            long baseCleanedTime = System.currentTimeMillis();
-            LOG.info("[{}] {} base expire cost {} ms", traceId, arcticTable.id(), baseCleanedTime - startTime);
-
-            // delete ttl files
-            List<DataFileInfo> changeDataFiles = ServiceContainer.getFileInfoCacheService()
-                .getChangeTableTTLDataFiles(keyedArcticTable.id().buildTableIdentifier(),
-                    System.currentTimeMillis() - changeDataTTL);
-            deleteChangeFile(keyedArcticTable, changeDataFiles);
-
-            // get valid files in the base store which shouldn't physically delete when expire the snapshot
-            // in the change store
-            Set<String> changeExclude = UnKeyedTableUtil.getAllContentFilePath(baseTable);
-            changeExclude.addAll(finalHiveLocation);
-            expireSnapshots(changeTable, startTime - changeSnapshotsKeepTime, changeExclude);
-            return null;
-          });
-          LOG.info("[{}] {} expire cost total {} ms", traceId, arcticTable.id(),
-              System.currentTimeMillis() - startTime);
-        } else {
-          UnkeyedTable unKeyedArcticTable = arcticTable.asUnkeyedTable();
-          expireSnapshots(unKeyedArcticTable, startTime - baseSnapshotsKeepTime, hiveLocation);
-          long baseCleanedTime = System.currentTimeMillis();
-          LOG.info("[{}] {} unKeyedTable expire cost {} ms", traceId, arcticTable.id(), baseCleanedTime - startTime);
-        }
-      } catch (Throwable t) {
-        LOG.error("[" + traceId + "] unexpected expire error of table " + tableIdentifier, t);
+    long changeDataTTL = Long.parseLong(arcticTable.properties()
+        .getOrDefault(TableProperties.CHANGE_DATA_TTL,
+            TableProperties.CHANGE_DATA_TTL_DEFAULT)) * 60 * 1000;
+    long baseSnapshotsKeepTime = Long.parseLong(arcticTable.properties()
+        .getOrDefault(TableProperties.BASE_SNAPSHOT_KEEP_MINUTES,
+            TableProperties.BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) * 60 * 1000;
+    long changeSnapshotsKeepTime = Long.parseLong(arcticTable.properties()
+        .getOrDefault(TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES,
+            TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) * 60 * 1000;
+
+    Set<String> hiveLocations = new HashSet<>();
+    if (TableTypeUtil.isHive(arcticTable)) {
+      hiveLocations = HiveLocationUtils.getHiveLocation(arcticTable);
+    }
+
+    if (arcticTable.isKeyedTable()) {
+      KeyedTable keyedArcticTable = arcticTable.asKeyedTable();
+      Set<String> finalHiveLocations = hiveLocations;
+      keyedArcticTable.io().doAs(() -> {
+        UnkeyedTable baseTable = keyedArcticTable.baseTable();
+        UnkeyedTable changeTable = keyedArcticTable.changeTable();
+
+        // get valid files in the change store which shouldn't physically delete when expire the snapshot
+        // in the base store
+        Set<String> baseExcludePaths = UnKeyedTableUtil.getAllContentFilePath(changeTable);
+        baseExcludePaths.addAll(finalHiveLocations);
+        long latestBaseFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(baseTable);
+        expireSnapshots(baseTable, Math.min(latestBaseFlinkCommitTime, startTime - baseSnapshotsKeepTime),
+            baseExcludePaths);
+        long baseCleanedTime = System.currentTimeMillis();
+        LOG.info("{} base expire cost {} ms", arcticTable.id(), baseCleanedTime - startTime);
+
+        // delete ttl files
+        List<DataFileInfo> changeDataFiles = ServiceContainer.getFileInfoCacheService()
+            .getChangeTableTTLDataFiles(keyedArcticTable.id().buildTableIdentifier(),
+                System.currentTimeMillis() - changeDataTTL);
+        deleteChangeFile(keyedArcticTable, changeDataFiles);
+
+        // get valid files in the base store which shouldn't physically delete when expire the snapshot
+        // in the change store
+        Set<String> changeExclude = UnKeyedTableUtil.getAllContentFilePath(baseTable);
+        changeExclude.addAll(finalHiveLocations);
+        long latestChangeFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(changeTable);
+        expireSnapshots(changeTable, Math.min(latestChangeFlinkCommitTime, startTime - changeSnapshotsKeepTime),
+            changeExclude);
+        return null;
+      });
+      LOG.info("{} expire cost total {} ms", arcticTable.id(),
+          System.currentTimeMillis() - startTime);
+    } else {
+      UnkeyedTable unKeyedArcticTable = arcticTable.asUnkeyedTable();
+      long latestFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(unKeyedArcticTable);
+      expireSnapshots(unKeyedArcticTable, Math.min(latestFlinkCommitTime, startTime - baseSnapshotsKeepTime),
+          hiveLocations);
+      long baseCleanedTime = System.currentTimeMillis();
+      LOG.info("{} unKeyedTable expire cost {} ms", arcticTable.id(), baseCleanedTime - startTime);
+    }
+  }
+
+  /**
+   * When Flink commits a snapshot, it writes a checkpoint id into the snapshot summary.
+   * The latest snapshot with a checkpoint id must not be expired, or the Flink job cannot recover from its state.
+   *
+   * @param table the table whose snapshots are checked for Flink checkpoint ids
+   * @return commit time of the latest snapshot with a Flink checkpoint id in its summary, or Long.MAX_VALUE if none
+   */
+  public static long fetchLatestFlinkCommittedSnapshotTime(UnkeyedTable table) {
+    long latestCommitTime = Long.MAX_VALUE;
+    for (Snapshot snapshot : table.snapshots()) {
+      if (snapshot.summary().containsKey(FLINK_MAX_COMMITTED_CHECKPOINT_ID)) {
+        latestCommitTime = snapshot.timestampMillis();
       }
     }
+    return latestCommitTime;
   }
 
   public static void deleteChangeFile(KeyedTable keyedTable, List<DataFileInfo> changeDataFiles) {
@@ -229,7 +253,7 @@ public static void deleteChangeFile(KeyedTable keyedTable, List ch
 
   public static void expireSnapshots(UnkeyedTable arcticInternalTable, long olderThan, Set<String> exclude) {
-    LOG.debug("start expire snapshots, the exclude is {}", exclude);
+    LOG.debug("start expire snapshots older than {}, the exclude is {}", olderThan, exclude);
     final AtomicInteger toDeleteFiles = new AtomicInteger(0);
     final AtomicInteger deleteFiles = new AtomicInteger(0);
     Set<String> parentDirectory = new HashSet<>();
diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java
index 90e1819eaa..5066860f46 100644
--- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java
+++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestExpiredFileClean.java
@@ -124,6 +124,37 @@ public void testExpiredChangeTableFilesInBase() throws Exception {
     Assert.assertFalse(testKeyedTable.io().exists((String) s1Files.get(1).path()));
   }
 
+  @Test
+  public void testNotExpireFlinkLatestCommit() throws IOException {
+    insertChangeDataFiles(1);
+    insertChangeDataFiles(2);
+    Assert.assertEquals(Long.MAX_VALUE,
+        TableExpireService.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable()));
+
+    AppendFiles appendFiles = testKeyedTable.changeTable().newAppend();
+    appendFiles.set(TableExpireService.FLINK_MAX_COMMITTED_CHECKPOINT_ID, "100");
+    appendFiles.commit();
+    long checkpointTime = testKeyedTable.changeTable().currentSnapshot().timestampMillis();
+    Assert.assertEquals(checkpointTime,
+        TableExpireService.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable()));
+
+    AppendFiles appendFiles2 = testKeyedTable.changeTable().newAppend();
+    appendFiles2.set(TableExpireService.FLINK_MAX_COMMITTED_CHECKPOINT_ID, "101");
+    appendFiles2.commit();
+    long checkpointTime2 = testKeyedTable.changeTable().currentSnapshot().timestampMillis();
+    Assert.assertEquals(checkpointTime2,
+        TableExpireService.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable()));
+
+    insertChangeDataFiles(2);
+    Assert.assertEquals(checkpointTime2,
+        TableExpireService.fetchLatestFlinkCommittedSnapshotTime(testKeyedTable.changeTable()));
+
+    testKeyedTable.updateProperties().set(TableProperties.CHANGE_SNAPSHOT_KEEP_MINUTES, "0").commit();
+    TableExpireService.expireArcticTable(testKeyedTable);
+
+    Assert.assertEquals(2, Iterables.size(testKeyedTable.changeTable().snapshots()));
+  }
+
   private List<DataFile> insertChangeDataFiles(long transactionId) throws IOException {
     GenericChangeTaskWriter writer = GenericTaskWriters.builderFor(testKeyedTable)
         .withChangeAction(ChangeAction.INSERT)
diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/service/TestSupportHiveSyncService.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/service/TestSupportHiveSyncService.java
index 3112ccb8af..f7ad15f580 100644
--- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/service/TestSupportHiveSyncService.java
+++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/service/TestSupportHiveSyncService.java
@@ -65,7 +65,7 @@ public void testUnPartitionTableSyncInIceberg() throws Exception {
     });
     Assert.assertNotEquals(newLocation, hiveLocation);
 
-    SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testUnPartitionKeyedHiveTable, "UnitTest");
+    SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testUnPartitionKeyedHiveTable);
     hiveLocation = ((SupportHive) testUnPartitionKeyedHiveTable).getHMSClient().run(client -> {
       Table hiveTable = client.getTable(testUnPartitionKeyedHiveTable.id().getDatabase(),
           testUnPartitionKeyedHiveTable.id().getTableName());
@@ -85,7 +85,7 @@ public void testUnPartitionTableSyncNotInIceberg() throws Exception {
       return hiveTable.getSd().getLocation();
     });
 
-    SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testUnPartitionKeyedHiveTable, "UnitTest");
+    SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testUnPartitionKeyedHiveTable);
     String newHiveLocation = ((SupportHive) testUnPartitionKeyedHiveTable).getHMSClient().run(client -> {
       Table hiveTable = client.getTable(testUnPartitionKeyedHiveTable.id().getDatabase(),
           testUnPartitionKeyedHiveTable.id().getTableName());
@@ -110,7 +110,7 @@ public void testSyncOnlyInIceberg() throws Exception {
             client.getPartition(testKeyedHiveTable.id().getDatabase(),
                 testKeyedHiveTable.id().getTableName(),
                 partitionValues)));
-    SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable, "UnitTest");
+    SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable);
     Partition hivePartition = ((SupportHive) testKeyedHiveTable).getHMSClient().run(client ->
         client.getPartition(testKeyedHiveTable.id().getDatabase(),
             testKeyedHiveTable.id().getTableName(), partitionValues));
@@ -160,7 +160,7 @@ public void testSyncOnlyInHiveCreateByArctic() throws Exception {
             testKeyedHiveTable.id().getTableName(),
             partitionValues));
     Assert.assertEquals(partitionLocation, hivePartition.getSd().getLocation());
-    SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable, "UnitTest");
+    SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable);
     Assert.assertThrows(NoSuchObjectException.class,
         () -> ((SupportHive) testKeyedHiveTable).getHMSClient().run(client ->
             client.getPartition(testKeyedHiveTable.id().getDatabase(),
@@ -209,7 +209,7 @@ public void testSyncOnlyInHiveCreateNotByArctic() throws Exception {
             testKeyedHiveTable.id().getTableName(),
             partitionValues));
     Assert.assertEquals(partitionLocation, hivePartition.getSd().getLocation());
-    SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable, "UnitTest");
+    SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable);
     hivePartition = ((SupportHive) testKeyedHiveTable).getHMSClient().run(client ->
         client.getPartition(testKeyedHiveTable.id().getDatabase(),
@@ -266,7 +266,7 @@ public void testSyncInBoth() throws Exception {
         .commit();
     Assert.assertNotEquals(newPartitionLocation, hivePartition.getSd().getLocation());
 
-    SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable, "UnitTest");
+    SupportHiveSyncService.SupportHiveSyncTask.syncIcebergToHive(testKeyedHiveTable);
     hivePartition = ((SupportHive) testKeyedHiveTable).getHMSClient().run(client ->
         client.getPartition(testKeyedHiveTable.id().getDatabase(),
diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/Executor.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/Executor.java
index bd96ac80c8..c400484f8e 100644
--- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/Executor.java
+++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/Executor.java
@@ -19,6 +19,12 @@
 package com.netease.arctic.optimizer.operator.executor;
 
 public interface Executor {
+  /**
+   * Execute the optimize task and return the execution result.
+   *
+   * @return OptimizeTaskResult - returned only for the Prepared task
+   * @throws Exception if the execution fails
+   */
   OptimizeTaskResult execute() throws Exception;
 
   void close();
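
Reviewer note (not part of the patch): a minimal standalone sketch of the expiration boundary this change introduces, assuming a plain Iceberg Table. It omits Arctic's exclude-path and change-store TTL handling, and the name expireWithFlinkGuard is hypothetical; the summary key and Math.min bound mirror the patch.

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

public class FlinkAwareExpireSketch {

  // Summary key written by Flink's IcebergFilesCommitter on each commit.
  private static final String FLINK_MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";

  public static void expireWithFlinkGuard(Table table, long keepTimeMs) {
    // Commit time of the latest snapshot carrying a Flink checkpoint id;
    // Long.MAX_VALUE means no Flink commit exists, so only the keep-time bound applies.
    long latestFlinkCommitTime = Long.MAX_VALUE;
    for (Snapshot snapshot : table.snapshots()) {
      if (snapshot.summary().containsKey(FLINK_MAX_COMMITTED_CHECKPOINT_ID)) {
        latestFlinkCommitTime = snapshot.timestampMillis();
      }
    }

    // Expire up to the earlier of the two bounds: the snapshot a restarted Flink job
    // needs for state recovery is never removed, even if it is older than keepTimeMs.
    long olderThan = Math.min(latestFlinkCommitTime, System.currentTimeMillis() - keepTimeMs);
    table.expireSnapshots().expireOlderThan(olderThan).commit();
  }
}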