From 6e8d1e32965915b8b44c2824aee648ade6bc3ea7 Mon Sep 17 00:00:00 2001 From: LinMingQiang Date: Fri, 26 Jul 2024 16:09:07 +0800 Subject: [PATCH 1/5] [core] Cleaning snapshots or deleting tags should skip those referenced by branches. --- .../org/apache/paimon/AbstractFileStore.java | 27 ++ .../java/org/apache/paimon/FileStore.java | 6 + .../paimon/operation/BranchDeletion.java | 95 +++++ .../paimon/operation/FileDeletionBase.java | 13 + .../apache/paimon/operation/TagDeletion.java | 17 - .../paimon/privilege/PrivilegedFileStore.java | 13 + .../paimon/table/AbstractFileStoreTable.java | 16 +- .../paimon/table/ExpireChangelogImpl.java | 11 +- .../paimon/table/ExpireSnapshotsImpl.java | 11 +- .../apache/paimon/tag/TagAutoCreation.java | 8 + .../org/apache/paimon/tag/TagAutoManager.java | 12 +- .../org/apache/paimon/tag/TagTimeExpire.java | 11 +- .../apache/paimon/utils/BranchManager.java | 136 ++++++- .../apache/paimon/utils/SnapshotManager.java | 46 +++ .../org/apache/paimon/utils/TagManager.java | 91 ++--- .../java/org/apache/paimon/TestFileStore.java | 9 +- .../paimon/operation/ExpireSnapshotsTest.java | 1 + .../paimon/operation/FileDeletionTest.java | 26 +- .../AutoTagForSavepointCommitterOperator.java | 10 +- .../sink/BatchWriteGeneratorTagOperator.java | 15 +- .../apache/paimon/flink/sink/FlinkSink.java | 1 + .../apache/paimon/flink/BranchSqlITCase.java | 332 ++++++++++++++++++ ...oTagForSavepointCommitterOperatorTest.java | 2 + 23 files changed, 822 insertions(+), 87 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/BranchDeletion.java diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 5120db295b69..ae1a7642ddce 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -29,6 +29,7 @@ import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.metastore.AddPartitionTagCallback; import org.apache.paimon.metastore.MetastoreClient; +import org.apache.paimon.operation.BranchDeletion; import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.operation.FileStoreCommitImpl; import org.apache.paimon.operation.PartitionExpire; @@ -48,6 +49,7 @@ import org.apache.paimon.table.sink.TagCallback; import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SnapshotManager; @@ -252,6 +254,30 @@ public TagManager newTagManager() { return new TagManager(fileIO, options.path()); } + @Override + public BranchManager newBranchManager() { + return new BranchManager( + fileIO, + options.path(), + snapshotManager(), + newTagManager(), + schemaManager, + newBranchDeletion()); + } + + @Override + public BranchDeletion newBranchDeletion() { + return new BranchDeletion( + fileIO, + pathFactory(), + manifestFileFactory().create(), + manifestListFactory().create(), + newIndexFileHandler(), + newStatsFileHandler(), + options.cleanEmptyDirectories(), + options.deleteFileThreadNum()); + } + @Override public TagDeletion newTagDeletion() { return new TagDeletion( @@ -297,6 +323,7 @@ public TagAutoManager newTagCreationManager() { options, snapshotManager(), newTagManager(), + newBranchManager(), newTagDeletion(), createTagCallbacks()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index 715062565683..59e8cec5b1ce 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -23,6 +23,7 @@ import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.operation.BranchDeletion; import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.operation.FileStoreScan; @@ -38,6 +39,7 @@ import org.apache.paimon.table.sink.TagCallback; import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SnapshotManager; @@ -90,8 +92,12 @@ public interface FileStore { TagManager newTagManager(); + BranchManager newBranchManager(); + TagDeletion newTagDeletion(); + BranchDeletion newBranchDeletion(); + @Nullable PartitionExpire newPartitionExpire(String commitUser); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BranchDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/BranchDeletion.java new file mode 100644 index 000000000000..90f8e8d72e38 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BranchDeletion.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestFile; +import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.stats.StatsFileHandler; +import org.apache.paimon.utils.FileStorePathFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.function.Predicate; + +/** Delete branch files. */ +public class BranchDeletion extends FileDeletionBase { + + private static final Logger LOG = LoggerFactory.getLogger(BranchDeletion.class); + + public BranchDeletion( + FileIO fileIO, + FileStorePathFactory pathFactory, + ManifestFile manifestFile, + ManifestList manifestList, + IndexFileHandler indexFileHandler, + StatsFileHandler statsFileHandler, + boolean cleanEmptyDirectories, + int deleteFileThreadNum) { + super( + fileIO, + pathFactory, + manifestFile, + manifestList, + indexFileHandler, + statsFileHandler, + cleanEmptyDirectories, + deleteFileThreadNum); + } + + @Override + public void cleanUnusedDataFiles(Snapshot snapshotToClean, Predicate skipper) { + Collection manifestEntries; + try { + manifestEntries = readMergedDataFiles(snapshotToClean); + } catch (IOException e) { + LOG.info("Skip data file clean for the snapshot {}.", snapshotToClean.id(), e); + return; + } + + Set dataFileToDelete = new HashSet<>(); + for (ManifestEntry entry : manifestEntries) { + if (!skipper.test(entry)) { + Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket()); + dataFileToDelete.add(new Path(bucketPath, entry.file().fileName())); + for (String file : entry.file().extraFiles()) { + dataFileToDelete.add(new Path(bucketPath, file)); + } + + recordDeletionBuckets(entry); + } + } + deleteFiles(dataFileToDelete, fileIO::deleteQuietly); + } + + @Override + public void cleanUnusedManifests(Snapshot snapshotToClean, Set skippingSet) { + // doesn't clean changelog files because they are handled by SnapshotDeletion + cleanUnusedManifests(snapshotToClean, skippingSet, true, false); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java index 978735babc33..e6d7b7acac70 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java @@ -344,6 +344,19 @@ public Predicate dataFileSkipper( return entry -> false; } + public Predicate dataFileSkipper(Snapshot skippingSnapshot) throws Exception { + return dataFileSkipper(Collections.singletonList(skippingSnapshot)); + } + + public Predicate dataFileSkipper(List skippingSnapshots) + throws Exception { + Map>> skipped = new HashMap<>(); + for (Snapshot snapshot : skippingSnapshots) { + addMergedDataFiles(skipped, snapshot); + } + return entry -> containsDataFile(skipped, entry); + } + /** * It is possible that a job was killed during expiration and some manifest files have been * deleted, so if the clean methods need to get manifests of a snapshot to be cleaned, we should diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java index a6cd338d5859..1788876b718a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java @@ -19,7 +19,6 @@ package org.apache.paimon.operation; import org.apache.paimon.Snapshot; -import org.apache.paimon.data.BinaryRow; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; @@ -34,11 +33,7 @@ import java.io.IOException; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; -import java.util.List; -import java.util.Map; import java.util.Set; import java.util.function.Predicate; @@ -97,16 +92,4 @@ public void cleanUnusedManifests(Snapshot taggedSnapshot, Set skippingSe // doesn't clean changelog files because they are handled by SnapshotDeletion cleanUnusedManifests(taggedSnapshot, skippingSet, true, false); } - - public Predicate dataFileSkipper(Snapshot fromSnapshot) throws Exception { - return dataFileSkipper(Collections.singletonList(fromSnapshot)); - } - - public Predicate dataFileSkipper(List fromSnapshots) throws Exception { - Map>> skipped = new HashMap<>(); - for (Snapshot snapshot : fromSnapshots) { - addMergedDataFiles(skipped, snapshot); - } - return entry -> containsDataFile(skipped, entry); - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java index 768e6259e2c1..612342ad3f99 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java @@ -26,6 +26,7 @@ import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.operation.BranchDeletion; import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.operation.FileStoreScan; @@ -41,6 +42,7 @@ import org.apache.paimon.table.sink.TagCallback; import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SnapshotManager; @@ -164,6 +166,17 @@ public TagManager newTagManager() { return wrapped.newTagManager(); } + @Override + public BranchManager newBranchManager() { + privilegeChecker.assertCanInsert(identifier); + return wrapped.newBranchManager(); + } + + @Override + public BranchDeletion newBranchDeletion() { + return wrapped.newBranchDeletion(); + } + @Override public TagDeletion newTagDeletion() { privilegeChecker.assertCanInsert(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index d30bd11efda6..639a0caf2764 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -351,13 +351,16 @@ public SnapshotManager snapshotManager() { @Override public ExpireSnapshots newExpireSnapshots() { return new ExpireSnapshotsImpl( - snapshotManager(), store().newSnapshotDeletion(), store().newTagManager()); + snapshotManager(), + store().newSnapshotDeletion(), + store().newTagManager(), + branchManager()); } @Override public ExpireSnapshots newExpireChangelog() { return new ExpireChangelogImpl( - snapshotManager(), tagManager(), store().newChangelogDeletion()); + snapshotManager(), tagManager(), store().newChangelogDeletion(), branchManager()); } @Override @@ -566,6 +569,7 @@ public void deleteTag(String tagName) { tagName, store().newTagDeletion(), snapshotManager(), + branchManager(), store().createTagCallbacks()); } @@ -627,7 +631,13 @@ public TagManager tagManager() { @Override public BranchManager branchManager() { - return new BranchManager(fileIO, path, snapshotManager(), tagManager(), schemaManager()); + return new BranchManager( + fileIO, + path, + snapshotManager(), + tagManager(), + schemaManager(), + store().newBranchDeletion()); } private RollbackHelper rollbackHelper() { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java index 72f655bbaafc..98b1bd8ff8b7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java @@ -24,6 +24,7 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.options.ExpireConfig; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -46,15 +47,18 @@ public class ExpireChangelogImpl implements ExpireSnapshots { private final ConsumerManager consumerManager; private final ChangelogDeletion changelogDeletion; private final TagManager tagManager; + private final BranchManager branchManager; private ExpireConfig expireConfig; public ExpireChangelogImpl( SnapshotManager snapshotManager, TagManager tagManager, - ChangelogDeletion changelogDeletion) { + ChangelogDeletion changelogDeletion, + BranchManager branchManager) { this.snapshotManager = snapshotManager; this.tagManager = tagManager; + this.branchManager = branchManager; this.consumerManager = new ConsumerManager( snapshotManager.fileIO(), @@ -134,7 +138,10 @@ public int expireUntil(long earliestId, long endExclusiveId) { LOG.debug("Changelog expire range is [" + earliestId + ", " + endExclusiveId + ")"); } - List referencedSnapshots = tagManager.taggedSnapshots(); + List referencedSnapshots = + SnapshotManager.mergeTreeSetToList( + tagManager.taggedSnapshots(), + branchManager.branchesCreateSnapshots().keySet()); List skippingSnapshots = SnapshotManager.findOverlappedSnapshots( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java index c49db64981d1..9e68ddea4bb4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java @@ -25,6 +25,7 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.SnapshotDeletion; import org.apache.paimon.options.ExpireConfig; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -47,13 +48,15 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots { private final ConsumerManager consumerManager; private final SnapshotDeletion snapshotDeletion; private final TagManager tagManager; + private final BranchManager branchManager; private ExpireConfig expireConfig; public ExpireSnapshotsImpl( SnapshotManager snapshotManager, SnapshotDeletion snapshotDeletion, - TagManager tagManager) { + TagManager tagManager, + BranchManager branchManager) { this.snapshotManager = snapshotManager; this.consumerManager = new ConsumerManager( @@ -62,6 +65,7 @@ public ExpireSnapshotsImpl( snapshotManager.branch()); this.snapshotDeletion = snapshotDeletion; this.tagManager = tagManager; + this.branchManager = branchManager; this.expireConfig = ExpireConfig.builder().build(); } @@ -153,7 +157,10 @@ public int expireUntil(long earliestId, long endExclusiveId) { "Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")"); } - List referencedSnapshots = tagManager.taggedSnapshots(); + List referencedSnapshots = + SnapshotManager.mergeTreeSetToList( + tagManager.taggedSnapshots(), + branchManager.branchesCreateSnapshots().keySet()); // delete merge tree files // deleted merge tree files in a snapshot are not used by the next snapshot, so the range of diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java index 58241033f5fb..6cf8006d0c90 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java @@ -24,6 +24,7 @@ import org.apache.paimon.table.sink.TagCallback; import org.apache.paimon.tag.TagTimeExtractor.ProcessTimeExtractor; import org.apache.paimon.tag.TagTimeExtractor.WatermarkExtractor; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -51,6 +52,8 @@ public class TagAutoCreation { private final SnapshotManager snapshotManager; private final TagManager tagManager; + private final BranchManager branchManager; + private final TagDeletion tagDeletion; private final TagTimeExtractor timeExtractor; private final TagPeriodHandler periodHandler; @@ -67,6 +70,7 @@ public class TagAutoCreation { private TagAutoCreation( SnapshotManager snapshotManager, TagManager tagManager, + BranchManager branchManager, TagDeletion tagDeletion, TagTimeExtractor timeExtractor, TagPeriodHandler periodHandler, @@ -78,6 +82,7 @@ private TagAutoCreation( List callbacks) { this.snapshotManager = snapshotManager; this.tagManager = tagManager; + this.branchManager = branchManager; this.tagDeletion = tagDeletion; this.timeExtractor = timeExtractor; this.periodHandler = periodHandler; @@ -182,6 +187,7 @@ private void tryToCreateTags(Snapshot snapshot) { checkAndGetOneAutoTag(tag), tagDeletion, snapshotManager, + branchManager, callbacks); i++; if (i == toDelete) { @@ -211,6 +217,7 @@ public static TagAutoCreation create( CoreOptions options, SnapshotManager snapshotManager, TagManager tagManager, + BranchManager branchManager, TagDeletion tagDeletion, List callbacks) { TagTimeExtractor extractor = TagTimeExtractor.createForAutoTag(options); @@ -220,6 +227,7 @@ public static TagAutoCreation create( return new TagAutoCreation( snapshotManager, tagManager, + branchManager, tagDeletion, extractor, TagPeriodHandler.create(options), diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java index 387a3e746adc..77e184ff01de 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.sink.TagCallback; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -50,6 +51,7 @@ public static TagAutoManager create( CoreOptions options, SnapshotManager snapshotManager, TagManager tagManager, + BranchManager branchManager, TagDeletion tagDeletion, List callbacks) { TagTimeExtractor extractor = TagTimeExtractor.createForAutoTag(options); @@ -58,8 +60,14 @@ public static TagAutoManager create( extractor == null ? null : TagAutoCreation.create( - options, snapshotManager, tagManager, tagDeletion, callbacks), - TagTimeExpire.create(snapshotManager, tagManager, tagDeletion, callbacks)); + options, + snapshotManager, + tagManager, + branchManager, + tagDeletion, + callbacks), + TagTimeExpire.create( + snapshotManager, tagManager, branchManager, tagDeletion, callbacks)); } public TagAutoCreation getTagAutoCreation() { diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java index d4797c0cb056..21442a2dd060 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java @@ -20,6 +20,7 @@ import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.sink.TagCallback; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -38,16 +39,19 @@ public class TagTimeExpire { private final SnapshotManager snapshotManager; private final TagManager tagManager; + private final BranchManager branchManager; private final TagDeletion tagDeletion; private final List callbacks; private TagTimeExpire( SnapshotManager snapshotManager, TagManager tagManager, + BranchManager branchManager, TagDeletion tagDeletion, List callbacks) { this.snapshotManager = snapshotManager; this.tagManager = tagManager; + this.branchManager = branchManager; this.tagDeletion = tagDeletion; this.callbacks = callbacks; } @@ -67,7 +71,8 @@ public void run() { "Delete tag {}, because its existence time has reached its timeRetained of {}.", tagName, timeRetained); - tagManager.deleteTag(tagName, tagDeletion, snapshotManager, callbacks); + tagManager.deleteTag( + tagName, tagDeletion, snapshotManager, branchManager, callbacks); } } } @@ -75,8 +80,10 @@ public void run() { public static TagTimeExpire create( SnapshotManager snapshotManager, TagManager tagManager, + BranchManager branchManager, TagDeletion tagDeletion, List callbacks) { - return new TagTimeExpire(snapshotManager, tagManager, tagDeletion, callbacks); + return new TagTimeExpire( + snapshotManager, tagManager, branchManager, tagDeletion, callbacks); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index af598587c79f..7762c00b736b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -19,17 +19,29 @@ package org.apache.paimon.utils; import org.apache.paimon.Snapshot; +import org.apache.paimon.branch.TableBranch; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.operation.BranchDeletion; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -52,17 +64,21 @@ public class BranchManager { private final TagManager tagManager; private final SchemaManager schemaManager; + private final BranchDeletion branchDeletion; + public BranchManager( FileIO fileIO, Path path, SnapshotManager snapshotManager, TagManager tagManager, - SchemaManager schemaManager) { + SchemaManager schemaManager, + BranchDeletion branchDeletion) { this.fileIO = fileIO; this.tablePath = path; this.snapshotManager = snapshotManager; this.tagManager = tagManager; this.schemaManager = schemaManager; + this.branchDeletion = branchDeletion; } /** Return the root Directory of branch. */ @@ -199,8 +215,39 @@ branchName, branchPath(tablePath, branchName)), } public void deleteBranch(String branchName) { + checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName); checkArgument(branchExists(branchName), "Branch name '%s' doesn't exist.", branchName); + try { + + if (!branchExists(branchName)) { + return; + } + + Snapshot snapshotToClean = + snapshotManager.copyWithBranch(branchName).earliestSnapshot(); + + if (snapshotToClean != null) { + if (!snapshotManager.snapshotExists(snapshotToClean.id())) { + SortedMap> branchReferenceSnapshotsMap = + branchesCreateSnapshots(); + // If the snapshotToClean is not referenced by other branches or tags, we need + // to do clean the dataFiles and manifestFiles. + if (branchReferenceSnapshotsMap + .getOrDefault(snapshotToClean, Collections.emptyList()) + .size() + == 1 + && !tagManager.taggedSnapshots().contains(snapshotToClean)) { + // do clean. + doClean( + snapshotToClean, + SnapshotManager.mergeTreeSetToList( + branchReferenceSnapshotsMap.keySet(), + tagManager.taggedSnapshots())); + } + } + } + // Delete branch directory fileIO.delete(branchPath(branchName), true); } catch (IOException e) { @@ -212,13 +259,40 @@ public void deleteBranch(String branchName) { } } + public void doClean(Snapshot snapshotToDelete, List referencedSnapshots) { + // collect skipping sets from the left neighbor branch and the nearest right neighbor + // (either the earliest snapshot or right neighbor branch) + List skippedSnapshots = + SnapshotManager.findNearestNeighborsSnapshot( + snapshotToDelete, referencedSnapshots, snapshotManager); + + // delete data files and empty directories + Predicate dataFileSkipper = null; + boolean success = true; + try { + dataFileSkipper = branchDeletion.dataFileSkipper(skippedSnapshots); + } catch (Exception e) { + LOG.info( + String.format( + "Skip cleaning data files for branch of snapshot %s due to failed to build skipping set.", + snapshotToDelete.id()), + e); + success = false; + } + if (success) { + branchDeletion.cleanUnusedDataFiles(snapshotToDelete, dataFileSkipper); + branchDeletion.cleanEmptyDirectories(); + } + + // delete manifests + branchDeletion.cleanUnusedManifests( + snapshotToDelete, branchDeletion.manifestSkippingSet(skippedSnapshots)); + } + /** Check if path exists. */ public boolean fileExists(Path path) { try { - if (fileIO.exists(path)) { - return true; - } - return false; + return fileIO.exists(path); } catch (IOException e) { throw new RuntimeException( String.format("Failed to determine if path '%s' exists.", path), e); @@ -316,4 +390,56 @@ public List branches() { throw new RuntimeException(e); } } + + /** Get all snapshots that are referenced by branches. */ + public SortedMap> branchesCreateSnapshots() { + TreeMap> sortedSnapshots = + new TreeMap<>(Comparator.comparingLong(Snapshot::id)); + + for (String branchName : branches()) { + Snapshot branchCreateSnapshot = + snapshotManager.copyWithBranch(branchName).earliestSnapshot(); + if (branchCreateSnapshot == null) { + // Support empty branch. + branchSnapshots.put(new TableBranch(branchName, path.getValue()), null); + continue; + } + FileStoreTable branchTable = + FileStoreTableFactory.create( + fileIO, new Path(branchPath(tablePath, branchName))); + SortedMap> snapshotTags = branchTable.tagManager().tags(); + Snapshot earliestSnapshot = branchTable.snapshotManager().earliestSnapshot(); + if (snapshotTags.isEmpty()) { + // Create based on snapshotId. + branchSnapshots.put( + new TableBranch(branchName, earliestSnapshot.id(), path.getValue()), + earliestSnapshot); + } else { + Snapshot snapshot = snapshotTags.firstKey(); + // current branch is create from tag. + if (earliestSnapshot.id() == snapshot.id()) { + List tags = snapshotTags.get(snapshot); + checkArgument(tags.size() == 1); + branchSnapshots.put( + new TableBranch( + branchName, tags.get(0), snapshot.id(), path.getValue()), + snapshot); + } else { + // Create based on snapshotId. + branchSnapshots.put( + new TableBranch(branchName, earliestSnapshot.id(), path.getValue()), + earliestSnapshot); + } + } + } + + for (Map.Entry snapshotEntry : branches().entrySet()) { + if (snapshotEntry.getValue() != null) { + sortedSnapshots + .computeIfAbsent(snapshotEntry.getValue(), s -> new ArrayList<>()) + .add(snapshotEntry.getKey()); + } + } + return sortedSnapshots; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index af83fab6a7e9..05c7ee03b984 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -32,6 +32,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; @@ -39,6 +40,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.function.BinaryOperator; import java.util.function.Function; @@ -696,6 +698,50 @@ private static int findPreviousOrEqualSnapshot( return -1; } + public static int findIndex(Snapshot targetSnapshot, List snapshotList) { + for (int i = 0; i < snapshotList.size(); i++) { + if (targetSnapshot.id() == snapshotList.get(i).id()) { + return i; + } + } + throw new RuntimeException( + String.format( + "Didn't find tag with snapshot id '%s'.This is unexpected.", + targetSnapshot.id())); + } + + public static List findNearestNeighborsSnapshot( + Snapshot targetSnapshot, List snapshotList, SnapshotManager snapshotManager) { + List skippedSnapshots = new ArrayList<>(); + + int index = SnapshotManager.findIndex(targetSnapshot, snapshotList); + // the left neighbor tag + if (index - 1 >= 0) { + skippedSnapshots.add(snapshotList.get(index - 1)); + } + // the nearest right neighbor + Snapshot right = snapshotManager.earliestSnapshot(); + if (index + 1 < snapshotList.size()) { + Snapshot rightTag = snapshotList.get(index + 1); + right = right.id() < rightTag.id() ? right : rightTag; + } + skippedSnapshots.add(right); + return skippedSnapshots; + } + + public static List mergeTreeSetToList( + Collection set1, Collection set2) { + if (set1 != null && set2 != null) { + if (!set1.isEmpty() || !set2.isEmpty()) { + TreeSet re = new TreeSet<>(Comparator.comparingLong(Snapshot::id)); + re.addAll(set1); + re.addAll(set2); + return new ArrayList<>(re); + } + } + return Collections.emptyList(); + } + public void deleteLatestHint() throws IOException { Path snapshotDir = snapshotDirectory(); Path hintFile = new Path(snapshotDir, LATEST); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 259a5bdbcda0..dd7f0db8a339 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -38,8 +38,10 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.function.Predicate; @@ -136,12 +138,21 @@ public void createTag( /** Make sure the tagNames are ALL tags of one snapshot. */ public void deleteAllTagsOfOneSnapshot( - List tagNames, TagDeletion tagDeletion, SnapshotManager snapshotManager) { + List tagNames, + TagDeletion tagDeletion, + SnapshotManager snapshotManager, + BranchManager branchManager) { Snapshot taggedSnapshot = taggedSnapshot(tagNames.get(0)); List taggedSnapshots; + Set referencedSnapshotsByBranch = + branch.equals(DEFAULT_MAIN_BRANCH) + ? branchManager.branchesCreateSnapshots().keySet() + : Collections.emptySet(); + // skip file deletion if snapshot exists - if (snapshotManager.snapshotExists(taggedSnapshot.id())) { + if (snapshotManager.snapshotExists(taggedSnapshot.id()) + || referencedSnapshotsByBranch.contains(taggedSnapshot)) { tagNames.forEach(tagName -> fileIO.deleteQuietly(tagPath(tagName))); return; } else { @@ -150,36 +161,53 @@ public void deleteAllTagsOfOneSnapshot( tagNames.forEach(tagName -> fileIO.deleteQuietly(tagPath(tagName))); } - doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion); + doClean( + taggedSnapshot, + SnapshotManager.mergeTreeSetToList(referencedSnapshotsByBranch, taggedSnapshots), + snapshotManager, + tagDeletion); } public void deleteTag( String tagName, TagDeletion tagDeletion, SnapshotManager snapshotManager, + BranchManager branchManager, List callbacks) { checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName); checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName); - Snapshot taggedSnapshot = taggedSnapshot(tagName); - List taggedSnapshots; + // If the current branch is the master branch, snapshots referenced by other branches + // should be skipped. + Set referencedSnapshotsByBranch = + branch.equals(DEFAULT_MAIN_BRANCH) + ? branchManager.branchesCreateSnapshots().keySet() + : Collections.emptySet(); - // skip file deletion if snapshot exists - if (snapshotManager.copyWithBranch(branch).snapshotExists(taggedSnapshot.id())) { + Snapshot snapshotToClean = taggedSnapshot(tagName); + + // skip file deletion if snapshot exists or the snapshot is still referenced by other + // branches or tags. + if (snapshotManager.copyWithBranch(branch).snapshotExists(snapshotToClean.id()) + || referencedSnapshotsByBranch.contains(snapshotToClean)) { + // delete tag meta file. deleteTagMetaFile(tagName, callbacks); return; - } else { - // FileIO discovers tags by tag file, so we should read all tags before we delete tag - SortedMap> tags = tags(); - deleteTagMetaFile(tagName, callbacks); - // skip data file clean if more than 1 tags are created based on this snapshot - if (tags.get(taggedSnapshot).size() > 1) { - return; - } - taggedSnapshots = new ArrayList<>(tags.keySet()); } - doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion); + // FileIO discovers tags by tag file, so we should read all tags before we delete tag. + SortedMap> tags = tags(); + deleteTagMetaFile(tagName, callbacks); + // skip data file clean if more than 1 tag are created based on this snapshot + if (tags.get(snapshotToClean).size() > 1) { + return; + } + + doClean( + snapshotToClean, + SnapshotManager.mergeTreeSetToList(referencedSnapshotsByBranch, tags.keySet()), + snapshotManager, + tagDeletion); } private void deleteTagMetaFile(String tagName, List callbacks) { @@ -200,20 +228,9 @@ private void doClean( TagDeletion tagDeletion) { // collect skipping sets from the left neighbor tag and the nearest right neighbor (either // the earliest snapshot or right neighbor tag) - List skippedSnapshots = new ArrayList<>(); - - int index = findIndex(taggedSnapshot, taggedSnapshots); - // the left neighbor tag - if (index - 1 >= 0) { - skippedSnapshots.add(taggedSnapshots.get(index - 1)); - } - // the nearest right neighbor - Snapshot right = snapshotManager.copyWithBranch(branch).earliestSnapshot(); - if (index + 1 < taggedSnapshots.size()) { - Snapshot rightTag = taggedSnapshots.get(index + 1); - right = right.id() < rightTag.id() ? right : rightTag; - } - skippedSnapshots.add(right); + List skippedSnapshots = + SnapshotManager.findNearestNeighborsSnapshot( + taggedSnapshot, taggedSnapshots, snapshotManager.copyWithBranch(branch)); // delete data files and empty directories Predicate dataFileSkipper = null; @@ -354,16 +371,4 @@ public List sortTagsOfOneSnapshot(List tagNames) { public List allTagNames() { return tags().values().stream().flatMap(Collection::stream).collect(Collectors.toList()); } - - private int findIndex(Snapshot taggedSnapshot, List taggedSnapshots) { - for (int i = 0; i < taggedSnapshots.size(); i++) { - if (taggedSnapshot.id() == taggedSnapshots.get(i).id()) { - return i; - } - } - throw new RuntimeException( - String.format( - "Didn't find tag with snapshot id '%s'.This is unexpected.", - taggedSnapshot.id())); - } } diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index bd6950d77b3d..3778c6d9346f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -158,7 +158,8 @@ public ExpireSnapshots newExpire(int numRetainedMin, int numRetainedMax, long mi return new ExpireSnapshotsImpl( snapshotManager(), newSnapshotDeletion(), - new TagManager(fileIO, options.path())) + new TagManager(fileIO, options.path()), + newBranchManager()) .config( ExpireConfig.builder() .snapshotRetainMax(numRetainedMax) @@ -171,7 +172,8 @@ public ExpireSnapshots newExpire(ExpireConfig expireConfig) { return new ExpireSnapshotsImpl( snapshotManager(), newSnapshotDeletion(), - new TagManager(fileIO, options.path())) + new TagManager(fileIO, options.path()), + newBranchManager()) .config(expireConfig); } @@ -180,7 +182,8 @@ public ExpireSnapshots newChangelogExpire(ExpireConfig config) { new ExpireChangelogImpl( snapshotManager(), new TagManager(fileIO, options.path()), - newChangelogDeletion()); + newChangelogDeletion(), + newBranchManager()); impl.config(config); return impl; } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index bd32e48e5d1b..75acba7bff7f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -162,6 +162,7 @@ public void testMixedSnapshotAndTagDeletion() throws Exception { "tag" + id, store.newTagDeletion(), snapshotManager, + store.newBranchManager(), Collections.emptyList()); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index 3fb1d36ac305..e7c1beacc037 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -41,6 +41,7 @@ import org.apache.paimon.table.ExpireSnapshots; import org.apache.paimon.table.ExpireSnapshotsImpl; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.RecordWriter; import org.apache.paimon.utils.SnapshotManager; @@ -435,7 +436,11 @@ public void testDeleteTagWithSnapshot() throws Exception { } tagManager.deleteTag( - "tag1", store.newTagDeletion(), snapshotManager, Collections.emptyList()); + "tag1", + store.newTagDeletion(), + snapshotManager, + store.newBranchManager(), + Collections.emptyList()); // check data files assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 0)); @@ -512,7 +517,11 @@ public void testDeleteTagWithOtherTag() throws Exception { } tagManager.deleteTag( - "tag2", store.newTagDeletion(), snapshotManager, Collections.emptyList()); + "tag2", + store.newTagDeletion(), + snapshotManager, + store.newBranchManager(), + Collections.emptyList()); // check data files assertPathExists(fileIO, pathFactory.bucketPath(partition, 0)); @@ -647,6 +656,7 @@ public void testExpireWithDeletingTags() throws Exception { TestFileStore store = createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 2); tagManager = new TagManager(fileIO, store.options().path()); SnapshotManager snapshotManager = store.snapshotManager(); + BranchManager branchManager = store.newBranchManager(); TestKeyValueGenerator gen = new TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED); BinaryRow partition = gen.getPartition(gen.next()); @@ -674,7 +684,11 @@ public void testExpireWithDeletingTags() throws Exception { // action: expire snapshot 1 -> delete tag1 -> expire snapshot 2 // result: exist A & B (because of tag2) ExpireSnapshots expireSnapshots = - new ExpireSnapshotsImpl(snapshotManager, store.newSnapshotDeletion(), tagManager); + new ExpireSnapshotsImpl( + snapshotManager, + store.newSnapshotDeletion(), + tagManager, + store.newBranchManager()); expireSnapshots .config( ExpireConfig.builder() @@ -684,7 +698,11 @@ public void testExpireWithDeletingTags() throws Exception { .build()) .expire(); tagManager.deleteTag( - "tag1", store.newTagDeletion(), snapshotManager, Collections.emptyList()); + "tag1", + store.newTagDeletion(), + snapshotManager, + branchManager, + Collections.emptyList()); expireSnapshots .config( ExpireConfig.builder() diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java index 6d27c6019483..7b671c0e956a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java @@ -21,6 +21,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.sink.TagCallback; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.SerializableSupplier; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -71,6 +72,8 @@ public class AutoTagForSavepointCommitterOperator private final SerializableSupplier tagManagerFactory; + private final SerializableSupplier branchManagerFactory; + private final SerializableSupplier tagDeletionFactory; private final SerializableSupplier> callbacksSupplier; @@ -83,6 +86,8 @@ public class AutoTagForSavepointCommitterOperator private transient TagManager tagManager; + private transient BranchManager branchManager; + private transient TagDeletion tagDeletion; private transient List callbacks; @@ -93,11 +98,13 @@ public AutoTagForSavepointCommitterOperator( CommitterOperator commitOperator, SerializableSupplier snapshotManagerFactory, SerializableSupplier tagManagerFactory, + SerializableSupplier branchManagerFactory, SerializableSupplier tagDeletionFactory, SerializableSupplier> callbacksSupplier, Duration tagTimeRetained) { this.commitOperator = commitOperator; this.tagManagerFactory = tagManagerFactory; + this.branchManagerFactory = branchManagerFactory; this.snapshotManagerFactory = snapshotManagerFactory; this.tagDeletionFactory = tagDeletionFactory; this.callbacksSupplier = callbacksSupplier; @@ -113,6 +120,7 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager) } finally { snapshotManager = snapshotManagerFactory.get(); tagManager = tagManagerFactory.get(); + branchManager = branchManagerFactory.get(); tagDeletion = tagDeletionFactory.get(); callbacks = callbacksSupplier.get(); @@ -161,7 +169,7 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { identifiersForTags.remove(checkpointId); String tagName = SAVEPOINT_TAG_PREFIX + checkpointId; if (tagManager.tagExists(tagName)) { - tagManager.deleteTag(tagName, tagDeletion, snapshotManager, callbacks); + tagManager.deleteTag(tagName, tagDeletion, snapshotManager, branchManager, callbacks); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java index 23202b45077f..bbfc14efbf0e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java @@ -115,7 +115,11 @@ private void createTag() { // If the tag already exists, delete the tag if (tagManager.tagExists(tagName)) { tagManager.deleteTag( - tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks()); + tagName, + tagDeletion, + snapshotManager, + table.branchManager(), + table.store().createTagCallbacks()); } // Create a new tag tagManager.createTag( @@ -128,7 +132,11 @@ private void createTag() { } catch (Exception e) { if (tagManager.tagExists(tagName)) { tagManager.deleteTag( - tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks()); + tagName, + tagDeletion, + snapshotManager, + table.branchManager(), + table.store().createTagCallbacks()); } } } @@ -148,7 +156,7 @@ private void expireTag() { for (List tagNames : tagManager.tags().values()) { if (tagCount - tagNames.size() >= tagNumRetainedMax) { tagManager.deleteAllTagsOfOneSnapshot( - tagNames, tagDeletion, snapshotManager); + tagNames, tagDeletion, snapshotManager, table.branchManager()); tagCount = tagCount - tagNames.size(); } else { List sortedTagNames = tagManager.sortTagsOfOneSnapshot(tagNames); @@ -157,6 +165,7 @@ private void expireTag() { toBeDeleted, tagDeletion, snapshotManager, + table.branchManager(), table.store().createTagCallbacks()); tagCount--; if (tagCount == tagNumRetainedMax) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 865b2a939e2e..5648db5ea3ed 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -260,6 +260,7 @@ protected DataStreamSink doCommit(DataStream written, String com (CommitterOperator) committerOperator, table::snapshotManager, table::tagManager, + table::branchManager, () -> table.store().newTagDeletion(), () -> table.store().createTagCallbacks(), table.coreOptions().tagDefaultTimeRetained()); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 33aca03b862c..34476f2e3cf4 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -18,6 +18,12 @@ package org.apache.paimon.flink; +import org.apache.paimon.Snapshot; +import org.apache.paimon.branch.TableBranch; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.SnapshotManager; @@ -35,6 +41,8 @@ /** IT cases for table with branches using SQL. */ public class BranchSqlITCase extends CatalogITCaseBase { + protected final FileIO fileIO = new LocalFileIO(); + @Test public void testAlterBranchTable() throws Exception { sql( @@ -296,6 +304,326 @@ public void testBranchFastForward() throws Exception { checkSnapshots(snapshotManager, 1, 2); } + /** Cleaning up snapshots or deleting tags should skip those referenced by branches. */ + @Test + public void testSnapshotExpireSkipTheReferencedByBranches() throws Exception { + + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + FileStoreTable table = paimonTable("T"); + SnapshotManager snapshotManager = table.snapshotManager(); + + sql("INSERT INTO T VALUES (1, 10, 'hunter')"); + sql("INSERT INTO T VALUES (1, 20, 'hunter')"); + sql("INSERT INTO T VALUES (1, 30, 'hunter')"); + sql("INSERT INTO T VALUES (1, 40, 'hunter')"); + sql("INSERT INTO T VALUES (1, 50, 'hunter')"); + + checkSnapshots(snapshotManager, 1, 5); + + // create 2 branch from the snapshot-1. + sql("CALL sys.create_branch('default.T', 'test', 1)"); + sql("CALL sys.create_branch('default.T', 'test_1', 1)"); + + // create tag2 from snapshot 2. + sql("CALL sys.create_tag('default.T', 'tag2', 2)"); + // create branch from tag2. + sql("CALL sys.create_branch('default.T', 'test2', 'tag2')"); + + sql("CALL sys.create_branch('default.T', 'test3', 3)"); + + assertThat( + table.branchManager().branchesCreateSnapshots().keySet().stream() + .map(Snapshot::id)) + .containsExactlyInAnyOrder(1L, 2L, 3L); + + // Only retain 5,6 snapshot, 1-4 will expire. + sql( + "INSERT INTO T /*+ OPTIONS(" + + "'snapshot.num-retained.min'= '2'," + + "'snapshot.num-retained.max'= '2' ) */" + + " VALUES (1, 60, 'hunter')"); + + checkSnapshots(snapshotManager, 5, 6); + + // Snapshot 1-4 has expired, but still be referenced by branches; + assertThat( + table.branchManager().branchesCreateSnapshots().keySet().stream() + .map(Snapshot::id)) + .containsExactlyInAnyOrder(1L, 2L, 3L); + + checkSnapshots(paimonTable("T$branch_test").snapshotManager(), 1, 1); + + // Created from tag2 and snapshot id is 2. + checkSnapshots(paimonTable("T$branch_test2").snapshotManager(), 2, 2); + + // The data of snapshot 1-3 is still can be read by tag or branch, snapshot-4 had expire. + assertThat(collectResult("SELECT * FROM T$branch_test")) + .containsExactlyInAnyOrder("+I[1, 10, hunter]"); + + sql("CALL sys.delete_tag('default.T', 'tag2')"); + assertThat(collectResult("SELECT * FROM T$branch_test2")) + .containsExactlyInAnyOrder("+I[1, 10, hunter]", "+I[1, 20, hunter]"); + + // The branch still can be read. + assertThat(collectResult("SELECT * FROM T$branch_test3")) + .containsExactlyInAnyOrder( + "+I[1, 10, hunter]", "+I[1, 20, hunter]", "+I[1, 30, hunter]"); + } + + /** + * Let's say snapshot-1 has expired, but the manifest is still referenced by some branches, so + * deleting the tag should skip those snapshots that are still referenced by other branches. + */ + @Test + public void testDeleteTagsSkipTheReferencedByBranches() throws Exception { + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + FileStoreTable table = paimonTable("T"); + SnapshotManager snapshotManager = table.snapshotManager(); + + sql("INSERT INTO T VALUES (1, 10, 'hunter')"); + sql("INSERT INTO T VALUES (2, 20, 'hunter2')"); + + checkSnapshots(snapshotManager, 1, 2); + + // create branch from the snapshot-1. + sql("CALL sys.create_branch('default.T', 'test', 1)"); + + // create tag from snapshot 1. + sql("CALL sys.create_tag('default.T', 'tag', 1)"); + + // Only retain 2,3 snapshot, snapshot-1 will be expired. + sql( + "INSERT INTO T /*+ OPTIONS(" + + "'snapshot.num-retained.min'= '2'," + + "'snapshot.num-retained.max'= '2' ) */" + + " VALUES (3, 30, 'hunter3')"); + + checkSnapshots(snapshotManager, 2, 3); + + assertThat(table.branchManager().branchesCreateSnapshots().size()).isEqualTo(1); + Snapshot branchCreateSnapshots = + table.branchManager().branchesCreateSnapshots().keySet().iterator().next(); + assertThat(branchCreateSnapshots.id()).isEqualTo(1L); + + // query branches data. + assertThat(collectResult("SELECT * FROM T$branch_test")) + .containsExactlyInAnyOrder("+I[1, 10, hunter]"); + + // delete tag. + sql("CALL sys.delete_tag('default.T', 'tag')"); + + // Verify that the maniFest file has not been deleted. + assertThat(manifestFileExist("T", branchCreateSnapshots.baseManifestList())).isTrue(); + assertThat(manifestFileExist("T", branchCreateSnapshots.deltaManifestList())).isTrue(); + + // query branches data. + assertThat(collectResult("SELECT * FROM T$branch_test")) + .containsExactlyInAnyOrder("+I[1, 10, hunter]"); + } + + @Test + public void testDeleteBranchSkipTheReferencedByTags() throws Exception { + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + FileStoreTable table = paimonTable("T"); + SnapshotManager snapshotManager = table.snapshotManager(); + + sql("INSERT INTO T VALUES (1, 10, 'hunter')"); + sql("INSERT INTO T VALUES (2, 20, 'hunter2')"); + + checkSnapshots(snapshotManager, 1, 2); + + // create branch from the snapshot-1. + sql("CALL sys.create_branch('default.T', 'test', 1)"); + + sql("CALL sys.create_tag('default.T', 'tag', 1)"); + + // Only retain 2,3 snapshot, snapshot-1 will be expired. + sql( + "INSERT INTO T /*+ OPTIONS(" + + "'snapshot.num-retained.min'= '2'," + + "'snapshot.num-retained.max'= '2' ) */" + + " VALUES (3, 30, 'hunter3')"); + + // delete tag. + sql("CALL sys.delete_branch('default.T', 'test')"); + + // tag still can be read. + assertThat(collectResult("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag') */")) + .containsExactlyInAnyOrder("+I[1, 10, hunter]"); + } + + @Test + public void testDeleteBranchSkipTheReferencedByBranches() throws Exception { + + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + FileStoreTable table = paimonTable("T"); + SnapshotManager snapshotManager = table.snapshotManager(); + + sql("INSERT INTO T VALUES (1, 10, 'hunter')"); + sql("INSERT INTO T VALUES (2, 20, 'hunter2')"); + + checkSnapshots(snapshotManager, 1, 2); + + // create branch from the snapshot-1. + sql("CALL sys.create_branch('default.T', 'test', 1)"); + + sql("CALL sys.create_branch('default.T', 'test2', 1)"); + + // Only retain 2,3 snapshot, snapshot-1 will be expired. + sql( + "INSERT INTO T /*+ OPTIONS(" + + "'snapshot.num-retained.min'= '2'," + + "'snapshot.num-retained.max'= '2' ) */" + + " VALUES (3, 30, 'hunter3')"); + + // delete branch. + sql("CALL sys.delete_branch('default.T', 'test')"); + + // branch still can be read. + assertThat(collectResult("SELECT * FROM T$branch_test2")) + .containsExactlyInAnyOrder("+I[1, 10, hunter]"); + } + + @Test + public void testDeleteBranchTriggerCleanSnapshots() throws Exception { + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + FileStoreTable table = paimonTable("T"); + SnapshotManager snapshotManager = table.snapshotManager(); + + sql("INSERT INTO T VALUES (1, 10, 'hunter')"); + sql("INSERT INTO T VALUES (2, 20, 'hunter2')"); + + checkSnapshots(snapshotManager, 1, 2); + + // create branch from the snapshot-1. + sql("CALL sys.create_branch('default.T', 'test', 1)"); + // this branch will be ignored. + sql("CALL sys.create_branch('default.T', 'empty')"); + + // Only retain 2,3 snapshot, snapshot-1 will be expired. + sql( + "INSERT INTO T /*+ OPTIONS(" + + "'snapshot.num-retained.min'= '2'," + + "'snapshot.num-retained.max'= '2' ) */" + + " VALUES (3, 30, 'hunter3')"); + + assertThat(table.branchManager().branchesCreateSnapshots().size()).isEqualTo(1); + Snapshot branchCreateSnapshots = + table.branchManager().branchesCreateSnapshots().keySet().iterator().next(); + assertThat(branchCreateSnapshots.id()).isEqualTo(1L); + + // query branches data. + assertThat(collectResult("SELECT * FROM T$branch_test")) + .containsExactlyInAnyOrder("+I[1, 10, hunter]"); + + // Verify that the maniFest file has not been deleted. + assertThat(manifestFileExist("T", branchCreateSnapshots.baseManifestList())).isTrue(); + assertThat(manifestFileExist("T", branchCreateSnapshots.deltaManifestList())).isTrue(); + + // delete tag. + sql("CALL sys.delete_branch('default.T', 'test')"); + + // Verify that the manifest file has been deleted. + assertThat(manifestFileExist("T", branchCreateSnapshots.baseManifestList())).isFalse(); + assertThat(manifestFileExist("T", branchCreateSnapshots.deltaManifestList())).isFalse(); + } + + @Test + public void testDeleteBranchCleanUnusedDataFileAndManifest() throws Exception { + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + FileStoreTable table = paimonTable("T"); + SnapshotManager snapshotManager = table.snapshotManager(); + + sql("INSERT INTO T VALUES (1, 10, 'hunter')"); + sql("INSERT INTO T VALUES (2, 20, 'hunter2')"); + sql("INSERT INTO T VALUES (3, 30, 'hunter3')"); + sql("INSERT INTO T VALUES (4, 40, 'hunter4')"); + + checkSnapshots(snapshotManager, 1, 4); + + sql("CALL sys.create_branch('default.T', 'test', 1)"); + sql("CALL sys.create_branch('default.T', 'test2', 2)"); + sql("CALL sys.create_tag('default.T', 'tag', 3)"); + + // Only retain 6 snapshot, snapshot 1-5 will be expired. + // here need update all the partition data. + sql( + "INSERT INTO T /*+ OPTIONS(" + + "'full-compaction.delta-commits'='1'," + + "'snapshot.num-retained.min'= '1'," + + "'snapshot.num-retained.max'= '1' ) */" + + " VALUES (1, 10, 'hunter'),(2, 20, 'hunter2'),(3, 30, 'hunter3'),(4, 40, 'hunter4')"); + + checkSnapshots(snapshotManager, 6, 6); + + // delete branch should skip the nearest left neighbor snapshot and the nearest right + // neighbor. + sql("CALL sys.delete_branch('default.T', 'test2')"); + + // tag still can be read. + assertThat(collectResult("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag') */")) + .containsExactlyInAnyOrder( + "+I[1, 10, hunter]", "+I[2, 20, hunter2]", "+I[3, 30, hunter3]"); + + // query branches data. + assertThat(collectResult("SELECT * FROM T$branch_test")) + .containsExactlyInAnyOrder("+I[1, 10, hunter]"); + } + @Test public void testFallbackBranchBatchRead() throws Exception { sql( @@ -380,4 +708,8 @@ private void checkSnapshots(SnapshotManager sm, int earliest, int latest) throws assertThat(sm.earliestSnapshotId()).isEqualTo(earliest); assertThat(sm.latestSnapshotId()).isEqualTo(latest); } + + private boolean manifestFileExist(String tableName, String manifest) throws IOException { + return fileIO.exists(new Path(getTableDirectory(tableName) + "/manifest/" + manifest)); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java index 3b58c24d16b1..8ad10574ec6a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java @@ -207,6 +207,7 @@ protected OneInputStreamOperator createCommitterOperat super.createCommitterOperator(table, commitUser, committableStateManager), table::snapshotManager, table::tagManager, + table::branchManager, () -> table.store().newTagDeletion(), () -> table.store().createTagCallbacks(), table.store().options().tagDefaultTimeRetained()); @@ -224,6 +225,7 @@ protected OneInputStreamOperator createCommitterOperat table, commitUser, committableStateManager, initializeFunction), table::snapshotManager, table::tagManager, + table::branchManager, () -> table.store().newTagDeletion(), () -> table.store().createTagCallbacks(), table.store().options().tagDefaultTimeRetained()); From 700e2da84c060f98b0c9a2e53c6a5fe05b1764a3 Mon Sep 17 00:00:00 2001 From: LinMingQiang Date: Mon, 29 Jul 2024 14:21:40 +0800 Subject: [PATCH 2/5] [core] OrphanFilesClean#getUsedFiles should consider branches snapshot. --- .../java/org/apache/paimon/operation/OrphanFilesClean.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index b0b06c33d586..cffcf4586201 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -35,6 +35,7 @@ import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.FileUtils; import org.apache.paimon.utils.SnapshotManager; @@ -93,6 +94,7 @@ public class OrphanFilesClean { private final SnapshotManager snapshotManager; private final TagManager tagManager; + private final BranchManager branchManager; private final FileIO fileIO; private final Path location; private final int partitionKeysNum; @@ -107,6 +109,7 @@ public class OrphanFilesClean { public OrphanFilesClean(FileStoreTable table) { this.snapshotManager = table.snapshotManager(); this.tagManager = table.tagManager(); + this.branchManager = table.branchManager(); this.fileIO = table.fileIO(); this.location = table.location(); this.partitionKeysNum = table.partitionKeys().size(); @@ -169,13 +172,15 @@ public List clean() throws IOException, ExecutionException, InterruptedExc return deleteFiles; } - /** Get all the files used by snapshots and tags. */ + /** Get all the files used by snapshots and tags and branches. */ private Set getUsedFiles() throws IOException, ExecutionException, InterruptedException { // safely get all snapshots to be read Set readSnapshots = new HashSet<>(snapshotManager.safelyGetAllSnapshots()); List taggedSnapshots = tagManager.taggedSnapshots(); + Set branchSnapshots = branchManager.branchesCreateSnapshots().keySet(); readSnapshots.addAll(taggedSnapshots); + readSnapshots.addAll(branchSnapshots); readSnapshots.addAll(snapshotManager.safelyGetAllChangelogs()); return FileUtils.COMMON_IO_FORK_JOIN_POOL From fb5308784bf2555ebf67ea5068f9a1566c3729df Mon Sep 17 00:00:00 2001 From: LinMingQiang Date: Mon, 29 Jul 2024 17:19:05 +0800 Subject: [PATCH 3/5] [core] fix comment. --- .../org/apache/paimon/branch/TableBranch.java | 0 .../apache/paimon/operation/TagDeletion.java | 7 + .../apache/paimon/utils/SnapshotManager.java | 14 +- .../paimon/operation/FileDeletionTest.java | 6 +- .../sink/BatchWriteGeneratorTagOperator.java | 6 +- .../apache/paimon/flink/BranchSqlITCase.java | 221 ++++++++++++------ 6 files changed, 177 insertions(+), 77 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java diff --git a/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java b/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java index 1788876b718a..e79f194c8d7d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java @@ -19,6 +19,7 @@ package org.apache.paimon.operation; import org.apache.paimon.Snapshot; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; @@ -92,4 +93,10 @@ public void cleanUnusedManifests(Snapshot taggedSnapshot, Set skippingSe // doesn't clean changelog files because they are handled by SnapshotDeletion cleanUnusedManifests(taggedSnapshot, skippingSet, true, false); } + + @VisibleForTesting + public Collection getDataFilesFromSnapshot(Snapshot snapshot) + throws IOException { + return readMergedDataFiles(snapshot); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 05c7ee03b984..f2d04c834aaf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -706,7 +706,7 @@ public static int findIndex(Snapshot targetSnapshot, List snapshotList } throw new RuntimeException( String.format( - "Didn't find tag with snapshot id '%s'.This is unexpected.", + "Didn't find snapshot id '%s' in the list, this is unexpected.", targetSnapshot.id())); } @@ -715,17 +715,17 @@ public static List findNearestNeighborsSnapshot( List skippedSnapshots = new ArrayList<>(); int index = SnapshotManager.findIndex(targetSnapshot, snapshotList); - // the left neighbor tag + // the nearest left neighbor snapshot. if (index - 1 >= 0) { skippedSnapshots.add(snapshotList.get(index - 1)); } - // the nearest right neighbor - Snapshot right = snapshotManager.earliestSnapshot(); + // the nearest right neighbor snapshot. + Snapshot nearestRight = snapshotManager.earliestSnapshot(); if (index + 1 < snapshotList.size()) { - Snapshot rightTag = snapshotList.get(index + 1); - right = right.id() < rightTag.id() ? right : rightTag; + Snapshot rightSnapshot = snapshotList.get(index + 1); + nearestRight = nearestRight.id() < rightSnapshot.id() ? nearestRight : rightSnapshot; } - skippedSnapshots.add(right); + skippedSnapshots.add(nearestRight); return skippedSnapshots; } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index e7c1beacc037..1c08d679c02c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -388,6 +388,7 @@ public void testDeleteTagWithSnapshot() throws Exception { TestFileStore store = createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 3); tagManager = new TagManager(fileIO, store.options().path()); SnapshotManager snapshotManager = store.snapshotManager(); + BranchManager branchManager = store.newBranchManager(); TestKeyValueGenerator gen = new TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED); BinaryRow partition = gen.getPartition(gen.next()); @@ -439,7 +440,7 @@ public void testDeleteTagWithSnapshot() throws Exception { "tag1", store.newTagDeletion(), snapshotManager, - store.newBranchManager(), + branchManager, Collections.emptyList()); // check data files @@ -466,6 +467,7 @@ public void testDeleteTagWithOtherTag() throws Exception { TestFileStore store = createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 3); tagManager = new TagManager(fileIO, store.options().path()); SnapshotManager snapshotManager = store.snapshotManager(); + BranchManager branchManager = store.newBranchManager(); TestKeyValueGenerator gen = new TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED); BinaryRow partition = gen.getPartition(gen.next()); @@ -520,7 +522,7 @@ public void testDeleteTagWithOtherTag() throws Exception { "tag2", store.newTagDeletion(), snapshotManager, - store.newBranchManager(), + branchManager, Collections.emptyList()); // check data files diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java index bbfc14efbf0e..77874eb53a71 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java @@ -21,6 +21,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -150,13 +151,14 @@ private void expireTag() { } TagManager tagManager = table.tagManager(); TagDeletion tagDeletion = table.store().newTagDeletion(); + BranchManager branchManager = table.branchManager(); long tagCount = tagManager.tagCount(); while (tagCount > tagNumRetainedMax) { for (List tagNames : tagManager.tags().values()) { if (tagCount - tagNames.size() >= tagNumRetainedMax) { tagManager.deleteAllTagsOfOneSnapshot( - tagNames, tagDeletion, snapshotManager, table.branchManager()); + tagNames, tagDeletion, snapshotManager, branchManager); tagCount = tagCount - tagNames.size(); } else { List sortedTagNames = tagManager.sortTagsOfOneSnapshot(tagNames); @@ -165,7 +167,7 @@ private void expireTag() { toBeDeleted, tagDeletion, snapshotManager, - table.branchManager(), + branchManager, table.store().createTagCallbacks()); tagCount--; if (tagCount == tagNumRetainedMax) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 34476f2e3cf4..9b649f7856c8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -24,8 +24,13 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; @@ -33,6 +38,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import static org.assertj.core.api.Assertions.assertThat; @@ -304,7 +310,7 @@ public void testBranchFastForward() throws Exception { checkSnapshots(snapshotManager, 1, 2); } - /** Cleaning up snapshots or deleting tags should skip those referenced by branches. */ + /** Expiring Snapshots should skip those referenced by branches. */ @Test public void testSnapshotExpireSkipTheReferencedByBranches() throws Exception { @@ -320,6 +326,7 @@ public void testSnapshotExpireSkipTheReferencedByBranches() throws Exception { FileStoreTable table = paimonTable("T"); SnapshotManager snapshotManager = table.snapshotManager(); + BranchManager branchManager = table.branchManager(); sql("INSERT INTO T VALUES (1, 10, 'hunter')"); sql("INSERT INTO T VALUES (1, 20, 'hunter')"); @@ -333,19 +340,19 @@ public void testSnapshotExpireSkipTheReferencedByBranches() throws Exception { sql("CALL sys.create_branch('default.T', 'test', 1)"); sql("CALL sys.create_branch('default.T', 'test_1', 1)"); - // create tag2 from snapshot 2. - sql("CALL sys.create_tag('default.T', 'tag2', 2)"); - // create branch from tag2. - sql("CALL sys.create_branch('default.T', 'test2', 'tag2')"); + // create tag from snapshot 2. + sql("CALL sys.create_tag('default.T', 'tag', 2)"); + // create branch2 from tag. + sql("CALL sys.create_branch('default.T', 'test2', 'tag')"); sql("CALL sys.create_branch('default.T', 'test3', 3)"); - assertThat( - table.branchManager().branchesCreateSnapshots().keySet().stream() - .map(Snapshot::id)) + // We have created 4 branches using 3 snapshots. + assertThat(branchManager.branchesCreateSnapshots().keySet().stream().map(Snapshot::id)) .containsExactlyInAnyOrder(1L, 2L, 3L); - // Only retain 5,6 snapshot, 1-4 will expire. + // Only retain snapshot 5,6 and snapshot 1-4 will expire. So all referenced snapshots will + // expire. sql( "INSERT INTO T /*+ OPTIONS(" + "'snapshot.num-retained.min'= '2'," @@ -354,34 +361,26 @@ public void testSnapshotExpireSkipTheReferencedByBranches() throws Exception { checkSnapshots(snapshotManager, 5, 6); + branchManager = table.branchManager(); // Snapshot 1-4 has expired, but still be referenced by branches; - assertThat( - table.branchManager().branchesCreateSnapshots().keySet().stream() - .map(Snapshot::id)) + assertThat(branchManager.branchesCreateSnapshots().keySet().stream().map(Snapshot::id)) .containsExactlyInAnyOrder(1L, 2L, 3L); - checkSnapshots(paimonTable("T$branch_test").snapshotManager(), 1, 1); - - // Created from tag2 and snapshot id is 2. - checkSnapshots(paimonTable("T$branch_test2").snapshotManager(), 2, 2); - - // The data of snapshot 1-3 is still can be read by tag or branch, snapshot-4 had expire. + // The data of snapshot 1-3 is still can be read by branches. assertThat(collectResult("SELECT * FROM T$branch_test")) .containsExactlyInAnyOrder("+I[1, 10, hunter]"); - sql("CALL sys.delete_tag('default.T', 'tag2')"); assertThat(collectResult("SELECT * FROM T$branch_test2")) .containsExactlyInAnyOrder("+I[1, 10, hunter]", "+I[1, 20, hunter]"); - // The branch still can be read. assertThat(collectResult("SELECT * FROM T$branch_test3")) .containsExactlyInAnyOrder( "+I[1, 10, hunter]", "+I[1, 20, hunter]", "+I[1, 30, hunter]"); } /** - * Let's say snapshot-1 has expired, but the manifest is still referenced by some branches, so - * deleting the tag should skip those snapshots that are still referenced by other branches. + * Let's say snapshot has expired, but the manifest is still referenced by some branches, so + * deleting the tag should skip those snapshots. */ @Test public void testDeleteTagsSkipTheReferencedByBranches() throws Exception { @@ -397,6 +396,7 @@ public void testDeleteTagsSkipTheReferencedByBranches() throws Exception { FileStoreTable table = paimonTable("T"); SnapshotManager snapshotManager = table.snapshotManager(); + BranchManager branchManager = table.branchManager(); sql("INSERT INTO T VALUES (1, 10, 'hunter')"); sql("INSERT INTO T VALUES (2, 20, 'hunter2')"); @@ -406,10 +406,10 @@ public void testDeleteTagsSkipTheReferencedByBranches() throws Exception { // create branch from the snapshot-1. sql("CALL sys.create_branch('default.T', 'test', 1)"); - // create tag from snapshot 1. + // create tag from snapshot-1. sql("CALL sys.create_tag('default.T', 'tag', 1)"); - // Only retain 2,3 snapshot, snapshot-1 will be expired. + // Step1: Expire snapshot, only retain snapshot 2,3 and snapshot-1 will expire. sql( "INSERT INTO T /*+ OPTIONS(" + "'snapshot.num-retained.min'= '2'," @@ -417,24 +417,20 @@ public void testDeleteTagsSkipTheReferencedByBranches() throws Exception { + " VALUES (3, 30, 'hunter3')"); checkSnapshots(snapshotManager, 2, 3); + assertThat(branchManager.branchesCreateSnapshots().size()).isEqualTo(1); - assertThat(table.branchManager().branchesCreateSnapshots().size()).isEqualTo(1); - Snapshot branchCreateSnapshots = - table.branchManager().branchesCreateSnapshots().keySet().iterator().next(); - assertThat(branchCreateSnapshots.id()).isEqualTo(1L); + Snapshot referencedSnapshot = + branchManager.branchesCreateSnapshots().keySet().iterator().next(); + assertThat(referencedSnapshot.id()).isEqualTo(1L); // query branches data. assertThat(collectResult("SELECT * FROM T$branch_test")) .containsExactlyInAnyOrder("+I[1, 10, hunter]"); - // delete tag. + // Step2: Delete tag. sql("CALL sys.delete_tag('default.T', 'tag')"); - // Verify that the maniFest file has not been deleted. - assertThat(manifestFileExist("T", branchCreateSnapshots.baseManifestList())).isTrue(); - assertThat(manifestFileExist("T", branchCreateSnapshots.deltaManifestList())).isTrue(); - - // query branches data. + // Step3: Branches data still can be read. assertThat(collectResult("SELECT * FROM T$branch_test")) .containsExactlyInAnyOrder("+I[1, 10, hunter]"); } @@ -459,22 +455,22 @@ public void testDeleteBranchSkipTheReferencedByTags() throws Exception { checkSnapshots(snapshotManager, 1, 2); - // create branch from the snapshot-1. + // Create branch from the snapshot 1. sql("CALL sys.create_branch('default.T', 'test', 1)"); sql("CALL sys.create_tag('default.T', 'tag', 1)"); - // Only retain 2,3 snapshot, snapshot-1 will be expired. + // Only retain snapshot 2,3 and snapshot 1 will expire. sql( "INSERT INTO T /*+ OPTIONS(" + "'snapshot.num-retained.min'= '2'," + "'snapshot.num-retained.max'= '2' ) */" + " VALUES (3, 30, 'hunter3')"); - // delete tag. + // Delete branch. sql("CALL sys.delete_branch('default.T', 'test')"); - // tag still can be read. + // The tag still can be read. assertThat(collectResult("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag') */")) .containsExactlyInAnyOrder("+I[1, 10, hunter]"); } @@ -500,22 +496,22 @@ public void testDeleteBranchSkipTheReferencedByBranches() throws Exception { checkSnapshots(snapshotManager, 1, 2); - // create branch from the snapshot-1. + // Create branch from the snapshot 1. sql("CALL sys.create_branch('default.T', 'test', 1)"); sql("CALL sys.create_branch('default.T', 'test2', 1)"); - // Only retain 2,3 snapshot, snapshot-1 will be expired. + // Only retain snapshot 2,3 and snapshot 1 will expire. sql( "INSERT INTO T /*+ OPTIONS(" + "'snapshot.num-retained.min'= '2'," + "'snapshot.num-retained.max'= '2' ) */" + " VALUES (3, 30, 'hunter3')"); - // delete branch. + // Delete branch test. sql("CALL sys.delete_branch('default.T', 'test')"); - // branch still can be read. + // branch test2 still can be read. assertThat(collectResult("SELECT * FROM T$branch_test2")) .containsExactlyInAnyOrder("+I[1, 10, hunter]"); } @@ -540,11 +536,25 @@ public void testDeleteBranchTriggerCleanSnapshots() throws Exception { checkSnapshots(snapshotManager, 1, 2); - // create branch from the snapshot-1. - sql("CALL sys.create_branch('default.T', 'test', 1)"); // this branch will be ignored. sql("CALL sys.create_branch('default.T', 'empty')"); + // Case1 : Deleting a branch does not affect unexpired snapshots. + sql("CALL sys.create_branch('default.T', 'test', 1)"); + + sql("CALL sys.delete_branch('default.T', 'test')"); + + assertThat(collectResult("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='1') */")) + .containsExactlyInAnyOrder("+I[1, 10, hunter]"); + + // Case2 : Deleting a branch does not affect snapshots that are referenced by branches or + // tags. + sql("CALL sys.create_branch('default.T', 'test', 1)"); + + // Create branch and tag and reference snapshot 1. + sql("CALL sys.create_branch('default.T', 'test2', 1)"); + sql("CALL sys.create_tag('default.T', 'tag', 1)"); + // Only retain 2,3 snapshot, snapshot-1 will be expired. sql( "INSERT INTO T /*+ OPTIONS(" @@ -552,29 +562,58 @@ public void testDeleteBranchTriggerCleanSnapshots() throws Exception { + "'snapshot.num-retained.max'= '2' ) */" + " VALUES (3, 30, 'hunter3')"); - assertThat(table.branchManager().branchesCreateSnapshots().size()).isEqualTo(1); - Snapshot branchCreateSnapshots = - table.branchManager().branchesCreateSnapshots().keySet().iterator().next(); - assertThat(branchCreateSnapshots.id()).isEqualTo(1L); + sql("CALL sys.delete_branch('default.T', 'test')"); - // query branches data. - assertThat(collectResult("SELECT * FROM T$branch_test")) + // The branch test2 still can be read. + assertThat(collectResult("SELECT * FROM T$branch_test2")) + .containsExactlyInAnyOrder("+I[1, 10, hunter]"); + sql("CALL sys.delete_branch('default.T', 'test2')"); + + // The tag still can be read. + assertThat(collectResult("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag') */")) .containsExactlyInAnyOrder("+I[1, 10, hunter]"); + sql("CALL sys.delete_tag('default.T', 'tag')"); + + // Case 3: Deleting a branch will also clean up unreferenced and expired snapshots. + checkSnapshots(snapshotManager, 2, 3); + sql("CALL sys.create_branch('default.T', 'test', 2)"); + + // Only retain snapshot 3,4, snapshot 2 will expire. + sql( + "INSERT INTO T /*+ OPTIONS(" + + "'snapshot.num-retained.min'= '2'," + + "'snapshot.num-retained.max'= '2' ) */" + + " VALUES (4, 40, 'hunter4')"); + checkSnapshots(snapshotManager, 3, 4); - // Verify that the maniFest file has not been deleted. - assertThat(manifestFileExist("T", branchCreateSnapshots.baseManifestList())).isTrue(); - assertThat(manifestFileExist("T", branchCreateSnapshots.deltaManifestList())).isTrue(); + BranchManager branchManager = table.branchManager(); + assertThat(branchManager.branches().keySet().stream().map(TableBranch::getBranchName)) + .containsExactlyInAnyOrder("test", "empty"); - // delete tag. + Snapshot snapshotsToClean = + branchManager.branchesCreateSnapshots().keySet().iterator().next(); + assertThat(snapshotsToClean.id()).isEqualTo(2L); + + // Query branches data. + assertThat(collectResult("SELECT * FROM T$branch_test")) + .containsExactlyInAnyOrder("+I[1, 10, hunter]", "+I[2, 20, hunter2]"); + + // Verify that the manifest files exists. + assertThat(manifestFileExist("T", snapshotsToClean.baseManifestList())).isTrue(); + assertThat(manifestFileExist("T", snapshotsToClean.deltaManifestList())).isTrue(); + + // Delete branch. sql("CALL sys.delete_branch('default.T', 'test')"); // Verify that the manifest file has been deleted. - assertThat(manifestFileExist("T", branchCreateSnapshots.baseManifestList())).isFalse(); - assertThat(manifestFileExist("T", branchCreateSnapshots.deltaManifestList())).isFalse(); + assertThat(manifestFileExist("T", snapshotsToClean.baseManifestList())).isFalse(); + assertThat(manifestFileExist("T", snapshotsToClean.deltaManifestList())).isFalse(); + + assertThat(branchManager.branchesCreateSnapshots().isEmpty()).isTrue(); } @Test - public void testDeleteBranchCleanUnusedDataFileAndManifest() throws Exception { + public void testDeleteBranchCleanUnusedDataFiles() throws Exception { sql( "CREATE TABLE T (" + " pt INT" @@ -582,11 +621,13 @@ public void testDeleteBranchCleanUnusedDataFileAndManifest() throws Exception { + ", v STRING" + ", PRIMARY KEY (pt, k) NOT ENFORCED" + " ) PARTITIONED BY (pt) WITH (" - + " 'bucket' = '2'" + + " 'bucket' = '1'" + " )"); FileStoreTable table = paimonTable("T"); SnapshotManager snapshotManager = table.snapshotManager(); + TagManager tagManager = table.tagManager(); + TagDeletion tagDeletion = table.store().newTagDeletion(); sql("INSERT INTO T VALUES (1, 10, 'hunter')"); sql("INSERT INTO T VALUES (2, 20, 'hunter2')"); @@ -595,33 +636,63 @@ public void testDeleteBranchCleanUnusedDataFileAndManifest() throws Exception { checkSnapshots(snapshotManager, 1, 4); + // The following three snapshots will expire. sql("CALL sys.create_branch('default.T', 'test', 1)"); sql("CALL sys.create_branch('default.T', 'test2', 2)"); - sql("CALL sys.create_tag('default.T', 'tag', 3)"); + sql("CALL sys.create_tag('default.T', 'tag3', 3)"); - // Only retain 6 snapshot, snapshot 1-5 will be expired. - // here need update all the partition data. + // Only retain snapshot-6 and snapshot 1-5 will expire. + // Here we need to update all the partition data, the snapshot-6 is a compaction snapshot. + // Snapshot 6 records which data files should be deleted. sql( "INSERT INTO T /*+ OPTIONS(" + "'full-compaction.delta-commits'='1'," + "'snapshot.num-retained.min'= '1'," + "'snapshot.num-retained.max'= '1' ) */" - + " VALUES (1, 10, 'hunter'),(2, 20, 'hunter2'),(3, 30, 'hunter3'),(4, 40, 'hunter4')"); + + " VALUES (1, 10, 'hunter')" + + ",(2, 20, 'hunter2')" + + ",(3, 30, 'hunter3')" + + ",(4, 40, 'hunter4')"); checkSnapshots(snapshotManager, 6, 6); // delete branch should skip the nearest left neighbor snapshot and the nearest right - // neighbor. + // neighbor [branch_test, tag3]. sql("CALL sys.delete_branch('default.T', 'test2')"); - // tag still can be read. - assertThat(collectResult("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag') */")) + // The tag still can be read. + assertThat(collectResult("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag3') */")) .containsExactlyInAnyOrder( "+I[1, 10, hunter]", "+I[2, 20, hunter2]", "+I[3, 30, hunter3]"); - // query branches data. + // The branch-test still can be read. assertThat(collectResult("SELECT * FROM T$branch_test")) .containsExactlyInAnyOrder("+I[1, 10, hunter]"); + + sql("CALL sys.delete_branch('default.T', 'test')"); + + // The tag still can be read. + assertThat(collectResult("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag3') */")) + .containsExactlyInAnyOrder( + "+I[1, 10, hunter]", "+I[2, 20, hunter2]", "+I[3, 30, hunter3]"); + + Snapshot taggedSnapshot = tagManager.taggedSnapshot("tag3"); + Collection manifestEntries = + tagDeletion.getDataFilesFromSnapshot(taggedSnapshot); + + assertThat(manifestEntries).allMatch(entry -> dataFileExist("T", entry)); + + sql("CALL sys.delete_tag('default.T', 'tag3')"); + + // All unused datafiles should be deleted. + assertThat(manifestEntries).allMatch(entry -> !dataFileExist("T", entry)); + + assertThat(collectResult("SELECT * FROM T")) + .containsExactlyInAnyOrder( + "+I[1, 10, hunter]", + "+I[2, 20, hunter2]", + "+I[3, 30, hunter3]", + "+I[4, 40, hunter4]"); } @Test @@ -712,4 +783,22 @@ private void checkSnapshots(SnapshotManager sm, int earliest, int latest) throws private boolean manifestFileExist(String tableName, String manifest) throws IOException { return fileIO.exists(new Path(getTableDirectory(tableName) + "/manifest/" + manifest)); } + + private boolean dataFileExist(String tableName, ManifestEntry entry) { + if (entry.kind() == FileKind.ADD) { + try { + String partition = String.valueOf(entry.partition().getInt(0)); + return fileIO.exists( + new Path( + String.format( + getTableDirectory(tableName) + "/pt=%s/bucket-%s/%s", + partition, + entry.bucket(), + entry.file().fileName()))); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return true; + } } From cb8cb7c73c0b3ff5246dd13598c7f62eff62eb94 Mon Sep 17 00:00:00 2001 From: LinMingQiang Date: Thu, 1 Aug 2024 15:10:09 +0800 Subject: [PATCH 4/5] [core] add method extractDataFilePaths to parent class FileDeletionBase. --- .../paimon/operation/BranchDeletion.java | 18 ++---------------- .../paimon/operation/FileDeletionBase.java | 17 +++++++++++++++++ .../apache/paimon/operation/TagDeletion.java | 16 +--------------- 3 files changed, 20 insertions(+), 31 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BranchDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/BranchDeletion.java index 90f8e8d72e38..cedff581b2b0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BranchDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BranchDeletion.java @@ -20,7 +20,6 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; @@ -33,7 +32,6 @@ import java.io.IOException; import java.util.Collection; -import java.util.HashSet; import java.util.Set; import java.util.function.Predicate; @@ -68,23 +66,11 @@ public void cleanUnusedDataFiles(Snapshot snapshotToClean, Predicate dataFileToDelete = new HashSet<>(); - for (ManifestEntry entry : manifestEntries) { - if (!skipper.test(entry)) { - Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket()); - dataFileToDelete.add(new Path(bucketPath, entry.file().fileName())); - for (String file : entry.file().extraFiles()) { - dataFileToDelete.add(new Path(bucketPath, file)); - } - - recordDeletionBuckets(entry); - } - } - deleteFiles(dataFileToDelete, fileIO::deleteQuietly); + deleteFiles(extractDataFilePaths(manifestEntries, skipper), fileIO::deleteQuietly); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java index e6d7b7acac70..4a3b21b82795 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java @@ -488,4 +488,21 @@ protected void deleteFiles(Collection files, Consumer deletion) { throw new RuntimeException(e); } } + + public Set extractDataFilePaths( + Collection manifestEntries, Predicate skipper) { + Set dataFileToDelete = new HashSet<>(); + for (ManifestEntry entry : manifestEntries) { + if (!skipper.test(entry)) { + Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket()); + dataFileToDelete.add(new Path(bucketPath, entry.file().fileName())); + for (String file : entry.file().extraFiles()) { + dataFileToDelete.add(new Path(bucketPath, file)); + } + + recordDeletionBuckets(entry); + } + } + return dataFileToDelete; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java index e79f194c8d7d..1bbd0c902d80 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java @@ -21,7 +21,6 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; @@ -34,7 +33,6 @@ import java.io.IOException; import java.util.Collection; -import java.util.HashSet; import java.util.Set; import java.util.function.Predicate; @@ -73,19 +71,7 @@ public void cleanUnusedDataFiles(Snapshot taggedSnapshot, Predicate dataFileToDelete = new HashSet<>(); - for (ManifestEntry entry : manifestEntries) { - if (!skipper.test(entry)) { - Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket()); - dataFileToDelete.add(new Path(bucketPath, entry.file().fileName())); - for (String file : entry.file().extraFiles()) { - dataFileToDelete.add(new Path(bucketPath, file)); - } - - recordDeletionBuckets(entry); - } - } - deleteFiles(dataFileToDelete, fileIO::deleteQuietly); + deleteFiles(extractDataFilePaths(manifestEntries, skipper), fileIO::deleteQuietly); } @Override From c4557baf7c67b0a642915f4810469053a54d8f7b Mon Sep 17 00:00:00 2001 From: LinMingQiang Date: Tue, 6 Aug 2024 10:43:09 +0800 Subject: [PATCH 5/5] [core] rebase master. --- .../org/apache/paimon/branch/TableBranch.java | 0 .../apache/paimon/utils/BranchManager.java | 69 ++++++++----------- .../apache/paimon/flink/BranchSqlITCase.java | 13 ++-- 3 files changed, 37 insertions(+), 45 deletions(-) delete mode 100644 paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java diff --git a/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java b/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 7762c00b736b..673f5a16bdd8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -19,7 +19,6 @@ package org.apache.paimon.utils; import org.apache.paimon.Snapshot; -import org.apache.paimon.branch.TableBranch; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; @@ -27,9 +26,9 @@ import org.apache.paimon.operation.BranchDeletion; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; - import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +37,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; -import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; import java.util.function.Predicate; @@ -229,7 +227,7 @@ public void deleteBranch(String branchName) { if (snapshotToClean != null) { if (!snapshotManager.snapshotExists(snapshotToClean.id())) { - SortedMap> branchReferenceSnapshotsMap = + SortedMap> branchReferenceSnapshotsMap = branchesCreateSnapshots(); // If the snapshotToClean is not referenced by other branches or tags, we need // to do clean the dataFiles and manifestFiles. @@ -399,47 +397,36 @@ public SortedMap> branchesCreateSnapshots() { for (String branchName : branches()) { Snapshot branchCreateSnapshot = snapshotManager.copyWithBranch(branchName).earliestSnapshot(); - if (branchCreateSnapshot == null) { - // Support empty branch. - branchSnapshots.put(new TableBranch(branchName, path.getValue()), null); - continue; - } - FileStoreTable branchTable = - FileStoreTableFactory.create( - fileIO, new Path(branchPath(tablePath, branchName))); - SortedMap> snapshotTags = branchTable.tagManager().tags(); - Snapshot earliestSnapshot = branchTable.snapshotManager().earliestSnapshot(); - if (snapshotTags.isEmpty()) { - // Create based on snapshotId. - branchSnapshots.put( - new TableBranch(branchName, earliestSnapshot.id(), path.getValue()), - earliestSnapshot); - } else { - Snapshot snapshot = snapshotTags.firstKey(); - // current branch is create from tag. - if (earliestSnapshot.id() == snapshot.id()) { - List tags = snapshotTags.get(snapshot); - checkArgument(tags.size() == 1); - branchSnapshots.put( - new TableBranch( - branchName, tags.get(0), snapshot.id(), path.getValue()), - snapshot); - } else { + // Ignore the empty branch. + if (branchCreateSnapshot != null) { + FileStoreTable branchTable = + FileStoreTableFactory.create( + fileIO, new Path(branchPath(tablePath, branchName))); + SortedMap> snapshotTags = branchTable.tagManager().tags(); + Snapshot earliestSnapshot = branchTable.snapshotManager().earliestSnapshot(); + if (snapshotTags.isEmpty()) { // Create based on snapshotId. - branchSnapshots.put( - new TableBranch(branchName, earliestSnapshot.id(), path.getValue()), - earliestSnapshot); + sortedSnapshots + .computeIfAbsent(earliestSnapshot, s -> new ArrayList<>()) + .add(branchName); + } else { + Snapshot snapshot = snapshotTags.firstKey(); + // current branch is create from tag. + if (earliestSnapshot.id() == snapshot.id()) { + List tags = snapshotTags.get(snapshot); + checkArgument(tags.size() == 1); + sortedSnapshots + .computeIfAbsent(snapshot, s -> new ArrayList<>()) + .add(branchName); + } else { + // Create based on snapshotId. + sortedSnapshots + .computeIfAbsent(earliestSnapshot, s -> new ArrayList<>()) + .add(branchName); + } } } } - - for (Map.Entry snapshotEntry : branches().entrySet()) { - if (snapshotEntry.getValue() != null) { - sortedSnapshots - .computeIfAbsent(snapshotEntry.getValue(), s -> new ArrayList<>()) - .add(snapshotEntry.getKey()); - } - } return sortedSnapshots; } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 9b649f7856c8..035a3e84e549 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -19,8 +19,6 @@ package org.apache.paimon.flink; import org.apache.paimon.Snapshot; -import org.apache.paimon.branch.TableBranch; -import org.apache.paimon.catalog.Catalog; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; @@ -555,6 +553,15 @@ public void testDeleteBranchTriggerCleanSnapshots() throws Exception { sql("CALL sys.create_branch('default.T', 'test2', 1)"); sql("CALL sys.create_tag('default.T', 'tag', 1)"); + // Snapshot-1 is referenced by branch [test, test2]. + assertThat( + table.branchManager().branchesCreateSnapshots().entrySet().stream() + .filter(x -> x.getKey().id() == 1L) + .findFirst() + .get() + .getValue()) + .containsExactlyInAnyOrder("test", "test2"); + // Only retain 2,3 snapshot, snapshot-1 will be expired. sql( "INSERT INTO T /*+ OPTIONS(" @@ -587,8 +594,6 @@ public void testDeleteBranchTriggerCleanSnapshots() throws Exception { checkSnapshots(snapshotManager, 3, 4); BranchManager branchManager = table.branchManager(); - assertThat(branchManager.branches().keySet().stream().map(TableBranch::getBranchName)) - .containsExactlyInAnyOrder("test", "empty"); Snapshot snapshotsToClean = branchManager.branchesCreateSnapshots().keySet().iterator().next();