Skip to content

Commit

Permalink
Table Operations: Added streaming update operation
Browse files Browse the repository at this point in the history
This operation allows adding multiple consecutive update in a single commit without equality deletes from prior updates affecting inserts that occurred after it.

Before this commit you would  need to do something like this:
```
for batch in batches:
    delta = transaction.newRowDelta()
    delta.add(batch.deletes)
    delta.add(batch.inserts)
    delta.commit()
transaction.commit()
```
Which produces many manifest files and is very IO intensive.

This operation allows:
```
update = table.newStreamingUpdate()
for batch, batchIndex in enumerate(batches):
    update.addUpdate(batch.deleteFiles, batch.dataFiles, batchIndex)
update.commit()
```
  • Loading branch information
jasonf20 committed Dec 11, 2023
1 parent 01762b1 commit 95a9014
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 1 deletion.
99 changes: 99 additions & 0 deletions api/src/main/java/org/apache/iceberg/StreamingUpdate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import org.apache.iceberg.exceptions.ValidationException;

/**
* API for appending sequential updates to a table
*
* <p>This API accumulates batches of file additions and deletions by order, produces a new {@link
* Snapshot} of the changes where each batch is added to a new data sequence number, and commits
* that snapshot as the current.
*
* <p>When committing, these changes will be applied to the latest table snapshot. Commit conflicts
* will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
* If any of the deleted files are no longer in the latest snapshot when reattempting, the commit
* will throw a {@link ValidationException}.
*/
public interface StreamingUpdate extends SnapshotUpdate<StreamingUpdate> {
/**
* Remove a data file from the current table state.
*
* <p>This rewrite operation may change the size or layout of the data files. When applicable, it
* is also recommended to discard already deleted records while rewriting data files. However, the
* set of live data records must never change.
*
* @param dataFile a rewritten data file
* @return this for method chaining
*/
default StreamingUpdate deleteFile(DataFile dataFile) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement deleteFile");
}

/**
* Remove a delete file from the table state.
*
* <p>This rewrite operation may change the size or layout of the delete files. When applicable,
* it is also recommended to discard delete records for files that are no longer part of the table
* state. However, the set of applicable delete records must never change.
*
* @param deleteFile a rewritten delete file
* @return this for method chaining
*/
default StreamingUpdate deleteFile(DeleteFile deleteFile) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement deleteFile");
}

/**
* Add a new data file to a specific. All files in this batch will receive the same data sequence
* number.
*
* <p>This rewrite operation may change the size or layout of the data files. When applicable, it
* is also recommended to discard already deleted records while rewriting data files. However, the
* set of live data records must never change.
*
* @param dataFile a new data file
* @param batchOrdinal The batch ordinal to associate with this data file
* @return this for method chaining
*/
default StreamingUpdate addFile(DataFile dataFile, int batchOrdinal) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement addFile");
}

