diff --git a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java index c1742f82ca84..8df1de705620 100644 --- a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java +++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java @@ -60,4 +60,10 @@ public interface SnapshotUpdate extends PendingUpdate { * @return this for method chaining */ ThisT scanManifestsWith(ExecutorService executorService); + + /** + * Perform operations on a particular branch + * @param branch which is name of SanshotRef of type branch. + */ + ThisT toBranch(String branch); } diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java index b5655fe1fb1e..d576603d0ec3 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java +++ b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java @@ -19,10 +19,12 @@ package org.apache.iceberg; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.SnapshotUtil; class BaseRowDelta extends MergingSnapshotProducer implements RowDelta { private Long startingSnapshotId = null; // check all versions by default @@ -96,20 +98,49 @@ public RowDelta validateNoConflictingDeleteFiles() { } @Override - protected void validate(TableMetadata base) { - if (base.currentSnapshot() != null) { - if (!referencedDataFiles.isEmpty()) { - validateDataFilesExist( - base, startingSnapshotId, referencedDataFiles, !validateDeletes, conflictDetectionFilter); - } + public RowDelta toBranch(String branch) { + Preconditions.checkArgument(branch != null, "branch cannot be null"); + if (this.current().ref(branch) == null) { + super.createNewRef(branch); + } - if (validateNewDataFiles) { - validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter); - } + Preconditions.checkArgument(this.current().ref(branch).type().equals(SnapshotRefType.BRANCH), + "%s is not a ref to type branch", branch); + setTargetBranch(branch); + return self(); + } - if (validateNewDeleteFiles) { - validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter); + private void checkIfSnapshotIsAnAncestor(Snapshot current, TableMetadata base) { + if (this.startingSnapshotId == null || current == null) { + return; + } + + for (Snapshot ancestor : SnapshotUtil.ancestorsOf(current.snapshotId(), base::snapshot)) { + if (ancestor.snapshotId() == this.startingSnapshotId) { + return; } + + } + throw new ValidationException("Snapshot %s is not an ancestor of branch %s", startingSnapshotId, targetBranch()); + } + + @Override + protected void validate(TableMetadata base) { + Snapshot current = base.ref(targetBranch()) != null ? + base.snapshot(base.ref(targetBranch()).snapshotId()) : base.currentSnapshot(); + + checkIfSnapshotIsAnAncestor(current, base); + if (!referencedDataFiles.isEmpty()) { + validateDataFilesExist( + base, startingSnapshotId, referencedDataFiles, !validateDeletes, conflictDetectionFilter); + } + + if (validateNewDataFiles) { + validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter); + } + + if (validateNewDeleteFiles) { + validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter); } } } diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index e184b50e8284..e95407c806ca 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -164,6 +164,13 @@ protected void failMissingDeletePaths() { deleteFilterManager.failMissingDeletePaths(); } + protected void createNewRef(String branch) { + SnapshotRef branchRef = SnapshotRef.branchBuilder(this.current().currentSnapshot().snapshotId()).build(); + TableMetadata.Builder updatedBuilder = TableMetadata.buildFrom(this.current()); + updatedBuilder.setRef(branch, branchRef); + ops.commit(ops.current(), updatedBuilder.build()); + } + /** * Add a filter to match files to delete. A file will be deleted if all of the rows it contains * match this or any other filter passed to this method. @@ -683,7 +690,8 @@ protected Map summary() { @Override public List apply(TableMetadata base) { - Snapshot current = base.currentSnapshot(); + Snapshot current = base.ref(targetBranch()) != null ? + base.snapshot(base.ref(targetBranch()).snapshotId()) : base.currentSnapshot(); // filter any existing manifests List filtered = filterManager.filterManifests( diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index eab82b48ce50..17d4d6277f72 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -89,6 +89,8 @@ public void accept(String file) { private ExecutorService workerPool = ThreadPools.getWorkerPool(); + private String targetBranch = SnapshotRef.MAIN_BRANCH; + protected SnapshotProducer(TableOperations ops) { this.ops = ops; this.base = ops.current(); @@ -104,6 +106,10 @@ protected SnapshotProducer(TableOperations ops) { protected abstract ThisT self(); + protected String targetBranch() { + return targetBranch; + } + @Override public ThisT stageOnly() { this.stageOnly = true; @@ -116,6 +122,15 @@ public ThisT scanManifestsWith(ExecutorService executorService) { return self(); } + @Override + public ThisT toBranch(String branch) { + throw new UnsupportedOperationException("Performing operations on a branch is is only supported for BaseRowDelta"); + } + + protected void setTargetBranch(String branch) { + this.targetBranch = branch; + } + protected ExecutorService workerPool() { return this.workerPool; } @@ -167,8 +182,8 @@ protected void validate(TableMetadata currentMetadata) { @Override public Snapshot apply() { refresh(); - Long parentSnapshotId = base.currentSnapshot() != null ? - base.currentSnapshot().snapshotId() : null; + Long parentSnapshotId = base.ref(targetBranch) != null ? base.ref(targetBranch).snapshotId() : null; + long sequenceNumber = base.nextSequenceNumber(); // run validations from the child operation @@ -298,11 +313,11 @@ public void commit() { TableMetadata.Builder update = TableMetadata.buildFrom(base); if (base.snapshot(newSnapshot.snapshotId()) != null) { // this is a rollback operation - update.setBranchSnapshot(newSnapshot.snapshotId(), SnapshotRef.MAIN_BRANCH); + update.setBranchSnapshot(newSnapshot.snapshotId(), targetBranch); } else if (stageOnly) { update.addSnapshot(newSnapshot); } else { - update.setBranchSnapshot(newSnapshot, SnapshotRef.MAIN_BRANCH); + update.setBranchSnapshot(newSnapshot, targetBranch); } TableMetadata updated = update.build(); diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index 4e317b18891a..e892013d29b4 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -28,6 +28,7 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Test; @@ -1323,4 +1324,67 @@ public void testRowDeltaCaseSensitivity() { .validateNoConflictingDeleteFiles() .commit()); } + + @Test + public void testBranchValidationsNotValidAncestor() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + + Expression conflictDetectionFilter = Expressions.alwaysTrue(); + + Long firstSnapshot = table.currentSnapshot().snapshotId(); + + table.manageSnapshots().createBranch("newBranch", firstSnapshot).commit(); + + table.newAppend() + .appendFile(FILE_B) + .commit(); + + // This commit will result in validation exception as we start validation from a snapshot which is + // not an ancestor of the branch + RowDelta rowDelta = table.newRowDelta() + .toBranch("newBranch") + .addDeletes(FILE_A_DELETES) + .validateFromSnapshot(table.currentSnapshot().snapshotId()) + .conflictDetectionFilter(conflictDetectionFilter) + .validateNoConflictingDeleteFiles(); + + AssertHelpers.assertThrows("No matching ancestor found", ValidationException.class, () -> rowDelta.commit()); + } + + @Test + public void testBranchValidationsValidAncestor() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + + Expression conflictDetectionFilter = Expressions.alwaysTrue(); + + Long firstSnapshot = table.currentSnapshot().snapshotId(); + + table.manageSnapshots().createBranch("newBranch", firstSnapshot).commit(); + + table.newAppend() + .appendFile(FILE_B) + .commit(); + + // This commit not result in validation exception as we start validation from a snapshot which is + // not an ancestor of the branch + table.newRowDelta() + .toBranch("newBranch") + .addDeletes(FILE_A_DELETES) + .validateFromSnapshot(firstSnapshot) + .conflictDetectionFilter(conflictDetectionFilter) + .validateNoConflictingDeleteFiles().commit(); + + List dataManifests = table.ops().current().snapshot(table.ops().current() + .ref("newBranch").snapshotId()).dataManifests(table.io()); + Assert.assertEquals("branch should have 1 data manifest", 1, Iterables.size(dataManifests)); + List deleteManifests = table.ops().current().snapshot(table.ops().current() + .ref("newBranch").snapshotId()).deleteManifests(table.io()); + Assert.assertEquals("branch should have 1 delete manifest", 1, Iterables.size(deleteManifests)); + List mainBranchManifests = table.currentSnapshot().dataManifests(table.io()); + Assert.assertEquals("main branch should have 2 data manifest", 2, Iterables.size(mainBranchManifests)); + } }