Skip to content

Commit

Permalink
Table Operations: Added streaming update operation
Browse files Browse the repository at this point in the history
This operation allows adding multiple consecutive updates in a single commit without equality deletes from prior updates affecting inserts that occurred after them.

Before this commit you would need to do something like this:
```
for batch in batches:
    delta = transaction.newRowDelta()
    delta.add(batch.deletes)
    delta.add(batch.inserts)
    delta.commit()
transaction.commit()
```
Which produces many manifest files and is very IO intensive.

This operation allows:
```
update = table.newStreamingUpdate()
for batch, batchIndex in enumerate(batches):
    update.addUpdate(batch.deleteFiles, batch.dataFiles, batchIndex)
update.commit()
```
  • Loading branch information
jasonf20 committed Dec 11, 2023
1 parent 01762b1 commit c00eeb8
Show file tree
Hide file tree
Showing 11 changed files with 449 additions and 2 deletions.
71 changes: 71 additions & 0 deletions api/src/main/java/org/apache/iceberg/StreamingUpdate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import org.apache.iceberg.exceptions.ValidationException;

/**
 * API for appending sequential updates to a table.
 *
 * <p>Batches of added data files and delete files are accumulated in order. Committing produces a
 * new {@link Snapshot} in which every batch is written with its own, increasing data sequence
 * number, and makes that snapshot the current one.
 *
 * <p>On commit the changes are applied to the latest table snapshot. A commit conflict is resolved
 * by re-applying the changes to the new latest snapshot and trying again. If any of the deleted
 * files are no longer in the latest snapshot when reattempting, the commit throws a {@link
 * ValidationException}.
 */
public interface StreamingUpdate extends SnapshotUpdate<StreamingUpdate> {

  /**
   * Begin a new batch of changes. Changes in the new batch get a sequence number larger than the
   * changes in all earlier batches.
   *
   * @return this for method chaining
   * @throws UnsupportedOperationException if the implementation does not override this method
   */
  default StreamingUpdate newBatch() {
    String implementation = this.getClass().getName();
    throw new UnsupportedOperationException(implementation + " does not implement newBatch");
  }

  /**
   * Add a new data file to the current batch. Every file in the batch receives the same data
   * sequence number.
   *
   * @param dataFile a new data file
   * @return this for method chaining
   * @throws UnsupportedOperationException if the implementation does not override this method
   */
  default StreamingUpdate addFile(DataFile dataFile) {
    String implementation = this.getClass().getName();
    throw new UnsupportedOperationException(implementation + " does not implement addFile");
  }