/**
* Add a new delete file to a specific batch. All files in this batch will receive the same data
* sequence number.
*
* <p>This rewrite operation may change the size or layout of the delete files. When applicable,
* it is also recommended to discard delete records for files that are no longer part of the table
* state. However, the set of applicable delete records must never change.
*
* @param deleteFile a new delete file
* @param batchOrdinal The batch ordinal to associate with this data file
* @return this for method chaining
*/
default StreamingUpdate addFile(DeleteFile deleteFile, int batchOrdinal) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement addFile");
}
}
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,17 @@ default AppendFiles newFastAppend() {
*/
RewriteFiles newRewrite();

/**
* Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the
* table.
*
* @return a new {@link StreamingUpdate}
*/
default StreamingUpdate newStreamingUpdate() {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement newStreamingUpdate()");
}

/**
* Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this table
* and commit.
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,17 @@ default AppendFiles newFastAppend() {
*/
RewriteFiles newRewrite();

/**
* Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the
* table.
*
* @return a new {@link StreamingUpdate}
*/
default StreamingUpdate newStreamingUpdate() {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement newStreamingUpdate()");
}

/**
* Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this
* table.
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public RewriteFiles newRewrite() {
throw new UnsupportedOperationException("Cannot rewrite in a " + descriptor + " table");
}

@Override
public StreamingUpdate newStreamingUpdate() {
throw new UnsupportedOperationException("Cannot update a " + descriptor + " table");
}

@Override
public RewriteManifests rewriteManifests() {
throw new UnsupportedOperationException(
Expand Down
155 changes: 155 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.util.Comparator;
import java.util.List;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class BaseStreamingUpdate extends MergingSnapshotProducer<StreamingUpdate>
implements StreamingUpdate {
private final List<Batch> batches = Lists.newArrayList();

BaseStreamingUpdate(String tableName, TableOperations ops) {
super(tableName, ops);
}

@Override
protected BaseStreamingUpdate self() {
return this;
}

@Override
protected String operation() {
return DataOperations.OVERWRITE;
}

@Override
protected long nextSnapshotSequenceNumber(TableMetadata base) {
if (batches.isEmpty()) {
return super.nextSnapshotSequenceNumber(base);
}
// Each batch will advance the data sequence number by one, so we should advance the snapshot by
// the same amount.
// Otherwise, we will end up with data files with a sequence number larger than the snapshot
// sequence number.
// The validate method ensures we can use `.size()` here since it validates that there are no
// gaps and that we start at 0.
return super.nextSnapshotSequenceNumber(base) + batches.size() - 1;
}

@Override
public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
long startingSequenceNumber = base.nextSequenceNumber();
batches.sort(Comparator.comparingInt(o -> o.ordinal));
for (Batch batch : batches) {
long dataSequenceNumber = startingSequenceNumber + batch.ordinal + 1;
batch.newDataFiles.forEach(f -> add(f, dataSequenceNumber));
batch.newDeleteFiles.forEach(f -> add(f, dataSequenceNumber));
}
return super.apply(base, snapshot);
}

@Override
public StreamingUpdate addFile(DataFile dataFile, int batchOrdinal) {
getBatch(batchOrdinal).add(dataFile);
return this;
}

@Override
public StreamingUpdate addFile(DeleteFile deleteFile, int batchOrdinal) {
getBatch(batchOrdinal).add(deleteFile);
return this;
}

private Batch getBatch(int batchOrdinal) {
Batch batch;
if (batches.size() - 1 < batchOrdinal) {
if (batchOrdinal > batches.size()) {
throw new IllegalArgumentException("batches must be added in order");
}
batch = new Batch(batchOrdinal);
batches.add(batch);
} else {
batch = batches.get(batchOrdinal);
}
return batch;
}

@Override
public BaseStreamingUpdate toBranch(String branch) {
targetBranch(branch);
return this;
}

@Override
protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {
if (!batches.isEmpty()) {
int minOrdinal = batches.stream().mapToInt(Batch::getOrdinal).min().getAsInt();
ValidationException.check(
minOrdinal == 0, "Batches must start at ordinal 0. Current min ordinal: %d", minOrdinal);

int maxOrdinal = batches.stream().mapToInt(Batch::getOrdinal).max().getAsInt();

ValidationException.check(
maxOrdinal - minOrdinal == batches.size() - 1,
"Batches must be sequential with no gaps. Current min ordinal: %d current max ordinal: %d with %d batches",
minOrdinal,
maxOrdinal,
batches.size());
}
super.validate(currentMetadata, snapshot);
}

private static class Batch {
private final List<DataFile> newDataFiles = Lists.newArrayList();
private final List<DeleteFile> newDeleteFiles = Lists.newArrayList();
private final int ordinal;

/**
* Creates a new set of updates to a specific batch
*
* @param ordinal the batch ordinal
*/
Batch(int ordinal) {
this.ordinal = ordinal;
}

public void add(DataFile dataFile) {
newDataFiles.add(dataFile);
}

public void add(DeleteFile deleteFile) {
newDeleteFiles.add(deleteFile);
}

public List<DataFile> getNewDataFiles() {
return newDataFiles;
}

public List<DeleteFile> getNewDeleteFiles() {
return newDeleteFiles;
}

public int getOrdinal() {
return ordinal;
}
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ public RewriteFiles newRewrite() {
return new BaseRewriteFiles(name, ops).reportWith(reporter);
}

@Override
public StreamingUpdate newStreamingUpdate() {
return new BaseStreamingUpdate(name, ops).reportWith(reporter);
}

@Override
public RewriteManifests rewriteManifests() {
return new BaseRewriteManifests(ops).reportWith(reporter);
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,16 @@ public RewriteFiles newRewrite() {
return rewrite;
}

@Override
public StreamingUpdate newStreamingUpdate() {
checkLastOperationCommitted("StreamingUpdate");
StreamingUpdate streamingUpdate =
new BaseStreamingUpdate(tableName, transactionOps).reportWith(reporter);
streamingUpdate.deleteWith(enqueueDelete);
updates.add(streamingUpdate);
return streamingUpdate;
}

@Override
public RewriteManifests rewriteManifests() {
checkLastOperationCommitted("RewriteManifests");
Expand Down Expand Up @@ -703,6 +713,11 @@ public RewriteFiles newRewrite() {
return BaseTransaction.this.newRewrite();
}

@Override
public StreamingUpdate newStreamingUpdate() {
return BaseTransaction.this.newStreamingUpdate();
}

@Override
public RewriteManifests rewriteManifests() {
return BaseTransaction.this.rewriteManifests();
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/SerializableTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ public RewriteFiles newRewrite() {
throw new UnsupportedOperationException(errorMsg("newRewrite"));
}

@Override
public StreamingUpdate newStreamingUpdate() {
throw new UnsupportedOperationException("newStreamingWrite");
}

@Override
public RewriteManifests rewriteManifests() {
throw new UnsupportedOperationException(errorMsg("rewriteManifests"));
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,16 @@ protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {}
*/
protected abstract List<ManifestFile> apply(TableMetadata metadataToUpdate, Snapshot snapshot);

protected long nextSnapshotSequenceNumber(TableMetadata metadataToUpdate) {
return metadataToUpdate.nextSequenceNumber();
}

@Override
public Snapshot apply() {
refresh();
Snapshot parentSnapshot = SnapshotUtil.latestSnapshot(base, targetBranch);

long sequenceNumber = base.nextSequenceNumber();
long sequenceNumber = nextSnapshotSequenceNumber(base);
Long parentSnapshotId = parentSnapshot == null ? null : parentSnapshot.snapshotId();

validate(base, parentSnapshot);
Expand Down

0 comments on commit 95a9014

Please sign in to comment.