diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java index 7c8d6656b559..978735babc33 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java @@ -35,7 +35,7 @@ import org.apache.paimon.utils.FileDeletionThreadPool; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Pair; -import org.apache.paimon.utils.TagManager; +import org.apache.paimon.utils.SnapshotManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,11 +77,12 @@ public abstract class FileDeletionBase { protected boolean changelogDecoupled; - /** Used to record which tag is cached. */ - private long cachedTag = 0; + /** Used to record which snapshot is cached. */ + private long cachedSnapshotId = 0; - /** Used to cache data files used by current tag. */ - private final Map>> cachedTagDataFiles = new HashMap<>(); + /** Used to cache data files used by current snapshot. */ + private final Map>> cachedSnapshotDataFiles = + new HashMap<>(); public FileDeletionBase( FileIO fileIO, @@ -328,17 +329,17 @@ protected void cleanUnusedManifests( } public Predicate dataFileSkipper( - List taggedSnapshots, long expiringSnapshotId) throws Exception { - int index = TagManager.findPreviousTag(taggedSnapshots, expiringSnapshotId); - // refresh tag data files + List skippingSnapshots, long expiringSnapshotId) throws Exception { + int index = SnapshotManager.findPreviousSnapshot(skippingSnapshots, expiringSnapshotId); + // refresh snapshot data files if (index >= 0) { - Snapshot previousTag = taggedSnapshots.get(index); - if (previousTag.id() != cachedTag) { - cachedTag = previousTag.id(); - cachedTagDataFiles.clear(); - addMergedDataFiles(cachedTagDataFiles, previousTag); + Snapshot previousSnapshot = skippingSnapshots.get(index); + if (previousSnapshot.id() != cachedSnapshotId) { + cachedSnapshotId = previousSnapshot.id(); + cachedSnapshotDataFiles.clear(); + addMergedDataFiles(cachedSnapshotDataFiles, previousSnapshot); } - return entry -> containsDataFile(cachedTagDataFiles, entry); + return entry -> containsDataFile(cachedSnapshotDataFiles, entry); } return entry -> false; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java index 759088a06dfa..72f655bbaafc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java @@ -134,10 +134,11 @@ public int expireUntil(long earliestId, long endExclusiveId) { LOG.debug("Changelog expire range is [" + earliestId + ", " + endExclusiveId + ")"); } - List taggedSnapshots = tagManager.taggedSnapshots(); + List referencedSnapshots = tagManager.taggedSnapshots(); List skippingSnapshots = - TagManager.findOverlappedSnapshots(taggedSnapshots, earliestId, endExclusiveId); + SnapshotManager.findOverlappedSnapshots( + referencedSnapshots, earliestId, endExclusiveId); skippingSnapshots.add(snapshotManager.changelog(endExclusiveId)); Set manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots); for (long id = earliestId; id < endExclusiveId; id++) { @@ -147,7 +148,7 @@ public int expireUntil(long earliestId, long endExclusiveId) { Changelog changelog = snapshotManager.longLivedChangelog(id); Predicate skipper; try { - skipper = changelogDeletion.dataFileSkipper(taggedSnapshots, id); + skipper = changelogDeletion.dataFileSkipper(referencedSnapshots, id); } catch (Exception e) { LOG.info( String.format( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java index 1700472978ad..c49db64981d1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java @@ -153,7 +153,7 @@ public int expireUntil(long earliestId, long endExclusiveId) { "Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")"); } - List taggedSnapshots = tagManager.taggedSnapshots(); + List referencedSnapshots = tagManager.taggedSnapshots(); // delete merge tree files // deleted merge tree files in a snapshot are not used by the next snapshot, so the range of @@ -166,7 +166,7 @@ public int expireUntil(long earliestId, long endExclusiveId) { // expire merge tree files and collect changed buckets Predicate skipper; try { - skipper = snapshotDeletion.dataFileSkipper(taggedSnapshots, id); + skipper = snapshotDeletion.dataFileSkipper(referencedSnapshots, id); } catch (Exception e) { LOG.info( String.format( @@ -198,8 +198,8 @@ public int expireUntil(long earliestId, long endExclusiveId) { // delete manifests and indexFiles List skippingSnapshots = - TagManager.findOverlappedSnapshots( - taggedSnapshots, beginInclusiveId, endExclusiveId); + SnapshotManager.findOverlappedSnapshots( + referencedSnapshots, beginInclusiveId, endExclusiveId); skippingSnapshots.add(snapshotManager.snapshot(endExclusiveId)); Set skippingSet = snapshotDeletion.manifestSkippingSet(skippingSnapshots); for (long id = beginInclusiveId; id < endExclusiveId; id++) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index cf0b44b5b4eb..59b31a1af5f8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -668,6 +668,42 @@ private Long findByListFiles(BinaryOperator reducer, Path dir, String pref return listVersionedFiles(fileIO, dir, prefix).reduce(reducer).orElse(null); } + /** + * Find the overlapping snapshots between sortedSnapshots and range of [beginInclusive, + * endExclusive). + */ + public static List findOverlappedSnapshots( + List sortedSnapshots, long beginInclusive, long endExclusive) { + List overlappedSnapshots = new ArrayList<>(); + int right = findPreviousSnapshot(sortedSnapshots, endExclusive); + if (right >= 0) { + int left = Math.max(findPreviousOrEqualSnapshot(sortedSnapshots, beginInclusive), 0); + for (int i = left; i <= right; i++) { + overlappedSnapshots.add(sortedSnapshots.get(i)); + } + } + return overlappedSnapshots; + } + + public static int findPreviousSnapshot(List sortedSnapshots, long targetSnapshotId) { + for (int i = sortedSnapshots.size() - 1; i >= 0; i--) { + if (sortedSnapshots.get(i).id() < targetSnapshotId) { + return i; + } + } + return -1; + } + + private static int findPreviousOrEqualSnapshot( + List sortedSnapshots, long targetSnapshotId) { + for (int i = sortedSnapshots.size() - 1; i >= 0; i--) { + if (sortedSnapshots.get(i).id() <= targetSnapshotId) { + return i; + } + } + return -1; + } + public void deleteLatestHint() throws IOException { Path snapshotDir = snapshotDirectory(); Path hintFile = new Path(snapshotDir, LATEST); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 65c6c232dafd..259a5bdbcda0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -366,36 +366,4 @@ private int findIndex(Snapshot taggedSnapshot, List taggedSnapshots) { "Didn't find tag with snapshot id '%s'.This is unexpected.", taggedSnapshot.id())); } - - public static List findOverlappedSnapshots( - List taggedSnapshots, long beginInclusive, long endExclusive) { - List snapshots = new ArrayList<>(); - int right = findPreviousTag(taggedSnapshots, endExclusive); - if (right >= 0) { - int left = Math.max(findPreviousOrEqualTag(taggedSnapshots, beginInclusive), 0); - for (int i = left; i <= right; i++) { - snapshots.add(taggedSnapshots.get(i)); - } - } - return snapshots; - } - - public static int findPreviousTag(List taggedSnapshots, long targetSnapshotId) { - for (int i = taggedSnapshots.size() - 1; i >= 0; i--) { - if (taggedSnapshots.get(i).id() < targetSnapshotId) { - return i; - } - } - return -1; - } - - private static int findPreviousOrEqualTag( - List taggedSnapshots, long targetSnapshotId) { - for (int i = taggedSnapshots.size() - 1; i >= 0; i--) { - if (taggedSnapshots.get(i).id() <= targetSnapshotId) { - return i; - } - } - return -1; - } }