Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ acceptedBreaks:
- code: "java.method.addedToInterface"
new: "method ThisT org.apache.iceberg.SnapshotUpdate<ThisT>::scanManifestsWith(java.util.concurrent.ExecutorService)"
justification: "Accept all changes prior to introducing API compatibility checks"
- code: "java.method.addedToInterface"
new: "method ThisT org.apache.iceberg.SnapshotUpdate<ThisT>::toBranch(java.lang.String)"
justification: "Adding toBranch API for supporting committing to a branch"
- code: "java.method.addedToInterface"
new: "method boolean org.apache.iceberg.expressions.BoundTerm<T>::isEquivalentTo(org.apache.iceberg.expressions.BoundTerm<?>)"
justification: "new API method"
Expand Down
12 changes: 12 additions & 0 deletions api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,16 @@ public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
* @return this for method chaining
*/
ThisT scanManifestsWith(ExecutorService executorService);

/**
 * Targets a specific branch for the operations performed by this update.
 *
 * <p>This default implementation does not support branch commits and always throws; the error
 * message names the requested branch and the runtime class of this update.
 *
 * @param branch name of the SnapshotRef of type branch to operate on
 * @return this for method chaining, in implementations that support branch commits
 * @throws UnsupportedOperationException always, in this default implementation
 */
