diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 79f1e51ba214..d8894056a0ea 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -27,6 +27,9 @@ acceptedBreaks: - code: "java.method.addedToInterface" new: "method ThisT org.apache.iceberg.SnapshotUpdate::scanManifestsWith(java.util.concurrent.ExecutorService)" justification: "Accept all changes prior to introducing API compatibility checks" + - code: "java.method.addedToInterface" + new: "method ThisT org.apache.iceberg.SnapshotUpdate::toBranch(java.lang.String)" + justification: "Adding toBranch API for supporting committing to a branch" - code: "java.method.addedToInterface" new: "method boolean org.apache.iceberg.expressions.BoundTerm::isEquivalentTo(org.apache.iceberg.expressions.BoundTerm)" justification: "new API method" diff --git a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java index 2c5ab790083e..cc6b02dee474 100644 --- a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java +++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java @@ -59,4 +59,16 @@ public interface SnapshotUpdate extends PendingUpdate { * @return this for method chaining */ ThisT scanManifestsWith(ExecutorService executorService); + + /** + * Perform operations on a particular branch + * + * @param branch which is name of SnapshotRef of type branch. + */ + default ThisT toBranch(String branch) { + throw new UnsupportedOperationException( + String.format( + "Cannot commit to branch %s: %s does not support branch commits", + branch, this.getClass().getName())); + } } diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java index bbb51fdc7e3e..a073d79e5552 100644 --- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java @@ -105,7 +105,7 @@ public OverwriteFiles validateNoConflictingDeletes() { } @Override - protected void validate(TableMetadata base) { + protected void validate(TableMetadata base, Snapshot snapshot) { if (validateAddedFilesMatchOverwriteFilter) { PartitionSpec spec = dataSpec(); Expression rowFilter = rowFilter(); diff --git a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java index 2847f5ceca6b..dd44505e9d39 100644 --- a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java +++ b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java @@ -80,7 +80,7 @@ public ReplacePartitions validateNoConflictingData() { } @Override - public void validate(TableMetadata currentMetadata) { + public void validate(TableMetadata currentMetadata, Snapshot snapshot) { if (validateConflictingData) { if (dataSpec().isUnpartitioned()) { validateAddedDataFiles(currentMetadata, startingSnapshotId, Expressions.alwaysTrue()); @@ -101,14 +101,14 @@ public void validate(TableMetadata currentMetadata) { } @Override - public List apply(TableMetadata base) { + public List apply(TableMetadata base, Snapshot snapshot) { if (dataSpec().fields().size() <= 0) { // replace all data in an unpartitioned table deleteByRowFilter(Expressions.alwaysTrue()); } try { - return super.apply(base); + return super.apply(base, snapshot); } catch (ManifestFilterManager.DeleteException e) { throw new ValidationException( "Cannot commit file that conflicts with existing partition: %s", e.partition()); diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java index 1bc846e27602..8a3b137b2d3d 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java @@ -110,7 +110,7 @@ public RewriteFiles validateFromSnapshot(long snapshotId) { } @Override - protected void validate(TableMetadata base) { + protected void validate(TableMetadata base, Snapshot snapshot) { if (replacedDataFiles.size() > 0) { // if there are replaced data files, there cannot be any new row-level deletes for those data // files diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java index c61b99dcfc65..816bc0c8a7ec 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java @@ -168,7 +168,7 @@ private ManifestFile copyManifest(ManifestFile manifest) { } @Override - public List apply(TableMetadata base) { + public List apply(TableMetadata base, Snapshot snapshot) { List currentManifests = base.currentSnapshot().dataManifests(ops.io()); Set currentManifestSet = ImmutableSet.copyOf(currentManifests); diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java index 35a04ba39493..50a0e26ab368 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java +++ b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java @@ -96,7 +96,7 @@ public RowDelta validateNoConflictingDeleteFiles() { } @Override - protected void validate(TableMetadata base) { + protected void validate(TableMetadata base, Snapshot snapshot) { if (base.currentSnapshot() != null) { if (!referencedDataFiles.isEmpty()) { validateDataFilesExist( diff --git a/core/src/main/java/org/apache/iceberg/CherryPickOperation.java b/core/src/main/java/org/apache/iceberg/CherryPickOperation.java index 77de54279768..3786b1185be6 100644 --- a/core/src/main/java/org/apache/iceberg/CherryPickOperation.java +++ b/core/src/main/java/org/apache/iceberg/CherryPickOperation.java @@ -152,7 +152,7 @@ public Object updateEvent() { } @Override - protected void validate(TableMetadata base) { + protected void validate(TableMetadata base, Snapshot snapshot) { // this is only called after apply() passes off to super, but check fast-forward status just in // case if (!isFastForward(base)) { diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index febdcee633e8..f3955e15f6ce 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -97,6 +97,12 @@ public FastAppend appendFile(DataFile file) { return this; } + @Override + public FastAppend toBranch(String branch) { + targetBranch(branch); + return this; + } + @Override public FastAppend appendManifest(ManifestFile manifest) { Preconditions.checkArgument( @@ -135,7 +141,7 @@ private ManifestFile copyManifest(ManifestFile manifest) { } @Override - public List apply(TableMetadata base) { + public List apply(TableMetadata base, Snapshot snapshot) { List newManifests = Lists.newArrayList(); try { @@ -153,8 +159,8 @@ public List apply(TableMetadata base) { manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build()); Iterables.addAll(newManifests, appendManifestsWithMetadata); - if (base.currentSnapshot() != null) { - newManifests.addAll(base.currentSnapshot().allManifests(ops.io())); + if (snapshot != null) { + newManifests.addAll(snapshot.allManifests(ops.io())); } return newManifests; diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 789c6c23c32b..b82244f0714f 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -758,13 +758,11 @@ protected Map summary() { } @Override - public List apply(TableMetadata base) { - Snapshot current = base.currentSnapshot(); - + public List apply(TableMetadata base, Snapshot snapshot) { // filter any existing manifests List filtered = filterManager.filterManifests( - base.schema(), current != null ? current.dataManifests(ops.io()) : null); + base.schema(), snapshot != null ? snapshot.dataManifests(ops.io()) : null); long minDataSequenceNumber = filtered.stream() .map(ManifestFile::minSequenceNumber) @@ -777,7 +775,7 @@ public List apply(TableMetadata base) { deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber); List filteredDeletes = deleteFilterManager.filterManifests( - base.schema(), current != null ? current.deleteManifests(ops.io()) : null); + base.schema(), snapshot != null ? snapshot.deleteManifests(ops.io()) : null); // only keep manifests that have live data files or that were written by this commit Predicate shouldKeep = diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 520c70bcef1a..87aa4126ce8f 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -84,6 +84,7 @@ public void accept(String file) { private Consumer deleteFunc = defaultDelete; private ExecutorService workerPool = ThreadPools.getWorkerPool(); + private String targetBranch = SnapshotRef.MAIN_BRANCH; protected SnapshotProducer(TableOperations ops) { this.ops = ops; @@ -113,6 +114,20 @@ public ThisT scanManifestsWith(ExecutorService executorService) { return self(); } + /** + * * A setter for the target branch on which snapshot producer operation should be performed + * + * @param branch to set as target branch + */ + protected void targetBranch(String branch) { + Preconditions.checkArgument(branch != null, "Invalid branch name: null"); + boolean refExists = base.ref(branch) != null; + Preconditions.checkArgument( + !refExists || base.ref(branch).isBranch(), + "%s is a tag, not a branch. Tags cannot be targets for producing snapshots"); + this.targetBranch = branch; + } + protected ExecutorService workerPool() { return this.workerPool; } @@ -150,28 +165,37 @@ public ThisT deleteWith(Consumer deleteCallback) { *

Child operations can override this to add custom validation. * * @param currentMetadata current table metadata to validate + * @param snapshot ending snapshot on the lineage which is being validated */ - protected void validate(TableMetadata currentMetadata) {} + protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {} /** - * Apply the update's changes to the base table metadata and return the new manifest list. + * Apply the update's changes to the given metadata and snapshot. Return the new manifest list. * * @param metadataToUpdate the base table metadata to apply changes to + * @param snapshot snapshot to apply the changes to * @return a manifest list for the new snapshot. */ - protected abstract List apply(TableMetadata metadataToUpdate); + protected abstract List apply(TableMetadata metadataToUpdate, Snapshot snapshot); @Override public Snapshot apply() { refresh(); - Long parentSnapshotId = - base.currentSnapshot() != null ? base.currentSnapshot().snapshotId() : null; - long sequenceNumber = base.nextSequenceNumber(); + Snapshot parentSnapshot = base.currentSnapshot(); + if (targetBranch != null) { + SnapshotRef branch = base.ref(targetBranch); + if (branch != null) { + parentSnapshot = base.snapshot(branch.snapshotId()); + } else if (base.currentSnapshot() != null) { + parentSnapshot = base.currentSnapshot(); + } + } - // run validations from the child operation - validate(base); + long sequenceNumber = base.nextSequenceNumber(); + Long parentSnapshotId = parentSnapshot == null ? null : parentSnapshot.snapshotId(); - List manifests = apply(base); + validate(base, parentSnapshot); + List manifests = apply(base, parentSnapshot); if (base.formatVersion() > 1 || base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) { @@ -337,11 +361,11 @@ public void commit() { TableMetadata.Builder update = TableMetadata.buildFrom(base); if (base.snapshot(newSnapshot.snapshotId()) != null) { // this is a rollback operation - update.setBranchSnapshot(newSnapshot.snapshotId(), SnapshotRef.MAIN_BRANCH); + update.setBranchSnapshot(newSnapshot.snapshotId(), targetBranch); } else if (stageOnly) { update.addSnapshot(newSnapshot); } else { - update.setBranchSnapshot(newSnapshot, SnapshotRef.MAIN_BRANCH); + update.setBranchSnapshot(newSnapshot, targetBranch); } TableMetadata updated = update.build(); diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java b/core/src/test/java/org/apache/iceberg/TestFastAppend.java index c04a20b98bd5..508c90255e72 100644 --- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java @@ -487,4 +487,54 @@ public void testIncludedPartitionSummaryLimit() { table.currentSnapshot().summary().get(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP); Assert.assertEquals("Should set changed partition count", "2", changedPartitions); } + + @Test + public void testAppendToExistingBranch() { + table.newFastAppend().appendFile(FILE_A).commit(); + table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit(); + table.newFastAppend().appendFile(FILE_B).toBranch("branch").commit(); + int branchSnapshot = 2; + + Assert.assertEquals(table.currentSnapshot().snapshotId(), 1); + Assert.assertEquals(table.ops().current().ref("branch").snapshotId(), branchSnapshot); + } + + @Test + public void testAppendCreatesBranchIfNeeded() { + table.newFastAppend().appendFile(FILE_A).commit(); + table.newFastAppend().appendFile(FILE_B).toBranch("branch").commit(); + int branchSnapshot = 2; + + Assert.assertEquals(table.currentSnapshot().snapshotId(), 1); + Assert.assertNotNull(table.ops().current().ref("branch")); + Assert.assertEquals(table.ops().current().ref("branch").snapshotId(), branchSnapshot); + } + + @Test + public void testAppendToBranchEmptyTable() { + table.newFastAppend().appendFile(FILE_B).toBranch("branch").commit(); + int branchSnapshot = 1; + + Assert.assertNull(table.currentSnapshot()); + Assert.assertNotNull(table.ops().current().ref("branch")); + Assert.assertEquals(table.ops().current().ref("branch").snapshotId(), branchSnapshot); + } + + @Test + public void testAppendToNullBranchFails() { + AssertHelpers.assertThrows( + "Invalid branch", + IllegalArgumentException.class, + () -> table.newFastAppend().appendFile(FILE_A).toBranch(null)); + } + + @Test + public void testAppendToTagFails() { + table.newFastAppend().appendFile(FILE_A).commit(); + table.manageSnapshots().createTag("some-tag", table.currentSnapshot().snapshotId()).commit(); + AssertHelpers.assertThrows( + "Invalid branch", + IllegalArgumentException.class, + () -> table.newFastAppend().appendFile(FILE_A).toBranch("some-tag").commit()); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestOverwrite.java b/core/src/test/java/org/apache/iceberg/TestOverwrite.java index f7788fbe3221..082f6396bde7 100644 --- a/core/src/test/java/org/apache/iceberg/TestOverwrite.java +++ b/core/src/test/java/org/apache/iceberg/TestOverwrite.java @@ -300,4 +300,17 @@ public void testValidatedOverwriteWithAppendSuccess() { Assert.assertEquals( "Should not create a new snapshot", baseId, table.currentSnapshot().snapshotId()); } + + @Test + public void testOverwriteToBranchUnsupported() { + AssertHelpers.assertThrows( + "Cannot commit to branch someBranch: org.apache.iceberg.BaseOverwriteFiles does not support branch commits", + UnsupportedOperationException.class, + () -> + table + .newOverwrite() + .overwriteByRowFilter(and(equal("date", "2018-06-09"), lessThan("id", 20))) + .addFile(FILE_10_TO_14) + .toBranch("someBranch")); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java b/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java index 9c73d2d9576b..d5007bf6de06 100644 --- a/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java +++ b/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java @@ -742,4 +742,13 @@ public void testValidateOnlyDeletes() { public void testEmptyPartitionPathWithUnpartitionedTable() { DataFiles.builder(PartitionSpec.unpartitioned()).withPartitionPath(""); } + + @Test + public void testReplacePartitionsOnBranchUnsupported() { + AssertHelpers.assertThrows( + "Should reject committing rewrite manifests to branch", + UnsupportedOperationException.class, + "Cannot commit to branch someBranch: org.apache.iceberg.BaseReplacePartitions does not support branch commits", + () -> table.newReplacePartitions().addFile(FILE_UNPARTITIONED_A).toBranch("someBranch")); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java index 175c80c4d1e0..633b27241ded 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java @@ -1088,6 +1088,20 @@ public void testManifestReplacementFailureWithSnapshotIdInheritance() throws IOE Assert.assertTrue("New manifest should not be deleted", new File(newManifest.path()).exists()); } + @Test + public void testRewriteManifestsOnBranchUnsupported() { + + table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + + Assert.assertEquals(1, table.currentSnapshot().allManifests(table.io()).size()); + + AssertHelpers.assertThrows( + "Should reject committing rewrite manifests to branch", + UnsupportedOperationException.class, + "Cannot commit to branch someBranch: org.apache.iceberg.BaseRewriteManifests does not support branch commits", + () -> table.rewriteManifests().toBranch("someBranch").commit()); + } + private void validateSummary( Snapshot snapshot, int replaced, int kept, int created, int entryCount) { Map summary = snapshot.summary(); diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index 6ebb92eb865a..e2929b470994 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -1412,4 +1412,20 @@ public void testRowDeltaCaseSensitivity() { .validateNoConflictingDeleteFiles() .commit()); } + + @Test + public void testRowDeltaToBranchUnsupported() { + AssertHelpers.assertThrows( + "Should reject committing row delta to branch", + UnsupportedOperationException.class, + "Cannot commit to branch someBranch: org.apache.iceberg.BaseRowDelta does not support branch commits", + () -> + table + .newRowDelta() + .caseSensitive(false) + .addRows(FILE_B) + .addDeletes(FILE_A2_DELETES) + .toBranch("someBranch") + .commit()); + } }