  /**
   * Add a new delete file to the current batch. Every file in the batch receives the same data
   * sequence number.
   *
   * @param deleteFile a new delete file
   * @return this for method chaining
   * @throws UnsupportedOperationException if the implementation does not override this method
   */
  default StreamingUpdate addFile(DeleteFile deleteFile) {
    String implementation = this.getClass().getName();
    throw new UnsupportedOperationException(implementation + " does not implement addFile");
  }
}
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,17 @@ default AppendFiles newFastAppend() {
*/
RewriteFiles newRewrite();

/**
 * Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the
 * table.
 *
 * <p>This default implementation always throws; implementations that support streaming updates
 * override it.
 *
 * @return a new {@link StreamingUpdate}
 * @throws UnsupportedOperationException if the implementation does not override this method
 */
default StreamingUpdate newStreamingUpdate() {
  String implementation = this.getClass().getName();
  throw new UnsupportedOperationException(
      implementation + " doesn't implement newStreamingUpdate()");
}

/**
* Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this table
* and commit.
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,17 @@ default AppendFiles newFastAppend() {
*/
RewriteFiles newRewrite();

/**
 * Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the
 * table.
 *
 * <p>This default implementation always throws; implementations that support streaming updates
 * override it.
 *
 * @return a new {@link StreamingUpdate}
 * @throws UnsupportedOperationException if the implementation does not override this method
 */
default StreamingUpdate newStreamingUpdate() {
  String implementation = this.getClass().getName();
  throw new UnsupportedOperationException(
      implementation + " doesn't implement newStreamingUpdate()");
}

/**
* Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this
* table.
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public RewriteFiles newRewrite() {
throw new UnsupportedOperationException("Cannot rewrite in a " + descriptor + " table");
}

/** Always throws: this table type does not accept streaming updates. */
@Override
public StreamingUpdate newStreamingUpdate() {
  String message = "Cannot update a " + descriptor + " table";
  throw new UnsupportedOperationException(message);
}

@Override
public RewriteManifests rewriteManifests() {
throw new UnsupportedOperationException(
Expand Down
144 changes: 144 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.util.List;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

/**
 * {@link StreamingUpdate} implementation that buffers added files in ordered batches and, when the
 * update is applied, adds every batch with its own data sequence number.
 *
 * <p>The first batch uses {@code base.nextSequenceNumber()}, and each following batch uses the next
 * higher number. The snapshot itself is given the sequence number of the last non-empty batch so
 * that no file carries a data sequence number larger than the snapshot's.
 */
class BaseStreamingUpdate extends MergingSnapshotProducer<StreamingUpdate>
    implements StreamingUpdate {
  // Batches in the order they were started; only the last one can be empty because newBatch()
  // refuses to start a batch after an empty one and addFile() always writes to the last batch.
  private final List<Batch> batches = Lists.newArrayList();

  // True while the buffered files still have to be handed to add(...) by apply(...).
  private boolean requiresApply = true;

  BaseStreamingUpdate(String tableName, TableOperations ops) {
    super(tableName, ops);
  }

  @Override
  protected BaseStreamingUpdate self() {
    return this;
  }

  @Override
  protected String operation() {
    return DataOperations.OVERWRITE;
  }

  /**
   * Returns the sequence number for the snapshot: the default next sequence number advanced by one
   * for every non-empty batch after the first.
   *
   * <p>Only non-empty batches are counted. A trailing empty batch (left behind when {@link
   * #newBatch()} is called after the last file) is never assigned a data sequence number, so it
   * must not advance the snapshot sequence number either.
   *
   * @param base table metadata the snapshot is produced against
   * @return the sequence number of the last non-empty batch, or the default when nothing was added
   */
  @Override
  protected long nextSnapshotSequenceNumber(TableMetadata base) {
    long firstSequenceNumber = super.nextSnapshotSequenceNumber(base);
    int usedBatches = nonEmptyBatchCount();
    if (usedBatches == 0) {
      return firstSequenceNumber;
    }
    // Each batch advances the data sequence number by one, so the snapshot must advance by the
    // same amount; otherwise data files would have a sequence number larger than the snapshot's.
    return firstSequenceNumber + usedBatches - 1;
  }

  /**
   * Hands the buffered files to the producer, one data sequence number per batch, and then
   * delegates to the superclass.
   *
   * <p>The files are only handed over while {@code requiresApply} is set, so applying repeatedly
   * does not add them again until {@link #cleanUncommitted(Set)} resets the flag.
   */
  @Override
  public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
    if (requiresApply && !batches.isEmpty()) {
      long startingSequenceNumber = base.nextSequenceNumber();
      for (int i = 0; i < batches.size(); i += 1) {
        Batch batch = batches.get(i);
        long dataSequenceNumber = startingSequenceNumber + i;
        for (DataFile dataFile : batch.getNewDataFiles()) {
          add(dataFile, dataSequenceNumber);
        }
        for (DeleteFile deleteFile : batch.getNewDeleteFiles()) {
          add(deleteFile, dataSequenceNumber);
        }
      }
      requiresApply = false;
    }
    return super.apply(base, snapshot);
  }

  /**
   * Starts a new batch unless the current batch is still empty.
   *
   * @return this for method chaining
   */
  @Override
  public StreamingUpdate newBatch() {
    if (batches.isEmpty() || !batches.get(batches.size() - 1).isEmpty()) {
      // Only add a new batch if there isn't one yet, or if the latest one is not empty.
      // Otherwise, we would end up with empty batches.
      batches.add(new Batch());
    }
    return this;
  }

  @Override
  public StreamingUpdate addFile(DataFile dataFile) {
    getBatch().add(dataFile);
    return this;
  }

  @Override
  public StreamingUpdate addFile(DeleteFile deleteFile) {
    getBatch().add(deleteFile);
    return this;
  }

  /** Returns the batch currently being filled, starting the first batch if none exists yet. */
  private Batch getBatch() {
    if (batches.isEmpty()) {
      newBatch();
    }
    return batches.get(batches.size() - 1);
  }

  /** Counts the batches that contain at least one data or delete file. */
  private int nonEmptyBatchCount() {
    int count = 0;
    for (Batch batch : batches) {
      if (!batch.isEmpty()) {
        count += 1;
      }
    }
    return count;
  }

  @Override
  public BaseStreamingUpdate toBranch(String branch) {
    targetBranch(branch);
    return this;
  }

  @Override
  protected void cleanUncommitted(Set<ManifestFile> committed) {
    // This is called when the commit fails and the caches are cleared; reset the state here so
    // calling apply again will re-add the files.
    // NOTE(review): this relies on the superclass forgetting the files added by the previous
    // apply; otherwise a retry would add them twice — confirm against MergingSnapshotProducer.
    requiresApply = true;
    super.cleanUncommitted(committed);
  }

  /** Data and delete files that share one data sequence number. */
  private static class Batch {
    private final List<DataFile> newDataFiles = Lists.newArrayList();
    private final List<DeleteFile> newDeleteFiles = Lists.newArrayList();

    /** Creates a new, empty batch. */
    Batch() {}

    void add(DataFile dataFile) {
      newDataFiles.add(dataFile);
    }

    void add(DeleteFile deleteFile) {
      newDeleteFiles.add(deleteFile);
    }

    List<DataFile> getNewDataFiles() {
      return newDataFiles;
    }

    List<DeleteFile> getNewDeleteFiles() {
      return newDeleteFiles;
    }

    /** Returns true when neither a data file nor a delete file has been added. */
    boolean isEmpty() {
      return newDataFiles.isEmpty() && newDeleteFiles.isEmpty();
    }
  }
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ public RewriteFiles newRewrite() {
return new BaseRewriteFiles(name, ops).reportWith(reporter);
}

/** Creates a streaming update for this table's operations that reports to this table's reporter. */
@Override
public StreamingUpdate newStreamingUpdate() {
  BaseStreamingUpdate producer = new BaseStreamingUpdate(name, ops);
  return producer.reportWith(reporter);
}

@Override
public RewriteManifests rewriteManifests() {
return new BaseRewriteManifests(ops).reportWith(reporter);
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,16 @@ public RewriteFiles newRewrite() {
return rewrite;
}

/**
 * Creates a streaming update that runs inside this transaction and registers it as a pending
 * update.
 */
@Override
public StreamingUpdate newStreamingUpdate() {
  checkLastOperationCommitted("StreamingUpdate");

  BaseStreamingUpdate producer = new BaseStreamingUpdate(tableName, transactionOps);
  StreamingUpdate update = producer.reportWith(reporter);
  // file deletions requested by the update are routed through the transaction's enqueueDelete
  update.deleteWith(enqueueDelete);

  updates.add(update);
  return update;
}

@Override
public RewriteManifests rewriteManifests() {
checkLastOperationCommitted("RewriteManifests");
Expand Down Expand Up @@ -703,6 +713,11 @@ public RewriteFiles newRewrite() {
return BaseTransaction.this.newRewrite();
}

/** Delegates to the enclosing transaction so the update becomes part of that transaction. */
@Override
public StreamingUpdate newStreamingUpdate() {
  StreamingUpdate update = BaseTransaction.this.newStreamingUpdate();
  return update;
}

@Override
public RewriteManifests rewriteManifests() {
return BaseTransaction.this.rewriteManifests();
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/SerializableTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ public RewriteFiles newRewrite() {
throw new UnsupportedOperationException(errorMsg("newRewrite"));
}

/**
 * Streaming updates are not supported by this table.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public StreamingUpdate newStreamingUpdate() {
  // Build the message with the shared helper, like the neighbouring unsupported operations, and
  // name the method that was actually called (the original said "newStreamingWrite").
  throw new UnsupportedOperationException(errorMsg("newStreamingUpdate"));
}

@Override
public RewriteManifests rewriteManifests() {
throw new UnsupportedOperationException(errorMsg("rewriteManifests"));
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,16 @@ protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {}
*/
protected abstract List<ManifestFile> apply(TableMetadata metadataToUpdate, Snapshot snapshot);

/**
 * Returns the sequence number that {@link #apply()} uses for the snapshot it produces.
 *
 * <p>The default is the next sequence number of the given metadata; subclasses may override this
 * to return a different value.
 *
 * @param metadataToUpdate table metadata the snapshot is produced against
 * @return the sequence number for the new snapshot
 */
protected long nextSnapshotSequenceNumber(TableMetadata metadataToUpdate) {
  long nextSequenceNumber = metadataToUpdate.nextSequenceNumber();
  return nextSequenceNumber;
}

@Override
public Snapshot apply() {
refresh();
Snapshot parentSnapshot = SnapshotUtil.latestSnapshot(base, targetBranch);

long sequenceNumber = base.nextSequenceNumber();
long sequenceNumber = nextSnapshotSequenceNumber(base);
Long parentSnapshotId = parentSnapshot == null ? null : parentSnapshot.snapshotId();

validate(base, parentSnapshot);
Expand Down
17 changes: 16 additions & 1 deletion core/src/test/java/org/apache/iceberg/TableTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -670,14 +670,29 @@ static void validateManifestEntries(
Iterator<Long> ids,
Iterator<DataFile> expectedFiles,
Iterator<ManifestEntry.Status> expectedStatuses) {
validateManifestEntries(manifest, ids, expectedFiles, expectedStatuses, null);
}

static void validateManifestEntries(
ManifestFile manifest,
Iterator<Long> ids,
Iterator<DataFile> expectedFiles,
Iterator<ManifestEntry.Status> expectedStatuses,
Iterator<Long> expectedSequenceNumbers) {
for (ManifestEntry<DataFile> entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
DataFile file = entry.file();
DataFile expected = expectedFiles.next();
ContentFile<?> expected = expectedFiles.next();
final ManifestEntry.Status expectedStatus = expectedStatuses.next();
Assert.assertEquals(
"Path should match expected", expected.path().toString(), file.path().toString());
Assert.assertEquals("Snapshot ID should match expected ID", ids.next(), entry.snapshotId());
Assert.assertEquals("Entry status should match expected ID", expectedStatus, entry.status());
if (expectedSequenceNumbers != null) {
Assert.assertEquals(
"Entry status should match expected ID",
expectedSequenceNumbers.next(),
entry.dataSequenceNumber());
}
}

Assert.assertFalse("Should find all files in the manifest", expectedFiles.hasNext());
Expand Down
Loading

0 comments on commit c00eeb8

Please sign in to comment.