Skip to content

Commit

Permalink
[core] Expiring snapshots should skip the snapshots that branches were created from.
Browse files Browse the repository at this point in the history
  • Loading branch information
LinMingQiang committed Jul 20, 2024
1 parent f0a082c commit 11fc94b
Show file tree
Hide file tree
Showing 12 changed files with 354 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
Expand Down Expand Up @@ -240,6 +241,12 @@ public TagManager newTagManager() {
return new TagManager(fileIO, options.path());
}

/**
 * Creates a {@link BranchManager} wired with this store's file IO, the table path from the
 * options, the store's snapshot manager, a newly created tag manager and the schema manager.
 */
@Override
public BranchManager newBranchManager() {
    SnapshotManager snapshots = snapshotManager();
    TagManager tags = newTagManager();
    return new BranchManager(fileIO, options.path(), snapshots, tags, schemaManager);
}

@Override
public TagDeletion newTagDeletion() {
return new TagDeletion(
Expand Down
3 changes: 3 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
Expand Down Expand Up @@ -86,6 +87,8 @@ public interface FileStore<T> extends Serializable {

TagManager newTagManager();

BranchManager newBranchManager();

TagDeletion newTagDeletion();

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ protected void cleanUnusedManifests(

public Predicate<ManifestEntry> dataFileSkipper(
List<Snapshot> taggedSnapshots, long expiringSnapshotId) throws Exception {
int index = TagManager.findPreviousTag(taggedSnapshots, expiringSnapshotId);
int index = TagManager.findPreviousSnapshot(taggedSnapshots, expiringSnapshotId);
// refresh tag data files
if (index >= 0) {
Snapshot previousTag = taggedSnapshots.get(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
Expand Down Expand Up @@ -157,6 +158,12 @@ public TagManager newTagManager() {
return wrapped.newTagManager();
}

/**
 * Returns the wrapped store's {@link BranchManager}.
 *
 * <p>The insert privilege on {@code identifier} is asserted first, so the wrapped store is only
 * consulted when that check passes.
 */
@Override
public BranchManager newBranchManager() {
    // Privilege check must run before touching the wrapped store.
    privilegeChecker.assertCanInsert(identifier);
    BranchManager delegateManager = wrapped.newBranchManager();
    return delegateManager;
}

@Override
public TagDeletion newTagDeletion() {
privilegeChecker.assertCanInsert(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,16 @@ public SnapshotManager snapshotManager() {
@Override
public ExpireSnapshots newExpireSnapshots() {
return new ExpireSnapshotsImpl(
snapshotManager(), store().newSnapshotDeletion(), store().newTagManager());
snapshotManager(),
store().newSnapshotDeletion(),
store().newTagManager(),
branchManager());
}

@Override
public ExpireSnapshots newExpireChangelog() {
return new ExpireChangelogImpl(
snapshotManager(), tagManager(), store().newChangelogDeletion());
snapshotManager(), tagManager(), store().newChangelogDeletion(), branchManager());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.ChangelogDeletion;
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
Expand All @@ -33,8 +34,11 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

/** Cleanup the changelog in changelog directory. */
Expand All @@ -46,15 +50,18 @@ public class ExpireChangelogImpl implements ExpireSnapshots {
private final ConsumerManager consumerManager;
private final ChangelogDeletion changelogDeletion;
private final TagManager tagManager;
private final BranchManager branchManager;

private ExpireConfig expireConfig;

public ExpireChangelogImpl(
SnapshotManager snapshotManager,
TagManager tagManager,
ChangelogDeletion changelogDeletion) {
ChangelogDeletion changelogDeletion,
BranchManager branchManager) {
this.snapshotManager = snapshotManager;
this.tagManager = tagManager;
this.branchManager = branchManager;
this.consumerManager =
new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath());
this.changelogDeletion = changelogDeletion;
Expand Down Expand Up @@ -131,10 +138,13 @@ public int expireUntil(long earliestId, long endExclusiveId) {
LOG.debug("Changelog expire range is [" + earliestId + ", " + endExclusiveId + ")");
}

List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
TreeSet<Snapshot> sortedSnapshots = new TreeSet<>(Comparator.comparingLong(Snapshot::id));
sortedSnapshots.addAll(tagManager.taggedSnapshots());
sortedSnapshots.addAll(branchManager.branchSnapshots());
List<Snapshot> retainedSnapshots = new ArrayList<>(sortedSnapshots);

List<Snapshot> skippingSnapshots =
TagManager.findOverlappedSnapshots(taggedSnapshots, earliestId, endExclusiveId);
TagManager.findOverlappedSnapshots(retainedSnapshots, earliestId, endExclusiveId);
skippingSnapshots.add(snapshotManager.changelog(endExclusiveId));
Set<String> manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots);
for (long id = earliestId; id < endExclusiveId; id++) {
Expand All @@ -144,7 +154,7 @@ public int expireUntil(long earliestId, long endExclusiveId) {
Changelog changelog = snapshotManager.longLivedChangelog(id);
Predicate<ManifestEntry> skipper;
try {
skipper = changelogDeletion.dataFileSkipper(taggedSnapshots, id);
skipper = changelogDeletion.dataFileSkipper(retainedSnapshots, id);
} catch (Exception e) {
LOG.info(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
Expand All @@ -34,8 +35,11 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

/** An implementation for {@link ExpireSnapshots}. */
Expand All @@ -47,18 +51,21 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
private final ConsumerManager consumerManager;
private final SnapshotDeletion snapshotDeletion;
private final TagManager tagManager;
private final BranchManager branchManager;

private ExpireConfig expireConfig;

public ExpireSnapshotsImpl(
SnapshotManager snapshotManager,
SnapshotDeletion snapshotDeletion,
TagManager tagManager) {
TagManager tagManager,
BranchManager branchManager) {
this.snapshotManager = snapshotManager;
this.consumerManager =
new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath());
this.snapshotDeletion = snapshotDeletion;
this.tagManager = tagManager;
this.branchManager = branchManager;
this.expireConfig = ExpireConfig.builder().build();
}

Expand Down Expand Up @@ -150,7 +157,10 @@ public int expireUntil(long earliestId, long endExclusiveId) {
"Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")");
}

List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
TreeSet<Snapshot> sortedSnapshots = new TreeSet<>(Comparator.comparingLong(Snapshot::id));
sortedSnapshots.addAll(tagManager.taggedSnapshots());
sortedSnapshots.addAll(branchManager.branchSnapshots());
List<Snapshot> retainedSnapshots = new ArrayList<>(sortedSnapshots);

// delete merge tree files
// deleted merge tree files in a snapshot are not used by the next snapshot, so the range of
Expand All @@ -163,7 +173,7 @@ public int expireUntil(long earliestId, long endExclusiveId) {
// expire merge tree files and collect changed buckets
Predicate<ManifestEntry> skipper;
try {
skipper = snapshotDeletion.dataFileSkipper(taggedSnapshots, id);
skipper = snapshotDeletion.dataFileSkipper(retainedSnapshots, id);
} catch (Exception e) {
LOG.info(
String.format(
Expand Down Expand Up @@ -196,7 +206,7 @@ public int expireUntil(long earliestId, long endExclusiveId) {
// delete manifests and indexFiles
List<Snapshot> skippingSnapshots =
TagManager.findOverlappedSnapshots(
taggedSnapshots, beginInclusiveId, endExclusiveId);
retainedSnapshots, beginInclusiveId, endExclusiveId);
skippingSnapshots.add(snapshotManager.snapshot(endExclusiveId));
Set<String> skippingSet = snapshotDeletion.manifestSkippingSet(skippingSnapshots);
for (long id = beginInclusiveId; id < endExclusiveId; id++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,4 +364,18 @@ public List<TableBranch> branches() {
throw new RuntimeException(e);
}
}

/**
 * Collects the snapshot that each branch was created from.
 *
 * <p>A branch whose created-from snapshot no longer exists is skipped. No de-duplication is done
 * here: if several branches were created from the same snapshot, it appears once per branch.
 *
 * @return the existing created-from snapshots, in the iteration order of {@link #branches()}
 */
public List<Snapshot> branchSnapshots() {
    List<Snapshot> createdFrom = new ArrayList<>();
    for (TableBranch branch : branches()) {
        long snapshotId = branch.getCreatedFromSnapshot();
        // Only snapshots that are still present can be loaded.
        if (!snapshotManager.snapshotExists(snapshotId)) {
            continue;
        }
        createdFrom.add(snapshotManager.snapshot(snapshotId));
    }
    return createdFrom;
}
}
26 changes: 13 additions & 13 deletions paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -368,31 +368,31 @@ private int findIndex(Snapshot taggedSnapshot, List<Snapshot> taggedSnapshots) {
}

public static List<Snapshot> findOverlappedSnapshots(
List<Snapshot> taggedSnapshots, long beginInclusive, long endExclusive) {
List<Snapshot> snapshots = new ArrayList<>();
int right = findPreviousTag(taggedSnapshots, endExclusive);
List<Snapshot> snapshots, long beginInclusive, long endExclusive) {
List<Snapshot> overlappedSnapshots = new ArrayList<>();
int right = findPreviousSnapshot(snapshots, endExclusive);
if (right >= 0) {
int left = Math.max(findPreviousOrEqualTag(taggedSnapshots, beginInclusive), 0);
int left = Math.max(findPreviousOrEqualSnapshot(snapshots, beginInclusive), 0);
for (int i = left; i <= right; i++) {
snapshots.add(taggedSnapshots.get(i));
overlappedSnapshots.add(snapshots.get(i));
}
}
return snapshots;
return overlappedSnapshots;
}

public static int findPreviousTag(List<Snapshot> taggedSnapshots, long targetSnapshotId) {
for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
if (taggedSnapshots.get(i).id() < targetSnapshotId) {
public static int findPreviousSnapshot(List<Snapshot> snapshots, long targetSnapshotId) {
for (int i = snapshots.size() - 1; i >= 0; i--) {
if (snapshots.get(i).id() < targetSnapshotId) {
return i;
}
}
return -1;
}

private static int findPreviousOrEqualTag(
List<Snapshot> taggedSnapshots, long targetSnapshotId) {
for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
if (taggedSnapshots.get(i).id() <= targetSnapshotId) {
private static int findPreviousOrEqualSnapshot(
List<Snapshot> snapshots, long targetSnapshotId) {
for (int i = snapshots.size() - 1; i >= 0; i--) {
if (snapshots.get(i).id() <= targetSnapshotId) {
return i;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ public ExpireSnapshots newExpire(int numRetainedMin, int numRetainedMax, long mi
return new ExpireSnapshotsImpl(
snapshotManager(),
newSnapshotDeletion(),
new TagManager(fileIO, options.path()))
new TagManager(fileIO, options.path()),
newBranchManager())
.config(
ExpireConfig.builder()
.snapshotRetainMax(numRetainedMax)
Expand All @@ -171,7 +172,8 @@ public ExpireSnapshots newExpire(ExpireConfig expireConfig) {
return new ExpireSnapshotsImpl(
snapshotManager(),
newSnapshotDeletion(),
new TagManager(fileIO, options.path()))
new TagManager(fileIO, options.path()),
newBranchManager())
.config(expireConfig);
}

Expand All @@ -180,7 +182,8 @@ public ExpireSnapshots newChangelogExpire(ExpireConfig config) {
new ExpireChangelogImpl(
snapshotManager(),
new TagManager(fileIO, options.path()),
newChangelogDeletion());
newChangelogDeletion(),
newBranchManager());
impl.config(config);
return impl;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,11 @@ public void testExpireWithDeletingTags() throws Exception {
// action: expire snapshot 1 -> delete tag1 -> expire snapshot 2
// result: exist A & B (because of tag2)
ExpireSnapshots expireSnapshots =
new ExpireSnapshotsImpl(snapshotManager, store.newSnapshotDeletion(), tagManager);
new ExpireSnapshotsImpl(
snapshotManager,
store.newSnapshotDeletion(),
tagManager,
store.newBranchManager());
expireSnapshots
.config(
ExpireConfig.builder()
Expand Down
Loading

0 comments on commit 11fc94b

Please sign in to comment.