default ThisT toBranch(String branch) {
  String implementation = getClass().getName();
  String message =
      String.format(
          "Cannot commit to branch %s: %s does not support branch commits",
          branch, implementation);
  throw new UnsupportedOperationException(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public OverwriteFiles validateNoConflictingDeletes() {
}

@Override
protected void validate(TableMetadata base) {
protected void validate(TableMetadata base, Snapshot snapshot) {
if (validateAddedFilesMatchOverwriteFilter) {
PartitionSpec spec = dataSpec();
Expression rowFilter = rowFilter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public ReplacePartitions validateNoConflictingData() {
}

@Override
public void validate(TableMetadata currentMetadata) {
public void validate(TableMetadata currentMetadata, Snapshot snapshot) {
if (validateConflictingData) {
if (dataSpec().isUnpartitioned()) {
validateAddedDataFiles(currentMetadata, startingSnapshotId, Expressions.alwaysTrue());
Expand All @@ -101,14 +101,14 @@ public void validate(TableMetadata currentMetadata) {
}

@Override
public List<ManifestFile> apply(TableMetadata base) {
public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
if (dataSpec().fields().size() <= 0) {
// replace all data in an unpartitioned table
deleteByRowFilter(Expressions.alwaysTrue());
}

try {
return super.apply(base);
return super.apply(base, snapshot);
} catch (ManifestFilterManager.DeleteException e) {
throw new ValidationException(
"Cannot commit file that conflicts with existing partition: %s", e.partition());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public RewriteFiles validateFromSnapshot(long snapshotId) {
}

@Override
protected void validate(TableMetadata base) {
protected void validate(TableMetadata base, Snapshot snapshot) {
if (replacedDataFiles.size() > 0) {
// if there are replaced data files, there cannot be any new row-level deletes for those data
// files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private ManifestFile copyManifest(ManifestFile manifest) {
}

@Override
public List<ManifestFile> apply(TableMetadata base) {
public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
List<ManifestFile> currentManifests = base.currentSnapshot().dataManifests(ops.io());
Set<ManifestFile> currentManifestSet = ImmutableSet.copyOf(currentManifests);

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/BaseRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public RowDelta validateNoConflictingDeleteFiles() {
}

@Override
protected void validate(TableMetadata base) {
protected void validate(TableMetadata base, Snapshot snapshot) {
if (base.currentSnapshot() != null) {
if (!referencedDataFiles.isEmpty()) {
validateDataFilesExist(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public Object updateEvent() {
}

@Override
protected void validate(TableMetadata base) {
protected void validate(TableMetadata base, Snapshot snapshot) {
// this is only called after apply() passes off to super, but check fast-forward status just in
// case
if (!isFastForward(base)) {
Expand Down
12 changes: 9 additions & 3 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ public FastAppend appendFile(DataFile file) {
return this;
}

/**
 * Sets the branch that this append targets.
 *
 * <p>The branch name is handed to {@code targetBranch(String)}, which stores it for this
 * operation.
 *
 * @param branch name of the branch to target
 * @return this for method chaining
 */
@Override
public FastAppend toBranch(String branch) {
targetBranch(branch);
return this;
}

@Override
public FastAppend appendManifest(ManifestFile manifest) {
Preconditions.checkArgument(
Expand Down Expand Up @@ -135,7 +141,7 @@ private ManifestFile copyManifest(ManifestFile manifest) {
}

@Override
public List<ManifestFile> apply(TableMetadata base) {
public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
List<ManifestFile> newManifests = Lists.newArrayList();

try {
Expand All @@ -153,8 +159,8 @@ public List<ManifestFile> apply(TableMetadata base) {
manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
Iterables.addAll(newManifests, appendManifestsWithMetadata);

if (base.currentSnapshot() != null) {
newManifests.addAll(base.currentSnapshot().allManifests(ops.io()));
if (snapshot != null) {
newManifests.addAll(snapshot.allManifests(ops.io()));
}

return newManifests;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,13 +758,11 @@ protected Map<String, String> summary() {
}

@Override
public List<ManifestFile> apply(TableMetadata base) {
Snapshot current = base.currentSnapshot();

public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
// filter any existing manifests
List<ManifestFile> filtered =
filterManager.filterManifests(
base.schema(), current != null ? current.dataManifests(ops.io()) : null);
base.schema(), snapshot != null ? snapshot.dataManifests(ops.io()) : null);
long minDataSequenceNumber =
filtered.stream()
.map(ManifestFile::minSequenceNumber)
Expand All @@ -777,7 +775,7 @@ public List<ManifestFile> apply(TableMetadata base) {
deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber);
List<ManifestFile> filteredDeletes =
deleteFilterManager.filterManifests(
base.schema(), current != null ? current.deleteManifests(ops.io()) : null);
base.schema(), snapshot != null ? snapshot.deleteManifests(ops.io()) : null);

// only keep manifests that have live data files or that were written by this commit
Predicate<ManifestFile> shouldKeep =
Expand Down
46 changes: 35 additions & 11 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void accept(String file) {
private Consumer<String> deleteFunc = defaultDelete;

private ExecutorService workerPool = ThreadPools.getWorkerPool();
private String targetBranch = SnapshotRef.MAIN_BRANCH;

protected SnapshotProducer(TableOperations ops) {
this.ops = ops;
Expand Down Expand Up @@ -113,6 +114,20 @@ public ThisT scanManifestsWith(ExecutorService executorService) {
return self();
}

/**
 * Sets the target branch on which this snapshot producer's operation should be performed.
 *
 * <p>A name that does not match any ref in the base metadata is accepted; only a null name or the
 * name of an existing tag is rejected.
 *
 * @param branch name of the branch to set as the target
 * @throws IllegalArgumentException if {@code branch} is null or names an existing tag
 */
protected void targetBranch(String branch) {
  Preconditions.checkArgument(branch != null, "Invalid branch name: null");
  // look the ref up once; a missing ref is allowed
  SnapshotRef ref = base.ref(branch);
  // pass the branch name so the %s placeholder in the message is actually filled in
  Preconditions.checkArgument(
      ref == null || ref.isBranch(),
      "%s is a tag, not a branch. Tags cannot be targets for producing snapshots",
      branch);
  this.targetBranch = branch;
}

protected ExecutorService workerPool() {
return this.workerPool;
}
Expand Down Expand Up @@ -150,28 +165,37 @@ public ThisT deleteWith(Consumer<String> deleteCallback) {
* <p>Child operations can override this to add custom validation.
*
* @param currentMetadata current table metadata to validate
* @param snapshot ending snapshot on the lineage which is being validated
*/
protected void validate(TableMetadata currentMetadata) {}
protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {}

/**
* Apply the update's changes to the base table metadata and return the new manifest list.
* Apply the update's changes to the given metadata and snapshot. Return the new manifest list.
*
* @param metadataToUpdate the base table metadata to apply changes to
* @param snapshot snapshot to apply the changes to
* @return a manifest list for the new snapshot.
*/
protected abstract List<ManifestFile> apply(TableMetadata metadataToUpdate);
protected abstract List<ManifestFile> apply(TableMetadata metadataToUpdate, Snapshot snapshot);

@Override
public Snapshot apply() {
refresh();
Long parentSnapshotId =
base.currentSnapshot() != null ? base.currentSnapshot().snapshotId() : null;
long sequenceNumber = base.nextSequenceNumber();
Snapshot parentSnapshot = base.currentSnapshot();
if (targetBranch != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, since targetBranch will always be set (it defaults to main), we don't need this check.

SnapshotRef branch = base.ref(targetBranch);
if (branch != null) {
parentSnapshot = base.snapshot(branch.snapshotId());
} else if (base.currentSnapshot() != null) {
parentSnapshot = base.currentSnapshot();
}
}

// run validations from the child operation
validate(base);
long sequenceNumber = base.nextSequenceNumber();
Long parentSnapshotId = parentSnapshot == null ? null : parentSnapshot.snapshotId();

List<ManifestFile> manifests = apply(base);
validate(base, parentSnapshot);
List<ManifestFile> manifests = apply(base, parentSnapshot);

if (base.formatVersion() > 1
|| base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
Expand Down Expand Up @@ -337,11 +361,11 @@ public void commit() {
TableMetadata.Builder update = TableMetadata.buildFrom(base);
if (base.snapshot(newSnapshot.snapshotId()) != null) {
// this is a rollback operation
update.setBranchSnapshot(newSnapshot.snapshotId(), SnapshotRef.MAIN_BRANCH);
update.setBranchSnapshot(newSnapshot.snapshotId(), targetBranch);
} else if (stageOnly) {
update.addSnapshot(newSnapshot);
} else {
update.setBranchSnapshot(newSnapshot, SnapshotRef.MAIN_BRANCH);
update.setBranchSnapshot(newSnapshot, targetBranch);
}

TableMetadata updated = update.build();
Expand Down
50 changes: 50 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestFastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -487,4 +487,54 @@ public void testIncludedPartitionSummaryLimit() {
table.currentSnapshot().summary().get(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP);
Assert.assertEquals("Should set changed partition count", "2", changedPartitions);
}

@Test
public void testAppendToExistingBranch() {
table.newFastAppend().appendFile(FILE_A).commit();
table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
table.newFastAppend().appendFile(FILE_B).toBranch("branch").commit();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just realized I just hardcoded branch throughout the tests, probably cleaner to just define a variable in the tests and use that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is okay.

// snapshot ids are longs; JUnit's assertEquals takes the expected value first
long branchSnapshotId = 2L;

Assert.assertEquals(
    "Main branch should still point to the first snapshot",
    1L,
    table.currentSnapshot().snapshotId());
Assert.assertEquals(
    "Branch should point to the newly appended snapshot",
    branchSnapshotId,
    table.ops().current().ref("branch").snapshotId());
}

// Appending to a branch name with no existing ref should create that ref and leave the
// table's current snapshot unchanged.
@Test
public void testAppendCreatesBranchIfNeeded() {
  table.newFastAppend().appendFile(FILE_A).commit();
  table.newFastAppend().appendFile(FILE_B).toBranch("branch").commit();
  // snapshot ids are longs; JUnit's assertEquals takes the expected value first
  long branchSnapshotId = 2L;

  Assert.assertEquals(
      "Main branch should still point to the first snapshot",
      1L,
      table.currentSnapshot().snapshotId());
  Assert.assertNotNull("Branch ref should be created", table.ops().current().ref("branch"));
  Assert.assertEquals(
      "Branch should point to the newly appended snapshot",
      branchSnapshotId,
      table.ops().current().ref("branch").snapshotId());
}

// Appending to a branch of a table with no snapshots should create the branch ref while the
// table's current snapshot stays null.
@Test
public void testAppendToBranchEmptyTable() {
  table.newFastAppend().appendFile(FILE_B).toBranch("branch").commit();
  // snapshot ids are longs; JUnit's assertEquals takes the expected value first
  long branchSnapshotId = 1L;

  Assert.assertNull("Main branch should have no snapshot", table.currentSnapshot());
  Assert.assertNotNull("Branch ref should be created", table.ops().current().ref("branch"));
  Assert.assertEquals(
      "Branch should point to the newly appended snapshot",
      branchSnapshotId,
      table.ops().current().ref("branch").snapshotId());
}

// A null branch name must be rejected as soon as toBranch is called.
@Test
public void testAppendToNullBranchFails() {
  // use the overload that also verifies the exception message; the three-argument form only
  // uses its first argument as the failure description
  AssertHelpers.assertThrows(
      "Should reject a null branch name",
      IllegalArgumentException.class,
      "Invalid branch name: null",
      () -> table.newFastAppend().appendFile(FILE_A).toBranch(null));
}

// Tags cannot be targets for producing snapshots, so targeting an existing tag must fail.
@Test
public void testAppendToTagFails() {
  table.newFastAppend().appendFile(FILE_A).commit();
  table.manageSnapshots().createTag("some-tag", table.currentSnapshot().snapshotId()).commit();
  // use the overload that also verifies the exception message; the three-argument form only
  // uses its first argument as the failure description.
  // NOTE(review): relies on the message argument being matched as a substring — confirm
  AssertHelpers.assertThrows(
      "Should reject committing to a tag",
      IllegalArgumentException.class,
      "is a tag, not a branch. Tags cannot be targets for producing snapshots",
      () -> table.newFastAppend().appendFile(FILE_A).toBranch("some-tag").commit());
}
}
13 changes: 13 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestOverwrite.java
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,17 @@ public void testValidatedOverwriteWithAppendSuccess() {
Assert.assertEquals(
"Should not create a new snapshot", baseId, table.currentSnapshot().snapshotId());
}

@Test
public void testOverwriteToBranchUnsupported() {
// pass the expected text as the message-check argument; in the three-argument form it was only
// the failure description and was never compared against the thrown exception
AssertHelpers.assertThrows(
    "Should reject committing overwrite to branch",
    UnsupportedOperationException.class,
    "Cannot commit to branch someBranch: org.apache.iceberg.BaseOverwriteFiles does not support branch commits",
    () ->
        table
            .newOverwrite()
            .overwriteByRowFilter(and(equal("date", "2018-06-09"), lessThan("id", 20)))
            .addFile(FILE_10_TO_14)
            .toBranch("someBranch"));
Comment on lines +306 to +314
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should fix this assertion so it actually checks the expected error message. I think the assertThrows method with 3 params only surfaces this message if the test fails; it doesn't actually check the expected error message. It should follow what's being done in TestReplacePartitions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is minor because it is going to be replaced very soon.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -742,4 +742,13 @@ public void testValidateOnlyDeletes() {
public void testEmptyPartitionPathWithUnpartitionedTable() {
DataFiles.builder(PartitionSpec.unpartitioned()).withPartitionPath("");
}

// Replace partitions does not support branch commits, so toBranch must throw.
@Test
public void testReplacePartitionsOnBranchUnsupported() {
  // the failure description names the operation under test (it previously said
  // "rewrite manifests", copied from another test)
  AssertHelpers.assertThrows(
      "Should reject committing replace partitions to branch",
      UnsupportedOperationException.class,
      "Cannot commit to branch someBranch: org.apache.iceberg.BaseReplacePartitions does not support branch commits",
      () -> table.newReplacePartitions().addFile(FILE_UNPARTITIONED_A).toBranch("someBranch"));
}
}
14 changes: 14 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,20 @@ public void testManifestReplacementFailureWithSnapshotIdInheritance() throws IOE
Assert.assertTrue("New manifest should not be deleted", new File(newManifest.path()).exists());
}

// Rewrite manifests does not support branch commits, so targeting a branch must throw.
@Test
public void testRewriteManifestsOnBranchUnsupported() {
  // commit both files in a single append and confirm the snapshot has exactly one manifest
  table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
  int manifestCount = table.currentSnapshot().allManifests(table.io()).size();
  Assert.assertEquals(1, manifestCount);

  String expectedMessage =
      "Cannot commit to branch someBranch: org.apache.iceberg.BaseRewriteManifests does not support branch commits";
  AssertHelpers.assertThrows(
      "Should reject committing rewrite manifests to branch",
      UnsupportedOperationException.class,
      expectedMessage,
      () -> {
        table.rewriteManifests().toBranch("someBranch").commit();
      });
}

private void validateSummary(
Snapshot snapshot, int replaced, int kept, int created, int entryCount) {
Map<String, String> summary = snapshot.summary();
Expand Down
16 changes: 16 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -1412,4 +1412,20 @@ public void testRowDeltaCaseSensitivity() {
.validateNoConflictingDeleteFiles()
.commit());
}

// Row delta does not support branch commits, so targeting a branch must throw.
@Test
public void testRowDeltaToBranchUnsupported() {
  String expectedMessage =
      "Cannot commit to branch someBranch: org.apache.iceberg.BaseRowDelta does not support branch commits";

  AssertHelpers.assertThrows(
      "Should reject committing row delta to branch",
      UnsupportedOperationException.class,
      expectedMessage,
      () -> {
        table
            .newRowDelta()
            .caseSensitive(false)
            .addRows(FILE_B)
            .addDeletes(FILE_A2_DELETES)
            .toBranch("someBranch")
            .commit();
      });
}
}