From 8fd72547e8ab93a29ad74195ddc0658789c8413c Mon Sep 17 00:00:00 2001 From: wangtaohz <103108928+wangtaohz@users.noreply.github.com> Date: Wed, 6 Dec 2023 13:30:30 +0800 Subject: [PATCH] [AMORO-1720] Fix Mixed Format KeyedTable expiring all the snapshots with optimized sequence (#2394) * should not expire the latest snapshot contains optimized sequence * add visible for testing * add fetchLatestNonOptimizedSnapshotTime for base store * get hive locations return the uri path * refactor codes and fix comments * improve for exclude files is empty for expring snapshots --------- Co-authored-by: ZhouJinsong --- .../maintainer/IcebergTableMaintainer.java | 111 +++++++++--------- .../maintainer/MixedTableMaintainer.java | 86 ++++++++++---- .../arctic/server/utils/IcebergTableUtil.java | 6 +- .../maintainer/TestSnapshotExpire.java | 45 ++++++- 4 files changed, 167 insertions(+), 81 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java index 78b0795a9f..2ef29357db 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java @@ -20,11 +20,6 @@ import static org.apache.iceberg.relocated.com.google.common.primitives.Longs.min; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Strings; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; import com.netease.arctic.ams.api.CommitMetaProducer; import com.netease.arctic.io.ArcticFileIO; import com.netease.arctic.io.PathInfo; @@ -34,8 +29,6 @@ import com.netease.arctic.server.table.TableConfiguration; import com.netease.arctic.server.table.TableRuntime; import 
com.netease.arctic.server.utils.IcebergTableUtil; -import com.netease.arctic.table.TableProperties; -import com.netease.arctic.utils.CompatiblePropertyUtil; import com.netease.arctic.utils.TableFileUtil; import org.apache.hadoop.fs.Path; import org.apache.iceberg.ContentFile; @@ -61,6 +54,11 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileInfo; import org.apache.iceberg.io.SupportsPrefixOperations; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Optional; +import org.apache.iceberg.relocated.com.google.common.base.Strings; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Type; @@ -152,21 +150,23 @@ public void cleanOrphanFiles(TableRuntime tableRuntime) { @Override public void expireSnapshots(TableRuntime tableRuntime) { - TableConfiguration tableConfiguration = tableRuntime.getTableConfiguration(); - if (!tableConfiguration.isExpireSnapshotEnabled()) { + if (!expireSnapshotEnabled(tableRuntime)) { return; } - expireSnapshots( - olderThanSnapshotNeedToExpire(tableRuntime), expireSnapshotNeedToExcludeFiles()); + expireSnapshots(mustOlderThan(tableRuntime, System.currentTimeMillis())); } - public void expireSnapshots(long mustOlderThan) { - expireSnapshots( - olderThanSnapshotNeedToExpire(mustOlderThan), expireSnapshotNeedToExcludeFiles()); + protected boolean expireSnapshotEnabled(TableRuntime tableRuntime) { + TableConfiguration tableConfiguration = tableRuntime.getTableConfiguration(); + return tableConfiguration.isExpireSnapshotEnabled(); } @VisibleForTesting - public void expireSnapshots(long olderThan, Set exclude) { + void expireSnapshots(long mustOlderThan) { + expireSnapshots(mustOlderThan, 
expireSnapshotNeedToExcludeFiles()); + } + + private void expireSnapshots(long olderThan, Set exclude) { LOG.debug("start expire snapshots older than {}, the exclude is {}", olderThan, exclude); final AtomicInteger toDeleteFiles = new AtomicInteger(0); final AtomicInteger deleteFiles = new AtomicInteger(0); @@ -178,10 +178,14 @@ public void expireSnapshots(long olderThan, Set exclude) { .deleteWith( file -> { try { - String filePath = TableFileUtil.getUriPath(file); - if (!exclude.contains(filePath) - && !exclude.contains(new Path(filePath).getParent().toString())) { + if (exclude.isEmpty()) { arcticFileIO().deleteFile(file); + } else { + String fileUriPath = TableFileUtil.getUriPath(file); + if (!exclude.contains(fileUriPath) + && !exclude.contains(new Path(fileUriPath).getParent().toString())) { + arcticFileIO().deleteFile(file); + } } parentDirectory.add(new Path(file).getParent().toString()); deleteFiles.incrementAndGet(); @@ -278,26 +282,20 @@ protected void cleanDanglingDeleteFiles() { LOG.info("{} total delete {} dangling delete files", table.name(), danglingDeleteFilesCnt); } - protected long olderThanSnapshotNeedToExpire(TableRuntime tableRuntime) { - long optimizingSnapshotTime = fetchOptimizingSnapshotTime(table, tableRuntime); - return olderThanSnapshotNeedToExpire(optimizingSnapshotTime); + protected long mustOlderThan(TableRuntime tableRuntime, long now) { + return min( + // The snapshots keep time + now - snapshotsKeepTime(tableRuntime), + // The snapshot optimizing plan based should not be expired for committing + fetchOptimizingPlanSnapshotTime(table, tableRuntime), + // The latest non-optimized snapshot should not be expired for data expiring + fetchLatestNonOptimizedSnapshotTime(table), + // The latest flink committed snapshot should not be expired for recovering flink job + fetchLatestFlinkCommittedSnapshotTime(table)); } - protected long olderThanSnapshotNeedToExpire(long mustOlderThan) { - long baseSnapshotsKeepTime = - 
CompatiblePropertyUtil.propertyAsLong( - table.properties(), - TableProperties.BASE_SNAPSHOT_KEEP_MINUTES, - TableProperties.BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT) - * 60 - * 1000; - // Latest checkpoint of flink need retain. If Flink does not continuously commit new snapshots, - // it can lead to issues with table partitions not expiring. - long latestFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(table); - // Retain the latest non-optimized snapshot for remember the real latest update - long latestNonOptimizedTime = fetchLatestNonOptimizedSnapshotTime(table); - long olderThan = System.currentTimeMillis() - baseSnapshotsKeepTime; - return min(latestFlinkCommitTime, latestNonOptimizedTime, mustOlderThan, olderThan); + protected long snapshotsKeepTime(TableRuntime tableRuntime) { + return tableRuntime.getTableConfiguration().getSnapshotTTLMinutes() * 60 * 1000; } protected Set expireSnapshotNeedToExcludeFiles() { @@ -382,31 +380,28 @@ private int clearInternalTableDanglingDeleteFiles() { } /** - * When committing a snapshot, Flink will write a checkpoint id into the snapshot summary. The - * latest snapshot with checkpoint id should not be expired or the flink job can't recover from - * state. + * When committing a snapshot, Flink will write a checkpoint id into the snapshot summary, which + * will be used when Flink job recovers from the checkpoint. 
* - * @param table - - * @return commit time of snapshot with the latest flink checkpointId in summary + * @param table table + * @return commit time of snapshot with the latest flink checkpointId in summary, return + * Long.MAX_VALUE if not exist */ public static long fetchLatestFlinkCommittedSnapshotTime(Table table) { - long latestCommitTime = Long.MAX_VALUE; - for (Snapshot snapshot : table.snapshots()) { - if (snapshot.summary().containsKey(FLINK_MAX_COMMITTED_CHECKPOINT_ID)) { - latestCommitTime = snapshot.timestampMillis(); - } - } - return latestCommitTime; + Snapshot snapshot = findLatestSnapshotContainsKey(table, FLINK_MAX_COMMITTED_CHECKPOINT_ID); + return snapshot == null ? Long.MAX_VALUE : snapshot.timestampMillis(); } /** - * When optimizing tasks are not committed, the snapshot with which it planned should not be - * expired, since it will use the snapshot to check conflict when committing. + * When the current optimizing process not committed, get the time of snapshot for optimizing + * process planned based. This snapshot will be used when optimizing process committing. 
* - * @param table - table - * @return commit time of snapshot for optimizing + * @param table table + * @param tableRuntime table runtime + * @return time of snapshot for optimizing process planned based, return Long.MAX_VALUE if no + * optimizing process exists */ - public static long fetchOptimizingSnapshotTime(Table table, TableRuntime tableRuntime) { + public static long fetchOptimizingPlanSnapshotTime(Table table, TableRuntime tableRuntime) { if (tableRuntime.getOptimizingStatus().isProcessing()) { long fromSnapshotId = tableRuntime.getOptimizingProcess().getTargetSnapshotId(); @@ -419,6 +414,16 @@ public static long fetchOptimizingSnapshotTime(Table table, TableRuntime tableRu return Long.MAX_VALUE; } + public static Snapshot findLatestSnapshotContainsKey(Table table, String summaryKey) { + Snapshot latestSnapshot = null; + for (Snapshot snapshot : table.snapshots()) { + if (snapshot.summary().containsKey(summaryKey)) { + latestSnapshot = snapshot; + } + } + return latestSnapshot; + } + /** * When expiring historic data and `data-expire.since` is `CURRENT_SNAPSHOT`, the latest snapshot * should not be produced by Amoro. 
diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java index a57807a0c9..31c47e8377 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java @@ -18,8 +18,9 @@ package com.netease.arctic.server.optimizing.maintainer; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Maps; +import static com.netease.arctic.utils.ArcticTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST; +import static org.apache.iceberg.relocated.com.google.common.primitives.Longs.min; + import com.netease.arctic.IcebergFileEntry; import com.netease.arctic.data.FileNameRules; import com.netease.arctic.hive.utils.TableTypeUtil; @@ -32,10 +33,8 @@ import com.netease.arctic.table.BaseTable; import com.netease.arctic.table.ChangeTable; import com.netease.arctic.table.KeyedTable; -import com.netease.arctic.table.TableProperties; import com.netease.arctic.table.UnkeyedTable; import com.netease.arctic.utils.ArcticTableUtil; -import com.netease.arctic.utils.CompatiblePropertyUtil; import com.netease.arctic.utils.TablePropertyUtil; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; @@ -46,9 +45,12 @@ import org.apache.iceberg.FileContent; import org.apache.iceberg.Snapshot; import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import 
org.apache.iceberg.relocated.com.google.common.primitives.Longs; import org.apache.iceberg.types.Types; @@ -134,6 +136,7 @@ public void expireSnapshots(TableRuntime tableRuntime) { baseMaintainer.expireSnapshots(tableRuntime); } + @VisibleForTesting protected void expireSnapshots(long mustOlderThan) { if (changeMaintainer != null) { changeMaintainer.expireSnapshots(mustOlderThan); @@ -315,21 +318,29 @@ public Set orphanFileCleanNeedToExcludeFiles() { } @Override - public void expireSnapshots(TableRuntime tableRuntime) { - expireSnapshots(Long.MAX_VALUE); + @VisibleForTesting + void expireSnapshots(long mustOlderThan) { + expireFiles(mustOlderThan); + super.expireSnapshots(mustOlderThan); } @Override - public void expireSnapshots(long mustOlderThan) { - long changeTTLPoint = getChangeTTLPoint(); - expireFiles(Longs.min(getChangeTTLPoint(), mustOlderThan)); - super.expireSnapshots(Longs.min(changeTTLPoint, mustOlderThan)); + public void expireSnapshots(TableRuntime tableRuntime) { + if (!expireSnapshotEnabled(tableRuntime)) { + return; + } + long now = System.currentTimeMillis(); + expireFiles(now - snapshotsKeepTime(tableRuntime)); + expireSnapshots(mustOlderThan(tableRuntime, now)); } @Override - protected long olderThanSnapshotNeedToExpire(long mustOlderThan) { - long latestChangeFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(unkeyedTable); - return Longs.min(latestChangeFlinkCommitTime, mustOlderThan); + protected long mustOlderThan(TableRuntime tableRuntime, long now) { + return min( + // The change data ttl time + now - snapshotsKeepTime(tableRuntime), + // The latest flink committed snapshot should not be expired for recovering flink job + fetchLatestFlinkCommittedSnapshotTime(table)); } @Override @@ -337,21 +348,16 @@ protected Set expireSnapshotNeedToExcludeFiles() { return Sets.union(baseFiles, hiveFiles); } + @Override + protected long snapshotsKeepTime(TableRuntime tableRuntime) { + return 
tableRuntime.getTableConfiguration().getChangeDataTTLMinutes() * 60 * 1000; + } + public void expireFiles(long ttlPoint) { List expiredDataFileEntries = getExpiredDataFileEntries(ttlPoint); deleteChangeFile(expiredDataFileEntries); } - private long getChangeTTLPoint() { - return System.currentTimeMillis() - - CompatiblePropertyUtil.propertyAsLong( - unkeyedTable.properties(), - TableProperties.CHANGE_DATA_TTL, - TableProperties.CHANGE_DATA_TTL_DEFAULT) - * 60 - * 1000; - } - private List getExpiredDataFileEntries(long ttlPoint) { TableEntriesScan entriesScan = TableEntriesScan.builder(unkeyedTable).includeFileContent(FileContent.DATA).build(); @@ -473,6 +479,40 @@ public Set orphanFileCleanNeedToExcludeFiles() { return Sets.union(changeFiles, Sets.union(baseFiles, hiveFiles)); } + @Override + protected long mustOlderThan(TableRuntime tableRuntime, long now) { + return min( + // The snapshots keep time for base store + now - snapshotsKeepTime(tableRuntime), + // The snapshot optimizing plan based should not be expired for committing + fetchOptimizingPlanSnapshotTime(table, tableRuntime), + // The latest non-optimized snapshot should not be expired for data expiring + fetchLatestNonOptimizedSnapshotTime(table), + // The latest flink committed snapshot should not be expired for recovering flink job + fetchLatestFlinkCommittedSnapshotTime(table), + // The latest snapshot contains the optimized sequence should not be expired for MOR + fetchLatestOptimizedSequenceSnapshotTime(table)); + } + + /** + * When committing a snapshot to the base store of mixed format keyed table, it will store the + * optimized sequence to the snapshot, and will set a flag to the summary of this snapshot. + * + *

<p>The optimized sequence will affect the correctness of MOR. + * + * @param table table + * @return time of the latest snapshot with the optimized sequence flag in summary + */ + private long fetchLatestOptimizedSequenceSnapshotTime(Table table) { + if (arcticTable.isKeyedTable()) { + Snapshot snapshot = + findLatestSnapshotContainsKey(table, BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST); + return snapshot == null ? Long.MAX_VALUE : snapshot.timestampMillis(); + } else { + return Long.MAX_VALUE; + } + } + @Override protected Set expireSnapshotNeedToExcludeFiles() { return Sets.union(changeFiles, hiveFiles); diff --git a/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java b/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java index 969fc441ab..0a458c6a7c 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java +++ b/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java @@ -18,9 +18,6 @@ package com.netease.arctic.server.utils; -import com.google.common.base.Optional; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; import com.netease.arctic.IcebergFileEntry; import com.netease.arctic.scan.TableEntriesScan; import com.netease.arctic.server.ArcticServiceConstants; @@ -39,6 +36,9 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Optional; +import org.apache.iceberg.relocated.com.google.common.base.Predicate; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestSnapshotExpire.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestSnapshotExpire.java index 2f4f0fb792..007a2fe567 100644 ---
a/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestSnapshotExpire.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestSnapshotExpire.java @@ -19,6 +19,7 @@ package com.netease.arctic.server.optimizing.maintainer; import static com.netease.arctic.server.optimizing.maintainer.IcebergTableMaintainer.FLINK_MAX_COMMITTED_CHECKPOINT_ID; +import static com.netease.arctic.utils.ArcticTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST; import com.netease.arctic.BasicTableTestHelper; import com.netease.arctic.TableTestHelper; @@ -132,7 +133,7 @@ public void testExpireChangeTableFiles() { private void writeOptimizedSequence( KeyedTable testKeyedTable, StructLikeMap optimizedSequence) { BaseTable baseTable = testKeyedTable.baseTable(); - baseTable.newAppend().set(ArcticTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST, "true").commit(); + baseTable.newAppend().set(BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST, "true").commit(); Snapshot snapshot = baseTable.currentSnapshot(); StatisticsFile statisticsFile = StatisticsFileUtil.writerBuilder(baseTable) @@ -253,7 +254,47 @@ public void testNotExpireFlinkLatestCommit4All() { Assert.assertEquals(4, Iterables.size(table.snapshots())); - MixedTableMaintainer tableMaintainer = new MixedTableMaintainer(table); + MixedTableMaintainer tableMaintainer = new MixedTableMaintainer(getArcticTable()); + tableMaintainer.expireSnapshots(tableRuntime); + + Assert.assertEquals(2, Iterables.size(table.snapshots())); + List expectedSnapshots = new ArrayList<>(); + expectedSnapshots.add(checkpointTime2Snapshot); + expectedSnapshots.add(lastSnapshot); + Assert.assertTrue( + Iterators.elementsEqual(expectedSnapshots.iterator(), table.snapshots().iterator())); + } + + @Test + public void testNotExpireOptimizedSequenceCommit4All() { + Assume.assumeTrue(isKeyedTable()); + BaseTable table = getArcticTable().asKeyedTable().baseTable(); + writeAndCommitBaseStore(table); + + AppendFiles appendFiles = 
table.newAppend(); + appendFiles.set(BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST, "true"); + appendFiles.commit(); + + AppendFiles appendFiles2 = table.newAppend(); + appendFiles2.set(BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST, "true"); + appendFiles2.commit(); + Snapshot checkpointTime2Snapshot = table.currentSnapshot(); + + writeAndCommitBaseStore(table); + Snapshot lastSnapshot = table.currentSnapshot(); + + table.updateProperties().set(TableProperties.BASE_SNAPSHOT_KEEP_MINUTES, "0").commit(); + TableRuntime tableRuntime = Mockito.mock(TableRuntime.class); + Mockito.when(tableRuntime.getTableIdentifier()) + .thenReturn( + ServerTableIdentifier.of(AmsUtil.toTableIdentifier(table.id()), getTestFormat())); + Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE); + Mockito.when(tableRuntime.getTableConfiguration()) + .thenReturn(TableConfiguration.parseConfig(table.properties())); + + Assert.assertEquals(4, Iterables.size(table.snapshots())); + + MixedTableMaintainer tableMaintainer = new MixedTableMaintainer(getArcticTable()); tableMaintainer.expireSnapshots(tableRuntime); Assert.assertEquals(2, Iterables.size(table.snapshots()));