From 11fc94b32894eff842ecbbad74e70791fb79cb76 Mon Sep 17 00:00:00 2001 From: LinMingQiang Date: Fri, 19 Jul 2024 21:27:51 +0800 Subject: [PATCH] [core] Expire snapshot should skip branches create snapshot. --- .../org/apache/paimon/AbstractFileStore.java | 7 + .../java/org/apache/paimon/FileStore.java | 3 + .../paimon/operation/FileDeletionBase.java | 2 +- .../paimon/privilege/PrivilegedFileStore.java | 7 + .../paimon/table/AbstractFileStoreTable.java | 7 +- .../paimon/table/ExpireChangelogImpl.java | 18 +- .../paimon/table/ExpireSnapshotsImpl.java | 18 +- .../apache/paimon/utils/BranchManager.java | 14 + .../org/apache/paimon/utils/TagManager.java | 26 +- .../java/org/apache/paimon/TestFileStore.java | 9 +- .../paimon/operation/FileDeletionTest.java | 6 +- .../apache/paimon/flink/BranchSqlITCase.java | 296 ++++++++++++++++-- 12 files changed, 354 insertions(+), 59 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index e33463d577c7..4a4d25ed6d52 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -46,6 +46,7 @@ import org.apache.paimon.table.sink.TagCallback; import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SnapshotManager; @@ -240,6 +241,12 @@ public TagManager newTagManager() { return new TagManager(fileIO, options.path()); } + @Override + public BranchManager newBranchManager() { + return new BranchManager( + fileIO, options.path(), snapshotManager(), newTagManager(), schemaManager); + } + @Override public TagDeletion newTagDeletion() { return new TagDeletion( diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index e943d38cf5e1..06b0ff78a4cb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -36,6 +36,7 @@ import org.apache.paimon.table.sink.TagCallback; import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -86,6 +87,8 @@ public interface FileStore extends Serializable { TagManager newTagManager(); + BranchManager newBranchManager(); + TagDeletion newTagDeletion(); @Nullable diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java index 7c8d6656b559..44300a421246 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java @@ -329,7 +329,7 @@ protected void cleanUnusedManifests( public Predicate dataFileSkipper( List taggedSnapshots, long expiringSnapshotId) throws Exception { - int index = TagManager.findPreviousTag(taggedSnapshots, expiringSnapshotId); + int index = TagManager.findPreviousSnapshot(taggedSnapshots, expiringSnapshotId); // refresh tag data files if (index >= 0) { Snapshot previousTag = taggedSnapshots.get(index); diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java index ca2ad04a232d..bf1b541bd831 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java @@ -39,6 +39,7 @@ import org.apache.paimon.table.sink.TagCallback; import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -157,6 +158,12 @@ public TagManager newTagManager() { return wrapped.newTagManager(); } + @Override + public BranchManager newBranchManager() { + privilegeChecker.assertCanInsert(identifier); + return wrapped.newBranchManager(); + } + @Override public TagDeletion newTagDeletion() { privilegeChecker.assertCanInsert(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 0171ff677dd5..2c51a55eab41 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -339,13 +339,16 @@ public SnapshotManager snapshotManager() { @Override public ExpireSnapshots newExpireSnapshots() { return new ExpireSnapshotsImpl( - snapshotManager(), store().newSnapshotDeletion(), store().newTagManager()); + snapshotManager(), + store().newSnapshotDeletion(), + store().newTagManager(), + branchManager()); } @Override public ExpireSnapshots newExpireChangelog() { return new ExpireChangelogImpl( - snapshotManager(), tagManager(), store().newChangelogDeletion()); + snapshotManager(), tagManager(), store().newChangelogDeletion(), branchManager()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java index 6d5e76d5b85b..9a71342d360d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java @@ -24,6 +24,7 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.options.ExpireConfig; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -33,8 +34,11 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Set; +import java.util.TreeSet; import java.util.function.Predicate; /** Cleanup the changelog in changelog directory. */ @@ -46,15 +50,18 @@ public class ExpireChangelogImpl implements ExpireSnapshots { private final ConsumerManager consumerManager; private final ChangelogDeletion changelogDeletion; private final TagManager tagManager; + private final BranchManager branchManager; private ExpireConfig expireConfig; public ExpireChangelogImpl( SnapshotManager snapshotManager, TagManager tagManager, - ChangelogDeletion changelogDeletion) { + ChangelogDeletion changelogDeletion, + BranchManager branchManager) { this.snapshotManager = snapshotManager; this.tagManager = tagManager; + this.branchManager = branchManager; this.consumerManager = new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath()); this.changelogDeletion = changelogDeletion; @@ -131,10 +138,13 @@ public int expireUntil(long earliestId, long endExclusiveId) { LOG.debug("Changelog expire range is [" + earliestId + ", " + endExclusiveId + ")"); } - List taggedSnapshots = tagManager.taggedSnapshots(); + TreeSet sortedSnapshots = new TreeSet<>(Comparator.comparingLong(Snapshot::id)); + sortedSnapshots.addAll(tagManager.taggedSnapshots()); + sortedSnapshots.addAll(branchManager.branchSnapshots()); + List retainedSnapshots = new ArrayList<>(sortedSnapshots); List skippingSnapshots = - TagManager.findOverlappedSnapshots(taggedSnapshots, earliestId, endExclusiveId); + TagManager.findOverlappedSnapshots(retainedSnapshots, earliestId, endExclusiveId); skippingSnapshots.add(snapshotManager.changelog(endExclusiveId)); Set manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots); for (long id = earliestId; id < endExclusiveId; id++) { @@ -144,7 +154,7 @@ public int expireUntil(long earliestId, long endExclusiveId) { Changelog changelog = snapshotManager.longLivedChangelog(id); Predicate skipper; try { - skipper = changelogDeletion.dataFileSkipper(taggedSnapshots, id); + skipper = changelogDeletion.dataFileSkipper(retainedSnapshots, id); } catch (Exception e) { LOG.info( String.format( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java index 73fac37e6780..710cc12e5fda 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java @@ -25,6 +25,7 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.SnapshotDeletion; import org.apache.paimon.options.ExpireConfig; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -34,8 +35,11 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Set; +import java.util.TreeSet; import java.util.function.Predicate; /** An implementation for {@link ExpireSnapshots}. */ @@ -47,18 +51,21 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots { private final ConsumerManager consumerManager; private final SnapshotDeletion snapshotDeletion; private final TagManager tagManager; + private final BranchManager branchManager; private ExpireConfig expireConfig; public ExpireSnapshotsImpl( SnapshotManager snapshotManager, SnapshotDeletion snapshotDeletion, - TagManager tagManager) { + TagManager tagManager, + BranchManager branchManager) { this.snapshotManager = snapshotManager; this.consumerManager = new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath()); this.snapshotDeletion = snapshotDeletion; this.tagManager = tagManager; + this.branchManager = branchManager; this.expireConfig = ExpireConfig.builder().build(); } @@ -150,7 +157,10 @@ public int expireUntil(long earliestId, long endExclusiveId) { "Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")"); } - List taggedSnapshots = tagManager.taggedSnapshots(); + TreeSet sortedSnapshots = new TreeSet<>(Comparator.comparingLong(Snapshot::id)); + sortedSnapshots.addAll(tagManager.taggedSnapshots()); + sortedSnapshots.addAll(branchManager.branchSnapshots()); + List retainedSnapshots = new ArrayList<>(sortedSnapshots); // delete merge tree files // deleted merge tree files in a snapshot are not used by the next snapshot, so the range of @@ -163,7 +173,7 @@ public int expireUntil(long earliestId, long endExclusiveId) { // expire merge tree files and collect changed buckets Predicate skipper; try { - skipper = snapshotDeletion.dataFileSkipper(taggedSnapshots, id); + skipper = snapshotDeletion.dataFileSkipper(retainedSnapshots, id); } catch (Exception e) { LOG.info( String.format( @@ -196,7 +206,7 @@ public int expireUntil(long earliestId, long endExclusiveId) { // delete manifests and indexFiles List skippingSnapshots = TagManager.findOverlappedSnapshots( - taggedSnapshots, beginInclusiveId, endExclusiveId); + retainedSnapshots, beginInclusiveId, endExclusiveId); skippingSnapshots.add(snapshotManager.snapshot(endExclusiveId)); Set skippingSet = snapshotDeletion.manifestSkippingSet(skippingSnapshots); for (long id = beginInclusiveId; id < endExclusiveId; id++) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 6ff8d4c2a2e0..f70bc3eb533d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -364,4 +364,18 @@ public List branches() { throw new RuntimeException(e); } } + + /** Get all branch createdFromSnapshots. */ + public List branchSnapshots() { + ArrayList snapshotList = new ArrayList<>(); + branches() + .forEach( + b -> { + long createdFromSnapshot = b.getCreatedFromSnapshot(); + if (snapshotManager.snapshotExists(createdFromSnapshot)) { + snapshotList.add(snapshotManager.snapshot(createdFromSnapshot)); + } + }); + return snapshotList; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 65c6c232dafd..0ceaafdb7992 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -368,31 +368,31 @@ private int findIndex(Snapshot taggedSnapshot, List taggedSnapshots) { } public static List findOverlappedSnapshots( - List taggedSnapshots, long beginInclusive, long endExclusive) { - List snapshots = new ArrayList<>(); - int right = findPreviousTag(taggedSnapshots, endExclusive); + List snapshots, long beginInclusive, long endExclusive) { + List overlappedSnapshots = new ArrayList<>(); + int right = findPreviousSnapshot(snapshots, endExclusive); if (right >= 0) { - int left = Math.max(findPreviousOrEqualTag(taggedSnapshots, beginInclusive), 0); + int left = Math.max(findPreviousOrEqualSnapshot(snapshots, beginInclusive), 0); for (int i = left; i <= right; i++) { - snapshots.add(taggedSnapshots.get(i)); + overlappedSnapshots.add(snapshots.get(i)); } } - return snapshots; + return overlappedSnapshots; } - public static int findPreviousTag(List taggedSnapshots, long targetSnapshotId) { - for (int i = taggedSnapshots.size() - 1; i >= 0; i--) { - if (taggedSnapshots.get(i).id() < targetSnapshotId) { + public static int findPreviousSnapshot(List snapshots, long targetSnapshotId) { + for (int i = snapshots.size() - 1; i >= 0; i--) { + if (snapshots.get(i).id() < targetSnapshotId) { return i; } } return -1; } - private static int findPreviousOrEqualTag( - List taggedSnapshots, long targetSnapshotId) { - for (int i = taggedSnapshots.size() - 1; i >= 0; i--) { - if (taggedSnapshots.get(i).id() <= targetSnapshotId) { + private static int findPreviousOrEqualSnapshot( + List snapshots, long targetSnapshotId) { + for (int i = snapshots.size() - 1; i >= 0; i--) { + if (snapshots.get(i).id() <= targetSnapshotId) { return i; } } diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index db822d7be57a..26455b78b675 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -158,7 +158,8 @@ public ExpireSnapshots newExpire(int numRetainedMin, int numRetainedMax, long mi return new ExpireSnapshotsImpl( snapshotManager(), newSnapshotDeletion(), - new TagManager(fileIO, options.path())) + new TagManager(fileIO, options.path()), + newBranchManager()) .config( ExpireConfig.builder() .snapshotRetainMax(numRetainedMax) @@ -171,7 +172,8 @@ public ExpireSnapshots newExpire(ExpireConfig expireConfig) { return new ExpireSnapshotsImpl( snapshotManager(), newSnapshotDeletion(), - new TagManager(fileIO, options.path())) + new TagManager(fileIO, options.path()), + newBranchManager()) .config(expireConfig); } @@ -180,7 +182,8 @@ public ExpireSnapshots newChangelogExpire(ExpireConfig config) { new ExpireChangelogImpl( snapshotManager(), new TagManager(fileIO, options.path()), - newChangelogDeletion()); + newChangelogDeletion(), + newBranchManager()); impl.config(config); return impl; } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index b6c80ad74983..b89d7cbb675b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -673,7 +673,11 @@ public void testExpireWithDeletingTags() throws Exception { // action: expire snapshot 1 -> delete tag1 -> expire snapshot 2 // result: exist A & B (because of tag2) ExpireSnapshots expireSnapshots = - new ExpireSnapshotsImpl(snapshotManager, store.newSnapshotDeletion(), tagManager); + new ExpireSnapshotsImpl( + snapshotManager, + store.newSnapshotDeletion(), + tagManager, + store.newBranchManager()); expireSnapshots .config( ExpireConfig.builder() diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index be0a2e8058c2..c08829bac456 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -18,55 +18,77 @@ package org.apache.paimon.flink; -import org.apache.paimon.flink.util.AbstractTestBase; +import org.apache.paimon.Snapshot; +import org.apache.paimon.branch.TableBranch; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.SnapshotManager; -import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import static org.assertj.core.api.Assertions.assertThat; /** IT cases for table with branches using SQL. */ -public class BranchSqlITCase extends AbstractTestBase { - - @TempDir java.nio.file.Path tempDir; +public class BranchSqlITCase extends CatalogITCaseBase { @Test - public void testAlterTable() throws Exception { - TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); - tEnv.executeSql( - "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" + tempDir + "' )"); - tEnv.executeSql("USE CATALOG mycat"); - tEnv.executeSql( - "CREATE TABLE t ( pt INT, k INT, v STRING, PRIMARY KEY (pt, k) NOT ENFORCED ) " - + "PARTITIONED BY (pt) WITH ( 'bucket' = '2' )"); - - tEnv.executeSql( - "INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana'), (2, 10, 'cat'), (2, 20, 'dog')") - .await(); - tEnv.executeSql("CALL sys.create_branch('default.t', 'test', 1)"); - tEnv.executeSql("INSERT INTO t VALUES (1, 10, 'APPLE'), (2, 20, 'DOG'), (2, 30, 'horse')") - .await(); - - tEnv.executeSql("ALTER TABLE `t$branch_test` ADD (v2 INT)").await(); - tEnv.executeSql( - "INSERT INTO `t$branch_test` VALUES " - + "(1, 10, 'cherry', 100), (2, 20, 'bird', 200), (2, 40, 'wolf', 400)") - .await(); - - assertThat(collectResult(tEnv, "SELECT * FROM t")) + public void testAlterBranchTable() throws Exception { + + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + sql( + "INSERT INTO T VALUES" + + " (1, 10, 'apple')," + + " (1, 20, 'banana')," + + " (2, 10, 'cat')," + + " (2, 20, 'dog')"); + + sql("CALL sys.create_branch('default.T', 'test', 1)"); + + FileStoreTable branchTable = paimonTable("T$branch_test"); + assertThat(branchTable.schema().fields().size()).isEqualTo(3); + + sql( + "INSERT INTO T VALUES" + + " (1, 10, 'APPLE')," + + " (2, 20, 'DOG')," + + " (2, 30, 'horse')"); + + // Add v2 column for branch table. + sql("ALTER TABLE `T$branch_test` ADD (v2 INT)"); + + branchTable = paimonTable("T$branch_test"); + assertThat(branchTable.schema().fields().size()).isEqualTo(4); + + sql( + "INSERT INTO `T$branch_test` VALUES " + + "(1, 10, 'cherry', 100)" + + ", (2, 20, 'bird', 200)" + + ", (2, 40, 'wolf', 400)"); + + assertThat(collectResult("SELECT * FROM T")) .containsExactlyInAnyOrder( "+I[1, 10, APPLE]", "+I[1, 20, banana]", "+I[2, 30, horse]", "+I[2, 10, cat]", "+I[2, 20, DOG]"); - assertThat(collectResult(tEnv, "SELECT * FROM t$branch_test")) + + assertThat(collectResult("SELECT * FROM T$branch_test")) .containsExactlyInAnyOrder( "+I[1, 10, cherry, 100]", "+I[1, 20, banana, null]", @@ -75,7 +97,213 @@ public void testAlterTable() throws Exception { "+I[2, 40, wolf, 400]"); } - private List collectResult(TableEnvironment tEnv, String sql) throws Exception { + @Test + public void testCreateBranchFromTag() throws Exception { + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + // snapshot 1. + sql("INSERT INTO T VALUES" + " (1, 10, 'apple')," + " (1, 20, 'banana')"); + // snapshot 2. + sql("INSERT INTO T VALUES" + " (2, 10, 'cat')," + " (2, 20, 'dog')"); + + sql("CALL sys.create_tag('default.T', 'tag1', 1)"); + sql("CALL sys.create_tag('default.T', 'tag2', 2)"); + + sql("CALL sys.create_branch('default.T', 'test', 'tag1')"); + sql("CALL sys.create_branch('default.T', 'test2', 'tag2')"); + + FileStoreTable branchTable = paimonTable("T$branch_test"); + assertThat(branchTable.tagManager().tagExists("tag1")).isEqualTo(true); + + assertThat(collectResult("SELECT * FROM T$branch_test")) + .containsExactlyInAnyOrder("+I[1, 10, apple]", "+I[1, 20, banana]"); + + FileStoreTable branchTable2 = paimonTable("T$branch_test2"); + assertThat(branchTable2.tagManager().tagExists("tag2")).isEqualTo(true); + + assertThat(collectResult("SELECT * FROM T$branch_test2")) + .containsExactlyInAnyOrder( + "+I[1, 10, apple]", + "+I[1, 20, banana]", + "+I[2, 10, cat]", + "+I[2, 20, dog]"); + } + + @Test + public void testCreateBranchFromSnapshot() throws Catalog.TableNotExistException { + + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + // snapshot 1. + sql("INSERT INTO T VALUES(1, 10, 'apple')"); + + // snapshot 2. + sql("INSERT INTO T VALUES(1, 20, 'dog')"); + + sql("CALL sys.create_branch('default.T', 'test', 1)"); + sql("CALL sys.create_branch('default.T', 'test2', 2)"); + + FileStoreTable table = paimonTable("T"); + + assertThat(table.branchManager().branchSnapshots().stream().map(Snapshot::id)) + .containsExactlyInAnyOrder(1L, 2L); + + assertThat(paimonTable("T$branch_test").snapshotManager().snapshotExists(1)) + .isEqualTo(true); + + assertThat(paimonTable("T$branch_test2").snapshotManager().snapshotExists(2)) + .isEqualTo(true); + } + + @Test + public void testDeleteBranchTable() throws Exception { + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + // snapshot 1. + sql("INSERT INTO T VALUES(1, 10, 'apple')"); + + // snapshot 2. + sql("INSERT INTO T VALUES(1, 20, 'dog')"); + + sql("CALL sys.create_branch('default.T', 'test', 1)"); + sql("CALL sys.create_branch('default.T', 'test2', 2)"); + + FileStoreTable table = paimonTable("T"); + + assertThat(table.branchManager().branchSnapshots().stream().map(Snapshot::id)) + .containsExactlyInAnyOrder(1L, 2L); + + assertThat(table.branchManager().branches().stream().map(TableBranch::getBranchName)) + .containsExactlyInAnyOrder("test", "test2"); + + sql("CALL sys.delete_branch('default.T', 'test')"); + + assertThat(table.branchManager().branches().stream().map(TableBranch::getBranchName)) + .containsExactlyInAnyOrder("test2"); + } + + @Test + public void testBranchManagerGetBranchSnapshotsList() + throws Catalog.TableNotExistException, IOException { + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + sql("INSERT INTO T VALUES (1, 10, 'hxh')"); + sql("INSERT INTO T VALUES (1, 20, 'hxh')"); + sql("INSERT INTO T VALUES (1, 30, 'hxh')"); + + FileStoreTable table = paimonTable("T"); + checkSnapshots(table.snapshotManager(), 1, 3); + + sql("CALL sys.create_branch('default.T', 'test1', 1)"); + sql("CALL sys.create_branch('default.T', 'test2', 2)"); + sql("CALL sys.create_branch('default.T', 'test3', 3)"); + + assertThat(table.branchManager().branchSnapshots().stream().map(Snapshot::id)) + .containsExactlyInAnyOrder(1L, 2L, 3L); + } + + /** Expire snapshots should skip branches createdFromSnapshots. */ + @Test + public void testBranchesCreateSnapshotExpire() throws Exception { + + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + FileStoreTable table = paimonTable("T"); + SnapshotManager snapshotManager = table.snapshotManager(); + + sql("INSERT INTO T VALUES (1, 10, 'hunter')"); + sql("INSERT INTO T VALUES (1, 20, 'hunter')"); + sql("INSERT INTO T VALUES (1, 30, 'hunter')"); + sql("INSERT INTO T VALUES (1, 40, 'hunter')"); + sql("INSERT INTO T VALUES (1, 50, 'hunter')"); + + checkSnapshots(snapshotManager, 1, 5); + + // create branch from snapshot 1,3. + sql("CALL sys.create_branch('default.T', 'test', 1)"); + sql("CALL sys.create_branch('default.T', 'test3', 3)"); + + // create tag2 from snapshot 2. + sql("CALL sys.create_tag('default.T', 'tag2', 2)"); + // create branch from tag2. + sql("CALL sys.create_branch('default.T', 'test2', 'tag2')"); + + assertThat(table.branchManager().branchSnapshots().stream().map(Snapshot::id)) + .containsExactlyInAnyOrder(1L, 2L, 3L); + + // Only retain 5,6 snapshot, 1-4 will expire. + sql( + "INSERT INTO T /*+ OPTIONS(" + + "'snapshot.num-retained.min'= '2'," + + "'snapshot.num-retained.max'= '2' ) */" + + " VALUES (1, 60, 'hunter')"); + + checkSnapshots(snapshotManager, 5, 6); + + // Snapshot 1-4 has expired. + assertThat(table.branchManager().branchSnapshots().stream().map(Snapshot::id)).isEmpty(); + + assertThat(paimonTable("T$branch_test").snapshotManager().latestSnapshotId()).isEqualTo(1L); + assertThat(paimonTable("T$branch_test3").snapshotManager().latestSnapshotId()) + .isEqualTo(3L); + + // Created from tag2 and snapshot id is 2. + assertThat(paimonTable("T$branch_test2").snapshotManager().latestSnapshotId()) + .isEqualTo(2L); + + // query data. + assertThat(collectResult("SELECT * FROM T$branch_test")) + .containsExactlyInAnyOrder("+I[1, 10, hunter]"); + + assertThat(collectResult("SELECT * FROM T$branch_test2")) + .containsExactlyInAnyOrder("+I[1, 10, hunter]", "+I[1, 20, hunter]"); + + assertThat(collectResult("SELECT * FROM T$branch_test3")) + .containsExactlyInAnyOrder( + "+I[1, 10, hunter]", "+I[1, 20, hunter]", "+I[1, 30, hunter]"); + } + + private List collectResult(String sql) throws Exception { List result = new ArrayList<>(); try (CloseableIterator it = tEnv.executeSql(sql).collect()) { while (it.hasNext()) { @@ -84,4 +312,10 @@ private List collectResult(TableEnvironment tEnv, String sql) throws Exc } return result; } + + private void checkSnapshots(SnapshotManager sm, int earliest, int latest) throws IOException { + assertThat(sm.snapshotCount()).isEqualTo(latest - earliest + 1); + assertThat(sm.earliestSnapshotId()).isEqualTo(earliest); + assertThat(sm.latestSnapshotId()).isEqualTo(latest); + } }